Hybrid Cloud and Bridge-to-Cloud

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

Introduction

This tutorial is a hands-on look at how to use Cluster Linking for hybrid use cases that link Confluent Platform and Confluent Cloud clusters.

You will create a deployment with data flowing in both directions:

  • From Confluent Cloud to Confluent Platform

  • From Confluent Platform to Confluent Cloud

    • This direction will require a “source initiated” cluster link; a new feature introduced in Confluent Platform 7.0.0.

      ../../_images/source-initiated-cluster-link.png

In both cases, Confluent Platform brokers will initiate the connection to Confluent Cloud brokers. Therefore, you will not have to open up your firewall to let Confluent Cloud connect to your Confluent Platform brokers.

In the process, you will create various security credentials and configuration files to use with the Confluent Platform and Confluent Cloud commands. For a handy list of these, see the Configuration Summary at the end of this tutorial.

To see what clusters can use Cluster Linking, see Supported Cluster Types.

../../_images/cluster-link-hybrid.png

What the tutorial covers

By the end of this tutorial, you will have configured two clusters, one on Confluent Platform and one on Confluent Cloud, and successfully used Cluster Linking to share topic data bidirectionally across the clusters, all without opening up your firewall to Confluent Cloud.

Install Confluent Platform

Download and extract Confluent Platform version 7.0.0.

The rest of the tutorial expects these variables to be set:

export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka

Add these two lines to your .bashrc or .bash-profile so that they are executed whenever you open a terminal window.

About Prerequisites and Command Examples

Note

As a general guideline (not just for this tutorial), any customer-owned firewall that allows the cluster link connection from source cluster brokers to destination cluster brokers must allow the TCP connection to persist in order for Cluster Linking to work.

Ports and Configuration Mapping

The example deployment in this tutorial uses the following port and feature configurations, and assumes that services will run on localhost.

Confluent Platform
Kafka Brokers 9092
ZooKeeper 2181

Tip

  • These are example ports that are used for the purposes of this tutorial. Cluster Linking does not require you to use these exact ports. You may change them if needed.
  • If you have other processes using these ports, either quit the other processes, or modify the tutorial steps to use different ports.

Configure Kafka and ZooKeeper files

In $CONFLUENT_CONFIG, configure the following files to set up the Confluent Platform cluster.

Copy $CONFLUENT_CONFIG/zookeeper.properties to use as a basis for zookeeper-clusterlinking.properties.

Copy $CONFLUENT_CONFIG/server.properties to use as a basis for server-clusterlinking.properties.

File Configurations
zookeeper-clusterlinking.properties

dataDir=/tmp/zookeeper-clusterlinking (this must be modified)

clientPort=2181 (this is the default)

server-clusterlinking.properties

These must be added to the existing file:

inter.broker.listener.name=SASL_PLAINTEXT

sasl.enabled.mechanisms=SCRAM-SHA-512

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";

confluent.reporters.telemetry.auto.enable=false

confluent.cluster.link.enable=true

password.encoder.secret=encoder-secret

These are modifications or uses of existing configs:

listeners=SASL_PLAINTEXT://:9092

advertised.listeners=SASL_PLAINTEXT://:9092

log.dirs=/tmp/kafka-logs-1

zookeeper.connect=localhost:2181 (should already be set this way)

offsets.topic.replication.factor=1 (should already be set this way)

confluent.license.topic.replication.factor=1 (should already be set this way)

Note

  • This example configures only one ZooKeeper and one Confluent Server broker, secured with SSL. This is fine for testing on your local machine, but in a production setting, you should have more Zookeepers and brokers, spread across different machines for fault tolerance and high availability, all secured with authentication and encryption.
  • For this example, the replication factors for important internal topics are set to 1, because this is a testing setup with only one broker. For production deployments, do not set the replication factor of these topics to 1. Generally, replication factors should be set to 3 or more, depending on the number of brokers.
  • The parameter password.encoder.secret is needed to encrypt the credentials which will be stored in the cluster link.

Start the Confluent Platform cluster

Run the following commands in separate command windows.

