Managing the deployment

The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.

Note

Confluent recommends Helm 3 for Confluent Operator and Confluent Platform 5.4 deployments. See the Helm migration and Operator upgrade documentation for instructions.

Adding a license key

You can use Confluent Operator and Confluent Control Center for a 30-day trial period without a license key. After 30 days, Operator and Control Center require license keys.

Note

You add the license key to your <provider>.yaml file or to a custom override YAML file, if applicable. The YAML file is read when you run the helm upgrade command to apply the license.

Operator License

The sample YAML file helm/confluent-operator/charts/operator/values.yaml contains the license section below. Add this section to the Operator block in the <provider>.yaml file or your custom override YAML file.

## License Key for Operator
##
licenseKey: ""

If you are adding your license key to a running deployment, run helm upgrade to activate the license. The example below assumes you are using the <provider>.yaml file to update your configuration.

helm upgrade \
-f ./providers/<provider>.yaml \
--set operator.enabled=true \
operator \
./confluent-operator

If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

Confluent Control Center License

The sample YAML file helm/confluent-operator/charts/controlcenter/values.yaml contains the license section below. Add this section to the Control Center block in the <provider>.yaml file or your custom override YAML file.

## C3 License information
##
license: ""

If you are adding your license key to a running deployment, run helm upgrade to activate the license. The example below assumes you are using the <provider>.yaml file to update your configuration.

helm upgrade \
-f ./providers/<provider>.yaml \
--set controlcenter.enabled=true \
controlcenter \
./confluent-operator

If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

Deleting components

Uninstall a component release from the cluster.

helm uninstall <component-release-name> --namespace <namespace-name>

Enter the following commands to delete all components in the cluster. Delete components in the order shown below, using the component release names (the example shows the default names):

helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>
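The deletion sequence above can be sketched as a small loop. This is a sketch assuming the default release names and the example namespace operator (the echo prefix prints each command for review; remove it to actually uninstall):

```shell
# Uninstall releases in dependency order (default release names).
# The "echo" prefix prints each command for review; remove it to execute.
releases="ksql controlcenter connectors replicator schemaregistry kafka zookeeper operator"
for release in $releases; do
  echo helm uninstall "$release" --namespace operator
done
```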

Modifying Component Configurations

Refer to the following two sections for additional information about Confluent Platform component modifications.

Default Component Modifications

The <provider>.yaml file contains default configuration parameters. The values.yaml file contains the defaults along with additional parameters you can add to your configuration. The values.yaml files also contain detailed comments that describe each configuration parameter and how to use it. The table below lists each values.yaml file and its location.

Component         Chart name       values.yaml path
Operator          operator         helm/confluent-operator/charts/operator/values.yaml
Manager           manager          helm/confluent-operator/charts/manager/values.yaml
Kafka             kafka            helm/confluent-operator/charts/kafka/values.yaml
ZooKeeper         zookeeper        helm/confluent-operator/charts/zookeeper/values.yaml
Connect           connect          helm/confluent-operator/charts/connect/values.yaml
Schema Registry   schemaregistry   helm/confluent-operator/charts/schemaregistry/values.yaml
Control Center    controlcenter    helm/confluent-operator/charts/controlcenter/values.yaml
Replicator        replicator       helm/confluent-operator/charts/replicator/values.yaml
KSQL              ksql             helm/confluent-operator/charts/ksql/values.yaml

Important

You should not modify a component values.yaml file. When you need to use or modify a configuration parameter, add it to or change it in the <provider>.yaml file. At installation and when you upgrade a component configuration, values in the <provider>.yaml file override those in the other YAML files.

Complete the following steps to make component configuration changes:

  1. Find the configuration parameter block in the values.yaml file that you want to use.

  2. Copy the configuration parameter into the correct location in the <provider>.yaml file and make the changes you require.

  3. Enter the following upgrade command:

    helm upgrade -f \
    ./providers/<provider>.yaml \
    --set <component>.enabled=true \
    <component> \
    ./confluent-operator
    

    For example, to change a Kafka configuration parameter, you enter the following upgrade command after saving your configuration changes in the <provider>.yaml file.

    helm upgrade -f \
    ./providers/<provider>.yaml \
    --set kafka.enabled=true \
    kafka \
    ./confluent-operator
    

Kafka and ZooKeeper Configuration Overrides

You can override default Kafka and ZooKeeper configuration parameters using the following values.yaml configuration parameters:

configOverrides:
   server: []
   jvm: []

You can only override configuration parameters that are key/value pairs. Configuration parameters that consist of a key only can't be modified using the override. Refer to the Confluent documentation for the configuration parameters used in Kafka and ZooKeeper.

For example, to change auto.create.topics.enable from its default value of false, add the following configuration block to your <provider>.yaml file.

configOverrides:
   server:
   - auto.create.topics.enable=true
   jvm: []

Then, run the upgrade command. For example, if you are adding this override for Kafka, enter the following command:

helm upgrade -f \
./providers/<provider>.yaml \
--set kafka.enabled=true \
kafka \
./confluent-operator

Schema Validation

You use a configuration override in Kafka to set up Schema Validation. The following example shows an HTTP endpoint override for a Schema Registry release named schemaregistry in the namespace operator.

configOverrides:
   server:
   - confluent.schema.registry.url=http://schemaregistry.operator.svc.cluster.local:8081

If Schema Registry is deployed using a secure HTTPS endpoint, use the following configuration override:

configOverrides:
   server:
   - confluent.schema.registry.url=https://<domain>:8081

OR (using the same example names as above):

configOverrides:
   server:
   - confluent.schema.registry.url=https://schemaregistry.operator.svc.cluster.local:9081

Note

You can view the <domain> name by running helm status <release-name>.

See Schema Validation in the Schema Registry documentation for additional details.

Monitoring Kafka metrics

The metricReporter configuration parameter is provided in the Kafka values.yaml file. If you add the parameters below to the <provider>.yaml file, messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).

metricReporter:
  enabled: true
  ## Period (millisecond) the report should use to publish messages to bootstrapEndpoint
  ##
  publishMs: 30000
  ## ReplicationFactor, if empty, the value is based on the replicas count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, inter-communication will be encrypted if TLS is enabled. The bootstrapEndpoint will have FQDN name.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT
    internal: false
    authentication:
      type: ""
  ## If above tls.internal is true, configure with the Kafka bootstrap DNS name on port 9092, e.g. <kafka.name>.<domain>:9092
  ## If above tls.internal is false, configure with the Kafka service name on port 9071, e.g. <kafka.name>:9071, or the FQDN of the Kafka service, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""
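As an illustrative sketch (not taken verbatim from the product documentation), a filled-in metricReporter block for an internal, non-TLS deployment that assumes the default Kafka service name kafka might look like the following:

```yaml
metricReporter:
  enabled: true
  publishMs: 30000
  replicationFactor: ""
  tls:
    enabled: false
    internal: false
    authentication:
      type: ""
  ## tls.internal is false, so use the Kafka service name on port 9071
  bootstrapEndpoint: "kafka:9071"
```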

Important

If metricReporter is enabled with a DNS name as the bootstrapEndpoint, the DNS name must resolve successfully or the Kafka pod will crash (CrashLoopBackOff). If the bootstrapEndpoint uses an internal address (like kafka:9071), this is not an issue. For additional security information, see the comments in the values.yaml file.

Data balancing

Scale Up

Complete the following data balancing steps when a Kafka cluster scale-up is triggered by an update or upgrade, or when the Kafka resource is scaled up.

  1. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 bash
    
  2. Create a config.properties file using cat. You can get the <username> and <password> values by entering the following command:

    kubectl -n operator describe kafka | grep "internalClient"
    

    The following shows the config.properties file used to connect to the broker on internal port 9071 using SASL_PLAINTEXT.

    cat << EOF > config.properties
    confluent.license=
    confluent.rebalancer.metrics.sasl.mechanism=PLAIN
    confluent.rebalancer.metrics.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    confluent.rebalancer.metrics.bootstrap.servers=kafka:9071
    confluent.rebalancer.metrics.security.protocol=SASL_PLAINTEXT
    EOF
    

    The following shows TLS (server or client authentication):

    cat << EOF > config.properties
    confluent.license=
    confluent.rebalancer.metrics.bootstrap.servers=kafka:9071
    confluent.rebalancer.metrics.security.protocol=PLAINTEXT
    EOF
    
  3. Enter the following command to rebalance. The ZooKeeper endpoint (used to check status in the next step) is in the format [zookeeper.name]:2181/[kafka.name]-[namespace], and the Kafka bootstrap endpoint is in the format [kafka.name]:9071. The example uses the ZooKeeper cluster name zookeeper, the Kafka cluster name kafka, and the namespace operator.

    confluent-rebalancer execute \
    --bootstrap-server brokerhost:brokerport \
    --metrics-bootstrap-server kafka:9071 \
    --throttle 10000000 --verbose \
    --config-file config.properties
    
  4. Verify that the rebalance was successful.

    confluent-rebalancer status --zookeeper zookeeper:2181/kafka-operator
    
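The <username> and <password> values in step 2 can be pulled out of the internalClient JAAS line with sed. A minimal sketch, where the sample line below is a hypothetical stand-in for the kubectl describe kafka output (in practice, pipe the real output through the same sed expressions):

```shell
# Hypothetical internalClient JAAS line; stands in for real
# `kubectl -n operator describe kafka` output.
line='sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";'

# Extract the quoted username and password values.
username=$(printf '%s\n' "$line" | sed -n 's/.*username="\([^"]*\)".*/\1/p')
password=$(printf '%s\n' "$line" | sed -n 's/.*password="\([^"]*\)".*/\1/p')

echo "$username"
echo "$password"
```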

Scale Down

Scaling down Kafka clusters is not supported in the current version of Confluent Operator.

Common CLI commands

The following provides common commands that you may find useful when managing the cluster.

Helm commands

Use the following commands to display component installation notes and other component release information.

Show the component release information.

helm list --namespace <namespace-name>

or

helm list --kube-context <kubernetes-cluster-name> --namespace <namespace-name>

Show the current status and release notes.

helm status <component-release-name>

Show the template used to deploy the component.

helm get all <component-release-name>

Uninstall a component release from the cluster.

helm uninstall <component-release-name> --namespace <namespace-name>

kubectl and oc commands

Use the following commands to get information about your cluster.

Get Kubernetes cluster name.

kubectl config current-context

Set the context. Use this when using multiple namespaces in your environment. For troubleshooting, you may need to set a context even if you have only one namespace.

kubectl config set-context <kubernetes-cluster-name> --namespace=<namespace-name>

Get cluster information.

kubectl get kafka -n <namespace-name> -oyaml

Get cluster nodes.

kubectl get nodes

Get node details.

kubectl describe node <node>

Check for Kubernetes issues.

kubectl get events -n <namespace>

Tip

The following two commands are useful for getting the internal and external IP addresses for Confluent Platform components.

Get services for a namespace (for example, operator) or all namespaces.

kubectl get services -n operator
kubectl get services --all-namespaces
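For example, you can narrow the service listing down to externally reachable endpoints by filtering on the TYPE column with awk. This is a sketch; the sample output below is hypothetical, and in practice you would pipe the real kubectl get services output into the same filter:

```shell
# Hypothetical output of `kubectl get services -n operator`; substitute
# the real command's output in practice.
services='NAME       TYPE           CLUSTER-IP   EXTERNAL-IP   PORT(S)
kafka-lb   LoadBalancer   10.0.0.10    203.0.113.5   9092/TCP
kafka      ClusterIP      None         <none>        9071/TCP'

# Keep only LoadBalancer services; print the name and external IP.
external=$(printf '%s\n' "$services" | awk '$2 == "LoadBalancer" {print $1, $4}')
echo "$external"
```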

Get all pods in all namespaces.

kubectl get pods --all-namespaces

Get Kafka broker details (for example, operator).

kubectl get kafka -n operator -oyaml

Get pods with details within a namespace (for example, operator).

kubectl describe pods -n operator

Get pod details.

kubectl describe pods <podname> -n <namespace>

Get pod logs.

kubectl logs <pod name> -n <namespace>

Access a pod container.

kubectl -n <namespace> exec -it <podname> bash

Access a pod container when there is more than one container.

kubectl -n <namespace> exec -it <pod name> --container <container> bash

Run a command.

kubectl exec <pod name> -- <command>

Run a command if there is more than one container.

kubectl exec <pod name> --container <container> -- <command>

Testing the deployment

Complete the following steps to test and validate your deployment.

Internal validation

  1. On your local machine, enter the following command to display cluster namespace information (using the example namespace operator). This information contains the bootstrap endpoint you need to complete internal validation.

    kubectl get kafka -n operator -oyaml
    

    The bootstrap endpoint is shown on the bootstrap.servers line.

    ... omitted
    
       internalClient: |-
          bootstrap.servers=kafka:9071
    
  2. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 bash
    
  3. On the pod, create and populate a file named kafka.properties. There is no text editor installed in the containers, so you use the cat command as shown below to create this file. Use CTRL+D to save the file.

    Note

    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configuring security for additional information.

    cat << EOF > kafka.properties
    bootstrap.servers=kafka:9071
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    EOF
    
  4. On the pod, query the bootstrap server using the following command:

    kafka-broker-api-versions --command-config kafka.properties --bootstrap-server kafka:9071
    

    You should see output for each of the three Kafka brokers that resembles the following:

    kafka-1.kafka.operator.svc.cluster.local:9071 (id: 1 rack: 0) -> (
       Produce(0): 0 to 7 [usable: 7],
       Fetch(1): 0 to 10 [usable: 10],
       ListOffsets(2): 0 to 4 [usable: 4],
       Metadata(3): 0 to 7 [usable: 7],
       LeaderAndIsr(4): 0 to 1 [usable: 1],
       StopReplica(5): 0 [usable: 0],
       UpdateMetadata(6): 0 to 4 [usable: 4],
       ControlledShutdown(7): 0 to 1 [usable: 1],
       OffsetCommit(8): 0 to 6 [usable: 6],
       OffsetFetch(9): 0 to 5 [usable: 5],
       FindCoordinator(10): 0 to 2 [usable: 2],
       JoinGroup(11): 0 to 3 [usable: 3],
       Heartbeat(12): 0 to 2 [usable: 2],
    
    ... omitted
    

    This output validates internal communication within your cluster.
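As a further optional smoke test from the same pod, you could produce and consume a few records internally. This is a sketch assuming the kafka.properties file from step 3 and a hypothetical topic name smoke-test; the commands are built as strings and printed for review rather than executed:

```shell
# Build the produce and consume commands as strings and print them for
# review; run them on the pod to execute. The topic name "smoke-test"
# is a hypothetical example.
produce_cmd="seq 10 | kafka-console-producer --broker-list kafka:9071 \
--topic smoke-test --producer.config kafka.properties"
consume_cmd="kafka-console-consumer --bootstrap-server kafka:9071 \
--topic smoke-test --from-beginning --max-messages 10 \
--consumer.config kafka.properties"
printf '%s\n' "$produce_cmd" "$consume_cmd"
```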

External validation

Complete the following steps to validate external communication.

Prerequisites:
  • Access to download the Confluent Platform.
  • Outside access to the Kafka brokers is available only through an external load balancer. You can't complete these steps unless you enabled an external load balancer when configuring the provider YAML file and added the DNS entries.
  • To access the cluster nodes from your local machine, you must add the DNS entries to your /etc/hosts file.

Note

The examples use default component names.

  1. You use the Confluent CLI running on your local machine to complete external validation. The Confluent CLI is included with the Confluent Platform. On your local machine, download and start the Confluent Platform.

  2. On your local machine, use the kubectl get kafka -n operator -oyaml command to get the bootstrap servers endpoint for external clients. In the example below, the bootstrap servers endpoint is kafka.<providerdomain>:9092.

    ... omitted
    
    externalClient: |-
       bootstrap.servers=kafka.<providerdomain>:9092
       sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
       sasl.mechanism=PLAIN
       security.protocol=SASL_PLAINTEXT
    
  3. On your local machine where you have the Confluent Platform running locally, create and populate a file named kafka.properties based on the example used in the previous step.

    bootstrap.servers=kafka.<providerdomain>:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    

    Note

    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configuring security for additional information.

  4. Using the Confluent CLI on your local machine, create a topic using the bootstrap endpoint kafka.<providerdomain>:9092. The example below creates a topic with 1 partition and 3 replicas.

    kafka-topics --bootstrap-server kafka.<providerdomain>:9092 \
    --command-config kafka.properties \
    --create --replication-factor 3 \
    --partitions 1 --topic example
    
  5. Using the Confluent CLI on your local machine, produce to the new topic using the bootstrap endpoint kafka.<providerdomain>:9092. Note that the bootstrap server load balancer is the only Kafka broker endpoint required because it provides gateway access to the load balancers for all Kafka brokers.

    seq 10000 | kafka-console-producer \
    --topic example --broker-list kafka.<providerdomain>:9092 \
    --producer.config kafka.properties
    
  6. In a new terminal on your local machine, use the Confluent CLI to consume from the new topic.

    kafka-console-consumer --from-beginning \
    --topic example --bootstrap-server kafka.<providerdomain>:9092 \
    --consumer.config kafka.properties
    

Successful completion of these steps validates external communication with your cluster.
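After validation, you may want to clean up the test topic. The following sketch mirrors the create command from step 4 (the command is built as a string and printed for review; run it to execute):

```shell
# Build and print the topic-deletion command; kafka.<providerdomain> is
# the same placeholder used in the validation steps above.
cleanup_cmd="kafka-topics --bootstrap-server kafka.<providerdomain>:9092 \
--command-config kafka.properties --delete --topic example"
echo "$cleanup_cmd"
```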