Apache Spark Aggregation Methods: Hash-based Vs. Sort-based

Apache Spark provides two primary methods for performing aggregations: hash-based aggregation and sort-based aggregation. These methods are optimized for different scenarios and have distinct performance characteristics.

Hash-based Aggregation

Hash-based aggregation, as implemented by HashAggregateExec, is the preferred method for aggregation in Spark SQL when conditions allow it. This method creates a hash table in which each entry corresponds to a unique group key. As Spark processes rows, it uses the group key to quickly locate the corresponding entry in the hash table and updates the aggregate values in place. This method is generally faster because it avoids sorting the data before aggregation. However, it requires that the intermediate aggregate values fit into memory; if there are too many unique keys, the hash map can exhaust its memory budget and Spark falls back to a sort-based approach at runtime. Key points about hash-based aggregation include (a short example of inspecting the physical plan follows the list):

  • It is preferred when the aggregate functions and group-by keys produce aggregation buffer types supported by the hash aggregation strategy (fixed-width, mutable types).
  • It can be significantly faster than sort-based aggregation because it avoids sorting the data.
  • It stores the aggregation map in Spark's Tungsten binary (unsafe-row) format, managed by the task memory manager rather than as ordinary JVM objects.
  • It may fall back to sort-based aggregation at runtime if the hash map grows too large to fit in memory, for example when there are very many unique keys.
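A quick way to see which strategy Spark picked is to inspect the physical plan with explain(). The snippet below is a minimal, illustrative sketch (local session, toy data and column names); because sum() over a numeric column uses a fixed-width, mutable aggregation buffer, the plan will normally show HashAggregateExec in both its partial and final stages.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("agg-plan-demo").getOrCreate()

# Toy data; column names are illustrative.
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])

# sum() over an integer column keeps a fixed-width, mutable buffer,
# so the physical plan typically contains HashAggregateExec (partial + final).
df.groupBy("key").agg(F.sum("value").alias("total")).explain()
```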

Sort-based Aggregation

Sort-based aggregation, as implemented by SortAggregateExec, is used when hash-based aggregation is not feasible, either because of memory constraints or because the aggregation functions or group-by keys are not supported by the hash aggregation strategy. This method sorts the data by the group-by keys and then processes the sorted data to compute aggregate values. While it can handle larger datasets, since only the aggregation buffer for the current group needs to fit in memory, it is generally slower than hash-based aggregation due to the additional sorting step. Key points about sort-based aggregation include (an illustrative plan inspection follows the list):

  • It is used when hash-based aggregation is not feasible due to memory constraints or unsupported aggregation functions or group by keys.
  • It involves sorting the data based on the group by keys before performing the aggregation.
  • It can handle larger datasets because the sorted data is streamed group by group, spilling to disk when necessary.
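For comparison, an aggregate whose buffer holds an immutable type, such as max() over a string column, cannot be updated in place by the hash strategy, and in many Spark versions is planned as SortAggregateExec instead. The sketch below is illustrative only; the exact plan can vary between Spark versions.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sort-agg-demo").getOrCreate()

# A string-typed aggregation buffer is not a fixed-width, mutable type,
# so many Spark versions plan this aggregate as SortAggregateExec.
df = spark.createDataFrame([("a", "apple"), ("a", "pear"), ("b", "plum")],
                           ["key", "name"])
df.groupBy("key").agg(F.max("name").alias("max_name")).explain()
```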

Detailed Explanation of Hash-based Aggregation

Hash-based Aggregation in Apache Spark operates through the HashAggregateExec physical operator. This process is optimized for aggregations where the dataset can fit into memory, and it leverages mutable types for efficient in-place updates of aggregation states.

  • Initialization: When a query that requires aggregation is executed, Spark decides at planning time whether it can use hash-based aggregation. This decision is based primarily on whether the aggregation functions (e.g., sum, avg, min, max, count) and the data types of the columns involved produce fixed-width, mutable aggregation buffers that the hash strategy can update in place.
  • Partial Aggregation (Map Side): The aggregation process begins with a “map-side” partial aggregation. For each partition of the input data, Spark creates an in-memory hash map where each entry corresponds to a unique group key. As rows are processed, Spark updates the aggregation buffer for each group key directly in the hash map. This step produces partial aggregate results for each partition.
  • Shuffling: After the partial aggregation, Spark shuffles the data by the grouping keys, so that all records belonging to the same group are moved to the same partition. This step is necessary to ensure that the final aggregation produces accurate results across the entire dataset.
  • Final Aggregation (Reduce Side): After the shuffle, Spark performs the final aggregation, again using a hash map to merge the partially aggregated results. This step combines the partial results from different partitions to produce the final aggregate value for each group.
  • Spill to Disk: If the dataset is too large to fit into memory, Spark’s hash-based aggregation can spill data to disk. This mechanism ensures that Spark can handle datasets larger than the available memory by using external storage.
  • Fallback to Sort-based Aggregation: In cases where the hash map becomes too large or if there are memory issues, Spark can fall back to sort-based aggregation. This decision is made dynamically based on runtime conditions and memory availability.
  • Output: The final output of the HashAggregateExec operator is a new dataset where each row represents a group along with its aggregated value(s).

The efficiency of hash-based aggregation comes from its ability to perform in-place updates to the aggregation buffer and its avoidance of sorting the data. However, its effectiveness is limited by the available memory and the nature of the dataset. For datasets that do not fit well into memory or when dealing with complex aggregation functions that are not supported by hash-based aggregation, Spark might opt for sort-based aggregation instead.
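To make the mechanism concrete, here is a plain-Python sketch of the idea behind map-side partial aggregation for a simple sum. It is a toy model, not Spark's actual implementation: one hash map per partition, keyed by the group key, with the aggregation buffer updated in place.

```python
def partial_hash_aggregate(partition_rows):
    """Toy model of map-side partial aggregation for sum():
    one hash map per partition, keyed by group key."""
    buffers = {}  # group_key -> running sum (the 'aggregation buffer')
    for key, value in partition_rows:
        buffers[key] = buffers.get(key, 0) + value  # in-place buffer update
    return list(buffers.items())  # partial results, later shuffled by key

# One partition's rows:
print(dict(partial_hash_aggregate([("a", 1), ("b", 2), ("a", 3)])))
# {'a': 4, 'b': 2}
```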

Detailed Explanation of Sort-based Aggregation

Sort-based Aggregation in Apache Spark works through a series of steps that involve shuffling, sorting, and then aggregating the data.

  • Shuffling: The data is partitioned across the cluster based on the grouping keys. This step ensures that all records with the same key end up in the same partition.
  • Sorting: Within each partition, the data is sorted by the grouping keys. This is necessary because the aggregation will be performed on groups of data with the same key, and having the data sorted ensures that all records for a given key are contiguous.
  • Aggregation: Once the data is sorted, Spark can perform the aggregation. For each partition, Spark uses a SortBasedAggregationIterator to iterate over the sorted records. This iterator maintains a buffer row to cache the aggregated values for the current group.
  • Processing Rows: As the iterator goes through the rows, it processes them one by one, updating the buffer with the aggregate values. When the end of a group is reached (i.e., the next row has a different grouping key), the iterator outputs a row with the final aggregate value for that group and resets the buffer for the next group.
  • Memory Management: Unlike hash-based aggregation, which requires a hash map to hold all group keys and their corresponding aggregate values, sort-based aggregation only needs to maintain the aggregate buffer for the current group. This means that sort-based aggregation can handle larger datasets that might not fit entirely in memory.
  • Fallback Mechanism: Although not part of sort-based aggregation itself, it is worth noting that Spark’s HashAggregateExec falls back to a sort-based approach at runtime if it cannot acquire enough memory for its hash map.

The sort-based aggregation process is less efficient than hash-based aggregation because it involves the extra step of sorting the data, which is computationally expensive. However, it is more scalable for large datasets, or when the aggregation buffers involve immutable types that prevent the use of hash-based aggregation.
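The same toy example, done the sort-based way: sort the rows by group key, then stream through them keeping only the buffer for the current group. This is a rough analogue of what SortBasedAggregationIterator does, not Spark's actual code.

```python
def sort_based_aggregate(rows):
    """Toy model of sort-based aggregation for sum(): sort by group key,
    then keep only the current group's buffer while streaming the rows."""
    results, current_key, buffer = [], None, 0
    for key, value in sorted(rows, key=lambda r: r[0]):  # sort by group key
        if key != current_key:
            if current_key is not None:
                results.append((current_key, buffer))  # emit finished group
            current_key, buffer = key, 0               # reset the buffer
        buffer += value
    if current_key is not None:
        results.append((current_key, buffer))          # emit the last group
    return results

print(sort_based_aggregate([("b", 2), ("a", 1), ("a", 3)]))
# [('a', 4), ('b', 2)]
```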


A Solution Architect’s Diary: Navigating Tech Frontiers with My Team of Superheroes

In the early 2020s, a project requirement came my way that was both challenging and interesting. My manager and I were very new to the company, so the success of this project was crucial for us. Although the technical stack of this project was relatively simple, it holds a special place in my heart for many reasons. In this article, I aim to share my experience and journey with it. This article is not about how-to steps; rather, I want to share the mindset and culture, on a personal level and as a team.

“The only way to do great work is to love what you do.” — Steve Jobs

The challenges were manifold: there was no budget or only a minimal budget available, a shortage of technical team members, and a need to deliver in a very short timeframe. Sounds interesting, right? I was thrilled by the requirement.

“Start where you are. Use what you have. Do what you can.” — Arthur Ashe

The high-level project details are as follows: from an upstream, Oracle-based application, we needed to transfer data downstream in JSON format. The backend-to-frontend relationship of the upstream application is one-to-many, meaning the backend Oracle data is represented by multiple frontend views depending on the business use or object. The data pipeline therefore needed the capacity to transform the Oracle data into many frontend views, which required mapping the data against XML configuration for the frontend. The daily transaction volume was not huge, but still a decent amount: roughly 3 million records.

(Diagram: an oversimplified, high-level view of the pipeline)

“It always seems impossible until it’s done.” — Nelson Mandela

Since we had no budget or resource strength, I decided to explore open-source and no-code or low-code solutions. After a day or two of research, I settled on Apache NiFi and MongoDB. The idea was to reverse-engineer the current upstream application using NiFi: NiFi would read data from Oracle and, at the same time, map that data according to the frontend configuration stored in XML for the frontend application.

“Teamwork is the ability to work together toward a common vision. The ability to direct individual accomplishments toward organizational objectives. It is the fuel that allows common people to attain uncommon results.” — Andrew Carnegie

When I first presented this solution to the team and management, the initial response was not very positive. Except for my manager, most stakeholders expressed doubts, which is understandable since most of them were hearing about Apache NiFi for the first time, and their experience lay primarily in Oracle and Java. MongoDB was also new to them. Normally, people think about “how” first, but I was focused on “why” and “what” before “how.”

After several rounds of discussion, including visual flowcharts and small prototypes, everyone agreed on the tech stack and the solution.

This led to the next challenge: the team we assembled had never worked with these tech stacks. To move forward, we needed to build a production-grade platform. I started by reading books and devised infrastructure build steps for both NiFi and MongoDB clusters. It was challenging, and there were extensive troubleshooting sessions, but we finally built our platform.

One book helped enormously: “Data Engineering with Python: Work with massive datasets to design data models and automate data pipelines using Python” by Paul Crickard. Even though we used Groovy for scripting rather than Python, it helped us understand Apache NiFi in greater detail. I remember those days; I carried the book with me everywhere I went, even to the park and while waiting at traffic signals. Thanks to the author, Paul Crickard.

“Don’t watch the clock; do what it does. Keep going.” — Sam Levenson

The team was very small, comprising two Java developers and two Linux developers, all new to these tech stacks. Once the platform was ready, my next task was to train our developers and bring them up to speed. Fortunately, the book mentioned above helped me with Apache NiFi, and for MongoDB I took a MongoDB University course, then filtered the required material for my team to learn. There were further challenges: my team was in China and I was in a North American time zone, which meant working mostly at night to support them.

Another challenge that later became an advantage was communication. My Chinese team was not very comfortable with verbal explanations, so I decided to share knowledge through visual aids like flowcharts and diagrams, along with developing small prototypes for them to try and walk through. This approach helped a lot later on, as I learned how to represent complex topics in simple diagrams, a skill I use in my articles today.

“What you get by achieving your goals is not as important as what you become by achieving your goals.” — Zig Ziglar

The lucky factor was that my team was extremely hardworking and quick learners. They adapted to the new tech stack in minimal time. In three months, we built the first end-to-end MVP and presented a full demo to the business, which was a huge success. Three months later, we scaled the application to production-grade and delivered our first release. Today, this team is the fastest in the release cycle and operates independently.

“It is not mandatory for a Solution Architect to be an SME in any specific tool or technology. Instead, he or she should be a strategic thinker, putting “why” and “what” first and adapting quickly to take advantage of opportunities where we can add value to the business or end user despite all limitations and trade-offs.” — Shanoj Kumar V


“Success is not the key to happiness. Happiness is the key to success. If you love what you are doing, you will be successful.” — Albert Schweitzer


Understanding Memory Spills in Apache Spark

Memory spill in Apache Spark is the process of transferring data from RAM to disk, and potentially back again. This happens when the dataset exceeds the available memory capacity of an executor during tasks that require more memory than is available. In such cases, data is spilled to disk to free up RAM and prevent out-of-memory errors. However, this process can slow down processing due to the slower speed of disk I/O compared to memory access.

Dynamic Occupancy Mechanism

Apache Spark employs a dynamic occupancy mechanism for managing Execution and Storage memory pools. This mechanism enhances the flexibility of memory usage by allowing Execution Memory and Storage Memory to borrow from each other, depending on workload demands:

  • Execution Memory: Primarily used for computation tasks such as shuffles, joins, and sorts. When execution tasks demand more memory, they can borrow from the Storage Memory if it is underutilized.
  • Storage Memory: Used for caching and persisting RDDs, DataFrames, and Datasets. If the demand for storage memory exceeds its allocation, and Execution Memory is not fully utilized, Storage Memory can expand into the space allocated for Execution Memory.

Spark’s internal memory manager controls this dynamic sharing and is crucial for optimizing the utilization of available memory resources, significantly reducing the likelihood of memory spills.
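The borrowing rule can be illustrated with a toy calculation. This is a simplified model of the unified memory manager, not Spark's actual code: execution can use any unified memory not held by storage and can additionally evict cached blocks down to the protected region defined by spark.memory.storageFraction, while storage can only borrow execution memory that is currently free.

```python
def execution_memory_available(unified_total_gb, storage_used_gb,
                               storage_fraction=0.5):
    """Toy model of dynamic occupancy: how much unified memory execution
    can claim, given current storage usage (all values in GB)."""
    protected_storage = unified_total_gb * storage_fraction    # cannot be evicted
    evictable = max(0.0, storage_used_gb - protected_storage)  # cached blocks execution may evict
    free = unified_total_gb - storage_used_gb                  # memory not used by storage
    return free + evictable

# With 5.4 GB of unified memory and 4 GB of cached data,
# execution can still claim 1.4 GB free + 1.3 GB evictable = 2.7 GB.
print(execution_memory_available(5.4, 4.0))
```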

Common Performance Issues Related to Spills

Spill (memory) and Spill (disk): When data doesn’t fit in RAM, it is temporarily written to disk. In the Spark UI, Spill (memory) reports the size of the spilled data in its deserialized, in-memory form, while Spill (disk) reports its serialized size on disk. This operation, while enabling Spark to handle larger datasets, hurts computation time and efficiency because disk access is slower than memory access.

Impact on Performance: Spills to disk can negatively affect performance, increasing both the cost and operational complexity of Spark applications. The strength of Spark lies in its in-memory computing capabilities; thus, disk spills are counterproductive to its design philosophy.

Solutions for Memory Spill in Apache Spark

Mitigating memory spill issues involves several strategies aimed at optimizing memory use, partitioning data more effectively, and improving overall application performance.

Optimizing Memory Configuration

  • Adjust memory allocation settings to provide sufficient memory for both execution and storage, potentially increasing the memory per executor.
  • Tune the ratio between execution and storage memory based on the specific requirements of your workload.
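As a minimal sketch, these knobs can be set when building the session (or, more commonly, passed via spark-submit). The values below are placeholders to illustrate the parameters, not recommendations.

```python
from pyspark.sql import SparkSession

# Illustrative values only; tune for your cluster and workload.
# In many deployments these are passed via spark-submit --conf instead.
spark = (
    SparkSession.builder
    .appName("memory-tuning-sketch")
    .config("spark.executor.memory", "8g")           # heap per executor
    .config("spark.executor.memoryOverhead", "1g")   # off-heap/container overhead
    .config("spark.memory.fraction", "0.6")          # share of heap for unified memory
    .config("spark.memory.storageFraction", "0.5")   # protected storage portion
    .getOrCreate()
)
```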

Partitioning Data

  • Optimize data partitioning to ensure even data distribution across partitions, which helps in avoiding memory overloads in individual partitions.
  • Consider different partitioning strategies such as range, hash, or custom partitioning based on the nature of your data.
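A small illustrative sketch of both ideas; the DataFrame, the column names (customer_id, order_date), and the partition count of 200 are placeholders to tune against your own data volume.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame; customer_id and order_date are hypothetical columns.
df = spark.createDataFrame(
    [(1, "2024-03-01", 10.0), (2, "2024-03-02", 20.0)],
    ["customer_id", "order_date", "amount"],
)

# Hash-partition by a high-cardinality key to spread rows more evenly.
evenly_partitioned = df.repartition(200, "customer_id")

# Range-partition when downstream work benefits from ordered partitions.
range_partitioned = df.repartitionByRange(200, "order_date")
```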

Caching and Persistence

  • Use caching and persistence methods (e.g., cache() or persist()) to store intermediate results or frequently accessed data in memory, reducing the need for recomputation.
  • Select the appropriate storage level for caching to balance between memory usage and CPU efficiency.
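A minimal sketch of the caching pattern, reusing the toy df from the previous snippet; MEMORY_AND_DISK is chosen here so that partitions that do not fit in memory spill to disk instead of being recomputed.

```python
from pyspark import StorageLevel

# Cache an intermediate result that is reused by several actions.
per_customer = df.groupBy("customer_id").count()
per_customer.persist(StorageLevel.MEMORY_AND_DISK)

per_customer.count()      # first action materializes the cache
# ... reuse per_customer in further queries ...
per_customer.unpersist()  # release the memory when done
```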

Monitoring and Tuning

  • Monitor memory usage and spills using Spark UI or other monitoring tools to identify and address bottlenecks.
  • Adjust configurations dynamically based on performance metrics and workload patterns.

Data Compression

  • Employ data compression techniques and columnar storage formats (e.g., Parquet, ORC) to reduce the memory footprint.
  • Persist RDDs in serialized form (e.g., the MEMORY_ONLY_SER storage level in the Scala/Java API, ideally combined with an efficient serializer such as Kryo) to minimize their memory footprint.
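For example, writing intermediate results as compressed Parquet shrinks both the storage and the scan footprint. The snippet continues with the df and spark session from the earlier sketches; the path and codec are placeholders.

```python
# Write columnar, compressed output; path and codec are illustrative.
df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/sales_parquet")

# Reading it back benefits from column pruning and predicate pushdown.
sales = spark.read.parquet("/tmp/sales_parquet")
```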

Avoiding Heavy Shuffles

  • Optimize join operations and minimize unnecessary data movement by using strategies such as broadcasting smaller tables or implementing partition pruning.
  • Reduce shuffles, which can lead to spills, by avoiding unnecessary wide dependencies (for example, repeated groupBy or repartition steps); a broadcast-join sketch follows below.
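The classic example is broadcasting a small dimension table so the large fact table is never shuffled. The tables and column names below are toy placeholders.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Toy tables: a 'large' fact table and a small dimension table.
orders = spark.createDataFrame([(1, 101), (2, 102), (3, 101)],
                               ["order_id", "product_id"])
products = spark.createDataFrame([(101, "book"), (102, "pen")],
                                 ["product_id", "name"])

# Broadcasting the small side turns a shuffle join into a broadcast hash join,
# avoiding a wide shuffle of the large table.
joined = orders.join(broadcast(products), "product_id")
joined.explain()
```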

Formulaic Approach to Avoid Memory Spills

Apache Spark’s memory management model is designed to balance between execution memory (used for computation like shuffles, joins, sorts) and storage memory (used for caching and persisting data). Understanding and optimizing the use of these memory segments can significantly reduce the likelihood of memory spills.

Memory Configuration Parameters:

  • Total Executor Memory (spark.executor.memory): The total memory allocated per executor.
  • Memory Overhead (spark.executor.memoryOverhead): Additional off-heap memory allocated to each executor container, beyond spark.executor.memory, to cover JVM and native overheads.
  • Spark Memory Fraction (spark.memory.fraction): Specifies the proportion of the executor heap (after a small reserved amount) dedicated to Spark’s unified execution and storage memory (default is 0.6, i.e., 60%).

Simplified Memory Calculation:

Calculate Available Memory for Spark:

Available Memory = (Total Executor Memory − Memory Overhead) × Spark Memory Fraction

Determine Execution and Storage Memory: Spark splits the available memory between execution and storage. The division is dynamic, but under memory pressure, storage can shrink to as low as the value defined by spark.memory.storageFraction (default is 0.5 or 50% of Spark memory).

Example Calculation:

  • Suppose an executor is configured with 10GB (spark.executor.memory = 10GB) and the default overhead (10% of executor memory or at least 384MB). Let’s assume an overhead of 1GB for simplicity and the default memory fractions.
  • Total Executor Memory: 10GB
  • Memory Overhead: 1GB
  • Spark Memory Fraction: 0.6 (60%)

Available Memory for Spark = (10 GB − 1 GB) × 0.6 = 5.4 GB

  • Assuming spark.memory.storageFraction is 0.5, the initial boundary gives the execution and storage pools 2.7 GB each; under the dynamic occupancy mechanism, either pool can temporarily grow beyond this when the other is underutilized.
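The same arithmetic as a tiny script, using the simplified formula and example values from above:

```python
# Simplified calculation; values match the example above.
total_executor_memory_gb = 10.0
memory_overhead_gb = 1.0
memory_fraction = 0.6      # spark.memory.fraction
storage_fraction = 0.5     # spark.memory.storageFraction

available = (total_executor_memory_gb - memory_overhead_gb) * memory_fraction
storage_pool = available * storage_fraction
execution_pool = available - storage_pool

print(f"Available for Spark:      {available:.1f} GB")       # 5.4 GB
print(f"Storage pool (initial):   {storage_pool:.2f} GB")    # 2.70 GB
print(f"Execution pool (initial): {execution_pool:.2f} GB")  # 2.70 GB
```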

Strategies to Avoid Memory Spills:

  • Increase Memory Allocation: If possible, increasing spark.executor.memory ensures more memory is available for Spark processes.
  • Adjust Memory Fractions: Tweaking spark.memory.fraction and spark.memory.storageFraction can help allocate memory more efficiently based on the workload. For compute-intensive operations, you might allocate more memory for execution.

Real-life Use Case: E-commerce Sales Analysis

An e-commerce platform experienced frequent memory spills while processing extensive sales data during holiday seasons, leading to performance bottlenecks.

Problem:

Large-scale aggregations and joins were causing spills to disk, slowing down the analysis of sales data and impacting the ability to generate real-time insights for inventory and pricing adjustments.

Solution:

  • Memory Optimization: The data team increased the executor memory from 8GB to 16GB per executor and adjusted the spark.memory.fraction to 0.8 to dedicate more memory to Spark’s managed memory system.
  • Partitioning and Data Skew Management: They implemented custom partitioning strategies to distribute the data more evenly across nodes, reducing the likelihood of individual tasks running out of memory.
  • Caching Strategy: Important datasets used repeatedly across different stages of the analysis were persisted in memory, and the team carefully chose the storage levels to balance between memory usage and CPU efficiency.
  • Monitoring and Tuning: Continuous monitoring of the Spark UI and logs allowed the team to identify memory-intensive operations and adjust configurations dynamically. They also fine-tuned spark.memory.storageFraction to better balance between execution and storage memory, based on the nature of their tasks.

These strategies significantly reduced the occurrence of memory spills, improved the processing speed of sales data analysis, and enabled the e-commerce platform to adjust inventory and pricing strategies in near real-time during peak sales periods.

This example demonstrates the importance of a holistic approach to Spark memory management, including proper configuration, efficient data partitioning, and strategic use of caching, to mitigate memory spill issues and enhance application performance.


Data Analytics with AWS Redshift and Redshift Spectrum: A Scenario-Based Approach

In exploring the integration of Amazon Redshift and Redshift Spectrum for data warehousing and data lake architectures, it’s essential to consider a scenario where a data engineer sets up a daily data loading pipeline into a data warehouse.

This setup is geared towards optimizing the warehouse for the majority of reporting queries, which typically focus on the latest 12 months of data. To maintain efficiency and manage storage, the engineer might also implement a process to remove data older than 12 months. However, this strategy raises a question: how to handle the 20% of queries that require historical data beyond this period?

Amazon Redshift is a powerful, scalable data warehouse service that simplifies the process of analyzing large volumes of data with high speed and efficiency. It allows for complex queries over vast datasets, providing the backbone for modern data analytics. Redshift’s architecture is designed to handle high query loads and vast amounts of data, making it an ideal solution for businesses seeking to leverage their data for insights and decision-making. Its columnar storage and data compression capabilities ensure that data is stored efficiently, reducing the cost and increasing the performance of data operations.

Redshift Spectrum extends the capabilities of Amazon Redshift by allowing users to query and analyze data stored in Amazon S3 directly from within Redshift, without the need for loading or transferring the data into the data warehouse. This feature is significant because it enables users to access both recent and historical data seamlessly, bridging the gap between the data stored in Redshift and the extensive, unstructured data residing in a data lake. Spectrum offers the flexibility to query vast amounts of data across a data lake, providing the ability to run complex analyses on data that is not stored within the Redshift cluster itself.

In the scenario above, Redshift Spectrum is exactly what bridges the gap: the latest 12 months of data stay in the warehouse for fast reporting, while the minority of queries that need older history are served directly from the data lake, without storing all of it in the warehouse.

The process starts with the AWS Glue Data Catalog, which acts as a central repository for all the databases and tables in the data lake. By setting up Amazon Redshift to work with the AWS Glue Data Catalog, users can seamlessly query tables both inside Redshift and those cataloged in the AWS Glue. This setup is particularly advantageous for comprehensive data analysis, bridging the gap between the structured environment of the data warehouse and the more extensive, unstructured realm of the data lake.
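As a rough sketch of how this wiring might look in practice, the external schema that exposes Glue Data Catalog tables to Redshift is created with a single SQL statement; below it is submitted through the boto3 Redshift Data API. All identifiers (region, cluster, database, user, IAM role ARN, Glue database name) are placeholders, and your authentication method may differ.

```python
import boto3

# Placeholders throughout; adapt region, cluster, database, user, and role.
client = boto3.client("redshift-data", region_name="us-east-1")

create_external_schema = """
CREATE EXTERNAL SCHEMA IF NOT EXISTS spectrum_sales
FROM DATA CATALOG
DATABASE 'sales_lake_db'
IAM_ROLE 'arn:aws:iam::123456789012:role/MySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;
"""

client.execute_statement(
    ClusterIdentifier="my-redshift-cluster",
    Database="analytics",
    DbUser="admin",
    Sql=create_external_schema,
)

# Afterwards, historical data in S3 can be queried (and joined or UNION ALL'ed
# with warehouse tables) as spectrum_sales.<table> from within Redshift.
```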

AWS Glue Data Catalog and Apache Hive Metastore are both metadata repositories for managing data structures in data lakes and warehouses. AWS Glue Data Catalog, a cloud-native service, integrates seamlessly with AWS analytics services, offering automatic schema discovery and a fully managed experience. In contrast, Hive Metastore requires more manual setup and maintenance and is primarily used in on-premises or hybrid cloud environments. AWS Glue Data Catalog is easier to use, automated, and tightly integrated within the AWS ecosystem, making it the preferred choice for users invested in AWS services.
