Apache Spark 3.0 Exciting Capabilities

This post walks through the features highlighted in the Apache Spark 3.0.0 release.

1. Accelerator Aware Scheduling

Project Hydrogen is a major Spark initiative to unify state-of-the-art AI and big data workloads. It enables deep learning and machine learning frameworks to run in a distributed way on Spark, improving both their performance and their fault recovery.

Project Hydrogen has three components: 1) Barrier Execution Mode, 2) Optimized Data Exchange, and 3) Accelerator Aware Scheduling. Accelerator Aware Scheduling is the part implemented in Apache Spark 3.0.

What are accelerators?

An AI accelerator (e.g., a GPU or FPGA) is a class of specialized hardware or computer system designed to accelerate artificial intelligence and machine learning applications, including artificial neural networks and machine vision.

When a Spark cluster is provisioned with accelerators, Spark needs to be aware of the accelerators assigned to the driver and executors and schedule tasks onto them according to user requests. If Spark is unaware of the GPUs, problems arise: more tasks may be assigned to an executor than it has GPUs, or tasks may pile onto the same GPU even though the executor has other free GPUs. Hence, to complete the processing efficiently, Spark needs to be aware of the GPUs in the cluster. This feature is implemented and available in Spark 3.0.0.
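As a rough sketch of how this looks in practice, the snippet below shows spark-submit flags for GPU-aware scheduling in Spark 3.0. The resource amounts, the application name, and the discovery-script path are illustrative assumptions; the discovery script must print the GPU addresses visible to each executor.

```shell
# Hedged example: requesting GPUs per executor and per task in Spark 3.0.
# Amounts, paths, and the app name are assumptions for illustration.
spark-submit \
  --master yarn \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.task.resource.gpu.amount=1 \
  --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/scripts/getGpusResources.sh \
  my_app.py
```

With these settings, Spark schedules at most as many concurrent GPU tasks per executor as there are GPUs, instead of oversubscribing a single device.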

2. Adaptive Query Execution

Adaptive Query Execution (AQE) is query re-optimization that occurs during query execution, based on statistics collected at runtime.

In general, a query is split into “query stages” at every shuffle or broadcast exchange in its execution plan, so a single query is cut into as many stages as it has data exchanges. Each stage computes its intermediate results, and a downstream stage proceeds only after all the parallel tasks of the previous stage have completed. That boundary is the right time to collect statistics from the completed stages and use them to optimize the logical plan for the upcoming stages. This process of completing the leaf query stages, computing statistics, and re-optimizing the remaining stages is repeated until the query execution is complete. Adaptive Query Execution follows this execute-reoptimize-execute loop for the entire query. Please refer to the diagram below to understand the cycle.
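AQE is off by default in Spark 3.0 and is turned on with a single flag; the sub-features discussed below have their own switches. A minimal configuration sketch (property names per the Spark 3.0 docs, shown here as spark-submit flags):

```shell
# Illustrative: enabling AQE and its sub-features in Spark 3.0.
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
```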

Adaptive Query Execution Cycle

The AQE framework comes with three key features:

i) Dynamically coalescing shuffle partitions

Consider a table with data in two partitions, and suppose you perform a shuffle operation on it for a business requirement such as a group by. The shuffle moves data across the network and produces five partitions because the shuffle partition number is set to 5 by the user. Without AQE, five tasks run even though three of them hold very little data, slowing down the query execution because of the I/O overhead.

With AQE, the three small partitions are coalesced into a single one, so three tasks in total complete the query execution. This speeds up the query since it reduces I/O movement.
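The idea can be sketched in a few lines of plain Python. This is not Spark's internal code, just an illustration of greedily merging adjacent small shuffle partitions up to a target size (the role played by `spark.sql.adaptive.advisoryPartitionSizeInBytes` in Spark 3.0):

```python
def coalesce_partitions(sizes, target):
    """Greedily merge adjacent shuffle partitions while the running
    total stays within the advisory target size (sizes in MB here)."""
    merged, current = [], 0
    for size in sizes:
        if current and current + size > target:
            merged.append(current)  # close the current bucket
            current = 0
        current += size
    if current:
        merged.append(current)
    return merged

# Five shuffle partitions, three of them tiny (sizes in MB):
print(coalesce_partitions([64, 2, 3, 1, 70], 64))  # -> [64, 6, 70]
```

Note that the three tiny partitions collapse into one 6 MB task, giving three tasks total, exactly the scenario described above; a partition already larger than the target is left alone.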

ii) Dynamically switching join strategies

Spark supports many join strategies, and broadcast hash join is usually the most performant one. When joining tables, if a table’s size is below the broadcast-size threshold, that table is cached in the memory of all executors, reducing network traffic and allowing the join to run locally. But Spark often misses the chance to use a broadcast hash join: the table looks large at planning time, yet a filter or a complex operator applied during the join shrinks the result below the broadcast-size threshold. With AQE, the join strategy is re-planned at runtime based on the accurate size of the join relation, so the most efficient strategy is chosen whenever possible to boost performance.
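A toy sketch of that runtime decision, in plain Python: the strategy is picked from the relation's actual size after execution, not its estimated size at planning time. The 10 MB cutoff mirrors the default of `spark.sql.autoBroadcastJoinThreshold`; the function names are hypothetical.

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB, Spark's default broadcast cutoff

def choose_join_strategy(runtime_size_bytes):
    """Pick a join strategy from the *measured* size of one side of the join."""
    if runtime_size_bytes <= BROADCAST_THRESHOLD:
        return "broadcast_hash_join"
    return "sort_merge_join"

# Planned size was 1 GB, but after a selective filter only 2 MB remain:
print(choose_join_strategy(2 * 1024 * 1024))  # broadcast_hash_join
print(choose_join_strategy(1024 ** 3))        # sort_merge_join
```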

iii) Dynamically optimizing skew joins

Data skew is one of the major problems to address in performance tuning.

What is data skew?

One key holds most of the values (heavily populated) while the other keys hold comparatively few values (sparsely populated).

Consider table A, where key A0 holds the most values, forcing the join to spill data to disk for that large key. As a result, the task handling partition A0 runs for a very long time while the other tasks complete quickly. The other resources then sit idle waiting for the large partition’s task to finish, leading to resource wastage.

Without AQE, skewed partition join

AQE’s skew join optimization detects such skew automatically from shuffle file statistics and breaks the large partition into several smaller ones, so that all tasks run for approximately the same amount of time.

With AQE, skewness detected, partition split & joined
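As a rough pure-Python sketch of the detection-and-split idea (not Spark internals): a partition counts as skewed when it is several times larger than the median partition size, echoing Spark 3.0's `spark.sql.adaptive.skewJoin.skewedPartitionFactor` default of 5, and is then cut into chunk-sized pieces. Sizes and the chunk size are illustrative.

```python
def split_skewed(sizes, factor=5, chunk=64):
    """Split any partition larger than factor * median into chunk-sized
    pieces, leaving well-behaved partitions untouched (sizes in MB)."""
    median = sorted(sizes)[len(sizes) // 2]
    result = []
    for size in sizes:
        if size > factor * median:
            while size > chunk:       # carve off chunk-sized partitions
                result.append(chunk)
                size -= chunk
            result.append(size)       # remainder
        else:
            result.append(size)
    return result

# Partition A0 (600 MB) dwarfs the others:
print(split_skewed([600, 40, 50, 45]))
```

After the split, the former A0 becomes ten partitions of at most 64 MB each, so ten tasks of similar duration replace one long-running straggler.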

3. Dynamic Partition Pruning

Dynamic Partition Pruning builds on two concepts — partition pruning and broadcast hashing. Let us start with what partition pruning and broadcast hashing are.

Partition Pruning:

It is based on the predicate pushdown property: the filter is pushed down to the source so that only the partitions satisfying the filter criteria are read, reducing the total number of partitions considered for the join operation.

Consider table A partitioned on column city and table B partitioned on the same column. While inner joining the two tables, a filter is applied on table B to keep only 5 cities. With partition pruning, only the partitions that satisfy the filter on the city column are read from both tables, reducing the amount of data processed by the join and improving query performance.
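In plain Python terms, static partition pruning looks like this: the filter is known before execution starts, so non-matching partitions are simply never read. Table contents and city names here are made up for illustration.

```python
# A table partitioned by city; the filter is known at planning time.
table_b_partitions = {
    "Chennai": ["row1", "row2"],
    "Mumbai":  ["row3"],
    "Delhi":   ["row4", "row5"],
}
wanted_cities = {"Chennai", "Delhi"}  # the static filter

# Prune: scan only the partitions whose key satisfies the filter.
scanned = {city: rows for city, rows in table_b_partitions.items()
           if city in wanted_cities}
print(sorted(scanned))  # ['Chennai', 'Delhi']
```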

Broadcast Hashing:

While joining tables, if one table’s size is below the broadcast-size threshold, that table is built into a hash table and broadcast to all the executors. This makes the join more performant since it reduces the I/O overhead.

Dynamic partition pruning occurs when the optimizer cannot identify at parse time the partitions it can eliminate. It comes in handy particularly when joining fact and dimension tables. A fact table holds the facts of a particular business process, e.g., sales revenue by month by product; facts are also known as measurements or metrics, and each fact table record captures one measurement. A dimension table holds the descriptive attributes along which the measurements in the fact table are analyzed. Dimension tables are associated with a fact table through foreign keys, and they are expected to be much smaller than fact tables.

When these fact and dimension tables are joined, dynamic partition pruning creates an internal subquery from the filter applied to the dimension table. A hash table is built from that subquery and broadcast to all the executors. The subquery result is then applied in the physical plan of the fact table so that only the required partitions are processed, skipping the unwanted ones during the scan phase. This reduces the number of partitions scanned from the large table and results in a more performant join by cutting I/O overhead.
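Extending the earlier sketch to the dynamic case: now the set of fact-table partitions to scan is not known until the dimension-table filter has run, so the pruning keys are derived at runtime (the set that Spark would broadcast as a hash table). All table contents and the "region" filter are assumptions for illustration.

```python
# Dimension table: small, carries the filter column.
dim_table = [
    {"city": "Chennai", "region": "South"},
    {"city": "Delhi",   "region": "North"},
    {"city": "Mumbai",  "region": "West"},
]
# Fact table: large, partitioned by the join key (city).
fact_partitions = {
    "Chennai": [("Chennai", 100)],
    "Delhi":   [("Delhi", 250)],
    "Mumbai":  [("Mumbai", 175)],
}

# Runtime step 1: evaluate the dimension filter, collect the join keys
# (the "internal subquery" that gets broadcast).
pruning_keys = {row["city"] for row in dim_table if row["region"] == "South"}

# Runtime step 2: scan only the fact partitions matching those keys.
scanned = {city: rows for city, rows in fact_partitions.items()
           if city in pruning_keys}
print(sorted(scanned))  # ['Chennai']
```

The fact table's other partitions are never read, even though the query itself contained no direct filter on the fact table.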

4. Redesigned pandas UDF API with type hints

A new interface that uses Python type hints is introduced in Apache Spark 3.0 to address the inconsistencies caused by the rapid growth of Pandas UDF types. It lets users pick the right Pandas UDF variant without confusion: for example, a series-to-series UDF is now declared as a function of type pd.Series -> pd.Series under the @pandas_udf decorator, and Spark infers the UDF type from the hints. Pandas UDFs are one of the most important enhancements for data science in Apache Spark.
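To make the mechanism concrete without requiring a Spark installation, here is a pure-Python sketch of the underlying idea: inspecting a function's type hints to decide which UDF variant it is. The `Series` stand-in class and `infer_udf_kind` are hypothetical names, not PySpark APIs.

```python
import typing

class Series:
    """Stand-in for pandas.Series, just to carry the annotation."""

def infer_udf_kind(fn):
    """Decide the UDF variant purely from the function's type hints,
    the way Spark 3.0's redesigned pandas_udf dispatches on hints."""
    hints = typing.get_type_hints(fn)
    ret = hints.pop("return", None)
    if ret is Series and all(a is Series for a in hints.values()):
        return "SERIES_TO_SERIES"
    return "UNSUPPORTED"

def plus_one(s: Series) -> Series:   # annotated like a pandas UDF
    return s

def bad(s: Series) -> int:           # wrong return hint
    return 0

print(infer_udf_kind(plus_one))  # SERIES_TO_SERIES
print(infer_udf_kind(bad))       # UNSUPPORTED
```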

5. Structured Streaming UI

A new visualization UI for Structured Streaming is introduced to monitor all streaming jobs. It makes troubleshooting easier with the help of real-time metrics for each streaming job.

6. Catalog plugin API

Accessing and manipulating the metadata of external sources is made possible with the introduction of external catalog registration in Apache Spark 3. Once a catalog is registered, users can access and manipulate the metadata of its external tables.
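Registration follows the `spark.sql.catalog.<name>` configuration pattern; the catalog name and implementation class below are assumptions, standing in for a real plugin that implements Spark's catalog plugin interface.

```shell
# Illustrative: registering an external catalog named "mycatalog".
# The class name and its option are placeholders.
--conf spark.sql.catalog.mycatalog=com.example.MyCatalogPlugin
--conf spark.sql.catalog.mycatalog.url=jdbc:example://host/db
```

After registration, tables can be addressed as `mycatalog.db.table` in Spark SQL.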

7. Java 11 and Hadoop 3 support

Support for Java 11 and Hadoop 3 is enabled with Apache Spark 3.

8. Better ANSI SQL compatibility

Apache Spark 3 is enhanced with better ANSI SQL compatibility, which eases its adoption for data engineers coming from traditional SQL systems.



Teepika R M

AWS Certified Big Data Specialty| Linux Certified Kubernetes Application Developer| Hortonworks Certified Spark Developer|Hortonworks Certified Hadoop Developer