Deep Dive: How DataSQRL Works
The DataSQRL Compiler executes the following steps:
- Read Configuration: Read and combine all package.json configuration files to initialize the configuration for the compiler.
- Build Project: The packager builds the project structure in the build/ directory.
- Parse Scripts: The parser reads the main SQRL script in the build/ directory and resolves all IMPORT and EXPORT statements locally against the folder structure in the build directory (see the example script after this list).
- Create Logical Plans: The parser converts all statements to logical plans. Most of the work is delegated to the FlinkSQL parser and Apache Calcite. The parser detects the language elements SQRL adds and converts them.
- Analyze Logical Plans: The logical plan analyzer validates each statement and extracts the information needed for planning.
- Build DAG: The DAG Planner combines all logical statement plans into a processing DAG that defines the flow of data from sources to sinks.
- Optimize DAG: The DAG Planner optimizes the DAG and assigns each statement to an engine for execution.
- Generate Physical Plans: The Physical Planner generates deployment assets for each engine and connector configuration to move data between engines.
- Write Deployment Artifacts: The deployment artifacts are written to the build/deploy folder with the engine plans in build/deploy/plan.
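To make the import and export resolution concrete, here is a minimal SQRL script sketch. The package name mydata, the sink print.OrderCount, and all column names are hypothetical and stand in for folders and sink definitions under the build/ directory:

```sql
-- Hypothetical mini-script: 'mydata' and 'print' stand in for a local source
-- package and sink definition under the build/ directory.
IMPORT mydata.Orders;                    -- resolved against build/mydata/

OrderCount := SELECT customerid, COUNT(*) AS num_orders
              FROM Orders
              GROUP BY customerid;       -- becomes a node in the processing DAG

EXPORT OrderCount TO print.OrderCount;   -- resolved against a local sink definition
```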
The DataSQRL run command executes all compilation steps above and:
- Launch: Launches all engines in Docker
- Deploy: Deploys the deployment assets to the engines, e.g. installs the database schema, passes the GraphQL execution plan to Vert.x, creates topics in RedPanda, and executes the compiled plan in Flink.
- Run: Runs and monitors the engines as they execute the pipeline.
The running data pipeline and the individual engines running each component are accessible locally via the mapped ports.
The DataSQRL test command executes all compilation and run steps above and:
- Subscriptions: Installs subscription queries to listen for test results (if any)
- Mutations: Runs the mutation queries against the API in order (if any) and snapshots the results.
- Wait: Waits for the configured interval, number of checkpoints, or Flink job completion.
- Queries: Runs the queries against the API to snapshot the results.
- Snapshots: Snapshots all subscription results in string order.
Architecture
DataSQRL supports a pluggable engine architecture. A data pipeline or microservice consists of multiple stages and each stage is executed by an engine. For example, a data pipeline may consist of a stream processing, storage, and serving stage which are executed by Apache Flink, PostgreSQL, and Vert.x, respectively.
DataSQRL supports the following types of stages:
- Stream Processing: For processing data as it is ingested
- Log: For moving data between stages reliably
- Apache Kafka
- RedPanda
- Apache Kafka-compatible (e.g. Azure EventHub)
- Database: For storing and querying data
- PostgreSQL
- Apache Iceberg
- Yugabyte
- DuckDB
- Snowflake
- PostgreSQL wire compatible (e.g. CockroachDB)
- Server: For returning data through an API upon request
- Cache: For caching data on the server (coming soon)
Currently, DataSQRL is closely tied to Flink as the stream processing engine. The other engines are modular, making it simple to add additional engines.
A data pipeline topology is a sequence of stages. A pipeline topology may contain multiple stages of the same type (e.g. two different database stages). An engine is what executes the deployment assets for a given stage. For example, the FlinkSQL generated by the compiler as part of the deployment assets for the "stream" stage is executed by the Flink engine.
The pipeline topology as well as other compiler configuration options are specified in a JSON configuration file, typically called package.json.
The configuration documentation lists all the configuration options.
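As a rough, hedged sketch only (the key name shown here is an assumption and may differ between versions; the configuration documentation is authoritative), selecting the engines that make up the pipeline topology could look along these lines in package.json:

```json
{
  "enabled-engines": ["vertx", "postgres", "kafka", "flink"]
}
```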
Planner Components
The planner parses a SQRL script, i.e. a sequence of SQL statements, analyzes the statements, constructs a data processing DAG, optimizes the DAG, and finally produces deployment assets for the engines executing the data processing steps.
The planner consists of the following components.
Packager
The Packager populates the build/ directory with scripts and folders based on the local folder structure and the dependency mapping defined in the configuration file.
As part of this process, the packager executes all registered Preprocessors which prepare local data or copy elements into the build directory.
In addition, the packager executes the following special purpose preprocessors:
- The Discovery preprocessor analyzes any .csv or .jsonl file in the local directory tree, determines the schema, and creates a table configuration (illustrated in the sketch below).
- The UDF preprocessor extracts UDF definitions from provided jar files.
- The static data preprocessor copies .csv and .jsonl files into a consolidated data directory for Flink to read at runtime. This requires that filenames for static data files are unique.
Preprocessors are internal to DataSQRL and can be extended within the framework.
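As a hedged sketch of how discovered data is used: assuming a local file localdata/customers.jsonl whose schema the discovery preprocessor has inferred, the script could import the resulting table by folder and table name (folder, table, and column names are illustrative):

```sql
-- Hypothetical: assumes a local file localdata/customers.jsonl whose schema
-- the discovery preprocessor inferred and registered as a table configuration.
IMPORT localdata.Customers;

RecentCustomers := SELECT * FROM Customers ORDER BY updated DESC;
```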
Parser
The parser is the first stage of the compiler. It pre-processes any SQRL-specific syntax and then passes the result to the FlinkSQL parser to produce a logical plan.
The parser resolves imports against the build directory using module loaders that retrieve dependencies. It maintains a schema of all defined tables in a SQRL script.
The transpiler is built on top of Apache Calcite by way of FlinkSQL for all SQL handling. It prepares the statements that are analyzed and planned by the planner.
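For intuition, the following sketch (hypothetical table and column names) contrasts a SQRL-specific := definition with the kind of standard FlinkSQL view it is conceptually equivalent to; this is not the compiler's literal output:

```sql
-- SQRL-specific definition syntax (hypothetical names):
OrderTotals := SELECT customerid, SUM(total) AS spend
               FROM Orders
               GROUP BY customerid;

-- ...which the transpiler handles roughly like a standard FlinkSQL view
-- (a conceptual equivalent, not the literal generated plan):
CREATE VIEW OrderTotals AS
SELECT customerid, SUM(total) AS spend
FROM Orders
GROUP BY customerid;
```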
Logical Plan Analyzer
The logical plan analyzer is the second stage of the compiler. It takes the logical plan produced by the transpiler for each table or function defined in the SQRL script and analyzes the logical plan to extract a TableAnalysis that contains information needed by the planner.
- It keeps track of important metadata like timestamps, primary keys, sort orders, etc.
- It analyzes the SQL to identify potential issues, semantic inconsistencies, or optimization potential and produces warnings or notices.
- It extracts cost information for the optimizer.
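For example, given a source definition along the following lines (a hedged FlinkSQL sketch with hypothetical names, not DataSQRL-generated configuration), the analyzer would record customerid as the primary key and updated, via the watermark, as the event timestamp:

```sql
-- Hedged FlinkSQL sketch (hypothetical names, not DataSQRL-generated config):
-- the analyzer would record 'customerid' as the primary key and 'updated'
-- (via the watermark) as the event timestamp of this table.
CREATE TABLE Customers (
  customerid BIGINT,
  email      STRING,
  updated    TIMESTAMP_LTZ(3),
  WATERMARK FOR updated AS updated - INTERVAL '1' SECOND,
  PRIMARY KEY (customerid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```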
DAG Planner
The DAG planner takes all the individual table and function definitions and assembles them into a data processing DAG (directed acyclic graph). It prunes the DAG and rewrites the DAG before optimizing the DAG to assign each node (i.e. table or function) to a stage in the pipeline.
The optimizer uses a cost model and is constrained to produce only viable pipelines.
At the end of the DAG planning process, each table or function defined in the SQRL script is assigned to a stage in the pipeline.
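As an illustrative sketch (hypothetical tables; the actual assignment depends on the cost model and the configured engines), a continuously maintained aggregate is typically assigned to the stream stage, while a simple derived query may be assigned to the database stage and evaluated at request time:

```sql
-- Hypothetical sketch; actual stage assignment is decided by the cost model.
-- A continuously maintained aggregate is a natural fit for the stream stage:
CustomerSpend := SELECT customerid, SUM(total) AS spend
                 FROM Orders
                 GROUP BY customerid;

-- A simple derived query over that result may be assigned to the database
-- stage and evaluated at request time instead of being pre-computed:
HighSpenders := SELECT * FROM CustomerSpend WHERE spend > 1000;
```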
Physical Planner
All the tables in a given stage are then passed to the stage engine's physical planner which produces the physical plan for the engine that has been configured to execute that stage.
The physical plan assets produced depend on the engine:
- Apache Flink: A FlinkSQL script and compiled plan which contains the generated connector configuration
- Postgres: A SQL schema for the tables and index structure as well as a set of SQL view and query definitions.
- Kafka: A list of topics with configuration
- GraphQL: A GraphQL schema and query execution plan
Physical planning can contain additional optimization such as selecting optimal index structures for database tables.
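The following is a hedged sketch of what a Postgres deployment asset might contain, with illustrative table, index, and view names rather than actual generated output:

```sql
-- Hedged sketch of a Postgres deployment asset (illustrative names, not
-- actual generated output): table schema, a planner-selected index, and a
-- view the server uses to query the table.
CREATE TABLE IF NOT EXISTS customerspend (
  customerid BIGINT NOT NULL,
  spend      NUMERIC NOT NULL,
  PRIMARY KEY (customerid)
);

CREATE INDEX IF NOT EXISTS customerspend_spend_idx ON customerspend (spend);

CREATE OR REPLACE VIEW customerspend_view AS
SELECT customerid, spend FROM customerspend;
```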
An important step in generating the physical plan is generating the connector configuration between engines. When two adjacent nodes in the DAG are assigned to different engines for execution, we consider this a "cut" in the DAG since it cuts the DAG into multiple sub-graphs -- one for each engine. To move data between engines at the cut points (i.e. the edges that connect the respective nodes), a connection needs to be established.
Connector configuration is generated for the stream processing engine (Apache Flink) and the server engine (Vert.x) to connect to the database, filesystem, streaming platform, etc. The connector configuration is determined by the physical planner based on the logical plan analysis above and instantiated in connector templates that are configured in the configuration file.
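As a hedged example of such a cut (topic, broker, and table names are illustrative, not generated configuration), the stream engine could write the cut table to a Kafka topic from which the engine on the other side of the cut reads:

```sql
-- Hedged sketch of connector configuration at a DAG cut (illustrative names):
-- Flink writes the cut table to a Kafka topic; the engine on the other side
-- of the cut reads from the same topic.
CREATE TABLE customerspend_sink (
  customerid BIGINT,
  spend      DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'customerspend',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);
```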
The physical planner is also responsible for generating the API schemas (e.g. GraphQL schema) for the exposed API if the pipeline contains a server engine. Optionally, the user may provide the API schema in which case the physical planner validates the schema and maps it to the SQRL script.
The physical plans are then written out as deployment artifacts to the build/deploy directory.