Manage Confluent Cloud for Apache Flink with Terraform
You can manage all Confluent Cloud for Apache Flink® resources using the Confluent Terraform provider. This lets you define your streaming infrastructure as code and integrate it into GitOps and CI/CD workflows.
The following Flink resources are available in the Confluent Terraform provider:
| Resource | Terraform resource | Description |
|---|---|---|
| Compute pool | `confluent_flink_compute_pool` | Create and manage the compute resources that run Flink SQL workloads. |
| Materialized table | `confluent_flink_materialized_table` | Create and evolve persistent streaming pipelines backed by Kafka topics. |
| Statement | `confluent_flink_statement` | Submit and manage Flink SQL statements, including DDL and DML queries. |
For a tutorial on automating deployments with Terraform and GitHub Actions, see Deploy a Flink SQL Statement Using CI/CD.
Compute pools
Use the `confluent_flink_compute_pool` resource to create and manage the compute capacity that runs your Flink SQL workloads. For the full Terraform walkthrough, including create, view, update, and delete operations, see Manage Compute Pools.
To create a compute pool by using the Confluent Terraform provider, use the `confluent_flink_compute_pool` resource.
Configure your Terraform file. Provide your Confluent Cloud API key and secret.

```hcl
terraform {
  required_providers {
    confluent = {
      source  = "confluentinc/confluent"
      version = "2.72.0"
    }
  }
}

provider "confluent" {
  cloud_api_key    = var.confluent_cloud_api_key    # optionally use CONFLUENT_CLOUD_API_KEY env var
  cloud_api_secret = var.confluent_cloud_api_secret # optionally use CONFLUENT_CLOUD_API_SECRET env var
}
```
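The provider block above references input variables for the API key and secret that must be declared somewhere in your configuration. A minimal declaration might look like the following sketch; the variable names match the references above, and marking them `sensitive` keeps the values out of plan output.

```hcl
variable "confluent_cloud_api_key" {
  description = "Confluent Cloud API key"
  type        = string
  sensitive   = true
}

variable "confluent_cloud_api_secret" {
  description = "Confluent Cloud API secret"
  type        = string
  sensitive   = true
}
```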
Define the environment where the compute pool will be created.

```hcl
resource "confluent_environment" "development" {
  display_name = "Development"

  lifecycle {
    prevent_destroy = true
  }
}
```
Define the `confluent_flink_compute_pool` resource with the required parameters, like `display_name`, `cloud`, `region`, `max_cfu`, and the environment ID.

```hcl
resource "confluent_flink_compute_pool" "main" {
  display_name = "standard_compute_pool"
  cloud        = "AWS"
  region       = "us-east-1"
  max_cfu      = 5

  environment {
    id = confluent_environment.development.id
  }
}
```
Note
To create a default compute pool manually, add `default_pool = true` to the resource definition. This pre-provisions a default pool with your specified configuration before users start running statements. The FlinkAdmin role or higher is required to create default pools.

```hcl
resource "confluent_flink_compute_pool" "main" {
  display_name = "default_compute_pool"
  cloud        = "AWS"
  region       = "us-east-1"
  max_cfu      = 50
  default_pool = true

  environment {
    id = confluent_environment.development.id
  }
}
```
Run the `terraform apply` command to create the resources.

```shell
terraform apply
```

If you need to import an existing compute pool, use the `terraform import` command.

```shell
export CONFLUENT_CLOUD_API_KEY="<cloud_api_key>"
export CONFLUENT_CLOUD_API_SECRET="<cloud_api_secret>"

terraform import confluent_flink_compute_pool.main <your-environment-id>/<compute-pool-id>
```
For more information, see the `confluent_flink_compute_pool` resource.
Materialized tables
Use the `confluent_flink_materialized_table` resource to create persistent streaming pipelines that write continuously to a backing Apache Kafka® topic. Unlike statements, materialized tables can be evolved in place: change the `query` attribute and Terraform updates the pipeline without manual offset management.
Create a materialized table
To create a materialized table by using the Confluent Terraform provider, use the `confluent_flink_materialized_table` resource.
Configure your Terraform file with the required provider and credentials.

```hcl
terraform {
  required_providers {
    confluent = {
      source  = "confluentinc/confluent"
      version = "2.72.0"
    }
  }
}

provider "confluent" {
  flink_rest_endpoint = var.flink_rest_endpoint
  flink_api_key       = var.flink_api_key
  flink_api_secret    = var.flink_api_secret
}
```
Define the `confluent_flink_materialized_table` resource.

```hcl
resource "confluent_flink_materialized_table" "high_value_orders" {
  display_name  = "high_value_orders"
  kafka_cluster = confluent_kafka_cluster.my_cluster.id

  query = <<-EOT
    SELECT order_id, customer_id, price
    FROM examples.marketplace.orders
    WHERE price > 50.00
  EOT

  organization {
    id = data.confluent_organization.my_org.id
  }

  environment {
    id = confluent_environment.my_env.id
  }

  compute_pool {
    id = confluent_flink_compute_pool.my_compute_pool.id
  }

  principal {
    id = confluent_service_account.my_service_account.id
  }

  credentials {
    key    = confluent_api_key.my_flink_api_key.id
    secret = confluent_api_key.my_flink_api_key.secret
  }
}
```
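The example references several supporting objects, like `confluent_kafka_cluster.my_cluster` and `confluent_api_key.my_flink_api_key`, that must be defined elsewhere in your configuration. The following is a minimal sketch of two of those definitions; the display names are placeholders, and the Kafka cluster, environment, compute pool, and API key are assumed to be declared similarly.

```hcl
# The current organization; referenced by the materialized table's
# organization block.
data "confluent_organization" "my_org" {}

# The service account that the continuous query runs as.
resource "confluent_service_account" "my_service_account" {
  display_name = "my-service-account"
  description  = "Runs the high_value_orders materialized table"
}
```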
Run `terraform apply`. Flink creates the backing Kafka topic, registers the output schema in Schema Registry, and starts the continuous query.

```shell
terraform apply
```
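To confirm what was created, you can surface the resource's attributes with a standard Terraform output. This sketch uses only the universally available `id` attribute.

```hcl
output "high_value_orders_table_id" {
  description = "ID of the managed materialized table"
  value       = confluent_flink_materialized_table.high_value_orders.id
}
```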
For more information, refer to the `confluent_flink_materialized_table` resource.
Evolve a materialized table
To evolve a materialized table by using the Confluent Terraform provider, update the `query` attribute of the `confluent_flink_materialized_table` resource and run `terraform apply`.
Update the `query` attribute in your Terraform configuration. The following example raises the price threshold and adds a `product_id` column.

```hcl
resource "confluent_flink_materialized_table" "high_value_orders" {
  display_name  = "high_value_orders"
  kafka_cluster = confluent_kafka_cluster.my_cluster.id

  query = <<-EOT
    SELECT order_id, customer_id, price, product_id
    FROM examples.marketplace.orders
    WHERE price > 100.00
  EOT

  # ... other attributes unchanged
}
```
Note
`START_MODE` support is available only through SQL statements and is not supported by the Terraform provider or the REST API. To control how much historical data is reprocessed on an evolution, use the CREATE OR ALTER MATERIALIZED TABLE SQL statement with the `START_MODE` clause. When omitted, the default is `RESUME_OR_FROM_BEGINNING`.

Run `terraform apply`. The provider stops the current query, starts a new one with the updated logic, and waits up to two minutes for the evolution to converge.

```shell
terraform apply
```
Note
Changing `query` always triggers a new evolution, even if the query text is identical after normalization. This matches the behavior of the CREATE OR ALTER MATERIALIZED TABLE SQL statement.
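If you evolve the query outside Terraform, for example with CREATE OR ALTER MATERIALIZED TABLE to set `START_MODE`, you can tell Terraform to leave the query alone so a later `terraform apply` doesn't trigger an unintended evolution. This is a sketch using Terraform's standard `ignore_changes` meta-argument, not a provider-specific feature.

```hcl
resource "confluent_flink_materialized_table" "high_value_orders" {
  # ... other attributes unchanged

  lifecycle {
    # Ignore drift in the query so evolutions made via SQL
    # statements are not reverted by Terraform.
    ignore_changes = [query]
  }
}
```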
Stop and resume a materialized table
To stop or resume a materialized table by using the Confluent Terraform provider, set the `stopped` attribute of the `confluent_flink_materialized_table` resource.
To pause the continuous query without dropping the table, set `stopped = true` and run `terraform apply`. The backing Kafka topic and its data are preserved.

```hcl
resource "confluent_flink_materialized_table" "high_value_orders" {
  display_name  = "high_value_orders"
  kafka_cluster = confluent_kafka_cluster.my_cluster.id
  stopped       = true

  query = <<-EOT
    SELECT order_id, customer_id, price
    FROM examples.marketplace.orders
    WHERE price > 50.00
  EOT

  # ... other attributes unchanged
}
```
To resume the query, set `stopped = false` (or remove the attribute, since it defaults to `false`) and run `terraform apply`. The query restarts and continues reading from the last committed offset in the source topics.

```shell
terraform apply
```
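Because `stopped` is an ordinary attribute, a common pattern is to drive it from a variable so operators can pause and resume the pipeline without editing the resource. A minimal sketch, assuming a hypothetical `pipeline_stopped` variable:

```hcl
variable "pipeline_stopped" {
  description = "Set to true to pause the materialized table's query"
  type        = bool
  default     = false
}

resource "confluent_flink_materialized_table" "high_value_orders" {
  # ... other attributes unchanged
  stopped = var.pipeline_stopped
}
```

Operators can then pause the pipeline with `terraform apply -var="pipeline_stopped=true"` and resume it by applying again with the default value.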
Import an existing materialized table
To manage an existing materialized table with the Confluent Terraform provider, import it using the `confluent_flink_materialized_table` resource.
The import ID format is `{environment_id}/{kafka_cluster_id}/{table_name}`. Run the following command, replacing the placeholders with your values.

```shell
terraform import confluent_flink_materialized_table.my_table \
  env-abc123/lkc-xyz789/high_value_orders
```
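On Terraform 1.5 and later, you can also express the import declaratively with an `import` block instead of the CLI command, which lets `terraform plan` preview the import before it happens. A sketch using the same example IDs:

```hcl
import {
  to = confluent_flink_materialized_table.my_table
  id = "env-abc123/lkc-xyz789/high_value_orders"
}
```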
After importing, run `terraform plan` to reconcile the Terraform state with the actual resource configuration.

```shell
terraform plan
```
Resource reference
`confluent_flink_materialized_table`
Note
`START_MODE` is not yet supported through the Terraform provider or the REST API; it is available only through SQL statements. To control how much historical data is reprocessed on create or evolution, use the CREATE OR ALTER MATERIALIZED TABLE SQL statement with the `START_MODE` clause.
| Attribute | Required | Description |
|---|---|---|
| `display_name` | Yes | The unique name of the materialized table within the environment and Kafka cluster. |
| `kafka_cluster` | Yes | The ID of the Kafka cluster that hosts the materialized table's backing topic. |
| `query` | No | The SQL query that defines the table's contents. |
| `stopped` | No | When `true`, the continuous query is paused. Defaults to `false`. |
| `watermark_column_name` | No | The name of the column used as the watermark source. |
| `watermark_expression` | No | The watermark strategy expression. |
| | No | Set of column names used to distribute the table (`DISTRIBUTED BY`). |
| | No | Number of buckets for the distribution. |
| `columns` | No | List of column definitions. Each entry can include the column name, data type, and related settings. |
| `constraints` | No | List of table constraints, such as a primary key. |
| `organization` | No | Block with the `id` of the organization. |
| `environment` | No | Block with the `id` of the environment. |
| `compute_pool` | No | Block with the `id` of the compute pool that runs the query. |
| `principal` | No | Block with the `id` of the service account that runs the query. |
| `rest_endpoint` | No | The Flink REST API endpoint URL. Required if not set at the provider level. |
| `credentials` | No | Block with the Flink API `key` and `secret`. |
Updatable in place (no resource recreation): `query`, `stopped`, `compute_pool`, `principal`, `columns`, `watermark_column_name`, `watermark_expression`, `constraints`.
Requires resource recreation: `display_name`, `kafka_cluster`, `rest_endpoint`.
Statements
Use the `confluent_flink_statement` resource to submit and manage Flink SQL statements. Statements are suitable for one-time DDL operations (such as CREATE TABLE) and DML queries (such as INSERT INTO). For persistent streaming pipelines, use materialized tables instead.
The recommended pattern is one SQL statement per Terraform resource, using `depends_on` to enforce execution order:
```hcl
# DDL: create a table to hold results
resource "confluent_flink_statement" "create-order-summary" {
  statement = "CREATE TABLE order_summary (window_start TIMESTAMP(3), total_revenue DOUBLE);"

  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }
}

# DML: populate the table continuously
resource "confluent_flink_statement" "insert-order-summary" {
  statement = <<-EOT
    INSERT INTO order_summary
    SELECT window_start, SUM(price) AS total_revenue
    FROM TABLE(
      TUMBLE(TABLE examples.marketplace.orders, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
    )
    GROUP BY window_start;
  EOT

  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }

  depends_on = [confluent_flink_statement.create-order-summary]
}
```
Important
Running `terraform destroy` on a `confluent_flink_statement` resource stops the statement but does not execute the corresponding reverse DDL. Tables, schemas, and other catalog objects created by DDL statements (such as CREATE TABLE) persist in the Flink catalog even after the statement resource is destroyed. To remove catalog objects, run the appropriate DROP statement explicitly, as shown in the sketch below.
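For example, a table left behind by the DDL statement above could be removed with a short-lived statement resource, or by running the DROP from any SQL client. A hedged sketch using the same `order_summary` table:

```hcl
# One-off cleanup statement; destroy this resource once the DROP has completed.
resource "confluent_flink_statement" "drop-order-summary" {
  statement = "DROP TABLE order_summary;"

  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }
}
```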
For full Terraform documentation and a complete quickstart example, see the Confluent Terraform provider documentation.