On-Prem Kafka to Cloud¶
This Confluent Cloud example showcases a hybrid Kafka deployment: one cluster is a self-managed Kafka cluster running locally, the other is a Confluent Cloud cluster. The use case is “Bridge to Cloud,” as customers migrate from on-premises deployments to the cloud.

Overview¶
The major components of the example are:
- Two Kafka clusters: one cluster is a self-managed cluster running locally, the other is a Confluent Cloud cluster.
- Confluent Control Center: manages and monitors the deployment. Use it for topic inspection, viewing the schema, viewing and creating ksqlDB queries, streams monitoring, and more.
- ksqlDB: Confluent Cloud ksqlDB running queries on the input topics `users` and `pageviews` in Confluent Cloud.
- Two Kafka Connect clusters: one cluster connects to the local self-managed cluster and one connects to the Confluent Cloud cluster. Both Connect worker processes themselves are running locally.
- One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic `pageviews` locally.
- One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic `users` in the Confluent Cloud cluster.
- Confluent Replicator: copies the topic `pageviews` from the local cluster to the Confluent Cloud cluster.
- Confluent Schema Registry: the example runs with Confluent Cloud Schema Registry, and the Kafka data is written in Avro format.
Note
This is an example environment with many services running on one host. Do not run this example in production, and do not use Confluent CLI in production. It is meant exclusively as an easy way to demo Confluent Platform and Confluent Cloud.
Confluent Cloud Promo Code¶
The first 20 users to sign up for Confluent Cloud and use promo code C50INTEG
will receive an additional $50 free usage (details).
Caution¶
This example uses real Confluent Cloud resources. To avoid unexpected charges, carefully evaluate the cost of resources before launching the example and ensure all resources are destroyed after you are done running it.
Prerequisites¶
- An initialized Confluent Cloud cluster
- Local install of Confluent Cloud CLI v1.7.0 or later
- Download Confluent Platform if using the local install (not required for Docker)
- Confluent Platform is supported in various operating systems and software versions (see Supported Versions and Interoperability for details).
This example has been validated with the specific configuration described below.
If you are running the example on Windows, which is not officially supported, it may still work if you update the example code in GitHub, replacing the symlink `.env` with the contents of `config.env`.
- macOS 10.15.3
- Confluent Platform 6.0.0
- Java 11.0.6 2020-01-14 LTS
- bash version 3.2.57
- jq 1.6
- (Docker-based examples) Docker version 19.03.8
- (Docker-based examples) Docker Compose version 1.25.4
Start Example¶
Setup¶
This example creates a new Confluent Cloud environment with required resources to run this example. As a reminder, this example uses real Confluent Cloud resources and you may incur charges.
Clone the confluentinc/examples GitHub repository and check out the `6.0.0-post` branch.

```bash
git clone https://github.com/confluentinc/examples
cd examples
git checkout 6.0.0-post
```
Change directory to the Confluent Cloud example.

```bash
cd ccloud
```
Run¶
Log in to Confluent Cloud with the command `ccloud login`, using your Confluent Cloud username and password. The `--save` argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local `netrc` file.

```bash
ccloud login --save
```
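To confirm the login succeeded before starting the example, you can run any read-only CLI command; for instance, list the Kafka clusters in your current environment. A minimal sanity check:

```bash
# A successful login lets you list clusters (the list may be empty before the example runs).
ccloud kafka cluster list
```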
Start the entire example by running a single command. You have two choices: using Docker Compose or a Confluent Platform local install. This will take several minutes to complete as it creates new resources in Confluent Cloud.
```bash
# For Docker Compose
./start-docker.sh
```

```bash
# For Confluent Platform local
./start.sh
```
This script creates a new Confluent Cloud stack of fully managed resources and generates a local configuration file with all connection information, cluster IDs, and credentials, which is useful for other demos and automation. View this local configuration file, where `<SERVICE ACCOUNT ID>` is auto-generated by the script.

```bash
cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
```
Your output should resemble:
```
# ------------------------------
# Confluent Cloud connection information for demo purposes only
# Do not use in production
# ------------------------------
# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# KSQLDB APP ID: <KSQLDB APP ID>
# ------------------------------
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>
ksql.endpoint=<KSQLDB ENDPOINT>
ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
```
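Because this file is a standard Java client configuration, you can point self-managed command line clients at it directly. A minimal sketch, assuming the Confluent Platform (or Apache Kafka) CLI tools are on your PATH; substitute the broker endpoint and service account ID from your own file:

```bash
# List topics in the Confluent Cloud cluster using the generated client configuration.
kafka-topics --bootstrap-server <BROKER ENDPOINT> \
  --command-config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config \
  --list
```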
Log in to the Confluent Cloud UI at https://confluent.cloud.
Use Google Chrome to navigate to the Confluent Control Center GUI at http://localhost:9021.
Playbook¶
Confluent Cloud CLI¶
Validate you can list topics in your cluster.
```bash
ccloud kafka topic list
```
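You can also inspect a single topic in more detail; for example, `ccloud kafka topic describe` shows partition and configuration details. A quick sketch using one of the example's topics:

```bash
ccloud kafka topic describe users
```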
View the ACLs associated with the service account `<SERVICE ACCOUNT ID>` that was created for this example at the start. The resource name corresponds to the respective cluster, Kafka topic name, or consumer group name. Note: in production, you would not use the wildcard `*`; it is included here just for demo purposes.

```bash
ccloud kafka acl list --service-account <SERVICE ACCOUNT ID>
```
For example, if the service account ID were 69995, your output would resemble:
```
   ServiceAccountId | Permission |    Operation     | Resource |         Name          |   Type
+------------------+------------+------------------+----------+-----------------------+----------+
  User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-monitoring | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | _confluent-monitoring | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | _confluent-command    | PREFIXED
  User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-command    | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | _confluent            | PREFIXED
  User:69995       | ALLOW      | CREATE           | TOPIC    | _confluent            | PREFIXED
  User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent            | PREFIXED
  User:69995       | ALLOW      | CREATE           | GROUP    | *                     | LITERAL
  User:69995       | ALLOW      | WRITE            | GROUP    | *                     | LITERAL
  User:69995       | ALLOW      | READ             | GROUP    | *                     | LITERAL
  User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-statuses | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-statuses | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-offsets  | PREFIXED
  User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-offsets  | PREFIXED
  User:69995       | ALLOW      | DESCRIBE         | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | CREATE           | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | ALTER_CONFIGS    | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | READ             | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | WRITE            | TOPIC    | pageviews             | LITERAL
  User:69995       | ALLOW      | WRITE            | TOPIC    | users                 | LITERAL
  User:69995       | ALLOW      | WRITE            | TOPIC    | *                     | LITERAL
  User:69995       | ALLOW      | CREATE           | TOPIC    | *                     | LITERAL
  User:69995       | ALLOW      | READ             | TOPIC    | *                     | LITERAL
  User:69995       | ALLOW      | DESCRIBE         | TOPIC    | *                     | LITERAL
  User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | *                     | LITERAL
  User:69995       | ALLOW      | READ             | GROUP    | connect-cloud         | LITERAL
  User:69995       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster         | LITERAL
  User:69995       | ALLOW      | CREATE           | CLUSTER  | kafka-cluster         | LITERAL
  User:69995       | ALLOW      | READ             | GROUP    | connect-replicator    | LITERAL
  User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-configs  | PREFIXED
  User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-configs  | PREFIXED
  User:69995       | ALLOW      | WRITE            | GROUP    | _confluent            | PREFIXED
  User:69995       | ALLOW      | READ             | GROUP    | _confluent            | PREFIXED
  User:69995       | ALLOW      | CREATE           | GROUP    | _confluent            | PREFIXED
```
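For reference, ACLs like these are granted with `ccloud kafka acl create`. A hedged sketch of how one of the entries above could be created by hand (the start script manages these for you, so this is illustration only):

```bash
# Allow the demo service account to read the pageviews topic.
ccloud kafka acl create --allow --service-account <SERVICE ACCOUNT ID> \
  --operation READ --topic pageviews
```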
kafka-connect-datagen¶
In the example, view this code which automatically loads the `kafka-connect-datagen` connector for the Kafka topic `pageviews` into the `connect-local` cluster, which is later replicated by Replicator into Confluent Cloud (more on Replicator later).

```json
{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
    "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
    "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "max.interval": 100,
    "iterations": 1000000000,
    "tasks.max": "1"
  }
}
```
In Confluent Control Center, view the data in the `pageviews` topic in the local cluster.

In the example, view this code which automatically loads the `kafka-connect-datagen` connector for the Kafka topic `users` into the `connect-cloud` cluster.

```json
{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
    "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
    "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "max.interval": 1000,
    "iterations": 1000000000,
    "tasks.max": "1"
  }
}
```
In Confluent Control Center, view the data in the `users` topic in Confluent Cloud.
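Both connector configurations are submitted to their respective Connect workers through the Kafka Connect REST API. A minimal sketch of how such a submission looks, assuming the connector JSON above is saved to a local file and the worker's REST port is the Connect default of 8083 (the file name and port here are illustrative assumptions; the actual values are defined by this example's scripts):

```bash
# Submit a connector configuration to a Connect worker's REST API.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @datagen-users.json

# Check the connector's status.
curl -s http://localhost:8083/connectors/datagen-users/status | jq
```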
ksqlDB¶
In the example, the Confluent Cloud ksqlDB queries are created from statements.sql (for ksqlDB version 0.10.0) using the REST API in this code with proper credentials.
```bash
done

# Submit KSQL queries
echo -e "\nSubmit KSQL queries\n"
properties='"ksql.streams.auto.offset.reset":"earliest","ksql.streams.cache.max.bytes.buffering":"0"'
while read ksqlCmd; do
  echo -e "\n$ksqlCmd\n"
  response=$(curl -X POST $KSQLDB_ENDPOINT/ksql \
       -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
       -u $KSQLDB_BASIC_AUTH_USER_INFO \
       --silent \
       -d @<(cat <<EOF
{
  "ksql": "$ksqlCmd",
  "streamsProperties": {$properties}
}
EOF
))
  echo $response
  if [[ ! "$response" =~ "SUCCESS" ]]; then
    echo -e "\nERROR: KSQL command '$ksqlCmd' did not include \"SUCCESS\" in the response. Please troubleshoot."
    exit 1
```
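The same REST endpoint can be exercised by hand. A minimal sketch that submits a single harmless statement, assuming `KSQLDB_ENDPOINT` and `KSQLDB_BASIC_AUTH_USER_INFO` are exported as in the script above:

```bash
# List the streams in the Confluent Cloud ksqlDB application.
curl -s -X POST $KSQLDB_ENDPOINT/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -u $KSQLDB_BASIC_AUTH_USER_INFO \
  -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}' | jq
```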
From the Confluent Cloud UI, view the ksqlDB application flow.
Click on any stream to view its messages and its schema.
Confluent Replicator¶
Confluent Replicator copies data from a source Kafka cluster to a destination Kafka cluster.
In this example, the source cluster is a local install of a self-managed cluster, and the destination cluster is Confluent Cloud.
Replicator replicates the Kafka topic `pageviews` from the local install to Confluent Cloud, and it runs with Confluent Monitoring Interceptors for Confluent Control Center streams monitoring.
In the example, view this code which automatically loads the Replicator connector into the `connect-cloud` cluster. Notice that the Replicator configuration sets `confluent.topic.replication.factor=3`, which is required because the source cluster has `replication.factor=1` and Confluent Cloud requires `replication.factor=3`:

```json
{
  "name": "replicator",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "topic.whitelist": "pageviews",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "dest.topic.replication.factor": 3,
    "dest.kafka.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.kafka.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "confluent.topic.replication.factor": 3,
    "src.kafka.bootstrap.servers": "kafka:29092",
    "src.consumer.group.id": "connect-replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "src.kafka.timestamps.topic.replication.factor": 1,
    "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "tasks.max": "1"
  }
}
```
Confluent Control Center is configured to manage a locally running Connect cluster called `connect-cloud`, running on port 8087, which runs the `kafka-connect-datagen` connector (for the Kafka topic `users`) and the Replicator connector. From the Confluent Control Center UI, view the Connect clusters.

Click on `replicator` to view the Replicator configuration. Notice that it is replicating the topic `pageviews` from the local Kafka cluster to Confluent Cloud.

Validate that messages are replicated from the local `pageviews` topic to the Confluent Cloud `pageviews` topic. From the Confluent Cloud UI, view messages in this topic.

View the consumer lag for Replicator from the Confluent Cloud UI: in the `Consumers` view, click on `connect-replicator`.
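If you prefer the command line over the UI, a hedged sketch for spot-checking the replicated data with the ccloud CLI (this assumes an API key has already been associated with the cluster, for example via `ccloud api-key use`):

```bash
# Consume a few records from the replicated topic in Confluent Cloud; Ctrl+C to stop.
ccloud kafka topic consume pageviews --from-beginning
```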
Confluent Schema Registry¶
The connectors used in this example are configured to automatically write Avro-formatted data, leveraging the Confluent Cloud Schema Registry.
View all the Schema Registry subjects.
```bash
# Confluent Cloud Schema Registry
curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
```
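You can also drill into a single subject. A sketch fetching the latest schema registered for the `pageviews` value (the subject name assumes the default `<topic>-value` naming strategy):

```bash
# Fetch the latest schema version for the pageviews value subject.
curl -u <SR API KEY>:<SR API SECRET> \
  https://<SR ENDPOINT>/subjects/pageviews-value/versions/latest | jq
```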
From the Confluent Cloud UI, view the schema for the `pageviews` topic. The topic value uses a schema registered with Schema Registry (the topic key is just a String).

If you need to migrate schemas from an on-prem Schema Registry to Confluent Cloud Schema Registry, follow this step-by-step guide. Refer to the file submit_replicator_schema_migration_config.sh for an example of a working Replicator configuration for schema migration.
Confluent Cloud Configurations¶
View the template delta configurations for Confluent Platform components and clients to connect to Confluent Cloud:

```bash
ls template_delta_configs/
```
Generate the per-component delta configuration parameters, automatically derived from your Confluent Cloud configuration file:
```bash
./ccloud-generate-cp-configs.sh
```
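To inspect what was generated, list the script's output directory. The directory name below is an assumption based on the script's conventions; check the script output for the authoritative location:

```bash
# Hypothetical output location for the generated per-component delta configs.
ls delta_configs/
```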
If you ran this example as start-docker.sh, configurations for all the Confluent Platform components are available in the docker-compose.yml file.
```bash
# For Docker Compose
cat docker-compose.yml
```
If you ran this example as start.sh which uses Confluent CLI, it saves all configuration files and log files in the respective component subfolders in the current Confluent CLI temp directory (requires example to be actively running):
```bash
# For Confluent Platform local install using Confluent CLI
ls `confluent local current | tail -1`
```
Troubleshooting the example¶
If you ran with Docker, then run `docker-compose logs | grep ERROR`.
To view log files, look in the current Confluent CLI temp directory (requires example to be actively running):
```bash
# View all files
ls `confluent local current | tail -1`

# View log file per service, e.g. for the Kafka broker
confluent local services kafka log
```
Stop Example¶
Stop the example and destroy all resources in Confluent Cloud as well as all local components. As an argument to the script, pass in the path to the local configuration file, substituting `<SERVICE ACCOUNT ID>` to match what was auto-generated when you started the demo.

```bash
# For Docker Compose
./stop-docker.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
```

```bash
# For Confluent Platform local install using Confluent CLI
./stop.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
```
Always verify that resources in Confluent Cloud have been destroyed.
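One quick check with the CLI: after teardown, the environment created for the demo should no longer appear in the environment list (its ID is recorded in the stack configuration file):

```bash
# Verify the demo environment no longer appears after teardown.
ccloud environment list
```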
Additional Resources¶
- To find additional Confluent Cloud examples, see Confluent Cloud Examples.