ZooKeeper and Confluent Server commands do not “complete” until you stop them, so these windows need to stay open while the applications are running.

Use another command window to serve as your main terminal in which to run commands that you expect to complete. (Examples of these are kafka-configs, kafka-topics, kafka-cluster-links, and in certain cases kafka-console-producer and kafka-console-consumer, although sometimes you may want to leave these last two running as well.)

../../_images/cluster-link-hybrid-command-windows.png
  1. In a new command window, start the ZooKeeper server for the Confluent Platform cluster.

    zookeeper-server-start $CONFLUENT_CONFIG/zookeeper-clusterlinking.properties
    
  2. Run commands to create SASL SCRAM credentials on the cluster for two users: one to be used by the Kafka cluster, and the other for running commands against the cluster.

    • Run this command to create credentials on the cluster for a user called “kafka” that will be used by the Kafka cluster itself.

      kafka-configs --zookeeper localhost:2181 --alter --add-config \
        'SCRAM-SHA-512=[iterations=8192,password=kafka-secret]' \
        --entity-type users --entity-name kafka
      
    • Run this command to create credentials on the cluster for a user called “admin” that you will use to run commands against this cluster.

      kafka-configs --zookeeper localhost:2181 --alter --add-config \
        'SCRAM-SHA-512=[iterations=8192,password=admin-secret]' \
        --entity-type users --entity-name admin
      
  3. Create a file with the admin credentials to authenticate when you run commands against the Confluent Platform cluster.

    Open a text editor, create a file called $CONFLUENT_CONFIG/CP-command.config and copy-paste in the following content:

    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_PLAINTEXT
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="admin" \
      password="admin-secret";
    
  4. In a new command window, start a Confluent Server broker for the source cluster, passing the credentials as a part of the command.

    kafka-server-start $CONFLUENT_CONFIG/server-clusterlinking.properties
    
  5. Get the Confluent Platform cluster ID.

    kafka-cluster cluster-id --bootstrap-server localhost:9092 --config $CONFLUENT_CONFIG/CP-command.config
    

    Your output should resemble:

    Cluster ID: G1pnOMOxSjWYIX8xuR2cfQ
    

    In this case, G1pnOMOxSjWYIX8xuR2cfQ is the Confluent Platform cluster ID, referred to in these examples as $CP_CLUSTER_ID.

    Optionally, set an environment variable for this either in the local shell, or in a zsh or bash profile so that you can directly cut-and-paste commands in later steps:

    export CP_CLUSTER_ID=<CP-CLUSTER-ID>
    

Start the Confluent Cloud cluster

You need a Dedicated Confluent Cloud cluster with Public internet in order to run the rest of the commands. You may create one just for the purpose of this demo, and then delete it after the tutorial is over. You will incur charges for this cluster.

  1. Log on to Confluent Cloud using either the unified CLI or the Confluent Cloud CLI (see About Prerequisites and Command Examples).

    This example uses the unified CLI command:

    confluent login
    
  2. View environments, and select the one you want to use by environment ID.

    confluent environment list
    

    An asterisk indicates the currently selected environment in the list. You can select a different environment as follows.

    confluent environment use <environment-ID>
    
  3. Use an existing Dedicated cluster in Confluent Cloud, or create a new one either from the Confluent Cloud Console or directly from the Confluent CLI as shown below:

    confluent kafka cluster create CLOUD-DEMO --type dedicated --cloud aws --region us-east-1 --cku 1 --availability single-zone
    

    Your output should resemble:

    It may take up to 5 minutes for the Kafka cluster to be ready.
    +--------------+---------------+
    | Id           | lkc-59oyn     |
    | Name         | MY-CLOUD-DEMO |
    | Type         | DEDICATED     |
    | Ingress      |            50 |
    | Egress       |           150 |
    | Storage      | Infinite      |
    | Provider     | aws           |
    | Availability | single-zone   |
    | Region       | us-east-1     |
    | Status       | PROVISIONING  |
    | Endpoint     |               |
    | ApiEndpoint  |               |
    | RestEndpoint |               |
    | ClusterSize  |             1 |
    +--------------+---------------+
    

    If you created a new Confluent Cloud cluster, you must wait for the cluster to be provisioned. This typically takes a few minutes, but can take longer. You will be notified in email when the cluster is ready for use.

  4. View your clusters.

    confluent kafka cluster list
    

    An asterisk indicates the currently selected cluster. You can select a different cluster as follows:

    confluent kafka cluster use <CC-CLUSTER-ID>
    

    Tip

    You can get information or take several types of actions on a cluster that is not currently selected by specifying its cluster ID. For example, confluent kafka cluster describe <cluster-ID>.

  5. Note the cluster ID for your Dedicated cluster, referred to as $CC-CLUSTER-ID in this tutorial.

    Optionally, set an environment variable for this either in the local shell, or in a zsh or bash profile so that you can directly cut-and-paste commands in later steps:

    export CC_CLUSTER_ID=<CC-CLUSTER-ID>
    

