Tutorial: Use Confluent CLI with Confluent Cloud¶
This tutorial shows you how to use the Confluent CLI to interact with your Confluent Cloud cluster. It uses real resources in Confluent Cloud, and it creates and deletes topics, service accounts, credentials, and ACLs.
Prerequisites¶
- Access to Confluent Cloud
- Local install of Confluent CLI (version 3.0.0 or later)
- timeout: used by the bash scripts to terminate a consumer process after a certain period of time. timeout is available on most Linux distributions but not on macOS; macOS users can install timeout via brew install coreutils.
- mvn installed on your host
- jq installed on your host
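A quick way to confirm the prerequisites are installed before you begin is to check each tool's version, for example:
confluent version
mvn --version
jq --version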
Run Tutorial¶
Start¶
Log in to Confluent Cloud with the Confluent CLI:
confluent login --save
The --save flag will save your Confluent Cloud login credentials to the ~/.netrc file.
Clone the confluentinc/examples GitHub repository:
git clone https://github.com/confluentinc/examples.git
Navigate to the examples/ccloud/beginner-cloud/ directory and switch to the Confluent Platform release branch:
cd examples/ccloud/beginner-cloud/
git checkout 4.4.0-SNAPSHOT-post
If you want to manually step through the tutorial, which is advised for new users who want to gain familiarity with the Confluent CLI, skip ahead to the next section. Alternatively, you can run the full tutorial end-to-end with the start.sh script, which automates all the steps in the tutorial:
./start.sh
Create a new Confluent Cloud environment¶
Run the following command to create a new Confluent Cloud environment ccloud-stack-000000-beginner-cli:
confluent environment create ccloud-stack-000000-beginner-cli -o json
Verify your output resembles:
{ "id": "env-5q3z2q", "name": "ccloud-stack-000000-beginner-cli" }
The value of the environment ID, in this case env-5q3z2q, will differ in your output. In this tutorial, the values for certain variables, including your environment ID, Kafka cluster ID, and API key, will be unique and will not match the output shown.
Specify env-5q3z2q as the active environment by running the following command:
confluent environment use env-5q3z2q
Verify your output resembles:
Now using "env-5q3z2q" as the default (active) environment.
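Because jq is a prerequisite, you can also capture the environment ID in a shell variable instead of copying it by hand. A minimal sketch, assuming confluent environment list -o json returns the same id and name fields shown in the JSON output above:
# Look up the environment ID by name and make it the active environment
ENVIRONMENT_ID=$(confluent environment list -o json | jq -r '.[] | select(.name == "ccloud-stack-000000-beginner-cli") | .id')
confluent environment use "$ENVIRONMENT_ID"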
Create a new Confluent Cloud cluster¶
Run the following command to create a new Confluent Cloud cluster demo-kafka-cluster. It takes up to 5 minutes for the Kafka cluster to be ready.
confluent kafka cluster create demo-kafka-cluster --cloud aws --region us-west-2
Tip
You may choose any provider or region from the list generated by running confluent kafka region list.
Verify your output resembles:
+---------------+---------------------------------------------------------+
| Id            | lkc-x6m01                                               |
| Name          | demo-kafka-cluster                                      |
| Type          | BASIC                                                   |
| Ingress       | 100                                                     |
| Egress        | 100                                                     |
| Storage       | 5 TB                                                    |
| Provider      | aws                                                     |
| Availability  | single-zone                                             |
| Region        | us-west-2                                               |
| Status        | UP                                                      |
| Endpoint      | SASL_SSL://pkc-4kgmg.us-west-2.aws.confluent.cloud:9092 |
| API Endpoint  | https://pkac-ldgj1.us-west-2.aws.confluent.cloud        |
| REST Endpoint | https://pkc-4kgmg.us-west-2.aws.confluent.cloud:443     |
+---------------+---------------------------------------------------------+
The value of the Kafka cluster ID, in this case lkc-x6m01, and the Kafka cluster endpoint, in this case pkc-4kgmg.us-west-2.aws.confluent.cloud:9092, will differ in your output.
Specify lkc-x6m01 as the active Kafka cluster by running the following command:
confluent kafka cluster use lkc-x6m01
Verify your output resembles:
Set Kafka cluster "lkc-x6m01" as the active cluster for environment "env-5q3z2q".
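As with the environment ID, you can capture the Kafka cluster ID with jq rather than copying it from the table. A sketch, assuming confluent kafka cluster list -o json returns id and name fields:
# Look up the cluster ID by name and make it the active cluster
CLUSTER_ID=$(confluent kafka cluster list -o json | jq -r '.[] | select(.name == "demo-kafka-cluster") | .id')
confluent kafka cluster use "$CLUSTER_ID"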
Create a new API key/secret pair for user¶
Run the following command to create a user API key and secret for your Kafka cluster lkc-x6m01:
confluent api-key create --description "Demo credentials" --resource lkc-x6m01 -o json
Verify your output resembles:
{ "key": "QX7X4VA4DFJTTOIA", "secret": "fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D" }
The value of the API key, in this case QX7X4VA4DFJTTOIA, and the API secret, in this case fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D, will differ in your output.
Specify the API key QX7X4VA4DFJTTOIA for the Kafka cluster lkc-x6m01:
confluent api-key use QX7X4VA4DFJTTOIA --resource lkc-x6m01
Your output should resemble:
Set the API Key "QX7X4VA4DFJTTOIA" as the active API key for "lkc-x6m01".
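The API secret is displayed only at creation time, so if you script this step, capture the JSON output when you create the key. A sketch using the key and secret fields shown above:
# Create the key once, then parse both fields from the saved JSON
API_KEY_JSON=$(confluent api-key create --description "Demo credentials" --resource lkc-x6m01 -o json)
API_KEY=$(echo "$API_KEY_JSON" | jq -r .key)
API_SECRET=$(echo "$API_KEY_JSON" | jq -r .secret)
confluent api-key use "$API_KEY" --resource lkc-x6m01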
Produce and consume records with Confluent CLI¶
Run the following command to create a new Kafka topic demo-topic-1:
confluent kafka topic create demo-topic-1
Start producing to this topic demo-topic-1 by running the following command:
confluent kafka topic produce demo-topic-1
The CLI waits for you to type data at the prompt, so type a few characters, each on a new line. For example, type the numbers 1 through 5:
1
2
3
4
5
Type CTRL-C when you are finished.
Run the following command to consume messages from topic demo-topic-1. The flag -b allows the consumer to read from the beginning of the topic:
confluent kafka topic consume demo-topic-1 -b
Verify your output resembles the following. The records are expected to be out of order because of the round-robin partitioner:
Starting Kafka Consumer. ^C or ^D to exit
1
3
5
2
4
Type CTRL-C to stop the consumer.
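Because confluent kafka topic produce reads from standard input, you can also run this exercise non-interactively, which is how a script like start.sh can automate it. A sketch that produces the same five records and uses the timeout prerequisite to stop the consumer:
# Pipe records in rather than typing them, then consume for 10 seconds
seq 1 5 | confluent kafka topic produce demo-topic-1
timeout 10s confluent kafka topic consume demo-topic-1 -b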
Create a new service account with an API key/secret pair¶
Run the following command to create a new service account:
confluent iam service-account create demo-app-1 --description "Service account for demo application" -o json
Verify your output resembles:
{ "id": "sa-123456", "name": "demo-app-1", "description": "Service account for demo application" }
The value of the service account ID, in this case sa-123456, will differ in your output.
Create an API key and secret for the service account sa-123456 for the Kafka cluster lkc-x6m01 by running the following command:
confluent api-key create --service-account sa-123456 --resource lkc-x6m01 -o json
Verify your output resembles:
{ "key": "ESN5FSNDHOFFSUEV", "secret": "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB" }
The value of the service account’s API key, in this case ESN5FSNDHOFFSUEV, and API secret, in this case nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB, will differ in your output.
Create a local configuration file /tmp/client.config with Confluent Cloud connection information, using the newly created Kafka cluster and the API key and secret for the service account. Substitute your values for the bootstrap server and the credentials just created:
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ESN5FSNDHOFFSUEV' password='nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB';
Wait about 90 seconds for the Confluent Cloud cluster to be ready and for the service account credentials to propagate.
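If you captured the bootstrap server and the service account credentials in shell variables, you can generate this file instead of editing it by hand. A sketch, where BOOTSTRAP_SERVERS, SERVICE_API_KEY, and SERVICE_API_SECRET are illustrative variable names you would set yourself:
# Write the client configuration from previously captured values
cat > /tmp/client.config <<EOF
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
bootstrap.servers=$BOOTSTRAP_SERVERS
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='$SERVICE_API_KEY' password='$SERVICE_API_SECRET';
EOF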
Run a Java producer without ACLs¶
By default, no ACLs are configured for the service account, which means the service account has no access to any Confluent Cloud resources. Run the following command to verify no ACLs are configured:
confluent kafka acl list --service-account sa-123456
Your output should resemble:
  Principal | Permission | Operation | Resource Type | Resource Name | Pattern Type
------------+------------+-----------+---------------+---------------+---------------
Compile the Java project at clients/cloud/java:
mvn -f ../../clients/cloud/java/pom.xml compile
Run a Java producer to demo-topic-1 before configuring ACLs (expected to fail). Note that you pass in /tmp/client.config, which has the Confluent Cloud connection information, as an argument:
mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.1 2>&1
Verify you see org.apache.kafka.common.errors.TopicAuthorizationException in the log file /tmp/log.1, as shown in the following example (expected because there are no ACLs to allow this client application):
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (default-cli) on project clients-example: An exception occured while executing the Java class. null: InvocationTargetException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed. -> [Help 1]
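If you are scripting this check, a grep over the log file is enough to detect the expected authorization failure, for example:
grep "TopicAuthorizationException" /tmp/log.1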
Run a Java producer with ACLs¶
Run the following commands to create ACLs for the service account:
confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
Verify your output resembles:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL

    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
Run the following command and verify the ACLs were configured:
confluent kafka acl list --service-account sa-123456
Your output should resemble the following. Observe that the Pattern Type is LITERAL:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
Run the Java producer to demo-topic-1 after configuring the ACLs (expected to pass):
mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.2 2>&1
Verify you see the 10 messages were produced to topic message in the log file /tmp/log.2, as shown in the following example:
Producing record: alice {"count":0}
Producing record: alice {"count":1}
Producing record: alice {"count":2}
Producing record: alice {"count":3}
Producing record: alice {"count":4}
Producing record: alice {"count":5}
Producing record: alice {"count":6}
Producing record: alice {"count":7}
Producing record: alice {"count":8}
Producing record: alice {"count":9}
Produced record to topic demo-topic-1 partition [3] @ offset 0
Produced record to topic demo-topic-1 partition [3] @ offset 1
Produced record to topic demo-topic-1 partition [3] @ offset 2
Produced record to topic demo-topic-1 partition [3] @ offset 3
Produced record to topic demo-topic-1 partition [3] @ offset 4
Produced record to topic demo-topic-1 partition [3] @ offset 5
Produced record to topic demo-topic-1 partition [3] @ offset 6
Produced record to topic demo-topic-1 partition [3] @ offset 7
Produced record to topic demo-topic-1 partition [3] @ offset 8
Produced record to topic demo-topic-1 partition [3] @ offset 9
10 messages were produced to topic demo-topic-1
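As with the failure case above, you can verify the successful run programmatically, for example:
grep "10 messages were produced to topic" /tmp/log.2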
Delete the ACLs:
confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
You should see two Deleted ACLs. messages.
Run a Java producer with a prefixed ACL¶
Create a new Kafka topic demo-topic-2:
confluent kafka topic create demo-topic-2
Verify you see the Created topic "demo-topic-2" message.
Run the following commands to create ACLs for the producer using a prefixed ACL, which matches any topic that starts with the prefix demo-topic:
confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
Verify your output resembles:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED

    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
Verify the ACLs were configured by running the following command:
confluent kafka acl list --service-account sa-123456
Your output should resemble the following. Observe that the Pattern Type is PREFIXED:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
  User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED
Run the Java producer to demo-topic-2, which should match the newly created prefixed ACLs:
mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-2" -Dlog4j.configuration=file:log4j.properties > /tmp/log.3 2>&1
Verify you see the 10 messages were produced to topic message in the log file /tmp/log.3, as shown in the following example:
Producing record: alice {"count":0}
Producing record: alice {"count":1}
Producing record: alice {"count":2}
Producing record: alice {"count":3}
Producing record: alice {"count":4}
Producing record: alice {"count":5}
Producing record: alice {"count":6}
Producing record: alice {"count":7}
Producing record: alice {"count":8}
Producing record: alice {"count":9}
Produced record to topic demo-topic-2 partition [3] @ offset 0
Produced record to topic demo-topic-2 partition [3] @ offset 1
Produced record to topic demo-topic-2 partition [3] @ offset 2
Produced record to topic demo-topic-2 partition [3] @ offset 3
Produced record to topic demo-topic-2 partition [3] @ offset 4
Produced record to topic demo-topic-2 partition [3] @ offset 5
Produced record to topic demo-topic-2 partition [3] @ offset 6
Produced record to topic demo-topic-2 partition [3] @ offset 7
Produced record to topic demo-topic-2 partition [3] @ offset 8
Produced record to topic demo-topic-2 partition [3] @ offset 9
10 messages were produced to topic demo-topic-2
Run the following commands to delete ACLs:
confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
You should see two Deleted ACLs. messages.
Run a fully managed Confluent Cloud connector¶
Create a new Kafka topic demo-topic-3:
confluent kafka topic create demo-topic-3
You should see a Created topic "demo-topic-3" message.
Run the following command to allow service account ID sa-123456 to write to any topic:
confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic '*'
Verify your output resembles:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
Verify the ACLs were configured by running the following command:
confluent kafka acl list --service-account sa-123456
Your output should resemble:
    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
Create a local configuration file datagen_ccloud_pageviews.json with Confluent Cloud connection information. Substitute your API key and secret for the service account in the kafka.api.key and kafka.api.secret fields. See below for an example:
{
  "name" : "datagen_ccloud_pageviews",
  "connector.class": "DatagenSource",
  "kafka.api.key": "ESN5FSNDHOFFSUEV",
  "kafka.api.secret" : "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB",
  "kafka.topic" : "demo-topic-3",
  "output.data.format" : "JSON",
  "quickstart" : "PAGEVIEWS",
  "tasks.max" : "1"
}
Create a managed connector in Confluent Cloud with the configuration file you made in the previous step by running the following command:
confluent connect create --config datagen_ccloud_pageviews.json
Your output should resemble:
Created connector "lcc-zno83" (datagen_ccloud_pageviews).
The value of the connector ID, in this case lcc-zno83, will differ in your output. The connector may take up to 5 minutes to provision. Run the following command to check the connector status:
confluent connect list
Your output should resemble the following:
     ID     |           Name           |    Status    |  Type  | Trace
------------+--------------------------+--------------+--------+--------
  lcc-zno83 | datagen_ccloud_pageviews | PROVISIONING | source |
When the Status is RUNNING, you may move on to the next step.
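Rather than re-running confluent connect list by hand, you can poll until the connector is ready. A sketch, assuming the JSON output of confluent connect list exposes name and status fields:
# Poll every 10 seconds until the connector reports RUNNING
while [ "$(confluent connect list -o json | jq -r '.[] | select(.name == "datagen_ccloud_pageviews") | .status')" != "RUNNING" ]; do
  echo "Waiting for connector to provision..."
  sleep 10
done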
Run a Java consumer with a wildcard ACL¶
Create ACLs for the consumer using a wildcard by running the following commands:
confluent kafka acl create --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
confluent kafka acl create --allow --service-account sa-123456 --operation READ --topic '*'
Verify your output resembles:
    Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
-----------------+------------+-----------+---------------+-----------------------+---------------
  User:sa-123456 | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL

    Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
-----------------+------------+-----------+---------------+---------------+---------------
  User:sa-123456 | ALLOW      | READ      | TOPIC         | *             | LITERAL
Verify the ACLs were configured by running the following command:
confluent kafka acl list --service-account sa-123456
Your output should resemble:
    Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
-----------------+------------+-----------+---------------+-----------------------+---------------
  User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *                     | LITERAL
  User:sa-123456 | ALLOW      | READ      | TOPIC         | *                     | LITERAL
  User:sa-123456 | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL
Run the Java consumer from demo-topic-3, which is populated by the datagen_ccloud_pageviews connector, and wait 15 seconds for it to complete:
timeout 15s mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ConsumerExamplePageviews" -Dexec.args="/tmp/client.config demo-topic-3" -Dlog4j.configuration=file:log4j.properties > /tmp/log.4 2>&1
Verify you see Consumed record with messages in the log file /tmp/log.4, as shown in the following example:
Consumed record with key 71 and value {"viewtime":71,"userid":"User_6","pageid":"Page_11"}
Consumed record with key 51 and value {"viewtime":51,"userid":"User_7","pageid":"Page_24"}
Consumed record with key 31 and value {"viewtime":31,"userid":"User_7","pageid":"Page_68"}
Consumed record with key 81 and value {"viewtime":81,"userid":"User_5","pageid":"Page_25"}
Consumed record with key 41 and value {"viewtime":41,"userid":"User_2","pageid":"Page_88"}
Consumed record with key 91 and value {"viewtime":91,"userid":"User_2","pageid":"Page_74"}
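To check this step from a script, count the consumed records in the log file, for example:
grep -c "Consumed record with" /tmp/log.4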
Delete the ACLs by running the following commands:
confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic '*'
confluent kafka acl delete --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
confluent kafka acl delete --allow --service-account sa-123456 --operation READ --topic '*'
You should see a Deleted ACLs. message for each command.
Clean up Confluent Cloud resources¶
Complete the following steps to delete the managed connector:
Find the connector ID:
confluent connect list
Your output should resemble the following. Locate your connector ID; in this case, the connector ID is lcc-zno83:
     ID     |           Name           | Status  |  Type  | Trace
------------+--------------------------+---------+--------+--------
  lcc-zno83 | datagen_ccloud_pageviews | RUNNING | source |
Delete the connector, referencing the connector ID from the previous step:
confluent connect delete lcc-zno83
You should see:
Deleted connector "lcc-zno83".
Run the following command to delete the service account:
confluent iam service-account delete sa-123456
Complete the following steps to delete all the Kafka topics:
Delete demo-topic-1:
confluent kafka topic delete demo-topic-1
You should see:
Deleted topic "demo-topic-1".
Delete demo-topic-2:
confluent kafka topic delete demo-topic-2
You should see:
Deleted topic "demo-topic-2".
Delete demo-topic-3:
confluent kafka topic delete demo-topic-3
You should see:
Deleted topic "demo-topic-3".
Run the following command to delete the user API key:
confluent api-key delete QX7X4VA4DFJTTOIA
Note that the service account API key was deleted when you deleted the service account.
Delete the Kafka cluster:
confluent kafka cluster delete lkc-x6m01
Delete the environment:
confluent environment delete env-5q3z2q
You should see:
Deleted environment "env-5q3z2q".
If the tutorial ends prematurely, you may receive the following error message when trying to run the example again (confluent environment create ccloud-stack-000000-beginner-cli):
Error: 1 error occurred:
* error creating account: Account name is already in use
Failed to create environment ccloud-stack-000000-beginner-cli. Please troubleshoot and run again
In this case, run the following script to delete the example’s topics, Kafka cluster, and environment:
./cleanup.sh
Advanced usage¶
The example script provides variables that allow you to alter the default Kafka cluster name, cloud provider, and region. For example:
CLUSTER_NAME=my-demo-cluster CLUSTER_CLOUD=aws CLUSTER_REGION=us-west-2 ./start.sh
Here are the variables and their default values:
Variable       | Default
---------------+--------------------
CLUSTER_NAME   | demo-kafka-cluster
CLUSTER_CLOUD  | aws
CLUSTER_REGION | us-west-2
Additional Resources¶
- For producing and consuming events, see the related Confluent CLI tutorials.
- For a guide to configuring, monitoring, and optimizing your Kafka client applications when using Confluent Cloud, see Developing Client Applications on Confluent Cloud.
- For an example that showcases how to monitor Kafka client application and Confluent Cloud metrics, and steps through various failure scenarios to see how they are reflected in the provided metrics, see the Observability for Apache Kafka® Clients to Confluent Cloud demo.