Create Statements with Confluent Manager for Apache Flink
Confluent Manager for Apache Flink® (CMF) lets you deploy and manage Flink applications and SQL statements in Confluent Platform. You can create a Flink SQL statement resource using the REST API or the Confluent CLI, passing a JSON document that defines the statement resource.
Important
The examples in this topic assume that CMF was installed with the examples catalog enabled (cmf.sql.examples-catalog.enabled=true).
Example Statement Definitions
SELECT statement example
The following example shows a JSON document that creates a SELECT statement resource.
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "Statement",
"metadata": {
"name": "stmt-1"
},
"spec": {
"statement": "SELECT url FROM clicks WHERE url like '%a%';",
"properties": {
"sql.current-catalog": "examples",
"sql.current-database": "marketplace"
},
"flinkConfiguration": {
"state.backend.type": "hashmap",
"execution.checkpointing.interval": "60 s"
},
"computePoolName": "pool",
"parallelism": 4,
"stopped": false
}
}
ALTER TABLE statement example
The following example shows a JSON document that creates an ALTER TABLE Statement resource.
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "Statement",
"metadata": {
"name": "alter-orders"
},
"spec": {
"statement": "ALTER TABLE orders ADD total_value AS quantity * price;",
"properties": {
"sql.current-catalog": "kafka-cat",
"sql.current-database": "kafka-db"
},
"computePoolName": "pool",
"stopped": false
}
}
A statement resource spec includes the following fields:
statement: The SQL statement to execute.computePoolName: the name of theComputePoolon which the statement should be executed.properties: A map of properties that provide context for the compilation of the statement. The current CATALOG and DATABASE are configured in this field. If not specified, the default catalog and database are used.flinkConfiguration: A map of Flink configuration parameters. Statement configuration is first merged with the Environment’s default configuration. This combined configuration is used for statement translation and passed to the Flink job that executes the statement. Separately, the Flink configuration of the statement’sComputePoolinitializes theJobManagerandTaskManagerprocesses; an empty configuration is used if none is specified.parallelism: the statement’s execution parallelism. If not specified, the statement is executed withparallelism = 1.stopped: This flag determines if the job is running or stopped. If omitted, the query defaults tostopped=false.
Create a statement
Given the resource definition listed in the previous section, you can create a statement either using the Confluent CLI or by calling the REST API as shown in the following examples.
The following example shows how to create a statement with the REST API. For a full list of options, REST APIs.
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/statements \
-d @/path/to/stmt-1.json
The following example creates a statement using the Confluent CLI. For a full list of options, see the confluent flink statement create reference.
confluent --environment env-1 --compute-pool pool \
flink statement create stmt-1 \
--catalog examples --database marketplace \
--parallelism 4 --flink-configuration /path/to/flink-config.json \
--sql "SELECT url FROM clicks WHERE url like '%a%';"
The Flink configuration is passed with a JSON file:
{
"state.backend.type": "hashmap",
"execution.checkpointing.interval": "60 s"
}
Statement response
When CMF receives a request to create a statement, it compiles the SQL statement. Key properties, such as the statement’s type and the result’s schema, are then saved as part of the Statement resource. If the SQL statement is incorrect and cannot be compiled, an error message is recorded instead.
CMF’s execution method varies by statement type: some statements are executed immediately, while others trigger the deployment of a Flink cluster on Kubernetes for execution. The result of an immediate execution is also saved within the Statement resource. The resource generated from the example statement could appear as follows:
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "Statement",
"metadata": {
"creationTimestamp": "2025-07-24T16:43:47.036Z",
"name": "stmt-1",
"uid": "a827600e-fca9-4fd7-a047-2728f263269d",
"updateTimestamp": "2025-07-24T16:43:47.036Z"
},
"spec": {
"computePoolName": "pool",
"flinkConfiguration": {
"state.backend.type": "hashmap",
"execution.checkpointing.interval": "60 s"
},
"parallelism": 4,
"properties": {
"sql.current-catalog": "examples",
"sql.current-database": "marketplace"
},
"statement": "SELECT url FROM clicks WHERE url like '%a%';",
"stopped": false
},
"status": {
"detail": "Statement execution in progress.",
"phase": "RUNNING",
"traits": {
"isAppendOnly": true,
"isBounded": false,
"schema": {
"columns": [
{
"name": "url",
"type": {
"length": 2147483647,
"nullable": false,
"type": "VARCHAR"
}
}
]
},
"sqlKind": "SELECT"
}
}
}