Apache Spark crucial component — Catalyst Optimizer

Teepika R M
5 min readApr 5, 2022

Catalyst Optimizer optimizes queries expressed in SQL or through DataFrame/ Dataset APIs, which in turn reduces the runtime of the spark application. In this post, we will see the phases involved in that optimization process in detail.

There are two purposes for the extensible(extendable nature) design for the catalyst optimizer,

  1. For including new optimization techniques to Spark SQL. It’s useful to have the capability of adding new features for handling various problems that evolve while dealing with advanced analytics.
  2. For allowing the external developers to extend the optimizer for their specific use cases like adding any data source related rules or including any data types.

What triggers the optimization process?

A SQL/Hive query or DataFrame/Dataset API with action triggers the computation of the plan for execution through catalyst optimizer. The optimizer starts at a high level and it undergoes many phases of optimization and results in final execution plan for scheduling.

What is the datatype involved in this optimization phases? Trees

All the types of plans formed during the phases are implemented as Scala Trees. Trees are composed of nodes and each node can have zero or more children. Trees are immutable(cannot make in-place modifications) and can be modified using functional transformations.

Consider the expression,

Merge(Attribute(x), Merge(Literal(1), Literal(2))

Here Merge, Attribute and Literal are nodes.

Literal(value: Int ) — a constant value

Attribute(x: String) — an attribute x from input row

Merge(left: TreeNode, right: TreeNode) — Merge/Mix two input expressions

How the transformations are applied on trees?

Rules are applied are on the trees in each phase to transform from one tree to another tree. These rules are applied as pattern matching functions on every node of the tree recursively until it reaches a point where no longer it matches any pattern (saturated form which cannot be modified any longer).

tree.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}

The pattern matching expression used to transform node is partial function. The name partial function implies that only the part of the tree which matches the expression get transformed, leaving behind the remaining part of the tree. Many partial functions/expressions can be clubbed together in a single transform call, making multiple transformations in a concise manner.

Catalyst optimizer uses both rules based and cost based optimization techniques. In rules based optimization techniques, transformations are performed based on a set of defined rules. Whereas in cost based optimization, multiple execution plans are compared on the basis of their cost and the one with the lowest cost is chosen.

Catalyst Optimizer’s tree transformation phases

Analysis phase:

The process starts with transforming input query to unresolved logical plan. The plan is called unresolved because it has unresolved attribute references in it. For eg, in the user query, select colA from tableA -> type of the column colA or its existence in tableA is unknown until we refer to the source table tableA. An attribute remains unknown until we lookup the source table for its type and existence. Such unresolved attributes are resolved with the help of catalyst rules and catalog object in this phase. Catalog object tracks all the tables in all data sources to help resolving the attributes. The result of the phase is generating resolved logical plan from unresolved logical plan.

Logical Optimization phase:

In this phase, rules based optimization is applied on the resolved logical plan. New rules can also be added to the existing rules because of the extensible design of the optimizer. Some of the existing rules applied here are constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, etc. Lets see few in detail below,

Constant folding:

It’s an optimization technique where expressions that can be calculated before code execution are eliminated and replaced by constant values.

For eg,

i <- 320 * 200 * 32

In the above statement, the expression gets calculated and replaced with computed values

i <- 2048000

Predicate pushdown:

When a where or filter operation is executed right after the data loading statement, the optimizer will push the where condition down to the datasource thereby reducing the source data only to the records required for subsequence operations. It improves the performance by reducing the records loaded from source by applying the filter condition at source level itself, provided the source supports the process. This technique helps in avoiding the movement of a lot of unnecessary data across the network.

Projection pruning:

Same as predicate pushdown, instead of filter/predicates, only the selected columns in the query are only retrieved from the source, thereby boosting the performance of the query. This technique also helps in avoiding unnecessary movement of huge volume of data across the network.

Boolean expression simplification:

This optimization technique simplifies boolean expressions by merging them, removing NOT operator, extracting common factors or reformatting expression etc.

Physical Planning phase:

From the optimized logical plan generated in the previous phase, one or more physical plans are generated and the one with least cost is selected for execution. Here both rules based physical optimizations like pipelining projections or filters into one Spark map operation and cost based optimizations are applied.

Code Generation phase:

To make the query get executed, it is essential to generate java bytecode for the chosen physical plan to send to each machine. Also, it is important to boost the CPU processing for spark query execution, since Spark SQL often operates on in-memory datasets. Hence, supporting the generation of bytecode will speed up the execution. Catalyst uses a special feature of Scala programming language ie, quasiquotes to make code generation part simpler.

I have written a small blog post on “Quasiquotes” to give the clear picture of how the code generation happens with Catalyst Optimizer. Please feel free to check that.

Conclusion:

Spark Catalyst Optimizer improves the performance of the written queries without any human intervention by transforming them with the help of predefined rules to an optimized format. The input queries go through various phases to generate the final java bytecode for execution by machines. Also, the extensible design of the optimizer enables the Spark community to implement and extend it with new features.

--

--

Teepika R M

AWS Certified Big Data Specialty| Linux Certified Kubernetes Application Developer| Hortonworks Certified Spark Developer|Hortonworks Certified Hadoop Developer