Scalable & fault-tolerant Connect Clusters
A typical production Connect cluster is deployed in distributed mode. Distributed Workers provide scalability and fault tolerance, both of which are enhanced by the container management capabilities of Kubernetes.
The streaming-ops project runs a distributed Connect cluster inside of Kubernetes, managed with GitOps.
To deploy Connect Workers with the appropriate connector plugins installed, a custom Docker image is built and published to a container registry accessible by the Kubernetes cluster. Here is the portion of the Dockerfile that adds the necessary plugins:
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:5.5.1
RUN wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar -P /usr/share/java/kafka-connect-jdbc/
The Connect cluster is declared as a Kubernetes Service and Deployment and the FluxCD controller manages the deployment and changes.
Here is a snippet of the Kubernetes definition:
- protocol: TCP
When the FluxCD controller observes changes to this definition in the Git repository, it posts the appropriate changes to the Kubernetes API. Kubernetes in turn deploys the appropriate number of Pods and manages the Deployment. Kafka Connect Workers operate in a cooperative mode over Kafka to properly execute Connectors.
For a full description of using GitOps with Kubernetes to manage Confluent Cloud resources and Kafka Connect, see the blog post DevOps for Apache Kafka® with Kubernetes and GitOps. The full source code for these solutions is in the project’s GitHub repository at https://github.com/confluentinc/streaming-ops.
Another option for integrating external systems when using Confluent Cloud is fully managed Kafka Connect. With fully managed Connect, Confluent manages the Connect cluster on your behalf, freeing you from the operational burden of managing Connect Workers, Docker images, and the other complexities of Connect clusters. Sign up for Confluent Cloud; the first 20 users to use promo code C50INTEG will receive an additional $50 of free usage (details).
Connector Management with connect-operator
Managing Kafka Connect configurations is a common task for administrators of event streaming platforms. Connect configurations are defined in JSON and managed via the Connect REST Interface, which makes them well suited to the Declare -> Apply model of Kubernetes. The streaming-ops project includes a tool called the connect-operator, which can help you automate the management of Connector deployments declaratively.
connect-operator is not a supported Confluent product and is provided as a reference for users looking for methods to manage Kafka Connect resources with Kubernetes.
connect-operator is based on a common concept in Kubernetes called the Operator pattern.
connect-operator uses the open source project shell-operator to implement a very basic Operator solution for Connector management. When deployed, the connect-operator advertises to the shell-operator runtime what kinds of Kubernetes resources it wishes to monitor. When these resources are added, removed, or updated, the connect-operator is notified through the execution of scripts inside its running container.
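The advertise-then-notify flow described above can be sketched in Python. This is only an illustration, not the project's hook: it assumes shell-operator's convention of running a hook with `--config` to obtain its configuration and pointing the `BINDING_CONTEXT_PATH` environment variable at a JSON file of binding contexts when events occur. The label selector, the function names, and the ConfigMap names are hypothetical.

```python
import json
import os

# Hypothetical hook configuration: watch ConfigMaps carrying an example label.
# The label key and value are assumptions, not the project's real filter.
HOOK_CONFIG = {
    "configVersion": "v1",
    "kubernetes": [
        {
            "name": "ConnectConfigMapMonitor",
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "executeHookOnEvent": ["Added", "Modified", "Deleted"],
            "labelSelector": {"matchLabels": {"example-label": "connect"}},
        }
    ],
}


def summarize_contexts(contexts):
    """Return (event, configmap_name) pairs for a list of binding contexts."""
    events = []
    for ctx in contexts:
        if ctx.get("type") == "Synchronization":
            # Initial listing of every object that already matches the filter.
            for item in ctx.get("objects", []):
                events.append(("Synchronization", item["object"]["metadata"]["name"]))
        else:
            # A single Added / Modified / Deleted event.
            events.append((ctx.get("watchEvent"), ctx["object"]["metadata"]["name"]))
    return events


def main(argv):
    """Entry point a hook script could call with its command line arguments."""
    if "--config" in argv:
        # Advertise which resources this hook wants to monitor.
        print(json.dumps(HOOK_CONFIG))
        return 0
    with open(os.environ["BINDING_CONTEXT_PATH"]) as handle:
        contexts = json.load(handle)
    for event, name in summarize_contexts(contexts):
        print(f"{event}: {name}")
    return 0
```

A real hook would finish with `sys.exit(main(sys.argv[1:]))` and would hand each ConfigMap to the configuration logic instead of printing it.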
The connect-operator monitors ConfigMaps in Kubernetes that match a particular filter, described here:
- name: ConnectConfigMapMonitor
When ConfigMaps matching this filter are added, deleted, or modified in the Kubernetes API, the scripts inside connect-operator are invoked and passed the modified declarations. The connect-operator performs three important functions in order:
- Materializes the JSON Connector configuration at deploy time with variables provided by Kubernetes, allowing for templated configuration of values like secrets, connection strings, and hostnames
- Determines whether the desired Kafka Connect configuration requires an update based on the current actual configuration, preventing unnecessary calls to the Connect REST API
- Manages connectors by adding, updating, or deleting them using the Connect REST API
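The last step in the list above comes down to a handful of Connect REST calls. A minimal Python sketch follows, assuming the standard `PUT /connectors/{name}/config` (create or update) and `DELETE /connectors/{name}` endpoints. The function names, base URL, and connector name are hypothetical, and the project itself performs these calls from shell scripts.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def upsert_request(base_url, name, config):
    """Build a PUT /connectors/{name}/config request (creates or updates)."""
    body = json.dumps(config).encode("utf-8")
    return Request(
        f"{base_url}/connectors/{quote(name, safe='')}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def delete_request(base_url, name):
    """Build a DELETE /connectors/{name} request."""
    return Request(f"{base_url}/connectors/{quote(name, safe='')}", method="DELETE")


# Hypothetical Connect endpoint and connector, for illustration only.
request = upsert_request(
    "http://connect.example:8083",
    "jdbc-source",
    {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1"},
)
print(request.get_method(), request.full_url)
```

The requests are only constructed here; passing one to `urllib.request.urlopen` would send it to a running Connect Worker.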
The connect-operator is configured to use Kubernetes volumes and environment variables, which contain the configurations and secrets required by the Connectors it manages. For example, the Schema Registry URL configuration for a Connector's Kafka connection is provided by a volume mount like this snippet inside a Kubernetes Deployment:
When the connect-operator scripts are invoked with the updated Connector JSON, they take the volume-mounted and environment variable configurations and materialize a JSON configuration for the Connector with the proper values. Here is the sample source Connector ConfigMap, which contains a JSON Connector config. This file contains variables that are filled in at deployment time by the connect-operator, like this JSON snippet from the above configuration:
connect-operator accomplishes this by using the jq command line processing tool. Within the connect-operator scripts, you can see how the tool fills in variables from files and from environment variables in order to materialize a valid JSON configuration for the Connector. The load_configs shell function loads all values found in all properties files in the /etc/config/connect-operator folder and passes them to the
Additionally, the JSON can contain environment variable values, which are templated into the JSON document like this:
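For readers who want to experiment with the idea outside of jq, the materialization step can be approximated in Python. This is a rough analogue under stated assumptions, not the project's code: the `${name}` placeholder syntax, the function names, and the replacement of dots with underscores in property keys are all hypothetical choices made for this sketch.

```python
import json
import os
from pathlib import Path
from string import Template


def load_properties(folder):
    """Read key=value pairs from every *.properties file in a folder."""
    values = {}
    for path in sorted(Path(folder).glob("*.properties")):
        for line in path.read_text().splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # Assumption: dots become underscores so keys work as placeholders.
            values[key.strip().replace(".", "_")] = value.strip()
    return values


def materialize(template_text, file_values, environ=None):
    """Fill ${name} placeholders and return the parsed JSON document.

    Values from properties files take precedence over environment variables.
    A placeholder with no matching value raises KeyError.
    """
    merged = {**(os.environ if environ is None else environ), **file_values}
    # JSON-escape each value so quotes or backslashes cannot break the document.
    escaped = {key: json.dumps(str(value))[1:-1] for key, value in merged.items()}
    return json.loads(Template(template_text).substitute(escaped))
```

Calling `materialize(template_text, load_properties("/etc/config/connect-operator"))` would combine the mounted properties files with the container's environment variables.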
Once the connect-operator has successfully materialized the JSON configuration for the Connector, it will determine what action to take based on the current state of any Connector in existence with the same name. For details, see the connect-operator apply_connector function.
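The decision described here can be sketched as a small pure function. This is an illustration rather than a port of apply_connector: the function names and the returned action labels are hypothetical, and the sketch assumes that the Connect REST API returns configuration values as strings and includes the connector name in the returned configuration.

```python
import json


def _as_string(value):
    """Render a desired value the way it would appear in a returned config."""
    return value if isinstance(value, str) else json.dumps(value)


def _normalize(config):
    """Compare on string values and ignore the "name" key (an assumption)."""
    return {key: _as_string(value) for key, value in config.items() if key != "name"}


def decide_action(desired, current, deleted=False):
    """Return "create", "update", "delete", or "none".

    desired: config from the ConfigMap (None when the ConfigMap was deleted)
    current: config of the existing connector, or None if it does not exist
    """
    if deleted:
        return "delete" if current is not None else "none"
    if current is None:
        return "create"
    return "none" if _normalize(desired) == _normalize(current) else "update"
```

Returning "none" when the two configurations match is what avoids unnecessary calls to the Connect REST API.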