Manage Confluent Platform with Confluent Operator

The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.

Note

Confluent recommends Helm 3 for Confluent Operator and Confluent Platform 5.5 deployments. See the following documentation for Helm migration and Operator upgrade instructions.

The examples in this guide use the following assumptions:

  • $VALUES_FILE refers to the configuration file you set up in Create the global configuration file.

  • To present simple and clear examples in the Operator documentation, all the configuration parameters are specified in the config file ($VALUES_FILE). However, in your production deployments, use the --set or --set-file option when applying sensitive data with Helm. For example:

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials="Developer!" \
     --set kafka.enabled=true
    
  • operator is the namespace that Confluent Platform is deployed in.

  • All commands are executed in the helm directory under the directory Confluent Operator was downloaded to.
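Helm treats an unescaped comma in a --set value as a separator between assignments, so a value such as an LDAP distinguished name generally needs its commas escaped. The following is a minimal sketch of one way to build such arguments when the sensitive values are held in environment variables. The function names and the MDS_PRINCIPAL variable are hypothetical and chosen only for illustration; the functions print text and do not call Helm.

```shell
# Escape commas so that Helm does not split one value into several assignments.
escape_set_value() {
  printf '%s\n' "$1" | sed 's/,/\\,/g'
}

# Print a key=value pair suitable for passing to --set.
#   $1 = configuration key, $2 = raw value
set_arg() {
  printf '%s=%s\n' "$1" "$(escape_set_value "$2")"
}

# Example (not run here; MDS_PRINCIPAL is a hypothetical environment variable):
#   helm upgrade --install kafka ./confluent-operator \
#     --values "$VALUES_FILE" --namespace operator \
#     --set "$(set_arg kafka.services.mds.ldap.authentication.simple.principal "$MDS_PRINCIPAL")" \
#     --set kafka.enabled=true
```

Reading the value from the environment keeps the literal secret out of the command text, although it can still appear in the process list while Helm runs.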

Add components to an existing Kafka cluster

To add components to a cluster, modify your configuration YAML file and then run the helm upgrade --install command for the added component. For example, to add ksqlDB to your cluster, add the ksqlDB values block to your configuration file ($VALUES_FILE) and run the install command for ksqlDB only:

helm upgrade --install ksql ./confluent-operator \
  --values $VALUES_FILE \
  --namespace operator \
  --set ksql.enabled=true
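The same pattern applies to any component that has a block in the configuration file. As a rough sketch, assuming Helm 3 syntax (release name and chart given as positional arguments), a small helper can assemble the command for a given component. The function name component_install_cmd and the fallback file name values.yaml are hypothetical; the helper prints the command instead of running it so that it can be reviewed first.

```shell
# Print (rather than run) the helm command that enables one component.
#   $1 = release name
#   $2 = key of the component block in the values file
# VALUES_FILE and NAMESPACE fall back to placeholder defaults for illustration.
component_install_cmd() {
  printf 'helm upgrade --install %s ./confluent-operator --values %s --namespace %s --set %s.enabled=true\n' \
    "$1" "${VALUES_FILE:-values.yaml}" "${NAMESPACE:-operator}" "$2"
}

# Example:
#   component_install_cmd ksql ksql
```

The release name and the values key are passed separately because they are not guaranteed to match for every component.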

Delete components

Uninstall a component release from the cluster.

helm uninstall <component-release-name> --namespace <namespace-name>

Enter the following commands to delete Confluent Platform components in the cluster. Components must be deleted in the order shown below using the component release name. The examples below show the default release names:

helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>
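Because the order matters, it can help to generate the commands from a single ordered list instead of typing them individually. The sketch below assumes the default release names listed above; the function name uninstall_cmds is hypothetical, and the function only prints the commands so that they can be checked before anything is deleted.

```shell
# Default release names, in the deletion order given above.
RELEASES="ksql controlcenter connectors replicator schemaregistry kafka zookeeper operator"

# Print one uninstall command per release, in order.
#   $1 = namespace
uninstall_cmds() {
  for release in $RELEASES; do
    printf 'helm uninstall %s --namespace %s\n' "$release" "$1"
  done
}

# Example:
#   uninstall_cmds operator
```

If your deployment uses different release names or omits some components, adjust the RELEASES list accordingly while keeping the relative order.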

Monitor Kafka metrics

The configuration parameter metricReporter is provided in the Kafka values.yaml file. If you add the parameters below to the provider.yaml file, messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).

metricReporter:
  enabled: true
  ## Period (millisecond) the report should use to publish messages to bootstrapEndpoint
  ##
  publishMs: 30000
  ## ReplicationFactor, if empty, the value is based on the replicas count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, inter-communication will be encrypted if TLS is enabled. The bootstrapEndpoint will have FQDN name.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT
    internal: false
    authentication:
      type: ""
  ## If above tls.internal is true, configure with Kafka bootstrap DNS configuration on port 9092, e.g. <kafka.name>.<domain>:9092
  ## If above tls.internal is false, configure with Kafka service name on port 9071, e.g. <kafka.name>:9071, or the FQDN of the Kafka service name, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""

Important

If metricReporter is enabled with a bootstrapEndpoint as a DNS name, the DNS name must be resolved successfully or the Kafka pod will crash (CrashLoopBackOff). If the bootstrapEndpoint is using an internal address (like kafka:9071), this is not an issue. For additional security information, see the comments in the values.yaml file.
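One way to reduce the risk of a crash loop is to check that the host in bootstrapEndpoint resolves before enabling metricReporter. The following is a minimal sketch, not an Operator feature: the function names are hypothetical, getent is assumed to be available (typical on Linux), and a successful lookup on your workstation does not guarantee that the name resolves from inside the Kafka pod, so running the check from within the cluster is likely to be more representative.

```shell
# Extract the host part of a host:port endpoint.
endpoint_host() {
  printf '%s\n' "${1%:*}"
}

# Return success if the endpoint's host resolves on the current machine.
# getent is assumed to be installed.
endpoint_resolves() {
  getent hosts "$(endpoint_host "$1")" >/dev/null 2>&1
}

# Example (kafka.example.com is a placeholder, not a real endpoint):
#   endpoint_resolves kafka.example.com:9092 || echo "bootstrapEndpoint does not resolve"
```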

Scale Kafka clusters and balance data

Scale up

Rebalance data whenever a Kafka cluster scale-up is triggered after an update or upgrade, or when a Kafka resource is scaled up.

  1. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 -- bash
    
  2. Create a config.properties file using cat. You can get <username> and <password> from the Kafka resource by entering the following command:

    kubectl -n operator describe kafka | grep "internalClient"
    

    The following shows the config.properties to connect to the broker on internal port 9071 using SASL_PLAINTEXT.

    cat << EOF > config.properties
    confluent.license=
    confluent.rebalancer.metrics.sasl.mechanism=PLAIN
    confluent.rebalancer.metrics.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    confluent.rebalancer.metrics.bootstrap.servers=kafka:9071
    confluent.rebalancer.metrics.security.protocol=SASL_PLAINTEXT
    EOF
    

    The following shows the config.properties for TLS (server or client authentication). In this example, the connection to internal port 9071 uses PLAINTEXT:

    cat << EOF > config.properties
    confluent.license=
    confluent.rebalancer.metrics.bootstrap.servers=kafka:9071
    confluent.rebalancer.metrics.security.protocol=PLAINTEXT
    EOF
    
  3. Enter the following command to rebalance. The --bootstrap-server value should be in the format <kafka.name>:9071. The example shows the Kafka name kafka.

    confluent-rebalancer execute \
      --bootstrap-server kafka:9071 \
      --metrics-bootstrap-server kafka:9071 \
      --throttle 10000000 --verbose \
      --config-file config.properties
    
  4. Verify that the rebalance was successful.

    confluent-rebalancer status --bootstrap-server kafka:9071
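The two config.properties variants in step 2 differ only in the SASL lines and the security protocol, so they can be produced by one helper. The following is a sketch under that assumption; the function name write_rebalancer_config and its argument order are hypothetical, and it would be run in the same bash session on the pod where confluent-rebalancer is executed.

```shell
# Write config.properties for confluent-rebalancer.
#   $1 = output file
#   $2 = metrics bootstrap servers, for example kafka:9071
#   $3 = security protocol: SASL_PLAINTEXT or PLAINTEXT
#   $4 = username (SASL_PLAINTEXT only)
#   $5 = password (SASL_PLAINTEXT only)
write_rebalancer_config() {
  {
    printf 'confluent.license=\n'
    if [ "$3" = "SASL_PLAINTEXT" ]; then
      printf 'confluent.rebalancer.metrics.sasl.mechanism=PLAIN\n'
      printf 'confluent.rebalancer.metrics.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";\n' "$4" "$5"
    fi
    printf 'confluent.rebalancer.metrics.bootstrap.servers=%s\n' "$2"
    printf 'confluent.rebalancer.metrics.security.protocol=%s\n' "$3"
  } > "$1"
}

# Example:
#   write_rebalancer_config config.properties kafka:9071 SASL_PLAINTEXT "<username>" "<password>"
```

The generated file contains the credentials in plain text, so consider removing it once the rebalance has been verified.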
    

Scale down

Scaling down Kafka clusters is not supported in the current version of Confluent Operator.