Apache Flink 101: Stream as Append & Upsert in Dynamic Tables

Apache Flink is a powerful data processing framework that handles batch and stream processing tasks in a single system. Flink provides a flexible and efficient architecture to process large-scale data in real time. In this article, we will discuss two important use cases for stream processing in Apache Flink: Stream as Append and Upsert in Dynamic Tables.

Stream as Append:

Stream as Append refers to continuously adding new records to an existing table. It is a common use case in real-time data processing, where new data must be combined with the existing data to form a complete and up-to-date view. In Flink, this can be achieved using Dynamic Tables, which provide a table-oriented way to work with stateful data streams in Flink.

Suppose we have a sales data stream that a retail company continuously generates. We want to store this data in a table and append each new record to the existing data.

Here is an example of how to achieve this in PyFlink:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, CsvTableSink, WriteMode, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# create a StreamTableEnvironment on top of a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(env)

# define the schema for the sales data stream
sales_schema = Schema() \
    .field("item", DataTypes.STRING()) \
    .field("price", DataTypes.DOUBLE()) \
    .field("timestamp", DataTypes.TIMESTAMP(3))

# register the sales data stream as a source table
st_env.connect(FileSystem().path("/path/to/sales/data")) \
    .with_format(OldCsv()
                 .field_delimiter(",")
                 .field("item", DataTypes.STRING())
                 .field("price", DataTypes.DOUBLE())
                 .field("timestamp", DataTypes.TIMESTAMP(3))) \
    .with_schema(sales_schema) \
    .create_temporary_table("sales_table")

# define a CSV table sink to store the sales data
sales_sink = CsvTableSink(
    ["item", "price", "timestamp"],                                      # field names
    [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.TIMESTAMP(3)],    # field types
    "/path/to/sales/table",                                              # output path
    ",", 1, WriteMode.OVERWRITE)                                         # delimiter, num files, write mode

# register the sales sink as a table
st_env.register_table_sink("sales_table_sink", sales_sink)

# stream the sales data in append mode into the sales sink
st_env.from_path("sales_table").insert_into("sales_table_sink")

# execute the Flink job
st_env.execute("stream-as-append-in-dynamic-table-example")

In this example, we first define the schema for the sales data stream using the Schema API. Then, we use the connect API to register the sales data stream as a table in the StreamTableEnvironment.

Next, the with_format API is used to specify the format of the data in the sales data stream, which is CSV in this example. Finally, the with_schema API is used to define the schema of the resulting table.

Reference: https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

Next, we define a table sink using the CsvTableSink API and register it in the StreamTableEnvironment with the register_table_sink API. The insert_into API then streams the sales data, in append mode, into the sales sink, and the execute method runs the Flink job. (In newer Flink releases these APIs are deprecated; see the sketch below for the equivalent pipeline in the newer style.)

Reference: https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/table/sinks/CsvTableSink.html
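
Note that the register_table_sink and insert_into APIs have since been deprecated. As a point of comparison, here is a minimal sketch of the same append-only pipeline in the newer DDL-based style, assuming a recent Flink release (1.14 or later) and the filesystem/csv connector; the paths, table names, and columns are illustrative, and the timestamp column is named ts here because TIMESTAMP is a reserved word in SQL:

from pyflink.table import EnvironmentSettings, TableEnvironment

# minimal sketch of the append-only pipeline in the newer DDL style
# (assumes Flink 1.14+; paths and table names are illustrative)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# source table over the incoming sales data
t_env.execute_sql("""
    CREATE TABLE sales_source (
        item STRING,
        price DOUBLE,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/sales/data',
        'format' = 'csv'
    )
""")

# sink table that the stream is appended to
t_env.execute_sql("""
    CREATE TABLE sales_sink (
        item STRING,
        price DOUBLE,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/sales/table',
        'format' = 'csv'
    )
""")

# append the source stream into the sink and wait for the (bounded) job to finish
t_env.from_path("sales_source").execute_insert("sales_sink").wait()

Here execute_insert submits the INSERT job and returns a TableResult, so wait() blocks until the job completes.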

Upsert in Dynamic Tables:

Upsert refers to the process of updating an existing record, or inserting a new record if it does not exist. It is a common use case in real-time data processing, where records may need to be updated with new information as it arrives. In Flink, this can be achieved using Dynamic Tables, which provide a flexible way to interact with stateful data streams and tables in Flink.

Here is an example of how to implement upsert in dynamic tables using PyFlink:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# create a StreamExecutionEnvironment and set the time characteristic to EventTime
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# register a dynamic table over the input data, keyed logically by transaction_id
t_env.connect(FileSystem().path("/tmp/sales_data.csv")) \
    .with_format(OldCsv()
                 .field("transaction_id", DataTypes.BIGINT())
                 .field("product", DataTypes.STRING())
                 .field("amount", DataTypes.DOUBLE())
                 .field("timestamp", DataTypes.TIMESTAMP(3))) \
    .with_schema(Schema()
                 .field("transaction_id", DataTypes.BIGINT())
                 .field("product", DataTypes.STRING())
                 .field("amount", DataTypes.DOUBLE())
                 .field("timestamp", DataTypes.TIMESTAMP(3))) \
    .create_temporary_table("sales_table")

# a grouped aggregation over a dynamic table produces an *updating* result:
# each new row either inserts a new key or upserts the running total for an
# existing transaction_id
upsert_sql = "SELECT transaction_id, SUM(amount) AS total_amount " \
             "FROM sales_table GROUP BY transaction_id"
result = t_env.sql_query(upsert_sql)

# start the data processing and print the continuously updating result
result.execute().print()

In this example, we first create a StreamExecutionEnvironment and set the time characteristic to EventTime. Then, we create a StreamTableEnvironment and register a dynamic table over the input data using the connect method, where with_format specifies the input data format and with_schema defines the table schema.

Next, we express the upsert logic as a SQL query: a grouped aggregation that sums the amounts for each transaction ID. Because the query groups by a key, its result is an updating dynamic table: each incoming row either inserts a new key or updates (upserts) the running total for an existing one. Note that Flink's streaming SQL (at the versions referenced here) does not support a standalone UPDATE statement on a dynamic table; the update semantics come from the continuous query itself, which we register with sql_query.

Finally, we start the data processing and print the continuously updating result with execute().print(). An append-only sink such as a CSV file cannot consume these updates; an updating result needs a retract- or upsert-capable sink (for example, the upsert-kafka connector).
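
To make the upsert behaviour concrete, here is a minimal, self-contained sketch that replaces the CSV source with a few hypothetical in-memory rows and prints the resulting changelog, assuming a recent Flink release (1.14 or later). In the output, +I marks the insert of a new key, while a -U/+U pair marks the retraction and re-emission of a row whose running total was updated:

from pyflink.table import EnvironmentSettings, TableEnvironment

# hypothetical in-memory rows instead of the CSV source (assumes Flink 1.14+)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

sales = t_env.from_elements(
    [(1, "apple", 10.0), (2, "banana", 5.0), (1, "apple", 7.5)],
    ["transaction_id", "product", "amount"])
t_env.create_temporary_view("sales", sales)

# the grouped aggregation maintains (upserts) one row per transaction_id
t_env.execute_sql(
    "SELECT transaction_id, SUM(amount) AS total_amount "
    "FROM sales GROUP BY transaction_id"
).print()

# example of the printed changelog (layout abbreviated):
#   op | transaction_id | total_amount
#   +I |              1 |         10.0
#   +I |              2 |          5.0
#   -U |              1 |         10.0
#   +U |              1 |         17.5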

Data Modeling 101: Modern Data Stack

What is Data Modeling?

Data modeling is the foundational process of creating a structured representation of data stored in a database. This representation, a data model, serves as a conceptual blueprint for data objects, their relationships, and the governing rules that ensure data integrity and consistency. Data modeling helps us define how data is organized, connected, and utilized within a database or data management system.

Common Data Modeling Approaches:

Normalized Modeling:

The normalized modeling approach, popularized by Bill Inmon, is focused on maintaining data integrity by eliminating redundancy. It involves creating a data warehouse that closely mirrors the structure of the source systems. While this approach ensures a single source of truth, it can lead to complex join operations and may not be ideal for modern column-based data warehouses.

Denormalized Modeling (Dimensional Modeling):

Ralph Kimball’s denormalized approach, known as dimensional modeling, emphasizes simplicity and efficiency. It organizes data into a star schema, a central fact table of measurable events surrounded by dimension tables that describe them, which reduces the need for complex joins. Dimensional modeling is designed around business functions, making it well-suited for analytical reporting, and it strikes a balance between data redundancy and query performance.
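
As a concrete illustration, here is a minimal star-schema sketch: one fact table of sales events surrounded by two dimension tables. The table and column names are hypothetical, and SQLite is used only because it ships with Python; the same shape applies to any warehouse.

import sqlite3

# minimal star schema: a central fact table surrounded by dimension tables
# (hypothetical names; SQLite used only for a self-contained example)
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_product (
        product_key INTEGER PRIMARY KEY,
        product_name TEXT,
        category TEXT
    );

    CREATE TABLE dim_date (
        date_key INTEGER PRIMARY KEY,   -- e.g. 20231006
        calendar_date TEXT,
        month TEXT,
        year INTEGER
    );

    CREATE TABLE fact_sales (
        product_key INTEGER REFERENCES dim_product(product_key),
        date_key INTEGER REFERENCES dim_date(date_key),
        quantity INTEGER,
        amount REAL
    );
""")

# analytical queries need only one join per dimension, e.g. revenue by category
cur = conn.execute("""
    SELECT p.category, SUM(f.amount) AS revenue
    FROM fact_sales f
    JOIN dim_product p ON p.product_key = f.product_key
    GROUP BY p.category
""")
print(cur.fetchall())

Because every dimension is a single join away from the fact table, typical reporting queries stay simple, which is why this shape works well on modern analytical warehouses.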

Data Vault Modeling:

The Data Vault modeling approach is highly structured and granular, dividing data into hubs (business keys), links (relationships between hubs), and satellites (descriptive, history-tracked attributes). It focuses on preserving raw data without compromising future transformations. While it is excellent for data storage and organization, a presentation layer is often required on top of it for analytical reporting, making it a comprehensive but intricate approach.
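
For comparison, here is a minimal sketch of the three Data Vault building blocks, again with hypothetical names and SQLite purely for illustration: hubs hold business keys, links record relationships between hubs, and satellites hold descriptive attributes with load metadata so history is preserved.

import sqlite3

# minimal Data Vault sketch: hub = business key, link = relationship,
# satellite = descriptive attributes plus load metadata (hypothetical names)
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE hub_customer (
        customer_hk TEXT PRIMARY KEY,   -- hash of the business key
        customer_id TEXT,               -- business key from the source system
        load_ts TEXT,
        record_source TEXT
    );

    CREATE TABLE hub_order (
        order_hk TEXT PRIMARY KEY,
        order_id TEXT,
        load_ts TEXT,
        record_source TEXT
    );

    -- link: the relationship between customers and orders
    CREATE TABLE link_customer_order (
        customer_order_hk TEXT PRIMARY KEY,
        customer_hk TEXT REFERENCES hub_customer(customer_hk),
        order_hk TEXT REFERENCES hub_order(order_hk),
        load_ts TEXT,
        record_source TEXT
    );

    -- satellite: descriptive, history-tracked attributes of the customer
    CREATE TABLE sat_customer_details (
        customer_hk TEXT REFERENCES hub_customer(customer_hk),
        name TEXT,
        email TEXT,
        load_ts TEXT,
        record_source TEXT,
        PRIMARY KEY (customer_hk, load_ts)
    );
""")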

One Big Table (OBT) Modeling:

The OBT modeling approach takes advantage of modern storage and computational capabilities. It involves creating wide denormalized tables, minimizing the need for intermediate transformations. While this approach simplifies data modeling, it can increase computational costs and data redundancy, particularly as the organization scales.

Why is Data Modeling Important?

Now that we understand what data modeling entails, let’s explore why it holds such significance in data management and analytics.

Visual Representation and Rule Enforcement:

Data modeling provides a visual representation of data structures, making it easier for data professionals to understand and work with complex datasets. It also plays a crucial role in enforcing business rules, regulatory compliance, and government policies governing data usage. By translating these rules into the data model, organizations ensure that data is handled according to legal and operational standards.

Consistency and Quality Assurance:

Data models serve as a framework for maintaining consistency across various aspects of data management, such as naming conventions, default values, semantics, and security measures. This consistency is essential to ensure data quality and accuracy. A well-designed data model acts as a guardian, preventing inconsistencies and errors arising from ad-hoc data handling.

Facilitating Data Integration:

Organizations often deal with data from multiple sources in today’s data-rich landscape. Data modeling is pivotal in designing structures that enable seamless data integration. Whether you’re working with Power BI, other data visualization tools, or databases, data modeling ensures that data from different entities can be effectively combined and analyzed.

Things to Consider:

Organizational and Mental Clarity:

Regardless of the chosen data modeling approach, organizational clarity and mental clarity should remain paramount. A structured data modeling strategy provides a foundation for managing diverse data sources effectively and maintaining consistency throughout the data pipeline.

Embracing New Technologies:

Modern data technologies offer advanced storage and processing capabilities. Organizations should consider hybrid approaches that combine the best features of different data modeling methods to leverage the benefits of both simplicity and efficiency.

Supporting Data Consumers:

Data modeling should not cater solely to individual users or reporting tools. Consider a robust data mart layer to support different data consumption scenarios, ensuring that data remains accessible and usable by a broad range of stakeholders.

