Case Study: Kafka Connect management with GitOps¶
The streaming-ops project is a simulated production environment running a streaming, microservices-based application targeting Apache Kafka® on Confluent Cloud. Applications and resources are managed with GitOps, using declarative infrastructure, Kubernetes, and the Operator Pattern.
This case study looks at a method for managing Kafka Connect Workers and Connectors on Kubernetes using a GitOps approach.
Scalable & fault-tolerant Connect Clusters¶
A typical production Connect cluster is deployed in distributed mode. Distributed Workers provide scalability and fault tolerance, which are further enhanced by the advanced container management capabilities of Kubernetes.
The streaming-ops project runs a distributed Connect cluster inside of Kubernetes, managed with GitOps.
In order to deploy Connect Workers with the appropriate connector plugins installed, a custom Docker image is built and published to a container registry accessible by the Kubernetes cluster. Here is the portion of the Dockerfile which adds the necessary plugins:
FROM confluentinc/cp-server-connect:5.5.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:5.5.1
RUN wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar -P /usr/share/java/kafka-connect-jdbc/
...
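As a sketch of how such an image might be built and made available to the cluster (the image name and registry below are placeholders, not values from the streaming-ops project):

# Build the custom Connect Worker image from the Dockerfile above
docker build -t example-registry.io/example-org/connect-workers:1.0.0 .

# Publish the image to a registry the Kubernetes cluster can pull from
docker push example-registry.io/example-org/connect-workers:1.0.0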
The Connect cluster is declared as a Kubernetes Service and Deployment, and the FluxCD controller manages the deployment and any subsequent changes.
Here is a snippet of the Kubernetes definition:
apiVersion: v1
kind: Service
metadata:
  name: connect
  labels:
    app: connect-service
spec:
  selector:
    app: connect-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8083
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: connect-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: connect-service
  template:
    metadata:
      labels:
        app: connect-service
...
When the FluxCD controller observes changes to this definition in the Git repository, it posts the appropriate changes to the Kubernetes API. Kubernetes in turn deploys the appropriate number of Pods and manages the Deployment. Kafka Connect Workers operate in a cooperative mode over Kafka to properly execute Connectors.
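Once FluxCD has synchronized the manifest, the resulting objects can be inspected with standard kubectl commands, for example (illustrative, not taken from the project's runbooks):

# Watch the Connect Worker Deployment roll out
kubectl rollout status deployment/connect-service

# List the Worker Pods created for the Deployment
kubectl get pods -l app=connect-service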
For a full description of using GitOps with Kubernetes to manage Confluent Cloud resources and Kafka Connect, see the blog post DevOps for Apache Kafka® with Kubernetes and GitOps, and for the full source code for these solutions, see the project’s GitHub repository at https://github.com/confluentinc/streaming-ops.
Note
Another option for integrating external systems when using Confluent Cloud is fully managed Kafka Connect. With fully managed Connect, Confluent manages the Connect cluster on your behalf and you are free of the operational burden of managing Connect Workers, Docker images, and other operational complexities of Connect clusters.
Scaling Connect Cluster by PR¶
One benefit of using a GitOps approach is the ability to use a pull request (PR) review process for auditing changes to infrastructure code in the same way you would for application code. Here we highlight a basic PR-based workflow for scaling up the number of Kafka Connect Workers in a Connect cluster.
Your team observes an increase in resource utilization of the Kafka Connect Worker nodes and determines that the number of nodes should be increased from 1 to 2:

kubectl get deployments/connect-service

NAME              READY   UP-TO-DATE   AVAILABLE   AGE
connect-service   1/1     1            1           18d
Make a change to the number of deployed Connect Workers using the replicas field inside the Kafka Connect Deployment definition, and open a PR targeting the master branch with the change. Using the familiar PR code review process, the PR is accepted or denied. When accepted, the PR code change is merged into the master branch and the GitOps process takes over, scaling up the number of replicas in the connect-service Deployment.
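As a sketch, the change itself is a one-line edit to the Deployment definition, and the result can be verified with kubectl once FluxCD has reconciled the merge (the output shown is illustrative):

# The one-line change in the Deployment definition:
#   replicas: 1   becomes   replicas: 2

# After FluxCD applies the merged change, verify the new Worker count
kubectl get deployments/connect-service

NAME              READY   UP-TO-DATE   AVAILABLE   AGE
connect-service   2/2     2            2           18d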
Connector Management with connect-operator¶
Managing Kafka Connect configurations is a common task for administrators of event streaming platforms. Connect configurations are defined in JSON and managed via the Connect REST Interface, which makes them suited to the Declare -> Apply model of Kubernetes. The streaming-ops project includes a tool called the connect-operator, which can help you automate management of Connector deployments declaratively.
Note
connect-operator is not a supported Confluent product and is provided as a reference for users looking for methods to manage Kafka Connect resources with Kubernetes.
The connect-operator is based on a common concept in Kubernetes called the Operator pattern. connect-operator uses the open source project shell-operator as a method for implementing a very basic Operator solution for Connector management. When deployed, the connect-operator advertises to the shell-operator runtime what kinds of Kubernetes resources it wishes to monitor. When these resources are added, removed, or updated, the connect-operator is notified by the execution of scripts inside its running container.
The connect-operator monitors ConfigMaps in Kubernetes that match a particular filter, described here:
configVersion: v1
kubernetes:
  - name: ConnectConfigMapMonitor
    apiVersion: v1
    kind: ConfigMap
    executeHookOnEvent: ["Added","Deleted","Modified"]
    labelSelector:
      matchLabels:
        destination: connect
    namespace:
      nameSelector:
        matchNames: ["default"]
    jqFilter: ".data"
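For illustration, a ConfigMap along the following lines would match this filter (the name and connector configuration shown are hypothetical; see the project repository for the actual ConfigMaps used):

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-connector        # hypothetical name
  namespace: default             # matches the nameSelector above
  labels:
    destination: connect         # matches the labelSelector above
data:
  example-connector.json: |
    {
      "name": "example-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector"
      }
    }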
When ConfigMaps matching this filter are added, deleted, or modified in the Kubernetes API, the scripts inside connect-operator are invoked and passed the modified declarations. The connect-operator performs three important functions, in order:
- Materializes the JSON Connector configuration at deploy time with variables provided by Kubernetes allowing for templated configuration of variables like secrets, connection strings, and hostnames
- Determines if desired Kafka Connect configurations require an update based on the current actual configuration, preventing unnecessary calls to the Connect REST API (see the sketch after this list)
- Manages connectors by adding, updating or deleting them utilizing the Connect REST API
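As a rough sketch of the second function, the desired configuration can be compared against what the Connect REST API currently reports before any update is submitted. The endpoint paths below are standard Connect REST API routes, but the script itself is a simplified bash illustration, not the actual connect-operator code:

CONNECT_URL="http://connect:80"      # the Connect REST endpoint exposed by the Service above
NAME="example-connector"             # hypothetical connector name
DESIRED="desired-config.json"        # the materialized connector configuration

# Fetch the connector's current configuration; a 404 means it does not exist yet
STATUS=$(curl -s -o /tmp/current.json -w "%{http_code}" "$CONNECT_URL/connectors/$NAME/config")

# Create or update the connector only when the configurations differ
if [ "$STATUS" != "200" ] || ! diff <(jq -S . /tmp/current.json) <(jq -S . "$DESIRED") > /dev/null; then
  curl -s -X PUT -H "Content-Type: application/json" \
    --data @"$DESIRED" "$CONNECT_URL/connectors/$NAME/config"
fi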
The connect-operator is configured to use Kubernetes volumes and environment variables, which contain configurations and secrets required by the Connectors it manages. For example, the Schema Registry URL configuration for a Connector's Kafka connection is provided by a volume mount like this snippet inside a Kubernetes Deployment:
...
name: schema-registry-url-volume
secret:
  secretName: cc.schema-registry-url.streaming-ops
...
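For context, a sketch of how such a volume might be consumed by the connect-operator container (the container name and mount path are assumptions for illustration, not taken from the project):

...
containers:
  - name: connect-operator
    volumeMounts:
      - name: schema-registry-url-volume
        mountPath: /etc/config/connect-operator   # assumed mount path
        readOnly: true
...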
When the connect-operator scripts are invoked with the updated Connector JSON, they take the volume-mounted and environment-variable configurations and materialize a JSON configuration for the Connector with the proper values. Here is the sample source Connector ConfigMap, which contains a JSON Connector config. This file contains variables that are filled in at deployment time by the connect-operator, like this JSON snippet from the above configuration:
"value.converter.schema.registry.url": $SCHEMA_REGISTRY_URL,
connect-operator accomplishes this by using the jq command line processing tool. Within the connect-operator scripts, you can see how the tool fills in variables from files and from environment variables in order to materialize a valid JSON configuration for the Connector. The load_configs shell function loads all values found in all properties files in the /etc/config/connect-operator folder and passes them to the jq command.
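As a minimal illustration of the technique (not the actual load_configs implementation), jq can substitute a value passed with --arg into a templated JSON document; the value below is a placeholder:

# Substitute a variable into a connector configuration template with jq
SCHEMA_REGISTRY_URL="https://schema-registry.example.com"

jq -n --arg SCHEMA_REGISTRY_URL "$SCHEMA_REGISTRY_URL" \
  '{"value.converter.schema.registry.url": $SCHEMA_REGISTRY_URL}'

# Output:
# {
#   "value.converter.schema.registry.url": "https://schema-registry.example.com"
# }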
Additionally, the JSON can contain environment variable values, which are templated into the JSON document like this:
"connection.user": env.MYSQL_CONNECTION_USER,
Once the connect-operator has successfully materialized the JSON configuration for the Connector, it determines what action to take based on the current state of any existing Connector with the same name. For details, see the connect-operator apply_connector function.
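A condensed sketch of that decision, keyed off the Kubernetes event that triggered the hook (an illustration, not the actual apply_connector implementation; the endpoint paths are standard Connect REST API routes):

# Condensed, hypothetical sketch of the add/update/delete decision
case "$EVENT" in
  Added|Modified)
    # PUT creates the connector if absent, or updates its configuration in place
    curl -s -X PUT -H "Content-Type: application/json" \
      --data @materialized-config.json \
      "http://connect:80/connectors/$CONNECTOR_NAME/config"
    ;;
  Deleted)
    curl -s -X DELETE "http://connect:80/connectors/$CONNECTOR_NAME"
    ;;
esac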
streaming-ops in action¶
For more information on this, and to explore other operational use cases, see the complete DevOps for Kafka with Kubernetes and GitOps documentation and the project’s GitHub repository.