By the end of this tutorial, you will have configured two clusters, one on
Confluent Platform and one on Confluent Cloud, and successfully used Cluster Linking to share
topic data bidirectionally across the clusters, all without opening up your firewall to Confluent Cloud.
You will create a deployment with data flowing in both directions:
From Confluent Cloud to Confluent Platform
From Confluent Platform to Confluent Cloud
This direction will require a “source initiated” cluster link, a new feature introduced in Confluent Platform 7.1.0.
In both cases, Confluent Platform brokers will initiate the connection to Confluent Cloud brokers. Therefore, you will not have to open
up your firewall to let Confluent Cloud connect to your Confluent Platform brokers.
In the process, you will create various security credentials and configuration files to use with the Confluent Platform and Confluent Cloud commands.
For a handy list of these, see the Configuration summary at the end of this tutorial.
As a general guideline (not just for this tutorial), any customer-owned firewall that allows the cluster link connection from
source cluster brokers to destination cluster brokers must allow the TCP connection to persist in order for Cluster Linking to work.
This tutorial and the source-initiated link feature require Confluent Enterprise, and are not supported in Confluent Community or Apache Kafka®.
With a default install of Confluent Platform, the Confluent CLI
and Cluster Linking commands
should be available in $CONFLUENT_HOME/bin, and properties files will be in the directory $CONFLUENT_CONFIG ($CONFLUENT_HOME/etc/kafka/).
You must have Confluent Platform running to access these commands. Once Confluent Platform is configured and running,
you can type any command with no arguments to get help (for example, kafka-cluster-links).
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
To learn more about running Kafka in KRaft mode, see KRaft Overview.
This tutorial provides examples for both KRaft mode and ZooKeeper mode.
For KRaft, the examples show a combined mode configuration, where for each cluster the broker and controller run on the same server.
Currently, combined mode is not intended for production use but is shown here to simplify the tutorial.
If you want to run controllers and brokers on separate servers, use KRaft in isolated mode. To learn more, see KRaft Overview
and KRaft mode under
Configure Confluent Platform for production.
Configure Kafka brokers, controllers, and ZooKeeper files¶
The example deployment in this tutorial uses the following default port and feature configurations, and assumes that services will run on localhost.
These are example ports that are used for the purposes of this tutorial.
Cluster Linking does not require you to use these ports.
If you have other processes using these ports, either quit the other processes, or modify the tutorial steps to use different ports.
Configure ports, data directories, authentication, and Cluster Linking specifics¶
The sections below provide quick, copy-paste steps for setting up your Kafka brokers and controllers (KRaft) or ZooKeeper files.
Configure the following files in $CONFLUENT_CONFIG, to set up the Confluent Platform cluster.
Copy $CONFLUENT_CONFIG/server.properties to use as a basis for server-clusterlinking.properties:
Modify the listeners and advertised.listeners configurations to use SASL_PLAINTEXT, instead of the default PLAINTEXT.
You can update both of these configurations simultaneously with the following command.
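One way to make both edits at once is a sed one-liner. The following sketch assumes the default PLAINTEXT listener values and operates on a scratch copy so you can see the effect; point it at $CONFLUENT_CONFIG/server-clusterlinking.properties in your deployment:

```shell
# Write a scratch file with the default listener settings (stand-ins for the
# real server-clusterlinking.properties).
printf 'listeners=PLAINTEXT://:9092\nadvertised.listeners=PLAINTEXT://localhost:9092\n' \
  > /tmp/server-clusterlinking.properties

# Switch both listener configs from PLAINTEXT to SASL_PLAINTEXT in one pass.
sed -i \
  -e 's/^listeners=PLAINTEXT/listeners=SASL_PLAINTEXT/' \
  -e 's/^advertised.listeners=PLAINTEXT/advertised.listeners=SASL_PLAINTEXT/' \
  /tmp/server-clusterlinking.properties

cat /tmp/server-clusterlinking.properties
```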
If you check your server-clusterlinking.properties file after these edits, you should see the above seven lines at the end of the file,
and the other configs updated per the previous steps.
This example configures only one Confluent Server broker, secured with SSL, along with one KRaft controller or ZooKeeper, depending on your use case.
This is fine for testing on your local machine, but in a production setting, you should have more brokers (and requisite KRaft controllers or ZooKeepers),
spread across different machines for fault tolerance and high availability, all secured with authentication and encryption.
The replication factors for important internal topics are set to 1, because this is a testing setup with only one broker.
For production deployments, do not set the replication factor of these topics to 1. Generally, replication factors should
be set to 3 or more, depending on the number of brokers.
The parameter password.encoder.secret is needed to encrypt the credentials which will be stored in the cluster link.
To learn more about this parameter, see Multi-Region Clusters.
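As a sketch (not the exact file contents, which depend on your install), the SASL and encoder settings described here might look like this in server-clusterlinking.properties, with placeholder values:

```properties
# Hypothetical additions to server-clusterlinking.properties (placeholder values).
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
security.inter.broker.protocol=SASL_PLAINTEXT
password.encoder.secret=encoder-secret
```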
Format log directories for this server and create SASL SCRAM credentials on the cluster: a user called “kafka” that will be used by the Kafka cluster itself,
and another called “admin” that you will use to run commands against this cluster. For KRaft, both credentials must be applied together in a single command.
The kafka-storage command is run only once per broker/controller. You cannot use this command to update an existing cluster.
If you make a mistake in configurations at this point, you must recreate the directories from scratch, and work through the steps again.
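For KRaft, a hedged sketch of that single formatting command, assuming placeholder passwords for the kafka and admin users:

```shell
# Hypothetical: format the log directories and register both SCRAM users at
# format time (placeholder passwords).
kafka-storage format \
  --config $CONFLUENT_CONFIG/server-clusterlinking.properties \
  --cluster-id $(kafka-storage random-uuid) \
  --add-scram 'SCRAM-SHA-512=[name=kafka,password=kafka-secret]' \
  --add-scram 'SCRAM-SHA-512=[name=admin,password=admin-secret]'
```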
Create a file with the admin credentials to authenticate when you run commands against the Confluent Platform cluster.
Open a text editor, create a file called $CONFLUENT_CONFIG/CP-command.config and copy-paste in the following content:
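A minimal sketch of that file, assuming the admin user was created with the placeholder password admin-secret:

```properties
# CP-command.config (hypothetical values): admin credentials for CLI commands.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin-secret';
```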
Run the following commands in separate command windows.
The commands used to run the KRaft controller, ZooKeeper, and Kafka brokers do not “complete” until you stop them, so these windows need to stay open while the applications are running.
Use another command window to serve as your main terminal in which to run commands that you expect to complete. (Examples of these
are kafka-configs, kafka-topics, kafka-cluster-links, and in certain cases kafka-console-producer and kafka-console-consumer,
although sometimes you may want to leave these last two running as well.)
In a new command window, start a Confluent Server broker for the source cluster, passing the credentials as a part of the command.
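A hedged sketch of that start command; the --override flag is one way to pass the broker’s own SCRAM login inline, assuming the placeholder kafka-secret password:

```shell
# Hypothetical: start the broker with the cluster-linking properties file,
# supplying the broker's SCRAM credentials as part of the command.
kafka-server-start $CONFLUENT_CONFIG/server-clusterlinking.properties \
  --override listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='kafka' password='kafka-secret';"
```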
You need a Dedicated Confluent Cloud
cluster with Public internet in order to run the rest of the commands. You may create one just for the purpose of this demo,
and then delete it after the tutorial is over. You will incur charges for this cluster.
If you created a new Confluent Cloud cluster, you must wait for the cluster to be provisioned. This typically takes a few minutes, but can take longer.
You will be notified by email when the cluster is ready for use.
View your clusters.
An asterisk indicates the currently selected cluster. You can select a different cluster as follows:
You can get information or take several types of actions on a cluster that is not currently selected by specifying its cluster ID.
For example, confluent kafka cluster describe <cluster-ID>.
Note the cluster ID for your Dedicated cluster, referred to as $CC-CLUSTER-ID in this tutorial.
Optionally, set an environment variable for this either in the local shell, or in a zsh or bash profile so that you can directly cut-and-paste commands in later steps:
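For example (lkc-xxxxxx is a stand-in for your actual cluster ID):

```shell
# Hypothetical: stash the Dedicated cluster's ID for cut-and-paste in later steps.
export CC_CLUSTER_ID=lkc-xxxxxx
echo "$CC_CLUSTER_ID"
```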
If the consumer successfully reads the messages, your output will be:
Use keyboard command Ctrl+C to get the prompt back.
Set up privileges for the Confluent Cloud cluster¶
On Confluent Cloud:
Create a user API key for your Confluent Cloud cluster to act as the destination in Confluent Platform to Confluent Cloud topic data mirroring.
Save the resulting API key and secret in a safe place. This tutorial refers to these as <CC-link-api-key> and <CC-link-api-secret>.
This is the API key and secret associated with the Confluent Cloud cluster that you will use to create the Confluent Platform to Confluent Cloud link.
You will add these to a configuration file in the next step.
The following sections describe how to set up and test the Confluent Platform to Confluent Cloud link.
Create a Confluent Platform to Confluent Cloud link¶
Set up the cluster link that mirrors data from Confluent Platform to Confluent Cloud.
This tutorial shows how to create a cluster link from Confluent Platform to Confluent Cloud.
That said, you can use the same general configuration if the destination is Confluent Platform 7.0 or later;
you would create the cluster link in the same way.
This is a source initiated link, meaning that its connection will come from Confluent Platform
and go to Confluent Cloud. As such, you won’t have to open your on-premises firewall.
To create this source initiated link, you must create both halves of the cluster link:
the first half on Confluent Cloud, the second half on Confluent Platform.
Create a cluster link on the Confluent Cloud cluster.
Create a link configuration file $CONFLUENT_CONFIG/clusterlink-hybrid-dst.config with the following entries:
The combination of the configurations link.mode=DESTINATION and connection.mode=INBOUND tells the cluster link
that it is the Destination half of a source initiated cluster link. These two configurations must be used together.
This tutorial example is based on the assumption that there is only one listener. If you configure multiple listeners (for example, INTERNAL, REPLICATION and EXTERNAL)
and want to switch to a different listener than the default, you must add one more parameter to the configuration: local.listener.name=EXTERNAL.
To learn more, see the Confluent Platform documentation on Configuration Options
and Understanding Listeners in Cluster Linking.
If you want to add any configurations to your cluster link (such as consumer offset sync or auto-create mirror topics)
clusterlink-hybrid-dst.config is the file where you would add them. Cluster link configurations are always set on
the Destination cluster link (not the Source cluster link).
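Putting that together, a sketch of clusterlink-hybrid-dst.config; the commented entries are examples of optional additions, not requirements:

```properties
# Destination half of a source-initiated link (these two must be set together).
link.mode=DESTINATION
connection.mode=INBOUND
# Optional additions, for example:
# consumer.offset.sync.enable=true
# auto.create.mirror.topics.enable=true
```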
Create the destination cluster link on Confluent Cloud.
The combination of configurations link.mode=SOURCE and connection.mode=OUTBOUND tells the cluster link
that it is the source-half of a source initiated cluster link. These configurations must be used together.
The middle section tells the cluster link the bootstrap.servers of the Confluent Cloud destination cluster for it to reach out to,
and the authentication credentials to use. Cluster Linking to Confluent Cloud uses TLS and SASL_PLAIN. This is needed so that the
Confluent Cloud cluster knows to accept the incoming request. The Confluent Cloud bootstrap server is shown as the Endpoint in the output for
confluent kafka cluster describe $CC_CLUSTER_ID, or in cluster settings on the Confluent Cloud console. If you use the Endpoint from
the CLI output, remove the protocol prefix. For example, if the endpoint shows as SASL_SSL://pkc-r2ymk.us-east-1.aws.confluent.cloud:9092,
your entry in $CONFLUENT_CONFIG/clusterlink-CP-src.config should be bootstrap.servers=pkc-r2ymk.us-east-1.aws.confluent.cloud:9092.
The last section, where lines are prefixed with local, contains the security credentials to use with the source cluster (Confluent Platform) to read data.
Note that the authentication mechanisms and security protocols for Confluent Platform map to what is defined in the broker.
Those for Confluent Cloud map to what will be defined in a file called clusterlink-cloud-to-CP.config in a subsequent step.
To learn more about the authentication and security protocols used, see Authentication with SASL using JAAS.
Do not add any cluster link configurations (such as consumer offset sync or auto-create mirror topics) to clusterlink-CP-src.config.
These configurations must be set on the Destination’s cluster link (not the Source cluster’s cluster link).
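Collecting the three sections described above, a sketch of $CONFLUENT_CONFIG/clusterlink-CP-src.config with placeholder credentials:

```properties
# Source half of a source-initiated link (must be set together).
link.mode=SOURCE
connection.mode=OUTBOUND

# Destination (Confluent Cloud) connection: TLS plus SASL/PLAIN.
bootstrap.servers=<CC-BOOTSTRAP-SERVER>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CC-link-api-key>' password='<CC-link-api-secret>';

# Local (Confluent Platform) credentials, matching the broker's SASL setup.
local.security.protocol=SASL_PLAINTEXT
local.sasl.mechanism=SCRAM-SHA-512
local.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin-secret';
```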
Create the source cluster link on Confluent Platform, using the following command, specifying the configuration file from the previous step.
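A hedged sketch of that command, assuming the broker listens on localhost:9092 and using the admin command config created earlier:

```shell
# Hypothetical: create the source half of the link on Confluent Platform.
kafka-cluster-links --bootstrap-server localhost:9092 \
  --command-config $CONFLUENT_CONFIG/CP-command.config \
  --create --link from-on-prem-link \
  --config-file $CONFLUENT_CONFIG/clusterlink-CP-src.config \
  --cluster-id $CC_CLUSTER_ID
```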
The command to create the cluster link uses the Confluent Platform kafka-cluster-links tool
to talk with the Confluent Platform cluster, which is different from the unified Confluent CLI used to talk with the Confluent Cloud cluster.
When using Schema Linking: To use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema validation,
or the topic viewer, make sure that Schema Linking
puts the schema in the default context of the Confluent Cloud Schema Registry. To learn more, see
How Schemas work with Mirror Topics.
Before running the first command in the steps below, make sure that you are still logged in to Confluent Cloud and have the appropriate environment and cluster selected.
To list and select these resources, use the commands confluent environment list, confluent environment use, confluent kafka cluster list,
and confluent kafka cluster use. A selected environment or cluster is indicated by an asterisk next to it in the output of list commands.
The commands won’t work properly if no resources are selected (or if the wrong ones are selected).
Perform the following tasks logged in to Confluent Cloud.
Create a mirror topic.
The following command establishes a mirror of the original from-on-prem topic, using the cluster link from-on-prem-link.
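A sketch of that command, assuming the Confluent CLI is logged in with the destination cluster selected:

```shell
# Hypothetical: create the mirror topic on Confluent Cloud over the link.
confluent kafka mirror create from-on-prem --link from-on-prem-link
```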
If when you attempt to run the consumer you get an error indicating “no API key selected for resource”, run this command to specify
the <CC-API-KEY> for the Confluent Cloud destination cluster, then re-run the consumer command: confluent api-key use <CC-API-KEY> --resource $CC_CLUSTER_ID,
or follow the instructions on the CLI provided with the error messages.
The following sections describe how to set up and test the Confluent Cloud to Confluent Platform link.
Create the Confluent Cloud to Confluent Platform link¶
Create another user API key for this cluster link on your Confluent Cloud cluster.
You use the same cluster that served as the destination in previous steps as the source cluster in the following steps; therefore, you create
a different API key and secret for the same cluster to serve in this new role.
Keep the resulting API key and secret in a safe place. This tutorial refers to these as <CC-src-api-key> and <CC-src-api-secret>.
You will add these to a configuration file in the next step.
If you are setting this up in production, you should use a service account API key instead of a user-associated key.
To do this, you would create a service account for your cluster link, give the service account the requisite ACLs, then
create an API key for the service account. It’s best practice for each cluster link to have its own API key and service account. A guide on
how to set up privileges to access Confluent Cloud clusters with a service account
is provided in the topic data sharing tutorial.
Use confluent kafka cluster describe to get the Confluent Cloud cluster Endpoint URL.
This Endpoint URL will be referred to as <CC-BOOTSTRAP-SERVER> in the following steps.
Save your API key and secret along with the following configuration entries in a file called $CONFLUENT_CONFIG/clusterlink-cloud-to-CP.config that the Confluent Platform commands will use to authenticate into Confluent Cloud:
When you copy this content into a vi or Emacs editor, each of these statements should end up on one line; make sure that the lines are not broken up.
The last line, starting with sasl.jaas.config=, must appear all on one line in your file (as should the others). Supply values for your Confluent Cloud bootstrap server,
API key, and secret, then save the file.
Note that the values for security.protocol and sasl.mechanism map to what you defined for Confluent Cloud in clusterlink-CP-src.config.
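A sketch of clusterlink-cloud-to-CP.config with placeholder values; note again that each entry, including the sasl.jaas.config line, must stay on a single line:

```properties
# Hypothetical contents; substitute your bootstrap server, API key, and secret.
bootstrap.servers=<CC-BOOTSTRAP-SERVER>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CC-src-api-key>' password='<CC-src-api-secret>';
```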
Create the cluster link to Confluent Platform.
If you want to follow this example exactly, name the cluster link from-cloud-link; otherwise, you can name it whatever you like.
You will use the cluster link name to create and manipulate mirror topics. You cannot rename a cluster link once it’s created.
The following command creates the cluster link on an unsecured Confluent Platform cluster. If you have security set up on your Confluent Platform cluster,
you must pass security credentials to this command with --command-config as shown in
Setting Properties on a Cluster Link.
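A hedged sketch of that command; this version assumes the secured Confluent Platform cluster built in this tutorial, so it passes --command-config:

```shell
# Hypothetical: create from-cloud-link on Confluent Platform, reading from
# Confluent Cloud using the credentials file above.
kafka-cluster-links --bootstrap-server localhost:9092 \
  --command-config $CONFLUENT_CONFIG/CP-command.config \
  --create --link from-cloud-link \
  --config-file $CONFLUENT_CONFIG/clusterlink-cloud-to-CP.config \
  --cluster-id $CC_CLUSTER_ID
```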
Your output should resemble the following, showing the previous from-on-prem-link you created along with the new from-cloud-link:
Link name: 'from-on-prem-link', link ID: '7eb4304e-b513-41d2-903e-147dea62a01c', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ'
Link name: 'from-cloud-link', link ID: 'b1a56076-4d6f-45e0-9013-ff305abd0e54', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ'
If you want to verify that the mirroring stopped, you can re-run the above command. You should get a message in the
Error Message column that Topic 'from-on-prem' has already stopped its mirror from 'from-on-prem-link'.
On Confluent Platform, promote the mirror topic called cloud-topic:
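A hedged sketch of the promote command, assuming the broker on localhost:9092 and the admin command config created earlier:

```shell
# Hypothetical: promote cloud-topic from a mirror to a regular writable topic.
kafka-mirrors --bootstrap-server localhost:9092 \
  --command-config $CONFLUENT_CONFIG/CP-command.config \
  --promote --topics cloud-topic
```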
Calculating max offset and ms lag for mirror topics: [cloud-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [cloud-topic]
Request for stopping topic cloud-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
If you retry this command, you will get an error indicating that the Topic 'cloud-topic' has already stopped its mirror 'from-cloud-link'.
There will be two because one was required for the source initiated link and the other to act as the destination for Confluent Cloud data:
Link name: 'from-on-prem-link', link ID: '7eb4304e-b513-41d2-903e-147dea62a01c', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ', remote cluster available: 'true'
Link name: 'from-cloud-link', link ID: 'b1a56076-4d6f-45e0-9013-ff305abd0e54', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ', remote cluster available: 'true'
Delete the cluster links on Confluent Platform, using kafka-cluster-links --delete --link <link-name>.
Specifies the link configuration for the Confluent Platform cluster that serves as the source; includes credentials and connection information for Confluent Platform to authenticate into Confluent Cloud
Used to create the cluster link from-on-prem-link on the Confluent Platform side