Manage Flink Applications Using Confluent for Kubernetes¶
Apache Flink® is a powerful, scalable, and secure stream processing framework for running complex, stateful, low-latency streaming applications on large volumes of data.
Offered with Confluent Platform, Confluent Manager for Apache Flink® (CMF) is a self-managed service for Flink that integrates seamlessly with Apache Kafka®.
To learn more about CMF, see Overview of Confluent Platform for Apache Flink.
You can use Confluent for Kubernetes (CFK) to manage CMF and Flink applications within the familiar Kubernetes environment and custom resources.
The high-level workflow to manage Flink applications with CFK is:
Install CMF.
Install or upgrade CFK with Flink integration enabled. Use the --set enableCMFDay2Ops=true flag:
helm upgrade --install confluent-operator \
  confluentinc/confluent-for-kubernetes \
  --set enableCMFDay2Ops=true
For more information about CFK installation, see Deploy Confluent for Kubernetes.
Create a CMF REST Class.
Create a Flink environment.
Create a Flink application.
In the Flink Web UI, verify that the application job you created is running.
An example scenario of using CMF with CFK is available in the CFK Example Repository.
Requirements and considerations¶
- To manage Flink in CFK, you need the following versions:
  - CMF version V1
  - CFK 2.10.0 and higher
  - Confluent Platform 7.8.0 and higher
- Currently, CFK can connect to CMF either without authentication or with mTLS authentication.
Create a CMF REST Class¶
When managing CMF in CFK, the CMF custom resources, namely, FlinkEnvironment and FlinkApplication, communicate with CMF through the CMF REST Class (CMFRestClass).
You need to first set up a CMF REST Class custom resource (CR).
CMF REST Class is only used by CFK and is not part of CMF.
If using mTLS or TLS to connect to the Flink host, create a secret.
Certificates with appropriate Subject Alternative Names (SANs) are required for the mTLS setup.
- mTLS: You need to create a secret with certs and reference it in the CMFRestClass CR in the next step.
- TLS: The secret is only required if using a self-signed certificate.
See Provide TLS keys and certificates in PEM format and Provide TLS keys and certificates in Java KeyStore format for the expected keys in the TLS secret.
Create a CMF REST Class (CMFRestClass CR) with the following spec and deploy the resource using the kubectl apply -f command.
apiVersion: platform.confluent.io/v1beta1
kind: CMFRestClass
metadata:
  name:                --- [1]
  namespace:           --- [2]
spec:
  cmfRest:             --- [3]
    authentication:
      type:            --- [4]
    endpoint:          --- [5]
    tls:               --- [6]
      secretRef:       --- [7]
- [1] The name of the REST Class.
- [2] The namespace of the CMF REST Class.
- [3] The CMF cluster.
- [4] To use mTLS authentication, set to mtls and specify the certificates in [7].
- [5] The endpoint of the CMF host.
- [6] Required when the authentication type ([4]) is set to mtls.
- [7] The name of the secret that contains the TLS certificates.
An example CMFRestClass CR:
apiVersion: platform.confluent.io/v1beta1
kind: CMFRestClass
metadata:
  name: default
  namespace: operator
spec:
  cmfRest:
    endpoint: https://cmf-service:80
    authentication:
      type: mtls
      sslClientAuthentication: true
    tls:
      secretRef: cmf-day2-tls
Check the status:
kubectl get CMFRestClass default -n <namespace> -oyaml
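The field rules in this section can be sketched as a small manifest builder. The following is a minimal Python illustration, not part of CFK: the function name build_cmf_rest_class and its parameters are hypothetical, and the only rule it enforces is the one stated above, that the TLS secret reference is required when the authentication type is mtls. It uses only the standard library and prints JSON, which kubectl apply -f accepts in the same way as YAML.

```python
import json


def build_cmf_rest_class(name, namespace, endpoint, auth_type=None, tls_secret=None):
    """Return a CMFRestClass manifest as a dict.

    Hypothetical helper for illustration only; it is not a CFK API.
    """
    # Rule from the field list: tls.secretRef is required for mTLS.
    if auth_type == "mtls" and not tls_secret:
        raise ValueError(
            "spec.cmfRest.tls.secretRef is required when the authentication type is mtls"
        )

    cmf_rest = {"endpoint": endpoint}
    if auth_type:
        cmf_rest["authentication"] = {"type": auth_type}
    if tls_secret:
        cmf_rest["tls"] = {"secretRef": tls_secret}

    return {
        "apiVersion": "platform.confluent.io/v1beta1",
        "kind": "CMFRestClass",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {"cmfRest": cmf_rest},
    }


if __name__ == "__main__":
    manifest = build_cmf_rest_class(
        name="default",
        namespace="operator",
        endpoint="https://cmf-service:80",
        auth_type="mtls",
        tls_secret="cmf-day2-tls",
    )
    # Save the output to a file and pass it to kubectl apply -f.
    print(json.dumps(manifest, indent=2))
```

Failing early in the builder surfaces a missing secret reference before the resource is submitted, rather than as a sync error afterwards.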
Create a Flink environment¶
A Flink environment is a set of configurations that Flink applications use.
Create a FlinkEnvironment CR using the following spec, and deploy it with the kubectl apply -f command.
apiVersion: platform.confluent.io/v1beta1
kind: FlinkEnvironment
metadata:
  name:
  namespace:
spec:
  kubernetesNamespace:         --- [1]
  flinkApplicationDefaults:    --- [2]
    metadata:                  --- [3]
    spec:                      --- [4]
      flinkConfiguration:
  cmfRestClassRef:             --- [5]
    name:
    namespace:
[1] The namespace of the Flink cluster.
Typically, you would install the FlinkEnvironment CR in the CFK namespace (metadata.namespace), but Flink would run in another namespace (spec.kubernetesNamespace), for example, default.
[2] Configurations for the Flink cluster that specify the deployment-wide default application settings.
[3] Kubernetes API metadata.
[4] Spec of the FlinkApplicationSpec type.
[5] The reference to the REST Class you created in Create a CMF REST Class.
You can install the FlinkEnvironment CR and the CMF REST Class in different namespaces.
If omitted, the CMFRestClass named default in the same namespace is used.
An example FlinkEnvironment CR:
apiVersion: platform.confluent.io/v1beta1
kind: FlinkEnvironment
metadata:
  name: my-env1
  namespace: operator
spec:
  kubernetesNamespace: default
  flinkApplicationDefaults:
    metadata:
      labels:
        "acmecorp.com/owned-by": "analytics-team"
    spec:
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        rest.profiling.enabled: "true"
  cmfRestClassRef:
    name: default
    namespace: operator
Check the status.
kubectl get flinkEnvironment -n <namespace> -oyaml
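The defaulting rule for cmfRestClassRef can be illustrated with a short sketch. The Python below is not CFK code: the function name resolve_cmf_rest_class_ref is hypothetical, and it only mirrors the rule stated in this section, that an omitted reference falls back to the CMFRestClass named default in the resource's own namespace. How CFK treats a reference that sets only one of name or namespace is not described here, so the per-field fallback in the sketch is an assumption.

```python
def resolve_cmf_rest_class_ref(resource):
    """Return the (name, namespace) of the CMFRestClass a resource would use.

    Hypothetical helper for illustration; `resource` is a manifest dict
    such as a FlinkEnvironment or FlinkApplication CR.
    """
    own_namespace = resource.get("metadata", {}).get("namespace")
    ref = resource.get("spec", {}).get("cmfRestClassRef") or {}

    # Documented rule: if the reference is omitted, use the CMFRestClass
    # named "default" in the same namespace as the resource.
    # Assumption: a partially specified reference falls back per field.
    name = ref.get("name") or "default"
    namespace = ref.get("namespace") or own_namespace
    return name, namespace


if __name__ == "__main__":
    env = {
        "kind": "FlinkEnvironment",
        "metadata": {"name": "my-env1", "namespace": "operator"},
        "spec": {"kubernetesNamespace": "default"},
    }
    print(resolve_cmf_rest_class_ref(env))
```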
Create a Flink application¶
A Flink application is a user program that creates one or more Flink jobs to process data.
To create a Flink application resource in CFK:
Create a FlinkApplication CR using the following spec and deploy the resource using the kubectl apply -f command.
apiVersion: platform.confluent.io/v1beta1
kind: FlinkApplication
metadata:
spec:
  cmfRestClassRef:
    name:                 --- [1]
    namespace:            --- [2]
  image:                  --- [3]
  flinkEnvironment:       --- [4]
  flinkVersion:
  flinkConfiguration:     --- [5]
  serviceAccount:         --- [6]
  jobManager:             --- [7]
  taskManager:            --- [8]
  job:                    --- [9]
[1] The reference to the REST Class you created in Create a CMF REST Class.
If omitted, the CMFRestClass named default in the same namespace is used.
[2] The namespace of this FlinkApplication CR.
The namespace of the Flink cluster is determined by FlinkEnvironment.spec.kubernetesNamespace.
[3] The CMF image.
[4] The reference to the FlinkEnvironment CR you created in Create a Flink environment.
[5] Flink configurations.
[6] The service account that runs Flink.
[7] FlinkJobManager
[8] FlinkTaskManager
[9] FlinkJob
An example FlinkApplication CR:
apiVersion: platform.confluent.io/v1beta1
kind: FlinkApplication
metadata:
  name: my-app1
  namespace: default
spec:
  flinkEnvironment: my-env1
  image: confluentinc/cp-flink:1.19.1-cp1
  flinkVersion: v1_19
  flinkConfiguration:
    "taskmanager.numberOfTaskSlots": "2"
    "metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory"
    "metrics.reporter.prom.port": "9249-9250"
    "rest.profiling.enabled": "true"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1048m
      cpu: 1
  taskManager:
    resource:
      memory: 1048m
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    state: running
    parallelism: 3
    upgradeMode: stateless
  cmfRestClassRef:
    name: default
    namespace: operator
Check the status.
kubectl get flinkApplication -n <namespace> -oyaml
For status details, see Check the Flink application status.
Check the Flink application status¶
The following are the notable status fields in the CFK-managed FlinkApplication CR:
status:
  cmfSync:                       --- [1]
    errorMessage:                --- [2]
    lastSyncTime:                --- [3]
    status:                      --- [4]
  error:                         --- [5]
  clusterInfo:                   --- [6]
  jobManagerDeploymentStatus:    --- [7]
  jobStatus:
    state:                       --- [8]
[1] The status of the sync between CFK and CMF through the CMFRestClass CR.
[2] Any error message related to the sync between CFK and CMF ([1]), for example, a connection, authentication, or validation error.
[3] The time when the latest sync between CFK and CMF happened.
[4] The sync status. The possible values: CREATED, DELETED, UNKNOWN, FAILED.
[5] Indicates asynchronous errors from the Flink deployment.
This is only set if status.cmfSync.errorMessage ([2]) is empty and status.cmfSync.status is CREATED.
For details about the following status fields, refer to the CMF documentation.
- [6] Information about the Flink cluster when deployed. This section is only set if status.error ([5]) is not set.
- [7] Status of the JobManager deployment in Kubernetes.
- [8] Status of the Flink job inside the FlinkApplication’s Flink cluster.
Note that there is a hierarchy of status and error fields in FlinkApplication.status:
- Level 1. The status.cmfSync field needs to be error-free, as this indicates that CFK was able to submit the FlinkApplication to the CMF backend.
- Level 2. The CMF backend or the internal Kubernetes Operator might report an error in the status.error field.
- Level 3. Once the errors in the above fields are resolved, the rest of the status fields get populated.
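The hierarchy above suggests an order in which to read the status when troubleshooting. The following Python sketch walks a status dictionary (for example, one parsed from the kubectl get flinkApplication output) in that order. The function name diagnose and its return shape are hypothetical. Treating a cmfSync.status of FAILED as a Level 1 problem, and comparing it case-insensitively, are assumptions.

```python
def diagnose(status):
    """Return (level, detail) for the first level that needs attention.

    Hypothetical helper for illustration; `status` is the `status`
    section of a FlinkApplication CR as a dict.
    """
    cmf_sync = status.get("cmfSync") or {}

    # Level 1: CFK could not sync the application to the CMF backend.
    if cmf_sync.get("errorMessage"):
        return 1, cmf_sync["errorMessage"]
    # Assumption: a FAILED sync status also counts as a Level 1 problem.
    if str(cmf_sync.get("status", "")).upper() == "FAILED":
        return 1, "cmfSync.status is FAILED"

    # Level 2: CMF or the Kubernetes Operator reported an async error.
    if status.get("error"):
        return 2, status["error"]

    # Level 3: no errors, so the remaining fields are meaningful.
    job_state = (status.get("jobStatus") or {}).get("state", "")
    return 3, job_state


if __name__ == "__main__":
    healthy = {
        "cmfSync": {"errorMessage": "", "status": "Created"},
        "jobStatus": {"state": "RUNNING"},
    }
    print(diagnose(healthy))
```

Returning the level together with the detail keeps the check order explicit: a Level 2 error is only worth reading once Level 1 is clean.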
The following is an example of an error status:
status:
  cfkInternalState: CREATED
  clusterInfo: {}
  cmfSync:
    errorMessage: ""
    lastSyncTime: "2024-11-05T19:15:09Z"
    status: Created
  error: '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.configuration.IllegalConfigurationException:
    JobManager memory configuration failed: Sum of configured JVM Metaspace (256.000mb
    (268435456 bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured
    Total Process Memory (1 bytes).","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"JobManager
    memory configuration failed: Sum of configured JVM Metaspace (256.000mb (268435456
    bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured Total
    Process Memory (1 bytes).","additionalMetadata":{}},{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"Sum
    of configured JVM Metaspace (256.000mb (268435456 bytes)) and JVM Overhead (192.000mb
    (201326592 bytes)) exceed configured Total Process Memory (1 bytes).","additionalMetadata":{}}]}'
  jobManagerDeploymentStatus: MISSING
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: a6251e5a0f3f2e00f56874b56bc0780c
    jobName: ""
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    state: ""
  lifecycleState: UPGRADING
  observedGeneration: 5
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":6},"firstDeployment":true}}'
    reconciliationTimestamp: 1730834099726
    state: UPGRADING
  taskManager:
    labelSelector: ""
    replicas: 0
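In the example above, the error field holds a JSON document serialized as a string, with type, message, and throwableList keys. A short sketch of pulling a readable summary out of it follows. The function name summarize_error is hypothetical, the sample string is an abbreviated stand-in with the same keys, and treating the last throwableList entry as the innermost cause is an assumption based on the ordering in this one example.

```python
import json


def summarize_error(error_field):
    """Return a small summary dict for a FlinkApplication status.error value.

    Hypothetical helper for illustration only.
    """
    if not error_field:
        return None
    try:
        payload = json.loads(error_field)
    except json.JSONDecodeError:
        # Not JSON: pass the raw text through unchanged.
        return {"type": None, "message": error_field, "root_cause": None}

    causes = payload.get("throwableList") or []
    # Assumption: the last entry is the innermost (root) cause.
    root_cause = causes[-1].get("message") if causes else None
    return {
        "type": payload.get("type"),
        "message": payload.get("message"),
        "root_cause": root_cause,
    }


if __name__ == "__main__":
    # Abbreviated sample with the same keys as the example status.
    sample = json.dumps({
        "type": "org.apache.flink.kubernetes.operator.exception.ReconciliationException",
        "message": "JobManager memory configuration failed",
        "additionalMetadata": {},
        "throwableList": [
            {"type": "org.apache.flink.configuration.IllegalConfigurationException",
             "message": "exceed configured Total Process Memory (1 bytes).",
             "additionalMetadata": {}},
        ],
    })
    print(summarize_error(sample)["root_cause"])
```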
The following is an example status of a successful FlinkApplication creation:
status:
  cfkInternalState: CREATED
  clusterInfo:
    flink-revision: 89d0b8f @ 2024-06-22T13:19:31+02:00
    flink-version: 1.19.1-cp1
    total-cpu: "2.0"
    total-memory: "2516582400"
  cmfSync:
    errorMessage: ""
    lastSyncTime: "2024-11-05T19:19:10Z"
    status: Created
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: 522d7ff7f15b4e138ffb9ea4053abbd3
    jobName: State machine job
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1730834237948"
    state: RUNNING
    updateTime: "1730834248753"
  lifecycleState: STABLE
  observedGeneration: 6
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":12},"firstDeployment":true}}'
    lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":12},"firstDeployment":true}}'
    reconciliationTimestamp: 1730834229475
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=app111
    replicas: 1
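The startTime, updateTime, and reconciliationTimestamp values in the example above appear to be Unix epoch timestamps in milliseconds. This is an assumption, but it is consistent with the lastSyncTime shown in the same status. A minimal Python sketch for converting them to readable UTC times follows. The function name millis_to_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Reference point for Unix timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(value):
    """Convert epoch milliseconds (int or numeric string) to an aware UTC datetime.

    Hypothetical helper; assumes the status fields are epoch milliseconds.
    """
    # Integer arithmetic avoids floating-point rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=int(value))


if __name__ == "__main__":
    # startTime from the successful example status.
    print(millis_to_utc("1730834237948").isoformat())
    # prints 2024-11-05T19:17:17.948000+00:00
```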