
Apache Spark 101: Understanding Spark Code Execution

Apache Spark is a powerful distributed data processing engine widely used in big data and machine learning applications. Thanks to its rich API and high-level data structures such as DataFrames and Datasets, it offers a higher level of abstraction than traditional MapReduce jobs.

Spark Code Execution Journey

Parsing

  • Spark SQL queries or DataFrame API methods are parsed into an unresolved logical plan.
  • Parsing converts the code into a tree of logical operators without checking whether the referenced tables or columns actually exist.
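
To see this stage for yourself, calling explain(True) on any DataFrame or SQL query prints every plan, starting with the Parsed Logical Plan. Below is a minimal PySpark sketch; the employees view and its columns are made up purely for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parsing-demo").getOrCreate()

# Register a tiny in-memory table so the query has something to reference.
spark.createDataFrame([(1, 35, 10), (2, 28, 10)], ["id", "age", "dep_id"]) \
    .createOrReplaceTempView("employees")

# explain(True) prints the Parsed, Analyzed and Optimized Logical Plans
# plus the Physical Plan; the first block is the unresolved (parsed) plan.
spark.sql("SELECT id FROM employees WHERE age > 30").explain(True)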

Analysis

  • The unresolved logical plan undergoes analysis by the Catalyst optimizer, Spark’s optimization framework.
  • This phase confirms the existence of tables, columns, and functions, resulting in a resolved logical plan where all references are validated against the catalogue schema.
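
A practical consequence: referencing a table or column that does not exist fails at analysis time, before any data is touched. A minimal sketch (the view name and columns are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame([(1, 35)], ["id", "age"]).createOrReplaceTempView("employees")

try:
    # "salary" is not a column of the view, so the analyzer rejects the
    # query immediately; no executors are ever involved.
    spark.sql("SELECT salary FROM employees")
except Exception as err:  # PySpark raises an AnalysisException; caught broadly for version safety
    print(type(err).__name__, err)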

Logical Plan Optimization

  • The Catalyst optimizer applies optimization rules to the resolved logical plan, potentially reordering joins, pushing down predicates, or combining filters, creating an optimized logical plan.
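
Predicate pushdown is easy to observe: a filter written after a join in the DataFrame API appears below the join in the optimized plan. A small, hypothetical sketch:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
employees = spark.createDataFrame([(1, 35, 10)], ["id", "age", "dep_id"])
departments = spark.createDataFrame([(10, "Sales")], ["dep_id", "department"])

# The filter is written after the join ...
joined = employees.join(departments, "dep_id").filter(F.col("age") > 30)

# ... but in the Optimized Logical Plan printed here, Catalyst has pushed
# the Filter below the Join so it is applied to the employees side first.
joined.explain(True)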

Physical Planning

  • The optimized logical plan is transformed into one or more physical plans, outlining the execution strategy and order of operations like map, filter, and join.
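
On Spark 3.0 and later, explain(mode="formatted") prints the selected physical operators (scans, filters, joins, exchanges) in a readable, numbered layout. A minimal sketch with made-up data:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 35), (2, 28)], ["id", "age"])

# mode="formatted" lists each physical operator together with details
# such as its output columns and any pushed-down filters.
df.filter(F.col("age") > 30).groupBy().count().explain(mode="formatted")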

Cost Model

  • Spark evaluates these physical plans using a cost model, selecting the most efficient one based on data sizes and distribution heuristics.
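
Two knobs make the cost-based side visible: explain(mode="cost") (Spark 3.0+) annotates the optimized logical plan with size estimates, and spark.sql.autoBroadcastJoinThreshold controls when a table's estimated size is small enough to pick a broadcast join. A small sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tables whose estimated size falls below this threshold (10 MB by default)
# are broadcast instead of shuffled when planning a join.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

df = spark.createDataFrame([(1, 35), (2, 28)], ["id", "age"])

# mode="cost" prints the optimized logical plan annotated with size
# statistics (row counts appear once tables have ANALYZE TABLE stats).
df.filter("age > 30").explain(mode="cost")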

Code Generation

  • Once the final physical plan is chosen, Spark employs WholeStage CodeGen to collapse the operators of each stage into a single generated Java function, compiled to bytecode and run on the executors, minimizing virtual function calls and intermediate data copies.
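
The generated code can be inspected directly: explain(mode="codegen") (Spark 3.0+) reports how many whole-stage subtrees were fused and dumps the Java source produced for each. A minimal sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 35), (2, 28)], ["id", "age"])

# Prints "Found N WholeStageCodegen subtrees" followed by the generated
# Java source that the filter and projection were fused into.
df.filter("age > 30").select("id").explain(mode="codegen")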

Execution

  • The bytecode is distributed to executors across the cluster for execution, with tasks running in parallel, processing data in partitions, and producing the final output.
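
Nothing runs until an action is called: transformations only build up the plan, and the job, its stages, and its per-partition tasks are launched by actions such as count(), collect(), or a write. A minimal sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 35), (2, 28), (3, 41)], ["id", "age"])

filtered = df.filter("age > 30")   # transformation only: no job submitted yet

# The action below submits a job; Spark splits it into stages at shuffle
# boundaries and runs one task per partition on the executors.
print(filtered.count())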

The Catalyst optimizer is integral throughout these steps, enhancing the performance of Spark SQL queries and DataFrame operations using rule-based and cost-based optimization.

Example Execution Plan

Consider a SQL query that joins two tables, filters the rows, and aggregates the result:

SELECT department, COUNT(*)
FROM employees
JOIN departments ON employees.dep_id = departments.id
WHERE employees.age > 30
GROUP BY department
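
The same query can be expressed with the DataFrame API and goes through exactly the same planning pipeline. The PySpark sketch below uses small in-memory stand-ins for the employees and departments tables:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

employees = spark.createDataFrame(
    [(1, 35, 10), (2, 28, 10), (3, 42, 20)], ["id", "age", "dep_id"])
departments = spark.createDataFrame(
    [(10, "Sales"), (20, "Engineering")], ["id", "department"])

result = (employees
          .join(departments, employees.dep_id == departments.id)
          .filter(employees.age > 30)
          .groupBy("department")
          .agg(F.count("*").alias("count")))

result.explain(True)   # prints the plans walked through below
result.show()          # triggers the actual execution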

The execution plan may follow these steps:

  • Parsed Logical Plan: The initial SQL command is parsed into an unresolved logical plan.
  • Analyzed Logical Plan: The plan is analyzed and resolved against the table schemas.
  • Optimized Logical Plan: The Catalyst optimizer optimizes the plan.
  • Physical Plan: A cost-effective physical plan is selected.
  • Execution: The physical plan is executed across the Spark cluster.


The execution plan for the given SQL query in Apache Spark involves several stages, from logical planning to physical execution. Here’s a simplified breakdown:

Parsed Logical Plan: Spark parses the SQL query into an initial logical plan. This plan is unresolved as it only represents the structure of the query without checking the existence of the tables or columns.

'Project ['department]
+- 'Aggregate ['department], ['department, 'COUNT(1)]
   +- 'Filter ('age > 30)
      +- 'Join Inner, ('employees.dep_id = 'departments.id)
         :- 'UnresolvedRelation `employees`
         +- 'UnresolvedRelation `departments`

Analyzed Logical Plan: The parsed logical plan is analyzed against the database catalogue. This resolves table and column names and checks for invalid operations or data types.

Project [department#123]
+- Aggregate [department#123], [department#123, COUNT(1) AS count#124]
   +- Filter (age#125 > 30)
      +- Join Inner, (dep_id#126 = id#127)
         :- SubqueryAlias employees
         :  +- Relation[age#125,dep_id#126] parquet
         +- SubqueryAlias departments
            +- Relation[department#123,id#127] parquet

Optimized Logical Plan: The Catalyst optimizer applies a series of rules to the logical plan to optimize it. It may reorder joins, push down filters, and perform other optimizations.

Aggregate [department#123], [department#123, COUNT(1) AS count#124]
+- Project [department#123, age#125]
   +- Join Inner, (dep_id#126 = id#127)
      :- Filter (age#125 > 30)
      :  +- Relation[age#125,dep_id#126] parquet
      +- Relation[department#123,id#127] parquet

Physical Plan: Spark generates one or more physical plans from the logical plan. It then uses a cost model to choose the most efficient physical plan for execution.

*(3) HashAggregate(keys=[department#123], functions=[count(1)], output=[department#123, count#124])
+- Exchange hashpartitioning(department#123, 200)
   +- *(2) HashAggregate(keys=[department#123], functions=[partial_count(1)], output=[department#123, count#130])
      +- *(2) Project [department#123, age#125]
         +- *(2) BroadcastHashJoin [dep_id#126], [id#127], Inner, BuildRight
            :- *(2) Filter (age#125 > 30)
            :  +- *(2) ColumnarToRow
            :     +- FileScan parquet [age#125,dep_id#126]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false]))
               +- *(1) ColumnarToRow
                  +- FileScan parquet [department#123,id#127]

Code Generation: Spark generates Java bytecode for the chosen physical plan to run on each executor. This process is known as WholeStage CodeGen.

Execution: The bytecode is sent to Spark executors distributed across the cluster. Executors run the tasks in parallel, processing the data in partitions.

During execution, tasks are executed within stages, and stages may have shuffle boundaries where data is redistributed across the cluster. The Exchange hashpartitioning indicates a shuffle operation due to the GROUP BY clause.
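
The 200 in Exchange hashpartitioning(department#123, 200) is simply the default value of spark.sql.shuffle.partitions, the number of post-shuffle partitions (adaptive query execution in Spark 3.x can coalesce them at runtime). A small sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 200 by default; this is where the "200" in the Exchange node comes from.
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Lowering it for a small dataset changes the partitioning shown in the plan.
spark.conf.set("spark.sql.shuffle.partitions", "8")

df = spark.createDataFrame([(10, "Sales"), (20, "Sales")], ["id", "department"])
df.groupBy("department").count().explain()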