Populate the Confluent Platform cluster

These commands use the Confluent Platform CLI.

  1. Create a topic on the Confluent Platform cluster with a single partition so ordering is easier to see.

    kafka-topics --create --topic from-on-prem --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    You should get confirmation that the topic was successfully created.

    Created topic from-on-prem.
    

    You can get a list of existing topics as follows:

    kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    And get detailed information on a topic with the --describe option:

    kafka-topics --describe --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    
  2. Send some messages to the from-on-prem topic on the source cluster, and fill it with data.

    seq 1 5 | kafka-console-producer --topic from-on-prem --bootstrap-server localhost:9092 --producer.config $CONFLUENT_CONFIG/CP-command.config
    

    The command should terminate without any output.

  3. Consume from the topic on the source cluster.

    Run a consumer to consume messages from the from-on-prem topic.

    kafka-console-consumer --topic from-on-prem --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
    

    If the topic successfully consumes the messages, your output will be:

    1
    2
    3
    4
    5
    

    Use keyboard command Ctrl+C to get the prompt back.

Set up privileges for the Confluent Cloud cluster

On Confluent Cloud:

  1. Create a user API key for your Confluent Cloud cluster to act as the destination in Confluent Platform to Confluent Cloud topic data mirroring.

    confluent api-key create --resource $CC_CLUSTER_ID
    
  2. Save the resulting API key and secret in a safe place. This tutorial refers to these as <CC-link-api-key> and <CC-link-api-secret>. This is the API key and secret associated with the Confluent Cloud cluster that you will use to create the Confluent Platform to Confluent Cloud link. You will add these to a configuration file in the next step.

    Important

    If you are setting this up in production, you should use a service account API key instead of a user-associated key. A guide on how to set up privileges to access Confluent Cloud clusters with a service account is provided in the topic data sharing tutorial. For source-initiated links, the only ACL your service account will need is ALTER on the destination cluster (Cluster: Alter ACL). To learn more about ACLs for cluster linking, see the Security for Cluster Linking on Confluent Platform and the Security for Cluster Linking on Confluent Cloud

Mirror data from on-premises to Confluent Cloud

The following sections describe how to set up and test the Confluent Platform to Confluent Cloud link.

Create topics and mirror data to Confluent Cloud

Note

  • When using Schema Linking: To use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema validation, or the topic viewer, make sure that make sure that Schema Linking puts the schema in the default context of the Confluent Cloud Schema Registry. To learn more, see How Schemas work with Mirror Topics.
  • Before running the first command in the steps below, make sure that you are still logged in to Confluent Cloud and have the appropriate environment and cluster selected. To list and select these resources, use the commands confluent kafka environment list, confluent kafka environment use, confluent kafka cluster list, and confluent kafka cluster use. A selected environment or cluster is indicated by an asterisk next to it in the output of list commands. The commands won’t work properly if no resources are selected (or if the wrong ones are selected).

