Managing the deployment

The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.

Adding a license key

You can use Confluent Operator for a 30-day trial period without a license key. After 30 days, Operator requires a license key.

You add your license key to the Confluent Operator values.yaml file located at helm/confluent-operator/charts/operator/values.yaml. Enter the key in the section shown below. Do not modify the Public Key in this section.

## License Key for Operator
licenseKey: ""

If you are adding your license key to a running deployment, run helm upgrade to activate the license.

helm upgrade \
-f ./providers/<provider>.yaml \
--set operator.enabled=true \
operator

If you are a subscriber, contact Confluent Support for more information.

Deleting components

Enter the following command to delete a single component:

helm delete --purge <name>

Enter the following commands to delete all components in the cluster. Components must be deleted in the order shown below, using the component release names (the example shows the defaults):

helm delete --purge ksql
helm delete --purge controlcenter
helm delete --purge connectors
helm delete --purge replicator
helm delete --purge schemaregistry
helm delete --purge kafka
helm delete --purge zookeeper
helm delete --purge operator
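The sequence above can be scripted as a loop. A minimal sketch, assuming the default release names; the echo prefix prints each command so you can review the order before running the deletions for real:

```shell
# Delete Confluent Platform releases in the required order.
# The release names below are the defaults; replace them if you used
# custom names. Remove the "echo" prefix to actually run the deletions.
for release in ksql controlcenter connectors replicator schemaregistry kafka zookeeper operator; do
  echo helm delete --purge "$release"
done
```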

Upgrading Confluent Platform software

You can update the image version for Confluent Platform components deployed by Confluent Operator without affecting cluster performance. Special care is taken with Kafka brokers: the upgrade process verifies topic leader re-election and confirms there are zero under-replicated partitions before it moves on to the next broker. This ensures that the Kafka cluster continues to perform well while it is upgraded.

A component rolling upgrade begins after you run the helm upgrade command shown below:

helm upgrade \
-f ./providers/<provider>.yaml \
--set <component>.enabled=true \
--set <component>.image.tag=<new-version-#> <component>
  • Replace <provider> with the provider platform for your cluster.
  • Replace <new-version-#> with the new image tag provided by Confluent.
  • Replace <component> with the component you are updating (for example, operator, zookeeper, kafka, etc.)
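As a concrete illustration, the sketch below fills the template in with example values and prints the resulting command. The render_upgrade helper is hypothetical, and gcp, kafka, and the 5.3.1 tag are placeholders, not Confluent-supplied values:

```shell
# Hypothetical helper that fills in the upgrade template and prints the
# resulting command; the provider, component, and tag are examples only.
render_upgrade() {
  provider="$1"; component="$2"; tag="$3"
  echo "helm upgrade -f ./providers/${provider}.yaml" \
       "--set ${component}.enabled=true" \
       "--set ${component}.image.tag=${tag} ${component}"
}

# prints: helm upgrade -f ./providers/gcp.yaml --set kafka.enabled=true --set kafka.image.tag=5.3.1 kafka
render_upgrade gcp kafka 5.3.1
```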

Modifying Component Configurations

Refer to the following two sections for additional information about Confluent Platform component modifications.

Default Component Modifications

The <provider>.yaml contains default configuration parameters. The values.yaml file contains the defaults along with additional parameters you can add to your configuration. The values.yaml files also contain detailed comments that describe each configuration parameter and how to use it. The table below lists each of the values.yaml files and their location.

Component        Chart name      values.yaml path
Operator         operator        helm/confluent-operator/charts/operator/values.yaml
Manager          manager         helm/confluent-operator/charts/manager/values.yaml
Kafka            kafka           helm/confluent-operator/charts/kafka/values.yaml
ZooKeeper        zookeeper       helm/confluent-operator/charts/zookeeper/values.yaml
Connect          connect         helm/confluent-operator/charts/connect/values.yaml
Schema Registry  schemaregistry  helm/confluent-operator/charts/schemaregistry/values.yaml
Control Center   controlcenter   helm/confluent-operator/charts/controlcenter/values.yaml
Replicator       replicator      helm/confluent-operator/charts/replicator/values.yaml
KSQL             ksql            helm/confluent-operator/charts/ksql/values.yaml


You should not modify a component values.yaml file directly. When you need to set or change a component configuration parameter, add it to or change it in the <provider>.yaml file. The <provider>.yaml file overrides the other yaml files at installation and when you upgrade a component configuration.

Complete the following steps to make component configuration changes:

  1. Find the configuration parameter block in the values.yaml file that you want to use.

  2. Copy the configuration parameter into the correct location in the <provider>.yaml file and make the changes you require.

  3. Enter the following upgrade command:

    helm upgrade -f \
    ./providers/<provider>.yaml \
    --set <component>.enabled=true \
    <component>

    For example, to change a Kafka configuration parameter, you enter the following upgrade command after saving your configuration changes in the <provider>.yaml file.

    helm upgrade -f \
    ./providers/<provider>.yaml \
    --set kafka.enabled=true \
    kafka

Kafka and ZooKeeper Configuration Overrides

You can override default Kafka and ZooKeeper configuration parameters using the following values.yaml configuration parameters:

   configOverrides:
     server: []
     jvm: []

You can only change settings for configuration parameters that are key/value pairs. Configuration parameters that consist of a key only can't be modified using the override. Refer to the Confluent documentation for the configuration parameters used in Kafka and ZooKeeper.

For example, if you want to change the default false parameter for auto.create.topics.enable, you add the following configuration block to your <provider>.yaml file.

   configOverrides:
     server:
       - auto.create.topics.enable=true
     jvm: []
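If you need several overrides, the server list takes one entry per key/value pair. A sketch; message.max.bytes is just an illustrative additional entry, so confirm any key you use against the Kafka configuration reference:

```yaml
configOverrides:
  server:
    - auto.create.topics.enable=true
    - message.max.bytes=1048576   # illustrative second entry
  jvm: []
```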

Then, run the upgrade command. For example, if you are adding this override for Kafka you enter the following command:

helm upgrade -f \
./providers/<provider>.yaml \
--set kafka.enabled=true \
kafka

Monitoring Kafka metrics

The configuration parameter metricReporter is provided in the Kafka values.yaml file. If you add the parameters below to your <provider>.yaml file, messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).

metricReporter:
  enabled: true
  ## Period (in milliseconds) the reporter uses to publish messages to the bootstrapEndpoint
  publishMs: 30000
  ## ReplicationFactor; if empty, the value is based on the replicas count
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, inter-communication will be encrypted if TLS is enabled. The bootstrapEndpoint will have an FQDN.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT
    internal: false
    authentication:
      type: ""
  ## If tls.internal above is true, configure with the Kafka bootstrap DNS name on port 9092, e.g. <>.<domain>:9092
  ## If tls.internal above is false, configure with the Kafka service name on port 9071, e.g. <>:9071, or the FQDN of the Kafka service, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""


If metricReporter is enabled with a bootstrapEndpoint that is a DNS name, the DNS name must resolve successfully or the Kafka pods will enter a crash loop (CrashLoopBackOff). If the bootstrapEndpoint uses an internal address (like kafka:9071), this is not an issue. Refer to the comments in the example block above for additional security information.
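Because an unresolvable DNS name crash-loops the Kafka pods, it can be worth checking resolution before enabling the reporter. A minimal sketch, assuming getent is available; run it from a machine or pod with the same DNS view as the brokers, and note that the endpoint value is only an example:

```shell
# Check that the host part of a bootstrap endpoint resolves before
# enabling metricReporter; a non-resolving name crash-loops the brokers.
check_endpoint() {
  host="${1%%:*}"                  # strip the :port suffix
  if getent hosts "$host" > /dev/null; then
    echo "resolves: $host"
  else
    echo "does not resolve: $host"
    return 1
  fi
}

check_endpoint localhost:9092      # example endpoint
```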

Data balancing

Scale Up

Complete the following data balancing steps after a Kafka cluster scale-up, which is triggered by an update or upgrade or when a Kafka resource is scaled up.

  1. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 bash
  2. Create a properties file using cat. You can get <username> and <password> from the deployment yaml by entering the following command:

    kubectl -n operator describe kafka | grep "internalClient"

    The following example shows the properties used to connect to the broker on internal port 9071 using SASL_PLAINTEXT:

    cat << EOF >
    confluent.rebalancer.metrics.sasl.mechanism=PLAIN
    confluent.rebalancer.metrics.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    EOF

    The following shows TLS (server or client authentication):

    cat << EOF >
  3. Enter the following command to rebalance. The ZooKeeper endpoint should be in the format [zookeeper cluster name]:2181/[kafka cluster name]-[namespace]. The example shows the ZooKeeper cluster name zookeeper, the Kafka cluster name kafka, and the namespace operator. The Kafka bootstrap endpoint should be in the format [kafka service name]:9071.

    confluent-rebalancer execute \
    --zookeeper zookeeper:2181/kafka-operator \
    --metrics-bootstrap-server kafka:9071 \
    --throttle 10000000 --verbose
  4. Verify that the rebalance was successful.

    confluent-rebalancer status --zookeeper zookeeper:2181/kafka-operator
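Step 2 above reads the credentials out of the internalClient entry. Assuming that entry contains a JAAS-style line, a sed sketch like the following pulls out the username and password; the sample line below is illustrative, not real output:

```shell
# Extract credentials from a JAAS-style client config line. The sample
# stands in for the output of the kubectl describe command in step 2.
sample='sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";'

username=$(echo "$sample" | sed -n 's/.*username="\([^"]*\)".*/\1/p')
password=$(echo "$sample" | sed -n 's/.*password="\([^"]*\)".*/\1/p')

# prints: username=test password=test123
echo "username=$username password=$password"
```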

Scale Down

Scaling down Kafka clusters is not supported in the current version of Confluent Operator.

Common CLI commands

The following are common commands that you may find useful when managing the cluster.

Helm commands

Use the following commands to display component installation notes and other component release information.

Show the component release name.

helm list

Show the current status and release notes.

helm status <component-release-name>

Show the template used to deploy the component.

helm get <component-release-name>

Delete a component.

helm delete --purge <name>

See Deleting components for the order of commands used to delete a cluster.

kubectl and oc commands

Use the following commands to get information about your cluster.


Replace kubectl with oc for OpenShift clusters.

Get cluster information.

kubectl config current-context

kubectl get kafka -n operator -oyaml

Get cluster nodes.

kubectl get nodes

Get node details.

kubectl describe node <node>

Get namespaces.

kubectl get namespaces


The following two commands are useful for getting the internal and external IP addresses for Confluent Platform components.

Get services for a namespace (for example, operator) or all namespaces.

kubectl get services -n operator
kubectl get services --all-namespaces

Get all pods in all namespaces.

kubectl get pods --all-namespaces

Get Kafka broker details (for example, operator).

kubectl get kafka -n operator -oyaml

Get pods with details within a namespace (for example, operator).

kubectl describe pods -n operator

Get pod details.

kubectl describe pods <podname> -n <namespace>

Check for Kubernetes issues.

kubectl get events -n <namespace>

Get pod logs.

kubectl logs <pod name> -n <namespace>

Access a pod container.

kubectl -n <namespace> exec -it <podname> bash

Access a pod container when there is more than one container.

kubectl -n <namespace> exec -it <pod name> --container <container> bash

Run a command.

kubectl exec <pod name> <command>

Run a command if there is more than one container.

kubectl exec <pod name> --container <container> <command>

Testing the deployment

Complete the following steps to test and validate your deployment.


Use oc instead of kubectl for OpenShift. Otherwise, the commands are identical. For example, instead of kubectl get pods -n operator you would use oc get pods -n operator.

Internal validation

  1. On your local machine, enter the following command to display cluster namespace information (using the example namespace operator). This information contains the bootstrap endpoint you need to complete internal validation.

    kubectl get kafka -n operator -oyaml

    The bootstrap endpoint is shown on the bootstrap.servers line.

    ... omitted
       internalClient: |-
          bootstrap.servers=kafka:9071
  2. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 bash
  3. On the pod, create and populate a properties file. There is no text editor installed in the containers, so use the cat command as shown below to create the file. Use CTRL+D to save the file.


    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configuring security for additional information.

    cat << EOF >
    bootstrap.servers=kafka:9071
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    EOF
  4. On the pod, query the bootstrap server using the following command:

    kafka-broker-api-versions --command-config --bootstrap-server kafka:9071

    You should see output for each of the three Kafka brokers that resembles the following:

    kafka-1.kafka.operator.svc.cluster.local:9071 (id: 1 rack: 0) -> (
       Produce(0): 0 to 7 [usable: 7],
       Fetch(1): 0 to 10 [usable: 10],
       ListOffsets(2): 0 to 4 [usable: 4],
       Metadata(3): 0 to 7 [usable: 7],
       LeaderAndIsr(4): 0 to 1 [usable: 1],
       StopReplica(5): 0 [usable: 0],
       UpdateMetadata(6): 0 to 4 [usable: 4],
       ControlledShutdown(7): 0 to 1 [usable: 1],
       OffsetCommit(8): 0 to 6 [usable: 6],
       OffsetFetch(9): 0 to 5 [usable: 5],
       FindCoordinator(10): 0 to 2 [usable: 2],
       JoinGroup(11): 0 to 3 [usable: 3],
       Heartbeat(12): 0 to 2 [usable: 2],
    ... omitted

    This output validates internal communication within your cluster.
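Rather than copying the bootstrap endpoint by hand from step 1, you can extract it with sed. A sketch, with a heredoc standing in for the kubectl output; the sample yaml is illustrative:

```shell
# Stand-in for: kubectl get kafka -n operator -oyaml
sample_yaml() {
cat << 'EOF'
  internalClient: |-
    bootstrap.servers=kafka:9071
EOF
}

# Pull the bootstrap.servers value out of the yaml blob.
bootstrap=$(sample_yaml | sed -n 's/.*bootstrap\.servers=\([^ ]*\).*/\1/p')

# prints: bootstrap endpoint: kafka:9071
echo "bootstrap endpoint: $bootstrap"
```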

External validation

Complete the following steps to validate external communication.



The examples use default component names.

  1. You use the Confluent CLI running on your local machine to complete external validation. The Confluent CLI is included with the Confluent Platform. On your local machine, download and start the Confluent Platform.

  2. On your local machine, use the kubectl get kafka -n operator -oyaml command to get the bootstrap servers endpoint for external clients. In the example below, the bootstrap servers endpoint is kafka.<providerdomain>:9092.

    ... omitted
    externalClient: |-
       bootstrap.servers=kafka.<providerdomain>:9092
       sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
  3. On your local machine, where Confluent Platform is running, create and populate a properties file based on the example shown in the previous step.

    bootstrap.servers=kafka.<providerdomain>:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";


    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configuring security for additional information.

  4. Using the Confluent CLI on your local machine, create a topic using the bootstrap endpoint kafka.<providerdomain>:9092. The example below creates a topic with 1 partition and 3 replicas.

    kafka-topics --bootstrap-server kafka.<providerdomain>:9092 \
    --command-config \
    --create --replication-factor 3 \
    --partitions 1 --topic example
  5. Using the Confluent CLI on your local machine, produce to the new topic using the bootstrap endpoint kafka.<providerdomain>:9092. Note that the bootstrap server load balancer is the only Kafka broker endpoint required because it provides gateway access to the load balancers for all Kafka brokers.

    seq 10000 | kafka-console-producer \
    --topic example --broker-list kafka.<providerdomain>:9092
  6. In a new terminal on your local machine, use the Confluent CLI to consume from the new topic.

    kafka-console-consumer --from-beginning \
    --topic example --bootstrap-server kafka.<providerdomain>:9092

Successful completion of these steps validates external communication with your cluster.
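To make the round-trip check quantitative, compare the produced and consumed message counts. A sketch; the second seq is a stand-in for the consumer pipeline, as noted in the comments:

```shell
# Compare produced vs consumed message counts. In a real check, replace
# the second seq with the consumer output, e.g.:
#   kafka-console-consumer --from-beginning --topic example \
#     --bootstrap-server kafka.<providerdomain>:9092 ... | wc -l
produced=$(seq 10000 | wc -l)
consumed=$(seq 10000 | wc -l)    # stand-in for the consumed stream

if [ "$produced" -eq "$consumed" ]; then
  echo "message counts match: $produced"
else
  echo "mismatch: produced=$produced consumed=$consumed"
fi
```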