Manage Confluent Platform with Confluent Operator

The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.

Note

Confluent recommends Helm 3 for Confluent Operator and Confluent Platform 1.7 deployments. See the following documentation for Helm migration and Operator upgrade instructions.

The examples in this guide use the following assumptions:

  • $VALUES_FILE refers to the configuration file you set up in Create the global configuration file.

  • To present simple and clear examples in the Operator documentation, all the configuration parameters are specified in the config file ($VALUES_FILE). However, in your production deployments, use the --set or --set-file option when applying sensitive data with Helm (a --set-file variation is sketched after this list). For example:

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials="Developer!" \
     --set kafka.enabled=true
    
  • operator is the namespace that Confluent Platform is deployed in.

  • All commands are executed in the helm directory under the directory where you downloaded Confluent Operator.
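
As a variation on the sensitive-data example above, Helm's --set-file option reads a value from a local file so that the credential never appears on the command line. The following is a minimal sketch, assuming the LDAP password has been saved to a hypothetical file named mds-ldap-password.txt:

helm upgrade --install kafka \
  --values $VALUES_FILE \
  --namespace operator \
  --set-file kafka.services.mds.ldap.authentication.simple.credentials=./mds-ldap-password.txt \
  --set kafka.enabled=true \
  ./confluent-operator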

Add components to an existing Kafka cluster

To add components to an existing cluster, modify your configuration file ($VALUES_FILE) and then run the helm upgrade --install command for the added component.

For example, to add ksqlDB to your cluster, add the ksqlDB values block to your configuration file ($VALUES_FILE) and run the install command for ksqlDB only, as follows:

helm upgrade --install \
  ksql \
  --values $VALUES_FILE \
  --namespace operator \
  --set ksql.enabled=true \
  ./confluent-operator
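
The ksqlDB values block you add to $VALUES_FILE depends on your deployment. The following is a minimal sketch; the replica count is illustrative, and the full set of supported keys is documented in the ksql section of the bundled values.yaml:

ksql:
  name: ksql
  replicas: 2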

Delete components

Uninstall a component release from the cluster with the following command:

helm uninstall <component-release-name> --namespace <namespace-name>

Enter the following commands to delete Confluent Platform components in the cluster. Components must be deleted in the order shown below using the component release name. The examples below show the default release names:

helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>

Trigger a cluster restart

There are cases when it is necessary to trigger a restart of a cluster, such as to apply configuration changes or to have a component pick up a change after you have rotated credentials in a Secret object.

To restart a Confluent Platform component cluster:

  1. Find the name of the StatefulSet corresponding to the Confluent Platform cluster you want to restart:

    kubectl get statefulset --namespace <namespace>
    
  2. Using the StatefulSet name, <name-of-statefulset>, from the previous step, roll the cluster:

    kubectl rollout restart statefulset/<name-of-statefulset> --namespace <namespace>
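
For example, if the first command shows a StatefulSet named kafka in the operator namespace (StatefulSet names vary with your release and component names), the restart and a command to watch its progress would look like the following:

kubectl rollout restart statefulset/kafka --namespace operator
kubectl rollout status statefulset/kafka --namespace operator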
    

Deploy Confluent connectors

Starting in Operator 1.7, connectors are no longer packaged with the Connect images. You must deploy connectors by adding them to the Connect image, cp-server-connect-operator. For more information on extending a Docker image, see Build the Confluent Platform images.

  1. Pull the Operator init container image. The connector image depends on the Operator init image, so both images must exist in the same repository.

    docker pull confluentinc/cp-init-container-operator:<version-tag>
    
  2. Create a Dockerfile in <dockerfile-dir> to add one or more connectors to the cp-server-connect-operator image.

    You can either pull connectors from Confluent Hub or use a connector JAR downloaded to the machine you are running the Docker build from.

    • To pull connectors from Confluent Hub, create a Dockerfile as follows:

      FROM confluentinc/cp-server-connect-operator:<version-tag>
      USER root
      RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
        && confluent-hub install --no-prompt <connector2>:<connector2-version> \
        && ...
      USER 1001
      

      An example Dockerfile to create a Docker image with the data-gen connector from Confluent Hub:

      FROM confluentinc/cp-server-connect-operator:6.0.0.0
      USER root
      RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
      USER 1001
      
    • To use connector JAR files that are on the machine that you are running the Docker build from, create a Dockerfile as follows:

      FROM confluentinc/cp-server-connect-operator:<version-tag>
      ADD <local-connector1-path> /usr/share/java/<connector1>
      ADD <local-connector2-path> /usr/share/java/<connector2>
      ...
      USER 1001
      
      

      An example Dockerfile that uses the data-gen connector located on your local machine in the my-connector-dir directory:

      FROM confluentinc/cp-server-connect-operator:6.0.0.0
      ADD my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
      USER 1001
      
  3. Build and push the image with the following commands:

    docker build <dockerfile-dir> -t <custom-registry>/<custom-repository>:<custom-tag>
    
    docker push <custom-registry>/<custom-repository>:<custom-tag>
    
  4. Get the Docker image details from the output of the above process and use them in the next step.

  5. Run helm upgrade to apply the changes:

    helm upgrade --install \
      <connect-helm-release-name> \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set connect.enabled=true \
      --set connect.name=connect \
      --set global.provider.registry.fqdn=<custom-registry-endpoint> \
      --set global.provider.registry.credential.required=<true or false> \
      --set global.provider.registry.credential.username=<value if required> \
      --set global.provider.registry.credential.password=<value if required> \
      --set connect.image.repository=<custom-registry>/<custom-repository> \
      --set connect.image.tag=<custom-tag> \
      --set global.initContainer.image.repository=<mypath>/<name> \
      --set global.initContainer.image.tag=<custom-tag>
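
Putting steps 3 through 5 together with purely illustrative names (the registry docker.example.com, the repository my-org/connect-with-datagen, the tag 1.0.0, and the init-container repository path are all hypothetical), the commands might look like the following sketch:

docker build <dockerfile-dir> -t docker.example.com/my-org/connect-with-datagen:1.0.0
docker push docker.example.com/my-org/connect-with-datagen:1.0.0

helm upgrade --install \
  connect \
  ./confluent-operator \
  --values $VALUES_FILE \
  --namespace operator \
  --set connect.enabled=true \
  --set connect.name=connect \
  --set global.provider.registry.fqdn=docker.example.com \
  --set global.provider.registry.credential.required=false \
  --set connect.image.repository=docker.example.com/my-org/connect-with-datagen \
  --set connect.image.tag=1.0.0 \
  --set global.initContainer.image.repository=docker.example.com/my-org/cp-init-container-operator \
  --set global.initContainer.image.tag=<version-tag>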
    

Monitor Kafka metrics

The configuration parameter metricReporter is provided in the Kafka values.yaml file. If you add the parameters below to the configuration file ($VALUES_FILE), messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).

metricReporter:
  enabled: true
  ## Period (in milliseconds) at which the reporter publishes messages to the bootstrapEndpoint
  ##
  publishMs: 30000
  ## Replication factor; if empty, the value is based on the replica count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, internal communication will be encrypted when TLS is enabled, and the bootstrapEndpoint will use an FQDN.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT.
    internal: false
    authentication:
      type: ""
  ## If tls.internal above is true, configure with the Kafka bootstrap DNS name on port 9092, e.g. <kafka.name>.<domain>:9092
  ## If tls.internal above is false, configure with the Kafka service name on port 9071, e.g. <kafka.name>:9071, or the FQDN of the Kafka service, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""

Important

If metricReporter is enabled with a bootstrapEndpoint that is a DNS name, the DNS name must resolve successfully or the Kafka pod will crash (CrashLoopBackOff). If the bootstrapEndpoint uses an internal address (like kafka:9071), this is not an issue. For additional security information, see the comments in the values.yaml file.
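
For example, the following is a minimal sketch of the metricReporter block for a non-TLS internal listener, assuming the Kafka service is named kafka so the internal bootstrap endpoint is kafka:9071 as described above:

metricReporter:
  enabled: true
  publishMs: 30000
  replicationFactor: ""
  tls:
    enabled: false
    internal: false
    authentication:
      type: ""
  ## tls.internal is false, so the Kafka service name on port 9071 is used
  bootstrapEndpoint: "kafka:9071"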

Scale Kafka clusters and balance data

Scale up

Starting in Confluent Platform 6.0.0, Self-Balancing is the recommended way to rebalance loads when Kafka brokers are added or removed. Self-Balancing is disabled by default.

To scale up the Kafka cluster:

  1. Enable Self-Balancing and rebalance Kafka for any uneven load:

    For a complete list of available settings you can use to control Self-Balancing, see Configuration Options and Commands for Self-Balancing Clusters. You can pass the settings in configOverrides in the kafka section of the configuration file ($VALUES_FILE). A combined sketch of steps 1 through 3 appears after this procedure.

    kafka:
      configOverrides:
        server:
          - confluent.balancer.enable=                   ----- [1]
          - confluent.balancer.heal.uneven.load.trigger= ----- [2]

    [1] Set to true to enable Self-Balancing.
    [2] Set to ANY_UNEVEN_LOAD to rebalance whenever any uneven load is detected; the default, EMPTY_BROKER, only rebalances when brokers are added or removed.

  2. Increase the number of Kafka replicas:

    kafka:
      replicas:
    
  3. Specify whether Operator performs DNS checks on the new broker pods:

    kafka:
      scalingDnsPreCheck:
    
    • Disable DNS checks (scalingDnsPreCheck: false) in an environment where you cannot guarantee that DNS is resolvable from the Operator node and you do not want Operator to check whether the new broker DNS entries resolve before it scales up the Kafka cluster. DNS checks are disabled by default in this version of Operator.

    • Enable DNS checks (scalingDnsPreCheck: true) in an environment where DNS is fully managed and resolvable from all nodes, including the Operator pod, so that Operator checks DNS before scaling up.

      When this setting is enabled, Operator performs a DNS and health check on each new broker it brings up when you increase the broker replica count. If the DNS check fails, the scaling operation fails.

  4. If you enabled DNS checks in the previous step, ensure that proper DNS records are configured for the new brokers, and ensure that the Operator can resolve the new broker hostnames, using a command such as nslookup.

    If you are using a hosts file instead of a DNS service, you need to update the hosts file with the new broker information. For example:

    1. Get the new broker IP addresses:

      kubectl get services -n <namespace>
      
    2. Using the existing broker hostnames, which share the broker prefix, as a pattern, derive the hostnames of the new brokers.

    3. Add the new broker hosts to the /etc/hosts file, and inject the updated file to the Operator pod as described in Adding entries to Pod /etc/hosts.

  5. Update Kafka with the new settings:

    helm upgrade --install kafka ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.enabled=true
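
For reference, a kafka section of the configuration file ($VALUES_FILE) reflecting steps 1 through 3 might look like the following sketch (the replica count of 5 and the disabled DNS pre-check are illustrative choices):

kafka:
  replicas: 5
  scalingDnsPreCheck: false
  configOverrides:
    server:
      - confluent.balancer.enable=true
      - confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD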
    

Note

If you need to use Auto Data Balancer, first turn off Self-Balancing, because Self-Balancing and Auto Data Balancer cannot be used together. Then refer to the 5.5.1 or an older version of the documentation for the scale-up process using Auto Data Balancer.

Scale down

Scaling down Kafka clusters is not supported in the current version of Confluent Operator.

Scale storage

Scale up disks

The ability to scale up disks is important to successfully use Confluent Platform at scale. For instance, as your organization’s usage of Kafka grows, you may experience a large growth in the number of Kafka topics, and expanding disks can help you ensure you have enough disk space to accommodate the data in those topics.

Confluent Operator enables simple, automated expansion of storage. Depending on your environment, expanding all the disks associated with a cluster (e.g. ZooKeeper, Kafka, etc.) can be as simple as changing a single value in your configuration file ($VALUES_FILE) and running the helm upgrade command. After some time, all pods within the cluster can use the expanded disk space without any interruption. In some cases, this expansion can be completed across an entire cluster in just a few seconds.

This section details the processes underlying disk expansion, the settings and configurations (e.g. choice of storage provisioner, Kubernetes version, StorageClass configuration, etc.) that determine which processes can be automated, and the concrete workflows you need to follow to expand disks, depending on your settings and configuration.

Environment and configuration considerations

Expanding a disk requires both resizing the PersistentVolumeClaim (PVC) and expanding the file system exposed to the pod where the associated PersistentVolume (PV) is mounted.

The ability to automatically resize a PVC depends on the volume types you use and how the StorageClass associated with the PVCs is configured. For further details on PVC expansion support, see Expanding Persistent Volumes Claims in Kubernetes.

Once a PVC is expanded, the ability to automatically expand the file system exposed to a Pod without restarting the pod requires the ExpandInUsePersistentVolumes feature to be enabled on your Kubernetes cluster. For Kubernetes clusters with version 1.15 and higher, this feature is enabled by default. For further details on expanding in-use PVCs, see Resizing an in-use PersistentVolumeClaim.
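
For reference, the following is a minimal sketch of a StorageClass that permits expansion; the name and provisioner shown are illustrative, so substitute the provisioner for your storage provider:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: expandable-ssd            # illustrative name
provisioner: kubernetes.io/gce-pd # illustrative; use your provider's provisioner
allowVolumeExpansion: true        # required for PVC expansion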

One-command workflow for disk expansion

In an environment where the volume is automatically expanded and the file system mounted in the pods is automatically resized, you can use the simplest workflow for disk expansion: edit your configuration file ($VALUES_FILE) and run a single command to apply the new configuration.

The following example expands the disk associated with each broker in a Kafka cluster from 10 GB to 100 GB. You can apply the same approach to any Confluent Platform component:

  1. Update the storage volume in your configuration file ($VALUES_FILE).

    The following example increases the disk for Kafka brokers to 100 Gi.

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
  2. Apply the change by running the helm upgrade command.

    For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    

For this workflow to be supported, the following must be true of your environment:

  • The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have set allowVolumeExpansion: true.

  • The underlying volume type must support expansion.

    At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not.

  • The ExpandInUsePersistentVolumes feature must be enabled on your Kubernetes cluster.

    For Kubernetes clusters with version 1.15 and above, this feature is enabled by default.
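
You can verify the first requirement by inspecting the StorageClass used by your cluster (the StorageClass name is whatever your PVCs reference) and confirming that the output includes allowVolumeExpansion: true:

kubectl get storageclass <storage-class-name> -o yaml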

Two-command workflow for disk expansion

In an environment where only the volume is automatically expanded and the file system mounted inside the pod is not resized, e.g. Kubernetes versions lower than 1.15, you need to roll the cluster, in addition to the steps in the workflow above, for the new size to be reflected at the pod’s file-system level:

  1. Update the storage volume in your configuration file ($VALUES_FILE).

    The following example increases the disk for Kafka brokers to 100 Gi.

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
  2. Apply the change by running the helm upgrade command. For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    
  3. Prepare to trigger Operator to perform a safe rolling restart of your cluster by setting a “dummy” annotation in your configuration file ($VALUES_FILE):

    kafka:
      podAnnotations:
        platform.confluent.io/dummy: "any-string"
    
  4. Apply the change by running the helm upgrade command. For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    

After the cluster has rolled, the expanded disk should now be available to your cluster. At this point, if you want to remove the “dummy” annotation, remove the annotation from your configuration file ($VALUES_FILE) and apply the change using the helm upgrade command. Note that doing so will safely roll your cluster again.

For this workflow to be supported, the following must be true of your environment:

  • The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have set allowVolumeExpansion: true.

  • The underlying volume type must support expansion.

    At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not. Refer to the storage provider’s documentation to check.

Scale down disks

Scaling down disks is not supported in the current version of Confluent Operator.