Connecting External Data Sources and Sinks
Use `CREATE TABLE` statements to connect external data sources and sinks with your SQRL script, using the `WITH` clause to provide the connector configuration.
DataSQRL uses Apache Flink connectors and formats. To find a connector for your data system, use:
- The official Apache Flink connectors for Kafka, Filesystem, Kinesis, and many more.
- DataSQRL-provided connectors
- Safe Kafka source connectors, which support dead-letter queues for faulty messages.
- Apache Flink CDC connectors for Postgres, MySQL, Oracle, SQL Server, and other databases.
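For instance, a minimal source table backed by the official Flink filesystem connector could look like the following sketch; the table, field names, and path are placeholders for illustration:

```sql
CREATE TABLE Orders (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  order_time TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'filesystem',       -- official Flink filesystem connector
  'path' = 'file:///data/orders',   -- placeholder path, adjust to your environment
  'format' = 'json'                 -- any Flink format supported by this connector
);
```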
Connector Management
The best practice for managing connectors in your DataSQRL project is to create a folder for each system that you are connecting to and place all source or sink `CREATE TABLE` statements in separate files ending in `.table.sql` in that folder. You can then import from and export to those sources and sinks in the SQRL script.
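For instance, a project connecting to a Kafka cluster might be laid out roughly like this (the script, folder, and file names are placeholders, not requirements):

```
my-project/
├── myscript.sqrl
├── kafka-sources/
│   ├── user.table.sql
│   └── transaction.table.sql
└── kafka-sink/
    └── usercount.table.sql
```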
For example, to ingest data from the `User` and `Transaction` topics of a Kafka cluster, you would:

- Create a sub-directory `kafka-sources` in the project directory that contains your SQRL script.
- Create two files, `user.table.sql` and `transaction.table.sql`, in that sub-directory.
- In each file, put a `CREATE TABLE` statement that defines a column for each field in the message and a `WITH` clause that contains the connector configuration. They will look like this:

  ```sql
  CREATE TABLE User (
    user_id BIGINT,
    user_name STRING,
    last_updated TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
    WATERMARK FOR last_updated AS last_updated - INTERVAL '1' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'user',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'user-consumer-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
  );
  ```

- Import those sources into your SQRL script with `IMPORT kafka-sources.User;`.
- Keep sources and sinks in separate folders (e.g. `kafka-sink`); a sink example is sketched after this list.
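On the sink side, a hypothetical `kafka-sink/usercount.table.sql` could look like the following sketch; the table, topic, and column names are assumptions for illustration:

```sql
CREATE TABLE UserCount (
  user_id BIGINT,
  transaction_count BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-count',                             -- placeholder topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro'
);
```

In the SQRL script, the corresponding export would then look roughly like `EXPORT UserCount TO kafka-sink.UserCount;`, where `UserCount` is a table computed in your script.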
By following this structure, you modularize your sources and sinks separately from your processing logic, which makes the project easier to read and maintain.
External Schemas
When ingesting data from external systems, the schema is often defined in or by those systems. For example, Avro is a popular schema language for encoding messages in Kafka topics. It can be very cumbersome to convert that schema to SQL and maintain that translation.
With DataSQRL, you can easily create a table that fetches the schema from a given Avro schema file.
Following the example above, assume the schema for the `User` topic is `user.avsc` and that file is placed next to the `user.table.sql` file. A `LIKE <schema-file-path>` clause can then be added to the `CREATE TABLE` statement, so in `user.table.sql` we only need to define the metadata, the watermark, and the connector options:
```sql
CREATE TABLE User (
  last_updated TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
  WATERMARK FOR last_updated AS last_updated - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'user-consumer-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro'
) LIKE `user.avsc`;
```
We can even reference schema files in other folders via relative paths, but in most cases it makes sense to put the schema file next to the table SQL file.
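For reference, the `user.avsc` file referenced above would be a standard Avro record schema; a minimal, hypothetical version matching the fields in the earlier example could look like this:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "user_name", "type": "string"}
  ]
}
```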