Spark Node Memory and Defaults
In this post, let's look at the basics of what an executor container consists of, how its memory is split up, and the purpose of each part.
Each executor container has off-heap memory, heap memory and memory overhead. From Spark 3.0 onward, off-heap memory is accounted for separately from memory overhead.
Off-Heap Memory
It is free from garbage collection and is managed directly by Spark rather than by the JVM. This memory is used for shuffle operations and for storing serialized DataFrames and RDDs (Project Tungsten). Because off-heap data does not undergo garbage collection, Project Tungsten can keep shuffled data there to speed up execution. The purpose of Project Tungsten is to speed up Spark applications by improving CPU efficiency and reducing memory pressure through explicit memory management and binary processing. Off-heap memory is disabled by default; we can enable it by setting spark.memory.offHeap.enabled = true and set its size with the spark.memory.offHeap.size parameter.
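To make the two settings concrete, here is a minimal sketch in Python that builds the matching spark-submit flags. The helper name offheap_submit_args and the 2g size are illustrative assumptions only; the two property names are the ones mentioned above.

```python
def offheap_submit_args(size="2g"):
    """Build spark-submit --conf flags that turn on off-heap memory.

    Hypothetical helper for illustration; it is not part of Spark.
    """
    conf = {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": size,  # needs a positive size once enabled
    }
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(offheap_submit_args("2g")))
# --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g
```

The same two key/value pairs can also be placed in spark-defaults.conf or passed when building the session, whichever suits your deployment.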
Even when off-heap memory is not enabled, Project Tungsten still manages memory itself inside the default heap memory, which avoids much of the overhead of the JVM object model (large object sizes) and of garbage collection.
spark.memory.offHeap.size = execution memory (spark.memory.offHeap.size * (1 - spark.memory.storageFraction)) + storage memory (spark.memory.offHeap.size * spark.memory.storageFraction)
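The formula above can be worked through with a few lines of Python. This is only a sketch of the arithmetic: the function name split_offheap is made up for illustration, sizes are in MB, and 0.5 is assumed as the value of spark.memory.storageFraction.

```python
def split_offheap(offheap_mb, storage_fraction=0.5):
    """Split the off-heap size into its storage and execution parts (MB)."""
    storage = offheap_mb * storage_fraction
    execution = offheap_mb * (1 - storage_fraction)
    return {"storage_mb": storage, "execution_mb": execution}


print(split_offheap(2048))
# {'storage_mb': 1024.0, 'execution_mb': 1024.0}
```

The two parts always add back up to the configured off-heap size, whatever fraction is chosen.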
Memory Overhead
It is used for internal and non-JVM processes, network buffers, shuffle exchange, interned strings, other native overheads, etc. The default size of memory overhead is 10% of executor memory, with a minimum of 384 MB. It can also be set explicitly with spark.executor.memoryOverhead.
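As a rough illustration of the default rule, the sketch below computes the overhead for a given executor memory and then adds up what the container would ask for. The function names are hypothetical, sizes are in MB, and the total assumes Spark 3.0 or later (off-heap counted separately) and ignores any extra PySpark worker memory.

```python
def memory_overhead_mb(executor_memory_mb, factor=0.10, minimum_mb=384):
    """Default overhead: 10% of executor memory, but never below 384 MB."""
    return max(int(executor_memory_mb * factor), minimum_mb)


def container_request_mb(executor_memory_mb, offheap_mb=0):
    """Approximate container size: heap + overhead + off-heap (assumption)."""
    return executor_memory_mb + memory_overhead_mb(executor_memory_mb) + offheap_mb


print(memory_overhead_mb(2048))          # 384 (10% would be only 204 MB)
print(memory_overhead_mb(8192))          # 819
print(container_request_mb(8192, 2048))  # 11059
```

Note how the 384 MB floor dominates for small executors: below roughly 3.75 GB of executor memory, the 10% rule never applies.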
Heap Memory
It is split into three sections: Spark Executor Memory Pool, Reserved Memory and User Memory.
Reserved Memory
The Spark engine itself uses this memory. It reserves a fixed 300 MB of the heap to store its internal objects.
User Memory
It is used for non-DataFrame operations: user-defined data structures such as hash maps, Spark internal metadata, user-defined functions, and RDD information and operations. We can reduce user memory (by raising spark.memory.fraction) if the application has no RDDs, user-defined data structures or user-defined functions, but we cannot set it to zero, because Spark's internal metadata and other internal information also live there.
Spark Executor Memory Pool
Its size is defined by spark.memory.fraction, whose default value is 0.6, i.e. 60% of the heap that remains after reserved memory is set aside. It is used for DataFrames and their operations. It is divided into two sub-pools:
i) Storage Memory
It is used for caching and broadcasting data.
ii) Execution Memory
It is used to perform DataFrame computations, i.e., to store the intermediate data of shuffles, joins, sorts, aggregations, etc. The default split between storage and execution memory is 50/50 (spark.memory.storageFraction = 0.5).
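Putting the heap sections together, here is a small worked example in Python. It is a sketch under a few assumptions: the function name heap_breakdown is made up, sizes are in MB, reserved memory is taken as a fixed 300 MB, and the configured executor memory is treated as the usable heap (in practice the JVM reports slightly less).

```python
RESERVED_MB = 300  # assumed fixed reserved memory


def heap_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split executor heap into reserved, user, storage and execution memory."""
    usable = heap_mb - RESERVED_MB          # what is left after reserved memory
    pool = usable * memory_fraction         # Spark executor memory pool
    storage = pool * storage_fraction       # caching and broadcast data
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": usable - pool,
        "pool_mb": pool,
        "storage_mb": storage,
        "execution_mb": pool - storage,
    }


for name, value in heap_breakdown(4396).items():
    print(name, round(value, 1))
```

For a 4396 MB heap this gives roughly 300 MB reserved, 1638.4 MB of user memory and a 2457.6 MB pool, split evenly into about 1228.8 MB each for storage and execution.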
Dynamic occupancy mechanism:
Storage and execution memory share a single region. Depending on need, one can borrow from the other; this is called the dynamic occupancy mechanism.
When no execution memory is in use, storage memory can acquire all the available execution memory, and vice versa. Execution memory can occupy storage memory while it is free, but when storage later needs that space back, it has to wait for the execution processes to release it. Storage memory cannot evict the occupied execution memory, because execution is given priority over storage: running tasks is more important than caching data.
In the other direction, i.e., when storage memory occupies available execution memory and the execution processes need that memory back, the cached data occupying it is evicted. The space storage had taken in the execution memory area is immediately handed back to execution to support uninterrupted processing.
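The borrowing rules can be sketched as a small toy model in Python. This is not Spark's actual implementation: the class UnifiedPool and its methods are hypothetical, sizes are plain integers in MB, and a storage request that does not fit is simply refused here instead of evicting older cached blocks.

```python
class UnifiedPool:
    """Toy model of the shared storage/execution region (sizes in MB)."""

    def __init__(self, total, storage_fraction=0.5):
        self.total = total
        # Storage's own share; only storage beyond this counts as borrowed.
        self.storage_region = int(total * storage_fraction)
        self.storage_used = 0
        self.execution_used = 0

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, amount):
        """Storage may use any free space but never evicts execution."""
        if amount > self.free():
            return False
        self.storage_used += amount
        return True

    def acquire_execution(self, amount):
        """Execution takes free space first, then evicts borrowed storage."""
        shortfall = amount - self.free()
        if shortfall > 0:
            borrowed = max(self.storage_used - self.storage_region, 0)
            self.storage_used -= min(shortfall, borrowed)
        granted = min(amount, self.free())
        self.execution_used += granted
        return granted


pool = UnifiedPool(total=1000)
print(pool.acquire_storage(800))    # True: execution is idle, storage borrows 300
print(pool.acquire_execution(400))  # 400: 200 free + 200 evicted from storage
print(pool.acquire_storage(100))    # False: storage cannot evict execution
print(pool.acquire_execution(300))  # 100: only 100 of storage was still borrowed
```

The asymmetry is the point of the sketch: execution can always win back what storage borrowed, while storage has to wait for execution to release memory on its own.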
Conclusion:
This post gives an overall idea of how the memory inside an executor container is split and used for various purposes. Please feel free to comment for discussion.