On-Prem Kafka to Cloud

This Confluent Cloud demo showcases a hybrid Kafka deployment: one cluster is a self-managed Kafka cluster running locally, the other is a Confluent Cloud cluster. The use case is “Bridge to Cloud” as customers migrate from on premises to the cloud.

image

Overview

The major components of the demo are:

  • Two Kafka clusters: one cluster is a self-managed cluster running locally, the other is a Confluent Cloud cluster.
  • Confluent Control Center: manages and monitors the deployment. Use it for topic inspection, viewing the schema, viewing and creating ksqlDB queries, streams monitoring, and more.
  • ksqlDB: Confluent Cloud ksqlDB running queries on input topics users and pageviews in Confluent Cloud.
  • Two Kafka Connect clusters: one cluster connects to the local self-managed cluster and one connects to the Confluent Cloud cluster. Both Connect worker processes run locally.
    • One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic pageviews locally
    • One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic users in the Confluent Cloud cluster
    • Confluent Replicator: copies the topic pageviews from the local cluster to the Confluent Cloud cluster
  • Confluent Schema Registry: the demo runs with Confluent Cloud Schema Registry, and the Kafka data is written in Avro format.

Note

This is a demo environment and has many services running on one host. Do not use this demo in production, and do not use Confluent CLI in production. It is intended only for demonstrating Confluent Platform and Confluent Cloud.

Confluent Cloud Promo Code

The first 20 users to sign up for Confluent Cloud and use promo code C50INTEG will receive an additional $50 free usage (details).

Caution

This demo uses real Confluent Cloud resources. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo and ensure all resources are destroyed after you are done running it.

Prerequisites

  1. The following are prerequisites for the demo:

  2. Create a Confluent Cloud configuration file with information on connecting to your Confluent Cloud cluster (see Auto-Generating Configurations for Components to Confluent Cloud for more information). By default, the demo looks for this configuration file at ~/.ccloud/config.

  3. This demo has been validated with:
    • Docker version 17.06.1-ce
    • Docker Compose version 1.14.0 with Docker Compose file format 2.1
    • Java version 1.8.0_162
    • macOS 10.12
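
A quick pre-flight check of the configuration file might look like the sketch below. The function name check_ccloud_config and the set of keys it looks for are assumptions made for illustration (the keys are taken from the sample configuration output shown in the Run section); the demo scripts may validate the file differently.

```shell
#!/usr/bin/env bash
# Hypothetical helper: report whether a Confluent Cloud config file exists
# and contains the connection keys this sketch assumes are needed.
check_ccloud_config() {
  local file="${1:-$HOME/.ccloud/config}"
  local key missing=0

  if [[ ! -f "$file" ]]; then
    echo "missing file: $file"
    return 1
  fi

  # Assumed minimum set of keys, not an official list
  for key in bootstrap.servers sasl.jaas.config; do
    if ! grep -q "^[[:space:]]*${key}[[:space:]]*=" "$file"; then
      echo "missing key: $key"
      missing=1
    fi
  done

  if [[ "$missing" -eq 0 ]]; then
    echo "ok: $file"
  fi
  return "$missing"
}
```

Calling check_ccloud_config with no argument checks the default location ~/.ccloud/config; pass a path to check a different file.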

Run demo

Setup

  1. This demo creates a new Confluent Cloud environment with the resources required to run it. As a reminder, this demo uses real Confluent Cloud resources and you may incur charges.

  2. Clone the examples GitHub repository and check out the 5.5.1-post branch.

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 5.5.1-post
    
  3. Change directory to the Confluent Cloud demo.

    cd ccloud
    

Run

  1. Log in to Confluent Cloud with the command ccloud login, and use your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.

    ccloud login --save
    
  2. Start the entire demo by running a single command. You have two choices: using Docker Compose or a Confluent Platform local install. This will take several minutes to complete as it creates new resources in Confluent Cloud.

    # For Docker Compose
    ./start-docker.sh
    
    # For Confluent Platform local
    ./start.sh
    
  3. As it runs, the script creates a new Confluent Cloud stack of fully managed resources and generates a local configuration file with all connection information, cluster IDs, and credentials, which is useful for other demos and automation. View this local configuration file, where SERVICE ACCOUNT ID is auto-generated by the script.

    cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    

    Your output should resemble:

    # ------------------------------
    # Confluent Cloud connection information for demo purposes only
    # Do not use in production
    # ------------------------------
    # ENVIRONMENT ID: <ENVIRONMENT ID>
    # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
    # KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
    # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
    # KSQLDB APP ID: <KSQLDB APP ID>
    # ------------------------------
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=<BROKER ENDPOINT>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
    basic.auth.credentials.source=USER_INFO
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    schema.registry.url=https://<SR ENDPOINT>
    ksql.endpoint=<KSQLDB ENDPOINT>
    ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
    
  4. Log in to the Confluent Cloud UI at http://confluent.cloud .

  5. Use Google Chrome to navigate to Confluent Control Center GUI at http://localhost:9021 .
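
The generated configuration file from step 3 is a plain key=value properties file, so individual settings can be pulled out for use in other scripts. The following is a minimal sketch; get_prop is a hypothetical helper name, not part of the demo.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of one key from a Java-style
# properties file such as the generated stack configuration.
get_prop() {
  local file="$1" key="$2"
  # Skip comment lines, match the exact key, and keep everything after
  # the first '=' (values such as sasl.jaas.config contain '=' themselves)
  awk -v k="$key" '
    /^[[:space:]]*#/ { next }
    {
      i = index($0, "=")
      if (i > 0 && substr($0, 1, i - 1) == k) {
        print substr($0, i + 1)
        exit
      }
    }
  ' "$file"
}
```

For example, get_prop stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config bootstrap.servers would print the broker endpoint recorded in that file.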

Playbook

Confluent Cloud CLI

  1. Validate you can list topics in your cluster.

    ccloud kafka topic list
    
  2. View the ACLs associated with the service account <SERVICE ACCOUNT ID> that was created for this demo at the start. The resource name corresponds to the respective cluster, Kafka topic name, or consumer group name. Note: in production, you would not use the wildcard *; it is included here only for demo purposes.

    ccloud kafka acl list --service-account <SERVICE ACCOUNT ID>
    

    For example, if the service account ID were 69995, your output would resemble:

      ServiceAccountId | Permission |    Operation     | Resource |         Name          |   Type
    +------------------+------------+------------------+----------+-----------------------+----------+
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-monitoring | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent-monitoring | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent-command    | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-command    | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | WRITE            | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-statuses | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-statuses | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-offsets  | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-offsets  | PREFIXED
      User:69995       | ALLOW      | DESCRIBE         | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | CREATE           | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | ALTER_CONFIGS    | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | READ             | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | users                 | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | CREATE           | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | DESCRIBE         | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | connect-cloud         | LITERAL
      User:69995       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster         | LITERAL
      User:69995       | ALLOW      | CREATE           | CLUSTER  | kafka-cluster         | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | connect-replicator    | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-configs  | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-configs  | PREFIXED
      User:69995       | ALLOW      | WRITE            | GROUP    | _confluent            | PREFIXED
      User:69995       | ALLOW      | READ             | GROUP    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | GROUP    | _confluent            | PREFIXED
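
Because wildcard ACLs are the entries you would want to tighten outside a demo, it can help to list only those. The sketch below filters a table in the layout shown above; wildcard_acls is a hypothetical helper, and it assumes the column order of the sample output.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read an ACL table (in the layout shown above) on stdin
# and print "OPERATION RESOURCE" for every entry whose name is the wildcard *.
wildcard_acls() {
  awk -F'|' '
    NF >= 6 {
      # Trim the padding around every column before comparing
      for (i = 1; i <= NF; i++) gsub(/^[ \t]+|[ \t]+$/, "", $i)
      if ($5 == "*") print $3, $4
    }
  '
}
```

One possible use is to pipe the CLI output into it: ccloud kafka acl list --service-account <SERVICE ACCOUNT ID> | wildcard_acls.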
    

kafka-connect-datagen

  1. In the demo, view this code which automatically loads the kafka-connect-datagen connector for the Kafka topic pageviews into the connect-local cluster, which is later replicated by Replicator into Confluent Cloud (more on Replicator later).

    {
      "name": "datagen-pageviews",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "pageviews",
        "quickstart": "pageviews",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
        "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
        "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
        "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "max.interval": 100,
        "iterations": 1000000000,
        "tasks.max": "1"
      }
    }
    
  2. In Confluent Control Center, view the data in the pageviews topic in the local cluster.

    image
  3. In the demo, view this code which automatically loads the kafka-connect-datagen connector for the Kafka topic users into the connect-cloud cluster.

    {
      "name": "datagen-users",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "users",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
        "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
        "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
        "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "max.interval": 1000,
        "iterations": 1000000000,
        "tasks.max": "1"
      }
    }
    
  4. In Confluent Control Center, view the data in the users topic in Confluent Cloud.

    image
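
Connector definitions like the two above are submitted to a Connect worker through its REST API (POST /connectors). The sketch below shows one way this could be done; render_datagen_config and submit_connector are hypothetical helpers, only a subset of the settings is rendered, and the worker URL is whatever your Connect worker listens on.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a datagen connector config for the given topic.
# Only a subset of the settings shown above is included to keep the sketch short.
render_datagen_config() {
  local topic="$1" max_interval="$2"
  cat <<EOF
{
  "name": "datagen-${topic}",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "${topic}",
    "quickstart": "${topic}",
    "max.interval": ${max_interval},
    "tasks.max": "1"
  }
}
EOF
}

# Hypothetical helper: POST the rendered config to a Connect worker's REST API.
# The worker URL is an assumption; pass the address of your own worker.
submit_connector() {
  local connect_url="$1" topic="$2" max_interval="$3"
  render_datagen_config "$topic" "$max_interval" |
    curl --silent -X POST -H "Content-Type: application/json" \
         --data @- "${connect_url}/connectors"
}
```

For instance, submit_connector http://localhost:8087 users 1000 would target the connect-cloud worker, which this demo runs on port 8087.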

ksqlDB

  1. In the demo, the Confluent Cloud ksqlDB queries are created from statements.sql (for ksqlDB version 0.10.0) using the REST API in this code with proper credentials.

    
    # Submit KSQL queries
    echo -e "\nSubmit KSQL queries\n"
    properties='"ksql.streams.auto.offset.reset":"earliest","ksql.streams.cache.max.bytes.buffering":"0"'
    while read ksqlCmd; do
      echo -e "\n$ksqlCmd\n"
      response=$(curl -X POST $KSQLDB_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQLDB_BASIC_AUTH_USER_INFO \
           --silent \
           -d @<(cat <<EOF
    {
      "ksql": "$ksqlCmd",
      "streamsProperties": {$properties}
    }
    EOF
    ))
      echo "$response"
      if [[ ! "$response" =~ "SUCCESS" ]]; then
        echo -e "\nERROR: KSQL command '$ksqlCmd' did not include \"SUCCESS\" in the response.  Please troubleshoot."
        exit 1
      fi
    # Feed the loop one statement per line from statements.sql
    done < statements.sql
    
  2. From the Confluent Cloud UI, view the ksqlDB application flow.

    image
  3. Click on any stream to view its messages and its schema.

    image
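
The script in step 1 places each statement directly into a JSON body, which works as long as the statement contains no double quotes or backslashes. A slightly more defensive way to build the request body is sketched below; ksql_payload is a hypothetical helper, it sets only one of the two stream properties, and it does not handle statements that span multiple lines.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wrap one ksqlDB statement in the JSON body expected by
# the /ksql endpoint, escaping backslashes and double quotes first.
ksql_payload() {
  local stmt="$1"
  stmt=${stmt//\\/\\\\}   # escape each backslash
  stmt=${stmt//\"/\\\"}   # escape each double quote
  printf '{"ksql": "%s", "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}\n' "$stmt"
}
```

The result can be passed to the same curl call used in the demo script, for example with -d "$(ksql_payload "$ksqlCmd")" in place of the here-document.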

Confluent Replicator

Confluent Replicator copies data from a source Kafka cluster to a destination Kafka cluster. In this demo, the source cluster is a local install of a self-managed cluster, and the destination cluster is Confluent Cloud. Replicator copies the Kafka topic pageviews from the local install to Confluent Cloud, and it runs with Confluent Monitoring Interceptors for Confluent Control Center streams monitoring.

  1. In the demo, view this code which automatically loads the Replicator connector into the connect-cloud cluster. Notice that Replicator configuration sets confluent.topic.replication.factor=3, which is required because the source cluster has replication.factor=1 and Confluent Cloud requires replication.factor=3:

    {
      "name": "replicator",
      "config": {
        "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "topic.whitelist": "pageviews",
        "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "dest.topic.replication.factor": 3,
        "dest.kafka.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "dest.kafka.security.protocol": "SASL_SSL",
        "dest.kafka.sasl.mechanism": "PLAIN",
        "dest.kafka.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "confluent.topic.replication.factor": 3,
        "src.kafka.bootstrap.servers": "kafka:29092",
        "src.consumer.group.id": "connect-replicator",
        "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
        "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
        "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
        "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "src.kafka.timestamps.topic.replication.factor": 1,
        "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "tasks.max": "1"
      }
    }
    
  2. Confluent Control Center is configured to manage a locally running connect cluster called connect-cloud running on port 8087, which is running the kafka-connect-datagen (for the Kafka topic users) connector and the Replicator connector. From the Confluent Control Center UI, view the connect clusters.

    image
  3. In the demo, view this code to see the connect-cloud connect cluster which is connected to Confluent Cloud.

          CONNECT_STATUS_STORAGE_TOPIC: connect-demo-statuses
    
          CONNECT_REPLICATION_FACTOR: 3
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
    
          CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
          CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
          CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
          CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
          CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
          CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
          CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
          # CLASSPATH required due to CC-2422
          CLASSPATH: "/etc/kafka-connect/jars/replicator-rest-extension-${CONFLUENT}.jar:/usr/share/java/monitoring-interceptors/monitoring-interceptors-${CONFLUENT}.jar"
    
          # Connect worker
          CONNECT_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
          CONNECT_SASL_MECHANISM: PLAIN
          CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
    
          # Connect embedded producer
          CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
          CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          CONNECT_REST_EXTENSION_CLASSES: io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
          CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
          CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
          CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
    
          # Connect embedded consumer
          CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
          CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
          CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
          CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
    
      # This container is just to transfer Replicator jars to the Connect worker
      # It is not used as a Connect worker
      replicator-for-jar-transfer:
        image: ${REPOSITORY}/cp-enterprise-replicator:${CONFLUENT_DOCKER_TAG}
        hostname: replicator
        container_name: replicator
        volumes:
          - mi3:/usr/share/java/kafka-connect-replicator/
        environment:
          CONNECT_BOOTSTRAP_SERVERS: localhost:8882
          CONNECT_REST_PORT: 8883
          CONNECT_GROUP_ID: "connect-replicator"
          CONNECT_CONFIG_STORAGE_TOPIC: "default.config"
          CONNECT_OFFSET_STORAGE_TOPIC: "default.offsets"
          CONNECT_STATUS_STORAGE_TOPIC: "default.status"
          CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
          CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
          CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
          CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
        command: "tail -f /dev/null"
    
    volumes:
    
  4. Click on replicator to view the Replicator configuration. Notice that it is replicating the topic pageviews from the local Kafka cluster to Confluent Cloud.

    image
  5. Validate that messages are replicated from the local pageviews topic to the Confluent Cloud pageviews topic. From the Confluent Cloud UI, view messages in this topic.

    image
  6. View the Consumer Lag for Replicator from the Confluent Cloud UI. In Consumers view, click on connect-replicator. Your output should resemble:

    image
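
Consumer lag for a partition is the difference between the partition's log end offset and the offset the consumer group has committed. The arithmetic can be sketched as follows; consumer_lag is a hypothetical helper, and its three-column input format is an assumption chosen for illustration rather than the output of any specific tool.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "partition current_offset log_end_offset" rows on
# stdin (an assumed format) and print the per-partition lag followed by the total.
consumer_lag() {
  awk '
    NF == 3 {
      lag = $3 - $2
      total += lag
      printf "partition %s lag %d\n", $1, lag
    }
    END { printf "total lag %d\n", total }
  '
}
```

A total that stays near zero while pageviews data is being produced suggests Replicator is keeping up with the source topic.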

Confluent Schema Registry

The connectors used in this demo are configured to automatically write Avro-formatted data, leveraging the Confluent Cloud Schema Registry.

  1. View all the Schema Registry subjects.

    # Confluent Cloud Schema Registry
    curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
    
  2. From the Confluent Cloud UI, view the schema for the pageviews topic. The topic value uses a schema registered with Schema Registry (the topic key is just a String).

    image
  3. If you need to migrate schemas from on-prem Schema Registry to Confluent Cloud Schema Registry, follow this step-by-step guide. Refer to the file submit_replicator_schema_migration_config.sh for an example of a working Replicator configuration for schema migration.
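
The /subjects endpoint from step 1 returns a JSON array of subject names. Where a JSON tool is not available, the array can be split into one name per line with standard utilities, as in this sketch. list_subjects is a hypothetical helper, and it assumes subject names contain no commas, quotes, or spaces.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn the JSON array returned by /subjects into one
# subject name per line. Assumes names contain no commas, quotes, or spaces.
list_subjects() {
  sed -e 's/[][" ]//g' | tr ',' '\n' | sed -e '/^$/d'
}
```

For example, curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects | list_subjects. Under the default subject naming strategy, the value schema for a topic is registered under the subject <topic>-value.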

Confluent Cloud Configurations

  1. View the template delta configuration for Confluent Platform components and clients to connect to Confluent Cloud:

    ls template_delta_configs/
    
  2. Generate the per-component delta configuration parameters, automatically derived from your Confluent Cloud configuration file:

    ./ccloud-generate-cp-configs.sh
    
  3. If you ran this demo with start-docker.sh, configurations for all the Confluent Platform components are available in the docker-compose.yml file.

    # For Docker Compose
    cat docker-compose.yml
    
  4. If you ran this demo with start.sh, which uses Confluent CLI, all configuration files and log files are saved in the respective component subfolders in the current Confluent CLI temp directory (requires demo to be actively running):

    # For Confluent Platform local install using Confluent CLI
    ls `confluent local current | tail -1`
    

Troubleshooting the demo

  1. If you ran with Docker, then run docker-compose logs | grep ERROR.

  2. To view log files, look in the current Confluent CLI temp directory (requires demo to be actively running):

    # View all files
    ls `confluent local current | tail -1`
    
    # View log file per service, e.g. for the Kafka broker
    confluent local log kafka
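
When running with Docker, a per-service count of error lines can narrow down where to look first. The sketch below assumes each log line has the form "service | message", as produced by docker-compose logs; errors_per_service is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read docker-compose log lines ("service | message") on
# stdin and print how many lines containing ERROR each service produced.
errors_per_service() {
  awk -F'|' '
    NF >= 2 && /ERROR/ {
      svc = $1
      gsub(/^[ \t]+|[ \t]+$/, "", svc)   # trim padding around the service name
      count[svc]++
    }
    END { for (s in count) print s, count[s] }
  ' | sort
}
```

One possible invocation is docker-compose logs --no-color | errors_per_service.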
    

Stop Demo

  1. Stop the demo and destroy all resources in Confluent Cloud as well as the local components.

    # For Docker Compose
    ./stop-docker.sh
    
    # For Confluent Platform local install using Confluent CLI
    ./stop.sh
    
  2. Always verify that resources in Confluent Cloud have been destroyed.
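
One way to make that verification scriptable is to check whether an ID from the generated configuration file still appears in a resource listing. The sketch below only does the matching; resource_still_listed is a hypothetical helper, and which listing you feed it (for example, the output of ccloud environment list) is up to you.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the given resource ID still appears as a
# whole word in the listing read from stdin.
resource_still_listed() {
  local id="$1"
  grep -qFw -- "$id"
}
```

For example, ccloud environment list | resource_still_listed <ENVIRONMENT ID> && echo "environment still exists" would print a warning only if the environment created by the demo is still present.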

Additional Resources