Manage Confluent Platform with Confluent Operator¶
The following sections provide information about managing the deployed Confluent Platform cluster and operating environment.
Note
Confluent recommends Helm 3 for Confluent Operator and Confluent Platform 1.6 deployments. See the following documentation for Helm migration and Operator upgrade instructions.
The examples in this guide use the following assumptions:

- $VALUES_FILE refers to the configuration file you set up in Create the global configuration file.

  To present simple and clear examples in the Operator documentation, all the configuration parameters are specified in the config file ($VALUES_FILE). However, in your production deployments, use the --set or --set-file option when applying sensitive data with Helm. For example:

  helm upgrade --install kafka \
    --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
    --set kafka.services.mds.ldap.authentication.simple.credentials="Developer!" \
    --set kafka.enabled=true

- operator is the namespace that Confluent Platform is deployed in.

- All commands are executed in the helm directory under the directory Confluent Operator was downloaded to.
Add components to an existing Kafka cluster¶
To add additional components to a cluster, you modify your configuration file ($VALUES_FILE) and then run the helm upgrade --install command for the added component.

For example, if you want to add ksqlDB to your cluster, you would add the ksqlDB values block to your configuration file ($VALUES_FILE), as sketched after the command below, and run the install command for ksqlDB only as follows:
helm upgrade --install \
ksql \
--values $VALUES_FILE \
--namespace operator \
--set ksql.enabled=true \
./confluent-operator
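The exact contents of the ksqlDB values block depend on your Operator chart version; copy the ksql section from the values.yaml bundled with your Operator download rather than writing it from scratch. A minimal sketch, with illustrative keys only, might look like:

ksql:
  name: ksql
  replicas: 2   # number of ksqlDB server pods; adjust for your workload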
Delete components¶
Uninstall a component release from the cluster with the following command:
helm uninstall <component-release-name> --namespace <namespace-name>
Enter the following commands to delete Confluent Platform components in the cluster. Components must be deleted in the order shown below using the component release name. The examples below show the default release names:
helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>
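To confirm which component releases remain after uninstalling, you can list the Helm releases in the namespace:

helm list --namespace <namespace-name>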
Trigger a cluster restart¶
There are cases when it is necessary to trigger a restart of a cluster, such as to apply configuration changes or to have a component pick up a change after you rotated credentials in a Secret object.
To restart a Confluent Platform component cluster:
Find the name of the StatefulSet corresponding to the Confluent Platform cluster you want to restart:
kubectl get statefulset --namespace <namespace>
Using the StatefulSet name, <name-of-statefulset>, from the previous step, roll the cluster:

kubectl rollout restart statefulset/<name-of-statefulset> --namespace <namespace>
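To watch the restart until all pods in the StatefulSet have been recreated, you can optionally follow the rollout:

kubectl rollout status statefulset/<name-of-statefulset> --namespace <namespace>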
Deploy Confluent connectors¶
Connectors are no longer packaged with the Connect images starting in Operator 1.6. You must deploy connectors by adding them to the Connect image, cp-server-connect-operator. For more information on extending a Docker image, see Build the Confluent Platform images.
Pull the Operator init container image. The connector image depends on the Operator init image, so both images must exist in the same repository.
docker pull confluentinc/cp-init-container-operator:<version-tag>
Create a Dockerfile in <dockerfile-dir> to add one or more connectors to the cp-server-connect-operator image. You can either pull connectors from Confluent Hub or use a connector JAR downloaded to the machine you are running the Docker build from.

To pull connectors from Confluent Hub, create a Dockerfile as follows:

FROM confluentinc/cp-server-connect-operator:<version-tag>
USER root
RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
  && confluent-hub install --no-prompt <connector2>:<connector2-version> \
  && ...
USER 1001
An example Dockerfile to create a Docker image with the data-gen connector from Confluent Hub:

FROM confluentinc/cp-server-connect-operator:6.0.0.0
USER root
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
USER 1001
To use connector JAR files that are on the machine that you are running the Docker build from, create a Dockerfile as follows, with one ADD instruction per connector:

FROM confluentinc/cp-server-connect-operator:<version-tag>
ADD <local-connector1-path> /usr/share/java/<connector1>
ADD <local-connector2-path> /usr/share/java/<connector2>
USER 1001
An example Dockerfile to use the data-gen connector existing on your local machine in the <connector-dir> directory:

FROM confluentinc/cp-server-connect-operator:6.0.0.0
ADD my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
USER 1001
Build and push the image with the following commands:

docker build <dockerfile-dir> -t <custom-registry>/<custom-repository>:<custom-tag>
docker push <custom-registry>/<custom-repository>:<custom-tag>
Get the Docker image details from the output of the above process and use them in the next step.
Run helm upgrade to apply the changes:

helm upgrade --install \
  <connect-helm-release-name> \
  ./confluent-operator \
  --values $VALUES_FILE \
  --namespace operator \
  --set connect.enabled=true \
  --set connect.name=connect \
  --set global.provider.registry.fqdn=<custom-registry-endpoint> \
  --set global.provider.registry.credential.required=<true or false> \
  --set global.provider.registry.credential.username=<value if required> \
  --set global.provider.registry.credential.password=<value if required> \
  --set connect.image.repository=<custom-registry>/<custom-repository> \
  --set connect.image.tag=<custom-tag> \
  --set global.initContainer.image.repository=<mypath>/<name> \
  --set global.initContainer.image.tag=<custom-tag>
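Once the Connect pods are running, one way to confirm that your connectors made it into the image is to query the Connect REST API from inside a Connect pod. The pod name below assumes the component name connect used in the command above, the default REST port 8083, and that curl is available in the image; adjust as needed:

kubectl exec connect-0 --namespace operator -- \
  curl -s http://localhost:8083/connector-plugins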
Monitor Kafka metrics¶
The configuration parameter metricReporter is provided in the Kafka values.yaml file. If you add the parameters below to the configuration file ($VALUES_FILE), messages are published to the bootstrap endpoint every 30000 milliseconds (30 seconds).
metricReporter:
  enabled: true
  ## Period (millisecond) the report should use to publish messages to bootstrapEndpoint
  ##
  publishMs: 30000
  ## ReplicationFactor, if empty, the value is based on the replicas count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, inter-communication will be encrypted if TLS is enabled. The bootstrapEndpoint will have an FQDN name.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT.
    internal: false
    authentication:
      type: ""
  ## If above tls.internal is true, configure with the Kafka bootstrap DNS configuration on port 9092, e.g. <kafka.name>.<domain>:9092
  ## If above tls.internal is false, configure with the Kafka service name on port 9071, e.g. <kafka.name>:9071, or the FQDN of the Kafka service, e.g. <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""
Important

If metricReporter is enabled with a bootstrapEndpoint as a DNS name, the DNS name must be resolved successfully or the Kafka pod will crash (CrashLoopBackOff). If the bootstrapEndpoint is using an internal address (like kafka:9071), this is not an issue. For additional security information, see the comments in the values.yaml file.
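As a concrete illustration, a minimal sketch of metricReporter for an internal, non-TLS setup, assuming the Kafka component is named kafka and listens internally on port 9071, might look like the following; adjust it to your security configuration:

metricReporter:
  enabled: true
  publishMs: 30000
  replicationFactor: ""
  tls:
    enabled: false
    internal: false
    authentication:
      type: ""
  bootstrapEndpoint: "kafka:9071"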
Scale Kafka clusters and balance data¶
Scale up¶
Starting in Confluent Platform 6.0.0, Self-Balancing is the recommended way to rebalance loads when Kafka brokers are added or removed. Self-Balancing is disabled by default.
To scale up the Kafka cluster:
Enable Self-Balancing and rebalance Kafka for any uneven load:
For a complete list of available settings you can use to control Self-Balancing, see Configuration Options and Commands for Self-Balancing Clusters. You can pass the settings in configOverrides in the kafka section of the configuration file ($VALUES_FILE).

kafka:
  configOverrides:
    server:
      - confluent.balancer.enable=                    ----- [1]
      - confluent.balancer.heal.uneven.load.trigger=  ----- [2]
[1] Set confluent.balancer.enable to true to enable Self-Balancing.

[2] Set confluent.balancer.heal.uneven.load.trigger to ANY_UNEVEN_LOAD to balance the load across the cluster whenever an imbalance is detected. The default is EMPTY_BROKER.

A filled-in example of these overrides is shown below.
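With the values filled in per [1] and [2] above, the overrides in $VALUES_FILE would look like this:

kafka:
  configOverrides:
    server:
      - confluent.balancer.enable=true
      - confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD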
Increase the number of Kafka replicas:

kafka:
  replicas: <number of brokers>
Specify whether Operator performs DNS checks on the new broker pods:

kafka:
  scalingDnsPreCheck: <true or false>
- Disable DNS checks (scalingDnsPreCheck: false) in an environment where you cannot guarantee that DNS is resolvable from the Operator node and you do not want Operator to verify that the new broker hostnames resolve before it scales up the Kafka brokers. By default, the DNS check is disabled in this version of Operator.

- Enable DNS checks (scalingDnsPreCheck: true) in an environment where DNS is fully managed and resolvable from all nodes, including the Operator pod, to have Operator check DNS before scaling up. When this setting is enabled, Operator performs a DNS check on each new broker it brings up when you increase the broker replica count. If this DNS check fails, the scaling operation fails.
If you enabled DNS checks in the previous step, ensure that proper DNS records are configured for the new brokers, and ensure that Operator can resolve the new broker hostnames, using a command such as nslookup.

If you are using a hosts file instead of a DNS service, you need to update the hosts file with the new broker information. For example:

- Get the new broker IP addresses:

  kubectl get services -n <namespace>

- Refer to the existing broker hostnames with the broker prefix, and derive the hostnames of the new brokers.

- Add the new broker hosts to the /etc/hosts file, and inject the updated file into the Operator pod as described in Adding entries to Pod /etc/hosts. A hypothetical example of the resulting entries is shown below.
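For instance, if the existing brokers resolve as kafka-0 through kafka-2 under your domain and you scaled the cluster to five brokers, the added hosts entries might look like the following; the IP addresses, hostnames, and domain are purely hypothetical placeholders:

192.0.2.14  kafka-3.mydomain.example
192.0.2.15  kafka-4.mydomain.example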
Update Kafka with the new settings:

helm upgrade --install kafka ./confluent-operator \
  --values $VALUES_FILE \
  --namespace operator \
  --set kafka.enabled=true
Note
If you need to use Auto Data Balancer, first turn off Self-Balancing (Self-Balancing and Auto Data Balancer cannot be used together), and refer to the 5.5.1 or an older version of the documentation for the scale-up process using Auto Data Balancer.
Scale down¶
Scaling down Kafka clusters is not supported in the current version of Confluent Operator.
Scale storage¶
Scale up disks¶
The ability to scale up disks is important to successfully use Confluent Platform at scale. For instance, as your organization’s usage of Kafka grows, you may experience a large growth in the number of Kafka topics, and expanding disks can help you ensure you have enough disk space to accommodate the data in those topics.
Confluent Operator enables simple, automated expansion of storage. Depending on your environment, expanding all the disks associated with a cluster (e.g. ZooKeeper, Kafka, etc.) can be as simple as changing a single value in your configuration file ($VALUES_FILE) and running the helm upgrade command. After some time, all pods within the cluster are able to leverage the expanded disk space without any interruption. In some cases, this expansion can be completed across an entire cluster in just a few seconds.
This section details the processes underlying disk expansion, the settings and configurations (e.g. choice of storage provisioner, Kubernetes version, StorageClass configuration, etc.) that affect which processes can be automated, and the concrete workflows you need to follow to expand disks, depending on your settings and configurations.
Environment and configuration considerations¶
Expanding a disk requires both resizing the PersistentVolumeClaim (PVC) and expanding the file system exposed to the pod where the given associated PersistentVolume (PV) is mounted.
The ability to automatically resize a PVC depends on the volume types you use and how the StorageClass associated with the PVCs is configured. For further details on PVC expansion support, see Expanding Persistent Volumes Claims in Kubernetes.
Once a PVC is expanded, the ability to automatically expand the file system exposed to a Pod without restarting the pod requires the ExpandInUsePersistentVolumes feature to be enabled on your Kubernetes cluster. For Kubernetes clusters with version 1.15 and higher, this feature is enabled by default. For further details on expanding in-use PVCs, see Resizing an in-use PersistentVolumeClaim.
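Because both PVC expansion and in-use file system resizing depend on your Kubernetes version and feature gates, it can be worth confirming the server version of your cluster before choosing a workflow; this is simply a verification step:

kubectl version --short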
One-command workflow for disk expansion¶
In an environment where the volume is automatically expanded and the file system mounted in the pods is automatically resized, you can use the simplest workflow for disk expansion: edit your configuration file ($VALUES_FILE) and run a single command to apply the new configuration.
The following example expands the disk associated to each broker in a Kafka cluster from 10 GB to 100 GB. You can apply the same approach to any Confluent Platform component:
Update the storage volume in your configuration file ($VALUES_FILE). The following example increases the disk for Kafka brokers to 100 Gi.

kafka:
  volume:
    data0: 100Gi  # defaults to 10Gi if this wasn't specified before
Apply the change by running the helm upgrade command. For example:

helm upgrade --install kafka \
  ./confluent-operator \
  --values $VALUES_FILE \
  --set kafka.enabled=true \
  --namespace operator
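To confirm that the PersistentVolumeClaims picked up the new size, list them in the namespace and check the CAPACITY column; this is an optional verification step:

kubectl get pvc --namespace operator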
For this workflow to be supported, the following must be true of your environment:

- The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have allowVolumeExpansion: true set in $VALUES_FILE.

- The underlying volume type must support expansion. At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not.

- The ExpandInUsePersistentVolumes feature must be enabled on your Kubernetes cluster. For Kubernetes clusters with version 1.15 and above, this feature is enabled by default.
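One way to verify the StorageClass requirement is to inspect the allowVolumeExpansion field on the StorageClass your PVCs use; the StorageClass name below is a placeholder:

kubectl get storageclass <storage-class-name> -o jsonpath='{.allowVolumeExpansion}'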
Two-command workflow for disk expansion¶
In an environment where only the volume is automatically expanded, and the file system mounted inside the pod is not resized (e.g. Kubernetes versions lower than 1.15), you need to roll the cluster for the new size to be reflected at the pod's file-system level, in addition to the workflow above:
Update the storage volume in your configuration file ($VALUES_FILE). The following example increases the disk for Kafka brokers to 100 Gi.

kafka:
  volume:
    data0: 100Gi  # defaults to 10Gi if this wasn't specified before
Apply the change by running the helm upgrade command. For example:

helm upgrade --install kafka \
  ./confluent-operator \
  --values $VALUES_FILE \
  --set kafka.enabled=true \
  --namespace operator
Prepare to trigger Operator to perform a safe rolling restart of your cluster by setting a "dummy" annotation in your configuration file ($VALUES_FILE):

kafka:
  podAnnotations:
    platform.confluent.io/dummy: "any-string"
Apply the change by running the helm upgrade command. For example:

helm upgrade --install kafka \
  ./confluent-operator \
  --values $VALUES_FILE \
  --set kafka.enabled=true \
  --namespace operator
After the cluster has rolled, the expanded disk should now be available to your cluster. At this point, if you want to remove the "dummy" annotation, remove the annotation from your configuration file ($VALUES_FILE) and apply the change using the helm upgrade command. Note that doing so will safely roll your cluster again.
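To confirm that the expanded file system is visible inside a pod after the roll, you can check the mounted data volume. The pod name and mount path below are assumptions based on a default Kafka deployment; adjust them to match your cluster:

kubectl exec kafka-0 --namespace operator -- df -h /mnt/data/data0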
For this workflow to be supported, the following must be true of your environment:
The StorageClass associated with your Kafka (or other Confluent Platform component) cluster must have set
allowVolumeExpansion: true
.- When using StorageClasses created by Confluent Operator Helm charts,
allowVolumeExpansion
is always set totrue
. - When using pre-defined StorageClasses and/or the Kubernetes default StorageClass, you need to ensure that
allowVolumeExpansion
is set totrue
.
- When using StorageClasses created by Confluent Operator Helm charts,
The underlying volume type must support expansion.
At the time of this writing, GCE Persistent Disk, AWS EBS, Azure Disk, and others support expansion, while Host Path and other types of volumes do not. Refer to the storage provider’s documentation to check.
Scale down disks¶
Scaling down disks is not supported in the current version of Confluent Operator.