This is the second article in the “Why is Spark Fast? And how to make it run faster” series. The series discusses the mechanisms behind Apache Spark and how this data-processing framework disrupted the Big Data ecosystem. Reading the first part beforehand is recommended.

Throughout the first article of the series we went through the basics of Spark, and more precisely, its SparkSQL module. We now know that Spark sees the jobs that we submit as a list of transformations and actions that it needs to execute. This article will go through the mechanisms that turn that list into the job’s physical execution plan.

Spark is a lazy magician

First of all, an important concept to remember when working with Spark is that it relies on lazy evaluation. This means that when you submit a job, Spark only does its magic when it has to, i.e. when it receives an action (for example, when the driver requests some data or when data needs to be written to HDFS).

Instead of running the transformations one by one as soon as it receives them, Spark stores these transformations in a DAG (Directed Acyclic Graph). And as soon as it receives an action, it runs the whole DAG and delivers the requested output. This enables it to optimize its execution plan based on the job’s DAG, instead of running the transformations sequentially.
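This behavior can be sketched with a tiny model (plain Python, invented names, nothing like Spark's actual internals): transformations only record themselves in a plan, and no work happens until an action is called.

```python
class LazyDataset:
    """Toy model of lazy evaluation: transformations build a plan, actions run it."""

    def __init__(self, data, plan=None):
        self._data = data
        self._plan = plan or []  # the recorded lineage (a simple chain here, a DAG in Spark)

    # --- transformations: just record the step, do no work ---
    def map(self, fn):
        return LazyDataset(self._data, self._plan + [("map", fn)])

    def filter(self, pred):
        return LazyDataset(self._data, self._plan + [("filter", pred)])

    # --- action: only now is the whole plan executed ---
    def collect(self):
        out = self._data
        for kind, fn in self._plan:
            if kind == "map":
                out = [fn(x) for x in out]
            else:
                out = [x for x in out if fn(x)]
        return out

ds = LazyDataset([1, 2, 3, 4]).map(lambda x: x * 10).filter(lambda x: x > 15)
# No computation has happened yet; `ds` only holds the recorded plan.
print(ds.collect())  # [20, 30, 40]
```

Because the whole plan is visible before anything runs, an engine like Spark gets the chance to reorder or merge steps before executing them.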

How it all happens

Spark relies on Catalyst, its optimizer, to perform the necessary optimizations to generate the most efficient execution plan. At its core, Catalyst includes a general library dedicated to representing trees and applying rules to manipulate them. It leverages functional programming constructs in Scala and offers libraries specific to relational query processing.

Catalyst’s main data type is a tree composed of node objects, on which it applies a set of rules to optimize it. These optimizations are performed via four different phases, as indicated in the diagram below:

Catalyst’s optimization phases (source: Databricks)
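The tree-plus-rules idea can be re-created in a few lines of Python (Catalyst itself is written in Scala and uses pattern matching; the node and function names below are illustrative only). A rule is a function that rewrites matching nodes, applied over the whole tree:

```python
from dataclasses import dataclass

# Minimal tree nodes, loosely modelled on Catalyst's expression trees.
@dataclass(frozen=True)
class Literal:
    value: int

@dataclass(frozen=True)
class Add:
    left: object
    right: object

def transform(node, rule):
    """Apply `rule` bottom-up over the tree, like Catalyst's transform."""
    if isinstance(node, Add):
        node = Add(transform(node.left, rule), transform(node.right, rule))
    return rule(node)

def constant_folding(node):
    """One rule: fold Add(Literal, Literal) into a single Literal."""
    if isinstance(node, Add) and isinstance(node.left, Literal) and isinstance(node.right, Literal):
        return Literal(node.left.value + node.right.value)
    return node

tree = Add(Add(Literal(1), Literal(2)), Literal(3))
print(transform(tree, constant_folding))  # Literal(value=6)
```

The key point is that the machinery (trees plus rewrite rules) is generic; each optimization is just another rule applied to the tree.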

Logical/Physical plan

One distinction that may not be very clear at first is the usage of the terms “logical plan” and “physical plan”. To put it simply, a logical plan is a tree describing what needs to be done, without specifying how to do it, whereas a physical plan describes exactly how each operation in the tree will be executed.

For example, a logical plan simply indicates that a join needs to be performed, while the physical plan fixes the join type (e.g. ShuffleHashJoin) for that specific operation.
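A minimal sketch of that distinction, assuming a made-up plan representation (in real Spark the choice is made by planner strategies and influenced by settings such as spark.sql.autoBroadcastJoinThreshold):

```python
# Hypothetical plan representation; not Spark's real classes.
logical_plan = {"op": "Join", "left": "orders", "right": "customers", "on": "customer_id"}

def plan_physically(logical, right_size_bytes, broadcast_threshold=10 * 1024 * 1024):
    """Turn the abstract 'Join' node into a concrete join implementation."""
    if logical["op"] == "Join":
        # Small enough to ship to every executor? Broadcast it; otherwise shuffle.
        impl = "BroadcastHashJoin" if right_size_bytes < broadcast_threshold else "ShuffleHashJoin"
        return {**logical, "op": impl}
    return logical

print(plan_physically(logical_plan, right_size_bytes=1024)["op"])   # BroadcastHashJoin
print(plan_physically(logical_plan, right_size_bytes=10**9)["op"])  # ShuffleHashJoin
```

The logical plan on the first line says nothing about execution; only the physical planning step commits to an implementation.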

Now let’s go through these four steps and delve deeper into Catalyst’s logic.

Step 1: Analysis

The starting point of the Catalyst optimization pipeline is a set of unresolved attribute references or relations. Whether you’re using SQL or the DataFrame/Dataset APIs, SparkSQL initially has no idea about your data types or even the existence of the columns that you’re referring to (this is what we mean by unresolved). If you submit a select query, SparkSQL will first use Catalyst to determine the type of every column you pass and whether the columns you’re using actually exist. To do so, it relies mainly on Catalyst’s trees and rules mechanisms.

It first creates a tree for the unresolved logical plan. Then, it starts applying rules on it until it resolves all of the attribute references and relations. Throughout this process, Catalyst relies on a Catalog object that tracks the tables in all data sources.
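The resolution step can be sketched as a lookup against a catalog (a toy dictionary here; Catalyst's real Catalog is far richer and the function name is invented):

```python
# Toy catalog mapping table -> column types, standing in for Catalyst's Catalog.
catalog = {"users": {"id": "int", "name": "string", "age": "int"}}

def analyze(table, columns):
    """Resolve unresolved column references against the catalog, or fail."""
    if table not in catalog:
        raise ValueError(f"Table not found: {table}")
    schema = catalog[table]
    unresolved = [c for c in columns if c not in schema]
    if unresolved:
        raise ValueError(f"Cannot resolve columns: {unresolved}")
    return {c: schema[c] for c in columns}  # now typed, i.e. "resolved"

print(analyze("users", ["name", "age"]))  # {'name': 'string', 'age': 'int'}
```

This mirrors why a typo in a column name fails at analysis time, before any data is read.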

Step 2: Logical optimization

In this phase, Catalyst gets some help. With the release of Spark 2.2 in 2017, a cost-based optimizer framework was introduced. Contrary to rule-based optimizations, a cost-based optimizer uses statistics and cardinalities to find the most efficient execution plan, instead of simply applying a set of rules.

The output of the analysis step is a logical plan that then goes through a series of rule-based and cost-based optimizations in this second step. Catalyst applies all of the optimization rules on the logical plan and works with the cost-based optimizer to deliver an optimized logical plan to the next step.
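A classic rule-based rewrite at this stage is predicate pushdown: filtering rows as early as possible so later operators touch less data. Here is a sketch on a made-up tuple-based plan (the rewrite is only safe when the projection keeps the column the filter needs, as it does here):

```python
def push_down_filter(plan):
    """Rule: rewrite Filter(Project(child)) into Project(Filter(child)),
    so rows are discarded before the projection runs."""
    op, child, arg = plan
    if op == "Filter" and child[0] == "Project":
        _, grandchild, cols = child
        return ("Project", ("Filter", grandchild, arg), cols)
    return plan

# Each node is (operator, child, payload); Scan has no child.
before = ("Filter",
          ("Project", ("Scan", None, "events"), ["user_id", "country"]),
          "country = 'FR'")
after = push_down_filter(before)
print(after[0])  # Project  -- the filter now sits below the projection
```

Catalyst's logical optimizer is essentially a large batch of rules like this one, applied repeatedly until the plan stops changing.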

Step 3: Physical planning

Just like in the previous step, SparkSQL uses both Catalyst and the cost-based optimizer for physical planning. It generates multiple physical plans based on the optimized logical plan, then leverages a set of physical rules and statistics to select the most efficient one.
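The "generate candidates, then pick by cost" idea can be sketched as follows (the statistics and the cost formulas below are entirely made up for illustration; Spark's CBO uses real table and column statistics):

```python
# Illustrative table statistics (row counts).
stats = {"orders": 50_000_000, "countries": 200}

def candidate_plans(left, right):
    """Generate several physical alternatives for one logical join."""
    return [
        {"impl": "ShuffleHashJoin", "cost": stats[left] + stats[right]},  # shuffle both sides
        {"impl": "BroadcastHashJoin", "cost": stats[right] * 100},        # cheap only if the right side is tiny
    ]

def pick_cheapest(plans):
    return min(plans, key=lambda p: p["cost"])

best = pick_cheapest(candidate_plans("orders", "countries"))
print(best["impl"])  # BroadcastHashJoin
```

With a 200-row dimension table, broadcasting wins by orders of magnitude; with two large tables, the shuffle-based plan would be selected instead.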

Step 4: Code generation

Finally, Catalyst uses quasiquotes, a special feature offered by Scala, to generate the code to run on each machine. Catalyst uses this feature by transforming the job’s tree into an abstract syntax tree (AST) that is passed to the Scala compiler, which compiles and runs the generated Java bytecode.
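As a rough analogy in Python (not quasiquotes, and not Spark's actual codegen): instead of interpreting the expression tree node by node for every row, emit source code once, compile it, and reuse the compiled function.

```python
def to_source(node):
    """Turn a tiny expression tree into source code (illustrative node names)."""
    kind = node[0]
    if kind == "col":
        return f"row[{node[1]!r}]"
    if kind == "lit":
        return repr(node[1])
    if kind == "add":
        return f"({to_source(node[1])} + {to_source(node[2])})"
    raise ValueError(kind)

def compile_expr(tree):
    src = f"lambda row: {to_source(tree)}"
    # Compile once; the resulting function is then applied to every row
    # without walking the tree again.
    return eval(compile(src, "<generated>", "eval"))

expr = ("add", ("col", "price"), ("lit", 2))
fn = compile_expr(expr)
print(fn({"price": 40}))  # 42
```

This is the same trade Catalyst makes: pay a one-time compilation cost to remove per-row interpretation overhead.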

To sum up

Spark SQL relies on a sophisticated pipeline to optimize the jobs that it needs to execute, and it uses Catalyst, its optimizer, in every step of this process. This optimization mechanism is one of the main reasons behind Spark’s impressive performance.

In the next article in the series we’ll discuss how we could optimize Spark jobs even more from our side. Stay tuned!

Meet us at Big Data Paris 2019! We will talk about #DataControl at our stand. More info