Manage Confluent Platform with Confluent Operator

The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.

Note

Confluent recommends Helm 3 for Confluent Operator and Confluent Platform 6.0 deployments. See the following documentation for Helm migration and Operator upgrade instructions.

The examples in this guide use the following assumptions:

  • $VALUES_FILE refers to the configuration file you set up in Create the global configuration file.

  • To present simple and clear examples in the Operator documentation, all the configuration parameters are specified in the config file ($VALUES_FILE). However, in your production deployments, use the --set or --set-file option when applying sensitive data with Helm. For example:

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds,dc=test,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials="Developer!" \
     --set kafka.enabled=true \
     ./confluent-operator
    
  • operator is the namespace that Confluent Platform is deployed in.

  • All commands are executed in the helm directory under the directory Confluent Operator was downloaded to.
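For example, you might set $VALUES_FILE as an environment variable before running the commands in this guide. The path below is hypothetical; use the configuration file you created earlier:

```shell
# Hypothetical path; point this at the global configuration file you
# created in "Create the global configuration file".
export VALUES_FILE="$HOME/confluent-operator/helm/providers/gcp.yaml"
```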

Add components to an existing Kafka cluster

To add components to an existing cluster, modify your configuration YAML file and then run the helm upgrade --install command for the added component. For example, to add ksqlDB to your cluster, add the ksqlDB values block to your configuration file ($VALUES_FILE) and run the install command for ksqlDB only:

helm upgrade --install ksql \
  --values $VALUES_FILE \
  --namespace operator \
  --set ksql.enabled=true \
  ./confluent-operator

Delete components

Uninstall a component release from the cluster.

helm uninstall <component-release-name> --namespace <namespace-name>

Enter the following commands to delete Confluent Platform components in the cluster. Components must be deleted in the order shown below using the component release name. The examples below show the default release names:

helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>
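The required ordering can also be captured in a small shell sketch that prints each uninstall command in sequence so you can review it before running anything; the release names and namespace are the defaults assumed in this guide:

```shell
NAMESPACE=operator  # assumed namespace from this guide
# Dependent services come down first, then Kafka, ZooKeeper, and
# finally the operator itself.
RELEASES="ksql controlcenter connectors replicator schemaregistry kafka zookeeper operator"
for release in $RELEASES; do
  echo "helm uninstall $release --namespace $NAMESPACE"
done
```

Once you have confirmed the release names match your deployment, drop the echo (or pipe the output to sh) to perform the uninstalls.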

Monitor Kafka metrics

The configuration parameter metricReporter is provided in the Kafka values.yaml file. If you add the parameters below to the configuration file ($VALUES_FILE), messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).

metricReporter:
  enabled: true
  ## Period (in milliseconds) the reporter uses to publish messages to the bootstrapEndpoint
  ##
  publishMs: 30000
  ## ReplicationFactor, if empty, the value is based on the replicas count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, internal communication will be encrypted when TLS is enabled, and the bootstrapEndpoint will use an FQDN.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT
    internal: false
    authentication:
      type: ""
  ## If above tls.internal is true, configure with the Kafka bootstrap DNS name on port 9092, e.g. <kafka.name>.<domain>:9092
  ## If above tls.internal is false, configure with the Kafka service name on port 9071, e.g. <kafka.name>:9071, or the FQDN of the Kafka service, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""

Important

If metricReporter is enabled with a bootstrapEndpoint that is a DNS name, the DNS name must resolve successfully, or the Kafka pod will crash (CrashLoopBackOff). If the bootstrapEndpoint uses an internal address (like kafka:9071), this is not an issue. For additional security information, see the comments in the values.yaml file.
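For example, a minimal metricReporter block that publishes over the internal listener without TLS might look like the following sketch. The service name kafka is an assumption; substitute your Kafka cluster's service name:

```yaml
metricReporter:
  enabled: true
  publishMs: 30000
  replicationFactor: ""
  tls:
    enabled: false
    internal: false
    authentication:
      type: ""
  bootstrapEndpoint: "kafka:9071"  # assumed internal service name on port 9071
```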

Scale Kafka clusters and balance data

Scale up

Starting in Confluent Platform 6.0.0, Self-Balancing is the recommended way to rebalance loads when Kafka brokers are added or removed. Self-Balancing is disabled by default.

Change the following settings to enable Self-Balancing or to rebalance Kafka for any uneven load when Self-Balancing is enabled. For a complete list of available settings you can use to control Self-Balancing, see Configuration Options and Commands for Self-Balancing Clusters. You can pass the settings in configOverrides in the kafka section of the configuration file ($VALUES_FILE).

kafka:
  configOverrides:
    server:
      - confluent.balancer.enable=                   ----- [1]
      - confluent.balancer.heal.uneven.load.trigger= ----- [2]

[1] Set to true to enable Self-Balancing or false to disable it.
[2] Set to ANY_UNEVEN_LOAD to balance whenever load is uneven, or EMPTY_BROKER (the default) to balance only when brokers are added or removed.

After you enable Self-Balancing, to scale up the cluster, perform the following:

  1. Increase the number of Kafka replicas in the configuration file ($VALUES_FILE):

    kafka:
      replicas: 5  # example value; set to the total number of brokers you want
    
  2. Update Kafka:

    helm upgrade --install kafka ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.enabled=true
    

Self-Balancing and Auto Data Balancer cannot be used together. If you need to use Auto Data Balancer, first turn off Self-Balancing, and then refer to the 5.5.1 or older documentation for the scale-up process.
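For example, to turn Self-Balancing off in your configuration file ($VALUES_FILE) before switching to Auto Data Balancer:

```yaml
kafka:
  configOverrides:
    server:
      - confluent.balancer.enable=false
```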

Scale down

Scaling down Kafka clusters is not supported in the current version of Confluent Operator.

Scale storage

Scale up disks

The ability to scale up disks is important to successfully use Confluent Platform at scale. For instance, as your organization’s usage of Kafka grows, you may experience a large growth in the number of Kafka topics, and expanding disks can help you ensure you have enough disk space to accommodate the data in those topics.

Confluent Operator enables simple, automated expansion of storage. Depending on your environment, expanding all the disks associated with a cluster (e.g. ZooKeeper, Kafka) can be as simple as changing a single value in your configuration file ($VALUES_FILE) and running the helm upgrade command. After some time, all pods within the cluster can leverage the expanded disk space without any interruption. In some cases, this expansion completes across an entire cluster in just a few seconds.

This section details the processes underlying disk expansion, the settings and configurations (e.g. choice of storage provisioner, Kubernetes version, StorageClass configuration) that affect which processes can be automated, and the concrete workflows you need to follow to expand disks depending on your settings and configurations.

Environment and configuration considerations

Expanding a disk requires both resizing the PersistentVolumeClaim (PVC) and expanding the file system exposed to the pod where the associated PersistentVolume (PV) is mounted.

The ability to automatically resize a PVC depends on the volume types you use and how the StorageClass associated with the PVCs is configured. For further details on PVC expansion support, see Expanding Persistent Volumes Claims in Kubernetes.
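For instance, a StorageClass that permits expansion might look like the following sketch; the name and GCE provisioner are illustrative, and the key field is allowVolumeExpansion:

```yaml
# Illustrative StorageClass; allowVolumeExpansion: true is what enables
# PVC resizing for volumes created from this class.
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: expandable-ssd
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
allowVolumeExpansion: true
```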

Once a PVC is expanded, automatically expanding the file system exposed to a pod without restarting the pod requires the ExpandInUsePersistentVolumes feature to be enabled on your Kubernetes cluster. For Kubernetes clusters at version 1.15 and higher, this feature is enabled by default. For further details on expanding in-use PVCs, see Resizing an in-use PersistentVolumeClaim.

One-command workflow for disk expansion

In an environment where the volume is automatically expanded and the file system mounted in the pods is automatically resized, you can use the simplest workflow for disk expansion: edit your configuration file ($VALUES_FILE) and run a single command to apply the new configuration.

The following example expands the disk associated with each broker in a Kafka cluster from 10 Gi to 100 Gi. You can apply the same approach to any Confluent Platform component:

  1. Update the storage volume in your configuration file ($VALUES_FILE).

    The following example increases the disk for Kafka brokers to 100 Gi.

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
  2. Apply the change by running the helm upgrade command.

    For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    

For this workflow to be supported, the following must be true of your environment:

  • The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have allowVolumeExpansion: true set.

  • The underlying volume type must support expansion.

    At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not.

  • The ExpandInUsePersistentVolumes feature must be enabled on your Kubernetes cluster.

    For Kubernetes clusters with version 1.15 and above, this feature is enabled by default.

Two-command workflow for disk expansion

In an environment where only the volume is automatically expanded and the file system mounted inside the pod is not resized (e.g. Kubernetes versions lower than 1.15), you need to roll the cluster for the new size to be reflected at the pod’s file-system level, in addition to the steps above:

  1. Update the storage volume in your configuration file ($VALUES_FILE).

    The following example increases the disk for Kafka brokers to 100 Gi.

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
  2. Apply the change by running the helm upgrade command. For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    
  3. Prepare to trigger Operator to perform a safe rolling restart of your cluster by setting a “dummy” annotation in your configuration file ($VALUES_FILE):

    kafka:
      podAnnotations:
        platform.confluent.io/dummy: "any-string"
    
  4. Apply the change by running the helm upgrade command. For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    

After the cluster has rolled, the expanded disk should now be available to your cluster. At this point, if you want to remove the “dummy” annotation, remove the annotation from your configuration file ($VALUES_FILE) and apply the change using the helm upgrade command. Note that doing so will safely roll your cluster again.

For this workflow to be supported, the following must be true of your environment:

  • The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have allowVolumeExpansion: true set.

  • The underlying volume type must support expansion.

    At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not. Refer to the storage provider’s documentation to check.

Scale down disks

Scaling down disks is not supported in the current version of Confluent Operator.