Case Study: Kafka Connect management with GitOps¶
The streaming-ops project is a simulated production environment running a streaming microservices based application targeting Apache Kafka® on Confluent Cloud. Applications and resources are managed by GitOps with declarative infrastructure, Kubernetes, and the Operator Pattern.
Scalable & fault-tolerant Connect Clusters¶
A typical production Connect cluster will be deployed in distributed mode. Distributed Workers provide scalability and fault tolerance which is enhanced by the advanced container management capabilities provided by Kubernetes.
The streaming-ops project runs a distributed Connect cluster inside of Kubernetes, managed with GitOps.
In order to deploy Connect Workers with the appropriate connector plugins installed, a custom Docker image is built and published to a container registry accessible by the Kubernetes Cluster. Here is the portion of the
Dockerfile which adds the necessary plugins:
FROM confluentinc/cp-server-connect:5.5.1 RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.1 RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:5.5.1 RUN wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar -P /usr/share/java/kafka-connect-jdbc/ ...
Here is a snippet of the Kubernetes definition:
apiVersion: v1 kind: Service metadata: name: connect labels: app: connect-service spec: selector: app: connect-service ports: - protocol: TCP port: 80 targetPort: 8083 --- apiVersion: apps/v1 kind: Deployment metadata: name: connect-service spec: replicas: 1 selector: matchLabels: app: connect-service template: metadata: labels: app: connect-service ...
When the FluxCD controller observes changes to this definition in the Git repository, it posts the appropriate changes to the Kubernetes API. Kubernetes in turn deploys the appropriate number of Pods and manages the Deployment. Kafka Connect Workers operate in a cooperative mode over Kafka to properly execute Connectors.
For a full description of using GitOps with Kubernetes to manage Confluent Cloud resources and Kafka Connect, see the blog post DevOps for Apache Kafka® with Kubernetes and GitOps Blog post, and to see the full source code for these solutions, go to the project’s GitHub repository @ https://github.com/confluentinc/streaming-ops.
Another option for integrating external systems when using Confluent Cloud is fully managed Kafka Connect. With fully managed Connect, Confluent manages the Connect cluster on your behalf and you are free of the operational burden of managing Connect Workers, Docker images, and other operational complexities of Connect clusters.
Scaling Connect Cluster by PR¶
One benefit of using a GitOps approach is the ability to use a pull request (PR) review process for auditing changes to infrastructure code in the same way you would for application code. Here we’re going to highlight a basic PR-based workflow to scale up the number of Kafka Connect workers in a Connect cluster using the PR-based GitOps approach.
Your team observes an increase in resource utilization of the Kafka Connect worker nodes and determines that the number of nodes should be increased from
kubectl get deployments/connect-service NAME READY UP-TO-DATE AVAILABLE AGE connect-service 1/1 1 1 18d
Make a change to the number of deployed Connect Workers using the
replicasfield inside the Kafka Connect Deployment definition. A PR is opened to target the
masterbranch with the change:
Using the familiar PR code review process, the PR is accepted or denied:
When accepted, the PR code change will be merged into the
masterbranch and the GitOps process takes over by scaling up the number of replicas in the
Connector Management with connect-operator¶
Managing Kafka Connect configurations is a common task for administrators of event streaming platforms. Connect configurations are defined in JSON and managed via the Connect REST Interface, which makes them suited to the Declare -> Apply model of Kubernetes. The streaming-ops project includes a tool called the connect-operator , which can help you automate management of Connector deployments declaratively.
connect-operator is not a supported Confluent product and is provided as a reference for users looking for methods to manage Kafka Connect resources with Kubernetes
connect-operator is based on a common concept in Kubernetes called the Operator pattern.
connect-operator uses the open source project shell-operator as a method for implementing a very basic Operator solution for Connector management. When deployed, the
connect-operator advertises to the shell-operator runtime what kind of Kubernetes resources it wishes to monitor. When these resources are added, removed, or updated, the connect-operator is notified by the execution of scripts inside it’s running container.
In the case of the connect-operator, it is monitoring
ConfigMaps in Kubernetes, which match a particular filter described here:
configVersion: v1 kubernetes: - name: ConnectConfigMapMonitor apiVersion: v1 kind: ConfigMap executeHookOnEvent: ["Added","Deleted","Modified"] labelSelector: matchLabels: destination: connect namespace: nameSelector: matchNames: ["default"] jqFilter: ".data"
ConfigMaps matching this filter are added, deleted or modified in the Kubernetes API, the scripts inside connect-operator are invoked and passed the modified declarations. The connect-operator performs three important functions in order:
- Materializes the JSON Connector configuration at deploy time with variables provided by Kubernetes allowing for templated configuration of variables like secrets, connection strings, and hostnames
- Determines if desired Kafka Connect configurations require an update based on the current actual configuration, preventing unnecessary calls to the Connect REST API
- Manages connectors by adding, updating or deleting them utilizing the Connect REST API
The connect-operator is configured to use Kubernetes volumes and environment variables, which contain configurations and secrets required by the Connectors it manages. For example, the Schema Registry URL configuration for a Connectors Kafka connection is provided by a volume mount like this snippet inside a Kubernetes Deployment:
... name: schema-registry-url-volume secret: secretName: cc.schema-registry-url.streaming-ops ...
When the connect-operator scripts are invoked, containing the updated Connector JSON, they take the volume mounted and environment variable configurations, and materialize a JSON configuration for the Connector with the proper values. Here is the sample source Connector
ConfigMap which contains a JSON Connector Config. This file contains variables that are filled in at deployment time by the connect-operator, like this JSON snippet from the above configuration:
connect-operator accomplishes this by using the
jq command line processing tool. Within the connect-operator scripts, you can see how the tool fills in variables from files and from environment variables in order to materialize a valid JSON configuration for the Connector. The
load_configs shell function loads all values found in all properties files in the
/etc/config/connect-operator folder and passes them to the
Additionally, the JSON can contain environment variable values , which are templated into the JSON document like this:
Once the connect-operator has successfully materialized the JSON configuration for the Connector, it will determine what action to take based on the current state of any Connector in existence with the same name. For details, see the connect-operator apply_connector function.