Manage Flink Applications Using Confluent for Kubernetes

Apache Flink® is a powerful, scalable, and secure stream processing framework for running complex, stateful, low-latency streaming applications on large volumes of data.

Offered with Confluent Platform, Confluent Manager for Apache Flink® (CMF) is a self-managed service for Flink that integrates seamlessly with Apache Kafka®.

To learn more about CMF, see Overview of Confluent Platform for Apache Flink.

You can use Confluent for Kubernetes (CFK) to manage CMF and Flink applications with familiar Kubernetes tooling and custom resources.

The high-level workflow to manage Flink applications with CFK is:

  1. Install CMF.

  2. Install or upgrade CFK with Flink integration enabled. Use the --set enableCMFDay2Ops=true flag:

    helm upgrade --install confluent-operator \
      confluentinc/confluent-for-kubernetes \
      --set enableCMFDay2Ops=true
    

    For more information about CFK installation, see Deploy Confluent for Kubernetes.

  3. Create a CMF REST class.

  4. Create a Flink environment.

  5. Create a Flink application.

  6. In the Flink Web UI, verify that the application job you created is running.
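Step 2 assumes that the Confluent Helm repository is already registered under the name confluentinc. The following is a minimal sketch of step 2 as a shell function. It assumes Confluent's public Helm repository at https://packages.confluent.io/helm and a hypothetical target namespace named operator. The function name install_cfk_with_flink is illustrative and not part of CFK.

```shell
# Sketch only: the function name and the default namespace are hypothetical.
# Registers the Confluent Helm repository, then installs or upgrades CFK with
# the CMF (Flink) integration enabled, as in step 2.
install_cfk_with_flink() {
  ns="${1:-operator}"
  # Register the Confluent Helm repository and refresh the chart index.
  helm repo add confluentinc https://packages.confluent.io/helm || return 1
  helm repo update || return 1
  # Same release, chart, and flag as the command in step 2, scoped to a namespace.
  helm upgrade --install confluent-operator \
    confluentinc/confluent-for-kubernetes \
    --namespace "$ns" --create-namespace \
    --set enableCMFDay2Ops=true
}

# Usage: install_cfk_with_flink operator
```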

An example scenario of using CMF with CFK is available in the CFK Example Repository.

Requirements and considerations

  • To manage Flink in CFK, you need the following versions:
    • CMF version V1
    • CFK 2.10.0 and higher
    • Confluent Platform 7.8.0 and higher
  • Currently, CFK can connect to CMF either without authentication or with mTLS authentication.

Create a CMF REST Class

When you manage CMF in CFK, the CMF custom resources, namely FlinkEnvironment and FlinkApplication, communicate with CMF through the CMF REST Class (CMFRestClass).

You must first set up a CMF REST Class custom resource (CR).

The CMF REST Class is used only by CFK and is not part of CMF.

  1. If you use mTLS or TLS to connect to the CMF host, create a secret.

    Certificates with appropriate Subject Alternative Names (SANs) are required for the mTLS setup.

    • mTLS: Create a secret with the certificates, and reference it in the CMFRestClass CR in the next step.
    • TLS: The secret is required only if you use a self-signed certificate.

    See Provide TLS keys and certificates in PEM format and Provide TLS keys and certificates in Java KeyStore format for the expected keys in the TLS secret.

  2. Create a CMF REST Class (CMFRestClass CR) with the following spec, and deploy the resource using the kubectl apply -f command.

    apiVersion: platform.confluent.io/v1beta1
    kind: CMFRestClass
    metadata:
      name:                     --- [1]
      namespace:                --- [2]
    spec:
      cmfRest:                  --- [3]
        authentication:
          type:                 --- [4]
        endpoint:               --- [5]
        tls:                    --- [6]
          secretRef:            --- [7]
    
    • [1] The name of the REST Class.
    • [2] The namespace of the CMF REST Class.
    • [3] The settings for connecting to the CMF REST service.
    • [4] To use mTLS authentication, set to mtls and specify the certificates in [7].
    • [5] The endpoint of the CMF host.
    • [6] Required when the authentication type ([4]) is set to mtls.
    • [7] The name of the secret that contains the TLS certificates.

    An example CMFRestClass CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: CMFRestClass
    metadata:
      name: default
      namespace: operator
    spec:
      cmfRest:
        endpoint: https://cmf-service:80
        authentication:
          type: mtls
          sslClientAuthentication: true
        tls:
          secretRef: cmf-day2-tls
    
  3. Check the status:

    kubectl get CMFRestClass default -n <namespace> -oyaml
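The following sketch combines the steps above for the example CR. It assumes that the secret uses the CFK PEM key names fullchain.pem, privkey.pem, and cacerts.pem; check them against Provide TLS keys and certificates in PEM format. The certificate paths, the manifest file name cmf-rest-class.yaml, and the function name create_cmf_rest_class are hypothetical placeholders.

```shell
# Sketch only: file paths, manifest name, and function name are hypothetical.
# Creates the secret named in spec.cmfRest.tls.secretRef of the example CR,
# applies the CMFRestClass manifest, and prints the resulting CR with status.
create_cmf_rest_class() {
  ns="${1:-operator}"
  manifest="${2:-cmf-rest-class.yaml}"
  # Assumed CFK PEM keys: certificate chain, private key, and CA bundle.
  kubectl create secret generic cmf-day2-tls \
    --from-file=fullchain.pem=./certs/client-fullchain.pem \
    --from-file=privkey.pem=./certs/client-key.pem \
    --from-file=cacerts.pem=./certs/ca.pem \
    --namespace "$ns" || return 1
  kubectl apply -f "$manifest" --namespace "$ns" || return 1
  kubectl get cmfrestclass default --namespace "$ns" -o yaml
}

# Usage: create_cmf_rest_class operator cmf-rest-class.yaml
```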
    

Create a Flink environment

A Flink environment is a set of configurations that Flink applications use.

  1. Create a FlinkEnvironment CR using the following spec, and deploy it with the kubectl apply -f command.

    apiVersion: platform.confluent.io/v1beta1
    kind: FlinkEnvironment
    metadata:
      name:
      namespace:
    spec:
      kubernetesNamespace:      --- [1]
      flinkApplicationDefaults: --- [2]
        metadata:               --- [3]
        spec:                   --- [4]
          flinkConfiguration:
      cmfRestClassRef:          --- [5]
        name:
        namespace:
    
    • [1] The Kubernetes namespace where the Flink clusters of this environment run.

      Typically, you install the FlinkEnvironment CR in the CFK namespace (metadata.namespace), while the Flink clusters run in another namespace (spec.kubernetesNamespace), for example, default.

    • [2] Default settings that apply to every Flink application deployed in this environment.

    • [3] Kubernetes API metadata.

    • [4] Spec of the FlinkApplicationSpec type.

    • [5] The reference to the REST Class you created in Create a CMF REST Class.

      You can install the FlinkEnvironment CR and the CMFRestClass CR in different namespaces.

      If omitted, the CMFRestClass named default in the same namespace is used.

    An example FlinkEnvironment CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: FlinkEnvironment
    metadata:
      name: my-env1
      namespace: operator
    spec:
      kubernetesNamespace: default
      flinkApplicationDefaults:
        metadata:
          labels:
            "acmecorp.com/owned-by": "analytics-team"
        spec:
          flinkConfiguration:
            taskmanager.numberOfTaskSlots: "2"
            rest.profiling.enabled: "true"
      cmfRestClassRef:
        name: default
        namespace: operator
    
  2. Check the status.

    kubectl get flinkEnvironment -n <namespace> -oyaml
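Because spec.kubernetesNamespace decides where the Flink clusters of an environment run, it can help to read it back after applying the CR. The following is a minimal sketch in which the function name and the manifest file name (for example, my-env1.yaml) are hypothetical.

```shell
# Sketch only: the function name and manifest file name are hypothetical.
# Applies a FlinkEnvironment manifest, then prints spec.kubernetesNamespace,
# the namespace in which this environment's Flink clusters run.
apply_flink_environment() {
  name="$1"; ns="$2"; manifest="$3"
  kubectl apply -f "$manifest" || return 1
  kubectl get flinkenvironment "$name" -n "$ns" \
    -o jsonpath='{.spec.kubernetesNamespace}{"\n"}'
}

# Usage (for the example CR, this is expected to print "default"):
# apply_flink_environment my-env1 operator my-env1.yaml
```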
    

Create a Flink application

A Flink application is a user program that creates one or more Flink jobs to process data.

To create a Flink application resource in CFK:

  1. Create a FlinkApplication CR using the following spec and deploy the resource using the kubectl apply -f command.

    apiVersion: platform.confluent.io/v1beta1
    kind: FlinkApplication
    metadata:
    spec:
      cmfRestClassRef:
        name:                   --- [1]
        namespace:              --- [2]
      image:                    --- [3]
      flinkEnvironment:         --- [4]
      flinkVersion:
      flinkConfiguration:       --- [5]
      serviceAccount:           --- [6]
      jobManager:               --- [7]
      taskManager:              --- [8]
      job:                      --- [9]
    
    • [1] The reference to the REST Class you created in Create a CMF REST Class.

      If omitted, the CMFRestClass named default in the same namespace is used.

    • [2] The namespace of the CMF REST Class, which can differ from the namespace of this FlinkApplication CR.

      The namespace of the Flink cluster is determined by FlinkEnvironment.spec.kubernetesNamespace.

    • [3] The Flink image, for example, confluentinc/cp-flink:1.19.1-cp1.

    • [4] The reference to the FlinkEnvironment CR you created in Create a Flink environment.

    • [5] Flink configurations.

    • [6] The service account that runs Flink.

    • [7] The JobManager settings (FlinkJobManager).

    • [8] The TaskManager settings (FlinkTaskManager).

    • [9] The job settings (FlinkJob).

    An example FlinkApplication CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: FlinkApplication
    metadata:
      name: my-app1
      namespace: default
    spec:
      flinkEnvironment: my-env1
      image: confluentinc/cp-flink:1.19.1-cp1
      flinkVersion: v1_19
      flinkConfiguration:
        "taskmanager.numberOfTaskSlots": "2"
        "metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory"
        "metrics.reporter.prom.port": "9249-9250"
        "rest.profiling.enabled": "true"
      serviceAccount: flink
      jobManager:
        resource:
          memory: 1048m
          cpu: 1
      taskManager:
        resource:
          memory: 1048m
          cpu: 1
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        state: running
        parallelism: 3
        upgradeMode: stateless
      cmfRestClassRef:
        name: default
        namespace: operator
    
  2. Check the status.

    kubectl get flinkApplication -n <namespace> -oyaml
    

    For status details, see Check the Flink application status.
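To carry out the last workflow step, you can wait for the job to reach RUNNING and then forward the Flink Web UI to your machine. This is a sketch under two assumptions: kubectl 1.23 or later (needed for kubectl wait --for=jsonpath), and a REST service named <application name>-rest on port 8081, which is the naming used by the Flink Kubernetes Operator. Confirm the actual service name with kubectl get svc in the Flink namespace. The function name is hypothetical.

```shell
# Sketch only: assumes kubectl 1.23+ and a Web UI service named <app>-rest.
wait_and_open_web_ui() {
  app="$1"; ns="$2"
  # Block until status.jobStatus.state reports RUNNING, or fail after 5 minutes.
  kubectl wait "flinkapplication/$app" -n "$ns" \
    --for=jsonpath='{.status.jobStatus.state}'=RUNNING \
    --timeout=300s || return 1
  # Runs until interrupted; then browse to http://localhost:8081.
  kubectl port-forward "svc/$app-rest" 8081:8081 -n "$ns"
}

# Usage: wait_and_open_web_ui my-app1 default
```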

Check the Flink application status

The following are the notable status fields in a CFK-managed FlinkApplication CR:

status:
  cmfSync:                     --- [1]
    errorMessage:              --- [2]
    lastSyncTime:              --- [3]
    status:                    --- [4]
  error:                       --- [5]
  clusterInfo:                 --- [6]
  jobManagerDeploymentStatus:  --- [7]
  jobStatus:
    state:                     --- [8]

  • [1] The status of the sync between CFK and CMF through the CMFRestClass CR.

  • [2] Any error message related to the sync between CFK and CMF ([1]), for example, a connection, authentication, or validation error.

  • [3] The time when the latest sync between CFK and CMF happened.

  • [4] The sync status. Possible values are CREATED, DELETED, UNKNOWN, and FAILED.

  • [5] Asynchronous errors reported by the Flink deployment.

    This field is set only if status.cmfSync.errorMessage ([2]) is empty and status.cmfSync.status ([4]) is CREATED.

For details about the following status fields, refer to the CMF documentation.

  • [6] Information about the Flink cluster when deployed. This section is set only if status.error ([5]) is not set.
  • [7] Status of the JobManager deployment in Kubernetes.
  • [8] Status of the Flink job inside the FlinkApplication’s Flink cluster.

Note that the status and error fields in FlinkApplication.status form a hierarchy:

  1. Level 1: The status.cmfSync field must be error-free, which indicates that CFK was able to submit the FlinkApplication to the CMF backend.
  2. Level 2: The CMF backend or the internal Flink Kubernetes Operator might report an error in the status.error field.
  3. Level 3: After the errors in the fields above are resolved, the remaining status fields are populated.
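The hierarchy above can be checked in order with a small shell function. It is a sketch: the output labels are illustrative rather than values defined by CFK or CMF, and all function names are hypothetical. The sync status is compared case-insensitively because the examples below report Created while the field description lists CREATED.

```shell
# Sketch only: labels and function names are illustrative, not CFK or CMF APIs.
# Arguments: status.cmfSync.status, status.cmfSync.errorMessage, status.error
flink_status_level() {
  sync_status="$1"; sync_error="$2"; app_error="$3"
  # Level 1: the CFK-to-CMF sync must be error-free and in the CREATED state.
  if [ -n "$sync_error" ]; then
    echo "level1: cmfSync error"
    return 0
  fi
  case "$sync_status" in
    [Cc][Rr][Ee][Aa][Tt][Ee][Dd]) ;;
    *) echo "level1: cmfSync status is ${sync_status:-empty}"; return 0 ;;
  esac
  # Level 2: CMF or the Flink Kubernetes Operator reported an error.
  if [ -n "$app_error" ]; then
    echo "level2: status.error is set"
    return 0
  fi
  # Level 3: inspect clusterInfo, jobManagerDeploymentStatus, jobStatus.state.
  echo "level3: inspect deployment and job status"
}

# Reads one field under .status of a FlinkApplication CR.
flinkapp_status_field() {
  kubectl get flinkapplication "$1" -n "$2" -o jsonpath="{.status.$3}"
}

# Usage: flink_app_status_level my-app1 default
flink_app_status_level() {
  flink_status_level \
    "$(flinkapp_status_field "$1" "$2" cmfSync.status)" \
    "$(flinkapp_status_field "$1" "$2" cmfSync.errorMessage)" \
    "$(flinkapp_status_field "$1" "$2" error)"
}
```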

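The following is an example of an error status. The sync with CMF succeeded (Level 1), but status.error reports a JobManager memory configuration failure (Level 2):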

status:
  cfkInternalState: CREATED
  clusterInfo: {}
  cmfSync:
    errorMessage: ""
    lastSyncTime: "2024-11-05T19:15:09Z"
    status: Created
  error: '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.configuration.IllegalConfigurationException:
    JobManager memory configuration failed: Sum of configured JVM Metaspace (256.000mb
    (268435456 bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured
    Total Process Memory (1 bytes).","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"JobManager
    memory configuration failed: Sum of configured JVM Metaspace (256.000mb (268435456
    bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured Total
    Process Memory (1 bytes).","additionalMetadata":{}},{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"Sum
    of configured JVM Metaspace (256.000mb (268435456 bytes)) and JVM Overhead (192.000mb
    (201326592 bytes)) exceed configured Total Process Memory (1 bytes).","additionalMetadata":{}}]}'
  jobManagerDeploymentStatus: MISSING
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: a6251e5a0f3f2e00f56874b56bc0780c
    jobName: ""
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    state: ""
  lifecycleState: UPGRADING
  observedGeneration: 5
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":6},"firstDeployment":true}}'
    reconciliationTimestamp: 1730834099726
    state: UPGRADING
  taskManager:
    labelSelector: ""
    replicas: 0
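In this example, the JobManager's total process memory resolved to 1 byte: lastReconciledSpec shows "memory":"1" for both the JobManager and the TaskManager, whereas the successful example that follows uses "1200m". Because status.error holds a JSON document encoded as a string, kubectl jsonpath cannot index into it. The following sketch pretty-prints it instead, assuming python3 is installed (jq works equally well). The function name is hypothetical.

```shell
# Sketch only: the function name is hypothetical; assumes python3 is on PATH.
# Prints status.error of a FlinkApplication as indented JSON.
show_flink_app_error() {
  app="$1"; ns="$2"
  err="$(kubectl get flinkapplication "$app" -n "$ns" -o jsonpath='{.status.error}')" || return 1
  if [ -z "$err" ]; then
    echo "status.error is not set"
    return 0
  fi
  # json.tool parses the string and re-emits it with indentation.
  printf '%s' "$err" | python3 -m json.tool
}

# Usage: show_flink_app_error my-app1 default
```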

The following is an example status of a successful FlinkApplication creation:

status:
  cfkInternalState: CREATED
  clusterInfo:
    flink-revision: 89d0b8f @ 2024-06-22T13:19:31+02:00
    flink-version: 1.19.1-cp1
    total-cpu: "2.0"
    total-memory: "2516582400"
  cmfSync:
    errorMessage: ""
    lastSyncTime: "2024-11-05T19:19:10Z"
    status: Created
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: 522d7ff7f15b4e138ffb9ea4053abbd3
    jobName: State machine job
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1730834237948"
    state: RUNNING
    updateTime: "1730834248753"
  lifecycleState: STABLE
  observedGeneration: 6
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":12},"firstDeployment":true}}'
    lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":1,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"rest.profiling.enabled":"true","taskmanager.numberOfTaskSlots":"2"},"image":"confluentinc/cp-flink:1.19.1-cp1","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_19","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":1,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"taskManager":{"resource":{"cpu":1.0,"memory":"1200m","ephemeralStorage":null},"replicas":null,"podTemplate":{"metadata":{"labels":{"platform.confluent.io/origin":"flink"}}}},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":12},"firstDeployment":true}}'
    reconciliationTimestamp: 1730834229475
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=app111
    replicas: 1
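To compare these fields across applications without reading the full YAML, kubectl's custom-columns output can print one row per FlinkApplication. The following is a sketch with arbitrary column names and a hypothetical function name. For the successful example above, the SYNC, JOBMANAGER, JOB, and LIFECYCLE columns would read Created, READY, RUNNING, and STABLE.

```shell
# Sketch only: column names and the function name are arbitrary; the field
# paths come from the status examples above.
flink_app_summary() {
  ns="$1"
  kubectl get flinkapplication -n "$ns" \
    -o custom-columns='NAME:.metadata.name,SYNC:.status.cmfSync.status,JOBMANAGER:.status.jobManagerDeploymentStatus,JOB:.status.jobStatus.state,LIFECYCLE:.status.lifecycleState'
}

# Usage: flink_app_summary default
```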