On-Prem Kafka to Cloud

This Confluent Cloud demo showcases a hybrid Kafka deployment: one cluster is a self-managed Kafka cluster running locally, and the other is a Confluent Cloud cluster. The use case is “Bridge to Cloud”, as customers migrate from on-premises deployments to the cloud.

image

Overview

The major components of the demo are:

  • Two Kafka clusters: one cluster is a self-managed cluster running locally, the other is a Confluent Cloud cluster.
  • Confluent Control Center: manages and monitors the deployment. Use it for topic inspection, viewing the schema, viewing and creating KSQL queries, streams monitoring, and more.
  • KSQL: Confluent Cloud KSQL running queries on input topics users and pageviews in Confluent Cloud.
  • Two Kafka Connect clusters: one cluster connects to the local self-managed cluster and one connects to the Confluent Cloud cluster. Both Connect worker processes themselves are running locally.
    • One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic pageviews locally
    • One instance of kafka-connect-datagen: a source connector that produces mock data to prepopulate the topic users in the Confluent Cloud cluster
    • Confluent Replicator: copies the topic pageviews from the local cluster to the Confluent Cloud cluster
  • Confluent Schema Registry: the demo runs with Confluent Cloud Schema Registry, and the Kafka data is written in Avro format.

Note

This is a demo environment with many services running on one host. Do not use this demo or the Confluent CLI in production. It is meant exclusively as an easy way to demo Confluent Platform and Confluent Cloud.

Warning

This demo uses real Confluent Cloud resources. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo and ensure all resources are destroyed after you are done running it.

Prerequisites

The following are prerequisites for the demo:

  1. Create a Confluent Cloud configuration file with information on connecting to your Confluent Cloud cluster (see Auto-Generating Configurations for Components to Confluent Cloud for more information). By default, the demo looks for this configuration file at ~/.ccloud/config.

  2. This demo has been validated with:
     • Docker version 17.06.1-ce
     • Docker Compose version 1.14.0 with Docker Compose file format 2.1
     • Java version 1.8.0_162
     • macOS 10.12

Run demo

Setup

  1. By default, the demo reads the configuration parameters for your Confluent Cloud environment from a file at $HOME/.ccloud/config. You can change this filename via the parameter CONFIG_FILE in config/demo.cfg. Enter the configuration parameters for your Confluent Cloud cluster, replacing the values in <...> below with those for your Confluent Cloud environment:

    $ cat $HOME/.ccloud/config
    bootstrap.servers=<BROKER ENDPOINT>
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
    schema.registry.url=https://<SR ENDPOINT>
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    basic.auth.credentials.source=USER_INFO
    ksql.endpoint=https://<KSQL ENDPOINT>
    ksql.basic.auth.user.info=<KSQL API KEY>:<KSQL API SECRET>
    

    To retrieve the values for the endpoints and credentials in the file above, find them using either the Confluent Cloud UI or Confluent Cloud CLI commands. If you have multiple Confluent Cloud clusters, make sure to use the one with the associated KSQL cluster. The commands below demonstrate how to retrieve the values using the Confluent Cloud CLI.

    # Login
    ccloud login --url https://confluent.cloud
    
    # BROKER ENDPOINT
    ccloud kafka cluster list
    ccloud kafka cluster use <cluster ID>
    ccloud kafka cluster describe
    
    # SR ENDPOINT
    ccloud schema-registry cluster describe
    
    # KSQL ENDPOINT
    ccloud ksql app list
    
    # Credentials: API key and secret, one for each resource above
    ccloud api-key create
    
  2. Clone the examples GitHub repository and check out the 5.4.2-post branch.

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 5.4.2-post
    
  3. Change directory to the Confluent Cloud demo.

    $ cd ccloud
    
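Before running the demo, you can sanity-check that your Confluent Cloud configuration file parses and contains the keys the demo expects. The following is a minimal sketch using only the Python standard library; the `parse_ccloud_config` helper is illustrative and not part of the demo itself:

```python
def parse_ccloud_config(lines):
    """Parse a Java-properties-style Confluent Cloud config into a dict."""
    config = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only; later '=' characters belong to the value.
        key, sep, value = line.partition("=")
        if sep:
            # Properties files escape '=' inside values as '\=' (see sasl.jaas.config above).
            config[key.strip()] = value.strip().replace("\\=", "=")
    return config
```

For example, `parse_ccloud_config(open(os.path.expanduser("~/.ccloud/config")))` yields a dict whose keys you can check against the parameter list above before starting the demo.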

Run

  1. Log in to Confluent Cloud with the command ccloud login, and use your Confluent Cloud username and password.

    ccloud login --url https://confluent.cloud
    
  2. Start the entire demo by running a single command, using either a Confluent Platform local install or Docker Compose. This takes less than 5 minutes to complete.

    # For Confluent Platform local install using Confluent CLI
    $ ./start.sh
    
    # For Docker Compose
    $ ./start-docker.sh
    
  3. Log in to the Confluent Cloud UI at http://confluent.cloud . Use Google Chrome to view the Confluent Control Center GUI at http://localhost:9021 .

Playbook

Confluent Cloud

  1. Validate you can list topics in your cluster.

    ccloud kafka topic list
    
  2. Get familiar with the Confluent Cloud CLI. For example, create a new topic called test, produce some messages to that topic, and then consume from that topic.

    ccloud kafka topic create test
    ccloud kafka topic produce test
    ccloud kafka topic consume test -b
    

Confluent Control Center

  1. Monitoring –> Data Streams –> Message Delivery: hover over any chart to see the number of messages and average latency within a one-minute time interval.

    image
  2. Management –> Kafka Connect: Confluent Control Center uses the Kafka Connect API to manage Kafka connectors, and more specifically for this demo, Confluent Replicator.

    • Kafka Connect Sources tab shows the connector replicator. Click Edit to see the details of the connector configuration.

      image
  3. Management –> Topics –> Topic Information: For a given topic, click on the three dots ... next to the topic name to see more options per topic including in sync replicas, schema, topic messages, and configuration settings. Shown below is replica info.

    image

Note

There will not be any details on the Confluent Control Center System Health pages about brokers or topics because Confluent Cloud does not expose the Confluent Metrics Reporter instrumentation outside of Confluent Cloud. Therefore, you should expect to see the following graphic on the System Health page.

image

KSQL

  1. At the Confluent Cloud KSQL prompt, view the configured KSQL properties that were set with the KSQL server configuration file shown earlier.

    ksql> SHOW PROPERTIES;
    
  2. View the existing KSQL streams and describe one of those streams called PAGEVIEWS_FEMALE_LIKE_89.

    ksql> SHOW STREAMS;
    
     Stream Name              | Kafka Topic              | Format
    --------------------------------------------------------------
     PAGEVIEWS_ORIGINAL       | pageviews                | AVRO
     PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | AVRO
     PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | AVRO
    --------------------------------------------------------------
    
    
    ksql> DESCRIBE PAGEVIEWS_FEMALE_LIKE_89;
    
     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     USERID   | VARCHAR(STRING)  (key)
     PAGEID   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     GENDER   | VARCHAR(STRING)
    --------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
  3. View the existing KSQL tables and describe one of those tables called PAGEVIEWS_REGIONS.

    ksql> SHOW TABLES;
    
     Table Name        | Kafka Topic       | Format | Windowed
    -----------------------------------------------------------
     PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | AVRO   | true
     USERS_ORIGINAL    | users             | AVRO   | false
    -----------------------------------------------------------
    
    
    ksql> DESCRIBE PAGEVIEWS_REGIONS;
    
     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     GENDER   | VARCHAR(STRING)  (key)
     REGIONID | VARCHAR(STRING)  (key)
     NUMUSERS | BIGINT
    --------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
  4. View the existing KSQL queries, which are continuously running, and explain one of those queries called CSAS_PAGEVIEWS_FEMALE_LIKE_89.

    ksql> SHOW QUERIES;
    
     Query ID                      | Kafka Topic              | Query String
    ----------------------------------------------------------------------------------------------------------
     CTAS_PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
     CSAS_PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
     CSAS_PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    ----------------------------------------------------------------------------------------------------------
    
    
    
    ksql> EXPLAIN CSAS_PAGEVIEWS_FEMALE_LIKE_89;
    
    Type                 : QUERY
    SQL                  : CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    
    
    Local runtime statistics
    ------------------------
    messages-per-sec:         0   total-messages:        43     last-message: 4/23/18 10:28:29 AM EDT
     failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
    (Statistics of the local KSQL server interaction with the Kafka topic pageviews_enriched_r8_r9)
    
  5. At the KSQL prompt, view three messages from different KSQL streams and tables.

    ksql> SELECT * FROM PAGEVIEWS_FEMALE_LIKE_89 LIMIT 3;
    ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
    
  6. In this demo, KSQL runs with Confluent Monitoring Interceptors configured, which enables Confluent Control Center Data Streams to monitor KSQL queries. The consumer group names, prefixed with _confluent-ksql-default_query_, correlate to the KSQL query names shown above, and Confluent Control Center shows the records that are incoming to each query.

For example, view throughput and latency of the incoming records for the persistent KSQL “Create Stream As Select” query CSAS_PAGEVIEWS_FEMALE, which is displayed as _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE in Confluent Control Center.

image

Confluent Replicator

Confluent Replicator copies data from a source Kafka cluster to a destination Kafka cluster. In this demo, the source cluster is a local install that represents a self-managed cluster, and the destination cluster is Confluent Cloud.
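As an illustration only (the demo's actual configuration lives in the scripts shown in step 1 below; the endpoints and credentials here are placeholders), a Replicator source connector configuration has this general shape, naming the source topic, the source cluster, and the destination cluster:

```json
{
  "name": "replicator",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "topic.whitelist": "pageviews",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "localhost:9092",
    "dest.kafka.bootstrap.servers": "<BROKER ENDPOINT>",
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<API KEY>\" password=\"<API SECRET>\";",
    "tasks.max": "1"
  }
}
```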

  1. View the Confluent Replicator configuration.

    # For Confluent Platform local install using Confluent CLI
    $ cat connectors/submit_replicator_config.sh
    
    # For Docker Compose
    $ cat connectors/submit_replicator_docker_config.sh
    
  2. View the topic pageviews, replicated from the local cluster to the Confluent Cloud cluster.

    $ ccloud kafka topic describe pageviews
    Topic: pageviews PartitionCount: 6 ReplicationFactor: 3
        Topic   | Partition | Leader | Replicas |   ISR
    +-----------+-----------+--------+----------+---------+
      pageviews |         0 |      3 | [3 4 0]  | [3 4 0]
      pageviews |         1 |      6 | [6 3 7]  | [6 3 7]
      pageviews |         2 |      7 | [7 8 6]  | [7 8 6]
      pageviews |         3 |      1 | [1 2 3]  | [1 2 3]
      pageviews |         4 |      8 | [8 5 1]  | [8 5 1]
      pageviews |         5 |      0 | [0 1 4]  | [0 1 4]
    
    Configuration
    
                       Name                   |        Value
    +-----------------------------------------+---------------------+
      compression.type                        | producer
      leader.replication.throttled.replicas   |
      message.downconversion.enable           | true
      min.insync.replicas                     |                   2
      segment.jitter.ms                       |                   0
      cleanup.policy                          | delete
      flush.ms                                | 9223372036854775807
      follower.replication.throttled.replicas |
      segment.bytes                           |          1073741824
      retention.ms                            |           604800000
      flush.messages                          | 9223372036854775807
      message.format.version                  | 2.3-IV1
      file.delete.delay.ms                    |               60000
      max.compaction.lag.ms                   | 9223372036854775807
      max.message.bytes                       |             2097164
      min.compaction.lag.ms                   |                   0
      message.timestamp.type                  | CreateTime
      preallocate                             | false
      index.interval.bytes                    |                4096
      min.cleanable.dirty.ratio               |                 0.5
      unclean.leader.election.enable          | false
      delete.retention.ms                     |            86400000
      retention.bytes                         |                  -1
      segment.ms                              |           604800000
      message.timestamp.difference.max.ms     | 9223372036854775807
      segment.index.bytes                     |            10485760
    
  3. View the replicated topic pageviews in the Confluent Cloud cluster. In Confluent Control Center, for a given topic listed in Management –> Topics, click on the three dots ... next to the topic name to see more options per topic including in sync replicas, schema, topic messages, and configuration settings. Shown below is replica info.

    image
  4. You can manage Confluent Replicator in the Management –> Kafka Connect page. The Sources tab shows the connector replicator. Click Edit to see the details of the connector configuration.

    image

Confluent Schema Registry

The connectors used in this demo are configured to automatically write Avro-formatted data, leveraging the Confluent Cloud Schema Registry.

  1. View all the Schema Registry subjects.

    # Confluent Cloud Schema Registry
    $ curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
    
  2. From Confluent Control Center, under MANAGEMENT –> Topics -> Schema: view the schema for pageviews and users. The topic value uses a schema registered with Schema Registry (the topic key is just a string).

    image
  3. From Confluent Control Center, view the KSQL streams which are configured for Avro format.

    image
  4. To migrate schemas from on-prem Schema Registry to Confluent Cloud Schema Registry, follow this step-by-step guide. Refer to the file submit_replicator_schema_migration_config.sh for an example of a working Replicator configuration for schema migration.
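The subjects endpoint from step 1 can also be called from code. Here is a minimal sketch using only the Python standard library; the endpoint and credentials are placeholders you must supply, and the `sr_request` helper is illustrative:

```python
import base64
import urllib.request

def sr_request(endpoint, path, api_key, api_secret):
    """Build an authenticated GET request for Confluent Cloud Schema Registry.

    Schema Registry uses HTTP basic auth with the SR API key and secret
    (basic.auth.credentials.source=USER_INFO in the configuration file).
    """
    req = urllib.request.Request(endpoint.rstrip("/") + path)
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    req.add_header("Authorization", "Basic " + token)
    return req

# Usage (placeholders): urllib.request.urlopen(
#     sr_request("https://<SR ENDPOINT>", "/subjects", "<SR API KEY>", "<SR API SECRET>"))
```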

Confluent Cloud Configurations

  1. View the template delta configuration for Confluent Platform components and clients to connect to Confluent Cloud:

    $ ls template_delta_configs/
    
  2. View your Confluent Cloud configuration file:

    $ cat $HOME/.ccloud/config
    
  3. Generate the per-component delta configuration parameters, automatically derived from your Confluent Cloud configuration file:

    $ ./ccloud-generate-cp-configs.sh
    
  4. If you ran this demo with start.sh, which uses the Confluent CLI, all configuration files and log files are saved in the respective component subfolders in the current Confluent CLI temp directory (requires the demo to be actively running):

    # For Confluent Platform local install using Confluent CLI
    $ ls `confluent local current | tail -1`
    
  5. If you ran this demo with start-docker.sh, the configuration is available in the docker-compose.yml file.

    # For Docker Compose
    $ cat docker-compose.yml
    
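The idea behind the generated delta configurations can be sketched as follows: project the full Confluent Cloud configuration file down to the subset of parameters a given component needs. The key list below is illustrative only; the real per-component lists live in ccloud-generate-cp-configs.sh:

```python
# Illustrative subset of parameters a plain Kafka client needs; the real
# generator script covers each Confluent Platform component separately.
KAFKA_CLIENT_KEYS = {
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.endpoint.identification.algorithm",
}

def delta_config(ccloud_config, keys=KAFKA_CLIENT_KEYS):
    """Return only the parameters a component needs, as properties-file lines."""
    return "\n".join(
        f"{k}={v}" for k, v in sorted(ccloud_config.items()) if k in keys
    )
```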

Troubleshooting the demo

  1. If you cannot run the demo due to error messages such as “‘ccloud’ is not found” or “‘ccloud’ is not initialized”, validate that you have access to an initialized, working Confluent Cloud cluster and that you have installed the Confluent Cloud CLI locally.

  2. To view log files, look in the current Confluent CLI temp directory (requires demo to be actively running):

    # View all files
    $ ls `confluent local current | tail -1`
    
    # View log file per service, e.g. for the Kafka broker
    $ confluent local log kafka
    
  3. If you ran with Docker, then run docker-compose logs | grep ERROR.

Teardown

  1. Stop the demo and destroy all local components.

    # For Confluent Platform local install using Confluent CLI
    $ ./stop.sh
    
    # For Docker Compose
    $ ./stop-docker.sh
    
  2. Delete all Confluent Platform topics in Confluent Cloud that this demo used, including topics used for Confluent Control Center, Kafka Connect, KSQL, and Confluent Schema Registry. Warning: this may have the unintended consequence of deleting topics that you wanted to keep.

    $ ./ccloud-delete-all-topics.sh