On-Prem Kafka to Cloud¶
This Confluent Cloud demo showcases a hybrid Kafka deployment: one cluster is a self-managed Kafka cluster running locally, the other is a Confluent Cloud cluster. The use case is “Bridge to Cloud”: customers migrating from on-premises deployments to the cloud.
Overview¶
The major components of the demo are:
- Two Kafka clusters: one cluster is a self-managed cluster running locally, the other is a Confluent Cloud cluster.
- Confluent Control Center: manages and monitors the deployment. Use it for topic inspection, viewing the schema, viewing and creating ksqlDB queries, streams monitoring, and more.
- ksqlDB: Confluent Cloud ksqlDB running queries on input topics users and pageviews in Confluent Cloud.
- Two Kafka Connect clusters: one cluster connects to the local self-managed cluster and one connects to the Confluent Cloud cluster. Both Connect worker processes themselves are running locally.
- One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic pageviews locally.
- One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic users in the Confluent Cloud cluster.
- Confluent Replicator: copies the topic pageviews from the local cluster to the Confluent Cloud cluster
- Confluent Schema Registry: the demo runs with Confluent Cloud Schema Registry, and the Kafka data is written in Avro format.
Note
This is a demo environment and has many services running on one host. Do not use this demo in production, and do not use Confluent CLI in production. This is meant exclusively to easily demo the Confluent Platform and Confluent Cloud.
Confluent Cloud Promo Code¶
The first 20 users to sign up for Confluent Cloud and use promo code C50INTEG will receive an additional $50 free usage (details).
Caution¶
This demo uses real Confluent Cloud resources. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo and ensure all resources are destroyed after you are done running it.
Prerequisites¶
- An initialized Confluent Cloud cluster
- Local install of Confluent Cloud CLI v1.7.0 or later
- Download Confluent Platform if using the local install (not required for Docker)
- Confluent Platform is supported on various operating systems and software versions (see Supported Versions and Interoperability for details). This example has been validated with the specific configuration described below. If you are running the example on Windows, which is not officially supported, the example may still work if you update the example code in GitHub, replacing the symlink .env with the contents of config.env.
- macOS 10.15.3
- Confluent Platform 5.5.15
- Java 11.0.6 2020-01-14 LTS
- bash version 3.2.57
- jq 1.6
- (Docker-based examples) Docker version 19.03.8
- (Docker-based examples) docker-compose version 1.25.4
Run demo¶
Setup¶
This demo creates a new Confluent Cloud environment with the resources required to run it. As a reminder, this demo uses real Confluent Cloud resources and you may incur charges.
Clone the examples GitHub repository and check out the 5.5.15-post branch.
git clone https://github.com/confluentinc/examples
cd examples
git checkout 5.5.15-post
Change directory to the Confluent Cloud demo.
cd ccloud
Run¶
Log in to Confluent Cloud with the command ccloud login, and use your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.
ccloud login --save
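If you want to confirm the login before continuing (a quick sanity check, not a step in the demo scripts), the credentials or SSO refresh token land in your local netrc file:
# Inspect the saved Confluent Cloud login entry (default netrc location assumed)
cat ~/.netrc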
Start the entire demo by running a single command. You have two choices: using Docker Compose or a Confluent Platform local install. This will take several minutes to complete as it creates new resources in Confluent Cloud.
# For Docker Compose
./start-docker.sh

# For Confluent Platform local
./start.sh
This script creates a new Confluent Cloud stack of fully managed resources and generates a local configuration file with all connection information, cluster IDs, and credentials, which is useful for other demos and automation. View this local configuration file, where SERVICE ACCOUNT ID is auto-generated by the script.
cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
Your output should resemble:
# ------------------------------
# Confluent Cloud connection information for demo purposes only
# Do not use in production
# ------------------------------
# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# KSQLDB APP ID: <KSQLDB APP ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>
ksql.endpoint=<KSQLDB ENDPOINT>
ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
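Because this is a standard Java client properties file, you can point Kafka command line tools at it directly. A minimal sketch, assuming the Confluent Platform CLI tools are on your PATH (substitute the broker endpoint and service account ID from the file):
# List topics in the Confluent Cloud cluster using the generated properties file
kafka-topics --bootstrap-server <BROKER ENDPOINT> \
  --command-config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config \
  --list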
Log in to the Confluent Cloud UI at http://confluent.cloud.
Use Google Chrome to navigate to the Confluent Control Center GUI at http://localhost:9021.
Playbook¶
Confluent Cloud CLI¶
Validate that you can list topics in your cluster.
ccloud kafka topic list
View the ACLs associated with the service account <SERVICE ACCOUNT ID> that was created at the start of this demo. The resource name corresponds to the respective cluster, Kafka topic name, or consumer group name. Note: in production you would not use the wildcard *; it is included here just for demo purposes.
ccloud kafka acl list --service-account <SERVICE ACCOUNT ID>
For example, if the service account ID were 69995, your output would resemble:
   ServiceAccountId | Permission |    Operation     | Resource |         Name          |   Type
+------------------+------------+------------------+----------+-----------------------+----------+
   User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-monitoring | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | _confluent-monitoring | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | _confluent-command    | PREFIXED
   User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-command    | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | _confluent            | PREFIXED
   User:69995       | ALLOW      | CREATE           | TOPIC    | _confluent            | PREFIXED
   User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent            | PREFIXED
   User:69995       | ALLOW      | CREATE           | GROUP    | *                     | LITERAL
   User:69995       | ALLOW      | WRITE            | GROUP    | *                     | LITERAL
   User:69995       | ALLOW      | READ             | GROUP    | *                     | LITERAL
   User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-statuses | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-statuses | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-offsets  | PREFIXED
   User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-offsets  | PREFIXED
   User:69995       | ALLOW      | DESCRIBE         | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | CREATE           | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | ALTER_CONFIGS    | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | READ             | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | WRITE            | TOPIC    | pageviews             | LITERAL
   User:69995       | ALLOW      | WRITE            | TOPIC    | users                 | LITERAL
   User:69995       | ALLOW      | WRITE            | TOPIC    | *                     | LITERAL
   User:69995       | ALLOW      | CREATE           | TOPIC    | *                     | LITERAL
   User:69995       | ALLOW      | READ             | TOPIC    | *                     | LITERAL
   User:69995       | ALLOW      | DESCRIBE         | TOPIC    | *                     | LITERAL
   User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | *                     | LITERAL
   User:69995       | ALLOW      | READ             | GROUP    | connect-cloud         | LITERAL
   User:69995       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster         | LITERAL
   User:69995       | ALLOW      | CREATE           | CLUSTER  | kafka-cluster         | LITERAL
   User:69995       | ALLOW      | READ             | GROUP    | connect-replicator    | LITERAL
   User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-configs  | PREFIXED
   User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-configs  | PREFIXED
   User:69995       | ALLOW      | WRITE            | GROUP    | _confluent            | PREFIXED
   User:69995       | ALLOW      | READ             | GROUP    | _confluent            | PREFIXED
   User:69995       | ALLOW      | CREATE           | GROUP    | _confluent            | PREFIXED
kafka-connect-datagen¶
In the demo, view this code, which automatically loads the kafka-connect-datagen connector for the Kafka topic pageviews into the connect-local cluster; the topic is later replicated by Replicator into Confluent Cloud (more on Replicator later).
{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
    "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
    "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "max.interval": 100,
    "iterations": 1000000000,
    "tasks.max": "1"
  }
}
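The demo scripts submit this configuration for you; for reference, here is a minimal sketch of the equivalent manual step against the Kafka Connect REST API (the worker port 8083 and the file name datagen-pageviews.json are assumptions, not taken from the demo):
# Hypothetical manual submission to the connect-local worker's REST API
# (port 8083 is an assumption; check the demo's Connect worker configuration)
curl -X POST -H "Content-Type: application/json" \
  --data @datagen-pageviews.json \
  http://localhost:8083/connectors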
In Confluent Control Center, view the data in the pageviews topic in the local cluster.
In the demo, view this code, which automatically loads the kafka-connect-datagen connector for the Kafka topic users into the connect-cloud cluster.
{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
    "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
    "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "max.interval": 1000,
    "iterations": 1000000000,
    "tasks.max": "1"
  }
}
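To check that the connector is running, you can query the Connect REST API on the connect-cloud worker, which listens on port 8087 (see the Confluent Replicator section below):
# Check the datagen-users connector state on the connect-cloud worker
curl -s http://localhost:8087/connectors/datagen-users/status | jq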
In Confluent Control Center, view the data in the users topic in Confluent Cloud.
ksqlDB¶
In the demo, the Confluent Cloud ksqlDB queries are created from statements.sql (for ksqlDB version 0.10.0) by submitting them through the REST API with proper credentials, as shown in this code.
# Submit KSQL queries
echo -e "\nSubmit KSQL queries\n"
properties='"ksql.streams.auto.offset.reset":"earliest","ksql.streams.cache.max.bytes.buffering":"0"'
while read ksqlCmd; do
  echo -e "\n$ksqlCmd\n"
  response=$(curl -X POST $KSQLDB_ENDPOINT/ksql \
       -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
       -u $KSQLDB_BASIC_AUTH_USER_INFO \
       --silent \
       -d @<(cat <<EOF
{
  "ksql": "$ksqlCmd",
  "streamsProperties": {$properties}
}
EOF
))
  echo $response
  if [[ ! "$response" =~ "SUCCESS" ]]; then
    echo -e "\nERROR: KSQL command '$ksqlCmd' did not include \"SUCCESS\" in the response. Please troubleshoot."
    exit 1
  fi
done < statements.sql
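A single statement can be submitted the same way. As a standalone sketch against the ksqlDB /query endpoint (the stream name PAGEVIEWS is an illustrative assumption; use a stream that statements.sql actually created):
# Run one push query against Confluent Cloud ksqlDB and print a few rows
curl -X POST $KSQLDB_ENDPOINT/query \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -u $KSQLDB_BASIC_AUTH_USER_INFO \
     --silent \
     -d '{"ksql": "SELECT * FROM PAGEVIEWS EMIT CHANGES LIMIT 3;", "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}'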
From the Confluent Cloud UI, view the ksqlDB application flow.
Click on any stream to view its messages and its schema.
Confluent Replicator¶
Confluent Replicator copies data from a source Kafka cluster to a destination Kafka cluster.
In this demo, the source cluster is a local install of a self-managed cluster, and the destination cluster is Confluent Cloud.
Replicator replicates the Kafka topic pageviews from the local install to Confluent Cloud, and it runs with Confluent Monitoring Interceptors for Confluent Control Center streams monitoring.
In the demo, view this code, which automatically loads the Replicator connector into the connect-cloud cluster. Notice that the Replicator configuration sets confluent.topic.replication.factor=3, which is required because the source cluster has replication.factor=1 and Confluent Cloud requires replication.factor=3:
{
  "name": "replicator",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "topic.whitelist": "pageviews",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "dest.topic.replication.factor": 3,
    "dest.kafka.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.kafka.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "confluent.topic.replication.factor": 3,
    "src.kafka.bootstrap.servers": "kafka:29092",
    "src.consumer.group.id": "connect-replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "src.kafka.timestamps.topic.replication.factor": 1,
    "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
    "tasks.max": "1"
  }
}
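One way to verify replication end to end, sketched here assuming the Confluent Platform CLI tools are on your PATH, is to consume a few records from the pageviews topic in Confluent Cloud using the generated stack configuration file (values are Avro-encoded, so payloads will not be fully human-readable):
# Consume a few replicated records from the Confluent Cloud pageviews topic
kafka-console-consumer --bootstrap-server <BROKER ENDPOINT> \
  --consumer.config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config \
  --topic pageviews --from-beginning --max-messages 5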
Confluent Control Center is configured to manage a locally running Connect cluster called connect-cloud, running on port 8087, which runs the kafka-connect-datagen connector (for the Kafka topic users) and the Replicator connector. From the Confluent Control Center UI, view the Connect clusters.
In the demo, view this code to see the connect-cloud Connect cluster, which is connected to Confluent Cloud.
CONNECT_STATUS_STORAGE_TOPIC: connect-demo-statuses
CONNECT_REPLICATION_FACTOR: 3
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
# CLASSPATH required due to CC-2422
CLASSPATH: "/etc/kafka-connect/jars/replicator-rest-extension-${CONFLUENT}.jar:/usr/share/java/monitoring-interceptors/monitoring-interceptors-${CONFLUENT}.jar"
# Connect worker
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
# Connect embedded producer
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_REST_EXTENSION_CLASSES: io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
# Connect embedded consumer
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN

# This container is just to transfer Replicator jars to the Connect worker
# It is not used as a Connect worker
replicator-for-jar-transfer:
  image: ${REPOSITORY}/cp-enterprise-replicator:${CONFLUENT_DOCKER_TAG}
  hostname: replicator
  container_name: replicator
  volumes:
    - mi3:/usr/share/java/kafka-connect-replicator/
  environment:
    CONNECT_BOOTSTRAP_SERVERS: localhost:8882
    CONNECT_REST_PORT: 8883
    CONNECT_GROUP_ID: "connect-replicator"
    CONNECT_CONFIG_STORAGE_TOPIC: "default.config"
    CONNECT_OFFSET_STORAGE_TOPIC: "default.offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "default.status"
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
    CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
  command: "tail -f /dev/null"

volumes:
Click on replicator to view the Replicator configuration. Notice that it is replicating the topic pageviews from the local Kafka cluster to Confluent Cloud.
Validate that messages are replicated from the local pageviews topic to the Confluent Cloud pageviews topic. From the Confluent Cloud UI, view messages in this topic.
View the consumer lag for Replicator from the Confluent Cloud UI: in the Consumers view, click on connect-replicator.
Confluent Schema Registry¶
The connectors used in this demo are configured to automatically write Avro-formatted data, leveraging the Confluent Cloud Schema Registry.
View all the Schema Registry subjects.
# Confluent Cloud Schema Registry
curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
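To drill into a single subject, fetch its latest registered schema. For example, for the pageviews value schema (the subject name pageviews-value follows the default TopicNameStrategy naming):
# Fetch the latest schema registered for the pageviews topic's value
curl -u <SR API KEY>:<SR API SECRET> \
  https://<SR ENDPOINT>/subjects/pageviews-value/versions/latest | jq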
From the Confluent Cloud UI, view the schema for the pageviews topic. The topic value uses a schema registered with Schema Registry (the topic key is just a string).
If you need to migrate schemas from an on-prem Schema Registry to Confluent Cloud Schema Registry, follow this step-by-step guide. Refer to the file submit_replicator_schema_migration_config.sh for an example of a working Replicator configuration for schema migration.
Confluent Cloud Configurations¶
View the template delta configuration for Confluent Platform components and clients to connect to Confluent Cloud:
ls template_delta_configs/
Generate the per-component delta configuration parameters, automatically derived from your Confluent Cloud configuration file:
./ccloud-generate-cp-configs.sh
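To see what was generated, list the output files (the delta_configs/ output directory is an assumption based on the script's conventions; check the script itself for the actual location):
# Hypothetical check: list the generated per-component delta configuration files
ls delta_configs/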
If you ran this demo using start-docker.sh, configurations for all the Confluent Platform components are available in the docker-compose.yml file.
# For Docker Compose
cat docker-compose.yml
If you ran this demo using start.sh, which uses the Confluent CLI, all configuration files and log files are saved in the respective component subfolders under the current Confluent CLI temp directory (requires the demo to be actively running):
# For Confluent Platform local install using Confluent CLI
ls `confluent local current | tail -1`
Troubleshooting the demo¶
If you ran with Docker, then run docker-compose logs | grep ERROR.
To view log files, look in the current Confluent CLI temp directory (requires the demo to be actively running):
# View all files
ls `confluent local current | tail -1`

# View the log file per service, e.g. for the Kafka broker
confluent local log kafka
Stop Demo¶
Stop the demo, destroying all local components and all resources in Confluent Cloud.
# For Docker Compose
./stop-docker.sh

# For Confluent Platform local install using Confluent CLI
./stop.sh
Always verify that resources in Confluent Cloud have been destroyed.
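A quick command line check, assuming you are still logged in with ccloud: after the stop script completes, the demo's environment and cluster should no longer appear in the listings.
# Confirm the demo environment and Kafka cluster were destroyed
ccloud environment list
ccloud kafka cluster list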
Additional Resources¶
- To find additional Confluent Cloud demos, see Confluent Cloud Demos Overview.