Create Confluent Manager for Apache Flink Applications¶
To create a Confluent Manager for Apache Flink® Application, you create a JSON or YAML file that specifies details of the application. You can then use the Confluent CLI or REST API to create the application, passing the file as an argument.
Application specification¶
The following JSON and YAML examples show equivalent application specifications.
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "FlinkApplication",
  "metadata": {
    "name": "app-1"
  },
  "spec": {
    "image": "confluentinc/cp-flink:1.19.3-cp1",
    "flinkVersion": "v1_19",
    "flinkConfiguration": {
      "taskmanager.numberOfTaskSlots": "1"
    },
    "serviceAccount": "flink",
    "jobManager": {
      "resource": {
        "memory": "1024m",
        "cpu": 1
      }
    },
    "taskManager": {
      "resource": {
        "memory": "1024m",
        "cpu": 1
      }
    },
    "job": {
      "jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar",
      "state": "running",
      "parallelism": 1,
      "upgradeMode": "stateless"
    }
  }
}
apiVersion: cmf.confluent.io/v1
kind: FlinkApplication
metadata:
  name: app-1
spec:
  image: confluentinc/cp-flink:1.19.3-cp1
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 1
  taskManager:
    resource:
      memory: 1024m
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    state: running
    parallelism: 1
    upgradeMode: stateless
The resource spec includes the following fields:

image
: The name of the Docker image that is used to start the Flink cluster. CMF expects this image to be a Confluent Platform Flink image or to be derived from a Confluent Platform Flink image.

flinkVersion
: The Flink version corresponding to the Flink version of the Docker image.

flinkConfiguration
: A map of Flink configuration parameters. Before the configuration is passed to the Flink cluster, it is merged with the Environment's default configuration for applications. The Flink configuration is used to configure cluster and job behavior, such as checkpointing, security, logging, and more. For more on Flink job configuration, see Configure Flink Jobs in Confluent Manager for Apache Flink.

serviceAccount
: The name of the Kubernetes service account that is used to start and run the application's Flink cluster.

jobManager & taskManager
: The Kubernetes specification of the Flink Job Manager and Task Manager pods.

job.jarURI
: The path to the Flink job JAR file. To learn how to package Flink jobs and make the job JAR available to the cluster, see Package Flink Jobs.

job.state
: The desired state of the application. Can be running or suspended.

job.parallelism
: The desired execution parallelism of the application. Can be adapted to rescale the application.
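If you generate application specifications programmatically, the same JSON file can be produced with a short script. The following Python sketch (using only the standard library) rebuilds the example specification above and writes it to app-1.json:

```python
import json

# The application spec from the example above, as a Python dict.
app_spec = {
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "FlinkApplication",
    "metadata": {"name": "app-1"},
    "spec": {
        "image": "confluentinc/cp-flink:1.19.3-cp1",
        "flinkVersion": "v1_19",
        "flinkConfiguration": {"taskmanager.numberOfTaskSlots": "1"},
        "serviceAccount": "flink",
        "jobManager": {"resource": {"memory": "1024m", "cpu": 1}},
        "taskManager": {"resource": {"memory": "1024m", "cpu": 1}},
        "job": {
            "jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar",
            "state": "running",
            "parallelism": 1,
            "upgradeMode": "stateless",
        },
    },
}

# Serialize to the JSON file that is passed to the REST API or CLI.
with open("app-1.json", "w") as f:
    json.dump(app_spec, f, indent=2)
```

Generating the file this way makes it easy to template values such as the application name or parallelism across many applications.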
Create the application¶
Use the REST API or the Confluent CLI to create the application.
The following sections show examples of how to do this with a JSON file named app-1.json, created using the application specification shown in the previous section.
You can create the application using the REST API by passing the JSON file as an argument.
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/applications \
-d @/path/to/app-1.json
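The same request can also be issued from a script. The following Python sketch mirrors the curl call above using the standard library (the host cmf:8080 and environment env-1 are taken from the example; adjust them for your deployment). It only builds the request; the actual send is left commented out because it requires a reachable CMF instance.

```python
import json
import urllib.request

# Endpoint from the curl example above; adjust host and environment as needed.
CMF_URL = "http://cmf:8080/cmf/api/v1/environments/env-1/applications"

def build_create_request(spec_path):
    """Build a POST request equivalent to the curl call above."""
    with open(spec_path, "rb") as f:
        body = f.read()
    return urllib.request.Request(
        CMF_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# To actually submit the application (requires a running CMF instance):
# with urllib.request.urlopen(build_create_request("app-1.json")) as resp:
#     print(json.loads(resp.read()))
```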
You can create the application using the Confluent CLI, passing the JSON file as an argument.
confluent --environment env-1 \
flink application create /path/to/app-1.json
Application response¶
After a request to create an application, CMF creates a Flink Deployment and submits it for execution. Based on the application’s spec, a Flink Kubernetes cluster is started that executes the application’s Flink job.
The response to an application creation request specifying a JSON file could look like the following:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "FlinkApplication",
  "metadata": {
    "creationTimestamp": "2025-08-27T12:30:23.616863679Z",
    "name": "app-1",
    "updateTimestamp": "2025-08-27T12:30:23.616865470Z"
  },
  "spec": {
    "flinkConfiguration": {
      "taskmanager.numberOfTaskSlots": "1"
    },
    "flinkVersion": "v1_19",
    "image": "confluentinc/cp-flink:1.19.3-cp1",
    "job": {
      "jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar",
      "parallelism": 1,
      "state": "running",
      "upgradeMode": "stateless"
    },
    "jobManager": {
      "resource": {
        "cpu": 1,
        "memory": "1024m"
      }
    },
    "serviceAccount": "flink",
    "taskManager": {
      "resource": {
        "cpu": 1,
        "memory": "1024m"
      }
    }
  },
  "status": {
    "jobStatus": {
      "jobName": "",
      "state": ""
    }
  }
}
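Note that the status.jobStatus fields are still empty immediately after creation, because the Flink cluster has not yet started the job. A client that wants to wait for the job to come up could poll the application resource and inspect status.jobStatus.state; a minimal sketch of such a check (the RUNNING value is Flink's usual job-state string, assumed here):

```python
import json

def job_is_running(response_body: str) -> bool:
    """Return True if the application response reports a running Flink job."""
    status = json.loads(response_body).get("status", {})
    return status.get("jobStatus", {}).get("state", "").upper() == "RUNNING"

# Applied to the creation response above, this returns False because
# jobStatus.state is still an empty string.
```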