Table API on Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports programming applications with the Table API in Java and Python. Confluent provides a plugin for running applications that use the Table API on Confluent Cloud.
The Table API enables a programmatic way of developing, testing, and submitting Flink pipelines for processing data streams. Streams can be finite or infinite, with insert-only or changelog data. Changelog data enables handling Change Data Capture (CDC) events.
To use the Table API, you work with tables that change over time, a concept inspired by relational databases. A Table program is a declarative and structured graph of transformations. The Table API is inspired by SQL and complements it with additional tools for manipulating real-time data. You can use both Flink SQL and the Table API in your applications.
A table program has these characteristics:
Runs in a regular
main()method (Java)Uses Flink APIs
Communicates with Confluent Cloud by using REST requests, for example, Statements endpoint.
For a list of Table API functions supported by Confluent Cloud for Apache Flink, see Table API functions.
For a list of Table API limitations in Confluent Cloud for Apache Flink, see Known limitations.
Use the Confluent for VS Code extension to generate a new Flink Table API project that interacts with your Confluent Cloud resources. This option is ideal if you’re learning about the Table API.
For more information see Confluent for VS Code for Confluent Cloud.
Note
The Flink Table API is available for preview.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s’ sole discretion.
Comments, questions, and suggestions related to the Table API are encouraged and can be submitted through the established channels.
Add the Table API to an existing Java project
To add the Table API to an existing project, include the following dependencies in the <dependencies> section of your pom.xml file.
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Confluent Flink Table API Java plugin -->
<dependency>
<groupId>io.confluent.flink</groupId>
<artifactId>confluent-flink-table-api-java-plugin</artifactId>
<version>${confluent-plugin.version}</version>
</dependency>
Configure the plugin
The plugin requires a set of configuration options for establishing a connection to Confluent Cloud. The following configuration options are required.
Property key | Command-line argument | Environment variable | Notes |
|---|---|---|---|
client.cloud | –cloud | CLOUD_PROVIDER | Confluent identifier for a cloud provider. Valid values are |
client.compute-pool-id | –compute-pool-id | COMPUTE_POOL_ID | ID of the compute pool, for example, |
client.environment-id | –environment-id | ENV_ID | ID of the environment, for example, |
client.flink-api-key | –flink-api-key | FLINK_API_KEY | API key for Flink access. For more information, see Generate an API Key. |
client.flink-api-secret | –flink-api-secret | FLINK_API_SECRET | API secret for Flink access. For more information, see Generate an API Key. |
client.organization-id | –organization-id | ORG_ID | ID of the organization, for example, |
client.region | –region | CLOUD_REGION | Confluent identifier for a cloud provider’s region, for example, |
The following configuration options are required for supporting UDF uploads. For more information, see Upload the jar as a Flink artifact.
Note
Create a Confluent Cloud API key artifact key and secret in Confluent Cloud Console. For more information, see Manage API Keys in |ccloud|.
Property key | Command-line argument | Environment variable | Notes |
|---|---|---|---|
client.artifact-api-key | –artifact-api-key | ARTIFACT_API_KEY | API key for artifact creation |
client.artifact-api-secret | –artifact-api-secret | ARTIFACT_API_SECRET | API secret for artifact creation |
The following configuration options are optional.
Property key | Command-line argument | Environment variable | Notes |
|---|---|---|---|
client.artifact-endpoint-template | –artifact-endpoint-template | ARTIFACT_ENDPOINT_TEMPLATE | A template for the artifact endpoint URL, for example, |
client.catalog-cache | Expiration time for catalog objects, for example, | ||
client.context | –context | A name for the current Table API session, for example, my_table_program. | |
client.endpoint-template | –endpoint-template | ENDPOINT_TEMPLATE | A template for the endpoint URL, for example, |
client.principal-id | –principal-id | PRINCIPAL_ID | Principal that runs submitted statements, for example, |
client.rest-endpoint | –rest-endpoint | REST_ENDPOINT | URL to the REST endpoint, for example, |
client.statement-name | –statement-name | Unique name for statement submission. By default, generated using a UUID. | |
client.tmp-dir | –tmp-dir | Directory for temporary files created by the plugin, like UDF jars, for example, |
Endpoint configuration
The Confluent Flink plugin provides options to configure endpoints for connecting to Confluent Cloud services. The template-based approach is the recommended method.
client.endpoint-template
This option provides a template for constructing the Flink statement API endpoint URL.
Default value:
https://flink.{region}.{cloud}.confluent.cloudExample:
https://flinkpls-dom123.{region}.{cloud}.confluent.cloudUsage: The template supports placeholders {region} and {cloud} that are replaced with the configured region and cloud provider values.
Environment Variable:
ENDPOINT_TEMPLATE
client.artifact-endpoint-template
This option provides a template for constructing the URL used for uploading artifacts, like UDF JARs.
Default value:
https://api.confluent.cloudExample:
https://api.{region}.{cloud}.confluent.cloudUsage: Similar to the endpoint template, this supports placeholders
{region}and{cloud}.Environment Variable:
ARTIFACT_ENDPOINT_TEMPLATE
client.rest-endpoint (Deprecated)
This option specifies the base domain for REST API calls to Confluent Cloud. While still supported, using the template-based configuration is preferred.
Default value: No default value
Example:
proxy.confluent.cloudUsage: When specified, the plugin constructs the full Flink statement API endpoint URL as
https://flink.{region}.{cloud}.{rest-endpoint}where{region}and{cloud}are replaced with the configured region and cloud provider values.Environment Variable:
REST_ENDPOINT
Important
client.endpoint-template and client.rest-endpoint are mutually exclusive. If both are set, an exception is thrown.
Relationship and default behavior
The following rules control the relationship between the configuration options.
The
client.endpoint-templateandclient.rest-endpointconfiguration options can’t be set simultaneouslyThe
client.artifact-endpoint-templateandclient.rest-endpointconfiguration options can’t be set simultaneously.
The following rules control the default behavior.
If neither
client.rest-endpointnorclient.endpoint-templateis configured, the default template,https://flink.{region}.{cloud}.confluent.cloudis used for statement APIIf neither
client.rest-endpointnorclient.artifact-endpoint-templateis specified, the default artifact endpoint,https://api.confluent.cloudis usedIf endpoint templates are used, each endpoint is constructed independently with the provided templates.
The following simple example shows different ways to configure endpoints.
// Option 1 (RECOMMENDED): Using endpoint templates
// Resolved endpoints:
// - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
ConfluentSettings settings1 = ConfluentSettings.newBuilder()
.setRegion("us-east-1")
.setCloud("aws")
.setEndpointTemplate("https://flinkpls-dom123.{region}.{cloud}.confluent.cloud")
.setArtifactEndpointTemplate("https://artifacts.{region}.{cloud}.custom-domain.com")
// Other required settings...
.build();
// Option 2: Using properties file with endpoint templates
// cloud.properties:
// client.region=us-east-1
// client.cloud=aws
// client.endpoint-template=https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
// Resolved endpoints:
// - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
// - Artifact API: https://api.confluent.cloud (default)
ConfluentSettings settings2 = ConfluentSettings.fromResource("/cloud.properties");
// Option 3 (DISCOURAGED): Using rest-endpoint (both statement endpoint will be derived from this)
// Resolved endpoints:
// - Statement API: https://flink.us-east-1.aws.proxy.confluent.cloud
// - Artifact API: https://api.proxy.confluent.cloud
ConfluentSettings settings3 = ConfluentSettings.newBuilder()
.setRegion("us-east-1")
.setCloud("aws")
.setRestEndpoint("proxy.confluent.cloud")
// Other required settings...
.build();
ConfluentSettings class
The ConfluentSettings class provides configuration options from various sources, so you can combine external input, code, and environment variables to set up your applications.
The following precedence order applies to configuration sources, from highest to lowest:
CLI arguments or properties file
Code
Environment variables
The following code example shows a TableEnvironment that’s configured by a combination of command-line arguments and code.
public static void main(String[] args) {
// Args might set cloud, region, org, env, and compute pool.
// Environment variables might pass key and secret.
// Code sets the session name and SQL-specific options.
ConfluentSettings settings = ConfluentSettings.newBuilder(args)
.setContextName("MyTableProgram")
.setOption("sql.local-time-zone", "UTC")
.build();
TableEnvironment env = TableEnvironment.create(settings);
}
from pyflink.table.confluent import ConfluentSettings
from pyflink.table import TableEnvironment
def run():
# Properties file might set cloud, region, org, env, and compute pool.
# Environment variables might pass key and secret.
# Code sets the session name and SQL-specific options.
settings = ConfluentSettings.new_builder_from_file(...) \
.set_context_name("MyTableProgram") \
.set_option("sql.local-time-zone", "UTC") \
.build()
env = TableEnvironment.create(settings)
Properties file
You can store options in a cloud.properties file and reference the file in code.
# Cloud region
client.cloud=aws
client.region=eu-west-1
# Access & compute resources
client.flink-api-key=XXXXXXXXXXXXXXXX
client.flink-api-secret=XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx
client.organization-id=00000000-0000-0000-0000-000000000000
client.environment-id=env-xxxxx
client.compute-pool-id=lfcp-xxxxxxxxxx
Reference the cloud.properties file in code:
// Arbitrary file location in file system
ConfluentSettings settings = ConfluentSettings.fromPropertiesFile("/path/to/cloud.properties");
// Part of the JAR package (in src/main/resources)
ConfluentSettings settings = ConfluentSettings.fromPropertiesResource("/cloud.properties");
from pyflink.table.confluent import ConfluentSettings
# Arbitrary file location in file system
settings = ConfluentSettings.from_file("/path/to/cloud.properties")
Command-line arguments
You can pass the configuration settings as command-line options when you run your application’s jar:
java -jar my-table-program.jar \
--cloud aws \
--region us-east-1 \
--flink-api-key key \
--flink-api-secret secret \
--organization-id b0b21724-4586-4a07-b787-d0bb5aacbf87 \
--environment-id env-z3y2x1 \
--compute-pool-id lfcp-8m03rm
Access the configuration settings from the command-line arguments by using the ConfluentSettings.fromArgs method:
public static void main(String[] args) {
ConfluentSettings settings = ConfluentSettings.fromArgs(args);
}
from pyflink.table.confluent import ConfluentSettings
settings = ConfluentSettings.from_global_variables()
Code
You can assign the configuration settings in code by using the builder provided with the ConfluentSettings class:
ConfluentSettings settings = ConfluentSettings.newBuilder()
.setCloud("aws")
.setRegion("us-east-1")
.setFlinkApiKey("key")
.setFlinkApiSecret("secret")
.setOrganizationId("b0b21724-4586-4a07-b787-d0bb5aacbf87")
.setEnvironmentId("env-z3y2x1")
.setComputePoolId("lfcp-8m03rm")
.build();
from pyflink.table.confluent import ConfluentSettings
settings = ConfluentSettings.new_builder() \
.set_cloud("aws") \
.set_region("us-east-1") \
.set_flink_api_key("key") \
.set_flink_api_secret("secret") \
.set_organization_id("b0b21724-4586-4a07-b787-d0bb5aacbf87") \
.set_environment_id("env-z3y2x1") \
.set_compute_pool_id("lfcp-8m03rm") \
.build()
Environment variables
Set the following environment variables to provide configuration settings.
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="key"
export FLINK_API_SECRET="secret"
export ORG_ID="b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="env-z3y2x1"
export COMPUTE_POOL_ID="lfcp-8m03rm"
java -jar my-table-program.jar
In code, call:
ConfluentSettings settings = ConfluentSettings.fromGlobalVariables();
from pyflink.table.confluent import ConfluentSettings
settings = ConfluentSettings.from_global_variables()
Confluent utilities
The ConfluentTools class provides more methods that you can use for developing and testing Table API programs.
ConfluentTools.collectChangelog and ConfluentTools.printChangelog
Runs the specified table transformations on Confluent Cloud and returns the results locally as a list of changelog rows or prints to the console in a table style.
These methods run table.execute().collect() and consume a fixed number of rows from the returned iterator.
These methods can work on both finite and infinite input tables. If the pipeline is potentially unbounded, they stop fetching after the desired number of rows has been reached.
// On a Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(table, 100);
ConfluentTools.printMaterialized(table, 100);
// On a TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(tableResult, 100);
ConfluentTools.printMaterialized(tableResult, 100);
// For finite (i.e. bounded) tables
ConfluentTools.collectMaterialized(table);
ConfluentTools.printMaterialized(table);
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
# On a Table object
table = env.from_path("examples.marketplace.customers")
rows = ConfluentTools.collect_changelog_limit(table, 100)
ConfluentTools.print_changelog_limit(table, 100)
# On a TableResult object
tableResult = env.execute_sql("SELECT * FROM examples.marketplace.customers")
rows = ConfluentTools.collect_changelog_limit(tableResult, 100)
ConfluentTools.print_changelog_limit(tableResult, 100)
# For finite (i.e. bounded) tables
ConfluentTools.collect_changelog(table)
ConfluentTools.print_changelog(table)
ConfluentTools.collect_materialized and ConfluentTools.print_materialized
Runs the specified table transformations on Confluent Cloud and returns the results locally as a materialized changelog. Changes are applied to an in-memory table and returned as a list of insert-only rows or printed to the console in a table style.
These methods run table.execute().collect() and consume a fixed number of rows from the returned iterator.
These methods can work on both finite and infinite input tables. If the pipeline is potentially unbounded, they stop fetching after the desired number of rows have been reached.
// On a Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(table, 100);
ConfluentTools.printMaterialized(table, 100);
// On a TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(tableResult, 100);
ConfluentTools.printMaterialized(tableResult, 100);
// For finite (i.e. bounded) tables
ConfluentTools.collectMaterialized(table);
ConfluentTools.printMaterialized(table);
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
# On Table object
table = env.from_path("examples.marketplace.customers")
rows = ConfluentTools.collect_materialized_limit(table, 100)
ConfluentTools.print_materialized_limit(table, 100)
# On TableResult object
tableResult = env.execute_sql("SELECT * FROM examples.marketplace.customers")
rows = ConfluentTools.collect_materialized_limit(tableResult, 100)
ConfluentTools.print_materialized_limit(tableResult, 100)
# For finite (i.e. bounded) tables
ConfluentTools.collect_materialized(table)
ConfluentTools.print_materialized(table)
ConfluentTools.getStatementName and ConfluentTools.stopStatement
Additional lifecycle methods for controlling statements on Confluent Cloud after they have been submitted.
// On TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
String statementName = ConfluentTools.getStatementName(tableResult);
ConfluentTools.stopStatement(tableResult);
// Based on statement name
ConfluentTools.stopStatement(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql");
# On TableResult object
table_result = env.execute_sql("SELECT * FROM examples.marketplace.customers")
statement_name = ConfluentTools.get_statement_name(table_result)
ConfluentTools.stop_statement(table_result)
# Based on statement name
ConfluentTools.stop_statement_by_name(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql")
Confluent table descriptor
A table descriptor for creating tables located in Confluent Cloud programmatically.
Compared to the regular Flink class, the ConfluentTableDescriptor class adds support for Confluent’s system columns and convenience methods for working with Confluent tables.
The for_managed() method corresponds to TableDescriptor.for_connector("confluent").
TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
.schema(
Schema.newBuilder()
.column("i", DataTypes.INT())
.column("s", DataTypes.INT())
.watermark("$rowtime", $("$rowtime").minus(lit(5).seconds())) // Access $rowtime system column
.build())
.build();
env.createTable("t1", descriptor);
from pyflink.table.confluent import ConfluentTableDescriptor
from pyflink.table import Schema, DataTypes
from pyflink.table.expressions import col, lit
descriptor = ConfluentTableDescriptor.for_managed() \
.schema(
Schema.new_builder()
.column("i", DataTypes.INT())
.column("s", DataTypes.INT())
.watermark("$rowtime", col("$rowtime").minus(lit(5).seconds)) # Access $rowtime system column
.build()) \
.build()
env.createTable("t1", descriptor)
Known limitations
The Table API plugin is in Open Preview stage.
Unsupported by Table API Plugin
The following features are not supported.
Temporary catalog objects (including tables, views, functions)
Custom modules
Custom catalogs
User-defined functions (including system functions)
Anonymous, inline objects (including functions, data types)
CompiledPlan features are not supported
Batch mode
Restrictions from Confluent Cloud
custom connectors/formats
processing time operations
structured data types
many configuration options
limited SQL syntax
batch execution mode
Issues in Apache Flink
Both catalog and database must be set, or identifiers must be fully qualified. A mixture of setting a current catalog and using two-part identifiers can cause errors.
String concatenation with
.pluscauses errors. Instead, useExpressions.concat.Selecting
.rowtimein windows causes errors.Using
.limit()can cause errors.