Manage Confluent Cloud for Apache Flink with Terraform

You can manage all Confluent Cloud for Apache Flink® resources using the Confluent Terraform provider. This lets you define your streaming infrastructure as code and integrate it into GitOps and CI/CD workflows.

The following Flink resources are available in the Confluent Terraform provider:

  - Compute pools (confluent_flink_compute_pool): Create and manage the compute resources that run Flink SQL workloads.
  - Materialized tables (confluent_flink_materialized_table): Create and evolve persistent streaming pipelines backed by Kafka topics.
  - Statements (confluent_flink_statement): Submit and manage Flink SQL statements, including DDL and DML queries.

For a tutorial on automating deployments with Terraform and GitHub Actions, see Deploy a Flink SQL Statement Using CI/CD.

Compute pools

Use the confluent_flink_compute_pool resource to create and manage the compute capacity that runs your Flink SQL workloads. For the full Terraform walkthrough, including create, view, update, and delete operations, see Manage Compute Pools.

To create a compute pool by using the Confluent Terraform provider, use the confluent_flink_compute_pool resource.

  1. Configure your Terraform file. Provide your Confluent Cloud API key and secret; a sketch of the matching variable declarations follows this procedure.

    terraform {
      required_providers {
        confluent = {
          source  = "confluentinc/confluent"
          version = "2.72.0"
        }
      }
    }
    
    provider "confluent" {
      cloud_api_key    = var.confluent_cloud_api_key    # optionally use CONFLUENT_CLOUD_API_KEY env var
      cloud_api_secret = var.confluent_cloud_api_secret # optionally use CONFLUENT_CLOUD_API_SECRET env var
    }
    
  2. Define the environment where the compute pool will be created.

    resource "confluent_environment" "development" {
      display_name = "Development"
      lifecycle {
        prevent_destroy = true
      }
    }
    
  3. Define the confluent_flink_compute_pool resource with the required parameters, like display_name, cloud, region, max_cfu, and the environment ID.

    resource "confluent_flink_compute_pool" "main" {
      display_name = "standard_compute_pool"
      cloud        = "AWS"
      region       = "us-east-1"
      max_cfu      = 5
    
      environment {
        id = confluent_environment.development.id
      }
    }
    

    Note

    To create a default compute pool manually, add default_pool = true to the resource definition. This pre-provisions a default pool with your specified configuration before users start running statements. The FlinkAdmin role or higher is required to create default pools.

    resource "confluent_flink_compute_pool" "main" {
      display_name = "default_compute_pool"
      cloud        = "AWS"
      region       = "us-east-1"
      max_cfu      = 50
      default_pool = true
    
      environment {
        id = confluent_environment.development.id
      }
    }
    
  4. Run the terraform apply command to create the resources.

    terraform apply
    
  5. If you need to import an existing compute pool, use the terraform import command.

    export CONFLUENT_CLOUD_API_KEY="<cloud_api_key>"
    export CONFLUENT_CLOUD_API_SECRET="<cloud_api_secret>"
    terraform import confluent_flink_compute_pool.main <your-environment-id>/<compute-pool-id>
    

For more information, see confluent_flink_compute_pool resource.
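
The provider block in step 1 references var.confluent_cloud_api_key and var.confluent_cloud_api_secret without declaring them. A minimal sketch of the matching variable declarations, assuming you pass the credentials in as sensitive input variables, looks like this:

# Illustrative variable declarations for the credentials referenced by the provider block.
variable "confluent_cloud_api_key" {
  description = "Confluent Cloud API key"
  type        = string
  sensitive   = true
}

variable "confluent_cloud_api_secret" {
  description = "Confluent Cloud API secret"
  type        = string
  sensitive   = true
}

You can then supply the values in a terraform.tfvars file or through the TF_VAR_confluent_cloud_api_key and TF_VAR_confluent_cloud_api_secret environment variables.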

Materialized tables

Use the confluent_flink_materialized_table resource to create persistent streaming pipelines that write continuously to a backing Apache Kafka® topic. Unlike statements, materialized tables can be evolved in place: change the query attribute and Terraform updates the pipeline without manual offset management.

Create a materialized table

To create a materialized table by using the Confluent Terraform provider, use the confluent_flink_materialized_table resource.

  1. Configure your Terraform file with the required provider and credentials.

    terraform {
      required_providers {
        confluent = {
          source  = "confluentinc/confluent"
          version = "2.72.0"
        }
      }
    }
    
    provider "confluent" {
      flink_rest_endpoint = var.flink_rest_endpoint
      flink_api_key       = var.flink_api_key
      flink_api_secret    = var.flink_api_secret
    }
    
  2. Define the confluent_flink_materialized_table resource. The Kafka cluster, organization, environment, compute pool, service account, and API key it references must be declared elsewhere in your configuration; a partial sketch follows this procedure.

    resource "confluent_flink_materialized_table" "high_value_orders" {
      display_name  = "high_value_orders"
      kafka_cluster = confluent_kafka_cluster.my_cluster.id
    
      query = <<-EOT
        SELECT order_id, customer_id, price
        FROM examples.marketplace.orders
        WHERE price > 50.00
      EOT
    
      organization {
        id = data.confluent_organization.my_org.id
      }
    
      environment {
        id = confluent_environment.my_env.id
      }
    
      compute_pool {
        id = confluent_flink_compute_pool.my_compute_pool.id
      }
    
      principal {
        id = confluent_service_account.my_service_account.id
      }
    
      credentials {
        key    = confluent_api_key.my_flink_api_key.id
        secret = confluent_api_key.my_flink_api_key.secret
      }
    }
    
  3. Run terraform apply. Flink creates the backing Kafka topic, registers the output schema in Schema Registry, and starts the continuous query.

    terraform apply
    

For more information, refer to confluent_flink_materialized_table resource.
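
The example in step 2 references a Kafka cluster, organization, environment, compute pool, service account, and Flink API key that must be declared elsewhere in your configuration. The following partial sketch shows two of those declarations; the names are placeholders chosen to match the references above, and the remaining resources follow the same pattern.

# Illustrative supporting declarations for the materialized table example.
data "confluent_organization" "my_org" {}

resource "confluent_service_account" "my_service_account" {
  display_name = "materialized-table-runner"  # placeholder name
  description  = "Service account that runs the high_value_orders query"
}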

Evolve a materialized table

To evolve a materialized table by using the Confluent Terraform provider, update the query attribute of the confluent_flink_materialized_table resource and run terraform apply.

  1. Update the query attribute in your Terraform configuration. The following example raises the price threshold and adds a product_id column.

    resource "confluent_flink_materialized_table" "high_value_orders" {
      display_name  = "high_value_orders"
      kafka_cluster = confluent_kafka_cluster.my_cluster.id
    
      query = <<-EOT
        SELECT order_id, customer_id, price, product_id
        FROM examples.marketplace.orders
        WHERE price > 100.00
      EOT
    
      # ... other attributes unchanged
    }
    

    Note

    The Terraform provider and the REST API do not support START_MODE. To control how much historical data is reprocessed on an evolution, use the CREATE OR ALTER MATERIALIZED TABLE SQL statement with the START_MODE clause. When the clause is omitted, the default is RESUME_OR_FROM_BEGINNING.

  2. Run terraform apply. The provider stops the current query, starts a new one with the updated logic, and waits up to two minutes for the evolution to converge.

    terraform apply
    

Note

Changing query always triggers a new evolution, even if the query text is identical after normalization. This matches the behavior of the CREATE OR ALTER MATERIALIZED TABLE SQL statement.

Stop and resume a materialized table

To stop or resume a materialized table by using the Confluent Terraform provider, set the stopped attribute of the confluent_flink_materialized_table resource.

  1. To pause the continuous query without dropping the table, set stopped = true and run terraform apply. The backing Kafka topic and its data are preserved.

    resource "confluent_flink_materialized_table" "high_value_orders" {
      display_name  = "high_value_orders"
      kafka_cluster = confluent_kafka_cluster.my_cluster.id
      stopped       = true
    
      query = <<-EOT
        SELECT order_id, customer_id, price
        FROM examples.marketplace.orders
        WHERE price > 50.00
      EOT
    
      # ... other attributes unchanged
    }
    
  2. To resume the query, set stopped = false (or remove the attribute, since it defaults to false) and run terraform apply. The query restarts and continues reading from the last committed offset in the source topics.

    terraform apply
    

Import an existing materialized table

To manage an existing materialized table with the Confluent Terraform provider, import it using the confluent_flink_materialized_table resource.
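
The terraform import command attaches the remote object to a resource address that must already exist in your configuration, so declare at least a minimal confluent_flink_materialized_table block first. The following stub uses placeholder values; you can fill in the remaining attributes after inspecting the imported state.

# Minimal placeholder block that gives the import a resource address to attach to.
resource "confluent_flink_materialized_table" "my_table" {
  display_name  = "high_value_orders"   # placeholder
  kafka_cluster = "lkc-xyz789"          # placeholder
}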

  1. Run the terraform import command, replacing the placeholders with your values. The import ID format is {environment_id}/{kafka_cluster_id}/{table_name}.

    terraform import confluent_flink_materialized_table.my_table \
      env-abc123/lkc-xyz789/high_value_orders
    
  2. After importing, run terraform plan to reconcile the Terraform state with the actual resource configuration.

    terraform plan
    

Resource reference

confluent_flink_materialized_table

Note

START_MODE is not yet supported through the Terraform provider or the REST API; it is available only through SQL statements. To control how much historical data is reprocessed on create or evolution, use the CREATE OR ALTER MATERIALIZED TABLE SQL statement with the START_MODE clause.

The confluent_flink_materialized_table resource supports the following attributes:

  - display_name (Required): The unique name of the materialized table within the environment and Kafka cluster.
  - kafka_cluster (Required): The ID of the Kafka cluster that hosts the materialized table’s backing topic.
  - query (Optional): The SELECT query that defines the continuous pipeline. Changing this attribute triggers an evolution.
  - stopped (Optional): When true, the continuous query is paused without dropping the table or its backing topic. Defaults to false.
  - watermark_column_name (Optional): The name of the column used as the watermark source.
  - watermark_expression (Optional): The watermark strategy expression.
  - distributed_by_column_names (Optional): Set of column names used to distribute the table (DISTRIBUTED BY clause).
  - distributed_by_buckets (Optional): Number of buckets for the DISTRIBUTED BY clause.
  - columns (Optional): List of column definitions. Each entry can include columns_physical, columns_computed, and columns_metadata sub-blocks.
  - constraints (Optional): List of table constraints with name, kind, column_names, and enforced attributes.
  - organization (Optional): Block with the id of the Confluent Cloud organization. Uses the provider default if omitted.
  - environment (Optional): Block with the id of the Confluent Cloud environment.
  - compute_pool (Optional): Block with the id of the Flink compute pool. Can be updated in place.
  - principal (Optional): Block with the id of the service account that runs the query. Can be updated in place.
  - rest_endpoint (Optional): The Flink REST API endpoint URL. Required if not set at the provider level.
  - credentials (Optional): Block with the Flink API key and secret. Required if not set at the provider level.

Updatable in-place (no resource recreation): query, stopped, compute_pool, principal, columns, watermark_column_name, watermark_expression, constraints.

Requires resource recreation: display_name, kafka_cluster, rest_endpoint.
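
As an illustration of the watermark and distribution attributes above, the following fragment sketches how they might be combined; the column names and interval are hypothetical, and the attribute shapes are inferred from the descriptions in this reference, so verify them against the provider documentation.

resource "confluent_flink_materialized_table" "orders_by_customer" {
  display_name  = "orders_by_customer"
  kafka_cluster = confluent_kafka_cluster.my_cluster.id

  # Hypothetical watermark on an event-time column.
  watermark_column_name = "order_time"
  watermark_expression  = "order_time - INTERVAL '5' SECOND"

  # Hypothetical DISTRIBUTED BY clause: 4 buckets keyed by customer_id.
  distributed_by_column_names = ["customer_id"]
  distributed_by_buckets      = 4

  query = <<-EOT
    SELECT order_id, customer_id, order_time, price
    FROM examples.marketplace.orders
  EOT

  # ... organization, environment, compute_pool, principal, and credentials
  # blocks as in the earlier examples.
}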

Statements

Use the confluent_flink_statement resource to submit and manage Flink SQL statements. Statements are suitable for one-time DDL operations (such as CREATE TABLE) and DML queries (such as INSERT INTO). For persistent streaming pipelines, use materialized tables instead.

The recommended pattern is one SQL statement per Terraform resource, using depends_on to enforce execution order:

# DDL: create a table to hold results
resource "confluent_flink_statement" "create-order-summary" {
  statement = "CREATE TABLE order_summary (window_start TIMESTAMP(3), total_revenue DOUBLE);"
  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }
}

# DML: populate the table continuously
resource "confluent_flink_statement" "insert-order-summary" {
  statement = <<-EOT
    INSERT INTO order_summary
    SELECT window_start, SUM(price) AS total_revenue
    FROM TABLE(
      TUMBLE(TABLE examples.marketplace.orders, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
    )
    GROUP BY window_start;
  EOT
  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }
  depends_on = [confluent_flink_statement.create-order-summary]
}
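
The statement resources above omit per-resource organization, environment, compute_pool, principal, rest_endpoint, and credentials blocks, which assumes those values are set at the provider level. A rough sketch of that Flink-focused provider configuration, with all values supplied through placeholder variables, looks like this (verify the attribute names against the Confluent provider documentation):

provider "confluent" {
  # Flink-focused provider configuration; all values below are placeholders.
  organization_id       = var.organization_id
  environment_id        = var.environment_id
  flink_compute_pool_id = var.flink_compute_pool_id
  flink_principal_id    = var.flink_principal_id
  flink_rest_endpoint   = var.flink_rest_endpoint
  flink_api_key         = var.flink_api_key
  flink_api_secret      = var.flink_api_secret
}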

Important

Running terraform destroy on a confluent_flink_statement resource stops the statement but does not execute the corresponding reverse DDL. Tables, schemas, and other catalog objects created by DDL statements (such as CREATE TABLE) persist in the Flink catalog even after the statement resource is destroyed. To remove catalog objects, run the appropriate DROP statement explicitly.
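
For example, a one-off confluent_flink_statement resource can carry the cleanup DDL; the table name below matches the earlier example and is otherwise a placeholder.

resource "confluent_flink_statement" "drop-order-summary" {
  # Hypothetical cleanup statement; apply it only when you want to drop the table.
  statement = "DROP TABLE order_summary;"
  properties = {
    "sql.current-catalog"  = var.current_catalog
    "sql.current-database" = var.current_database
  }
}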

For full Terraform documentation and a complete quickstart example, see: