Deploy Flink SQL Statements with dbt and Confluent Cloud for Apache Flink
dbt (data build tool) is a transformation framework that enables data teams to define, test, and document data transformations using SQL. The dbt-confluent adapter enables you to use dbt to manage your Flink SQL transformations in Confluent Cloud for Apache Flink®.
If your team already uses dbt for analytics workflows, the dbt-confluent adapter lets you bring the same development patterns (models, tests, documentation, and CI/CD) to your streaming transformations on Confluent Cloud.
This walkthrough shows you how to install the adapter, set up a dbt project, write and test Flink SQL models, and automate deployments with CI/CD.
Prerequisites
You need the following prerequisites to complete this tutorial:
A Flink compute pool in your Confluent Cloud environment
A Flink API key for a service account with appropriate RBAC permissions
Your Confluent Cloud organization ID, environment ID, and compute pool ID
Python 3.10+ installed
dbt Core 1.10 or later installed
Step 1: Install the dbt-confluent adapter
Install the dbt-confluent adapter using pip:
pip install dbt-confluent
Verify the installation:
dbt --version
The output should list confluent as an installed adapter.
Step 2: Set up a dbt project
Create a new dbt project:
dbt init my_flink_project
When prompted, select confluent as the database adapter.

Configure your connection profile. Open the profiles.yml file (typically at ~/.dbt/profiles.yml) and add the following configuration:

my_flink_project:
  target: dev
  outputs:
    dev:
      type: confluent
      cloud_provider: <your-cloud-provider>
      cloud_region: <your-region>
      organization_id: <your-organization-id>
      environment_id: <your-environment-id>
      compute_pool_id: <your-compute-pool-id>
      flink_api_key: <your-flink-api-key>
      flink_api_secret: <your-flink-api-secret>
      dbname: <your-kafka-cluster-name>
      threads: 1
Tip
For production deployments, use environment variables instead of hardcoding credentials:
flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
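Because env_var() fails at parse time when a variable is unset, it can help to verify the variables before invoking dbt. The following is a minimal sketch of such a pre-flight check; the helper name is hypothetical, and the variable names match the tip above:

```python
import os

# Environment variables referenced by env_var() in profiles.yml
REQUIRED_VARS = ["CONFLUENT_FLINK_API_KEY", "CONFLUENT_FLINK_API_SECRET"]

def missing_env_vars(required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty,
    so a CI job can fail fast before `dbt run` is invoked."""
    return [name for name in required if not os.environ.get(name)]
```

Running such a check as an early CI step produces a clearer error than a Jinja rendering failure deep inside a dbt command.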
Verify the connection:
dbt debug

If the connection is successful, you see an All checks passed! message.
The project structure for Flink SQL transformations is:
my_flink_project/
├── dbt_project.yml # Project configuration
├── profiles.yml # Connection profiles (or use ~/.dbt/)
├── models/
│ ├── staging/ # Source-aligned transformations
│ │ └── stg_orders.sql
│ ├── intermediates/ # Business logic transformations
│ │ └── int_order_totals.sql
│ └── marts/ # Final output tables
│ ├── fct_revenue.sql
│ └── schema.yml # Model documentation and tests
├── macros/ # Custom macros
├── tests/ # Custom data tests
└── packages.yml # dbt package dependencies
Supported materializations
The dbt-confluent adapter supports the following materializations:
| Materialization | Description |
|---|---|
| view | Creates a Flink SQL view. This is the default materialization. |
| table | Creates a table using CREATE TABLE AS SELECT (CTAS). |
| materialized_view | Creates a materialized view that Flink continuously maintains. |
| streaming_table | Creates a streaming table with changelog semantics for streaming insert operations. |
| streaming_source | Creates a source table backed by a connector. Requires a connector configuration. |
Note
The incremental materialization is not supported by the dbt-confluent adapter. Use streaming_table for continuously updated results.
Create connector-backed sources with streaming_source
The streaming_source materialization creates a Flink SQL table backed by a connector, such as the faker connector for generating test data. Unlike other materializations where the model SQL is a SELECT statement, a streaming_source model defines the table’s column schema. The connector populates the table with data automatically.
For example, a faker source that generates sample order events:
-- models/sources/src_orders.sql
{{ config(
materialized='streaming_source',
connector='faker',
with={
'rows-per-second': '1',
'number-of-rows': '100',
}
) }}
order_id BIGINT,
price DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
PRIMARY KEY(`order_id`) NOT ENFORCED
Downstream models can reference the source with {{ ref('src_orders') }} like any other model. The streaming_source materialization requires --full-refresh to replace an existing table.
For available connectors and options, see the Flink SQL CREATE TABLE documentation.
Configure materializations in your dbt_project.yml:
models:
my_flink_project:
staging:
+materialized: view
+schema: my_kafka_cluster
intermediates:
+materialized: materialized_view
+schema: my_kafka_cluster
marts:
+materialized: materialized_view
+schema: my_kafka_cluster
Step 3: Write dbt models
Each dbt model corresponds to a Flink SQL statement. Create a model file in the models/ directory.
For example, create models/staging/stg_orders.sql to select from a Kafka topic in the examples sample catalog:
SELECT
`order_id`,
`customer_id`,
`product_id`,
`price`,
`$rowtime` AS order_time
FROM `examples`.`marketplace`.`orders`
Create models/marts/fct_revenue.sql to aggregate order data using a tumbling window. Use {{ ref() }} to reference other dbt models:
SELECT
window_start,
window_end,
SUM(price) AS total_revenue,
COUNT(*) AS order_count
FROM TABLE(
TUMBLE(TABLE {{ ref('stg_orders') }}, DESCRIPTOR(order_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end
Note
For tables in the current catalog and database (set by environment_id and dbname in your profile), you can use {{ ref() }} to reference other dbt models. For source tables in other catalogs, use fully qualified three-part names (catalog.database.table) directly in your SQL.
Flink SQL uses backtick-quoted identifiers. Use backticks around column names that contain special characters, like the $rowtime system column.
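The quoting and naming rules above can be sketched with two small helpers. This is illustrative only; the function names are hypothetical and not part of the dbt-confluent adapter's API. The escape rule (a literal backtick inside an identifier is written as two backticks) follows Flink SQL's identifier syntax:

```python
# Illustrative helpers for building Flink SQL identifiers; these names
# are hypothetical, not part of the dbt-confluent adapter.

def quote_ident(name: str) -> str:
    """Backtick-quote one identifier; a literal backtick is doubled,
    per Flink SQL's escape rule."""
    return "`" + name.replace("`", "``") + "`"

def qualified_name(catalog: str, database: str, table: str) -> str:
    """Build the three-part name used for tables outside the current
    catalog and database."""
    return ".".join(quote_ident(part) for part in (catalog, database, table))
```

For example, qualified_name("examples", "marketplace", "orders") produces the fully qualified name used in the stg_orders model above.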
Step 4: Run your models
Deploy your Flink SQL statements to Confluent Cloud by running your dbt models:
dbt run
This command submits each model as a Flink SQL statement to your compute pool. You can verify that the statements are running in the Cloud Console or with the Confluent CLI.
To run a specific model:
dbt run --select stg_orders
To run all models in a specific directory:
dbt run --select staging.*
Step 5: Test your models
The dbt-confluent adapter supports both unit tests for verifying model logic and data tests for validating data quality. Run all tests before deploying to production.
Write unit tests
Unit tests validate your model logic by providing mock input data and comparing the output against expected results. Define unit tests in a schema.yml file alongside your models.
For example, to test the stg_orders model, create models/staging/schema.yml:
unit_tests:
- name: test_stg_orders
model: stg_orders
given:
- input: source('marketplace', 'orders')
rows:
- order_id: 1
customer_id: 100
product_id: 10
price: 29.99
order_time: '2024-01-15 10:00:00'
- order_id: 2
customer_id: 101
product_id: 11
price: 49.99
order_time: '2024-01-15 10:05:00'
expect:
rows:
- order_id: 1
customer_id: 100
product_id: 10
price: 29.99
order_time: '2024-01-15 10:00:00'
- order_id: 2
customer_id: 101
product_id: 11
price: 49.99
order_time: '2024-01-15 10:05:00'
Run unit tests:
dbt test --select "test_type:unit"
Under the hood, the dbt-confluent adapter creates temporary tables on Confluent Cloud using CREATE TABLE ... LIKE, inserts the fixture data, runs your model SQL against those tables, and compares the actual output to your expected rows. Temporary tables are cleaned up automatically after the test completes.
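The final comparison step can be pictured as an order-insensitive match of actual rows against the expected rows. The following is a simplified sketch of that idea, not the adapter's actual implementation:

```python
def rows_match(actual, expected):
    """Order-insensitive comparison of row dicts, mirroring how a unit
    test's `expect` block is checked against the model's output."""
    def normalize(rows):
        # Canonical form: sort each row's columns, then sort the rows.
        return sorted(tuple(sorted(row.items())) for row in rows)
    return normalize(actual) == normalize(expected)
```

Because streaming results carry no guaranteed row order, an order-insensitive comparison is the natural fit here.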
Write data tests
Data tests validate the quality of data produced by your models. Define column tests in a schema.yml file:
models:
- name: stg_orders
columns:
- name: order_id
data_type: bigint
tests:
- not_null
- unique
- name: price
data_type: decimal(10,2)
tests:
- not_null
Run data tests:
dbt test
You can also write custom data tests as SQL files in the tests/ directory. A test passes if the query returns zero rows:
-- tests/assert_positive_prices.sql
SELECT order_id, price
FROM {{ ref('stg_orders') }}
WHERE price <= 0
Note
In Flink SQL streaming mode, count(*) over an empty result set returns zero rows instead of one row with value 0. The dbt-confluent adapter handles this automatically for test assertions.
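The pass/fail convention can be made concrete with a short sketch. This mirrors the custom test above in plain Python (the helper names are hypothetical): the test's query selects violating rows, and the test passes only when that set is empty.

```python
def failing_rows(orders):
    """Mirror of tests/assert_positive_prices.sql: rows that violate
    the rule `price > 0` are failures."""
    return [row for row in orders if row["price"] <= 0]

def data_test_passed(orders):
    """A dbt data test passes when its failure query returns zero rows."""
    return len(failing_rows(orders)) == 0
```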
Step 6: Automate with CI/CD
You can integrate dbt with a CI/CD system to automate deployments. The following example shows a GitHub Actions workflow that runs your dbt models when changes are pushed to the main branch.
on:
push:
branches:
- main
jobs:
dbt_deploy:
name: "Deploy Flink SQL with dbt"
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install dbt-confluent
- name: Run unit tests
run: dbt test --select "test_type:unit" --profiles-dir .
env:
CONFLUENT_FLINK_API_KEY: ${{ secrets.CONFLUENT_FLINK_API_KEY }}
CONFLUENT_FLINK_API_SECRET: ${{ secrets.CONFLUENT_FLINK_API_SECRET }}
- name: Deploy models
run: dbt run --profiles-dir .
env:
CONFLUENT_FLINK_API_KEY: ${{ secrets.CONFLUENT_FLINK_API_KEY }}
CONFLUENT_FLINK_API_SECRET: ${{ secrets.CONFLUENT_FLINK_API_SECRET }}
Store your Flink API key and secret as GitHub Action Secrets in your repository settings.
Multi-environment deployments
To deploy across development, staging, and production environments, define separate dbt targets in your profiles.yml:
my_flink_project:
target: dev
outputs:
dev:
type: confluent
cloud_provider: <your-cloud-provider>
cloud_region: <your-region>
organization_id: <your-organization-id>
environment_id: <dev-environment-id>
compute_pool_id: <dev-compute-pool-id>
flink_api_key: "{{ env_var('DEV_CONFLUENT_FLINK_API_KEY') }}"
flink_api_secret: "{{ env_var('DEV_CONFLUENT_FLINK_API_SECRET') }}"
dbname: <dev-kafka-cluster-name>
threads: 1
prod:
type: confluent
cloud_provider: <your-cloud-provider>
cloud_region: <your-region>
organization_id: <your-organization-id>
environment_id: <prod-environment-id>
compute_pool_id: <prod-compute-pool-id>
flink_api_key: "{{ env_var('PROD_CONFLUENT_FLINK_API_KEY') }}"
flink_api_secret: "{{ env_var('PROD_CONFLUENT_FLINK_API_SECRET') }}"
dbname: <prod-kafka-cluster-name>
threads: 4
Run against a specific target:
dbt run --target prod
Manage pipeline dependencies
In streaming pipelines, Flink SQL statements form a directed acyclic graph (DAG) of dependencies. A fact table depends on intermediate tables, which depend on source tables. dbt manages this automatically through the {{ ref() }} function.
How ref() manages deployment order
When you use {{ ref('model_name') }} in your SQL, dbt:
Builds a dependency graph of all models based on their ref() calls.
Deploys models in topological order, so that upstream models are created before downstream models that depend on them.
Resolves the correct fully qualified table name for the target environment.
For example, if fct_revenue references stg_orders, running dbt run deploys stg_orders first, then fct_revenue.
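The ordering can be sketched with a topological sort over the example models from this walkthrough. This is an illustration of the principle, not dbt's actual graph machinery:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each model maps to the set of models it ref()s (its upstream deps),
# using the example models from this walkthrough.
deps = {
    "stg_orders": {"src_orders"},
    "int_order_totals": {"stg_orders"},
    "fct_revenue": {"stg_orders"},
}

def deploy_order(graph):
    """Upstream models come out before anything that depends on them,
    which is the order statements are submitted to the compute pool."""
    return list(TopologicalSorter(graph).static_order())
```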
Understand downstream impact
Before changing a model, understand which downstream models depend on it. Use dbt ls to list a model and all its downstream dependents:
# List all models downstream of stg_orders
dbt ls --select stg_orders+
# List all models upstream of fct_revenue
dbt ls --select +fct_revenue
# List a model and everything upstream and downstream
dbt ls --select +stg_orders+
Use dbt docs to visualize your pipeline DAG and explore dependencies interactively:
dbt docs generate
dbt docs serve
This opens a browser with an interactive lineage graph showing how models connect to each other. Use this to assess the blast radius of changes and plan deployments.
Selective deployment
Deploy only a model and its dependencies using the --select flag with graph operators:
# Deploy stg_orders and everything downstream
dbt run --select stg_orders+
# Deploy fct_revenue and all its upstream dependencies
dbt run --select +fct_revenue
# Deploy only models that have changed compared to the last run
dbt run --select state:modified --defer --state target/
Manage running statements
Streaming Flink SQL statements run continuously, so deploying changes requires consideration of statement state and data continuity.
Understand materialization behavior
Each materialization type handles existing relations differently during dbt run:
| Materialization | Stateful | Redeployment behavior |
|---|---|---|
| view | No | Drops and recreates the view. No state to preserve. |
| table | No | Drops and recreates with CREATE TABLE AS SELECT. |
| materialized_view | Yes | Drops and recreates. Flink rebuilds the materialized view from source data. |
| streaming_table | Yes | Requires --full-refresh to drop and recreate the table. |
| streaming_source | Yes | Requires --full-refresh to replace the existing table. |
Use --full-refresh to force redeployment of stateful materializations:
dbt run --full-refresh
Important
Redeploying stateful materializations (streaming_table, streaming_source, materialized_view) drops and recreates the underlying Kafka topics. This means historical data in those topics is lost, and the statement starts processing from the beginning. Plan full refreshes during maintenance windows and coordinate with downstream consumers.
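A pre-deployment guard can surface this risk before anyone runs a full refresh. The following is a minimal sketch (the helper name is hypothetical) that flags the stateful materializations named in the Important note above:

```python
# Stateful materializations whose redeployment drops and recreates the
# backing Kafka topics; helper name is hypothetical, for illustration.
STATEFUL = {"streaming_table", "streaming_source", "materialized_view"}

def topic_losing_models(models):
    """Given {model_name: materialization}, return the models whose
    redeployment would lose historical topic data."""
    return sorted(name for name, mat in models.items() if mat in STATEFUL)
```

A CI step could print this list and require explicit approval before running dbt run --full-refresh against production.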
Configure streaming table options
Use the with config to set Flink SQL table options on streaming_table models:
-- models/intermediates/int_order_totals.sql
{{ config(
materialized='streaming_table',
with={
'changelog.mode': 'upsert',
}
) }}
SELECT
customer_id,
SUM(price) AS total_spent,
COUNT(*) AS order_count
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
Handle schema evolution
When you change the columns in a model’s SELECT statement, consider the impact on downstream models and Schema Registry compatibility:
Use FULL_TRANSITIVE schema compatibility to prevent breaking changes.
Changes to source schemas may require redeploying dependent statements. Use dbt ls --select model_name+ to identify affected downstream models.
For comprehensive guidance on how schema changes affect running statements, see Schema and Statement Evolution with Confluent Cloud for Apache Flink.
For stateless statements, you can carry over offsets from a previous statement version to avoid reprocessing data. See Carry-over Offsets in Confluent Cloud for Apache Flink.
Limitations
The dbt-confluent adapter has the following limitations:
No incremental materialization: Use materialized_view for continuously updated results instead.
No snapshots: Flink SQL does not support the transaction operations (MERGE, UPDATE with CTEs) required for dbt snapshots.
No schema management: Schemas (Kafka clusters) must be created in Confluent Cloud before use. The adapter cannot create or drop schemas.
No table renames: ALTER TABLE rename is not supported in Flink SQL.
Non-transactional: Confluent Cloud Flink SQL is non-transactional, so partial deployments are possible if a dbt run fails midway.