Module 2: Hybrid Deployment to Confluent Cloud Tutorial

Hybrid Deployment to Confluent Cloud

In a hybrid Apache Kafka® deployment scenario, you run both an on-prem deployment and a Confluent Cloud deployment. This part of the tutorial runs Replicator to send Kafka data to Confluent Cloud, and uses the Metrics API as a single, common method for collecting metrics from both.


Run this part of the tutorial only after you have completed the cp-demo initial bring-up, because the initial bring-up deploys the on-prem cluster. The steps in this section bring up the Confluent Cloud instance and interconnect it with your on-prem cluster.

Cost to Run

Caution

Any Confluent Cloud example uses real Confluent Cloud resources that may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, as well as resources that have hourly charges, like connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges, and verify that they have been deleted.

Confluent Cloud Promo Code

To receive an additional $50 of free usage in Confluent Cloud, enter promo code CPDEMO50 in the Confluent Cloud UI Billing and payment section (details). This promo code should be sufficient to cover up to one day of running this Confluent Cloud example; beyond that, you may be billed for the services that have an hourly charge until you destroy the Confluent Cloud resources created by this example.

Set Up Confluent Cloud and CLI

  1. Create a Confluent Cloud account at https://confluent.cloud.

  2. Set up a payment method for your Confluent Cloud account, and optionally enter the promo code CPDEMO50 in the Confluent Cloud UI Billing and payment section to receive an additional $50 of free usage.

  3. Install Confluent CLI v2.2.0 or later. Do not confuse this Confluent CLI v2 binary, which is used to manage Confluent Cloud, with the Confluent CLI v1 binary, which is used to manage Confluent Platform 7.0.14. See the documentation for more information on the CLI migration and on running the two CLIs in parallel.

  4. Using the CLI, log in to Confluent Cloud with the command confluent login, and use your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.

    confluent login --save
    
  5. The remainder of the Confluent Cloud portion of this tutorial must be completed sequentially. We recommend that you manually complete all the steps in the following sections. However, you may also run the script scripts/ccloud/create-ccloud-workflow.sh, which automates those steps. This option is recommended for users who have run this tutorial before and want to bring it up quickly.

    ./scripts/ccloud/create-ccloud-workflow.sh
    

ccloud-stack

Use the ccloud-stack utility for Confluent Cloud as a quick, automated way to create resources in Confluent Cloud. Executed with a single command, it uses the Confluent CLI to:

  • Create a new environment.
  • Create a new service account.
  • Create a new Kafka cluster and associated credentials.
  • Enable Confluent Cloud Schema Registry and associated credentials.
  • Create ACLs with a wildcard for the service account.
  • Create a new ksqlDB application and associated credentials.
  • Generate a local configuration file with all of the above connection information.
  1. Get a bash library of useful functions for interacting with Confluent Cloud (one of which is ccloud-stack). This library is community-supported and not supported by Confluent.

    curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
    
  2. Using ccloud_library.sh, which you downloaded in the previous step, create a new ccloud-stack (see the ccloud-stack utility for Confluent Cloud for advanced options). It creates real resources in Confluent Cloud and takes a few minutes to complete.

    Note

    The true flag adds creation of a ksqlDB application in Confluent Cloud, which has hourly charges even if you are not actively using it.

    source ./ccloud_library.sh
    export EXAMPLE="cp-demo" && ccloud::create_ccloud_stack true
    
  3. When ccloud-stack completes, view the auto-generated local configuration file at stack-configs/java-service-account-<SERVICE_ACCOUNT_ID>.config. It contains the connection information for your newly created Confluent Cloud environment.

    cat stack-configs/java-service-account-*.config
    
  4. In the current shell, set the environment variable SERVICE_ACCOUNT_ID to the <SERVICE_ACCOUNT_ID> in the filename. For example, if the filename is stack-configs/java-service-account-154143.config, then set SERVICE_ACCOUNT_ID=154143. This environment variable is used later in the tutorial.

    SERVICE_ACCOUNT_ID=<fill in>
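
    As an alternative to typing the ID by hand, you can derive it from the filename. The following is a minimal sketch that assumes exactly one auto-generated java-service-account-*.config file exists in stack-configs/:

    # Derive SERVICE_ACCOUNT_ID from the auto-generated filename
    # (assumes a single stack-configs/java-service-account-*.config file)
    SERVICE_ACCOUNT_ID=$(basename stack-configs/java-service-account-*.config | sed 's/java-service-account-\(.*\)\.config/\1/')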
    
  5. The Replicator configuration file has parameters that specify how to connect to Confluent Cloud. You could set these parameters manually, but to do this in an automated fashion, use another ccloud_library.sh function, generate_configs, which reads your local Confluent Cloud configuration file (the auto-generated stack-configs/java-service-account-<SERVICE_ACCOUNT_ID>.config) and creates files that are useful for Confluent Platform components and clients connecting to Confluent Cloud. Run the generate_configs function against your auto-generated configuration file (the file created by ccloud-stack).

    ccloud::generate_configs stack-configs/java-service-account-$SERVICE_ACCOUNT_ID.config
    
  6. The output of the function is a folder called delta_configs with sample configurations for all components and clients, which you can easily apply to any Kafka client or Confluent Platform component. View the delta_configs/env.delta file.

    cat delta_configs/env.delta
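
    For reference, env.delta consists of export statements for the connection parameters. The exact set of variables is generated by ccloud_library.sh and may differ from this illustrative, redacted excerpt:

    # Illustrative excerpt only -- your generated env.delta will differ
    export BOOTSTRAP_SERVERS="<CCLOUD_BROKER_ENDPOINT>"
    export SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<API_KEY>\" password=\"<API_SECRET>\";"
    export SCHEMA_REGISTRY_URL="https://<SR_ENDPOINT>"
    export KSQLDB_ENDPOINT="https://<KSQLDB_ENDPOINT>"
    export KSQLDB_BASIC_AUTH_USER_INFO="<KSQLDB_API_KEY>:<KSQLDB_API_SECRET>"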
    
  7. Source the delta_configs/env.delta file into your environment. These environment variables will be used in a few sections when you run Replicator to copy data from your on-prem cluster to your Confluent Cloud cluster.

    source delta_configs/env.delta
    

Telemetry Reporter

Enable the Confluent Telemetry Reporter on the on-prem cluster and configure it to send metrics to the Confluent Cloud instance created above.

  1. Create a new Cloud API key and secret to authenticate to Confluent Cloud. These credentials are used to configure the Telemetry Reporter and to query the Metrics API.

    confluent api-key create --resource cloud -o json
    
  2. Verify your output resembles:

    {
       "key": "QX7X4VA4DFJTTOIA",
       "secret": "fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D"
    }
    

    The value of the API key, in this case QX7X4VA4DFJTTOIA, and API secret, in this case fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D, will differ in your output.

  3. Set parameters to reference the credentials returned in the previous step.

    METRICS_API_KEY='QX7X4VA4DFJTTOIA'
    METRICS_API_SECRET='fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D'
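
    If you prefer not to copy and paste, one alternative (assuming jq is installed locally) is to create the key and parse the returned JSON in a single step, instead of running step 1 and copying its output:

    # Create the Cloud API key once and parse the returned JSON
    CLOUD_API_KEY_JSON=$(confluent api-key create --resource cloud -o json)
    METRICS_API_KEY=$(echo "$CLOUD_API_KEY_JSON" | jq -r .key)
    METRICS_API_SECRET=$(echo "$CLOUD_API_KEY_JSON" | jq -r .secret)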
    
  4. Dynamically configure the cp-demo cluster to use the Telemetry Reporter, which sends metrics to Confluent Cloud. This requires setting three configuration parameters: confluent.telemetry.enabled=true, confluent.telemetry.api.key, and confluent.telemetry.api.secret.

    docker-compose exec kafka1 kafka-configs \
      --bootstrap-server kafka1:12091 \
      --alter \
      --entity-type brokers \
      --entity-default \
      --add-config confluent.telemetry.enabled=true,confluent.telemetry.api.key=${METRICS_API_KEY},confluent.telemetry.api.secret=${METRICS_API_SECRET}
    
  5. Check the broker logs to verify the brokers were dynamically configured.

    docker-compose logs kafka1 | grep confluent.telemetry.api
    

    Your output should resemble the following, but the confluent.telemetry.api.key value will be different in your environment.

    ...
    kafka1            |       confluent.telemetry.api.key = QX7X4VA4DFJTTOIA
    kafka1            |       confluent.telemetry.api.secret = [hidden]
    ...
    
  6. Log in to the Confluent Cloud UI and verify that you see this cluster dashboard in the Hosted monitoring section under Confluent Platform.

    ../../../_images/hosted-monitoring.png

Replicator to Confluent Cloud

Deploy Replicator to copy data from the on-prem cluster to the Kafka cluster running in Confluent Cloud. It is configured to copy from the on-prem Kafka topic wikipedia.parsed to the topic wikipedia.parsed.ccloud.replica in Confluent Cloud. The Replicator instance runs on the existing Connect worker in the on-prem cluster.

  1. If you have been running cp-demo for a long time, you may need to refresh your local token to log back into MDS:

    ./scripts/helper/refresh_mds_login.sh
    
  2. Create a role binding to permit a new instance of Replicator to be submitted to the local connect cluster with ID connect-cluster.

    Get the Kafka cluster ID:

    KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
    

    Create the role bindings:

    docker-compose exec tools bash -c "confluent-v1 iam rolebinding create \
        --principal User:connectorSubmitter \
        --role ResourceOwner \
        --resource Connector:replicate-topic-to-ccloud \
        --kafka-cluster-id $KAFKA_CLUSTER_ID \
        --connect-cluster-id connect-cluster"
    
  3. View the Replicator configuration file. Because Replicator runs on the local connect cluster at the origin site, the Replicator configuration has overrides for the producer, which writes to the destination cluster in Confluent Cloud. The configuration parameters that use variables are read from the environment variables you sourced in an earlier step.
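
    One way to inspect the connector configuration from the command line, assuming it is embedded in the submission script used in the next step, is:

    # View the Replicator connector configuration embedded in the submission script
    cat ./scripts/connectors/submit_replicator_to_ccloud_config.sh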

  4. Submit the Replicator connector to the local connect cluster.

    ./scripts/connectors/submit_replicator_to_ccloud_config.sh
    
  5. The connector takes about one minute to show up in the Connectors view in Confluent Control Center. When it does, verify that Replicator to Confluent Cloud has started properly and that there are now 4 connectors:

    ../../../_images/connectors-with-rep-to-ccloud.png
  6. Log in to the Confluent Cloud UI and verify that you see the topic wikipedia.parsed.ccloud.replica and its messages.
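
    If you prefer the CLI, a rough check (assuming the Confluent CLI is still logged in) is to look up the demo cluster ID, as done later in the Metrics API section, and list its topics:

    # Look up the Confluent Cloud cluster ID created by ccloud-stack, then list its topics
    CCLOUD_CLUSTER_ID=$(confluent kafka cluster list -o json | jq -c -r '.[] | select (.name == "'"demo-kafka-cluster-${SERVICE_ACCOUNT_ID}"'")' | jq -r .id)
    confluent kafka topic list --cluster $CCLOUD_CLUSTER_ID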

  7. View the schema for this topic that is already registered in Confluent Cloud Schema Registry. In cp-demo, the Replicator configuration file sets value.converter to io.confluent.connect.avro.AvroConverter; therefore, Replicator automatically registers new schemas, as needed, while copying data. The schema ID in the on-prem Schema Registry will not match the schema ID in the Confluent Cloud Schema Registry. (See the documentation for other schema migration options.)

    ../../../_images/ccloud-schema.png

Metrics API

You can use the Metrics API to get data for both the on-prem cluster and the Confluent Cloud cluster. The Metrics API provides a queryable HTTP API to which you can POST a query to get a time series of metrics. As illustrated below, it can be used for observing both clusters:

../../../_images/metrics-api.jpg
  1. To define the time interval when querying the Metrics API, get the current time minus 1 hour and the current time plus 1 hour. The date utility varies between operating systems, so use the tools Docker container to get consistent and reliable dates.

    CURRENT_TIME_MINUS_1HR=$(docker-compose exec tools date -Is -d '-1 hour' | tr -d '\r')
    CURRENT_TIME_PLUS_1HR=$(docker-compose exec tools date -Is -d '+1 hour' | tr -d '\r')
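
    You can sanity-check the interval boundaries before building the query:

    # Verify the interval endpoints are valid ISO-8601 timestamps
    echo "Query interval: ${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"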
    
  2. For the on-prem metrics: view the metrics query file, which requests io.confluent.kafka.server/received_bytes for the topic wikipedia.parsed in the on-prem cluster (for all queryable metrics examples, see Metrics API).

    {
      "aggregations": [
          {
              "agg": "SUM",
              "metric": "io.confluent.kafka.server/received_bytes"
          }
      ],
      "filter": {
          "filters": [
              {
                   "field": "metric.topic",
                   "op": "EQ",
                   "value": "wikipedia.parsed"
              }
          ],
          "op": "AND"
      },
      "intervals": ["${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"],
      "granularity": "PT1M",
      "group_by": [
          "metric.topic"
      ],
      "limit": 5
    }
    
  3. Substitute values into the query JSON file. For this substitution to work, you must have set the following parameters in your environment:

    • CURRENT_TIME_MINUS_1HR
    • CURRENT_TIME_PLUS_1HR
    DATA=$(eval "cat <<EOF
    $(<./scripts/ccloud/metrics_query_onprem.json)
    EOF
    ")
    
    # View this parameter
    echo $DATA
    
  4. Send this query to the Metrics API endpoint at https://api.telemetry.confluent.cloud/v2/metrics/hosted-monitoring/query. For this query to work, you must have set the following parameters in your environment:

    • METRICS_API_KEY
    • METRICS_API_SECRET
    curl -s -u ${METRICS_API_KEY}:${METRICS_API_SECRET} \
         --header 'content-type: application/json' \
         --data "${DATA}" \
         https://api.telemetry.confluent.cloud/v2/metrics/hosted-monitoring/query \
            | jq .
    
  5. Your output should resemble the output below, showing metrics for the on-prem topic wikipedia.parsed:

    {
      "data": [
        {
          "timestamp": "2020-12-14T20:52:00Z",
          "value": 1744066,
          "metric.topic": "wikipedia.parsed"
        },
        {
          "timestamp": "2020-12-14T20:53:00Z",
          "value": 1847596,
          "metric.topic": "wikipedia.parsed"
        }
      ]
    }
    
  6. For the Confluent Cloud metrics: view the metrics query file, which requests io.confluent.kafka.server/received_bytes for the topic wikipedia.parsed.ccloud.replica in Confluent Cloud (for all queryable metrics examples, see Metrics API).

    {
      "aggregations": [
          {
              "agg": "SUM",
              "metric": "io.confluent.kafka.server/received_bytes"
          }
      ],
      "filter": {
          "filters": [
              {
                   "field": "metric.topic",
                   "op": "EQ",
                   "value": "wikipedia.parsed.ccloud.replica"
              },
              {
                  "field": "resource.kafka.id",
                  "op": "EQ",
                  "value": "${CCLOUD_CLUSTER_ID}"
              }
          ],
          "op": "AND"
      },
      "intervals": ["${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"],
      "granularity": "PT1H",
      "group_by": [
          "metric.topic"
      ],
      "limit": 5
    }
    
  7. Get the Kafka cluster ID in Confluent Cloud, derived from the $SERVICE_ACCOUNT_ID.

    CCLOUD_CLUSTER_ID=$(confluent kafka cluster list -o json | jq -c -r '.[] | select (.name == "'"demo-kafka-cluster-${SERVICE_ACCOUNT_ID}"'")' | jq -r .id)
    
  8. Substitute values into the query JSON file. For this substitution to work, you must have set the following parameters in your environment:

    • CURRENT_TIME_MINUS_1HR
    • CURRENT_TIME_PLUS_1HR
    • CCLOUD_CLUSTER_ID
    DATA=$(eval "cat <<EOF
    $(<./scripts/ccloud/metrics_query_ccloud.json)
    EOF
    ")
    
    # View this parameter
    echo $DATA
    
  9. Send this query to the Metrics API endpoint at https://api.telemetry.confluent.cloud/v2/metrics/cloud/query. For this query to work, you must have set the following parameters in your environment:

    • METRICS_API_KEY
    • METRICS_API_SECRET
    curl -s -u ${METRICS_API_KEY}:${METRICS_API_SECRET} \
         --header 'content-type: application/json' \
         --data "${DATA}" \
         https://api.telemetry.confluent.cloud/v2/metrics/cloud/query \
            | jq .
    
  10. Your output should resemble the output below, showing metrics for the Confluent Cloud topic wikipedia.parsed.ccloud.replica:

    {
      "data": [
        {
          "timestamp": "2020-12-14T20:00:00Z",
          "value": 1690522,
          "metric.topic": "wikipedia.parsed.ccloud.replica"
        }
      ]
    }
    

Confluent Cloud ksqlDB

This section shows how to create queries in the Confluent Cloud ksqlDB application that process data from the wikipedia.parsed.ccloud.replica topic, which Replicator copied from the on-prem cluster. You must have completed the ccloud-stack section before proceeding.

  1. Get the Confluent Cloud ksqlDB application ID and save it to the parameter ksqlDBAppId.

    ksqlDBAppId=$(confluent ksql app list | grep "$KSQLDB_ENDPOINT" | awk '{print $1}')
    
  2. Verify that the Confluent Cloud ksqlDB application has transitioned from the PROVISIONING state to the UP state. This may take a few minutes.

    confluent ksql app describe $ksqlDBAppId -o json
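
    If you want to wait on this from the command line, a minimal polling sketch (assuming the JSON output of the describe command includes a status field) is:

    # Poll until the ksqlDB application reports UP
    until confluent ksql app describe $ksqlDBAppId -o json | jq -r .status | grep -q "UP"; do
      echo "ksqlDB application is still provisioning..."
      sleep 30
    done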
    
  3. Configure ksqlDB ACLs to permit the ksqlDB application to read from wikipedia.parsed.ccloud.replica.

    confluent ksql app configure-acls $ksqlDBAppId wikipedia.parsed.ccloud.replica
    
  4. Create new ksqlDB queries in Confluent Cloud from the scripts/ccloud/statements.sql file. Note: depending on which folder you are in, you may need to modify the relative path to the statements.sql file.

    while read ksqlCmd; do
      echo -e "\n$ksqlCmd\n"
      curl -X POST $KSQLDB_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQLDB_BASIC_AUTH_USER_INFO \
           --silent \
           -d @<(cat <<EOF
    {
      "ksql": "$ksqlCmd",
      "streamsProperties": {}
    }
    EOF
    )
    done <scripts/ccloud/statements.sql
    
  5. Log in to the Confluent Cloud UI and view the ksqlDB application's Flow view.

    ../../../_images/ccloud_ksqldb_flow.png
  6. View the events in the ksqlDB streams in Confluent Cloud.

    ../../../_images/ccloud_ksqldb_stream.png
  7. Go to Module 2: Confluent Cloud and destroy the demo resources used. Important: The ksqlDB application in Confluent Cloud has hourly charges even if you are not actively using it.

Cleanup

Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, follow the cleanup procedure in Module 2: Confluent Cloud and manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.
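
One possible cleanup path, assuming you still have the ccloud_library.sh downloaded earlier and the SERVICE_ACCOUNT_ID environment variable set, is to use the library's destroy function and then confirm in the Confluent Cloud UI that everything is gone:

    # Destroy the ccloud-stack resources created earlier
    # (ccloud::destroy_ccloud_stack is assumed from ccloud_library.sh; verify deletion in the UI afterwards)
    source ./ccloud_library.sh
    ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID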