Perform the following tasks logged in to Confluent Cloud.

  1. Create a mirror topic.

    The following command establishes a mirror of the original from-on-prem topic, using the cluster link from-on-prem-link.

    confluent kafka mirror create from-on-prem --link from-on-prem-link
    

    The command output will be:

    Created mirror topic "from-on-prem".
    
    • The mirror topic name must match the original topic name. To learn more, see all Known Limitations.
    • A mirror topic must specify the link to its source topic at creation time. This ensures that the mirror topic is a clean slate, with no conflicting data or metadata.
  2. List the mirror topics on the link.

    confluent kafka mirror list --cluster $CC_CLUSTER_ID
    

    Your output will resemble:

          Link Name     | Mirror Topic Name | Num Partition | Max Per Partition Mirror Lag | Source Topic Name | Mirror Status | Status Time Ms
    +-------------------+-------------------+---------------+------------------------------+-------------------+---------------+----------------+
      from-on-prem-link | from-on-prem      |             1 |                            0 | from-on-prem      | ACTIVE        |  1633640214250
    
  3. Consume from the mirror topic on the destination cluster to verify it.

    Still on Confluent Cloud, run a consumer to consume messages from the mirror topic to consume the messages you originally produced to the Confluent Platform topic in previous steps.

    confluent kafka topic consume from-on-prem --from-beginning
    

    Your output should be:

    1
    2
    3
    4
    5
    

    Note

    If when you attempt to run the consumer you get an error indicating “no API key selected for resource”, run this command to specify the <CC-API-KEY> for the Confluent Cloud destination cluster, then re-run the consumer command: confluent api-key use <CC-API-KEY> --resource $CC_CLUSTER_ID, or follow the instructions on the CLI provided with the error messages.

Mirror data from Confluent Cloud to on-premises

The following sections describe how to set up and test the Confluent Cloud to Confluent Platform link.

Create topics and mirror data to on-premises

  1. In Confluent Cloud, use the unified Confluent CLI to create a topic with one partition called cloud-topic.

    confluent kafka topic create cloud-topic --partitions 1
    
  2. In another command window on Confluent Cloud, start a producer to send some data into cloud-topic.

    confluent kafka topic produce cloud-topic --cluster $CC_CLUSTER_ID
    
    • Verify that the producer has started. Your output will resemble the following to show that the producer is ready.

      $ confluent kafka topic produce cloud-topic --cluster lkc-1vgo6
      Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
      
    • Type some entries of your choice into the producer window, hitting return after each entry to send.

      Riesling
      Pinot Blanc
      Verdejo
      
  3. Mirror the cloud-topic on Confluent Platform, using the command kafka-mirrors --create --mirror-topic <topic-name>.

    The following command establishes a mirror of the original cloud-topic, using the cluster link from-cloud-link.

    kafka-mirrors --create --mirror-topic cloud-topic --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    You should get this verification that the mirror topic was created.

    Created topic cloud-topic.
    
  4. On Confluent Platform, check the mirror topic status by running kafka-mirrors --describe on the from-cloud-link.

    kafka-mirrors --describe --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    Your output will show the status of any mirror topics on the specified link.

    Topic: cloud-topic        LinkName: from-cloud-link       LinkId: b1a56076-4d6f-45e0-9013-ff305abd0e54    MirrorTopic: cloud-topic        State: ACTIVE   StateTime: 2021-10-07 16:36:20
              Partition: 0    State: ACTIVE   DestLogEndOffset: 2     LastFetchSourceHighWatermark: 2 Lag: 0  TimeSinceLastFetchMs: 384566
    
  5. Consume the data from the on-prem mirror topic.

    kafka-console-consumer --topic cloud-topic --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
    

    Your output should match the entries you typed into the Confluent Cloud producer in step 8.

    ../../_images/cluster-link-hybrid-produce-consume.png
  6. View the configuration of your cluster link:

    kafka-configs --describe --cluster-link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    The output for this command is a list of configurations, partially shown in the following example.

    Dynamic configs for cluster-link from-cloud-link are:
    metadata.max.age.ms=300000 sensitive=false synonyms={}
    reconnect.backoff.max.ms=1000 sensitive=false synonyms={}
    auto.create.mirror.topics.filters= sensitive=false synonyms={}
    ssl.engine.factory.class=null sensitive=false synonyms={}
    sasl.kerberos.ticket.renew.window.factor=0.8 sensitive=false synonyms={}
    reconnect.backoff.ms=50 sensitive=false synonyms={}
    consumer.offset.sync.ms=30000 sensitive=false synonyms={}
    
    ...
    
    link.mode=DESTINATION sensitive=false synonyms={}
    security.protocol=SASL_SSL sensitive=false synonyms={}
    acl.sync.ms=5000 sensitive=false synonyms={}
    ssl.keymanager.algorithm=SunX509 sensitive=false synonyms={}
    sasl.login.callback.handler.class=null sensitive=false synonyms={}
    replica.fetch.max.bytes=5242880 sensitive=false synonyms={}
    availability.check.consecutive.failure.threshold=5 sensitive=false synonyms={}
    sasl.login.refresh.window.jitter=0.05 sensitive=false synonyms={}
    

