-- Ingest data from connected systems
IMPORT banking_data.*;
-- Enrich debit transactions with creditor information using time-consistent join
SpendingTransactions :=
SELECT
t.*,
h.name AS creditor_name,
h.type AS creditor_type
FROM Transactions t
JOIN Accounts FOR SYSTEM_TIME AS OF t.tx_time a
ON t.credit_account_id = a.account_id
JOIN AccountHolders FOR SYSTEM_TIME AS OF t.tx_time h
ON a.holder_id = h.holder_id;
-- Create secure MCP tooling endpoint with description for agentic retrieval
/** Retrieve spending transactions within the given time-range.
from_time (inclusive) and to_time (exclusive) must be RFC 3339-compliant date-time strings.
*/
SpendingTransactionsByTime(
account_id STRING NOT NULL METADATA FROM 'auth.accountId',
from_time TIMESTAMP NOT NULL,
to_time TIMESTAMP NOT NULL
) :=
SELECT * FROM SpendingTransactions
WHERE debit_account_id = :account_id
AND :from_time <= tx_time
AND :to_time > tx_time
ORDER BY tx_time DESC;
SQL: The Logical Layer for Agent Output
SQL is ideal for AI-generated data pipelines: its declarative nature enables deep introspection by the compiler, most LLMs are well-trained on SQL syntax, and humans can easily verify agent output.
The relational algebra foundation provides mathematical rigor for deterministic validation and optimization that agents can rely on.

Physical Layer Optimization
The harness handles what agents struggle with: mapping logical operations to physical engines. A cost-based optimizer assigns computations to Flink, Kafka, Postgres, or Iceberg while respecting capability constraints.
Schema alignment, data type mapping, and connector configuration are generated deterministically—eliminating subtle bugs that probabilistic generation introduces.
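Here is a minimal Python sketch of capability-constrained engine assignment. The engine names come from the text above. The capability sets, the costs, and the `Operator`/`assign` names are hypothetical. The greedy per-operator choice is also a simplification: it is not DataSQRL's actual cost model, which plans over the whole DAG.

```python
from dataclasses import dataclass

# Hypothetical capabilities and relative costs per engine (illustrative only).
CAPABILITIES = {
    "flink": {"stream_join", "window_agg", "dedup"},
    "postgres": {"point_query", "sort"},
    "iceberg": {"append"},
}
COST = {"flink": 3, "postgres": 2, "iceberg": 1}


@dataclass(frozen=True)
class Operator:
    name: str
    needs: frozenset  # capabilities the operator requires


def assign(ops: list) -> dict:
    """Assign each operator to the cheapest engine that supports all its needs."""
    plan = {}
    for op in ops:
        # Capability constraint: keep only engines that cover every requirement.
        candidates = [e for e, caps in CAPABILITIES.items() if op.needs <= caps]
        if not candidates:
            raise ValueError(f"no engine can execute {op.name}")
        # Cost choice; ties broken by engine name so the result is deterministic.
        plan[op.name] = min(candidates, key=lambda e: (COST[e], e))
    return plan
```

In this toy model, a stateful stream join lands on Flink and a keyed, sorted lookup lands on Postgres. An operator that no engine supports fails fast instead of producing a silently broken plan.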
/*+test */
EnrichedTransactionsTest :=
SELECT debit_holder_name,
COUNT(*) AS debit_tx_count,
SUM(amount) AS total_debit_amount
FROM EnrichedTransactions
GROUP BY debit_holder_name ORDER BY debit_holder_name ASC;
Simulation for Real-World Feedback
Agents need feedback beyond static validation. The simulator executes pipelines locally with timestamp-accurate event replay, providing real-world results that drive iterative refinement.
100% reproducibility means agents can test edge cases (late data, race conditions, schema changes) that occur only rarely in production.
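Here is a simplified Python model of timestamp-ordered replay with late-data detection. It assumes a watermark that trails the highest event time seen by a fixed `max_delay`, which is similar in spirit to bounded-out-of-orderness watermarks. `Event`, `replay`, and the integer timestamps are illustrative and are not the simulator's API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    ts: int       # event time, e.g. epoch millis
    seq: int      # recorded arrival order, used to replay deterministically
    payload: str


def replay(events: list, max_delay: int):
    """Replay events in recorded arrival order and split on-time from late events.

    The watermark trails the highest event time seen so far by max_delay;
    an event whose timestamp is below the current watermark counts as late.
    """
    watermark = float("-inf")
    on_time, late = [], []
    for ev in sorted(events, key=lambda e: e.seq):
        (late if ev.ts < watermark else on_time).append(ev)
        watermark = max(watermark, ev.ts - max_delay)
    return on_time, late
```

The replay order depends only on recorded sequence numbers. Re-running it over the same input therefore always classifies the same events as late, which is what turns a rare production edge case into a repeatable test.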

Deterministic Artifact Generation
The transpiler generates deployment artifacts from the optimized DAG: Flink plans, Kafka topics, Postgres schemas, GraphQL models. Deterministic generation means consistent results regardless of how many iterations the agent runs.
Agents focus on business logic while the harness handles the complex plumbing that would otherwise introduce data inconsistencies.
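One common way to make generated artifacts deterministic is canonical serialization. Keys are emitted in a fixed order with fixed separators, so logically equal plans come out byte-identical and hash identically. Below is a minimal Python sketch that assumes a plan represented as a plain dictionary. The plan shape and function names are hypothetical.

```python
import hashlib
import json


def render_artifact(plan: dict) -> str:
    # sort_keys and fixed separators make the text independent of dict insertion order.
    return json.dumps(plan, sort_keys=True, separators=(",", ":"))


def fingerprint(plan: dict) -> str:
    # A content hash lets two agent iterations confirm they produced the same artifact.
    return hashlib.sha256(render_artifact(plan).encode("utf-8")).hexdigest()
```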
IMPORT stdlib.openai.*;
ContentEmbedding :=
SELECT
vector_embed(text, 'text-embedding-3-small') AS embedding,
completions(concat('Summarize: ', text), 'gpt-4o') AS summary
FROM Content;
AI-Native Data Processing
Built-in functions for vector embeddings, LLM invocation, and ML model inference. Agents can generate pipelines that incorporate AI capabilities without needing to understand the underlying integration complexity.

Non-Functional Requirements Built In
The harness encodes requirements that agents struggle with: scalability through proper partitioning, reliability through proven technologies like Flink and Kafka, and consistency through exactly-once semantics.
Agent-generated code benefits from production-grade infrastructure without needing to reason about distributed systems complexity.
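Partitioning by key keeps all records for that key on one partition. Per-key state, such as per-account totals, then never needs cross-partition coordination. Below is a minimal Python sketch of hash partitioning with illustrative names. It uses `zlib.crc32` to get a hash that is stable across processes. Kafka's default partitioner uses murmur2 instead, so the partition numbers here will not match Kafka's.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # crc32 is stable across runs, unlike Python's per-process salted hash().
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition(records: list, key_field: str, num_partitions: int) -> list:
    """Route each record to the partition determined by its key."""
    parts = [[] for _ in range(num_partitions)]
    for rec in records:
        parts[partition_for(rec[key_field], num_partitions)].append(rec)
    return parts
```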
-- Create a relationship between holder and accounts
AccountHolders.accounts(status STRING) :=
SELECT * FROM Accounts a
WHERE a.holder_id = this.holder_id AND a.status = :status
ORDER BY a.account_type ASC;
-- Link accounts with spending transactions
Accounts.spendingTransactions(since TIMESTAMP NOT NULL) :=
SELECT * FROM SpendingTransactions t
WHERE t.debit_account_id = this.account_id AND :since <= tx_time
ORDER BY tx_time DESC;
Serving Layer for Data APIs
Table functions and relationships extend the logical layer to support data serving. The harness maps these to GraphQL, REST, and MCP endpoints automatically.
Agents define data access patterns in SQL; the harness generates the API schema and query mappings with proper validation.
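The `SpendingTransactionsByTime` function defined at the top of this page filters on a half-open interval. `from_time` is inclusive and `to_time` is exclusive, so adjacent queries never double-count a transaction. The Python sketch below models only those semantics over in-memory rows. The row layout and function names are hypothetical, and the sketch says nothing about how the harness generates the actual endpoint.

```python
from datetime import datetime


def parse_rfc3339(value: str) -> datetime:
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def spending_by_time(rows: list, account_id: str, from_time: str, to_time: str) -> list:
    """Rows for account_id with from_time <= tx_time < to_time, newest first."""
    start, end = parse_rfc3339(from_time), parse_rfc3339(to_time)
    hits = [
        r for r in rows
        if r["debit_account_id"] == account_id and start <= r["tx_time"] < end
    ]
    return sorted(hits, key=lambda r: r["tx_time"], reverse=True)
```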
Transactions(account_id STRING METADATA FROM 'auth.acct_id') :=
SELECT * FROM SpendingTransactions
WHERE debit_account_id = :account_id
ORDER BY tx_time DESC;
Security Guardrails
The harness enforces security patterns that agents might overlook: JWT authentication, parameterized queries that prevent injection attacks, and fine-grained authorization.
Agent-generated code inherits these protections through the transpilation process, not through prompting the agent to remember security best practices.
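The injection protection comes from binding values as parameters rather than splicing them into SQL text. The account id is taken from verified token claims, not from caller input. Below is a minimal Python sketch using the standard-library `sqlite3` module. The table layout, the `acct_id` claim (mirroring `METADATA FROM 'auth.acct_id'` above), and the assumption that the token was already verified are all illustrative.

```python
import sqlite3


def demo_connection() -> sqlite3.Connection:
    # In-memory stand-in for the serving database (hypothetical schema and rows).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE spending_transactions "
        "(tx_id INTEGER, debit_account_id TEXT, amount REAL, tx_time TEXT)"
    )
    conn.executemany(
        "INSERT INTO spending_transactions VALUES (?, ?, ?, ?)",
        [
            (1, "a1", 10.0, "2024-01-01T00:00:00Z"),
            (2, "a1", 5.0, "2024-01-02T00:00:00Z"),
            (3, "a2", 99.0, "2024-01-03T00:00:00Z"),
        ],
    )
    return conn


def spending_for(conn: sqlite3.Connection, claims: dict) -> list:
    # The account id comes from already-verified JWT claims, never from a
    # request argument, so callers cannot ask for someone else's account.
    account_id = claims["acct_id"]
    # The ? placeholder binds account_id as a value; it is never parsed as SQL.
    return conn.execute(
        "SELECT tx_id, amount FROM spending_transactions "
        "WHERE debit_account_id = ? ORDER BY tx_time DESC",
        (account_id,),
    ).fetchall()
```

A classic injection payload such as `a1' OR '1'='1` is simply compared as a literal account id and matches nothing.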

Validation Through Introspection
The compiler produces detailed representations of the computational DAG: table types, execution stages, inferred keys, timestamps, and complete schemas. This output feeds back to agents for iterative refinement.
Data lineage tracking provides governance compliance without agent intervention.
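Lineage over a computational DAG reduces to graph reachability. Below is a minimal Python sketch using dependency edges read off the example definitions on this page. The dictionary representation and the function name are assumptions, not the compiler's output format.

```python
# Edges: table -> tables it reads from (taken from the SQRL examples on this page).
DEPENDS_ON = {
    "Accounts": ["AccountsCDC"],
    "SpendingTransactions": ["Transactions", "Accounts", "AccountHolders"],
    "SpendingByWeek": ["SpendingTransactions"],
    "SpendingTransactionsByTime": ["SpendingTransactions"],
}


def upstream(table: str, graph: dict = DEPENDS_ON) -> set:
    """All tables that `table` transitively depends on (iterative DFS)."""
    seen, stack = set(), [table]
    while stack:
        for parent in graph.get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen
```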
# Agent compiles and gets validation feedback
docker run --rm -v "$PWD":/build \
datasqrl/cmd compile pipeline.sqrl
# Agent runs tests and gets real-world feedback
docker run --rm -v "$PWD":/build \
datasqrl/cmd test pipeline.sqrl
# Agent iterates until tests pass, then compiles the final deployment build
docker run --rm -v "$PWD":/build \
datasqrl/cmd compile pipeline.sqrl
# Deployment artifacts ready for K8s or cloud
ls build/deploy/plan
Agentic Workflow Integration
Simple CLI commands that agents invoke in iterative loops: compile for validation feedback, test for simulation results, compile again after refinement.
Each command produces structured output that agents consume to improve the pipeline toward production requirements.
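The compile/test/refine cycle can be expressed as a bounded loop. Below is a minimal Python sketch. `compile_fn`, `test_fn`, and `revise_fn` are hypothetical callables. In practice they would wrap the `docker run ... compile` and `docker run ... test` commands and the agent's own revision step, each returning a list of error messages that is empty on success.

```python
from typing import Callable, List


def refine_until_green(
    pipeline: str,
    compile_fn: Callable[[str], List[str]],
    test_fn: Callable[[str], List[str]],
    revise_fn: Callable[[str, List[str]], str],
    max_rounds: int = 5,
) -> str:
    """Compile, then test; feed any errors back to the agent until both pass."""
    errors: List[str] = []
    for _ in range(max_rounds):
        # Only run tests once compilation succeeds.
        errors = compile_fn(pipeline) or test_fn(pipeline)
        if not errors:
            return pipeline
        pipeline = revise_fn(pipeline, errors)
    raise RuntimeError(f"still failing after {max_rounds} rounds: {errors}")
```

Capping the number of rounds keeps a confused agent from looping forever. The last errors surface to a human instead.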

Flexible Deployment
The same artifacts that run in local simulation deploy to Kubernetes or cloud-managed services. Agents don't need to reason about deployment targets—the harness abstracts that complexity.
Production telemetry hooks correlate runtime behavior back to source code for autonomous troubleshooting.
CREATE TABLE Transactions (
`timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
WATERMARK FOR `timestamp` AS `timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'indicators',
'properties.bootstrap.servers' = '${BOOTSTRAP_SERVERS}',
'properties.group.id' = 'mygroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'flexible-json'
);
Connector Configuration
Agents generate connector configurations for Kafka, databases, data lakes, and APIs. The harness validates configuration parameters and handles the integration complexity.
Schema discovery and alignment happen automatically during compilation.
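Below is a minimal Python sketch of the kind of check a harness can run on connector options before deployment: required keys must be present, and `${VAR}` placeholders like `${BOOTSTRAP_SERVERS}` in the DDL above must resolve. The required-key set is a simplified, hypothetical subset of Kafka source options. The function name is illustrative too, not DataSQRL's validator.

```python
import os
import re
from collections.abc import Mapping

# Hypothetical minimal requirement set for a Kafka source (simplified).
REQUIRED_KAFKA = {"connector", "topic", "properties.bootstrap.servers", "format"}
PLACEHOLDER = re.compile(r"\$\{([A-Z0-9_]+)\}")


def resolve(options: dict, env: Mapping = os.environ) -> dict:
    """Check required options, then substitute ${VAR} placeholders from env."""
    missing = REQUIRED_KAFKA - set(options)
    if missing:
        raise ValueError(f"missing options: {sorted(missing)}")

    def repl(match: re.Match) -> str:
        name = match.group(1)
        if name not in env:
            raise KeyError(f"undefined variable {name}")
        return env[name]

    return {key: PLACEHOLDER.sub(repl, value) for key, value in options.items()}
```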

Extensible Open-Source Framework
DataSQRL compiles to proven open-source technologies: Flink, Kafka, Postgres, Iceberg. The framework is extensible—add custom functions, connectors, or execution engines to build a harness tailored to your organization.
Encode domain-specific knowledge into the harness so agents benefit from it automatically.
-- Deduplicate an update stream to a stateful table
Accounts := DISTINCT AccountsCDC ON account_id ORDER BY update_time DESC;
-- Join transactions with accounts and holders as of the transaction time
SpendingTransactions :=
SELECT t.*,
h.name AS creditor_name,
h.type AS creditor_type
FROM Transactions t
JOIN Accounts FOR SYSTEM_TIME AS OF t.tx_time a
ON t.credit_account_id = a.account_id
JOIN AccountHolders FOR SYSTEM_TIME AS OF t.tx_time h
ON a.holder_id = h.holder_id;
-- Aggregate over weekly tumbling time windows
SpendingByWeek := SELECT
debit_account_id AS account_id,
creditor_type AS type,
window_start AS week,
SUM(amount) AS total_spending
FROM TABLE(TUMBLE(
TABLE SpendingTransactions,
DESCRIPTOR(tx_time),
INTERVAL '7' DAY
))
GROUP BY debit_account_id, creditor_type, window_start, window_end;
Complex Transformations, Simple Syntax
CDC deduplication, temporal joins, windowed aggregations—expressed concisely in SQL. Agents reason about business logic using familiar syntax while the harness handles the complex stream processing semantics.
Custom UDFs extend the vocabulary when SQL alone isn't enough.
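The three patterns above have simple reference semantics. The Python sketch below models them over small in-memory lists so their meaning is easy to check. It is an illustration with hypothetical function names and integer timestamps, not how Flink executes these operations incrementally over unbounded streams.

```python
from collections import defaultdict


def dedup_latest(cdc_rows: list, key: str, order_by: str) -> dict:
    """Keep the newest row per key, like DISTINCT ... ON key ORDER BY order_by DESC."""
    latest = {}
    for row in cdc_rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return latest


def as_of(versions: list, ts: int):
    """Temporal-join lookup: the newest version with update_time <= ts, else None."""
    valid = [v for v in versions if v["update_time"] <= ts]
    return max(valid, key=lambda v: v["update_time"]) if valid else None


def tumble_sum(rows: list, ts_field: str, size: int, group_field: str, value_field: str) -> dict:
    """Sum value_field per (group, window_start) over fixed, non-overlapping windows."""
    totals = defaultdict(float)
    for r in rows:
        window_start = r[ts_field] - r[ts_field] % size
        totals[(r[group_field], window_start)] += r[value_field]
    return dict(totals)
```

The temporal lookup is why a transaction is enriched with the account state valid at `tx_time`, even if the account changes later.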
-- Compute enriched transactions into Iceberg, partitioned by creditor holder type
/*+engine(iceberg), partition_key(credit_holder_type) */
EnrichedTransactions := SELECT
t.*,
hc.name AS credit_holder_name,
hc.type AS credit_holder_type
FROM Transactions t JOIN AccountHolders hc
ON t.credit_holder_id = hc.holder_id;
Agent Hints as Constraints
Agents provide hints to guide the optimizer: force specific engine assignments, set partition keys, or configure execution parameters. The optimizer respects these constraints while ensuring consistency.
Humans can inject domain knowledge through hints that agents then propagate.
See the Harness in Action
Watch a coding agent use DataSQRL to build a complete data pipeline—from initial SQL through iterative refinement to production deployment.
The harness guides the agent at every step, providing the feedback needed to produce pipelines that actually work in production.