Controlling Executors and Cores in Spark Applications

In this post, I will cover the core concepts that govern Spark's execution model: the different components, how they interact with each other, and what happens when you fire a query. I will also walk through a few examples to illustrate how Spark configs change this behaviour.

Let's talk about the architecture first. Each Spark application (an instance of SparkContext) starts with a Driver program. The Driver talks to the Cluster Manager (YARN, Mesos, Kubernetes, etc.) to request the resources it needs. Executors (one such resource) are processes that run computations and store data for the application. Each application gets its own executor processes on Worker Nodes, which effectively means data can't be shared across different applications without writing it to a storage layer. Executors also typically run for the lifetime of a Spark application and execute many tasks over that lifetime, both in parallel and sequentially.
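As a minimal sketch of how an application comes into existence, the snippet below builds a SparkSession (which wraps the SparkContext, i.e. the Driver side of the picture). The app name and master URL are illustrative placeholders, not values from this post.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch: the Driver program is simply the process that builds this session.
// "example-app" and the master URL are hypothetical placeholders.
val spark = SparkSession.builder()
  .appName("example-app")
  .master("spark://master-host:7077")   // standalone cluster manager; could be yarn, k8s://..., etc.
  .getOrCreate()

val sc = spark.sparkContext             // the SparkContext: this application's handle to the cluster
```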


What happens when you submit a job?
When you submit a job to Spark, the following events occur:

1. The SparkContext connects to the cluster manager
2. Spark acquires executors on the worker nodes. By default, the number of executors is equal to the number of worker nodes available
3. Spark sends the application code to the executors. This code might be JARs or Python files passed to the SparkContext
4. The SparkContext sends tasks to the executors to run (see the sketch below)
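The sketch below illustrates steps 3 and 4 under assumed inputs: the jar path and input path are hypothetical, and the session creation is repeated so the snippet stands alone.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("submit-example").getOrCreate()

// Step 3: extra application code is shipped to every executor.
// "extra-udfs.jar" is a hypothetical dependency, not something from this post.
spark.sparkContext.addJar("hdfs:///libs/extra-udfs.jar")

// Step 4: the driver splits this action into tasks and schedules them on the executors.
val nonEmptyLines = spark.sparkContext
  .textFile("hdfs:///data/events")    // hypothetical input path
  .filter(_.nonEmpty)
  .count()
```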

Optimizing Spark?
Now then, how do we control these components to get the most out of our Spark cluster? There are a few Spark configurations we can use to make sure our applications use the cluster's resources optimally. Some of the important configs are:

--executor-cores: the number of concurrent tasks an executor can run, in other words the number of cores per executor. This also means that if you assign only one core per executor, that executor's JVM will never run tasks in parallel
--num-executors: by default, the number of executors is equal to the number of worker nodes, but it can be changed with this property. Keep in mind that it controls the number of executors requested, not assigned; assignment succeeds only if the worker nodes have enough free cores (given the --executor-cores value)
--total-executor-cores: the maximum number of executor cores allowed per application (a programmatic sketch of these settings follows this list)
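These flags can also be set as configuration properties when building the session. The sketch below assumes the usual mappings (spark.executor.cores for --executor-cores, spark.executor.instances for --num-executors on YARN, spark.cores.max for --total-executor-cores on standalone/Mesos); the values themselves are purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("resource-tuned-app")
  .config("spark.executor.cores", "4")       // --executor-cores: concurrent tasks per executor
  .config("spark.executor.instances", "10")  // --num-executors: executors requested (YARN)
  .config("spark.cores.max", "40")           // --total-executor-cores: app-wide core cap (standalone/Mesos)
  .getOrCreate()
```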

Let's try to understand these concepts using some examples:

Example 1: A standalone cluster with 5 worker nodes, each node having 8 cores. When I start an application with default settings, how many executors will get assigned?
Answer: Spark will greedily acquire as many cores and executors as are offered by the scheduler. So in the end you will get 5 executors with 8 cores each.
Example 2: Same cluster config as Example 1, but I run an application with the following settings: --executor-cores 10 --total-executor-cores 10. What happens?
Answer: Spark won't be able to allocate as many cores as requested on any single worker (each worker only has 8 cores), hence no executors will be launched. The sketch below spells this configuration out.
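For completeness, here is a hedged sketch of Example 2 expressed through a SparkConf rather than spark-submit flags; the master URL is a placeholder.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("example-2")
  .setMaster("spark://master-host:7077")  // hypothetical standalone master URL
  .set("spark.executor.cores", "10")      // each executor would need 10 cores...
  .set("spark.cores.max", "10")           // ...within an application-wide cap of 10

val spark = SparkSession.builder().config(conf).getOrCreate()
// No 8-core worker can host a 10-core executor, so the application gets no executors.
```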
Another excellent example comes from the Cloudera blog:

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.
The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:
1. 63GB + the executor memory overhead won't fit within the 63GB capacity of the NodeManagers.
2. The application master will take up a core on one of the nodes, meaning that there won't be room for a 15-core executor on that node.
3. 15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?
This config results in three executors on all nodes except for the one with the AM, which will have two executors. --executor-memory was derived as 63GB / 3 executors per node = 21GB; allowing for the ~7% memory overhead, 21 * 0.07 = 1.47 and 21 – 1.47 ≈ 19GB.
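The same derivation can be written out as a small calculation. The sketch below assumes the ~7% memory-overhead heuristic used in the Cloudera example; every other number comes from the example itself.

```scala
// Re-deriving the Cloudera numbers. The 7% factor is the memory-overhead
// heuristic assumed by the example above, not a universal constant.
val nodes            = 6
val coresPerNode     = 16 - 1   // leave 1 core for the OS and Hadoop daemons
val memPerNodeGB     = 64 - 1   // leave 1 GB for the OS and Hadoop daemons
val coresPerExecutor = 5        // small enough to keep HDFS I/O throughput healthy

val executorsPerNode = coresPerNode / coresPerExecutor          // 15 / 5 = 3
val numExecutors     = nodes * executorsPerNode - 1             // 18, minus 1 for the AM = 17
val rawMemGB         = memPerNodeGB.toDouble / executorsPerNode // 63 / 3 = 21
val executorMemGB    = (rawMemGB - rawMemGB * 0.07).toInt       // 21 - 1.47 ≈ 19

println(s"--num-executors $numExecutors --executor-cores $coresPerExecutor --executor-memory ${executorMemGB}G")
// prints: --num-executors 17 --executor-cores 5 --executor-memory 19G
```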
