4 posts tagged with "Flink"

Apache Flink related posts

Avoiding Duplicate Processing in Flink SQL Streaming Jobs

· 6 min read
Ferenc Csaky
Apache Flink PMC
Matthias Broecheler
CEO of DataSQRL

Flink SQL is a powerful abstraction layer that unifies batch and stream processing over semi-structured data. It extends the widely used SQL language with streaming constructs such as tumbling windows, session windows, and, more recently, process table functions. This enables non-experts in streaming technologies to express complex real-time data processing logic succinctly.

As a result, Flink SQL significantly lowers the barrier to entry for building real-time data systems.

However, developing streaming applications differs fundamentally from traditional SQL query processing. One key difference is that streaming jobs often have multiple sinks populated by a single pipeline, sharing large portions of common data processing logic.

While Flink SQL provides mechanisms to express this concisely—using views and statement sets—in practice, this often results in duplicate processing in the generated job graph.

The Core Problem: Why Duplication Happens

In Flink SQL, each sink maps to its own relational tree. These trees are:

  1. Optimized individually.
  2. Combined afterward into a single job graph.

By the time the trees are merged, query optimization has introduced subtle differences in the shared data processing logic, which render subgraph merging ineffective. As a result, common processing logic, such as joins, gets duplicated.

Ironically, common SQL optimization techniques like predicate pushdown and projection pruning can make this worse in streaming contexts. While these optimizations are beneficial in traditional query processing (because they avoid computing unused data), they can fragment pipelines in streaming jobs and prevent subgraph sharing.

Example: Clickstream Enrichment with Two Aggregations

Consider a streaming application that:

  1. Ingests ad click events.
  2. Enriches them with ad metadata.
  3. Produces two separate aggregations.

We perform a temporal join between the clickstream and the ad inventory to enrich each click event with ad metadata.

After enrichment, we compute two aggregations:

  • An hourly tumbling window that counts clicks per ad category.
  • A daily tumbling window that counts clicks per advertiser.

Both results are written to a PostgreSQL database for querying.

Below is a simplified Flink SQL script that implements our clickstream aggregation.

-- Clickstream source
CREATE TABLE Clickstream (
user_id STRING,
event_time TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
ad_id STRING,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka', ...
);

-- Ad inventory source
CREATE TABLE AdInventory (
ad_id STRING,
category STRING,
advertiser STRING,
launch_date TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
PRIMARY KEY (ad_id) NOT ENFORCED,
WATERMARK FOR launch_date AS launch_date - INTERVAL '5' SECOND
) WITH (
'connector' = 'upsert-kafka', ...
);

-- Enrich clickstream with ads
CREATE VIEW EnrichedClicks AS
SELECT
c.user_id,
c.event_time,
c.ad_id,
a.category,
a.advertiser
FROM Clickstream AS c
LEFT JOIN AdInventory FOR SYSTEM_TIME AS OF c.event_time AS a
ON c.ad_id = a.ad_id;

BEGIN STATEMENT SET;

-- Hourly tumbling window: clicks per category
INSERT INTO hourly_category_clicks
SELECT
category,
window_start,
COUNT(*) AS click_count
FROM TABLE(
TUMBLE(
TABLE EnrichedClicks,
DESCRIPTOR(event_time),
INTERVAL '1' HOUR
)
)
GROUP BY
category,
window_start;

-- Daily tumbling window: clicks per advertiser
INSERT INTO daily_advertiser_clicks
SELECT
advertiser,
window_start,
COUNT(*) AS click_count
FROM TABLE(
TUMBLE(
TABLE EnrichedClicks,
DESCRIPTOR(event_time),
INTERVAL '1' DAY
)
)
GROUP BY
advertiser,
window_start;

END;

Why Does This Cause Duplicate Processing?

When running this Flink SQL script, the generated job graph duplicates the temporal join.

[Figure: Duplicate Temporal Join]

Why? Because the two aggregations need slightly different data: the hourly aggregation needs category, while the daily aggregation needs advertiser. Projection pruning (and, in queries with filters, predicate pushdown) therefore causes the optimizer to generate slightly different pipelines for each sink. As a result, the temporal join is executed twice.
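
To make the divergence concrete, the two queries below sketch what each sink effectively asks of the join once unused columns are pruned. This is a conceptual illustration written as SQL against the tables defined above, not the literal output of the Flink planner.

```sql
-- Conceptual view of the hourly branch after projection pruning:
-- advertiser and user_id are never read downstream, so they are dropped.
SELECT c.event_time, a.category
FROM Clickstream AS c
LEFT JOIN AdInventory FOR SYSTEM_TIME AS OF c.event_time AS a
ON c.ad_id = a.ad_id;

-- Conceptual view of the daily branch after projection pruning:
-- category and user_id are never read downstream, so they are dropped.
SELECT c.event_time, a.advertiser
FROM Clickstream AS c
LEFT JOIN AdInventory FOR SYSTEM_TIME AS OF c.event_time AS a
ON c.ad_id = a.ad_id;
```

The joins have the same inputs and the same join condition, but their projections differ, so a merge step that compares subgraphs for equality treats them as two distinct joins.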

In traditional SQL systems, this behavior is desirable because it avoids processing unnecessary fields when you submit a query. In streaming systems, however, this approach is suboptimal because it considers each sink in isolation and not the overall data processing of the entire streaming job.
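
A quick way to check whether a job is affected is to ask Flink for the plan of the whole statement set before submitting it. The sketch below assumes that the tables, the view, and the two sink tables from the script above already exist in the session; it wraps the same two INSERT statements in EXPLAIN STATEMENT SET.

```sql
-- Print the plan for both sinks together instead of executing them.
EXPLAIN STATEMENT SET
BEGIN

INSERT INTO hourly_category_clicks
SELECT category, window_start, COUNT(*) AS click_count
FROM TABLE(
  TUMBLE(TABLE EnrichedClicks, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY category, window_start;

INSERT INTO daily_advertiser_clicks
SELECT advertiser, window_start, COUNT(*) AS click_count
FROM TABLE(
  TUMBLE(TABLE EnrichedClicks, DESCRIPTOR(event_time), INTERVAL '1' DAY)
)
GROUP BY advertiser, window_start;

END;
```

In the optimized plan that Flink prints, count how often the temporal join appears: once means the join is shared between the sinks, twice means it has been duplicated.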

For our simple application, it is much more efficient to compute the temporal join once and enrich the clickstream with all the ad metadata we need downstream. That eliminates an operator and redundant data processing. More importantly, it halves the number of state requests to RocksDB, and state access is the primary bottleneck for this job.

Apache Flink already provides two important building blocks to eliminate this wasteful duplication:

  • Subgraph elimination within the physical RelNode graph to remove duplicate processing.
  • Compile plans, which generate a static artifact representing the generated job graph from Flink SQL.

The compile plan is especially useful because it allows validation of the generated job graph at compile time and assigns stable operator IDs. Stable operator IDs matter for job evolution because they preserve state mappings across job changes or Flink version upgrades.
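
As a rough sketch of how a compiled plan can be produced with plain Flink SQL, the statement set from the example can be wrapped in COMPILE PLAN and later run with EXECUTE PLAN. The file path is a placeholder chosen for illustration, and the behavior of these statements can differ between Flink versions, so treat this as a starting point to verify against the documentation of your release.

```sql
-- Hypothetical output path; use a location your Flink setup can write to.
COMPILE PLAN 'file:///tmp/clickstream-plan.json' FOR STATEMENT SET
BEGIN

INSERT INTO hourly_category_clicks
SELECT category, window_start, COUNT(*) AS click_count
FROM TABLE(
  TUMBLE(TABLE EnrichedClicks, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY category, window_start;

INSERT INTO daily_advertiser_clicks
SELECT advertiser, window_start, COUNT(*) AS click_count
FROM TABLE(
  TUMBLE(TABLE EnrichedClicks, DESCRIPTOR(event_time), INTERVAL '1' DAY)
)
GROUP BY advertiser, window_start;

END;

-- Later: submit the job from the stored plan instead of re-planning the SQL.
EXECUTE PLAN 'file:///tmp/clickstream-plan.json';
```

The resulting JSON file can be inspected or checked into version control, which makes it possible to review the job graph, including any duplicated operators, before the job runs.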

Selective Rule Control in Calcite

The issue lies in how the Calcite optimizer applies certain optimization rules, particularly those related to projection pruning and filter pushdown. These are the most likely culprits for causing minor differences in the optimized versions of shared logical plans.

In many real-world streaming scenarios with multiple sinks, these rules prevent effective subgraph elimination because they introduce subtle differences in the RelNode graph, whereas subgraph elimination requires strict equality.

The solution is to selectively disable specific Calcite rules so that intermediate views do not get optimized for each RelNode tree and remain identical. Identical RelNode trees are then removed during the subgraph elimination phase of the Flink SQL optimizer.

[Figure: Duplicate Temporal Join]

How Do I Use This?

An easy way to remove job graph duplication is to use the DataSQRL compiler and configure the following in your project's package.json:

"compiler": {
  "predicate-pushdown-rules": "LIMITED_RULES"
}

This ensures that only a limited set of Calcite rules is applied during compiled plan optimization. DataSQRL is a data automation framework that compiles Flink SQL to data pipelines, and it can turn your Flink SQL into a compiled plan with a single command that you execute locally. For more information, check out the getting started tutorial.

As an alternative, you can selectively disable Calcite rules in your own instrumentation framework for Flink. Check out this code snippet to see what rules we are disabling. We highly recommend that you produce a compiled plan for your Flink SQL jobs for introspection and predictability. You can use the open-source Flink SQL Runner to execute compiled plans.

In effect, this approach allows you to define a streaming-specific optimization ruleset, rather than relying solely on optimizations designed primarily for traditional query workloads.

What's Next?

While the ruleset tweaks described above address most cases of subgraph duplication in streaming Flink SQL, we still have some more work to do for certain edge cases.

In particular, SQL constructs that introduce correlation variables (e.g. UNNEST) into the Calcite logical plan do not get deduplicated yet, because correlation variables are numbered by a static counter that makes each variable unique.

We implemented a normalization algorithm for correlation variables and are looking for ways to contribute it directly to the Apache Flink project.
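
For illustration, the following sketch shows the kind of script that runs into this edge case. All table and column names are hypothetical, and the datagen and blackhole connectors are used only to keep the example self-contained. The shared view uses UNNEST, which is planned with a correlation variable, and two sinks read from it.

```sql
-- Hypothetical source with an array column (names are illustrative only).
CREATE TABLE Orders (
  order_id STRING,
  items ARRAY<ROW<product_id STRING, quantity INT>>
) WITH (
  'connector' = 'datagen'
);

-- Shared view: UNNEST introduces a correlation variable in the logical plan.
CREATE VIEW OrderItems AS
SELECT o.order_id, i.product_id, i.quantity
FROM Orders AS o
CROSS JOIN UNNEST(o.items) AS i (product_id, quantity);

-- Hypothetical sinks that discard their input.
CREATE TABLE product_quantities (
  product_id STRING,
  total_quantity INT
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE order_sizes (
  order_id STRING,
  item_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

BEGIN STATEMENT SET;

INSERT INTO product_quantities
SELECT product_id, SUM(quantity) AS total_quantity
FROM OrderItems
GROUP BY product_id;

INSERT INTO order_sizes
SELECT order_id, COUNT(*) AS item_count
FROM OrderItems
GROUP BY order_id;

END;
```

Both sinks read the same view, but each sink's relational tree receives its own correlation variable, so the two UNNEST subplans would not compare as equal even though they describe the same processing. Normalizing the variable names before comparison is what removes that difference.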

Flink SQL Runner: Run Flink SQL Without JARs or Glue Code

· 2 min read
Matthias Broecheler
CEO of DataSQRL

Apache Flink has long been a powerhouse for streaming and batch data processing. And with the rise of Flink SQL, developers can now build sophisticated pipelines using a declarative language they already know. But getting Flink SQL applications into production still comes with friction: packaging JARs, managing connectors, injecting secrets, and wiring up deployment infrastructure.

Flink SQL Runner is here to change that. It's an open-source toolkit that simplifies development, deployment, and operation of Flink SQL applications—locally or in Kubernetes—without manual JAR assembly or scripting custom infrastructure pipelines.

Defining Data Interfaces with FlinkSQL

· 4 min read
Matthias Broecheler
CEO of DataSQRL

FlinkSQL is an amazing innovation in data processing: it packages the power of realtime stream processing within the simplicity of SQL. That means you can start with the SQL you know and introduce stream processing constructs as you need them.

FlinkSQL adds the ability to process data incrementally to the classic set-based semantics of SQL. In addition, FlinkSQL supports source and sink connectors making it easy to ingest data from and move data to other systems. That's a powerful combination which covers a lot of data processing use cases.

In fact, it only takes a few extensions to FlinkSQL to build entire data applications. Let's see how that works.

Building Data APIs with FlinkSQL

CREATE TABLE UserTokens (
userid BIGINT NOT NULL,
tokens BIGINT NOT NULL,
request_time TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp'
);

/*+query_by_all(userid) */
TotalUserTokens := SELECT userid, sum(tokens) as total_tokens,
count(tokens) as total_requests
FROM UserTokens GROUP BY userid;

UserTokensByTime(userid BIGINT NOT NULL, fromTime TIMESTAMP NOT NULL, toTime TIMESTAMP NOT NULL):=
SELECT * FROM UserTokens WHERE userid = :userid AND
request_time >= :fromTime AND request_time < :toTime ORDER BY request_time DESC;

UsageAlert := SUBSCRIBE SELECT * FROM UserTokens WHERE tokens > 100000;

This script defines a sequence of tables. We introduce := as syntactic sugar for the verbose CREATE TEMPORARY VIEW syntax.

The UserTokens table does not have a configured connector, which means we treat it as an API mutation endpoint connected to Flink via a Kafka topic that captures the events. This makes it easy to build APIs that capture user activity, transactions, or other types of events.

Why Temporal Join is Stream Processing’s Superpower

· 8 min read
Matthias Broecheler
CEO of DataSQRL

Stream processing technologies like Apache Flink introduce a new type of data transformation that’s very powerful: the temporal join. Temporal joins add context to data streams while being efficient and fast to execute.

This article introduces the temporal join, compares it to the traditional inner join, explains when to use it, and why it is a secret superpower.
