Module 2: Hybrid Deployment to Confluent Cloud Tutorial
In a hybrid Apache Kafka® deployment scenario, you can have both an on-prem and a Confluent Cloud deployment. This part of the tutorial runs Replicator to send Kafka data to Confluent Cloud, and uses the Metrics API as a single, common method for collecting metrics from both.
Run this part of the tutorial only after you have completed the cp-demo initial bring-up, because the initial bring-up deploys the on-prem cluster. The steps in this section bring up the Confluent Cloud instance and interconnect it to your on-prem cluster.
Cost to Run
Caution
Any Confluent Cloud example uses real Confluent Cloud resources that may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, as well as resources that have hourly charges, like connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges, and verify that they have been deleted.
Confluent Cloud Promo Code
To receive an additional $50 of free usage in Confluent Cloud, enter promo code CPDEMO50 in the Confluent Cloud UI Billing and payment section. This promo code should sufficiently cover up to one day of running this Confluent Cloud example, beyond which you may be billed for the services that have an hourly charge until you destroy the Confluent Cloud resources created by this example.
Set Up Confluent Cloud and CLI
Create a Confluent Cloud account at https://confluent.cloud.
Set up a payment method for your Confluent Cloud account and optionally enter the promo code CPDEMO50 in the Confluent Cloud UI Billing and payment section to receive an additional $50 of free usage.
Install Confluent Cloud CLI v1.25.0 or later.
Using the CLI, log in to Confluent Cloud with the command ccloud login, using your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local .netrc file.
ccloud login --save
The remainder of the Confluent Cloud portion of this tutorial must be completed sequentially. We recommend that you manually complete all the steps in the following sections. However, you may also run the script scripts/ccloud/create-ccloud-workflow.sh, which automates those steps. This option is recommended for users who have run this tutorial before and want to bring it up quickly.
./scripts/ccloud/create-ccloud-workflow.sh
ccloud-stack
Use the ccloud-stack utility for Confluent Cloud for a quick, automated way to create resources in Confluent Cloud. Executed with a single command, it uses the Confluent Cloud CLI to:
- Create a new environment.
- Create a new service account.
- Create a new Kafka cluster and associated credentials.
- Enable Confluent Cloud Schema Registry and associated credentials.
- Create ACLs with a wildcard for the service account.
- Create a new ksqlDB app and associated credentials.
- Generate a local configuration file with all above connection information.
Get a bash library of useful functions for interacting with Confluent Cloud (one of which is ccloud-stack). This library is community-supported and not supported by Confluent.
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
Using ccloud_library.sh, which you downloaded in the previous step, create a new ccloud-stack (see the ccloud-stack utility for Confluent Cloud for advanced options). It creates real resources in Confluent Cloud and takes a few minutes to complete.
Note: The true flag adds creation of a ksqlDB application in Confluent Cloud, which has hourly charges even if you are not actively using it.
source ./ccloud_library.sh
export EXAMPLE="cp-demo" && ccloud::create_ccloud_stack true
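If you want to confirm what was just provisioned, you can optionally list the new resources from the CLI. This is a minimal sanity check, not part of the tutorial workflow, and uses standard Confluent Cloud CLI list commands; the resource names in your output will differ per run.
# Optional: list the resources that ccloud-stack just created
ccloud environment list
ccloud kafka cluster list
ccloud service-account list
ccloud ksql app list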
When ccloud-stack completes, view the auto-generated local configuration file at stack-configs/java-service-account-<SERVICE_ACCOUNT_ID>.config. It contains connection information for connecting to your newly created Confluent Cloud environment.
cat stack-configs/java-service-account-*.config
In the current shell, set the environment variable SERVICE_ACCOUNT_ID to the <SERVICE_ACCOUNT_ID> in the filename. For example, if the filename is stack-configs/java-service-account-154143.config, then set SERVICE_ACCOUNT_ID=154143. This environment variable is used later in the tutorial.
SERVICE_ACCOUNT_ID=<fill in>
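If you prefer not to type the ID by hand, the following is a minimal sketch that derives it from the auto-generated filename. It assumes exactly one stack-configs/java-service-account-*.config file exists.
# Derive the service account ID from the generated filename (assumes a single match)
CONFIG_FILE=$(ls stack-configs/java-service-account-*.config)
SERVICE_ACCOUNT_ID=$(echo "$CONFIG_FILE" | sed 's/.*java-service-account-\([0-9]*\)\.config/\1/')
echo $SERVICE_ACCOUNT_ID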
The Replicator configuration file has parameters that specify how to connect to Confluent Cloud. You could set these parameters manually, but to do this in an automated fashion, use another function that sets environment parameters customized for the Confluent Cloud instance created above. It reads your local Confluent Cloud configuration file, i.e., the auto-generated stack-configs/java-service-account-<SERVICE_ACCOUNT_ID>.config, and creates files useful for Confluent Platform components and clients connecting to Confluent Cloud. Using ccloud_library.sh, which you downloaded in an earlier step, run the generate_configs function against your auto-generated configuration file (the file created by ccloud-stack).
ccloud::generate_configs stack-configs/java-service-account-$SERVICE_ACCOUNT_ID.config
The output of the script is a folder called delta_configs with sample configurations for all components and clients, which you can easily apply to any Kafka client or Confluent Platform component. View the delta_configs/env.delta file.
cat delta_configs/env.delta
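The exact contents of env.delta are generated by ccloud_library.sh and will vary, but the file is a series of export statements for connection properties. The excerpt below is illustrative only, with values redacted; variable names other than KSQLDB_ENDPOINT and KSQLDB_BASIC_AUTH_USER_INFO (which this tutorial uses later) are typical examples and may differ in your generated file.
# Illustrative excerpt of delta_configs/env.delta (values redacted, not exhaustive)
export BOOTSTRAP_SERVERS="<ccloud-bootstrap-server>:9092"
export SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";'
export SCHEMA_REGISTRY_URL="https://<ccloud-schema-registry-endpoint>"
export KSQLDB_ENDPOINT="https://<ccloud-ksqldb-endpoint>"
export KSQLDB_BASIC_AUTH_USER_INFO="<ksqldb-api-key>:<ksqldb-api-secret>"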
Source the delta_configs/env.delta file into your environment. These environment variables will be used in a few sections when you run Replicator to copy data from your on-prem cluster to your Confluent Cloud cluster.
source delta_configs/env.delta
Telemetry Reporter
Enable Confluent Telemetry Reporter on the on-prem cluster, and configure it to send metrics to the Confluent Cloud instance created above.
Create a new Cloud API key and secret to authenticate to Confluent Cloud. These credentials will be used to configure the Telemetry Reporter and will be used by the Metrics API.
ccloud api-key create --resource cloud -o json
Verify your output resembles:
{ "key": "QX7X4VA4DFJTTOIA", "secret": "fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D" }
The value of the API key, in this case QX7X4VA4DFJTTOIA, and the API secret, in this case fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D, will differ in your output.
Set parameters to reference the credentials returned in the previous step.
METRICS_API_KEY='QX7X4VA4DFJTTOIA'
METRICS_API_SECRET='fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D'
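If you prefer not to copy and paste, you can instead capture the JSON output and parse it with jq; the key and secret field names match the sample output above. Note that running ccloud api-key create again creates an additional Cloud API key.
# Alternative: create the Cloud API key and capture it in one step (requires jq)
CLOUD_API_KEY_JSON=$(ccloud api-key create --resource cloud -o json)
METRICS_API_KEY=$(echo "$CLOUD_API_KEY_JSON" | jq -r .key)
METRICS_API_SECRET=$(echo "$CLOUD_API_KEY_JSON" | jq -r .secret)
echo $METRICS_API_KEY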
Dynamically configure the cp-demo cluster to use the Telemetry Reporter, which sends metrics to Confluent Cloud. This requires setting three configuration parameters: confluent.telemetry.enabled=true, confluent.telemetry.api.key, and confluent.telemetry.api.secret.
docker-compose exec kafka1 kafka-configs \
  --bootstrap-server kafka1:12091 \
  --alter \
  --entity-type brokers \
  --entity-default \
  --add-config confluent.telemetry.enabled=true,confluent.telemetry.api.key=${METRICS_API_KEY},confluent.telemetry.api.secret=${METRICS_API_SECRET}
Check the broker logs to verify the brokers were dynamically configured.
docker-compose logs kafka1 | grep confluent.telemetry.api
Your output should resemble the following, but the confluent.telemetry.api.key value will be different in your environment.
...
kafka1    | confluent.telemetry.api.key = QX7X4VA4DFJTTOIA
kafka1    | confluent.telemetry.api.secret = [hidden]
...
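You can also confirm the dynamic broker configuration directly with kafka-configs in describe mode. This is an optional check and a minimal sketch; it reuses the same broker listener as the --alter command above, and additional client security properties may be needed depending on your listener configuration.
# Optional: describe the default broker config to confirm the telemetry settings
docker-compose exec kafka1 kafka-configs \
  --bootstrap-server kafka1:12091 \
  --describe \
  --entity-type brokers \
  --entity-default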
Log into the Confluent Cloud UI and verify you see this cluster dashboard in the Hosted monitoring section under Confluent Platform.
Replicator to Confluent Cloud
Deploy Replicator to copy data from the on-prem cluster to the Kafka cluster running in Confluent Cloud. It is configured to copy from the Kafka topic wikipedia.parsed (on-prem) to the topic wikipedia.parsed.ccloud.replica in Confluent Cloud. The Replicator instance runs on the existing Connect worker in the on-prem cluster.
If you have been running cp-demo for a long time, you may need to refresh your local token to log back into MDS:
./scripts/helper/refresh_mds_login.sh
Create a role binding to permit a new instance of Replicator to be submitted to the local Connect cluster with the ID connect-cluster.
Get the Kafka cluster ID:
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role bindings:
docker-compose exec tools bash -c "confluent iam rolebinding create \
  --principal User:connectorSubmitter \
  --role ResourceOwner \
  --resource Connector:replicate-topic-to-ccloud \
  --kafka-cluster-id $KAFKA_CLUSTER_ID \
  --connect-cluster-id connect-cluster"
View the Replicator configuration file. Note that it uses the local connect cluster (the origin site), so the Replicator configuration has overrides for the producer. The configuration parameters that use variables are read from the environment variables you sourced in an earlier step.
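If you want a quick look at just the Confluent Cloud connection settings without opening the whole file, you can grep the submit script for typical Replicator parameters. The parameter names in the pattern below (src.kafka.*, dest.kafka.*, topic.whitelist, topic.rename.format) are standard Replicator settings, but the exact contents of the cp-demo script may differ.
# Optional: surface connection-related settings in the Replicator submit script
grep -E "src\.kafka|dest\.kafka|topic\.whitelist|topic\.rename\.format" \
  ./scripts/connectors/submit_replicator_to_ccloud_config.sh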
Submit the Replicator connector to the local connect cluster.
./scripts/connectors/submit_replicator_to_ccloud_config.sh
The connector takes about one minute to appear in the Connectors view in Confluent Control Center. When it does, verify that Replicator to Confluent Cloud has started properly and that there are now four connectors.
Log into the Confluent Cloud UI and verify you see the topic wikipedia.parsed.ccloud.replica and its messages.
View the schema for this topic that is already registered in Confluent Cloud Schema Registry. In cp-demo, the Replicator configuration file sets value.converter to io.confluent.connect.avro.AvroConverter, so Replicator automatically registers new schemas, as needed, while copying data. The schema ID in the on-prem Schema Registry will not match the schema ID in the Confluent Cloud Schema Registry. (See the documentation for other schema migration options.)
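To inspect the registered schema from the command line instead of the UI, you can call the Confluent Cloud Schema Registry REST API. The sketch below assumes the SCHEMA_REGISTRY_URL and SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO variables were exported by delta_configs/env.delta (substitute your own names if the generated file differs), and assumes the default TopicNameStrategy subject naming of <topic>-value.
# Optional: list subjects and fetch the latest schema for the replicated topic
curl -s -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO $SCHEMA_REGISTRY_URL/subjects | jq .
curl -s -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \
  $SCHEMA_REGISTRY_URL/subjects/wikipedia.parsed.ccloud.replica-value/versions/latest | jq .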
Metrics API
You can use the Metrics API to get data for both the on-prem cluster and the Confluent Cloud cluster. The Metrics API provides a queryable HTTP API to which you can post a query to get a time series of metrics. It can be used for observing both:
- On-prem metrics (enabled by Telemetry Reporter) using the endpoint https://api.telemetry.confluent.cloud/v2/metrics/hosted-monitoring/query (this is in preview and the API may change)
- Confluent Cloud metrics using the endpoint https://api.telemetry.confluent.cloud/v2/metrics/cloud/query
To define the time interval when querying the Metrics API, get the current time minus 1 hour and the current time plus 1 hour. The date utility varies between operating systems, so use the tools Docker container to get consistent and reliable dates.
CURRENT_TIME_MINUS_1HR=$(docker-compose exec tools date -Is -d '-1 hour' | tr -d '\r')
CURRENT_TIME_PLUS_1HR=$(docker-compose exec tools date -Is -d '+1 hour' | tr -d '\r')
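Before building the query, you can optionally confirm that both variables contain ISO-8601 timestamps, since an empty value here would produce an invalid interval in the queries below.
# Optional: verify the interval boundaries
echo "Query interval: ${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"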
For the on-prem metrics: view the metrics query file, which requests io.confluent.kafka.server/received_bytes for the topic wikipedia.parsed in the on-prem cluster (for all queryable metrics examples, see Metrics API).
{
  "aggregations": [
    {
      "agg": "SUM",
      "metric": "io.confluent.kafka.server/received_bytes"
    }
  ],
  "filter": {
    "filters": [
      {
        "field": "metric.topic",
        "op": "EQ",
        "value": "wikipedia.parsed"
      }
    ],
    "op": "AND"
  },
  "intervals": ["${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"],
  "granularity": "PT1M",
  "group_by": [
    "metric.topic"
  ],
  "limit": 5
}
Substitute values into the query JSON file. For this substitution to work, you must have set the following parameters in your environment:
- CURRENT_TIME_MINUS_1HR
- CURRENT_TIME_PLUS_1HR
DATA=$(eval "cat <<EOF
$(<./scripts/ccloud/metrics_query_onprem.json)
EOF
")

# View this parameter
echo $DATA
Send this query to the Metrics API endpoint at https://api.telemetry.confluent.cloud/v2/metrics/hosted-monitoring/query. For this query to work, you must have set the following parameters in your environment:
- METRICS_API_KEY
- METRICS_API_SECRET
curl -s -u ${METRICS_API_KEY}:${METRICS_API_SECRET} \
  --header 'content-type: application/json' \
  --data "${DATA}" \
  https://api.telemetry.confluent.cloud/v2/metrics/hosted-monitoring/query \
  | jq .
Your output should resemble the output below, showing metrics for the on-prem topic wikipedia.parsed:
{
  "data": [
    {
      "timestamp": "2020-12-14T20:52:00Z",
      "value": 1744066,
      "metric.topic": "wikipedia.parsed"
    },
    {
      "timestamp": "2020-12-14T20:53:00Z",
      "value": 1847596,
      "metric.topic": "wikipedia.parsed"
    }
  ]
}
For the Confluent Cloud metrics: view the metrics query file, which requests io.confluent.kafka.server/received_bytes for the topic wikipedia.parsed.ccloud.replica in Confluent Cloud (for all queryable metrics examples, see Metrics API).
{
  "aggregations": [
    {
      "agg": "SUM",
      "metric": "io.confluent.kafka.server/received_bytes"
    }
  ],
  "filter": {
    "filters": [
      {
        "field": "metric.topic",
        "op": "EQ",
        "value": "wikipedia.parsed.ccloud.replica"
      },
      {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "${CCLOUD_CLUSTER_ID}"
      }
    ],
    "op": "AND"
  },
  "intervals": ["${CURRENT_TIME_MINUS_1HR}/${CURRENT_TIME_PLUS_1HR}"],
  "granularity": "PT1H",
  "group_by": [
    "metric.topic"
  ],
  "limit": 5
}
Get the Kafka cluster ID in Confluent Cloud, derived from the $SERVICE_ACCOUNT_ID.
CCLOUD_CLUSTER_ID=$(ccloud kafka cluster list -o json | jq -c -r '.[] | select (.name == "'"demo-kafka-cluster-${SERVICE_ACCOUNT_ID}"'")' | jq -r .id)
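Optionally verify that the lookup returned a value; Confluent Cloud Kafka cluster IDs have an lkc- prefix.
# Optional: confirm the cluster ID was found (should look like lkc-xxxxx)
echo $CCLOUD_CLUSTER_ID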
Substitute values into the query JSON file. For this substitution to work, you must have set the following parameters in your environment:
- CURRENT_TIME_MINUS_1HR
- CURRENT_TIME_PLUS_1HR
- CCLOUD_CLUSTER_ID
DATA=$(eval "cat <<EOF
$(<./scripts/ccloud/metrics_query_ccloud.json)
EOF
")

# View this parameter
echo $DATA
Send this query to the Metrics API endpoint at https://api.telemetry.confluent.cloud/v2/metrics/cloud/query. For this query to work, you must have set the following parameters in your environment:
- METRICS_API_KEY
- METRICS_API_SECRET
curl -s -u ${METRICS_API_KEY}:${METRICS_API_SECRET} \
  --header 'content-type: application/json' \
  --data "${DATA}" \
  https://api.telemetry.confluent.cloud/v2/metrics/cloud/query \
  | jq .
Your output should resemble the output below, showing metrics for the Confluent Cloud topic wikipedia.parsed.ccloud.replica:
{
  "data": [
    {
      "timestamp": "2020-12-14T20:00:00Z",
      "value": 1690522,
      "metric.topic": "wikipedia.parsed.ccloud.replica"
    }
  ]
}
Confluent Cloud ksqlDB
This section shows how to create queries in the Confluent Cloud ksqlDB application that process data from the wikipedia.parsed.ccloud.replica topic that Replicator copied from the on-prem cluster.
You must have completed ccloud-stack before proceeding.
Get the Confluent Cloud ksqlDB application ID and save it to the parameter ksqlDBAppId.
ksqlDBAppId=$(ccloud ksql app list | grep "$KSQLDB_ENDPOINT" | awk '{print $1}')
Verify the Confluent Cloud ksqlDB application has transitioned from the PROVISIONING state to the UP state. This may take a few minutes.
ccloud ksql app describe $ksqlDBAppId -o json
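If you prefer to wait from the command line, the following is a minimal polling sketch. It assumes the JSON output of ccloud ksql app describe contains a status field whose value becomes UP; adjust the match if your CLI output differs.
# Optional: poll until the ksqlDB application reports UP ("status" field name is an assumption)
until ccloud ksql app describe $ksqlDBAppId -o json | grep -q '"status": *"UP"'; do
  echo "Waiting for the ksqlDB application to finish provisioning..."
  sleep 30
done
echo "ksqlDB application is UP"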
Configure ksqlDB ACLs to permit the ksqlDB application to read from wikipedia.parsed.ccloud.replica.
ccloud ksql app configure-acls $ksqlDBAppId wikipedia.parsed.ccloud.replica
Create new ksqlDB queries in Confluent Cloud from the scripts/ccloud/statements.sql file. Note: depending on which folder you are in, you may need to modify the relative path to the statements.sql file.
while read ksqlCmd; do
  echo -e "\n$ksqlCmd\n"
  curl -X POST $KSQLDB_ENDPOINT/ksql \
       -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
       -u $KSQLDB_BASIC_AUTH_USER_INFO \
       --silent \
       -d @<(cat <<EOF
{
  "ksql": "$ksqlCmd",
  "streamsProperties": {}
}
EOF
)
done <scripts/ccloud/statements.sql
Log into Confluent Cloud UI and view the ksqlDB application Flow.
View the events in the ksqlDB streams in Confluent Cloud.
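You can also pull a few records from one of the new streams over the ksqlDB REST API, using the same endpoint and credentials sourced from delta_configs/env.delta. The stream name WIKIPEDIA_PARSED_COUNT_BY_DOMAIN below is a placeholder; replace it with a stream or table actually defined in scripts/ccloud/statements.sql.
# Optional: run a bounded push query against a stream created by statements.sql
# (WIKIPEDIA_PARSED_COUNT_BY_DOMAIN is a hypothetical name)
curl -X POST $KSQLDB_ENDPOINT/query \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -u $KSQLDB_BASIC_AUTH_USER_INFO \
  --silent \
  -d '{
    "ksql": "SELECT * FROM WIKIPEDIA_PARSED_COUNT_BY_DOMAIN EMIT CHANGES LIMIT 3;",
    "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }
  }' | jq .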
Go to Module 2: Confluent Cloud and destroy the demo resources used. Important: The ksqlDB application in Confluent Cloud has hourly charges even if you are not actively using it.
Cleanup
Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.
Follow the cleanup procedure in Module 2: Confluent Cloud to avoid unexpected Confluent Cloud charges.
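If you used ccloud-stack to create the resources, ccloud_library.sh also provides a corresponding destroy function. The sketch below assumes the library and your stack configuration file are still present, and that the function takes the service account ID as its argument; always confirm in the Confluent Cloud UI that all resources are gone afterward.
# Optional: tear down everything that ccloud-stack created (destructive!)
source ./ccloud_library.sh
ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID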