Important Concepts

Driver and Executors

Spark Architecture

Driver

The driver is the process where the application's main method runs. It converts the user program into tasks and then schedules those tasks on the executors.

Executors

Executors are worker-node processes responsible for running the individual tasks of a given Spark job. They are launched at the start of a Spark application and typically run for its entire lifetime. Once a task finishes, the executor sends the result back to the driver. Executors also provide in-memory storage for RDDs that user programs cache, via the Block Manager.
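
For example, here is a minimal sketch (assuming a local PySpark session; the app name and numbers are illustrative): the script itself runs in the driver, while the per-partition work runs as tasks inside executor processes.

python
# The driver runs this script, builds the job and schedules its tasks;
# the lambda below executes inside executor processes.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("driver-executor-demo").getOrCreate()

rdd = spark.sparkContext.parallelize(range(100), numSlices=4)  # 4 partitions -> 4 tasks
result = rdd.map(lambda x: x * x).sum()  # partial results are sent back to the driver
print(result)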

SparkApps Sizing

TIP

READ this series of posts by Brad Caffey.

Brad Caffey Medium

CPU

Available CPU Cores = Total CPU Cores - 1

1 CPU core is reserved for the OS and other processes.

If your cluster has 16 cores, 15 of them are available for executors.

But how? What is the best way to split those 15 cores into executors and cores per executor?

Imagine the following scenario:

| Option | Executors | Cores per Executor | Total Cores |
|--------|-----------|--------------------|-------------|
| A      | 1         | 15                 | 15          |
| B      | 3         | 5                  | 15          |
| C      | 5         | 3                  | 15          |
| D      | 15        | 1                  | 15          |

What is the best option?

  • 🔴 Option A -> Garbage collection delays would slow down your job
  • 🟢 Option B -> Best option (in general, max 5 cores per executor); see the config sketch after this list
  • 🔴 Option C -> Less parallelism per executor and more memory overhead
  • 🔴 Option D -> No parallelism within an executor (one task per JVM) and memory overhead is duplicated across 15 executors
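
As a rough sketch of option B (assuming dynamic allocation is disabled so the executor count is honored; the app name is illustrative), the executor count and cores per executor can be requested when building the session:

python
from pyspark.sql import SparkSession

# Option B: 3 executors with 5 cores each (15 usable cores).
spark = (
    SparkSession.builder
    .appName("sizing-example")
    .config("spark.executor.instances", "3")
    .config("spark.executor.cores", "5")
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)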

Memory

Memory Overhead

For JVM-based jobs this factor defaults to 0.10; for non-JVM jobs (e.g. PySpark) it defaults to 0.40.

python
spark.kubernetes.memoryOverheadFactor
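
As a sketch, the factor can be made explicit for a non-JVM (PySpark) job on Kubernetes; it has to be supplied when the application is launched (in the session builder or via --conf), and 0.4 simply mirrors the default mentioned above:

python
from pyspark.sql import SparkSession

# Make the non-JVM overhead factor explicit; set before the executors are requested.
spark = (
    SparkSession.builder
    .config("spark.kubernetes.memoryOverheadFactor", "0.4")
    .getOrCreate()
)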

Memory per Executor

Let's suppose that your cluster has 128GB of memory in total and you want to run 3 executors (option B above).

How much memory should you allocate to each executor?

available memory = cluster_total_memory - (cluster_manager_memory + os_memory)

Example:

  • 128GB - 16GB = 112GB available
  • 112GB / 3 executors ≈ 37GB per executor
  • 37GB / 1.1 ≈ 33GB for spark.executor.memory (change 1.1 to 1.4 for non-JVM jobs on Kubernetes, matching the 0.40 overhead factor above)
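
The same arithmetic as a small script (values mirror the example above; the 1.1 divisor corresponds to the 0.10 overhead factor):

python
# Back out spark.executor.memory from total memory.
total_memory_gb = 128      # cluster total (assumed, as in the example above)
reserved_gb = 16           # cluster manager + OS
executors = 3              # option B
overhead_factor = 1.10     # use 1.40 for non-JVM jobs

available_gb = total_memory_gb - reserved_gb           # 112
per_executor_gb = available_gb / executors             # ~37.3
heap_gb = int(per_executor_gb / overhead_factor)       # 33

print(f"spark.executor.memory={heap_gb}g")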

Partitions

python
# Rule of thumb: aim for partitions of about 128MB each.
dataset_size_mb = 10 * 1024              # e.g. a 10GB dataset
partitions = dataset_size_mb // 128      # ~80 partitions

# Or change the maximum partition size used when reading files (default is 128MB)
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")

Coalesce

If you want to reduce the number of partitions, you can use coalesce().

ALWAYS use coalesce() instead of repartition() when you want to reduce the number of partitions, as coalesce() does not perform a shuffle.
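
A quick sketch (assuming an active SparkSession named spark):

python
# Merge 200 partitions down to 10 without a shuffle.
df = spark.range(1_000_000).repartition(200)
df_small = df.coalesce(10)
print(df_small.rdd.getNumPartitions())  # 10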

Repartition

Repartition is used to increase or decrease the number of partitions in a DataFrame. It performs a full shuffle of the data and creates equally sized partitions.

When reducing partitions, prefer coalesce() over repartition(), as coalesce() does not perform a shuffle.

If you need to increase the number of partitions, use repartition(). Be aware that this will incur a full shuffle, which is a fairly expensive operation.
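
For example (again assuming an active SparkSession named spark):

python
# repartition() performs a full shuffle and produces evenly sized partitions.
df = spark.range(1_000_000)
df_more = df.repartition(64)           # increase the partition count (full shuffle)
df_by_key = df.repartition("id")       # or redistribute rows by a column (full shuffle)
print(df_more.rdd.getNumPartitions())  # 64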

More Details

Coalesce vs Repartition

Shuffle

The number of shuffle partitions comes from the Spark SQL configuration spark.sql.shuffle.partitions, which is set to 200 by default.
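
If 200 does not match your data volume, the value can be changed at runtime; 64 below is only an illustrative number:

python
# Tune the number of partitions used after wide transformations (joins, groupBy, ...).
spark.conf.set("spark.sql.shuffle.partitions", "64")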

Shuffling is expensive:

  • data is transferred over the network
  • data is reorganized before being transferred

Example of shuffling in a group by operation:

Shuffling
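
In code (a sketch, assuming an active SparkSession named spark), the shuffle shows up as an Exchange step in the query plan:

python
# groupBy is a wide transformation: rows with the same key must end up in the
# same partition, and moving them there is the shuffle.
from pyspark.sql import functions as F

orders = spark.createDataFrame(
    [("us", 10.0), ("us", 5.0), ("eu", 7.5)],
    ["region", "amount"],
)
totals = orders.groupBy("region").agg(F.sum("amount").alias("total"))
totals.explain()  # look for an Exchange (shuffle) step in the plan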

Spill

TIP

Work in progress

Skew

TIP

Work in progress

Spark UI

TIP

Work in progress

Caching

TIP

Work in progress

Checkpointing

TIP

Work in progress

Article by Adrian Chang

Feel free to use any content here.