Case Study: Kafka Connect management with GitOps

The streaming-ops project is a simulated production environment running a streaming microservices based application targeting Apache Kafka® on Confluent Cloud. Applications and resources are managed by GitOps with declarative infrastructure, Kubernetes, and the Operator Pattern.

This case study looks at a method for managing Kafka Connect Workers and Connectors on Kubernetes using a GitOps approach.

Scalable & fault-tolerant Connect Clusters

A typical production Connect cluster will be deployed in distributed mode. Distributed Workers provide scalability and fault tolerance which is enhanced by the advanced container management capabilities provided by Kubernetes.

The streaming-ops project runs a distributed Connect cluster inside of Kubernetes, managed with GitOps.

In order to deploy Connect Workers with the appropriate connector plugins installed, a custom Docker image is built and published to a container registry accessible by the Kubernetes Cluster. Here is the portion of the Dockerfile which adds the necessary plugins:

FROM confluentinc/cp-server-connect:5.5.1

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:5.5.1

RUN wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar -P /usr/share/java/kafka-connect-jdbc/
...

The Connect cluster is declared as a Kubernetes Service and Deployment and the FluxCD controller manages the deployment and changes.

Here is a snippet of the Kubernetes definition:

apiVersion: v1
kind: Service
metadata:
  name: connect
  labels:
    app: connect-service
spec:
  selector:
    app: connect-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8083
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: connect-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: connect-service
  template:
    metadata:
      labels:
        app: connect-service
...

When the FluxCD controller observes changes to this definition in the Git repository, it posts the appropriate changes to the Kubernetes API. Kubernetes in turn deploys the appropriate number of Pods and manages the Deployment. Kafka Connect Workers operate in a cooperative mode over Kafka to properly execute Connectors.

For a full description of using GitOps with Kubernetes to manage Confluent Cloud resources and Kafka Connect, see the blog post DevOps for Apache Kafka® with Kubernetes and GitOps Blog post, and to see the full source code for these solutions, go to the project’s GitHub repository @ https://github.com/confluentinc/streaming-ops.

Note

Another option for integrating external systems when using Confluent Cloud is fully managed Kafka Connect. With fully managed Connect, Confluent manages the Connect cluster on your behalf and you are free of the operational burden of managing Connect Workers, Docker images, and other operational complexities of Connect clusters.

Scaling Connect Cluster by PR

One benefit of using a GitOps approach is the ability to use a pull request (PR) review process for auditing changes to infrastructure code in the same way you would for application code. Here we’re going to highlight a basic PR-based workflow to scale up the number of Kafka Connect workers in a Connect cluster using the PR-based GitOps approach.

  1. Your team observes an increase in resource utilization of the Kafka Connect worker nodes and determines that the number of nodes should be increased from 1 to 2:

    kubectl get deployments/connect-service
    NAME              READY   UP-TO-DATE   AVAILABLE   AGE
    connect-service   1/1     1            1           18d
    
  2. Make a change to the number of deployed Connect Workers using the replicas field inside the Kafka Connect Deployment definition. A PR is opened to target the master branch with the change:

    ../../../_images/scale-connect-pr-17-code.png
  3. Using the familiar PR code review process, the PR is accepted or denied:

    ../../../_images/scale-connect-pr-17-lgtm.png
  4. When accepted, the PR code change will be merged into the master branch and the GitOps process takes over by scaling up the number of replicas in the connect-service Deployment:

    ../../../_images/connect-service-gke-scale-2-2.png

Connector Management with connect-operator

Managing Kafka Connect configurations is a common task for administrators of event streaming platforms. Connect configurations are defined in JSON and managed via the Connect REST Interface, which makes them suited to the Declare -> Apply model of Kubernetes. The streaming-ops project includes a tool called the connect-operator , which can help you automate management of Connector deployments declaratively.

Note

connect-operator is not a supported Confluent product and is provided as a reference for users looking for methods to manage Kafka Connect resources with Kubernetes

The connect-operator is based on a common concept in Kubernetes called the Operator pattern. connect-operator uses the open source project shell-operator as a method for implementing a very basic Operator solution for Connector management. When deployed, the connect-operator advertises to the shell-operator runtime what kind of Kubernetes resources it wishes to monitor. When these resources are added, removed, or updated, the connect-operator is notified by the execution of scripts inside it’s running container.

In the case of the connect-operator, it is monitoring ConfigMaps in Kubernetes, which match a particular filter described here:

configVersion: v1
kubernetes:
- name: ConnectConfigMapMonitor
  apiVersion: v1
  kind: ConfigMap
  executeHookOnEvent: ["Added","Deleted","Modified"]
  labelSelector:
    matchLabels:
      destination: connect
  namespace:
    nameSelector:
      matchNames: ["default"]
  jqFilter: ".data"

When ConfigMaps matching this filter are added, deleted or modified in the Kubernetes API, the scripts inside connect-operator are invoked and passed the modified declarations. The connect-operator performs three important functions in order:

  • Materializes the JSON Connector configuration at deploy time with variables provided by Kubernetes allowing for templated configuration of variables like secrets, connection strings, and hostnames
  • Determines if desired Kafka Connect configurations require an update based on the current actual configuration, preventing unnecessary calls to the Connect REST API
  • Manages connectors by adding, updating or deleting them utilizing the Connect REST API

The connect-operator is configured to use Kubernetes volumes and environment variables, which contain configurations and secrets required by the Connectors it manages. For example, the Schema Registry URL configuration for a Connectors Kafka connection is provided by a volume mount like this snippet inside a Kubernetes Deployment:

...
name: schema-registry-url-volume
secret:
  secretName: cc.schema-registry-url.streaming-ops
...

When the connect-operator scripts are invoked, containing the updated Connector JSON, they take the volume mounted and environment variable configurations, and materialize a JSON configuration for the Connector with the proper values. Here is the sample source Connector ConfigMap which contains a JSON Connector Config. This file contains variables that are filled in at deployment time by the connect-operator, like this JSON snippet from the above configuration:

"value.converter.schema.registry.url": $SCHEMA_REGISTRY_URL,

connect-operator accomplishes this by using the jq command line processing tool. Within the connect-operator scripts, you can see how the tool fills in variables from files and from environment variables in order to materialize a valid JSON configuration for the Connector. The load_configs shell function loads all values found in all properties files in the /etc/config/connect-operator folder and passes them to the jq command.

Additionally, the JSON can contain environment variable values , which are templated into the JSON document like this:

"connection.user": env.MYSQL_CONNECTION_USER,

Once the connect-operator has successfully materialized the JSON configuration for the Connector, it will determine what action to take based on the current state of any Connector in existence with the same name. For details, see the connect-operator apply_connector function.

streaming-ops in action

For more information on this, and to explore other operational use cases, see the complete DevOps for Apache Kafka® with Kubernetes and GitOps documentation and the project’s GitHub repository.