Teardown

Stop consumers and producers

Stop consumers and producers with Ctl-C in their respective command windows.

Promote mirror topics

Promote the mirror topics to normal topics.

  1. On Confluent Cloud promote the mirror topic called from-on-prem:

    confluent kafka mirror promote from-on-prem --link from-on-prem-link --cluster $CC_CLUSTER_ID
    

    Your output will resemble:

     Mirror Topic Name | Partition | Partition Mirror Lag | Error Message | Error Code | Last Source Fetch Offset
    +-------------------+-----------+----------------------+---------------+------------+--------------------------+
     from-on-prem      |         0 |                    0 |               |            |                        9
    

    If you want to verify that the mirroring stopped, you can re-run the above command. You should get a message in the Error Message column that Topic 'from-on-prem' has already stopped its mirror from 'from-on-prem-link'.

  2. On Confluent Platform, promote the mirror topic called cloud-topic:

    kafka-mirrors --promote --topics cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    Your output should resemble:

    Calculating max offset and ms lag for mirror topics: [cloud-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [cloud-topic]
    Request for stopping topic cloud-topics mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
    

    If you retry this command, you will get an error indicating that the Topic 'cloud-topic' has already stopped its mirror 'from-cloud-link'.

Delete the source and mirror topics

Tip

  • To list the topics on Confluent Cloud: confluent kafka topic list
  • To list the topics on Confluent Platform: kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
  1. Delete the topics on Confluent Cloud.

    confluent kafka topic delete cloud-topic
    
    confluent kafka topic delete from-on-prem
    
  2. Delete the topics from Confluent Platform.

    kafka-topics --delete --topic cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    
    kafka-topics --delete --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

Stop Confluent Platform and ZooKeeper

Stop all of the other components with Ctl-C in their respective command windows, in reverse order in which you started them.

  1. Stop the Kafka broker first.
  2. When the Kafka broker has fully shut down and your prompt has returned, then go to the other window and stop the associated ZooKeeper.

Configuration Summary

File Purpose
zookeeper-clusterlinking.properties Configuration file used for ZooKeeper startup, as described in Configure Kafka and ZooKeeper files
server-clusterlinking.properties Configuration file used for the Confluent Platform cluster startup, as described in Configure Kafka and ZooKeeper files
CP-command.config
  • Created in step 3 of Start the Confluent Platform cluster
  • Contains admin credentials to authenticate when you run commands against the Confluent Platform cluster
  • Used with the flag –command-config in Confluent Platform commands
clusterlink-hybrid-dst.config
clusterlink-CP-src.config
  • Created in step 3 of Create a Confluent Platform to Confluent Cloud link
  • Specifies the link configuration for the Confluent Platform cluster that serves as the source, includes credentials and connection information for Confluent Platform to authenticate into Confluent Cloud
  • Used to create the cluster link from-on-prem-link on the Confluent Platform side
clusterlink-cloud-to-CP.config
  • Created in step 4 of Create the Confluent Cloud to Confluent Platform link
  • Contains security credentials and connection information that the Confluent Platform commands use to authenticate into Confluent Cloud
  • Used to create the from-cloud-link on the Confluent Platform side