Azure Kubernetes Service to Confluent Cloud with Confluent Replicator


This example features a deployment of Confluent Platform on Azure Kubernetes Service (AKS) leveraging Confluent Operator and Confluent Replicator, highlighting a data replication strategy to Confluent Cloud. After running this example, you will have an AKS-based Confluent Platform deployment with simulated data replicating to your Confluent Cloud cluster. We will verify the replication by running client applications against the Confluent Cloud cluster to view the simulated data originating in the source AKS cluster.

If you’d like a primer on running Confluent for Kubernetes in AKS with lower resource requirements, see the Confluent Platform on Azure Kubernetes Service example.

This example is featured in the Conquering Hybrid Cloud with Replicated Event-Driven Architectures blog post, which provides more details on use cases for replicated event streaming architectures.


See the DevOps for Apache Kafka® with Kubernetes and GitOps project for an example of using Kubernetes in a production-like environment targeting Confluent Cloud.

The major components of this example are:

  • A Confluent Cloud Environment and Kafka Cluster
  • A Kubernetes cluster running on AKS
  • Confluent for Kubernetes, which is used to manage the following Confluent Platform components:
    • A 3 node ZooKeeper cluster
    • A 3 node Kafka cluster
    • A single node Replicator
    • A single node Schema Registry
    • A single node Kafka Connect
    • Confluent Control Center
    • One instance of kafka-connect-datagen to produce mock stock trading data


The following applications or libraries are required to be installed and available in the system path in order to properly run the example.

Application | Tested Version
kubectl     | 1.18.0
helm        | 3.1.2
az          | 2.10.1
ccloud      | v1.25.0


As of Confluent Platform 5.4, Helm 3 is required.

Cost to Run Example


Any Confluent Cloud example uses real Confluent Cloud resources that may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, as well as resources that have hourly charges like connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges for services and verify that they have been deleted.

In addition to Confluent Cloud resources, this example uses Microsoft Azure resources:

  • Refer to the Sizing recommendations document for information on the required sizing for Confluent for Kubernetes.
  • Refer to Azure pricing data for more information.

Ensure all resources are destroyed after you are done.

Confluent Cloud Promo Code

To receive an additional $50 free usage in Confluent Cloud, enter promo code C50INTEG in the Confluent Cloud UI Billing & payment section (details). This promo code should sufficiently cover up to one day of running this Confluent Cloud example, beyond which you may be billed for the services that have an hourly charge until you destroy the Confluent Cloud resources created by this example.

Run Example

Clone the confluentinc/examples GitHub repository, and change directories to the kubernetes/replicator-aks-cc directory.

git clone
cd examples/kubernetes/replicator-aks-cc

AKS Setup

To properly simulate a realistic replication scenario to Confluent Cloud, the example requires an AKS node pool large enough to support 3-node clusters for both Kafka and ZooKeeper. In testing of this demonstration, a sufficient cluster consisted of 7 nodes of machine type Standard_D4s_v4.


The Variable Reference section can be used to control the size of the deployed resources in this example.

If you wish to use an existing AKS cluster, and your kubectl client is already configured to operate with it, skip to the Confluent Cloud Setup section of these instructions.

If you wish to create a new AKS cluster for this example, the Makefile contains a function to assist you in creating a cluster, assuming you have your az CLI properly configured to access your account. If you wish to override the behavior of the create cluster function, see the Advanced Usage section of this document.

Log in via the Azure CLI from your terminal (this should open a window in your browser):

az login

List your Azure subscriptions and identify the one you wish to use for this example.

az account list -o table

Set the active Azure subscription via the Azure CLI.

az account set --subscription {{ azure subscription name }}

List your Azure resource groups and identify the one you wish to use for this example.

az group list -o table

To create the standard cluster you can run the following:

export AZ_RESOURCE_GROUP={{ azure resource group name }}
make aks-create-cluster

Verify that az has created the cluster properly. The output of make aks-create-cluster should end with lines similar to the following:


provisioningState: Succeeded
  name: Basic
  tier: Free
tags: null
type: Microsoft.ContainerService/ManagedClusters


az aks get-credentials --only-show-errors --resource-group confluent-operator-demo --name cp-examples-operator-user --context aks_confluent-operator-demo_centralus_cp-examples-operator-user
Merged "aks_confluent-operator-demo_centralus_cp-examples-operator-user" as current context in /Users/user/.kube/config
✔  ++++++++++ AKS Cluster Created
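If you prefer to check the result with a command rather than by reading the output, a minimal sketch like the following polls the cluster's provisioningState through az aks show. It assumes the az CLI is logged in with the intended subscription selected; the function names and the POLL_INTERVAL variable are hypothetical, and the resource group and cluster name are the ones you used above.

```shell
# Sketch: wait until Azure reports provisioningState "Succeeded" for an AKS cluster.
# Assumes `az login` has been run and the correct subscription is active.
# aks_provisioning_state, wait_for_aks and POLL_INTERVAL are hypothetical names.
aks_provisioning_state() {
  # $1 = resource group, $2 = AKS cluster name
  az aks show --resource-group "$1" --name "$2" \
    --query provisioningState --output tsv
}

wait_for_aks() {
  # $1 = resource group, $2 = cluster name, $3 = max attempts (default 10)
  attempts=${3:-10}
  while [ "$attempts" -gt 0 ]; do
    state=$(aks_provisioning_state "$1" "$2" 2>/dev/null)
    if [ "$state" = "Succeeded" ]; then
      echo "AKS cluster $2 is ready"
      return 0
    fi
    attempts=$((attempts - 1))
    # Pause before the next poll (POLL_INTERVAL seconds, default 15).
    if [ "$attempts" -gt 0 ]; then sleep "${POLL_INTERVAL:-15}"; fi
  done
  echo "AKS cluster $2 is not ready (last state: ${state:-unknown})" >&2
  return 1
}
```

With the default cluster name, a call would look like: wait_for_aks "$AZ_RESOURCE_GROUP" "cp-examples-operator-$USER"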

Confluent Cloud Setup

Confluent Cloud Account Setup

This demonstration requires that you have a Confluent Cloud account and a Kafka cluster ready for use. The Confluent Cloud home page can help you get set up with your own account if you do not yet have access.


This demonstration highlights a multi-cloud replication strategy using Confluent Replicator. One benefit of Replicator is that the destination cluster topics and partitions will be identical in message offsets, timestamps, keys, and values. If you re-use a cluster with an existing stock-trades topic, the messages will be appended to the end of the existing topic data and the offsets will not match the source cluster. It’s advised to build a new cluster for each run of this example, or delete the stock-trades Kafka topic in the destination cluster prior to running. See: ccloud kafka topic delete for instructions on deleting Confluent Cloud topics.
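If you re-use an existing destination cluster, a helper along these lines can remove a leftover stock-trades topic before a run. This is a sketch, not part of the example: the function name reset_destination_topic is hypothetical, and it assumes the ccloud CLI is logged in and that ccloud kafka topic list prints one topic name per row.

```shell
# Hypothetical helper: delete the topic in the destination Confluent Cloud
# cluster if it already exists, so replicated offsets start from the beginning.
reset_destination_topic() {
  cluster_id=$1              # for example lkc-3r3vj, from `ccloud kafka cluster list`
  topic=${2:-stock-trades}
  # Trim padding from each listed row, then require an exact whole-line match.
  if ccloud kafka topic list --cluster "$cluster_id" |
      sed 's/^[[:space:]]*//; s/[[:space:]]*$//' | grep -qx "$topic"; then
    ccloud kafka topic delete "$topic" --cluster "$cluster_id"
  else
    echo "topic $topic not found in $cluster_id; nothing to delete"
  fi
}
```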

Kafka Cluster Setup

If you are creating a new cluster, it is advised to create it within the same Cloud Provider and region as this example. This demonstration runs on top of Azure and, by default, in the centralus region. It is recommended to name your new cluster replicator-aks-cc-demo to match names used later in this example. The following illustrates the recommended configuration:

New Cluster Example


See the Confluent Cloud Quick Start for more information.

Kafka Bootstrap Server Configuration

After you have established the Confluent Cloud cluster you are going to use for the example, you will need its public bootstrap server.

You can use the ccloud CLI to retrieve the bootstrap server value for your cluster.


You can also view the bootstrap server value on the Confluent Cloud UI under the Cluster settings


  1. If you haven’t already, install the ccloud CLI

  2. Log in to Confluent Cloud. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.

    ccloud login --save

    Your output should resemble:

    Enter your Confluent credentials:
    Logged in as
    Using environment t118 ("default")
  3. List your available Kafka clusters.

    ccloud kafka cluster list

    This should produce a list of clusters you have access to:

          Id      |          Name          | Provider |   Region    | Durability | Status
        lkc-xmm5g | abc-test               | azure    | centralus   | LOW        | UP
        lkc-kngnv | rjs-azure-centralus    | azure    | centralus   | LOW        | UP
        lkc-3r3vj | replicator-aks-cc-demo | azure    | centralus   | LOW        | UP
  4. Describe the cluster to obtain the bootstrap server

    ccloud kafka cluster describe lkc-3r3vj

    This will produce a detailed view of the cluster. The Endpoint field contains the bootstrap server value

    | Id          | lkc-3r3vj                                                  |
    | Name        | replicator-aks-cc-demo                                     |
    | Type        | BASIC                                                      |
    | Ingress     |                                                        100 |
    | Egress      |                                                        100 |
    | Storage     |                                                       5000 |
    | Provider    | azure                                                      |
    | Region      | centralus                                                  |
    | Status      | UP                                                         |
    | Endpoint    | SASL_SSL://  |
    | ApiEndpoint |          |
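Kafka's bootstrap.servers setting expects host:port entries, so the SASL_SSL:// prefix in the Endpoint field has to be dropped before you use the value. A minimal sketch that does this with awk, assuming the table layout shown above (the function name extract_bootstrap is hypothetical):

```shell
# Sketch: extract host:port from the "Endpoint" row of
# `ccloud kafka cluster describe` output read on stdin.
extract_bootstrap() {
  awk -F'|' '
    $2 ~ /^[ \t]*Endpoint[ \t]*$/ {
      value = $3
      gsub(/[ \t]/, "", value)          # trim the column padding
      sub(/^SASL_SSL:\/\//, "", value)  # keep only host:port
      print value
    }'
}
```

For example: BOOTSTRAP=$(ccloud kafka cluster describe lkc-3r3vj | extract_bootstrap). The exact match on "Endpoint" keeps the ApiEndpoint row from being picked up.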

API Key and Secret Configuration

The ccloud CLI allows you to create API Keys to be used with client applications.


You can also create the API Key using the Confluent Cloud UI.

  1. To create a new API Key:

    ccloud api-key create --resource lkc-3r3vj

    The command displays a new API key and secret, as shown below. Save these values elsewhere, because they cannot be retrieved later.

    Save the API key and secret. The secret is **not** retrievable later.
    | API Key | LD35EM2YJTCTRQRM                                                 |
    | Secret  | 67JImN+9vk+Hj3eaj2/UcwUlbDNlGGC3KAIOy5JNRVSnweumPBUpW31JWZSBeawz |
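Instead of copying the values by hand, you can parse the table printed by ccloud api-key create into shell variables. The sketch below assumes the two-row layout shown above; table_value, CLOUD_KEY and CLOUD_SECRET are hypothetical names. Keep the captured secret out of shell history and log files.

```shell
# Sketch: print the value column of the table row whose label equals $1.
# The table (as printed by `ccloud api-key create`) is read from stdin.
table_value() {
  awk -F'|' -v label="$1" '{
    name = $2
    value = $3
    sub(/^[ \t]+/, "", name)     # trim leading padding of the label
    sub(/[ \t]+$/, "", name)     # trim trailing padding of the label
    gsub(/[ \t]/, "", value)     # keys and secrets contain no spaces
    if (name == label) print value
  }'
}
```

A possible usage:

    key_output=$(ccloud api-key create --resource lkc-3r3vj)
    CLOUD_KEY=$(printf '%s\n' "$key_output" | table_value "API Key")
    CLOUD_SECRET=$(printf '%s\n' "$key_output" | table_value "Secret")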

Configure Helm Values

To configure the example to access your Confluent Cloud account, we will create a Helm values file. The example looks for this file at ./cfg/my-values.yaml and passes it to the helm commands, which merge your Confluent Cloud account details into the Confluent Platform component configurations.

  1. Create a values file by executing the following command, first replacing the {{ mustache bracket }} values for bootstrapEndpoint, username, and password with your relevant values obtained above.

    cat <<'EOF' > ./cfg/my-values.yaml
    destinationCluster: &destinationCluster
      name: replicator-aks-cc-demo
        enabled: true
        internal: true
          type: plain
      bootstrapEndpoint: {{ cloud bootstrap server }}
      username: {{ cloud API key }}
      password: {{ cloud API secret }}
        - <<: *destinationCluster
      replicas: 1
          <<: *destinationCluster
    EOF

    You can now verify the values of the file prior to running the example. The example Makefile will integrate these values into the Helm deployment.

    cat ./cfg/my-values.yaml
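If the three values are already in shell variables, the placeholders can be filled with sed instead of by hand, and any remaining {{ placeholder flagged before you run the demo. A sketch, assuming the placeholder text used in the heredoc above; fill_values_file, CLOUD_BOOTSTRAP, CLOUD_KEY and CLOUD_SECRET are hypothetical names.

```shell
# Sketch: replace the {{ ... }} placeholders in the values file with the
# contents of CLOUD_BOOTSTRAP, CLOUD_KEY and CLOUD_SECRET (hypothetical names),
# then fail if any "{{" is still present.
fill_values_file() {
  file=${1:-./cfg/my-values.yaml}
  # "|" is the sed delimiter because API secrets can contain "/" and "+".
  sed -e "s|{{ cloud bootstrap server }}|$CLOUD_BOOTSTRAP|" \
      -e "s|{{ cloud API key }}|$CLOUD_KEY|" \
      -e "s|{{ cloud API secret }}|$CLOUD_SECRET|" \
      "$file" > "$file.tmp" && mv "$file.tmp" "$file"
  # Portable alternative to `sed -i`, whose syntax differs between GNU and BSD sed.
  if grep -n '{{' "$file"; then
    echo "unreplaced placeholders remain in $file" >&2
    return 1
  fi
}
```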

Preflight Checks

Prior to running the example you may want to verify the setup.

To verify your AKS cluster status:

az aks list --output table

To verify that your kubectl command is configured with the proper context to control your AKS cluster, run:

kubectl config current-context

The context should contain the proper region and cluster name. If you used the demo aks-create-cluster function to create your cluster, the context name should have the format: aks_<azure_resource_group>_<region>_cp-examples-operator-<username>
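A small sketch that builds the expected context name from the same parts and compares it with kubectl config current-context. It assumes the default AKS_BASE_CLUSTER_ID (cp-examples-operator-$USER); if you overrode that variable, adjust the format. The function names are hypothetical.

```shell
# Sketch: build the kubectl context name expected after `make aks-create-cluster`.
expected_context() {
  # $1 = Azure resource group, $2 = region, $3 = user name (default: $USER)
  printf 'aks_%s_%s_cp-examples-operator-%s\n' "$1" "$2" "${3:-$USER}"
}

# Compare the active context with the expected one; return 1 on mismatch.
check_context() {
  current=$(kubectl config current-context)
  expected=$(expected_context "$@")
  if [ "$current" = "$expected" ]; then
    echo "kubectl context OK: $current"
  else
    echo "current context is '$current', expected '$expected'" >&2
    return 1
  fi
}
```

For example: check_context "$AZ_RESOURCE_GROUP" centralus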

Example Execution

To run the automated demo (estimated running time, 8 minutes):

make demo

The demo deploys Confluent Platform using Confluent for Kubernetes. As the various components are deployed, the demonstration echoes each command as it executes, so you can observe the process. For example, the deployment messages for Kafka will look similar to:

+++++++++++++ deploy kafka
helm upgrade --install --namespace operator --wait --timeout=500 -f examples/kubernetes/gke-base/cfg/values.yaml --set global.provider.region=us-central1 --set global.provider.kubernetes.deployment.zones={us-central1-a} -f examples/kubernetes/replicator-gke-cc/cfg/values.yaml -f examples/kubernetes/replicator-gke-cc/cfg/my-values.yaml  --set kafka.replicas=3 --set kafka.enabled=true kafka examples/kubernetes/common/cp/operator/20190912-v0.65.1/helm/confluent-operator
Release "kafka" does not exist. Installing it now.
NAME:   kafka
LAST DEPLOYED: Mon Oct 28 11:42:07 2019
NAMESPACE: operator
✔  ++++++++++ Kafka deployed

+++++++++++++ Wait for Kafka
source examples/kubernetes/common/bin/; retry 15 kubectl --context |kubectl-context-pattern| -n operator get sts kafka
kafka   0/3     1s
kubectl --context |kubectl-context-pattern| -n operator rollout status statefulset/kafka
Waiting for 3 pods to be ready...
Waiting for 2 pods to be ready...
Waiting for 1 pods to be ready...
statefulset rolling update complete 3 pods at revision kafka-775f97f98b...
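The log above shows the demo retrying kubectl get sts kafka until the StatefulSet exists and then waiting on kubectl rollout status. The demo's own retry helper is sourced from examples/kubernetes/common/bin/ and is not reproduced here; the retry function below is a hypothetical stand-in that shows the same pattern, and RETRY_DELAY is an assumed variable name.

```shell
# Hypothetical retry helper: run a command up to N times until it succeeds,
# sleeping RETRY_DELAY seconds (default 10) between attempts.
retry() {
  max=$1
  shift
  n=1
  until "$@"; do
    if [ "$n" -ge "$max" ]; then
      echo "giving up after $n attempts: $*" >&2
      return 1
    fi
    n=$((n + 1))
    sleep "${RETRY_DELAY:-10}"
  done
}
```

Used the same way as in the log:

    retry 15 kubectl -n operator get sts kafka
    kubectl -n operator rollout status statefulset/kafka --timeout=10m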
✔  ++++++++++ Kafka ready

The last output message you should see is:

 Replicator |k8s-service-name|->CC Demo running


Verify Control Center’s View of Multiple Clusters

To view Confluent Control Center, network connectivity must be available between your local machine and the Kubernetes pod running the Confluent Control Center service. If you used an existing cluster, you may already have external cluster access configured. Otherwise, you can use the following kubectl command to open a forwarded port connection between your local host and Confluent Control Center.

kubectl -n operator port-forward controlcenter-0 12345:9021

Now open a web browser to http://localhost:12345, and you should see Confluent Control Center with 2 Healthy Clusters.
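kubectl port-forward blocks the terminal and can take a moment to accept connections. One way to run it in the background and wait until Control Center answers is sketched below; wait_for_http and WAIT_DELAY are hypothetical names, and the check only confirms that some HTTP response arrives (curl reports the code 000 when none does).

```shell
# Sketch: poll a URL until any HTTP response is received.
wait_for_http() {
  # $1 = URL, $2 = max attempts (default 30); WAIT_DELAY seconds between tries
  tries=${2:-30}
  while [ "$tries" -gt 0 ]; do
    # curl prints 000 when no HTTP response was received (e.g. connection refused).
    code=$(curl -s -o /dev/null -w '%{http_code}' "$1")
    if [ "$code" != "000" ]; then
      echo "$1 answered with HTTP $code"
      return 0
    fi
    tries=$((tries - 1))
    sleep "${WAIT_DELAY:-2}"
  done
  return 1
}
```

A possible usage:

    kubectl -n operator port-forward controlcenter-0 12345:9021 &
    PF_PID=$!
    wait_for_http http://localhost:12345
    # ... browse Control Center, then stop the port-forward:
    kill "$PF_PID"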


The replicator-aks-cc-demo cluster is the Confluent Cloud cluster and the controlcenter.cluster is the AKS based Confluent for Kubernetes managed cluster. At this time, detailed monitoring of Confluent Cloud clusters is not possible from on-premises Confluent Control Center. You will notice that the replicator-aks-cc-demo cluster does not list the number of Brokers as the Confluent Cloud managed Kafka service is serverless and the concept of brokers is abstracted away.

Next, click controlcenter.cluster, then Consumers, and then replicator. This gives you a view of the Confluent Replicator consumer group as it replicates the stock-trades topic to Confluent Cloud.


This Control Center screen highlights the consumer group lag for the replicator consumer group. In the above screenshot, Control Center is telling us that the replicator consumer’s total lag across all topics and partitions is 27 messages. As Replicator processes messages, its consumer group lag will fluctuate, and the chart on this screen will indicate the current value as well as the maximum amount of lag over time.
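The same total can be computed from the command line. This sketch assumes Replicator commits its consumer offsets to the source cluster under the replicator group that Control Center displays, and that kafka-consumer-groups --describe prints a LAG column; total_lag is a hypothetical name, and CLIENT_PROPS stands for the client properties file mounted under /etc/kafka-client-properties/ (its file name is not given here).

```shell
# Sketch: sum the LAG column of `kafka-consumer-groups --describe` output (stdin).
total_lag() {
  awk '
    /^[ \t]*$/ { next }                  # skip blank lines
    col == 0 {                           # still looking for the header row
      for (i = 1; i <= NF; i++) if ($i == "LAG") col = i
      next
    }
    $col ~ /^[0-9]+$/ { sum += $col }    # "-" means no committed offset yet
    END { print sum + 0 }'
}
```

For example:

    kubectl -n operator exec client-console -- kafka-consumer-groups --bootstrap-server kafka:9071 --command-config "$CLIENT_PROPS" --describe --group replicator | total_lag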

Verify replicated stock-trades topic with clients

To view the stock-trades topic data streaming on both clusters, you can open two terminals and run the following series of commands.

  1. To view the stock-trades events on the source (AKS) cluster, in the first terminal, first open a shell on the client-console pod:

    kubectl -n operator exec -it client-console -- bash
  2. Then use kafka-console-consumer to stream the values from the stock-trades topic. The required configuration is provided in the /etc/kafka-client-properties/ file already mounted in a volume on the pod:

    kafka-console-consumer --bootstrap-server kafka:9071 --consumer.config /etc/kafka-client-properties/ --topic stock-trades --property print.value=false --property print.key=true --property print.timestamp=true
  3. To view the stock-trades events on the destination (Confluent Cloud) cluster, in a second terminal open another shell on the client-console pod:

    kubectl -n operator exec -it client-console -- bash
  4. Again, use the kafka-console-consumer to stream the values from the stock-trades topic, this time from the destination cluster. The required configuration has been mounted in a volume on the pod in the /etc/destination-cluster-client-properties/ file. Additionally, the bootstrap server value has been added in the /etc/destination-cluster-client-properties/destination-cluster-bootstrap file. This command will use both of those files to create the connection to the destination cluster:

    kafka-console-consumer --bootstrap-server $(cat /etc/destination-cluster-client-properties/destination-cluster-bootstrap)  --consumer.config /etc/destination-cluster-client-properties/ --topic stock-trades --property print.value=false --property print.key=true --property print.timestamp=true

    These commands print the timestamp and key of messages as they arrive in each cluster. You can visually match up the events by these values and observe the replication process. For example, in terminal 1 you might see:

    CreateTime:1572380698171        ZJZZT
    CreateTime:1572380698280        ZWZZT
    CreateTime:1572380698351        ZWZZT
    CreateTime:1572380698577        ZJZZT
    CreateTime:1572380699340        ZVZZT

    And in terminal 2 shortly after identical messages:

    CreateTime:1572380698171        ZJZZT
    CreateTime:1572380698280        ZWZZT
    CreateTime:1572380698351        ZWZZT

Stop Example

Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.

After you are done evaluating the results of the example, you can destroy all the provisioned Kubernetes resources with:

make destroy-demo

If you used the example to create your cluster, you can destroy the AKS cluster with:

make aks-destroy-cluster


Deploying Kafka Connectors with Helm

The following highlights a simple Helm chart that can be used to deploy Kafka Connect Connector configurations using the standard Kafka Connect REST Interface. This is how this demonstration deploys the Confluent Replicator configuration, however, the same method could be used to deploy any Kafka Connect configuration. In future versions of Confluent for Kubernetes, Kafka Connect connectors will be managed by the Operator Controller.

The Helm chart is located in the kubernetes/common/helm/replicator-cc folder of this demonstration. The templates/replicator-configmap.yaml file contains a data section with a templated JSON value that conforms to the Kafka Connect connectors API. The destination and source cluster configuration values are filled in at runtime by the Helm templating system and are provided by the my-values.yaml file you created in the demo instructions above.

apiVersion: v1
kind: ConfigMap
metadata:
  name: replicator-connector
data:
  replicator-connector.json: '{
    "config": {
      "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
      "topic.whitelist": "{{.Values.replicator.topic.whitelist}}",
      "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "dest.kafka.bootstrap.servers": "{{.Values.replicator.dependencies.kafka.bootstrapEndpoint}}",
      "": "{{$destProtocol}}",
      "dest.kafka.sasl.mechanism": "PLAIN",
      "dest.kafka.sasl.jaas.config": "{{$destJaasConfig}}",
      "": "replicator",
      "src.kafka.bootstrap.servers": "kafka:9071",
      "": "{{$srcProtocol}}",
      "src.kafka.sasl.mechanism": "PLAIN",
      "src.kafka.sasl.jaas.config": "{{$srcJaasConfig}}",
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka:9071",
      "": "{{$srcProtocol}}",
      "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
      "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "{{$srcJaasConfig}}",
      "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
      "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "kafka:9071",
      "": "{{$srcProtocol}}",
      "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
      "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "{{$srcJaasConfig}}",
      "tasks.max": "1"
    }
  }'

In the same directory as the ConfigMap manifest file is a Kubernetes Job manifest (replicator-connector-deploy-job.yaml). It defines a Kubernetes Job that runs to completion once and then terminates. The Job Pod uses a Docker image that contains the curl program: the ConfigMap defined above is mounted into the Pod, and curl then executes a POST to the Kafka Connect REST API to deploy Replicator.

apiVersion: batch/v1
kind: Job
metadata:
  name: replicator-connector-deploy
spec:
  ttlSecondsAfterFinished: 5
  template:
    spec:
      volumes:
      - name: replicator-connector
        configMap:
          name: replicator-connector
      containers:
      - name: replicator-connector-deploy
        image: cnfldemos/alpine-curl:3.10.2_7.65.1
        args: [
          "-X", "POST",
          "-H", "Content-Type: application/json",
          "--data", "@/etc/config/connector/replicator-connector.json",
        ]
        volumeMounts:
          - name: replicator-connector
            mountPath: /etc/config/connector
      restartPolicy: Never
  backoffLimit: 1

Note how the ConfigMap is associated with the Job Pod by name: the value replicator-connector in the volumes stanza of the Job manifest matches the ConfigMap name in the metadata section of the ConfigMap manifest.
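The Job performs a single POST. To deploy or inspect a connector by hand, the same Kafka Connect REST API calls (POST /connectors and GET /connectors/<name>/status) can be wrapped in shell functions. A sketch: deploy_connector and connector_state are hypothetical names, and you must supply the Connect REST URL and the connector name from your JSON, neither of which is shown in this document. The state extraction uses sed rather than jq and assumes the compact JSON that Connect typically returns, with state as the first field of the connector object.

```shell
# Sketch: POST a connector JSON file to the Kafka Connect REST API.
deploy_connector() {
  # $1 = Connect REST base URL, $2 = path to the connector JSON file
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$2" "$1/connectors"
}

# Sketch: print the connector-level state (e.g. RUNNING, FAILED) from
# GET /connectors/<name>/status, without requiring jq.
connector_state() {
  # $1 = Connect REST base URL, $2 = connector name
  curl -s "$1/connectors/$2/status" |
    sed -n 's/.*"connector":{"state":"\([A-Z]*\)".*/\1/p'
}
```

For example, assuming the default Connect REST port 8083:

    kubectl -n operator port-forward {{ replicator pod name }} 8083:8083 &
    connector_state http://localhost:8083 {{ connector name }}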

Deploying Connectors with Operator

You can deploy any Kafka connector (or single message transformation (SMT) or converter) in your Kubernetes environment. Search Confluent Hub, an online library of pre-packaged and ready-to-install connectors, transformations, and converters, to find the one that suits your needs. The Confluent for Kubernetes image for Kafka Connect, confluentinc/cp-server-connect-operator, includes a small number of those connectors but may not have the specific connector you want to deploy. Therefore, to deploy a new connector type in your Kubernetes environment, you will need to get its jars onto the Connect image.

The recommended way is to create a custom Docker image that extends the base Connect Docker image with the desired jars. The custom Docker image builds the dependencies into a single artifact, which is more self-sufficient and portable: it can run on any pod even though pod disks are ephemeral. See the documentation to learn how to use the Confluent Hub client to create a custom Docker image that extends one of Confluent’s Kafka Connect images with a specific set of ready-to-install connectors. As an example, see how the Kafka Connect Datagen connector, which generates mock events, can be pulled from Confluent Hub and bundled into a Docker image using this Dockerfile. Once you build the custom Docker image, Kubernetes will need to pull this image from a Docker registry to create the Pods.
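A minimal sketch of this approach writes a Dockerfile that runs the Confluent Hub client on top of a Connect base image. It assumes the base image you extend ships the confluent-hub CLI; the function name, output directory and {{ ... }} placeholders in the usage lines are hypothetical, and the image tags must match your Confluent Platform version.

```shell
# Sketch: write a Dockerfile that adds a Confluent Hub connector to a base
# Connect image. Assumes the base image includes the confluent-hub client.
write_connect_dockerfile() {
  # $1 = output directory, $2 = base image (repository:tag),
  # $3 = connector as owner/name:version
  mkdir -p "$1"
  cat > "$1/Dockerfile" <<EOF
FROM $2
RUN confluent-hub install --no-prompt $3
EOF
}
```

The image can then be built and pushed to your own registry:

    write_connect_dockerfile ./connect-image confluentinc/cp-server-connect-operator:{{ base image tag }} confluentinc/kafka-connect-datagen:{{ connector version }}
    docker build -t {{ your repository }}/cp-server-connect-operator-datagen:{{ tag }} ./connect-image
    docker push {{ your repository }}/cp-server-connect-operator-datagen:{{ tag }}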


It is not recommended to use volumes to place the desired jars onto the Connect image because it is less self-sufficient, less portable, and harder to match up versions between the base image and jars.

For more advanced use cases where you want to use a custom connector instead of a pre-packaged one available at Confluent Hub, you may create a Docker image with a custom connector from a local archive. The demonstration uses this more advanced workflow. We use the Kafka Connect Datagen connector to generate mock events, and this Dockerfile builds the Docker image with a local archive of the Kafka Connect Datagen connector compiled from source code (versus pulling directly from Confluent Hub). We publish this image to Docker Hub, but in your environment, publish to your own Docker Hub repo.

Your Operator Helm values will need to be updated to pull the custom Connect Docker image for your Pods. You can accomplish this by overriding the connect image in the demo’s values.yaml configuration file so that it uses the image published to Docker Hub:

  repository: cnfldemos/cp-server-connect-operator-datagen
  tag: 0.3.1-

Advanced Usage

Customize AKS Cluster Creation

The following table documents variables that can be used to configure various behaviors. Variables can be exported or set in each individual make command with either sample syntax below:

VARIABLE=value make <make-target>
make <make-target> VARIABLE=value
Variable                 | Description | Default
AZ_RESOURCE_GROUP        | Your Azure resource group. The demo uses it to build a new AKS cluster and to configure the kubectl context. You must set this variable in the shell where you run the demo. | none
AKS_BASE_CLUSTER_ID      | Identifies the AKS cluster. Substitutes in the current user to help keep cluster names unique. | cp-examples-operator-$USER
AKS_BASE_REGION          | Used in the --location flag to define the networking region | centralus
AKS_BASE_ZONE            | Maps to the --zones flag | us-central1-a
AKS_BASE_CLUSTER_VERSION | Maps to the --kubernetes-version flag | 1.17.13
AKS_BASE_MACHINE_TYPE    | Maps to the --node-vm-size flag | Standard_D4s_v4
AKS_BASE_DISK_SIZE       | Maps to the --node-osdisk-size flag | 100
AKS_BASE_NUM_NODES       | Maps to the --node-count flag | 3
KUBECTL_CONTEXT          | Used to explicitly set the kubectl context within the demo | aks_$(AZ_RESOURCE_GROUP)_$(AKS_BASE_REGION)_$(AKS_BASE_CLUSTER_ID)
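To see how these variables map onto an Azure CLI call, the sketch below prints an az aks create command assembled from them. It illustrates the flag mapping in the table and is not the Makefile's actual recipe, which may pass additional options; aks_create_cmd is a hypothetical name, and --zones is omitted when AKS_BASE_ZONE is empty (Azure availability zones are numbered, for example 1 2 3).

```shell
# Sketch: print (do not run) an `az aks create` command built from the
# variables in the table above.
aks_create_cmd() {
  cluster_id=${AKS_BASE_CLUSTER_ID:-cp-examples-operator-$USER}
  printf 'az aks create --resource-group %s --name %s --location %s' \
    "$AZ_RESOURCE_GROUP" "$cluster_id" "$AKS_BASE_REGION"
  # Only pass --zones when a zone list is set.
  if [ -n "$AKS_BASE_ZONE" ]; then
    printf ' --zones %s' "$AKS_BASE_ZONE"
  fi
  printf ' --kubernetes-version %s --node-vm-size %s --node-osdisk-size %s --node-count %s\n' \
    "$AKS_BASE_CLUSTER_VERSION" "$AKS_BASE_MACHINE_TYPE" \
    "$AKS_BASE_DISK_SIZE" "$AKS_BASE_NUM_NODES"
}
```

Review the printed command; if it looks right, you can run it with eval "$(aks_create_cmd)".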


  • If you observe that the replicated offsets do not match in the source and destination clusters, the destination topic may have existed before the demonstration started, for example because you restarted the demonstration. To see the demonstration function properly, use a new cluster or delete and recreate the destination topic before running the demo.