Tutorial: Use Confluent CLI with Confluent Cloud
This tutorial shows you how to use the Confluent CLI to interact with your Confluent Cloud cluster. It uses real resources in Confluent Cloud, and it creates and deletes topics, service accounts, credentials, and ACLs.
Prerequisites
- Local install of Confluent CLI (version 3.0.0 or later) 
- timeout: used by the bash scripts to terminate a consumer process after a certain period of time. timeout is available on most Linux distributions but not on macOS. macOS users can install timeout via brew install coreutils.
- mvn installed on your host 
- jq installed on your host 
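Before starting, it can help to confirm that the tools listed above are on your PATH. The following is a minimal sketch, not part of the tutorial scripts; the function name missing_tools is hypothetical, and the check only tests for presence, not for the Confluent CLI version.

```shell
# Print the name of every listed tool that is not found on PATH.
# Prints nothing when all tools are available.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || echo "$tool"
  done
}

# Example call for this tutorial's prerequisites:
#   missing_tools confluent timeout mvn jq
```

An empty result means every prerequisite was found; any printed name still needs to be installed.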
Run Tutorial
Start
- Log in to Confluent Cloud with the Confluent CLI:
    confluent login --save
  The --save flag saves your Confluent Cloud login credentials to the ~/.netrc file.
- Clone the confluentinc/examples GitHub repository:
    git clone https://github.com/confluentinc/examples.git
- Navigate to the examples/ccloud/beginner-cloud/ directory and switch to the Confluent Platform release branch:
    cd examples/ccloud/beginner-cloud/
    git checkout 3.33.0-post
- To step through the tutorial manually, which is advised for new users who want to gain familiarity with the Confluent CLI, skip ahead to the next section. Alternatively, run the full tutorial end-to-end with the start.sh script, which automates all the steps in the tutorial:
    ./start.sh
Create a new Confluent Cloud environment
- Run the following command to create a new Confluent Cloud environment named ccloud-stack-000000-beginner-cli:
    confluent environment create ccloud-stack-000000-beginner-cli -o json
- Verify your output resembles:
    {
      "id": "env-5qz2q",
      "name": "ccloud-stack-000000-beginner-cli"
    }
  The value of the environment ID, in this case env-5qz2q, will differ in your output. In this tutorial, the values of certain variables, including your environment ID, Kafka cluster ID, and API key, are unique to you and will not match the output shown.
- Specify env-5qz2q as the active environment by running the following command:
    confluent environment use env-5qz2q
- Verify your output resembles:
    Now using "env-5qz2q" as the default (active) environment.
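Because the environment ID differs on every run, a script would typically capture it from the JSON output instead of copying it by hand. The following is a minimal sketch that assumes jq is installed (it is listed in the prerequisites); the function name json_id and the variable ENV_ID are hypothetical.

```shell
# Read a JSON object on stdin and print the value of its "id" field.
json_id() {
  jq -r '.id'
}

# Possible use with the commands from this section (requires a logged-in CLI):
#   ENV_ID=$(confluent environment create ccloud-stack-000000-beginner-cli -o json | json_id)
#   confluent environment use "$ENV_ID"
```

The same helper applies to any command in this tutorial whose JSON output has an id field, such as the service account creation step.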
Create a new Confluent Cloud cluster
- Run the following command to create a new Confluent Cloud cluster named demo-kafka-cluster. It takes up to 5 minutes for the Kafka cluster to be ready.
    confluent kafka cluster create demo-kafka-cluster --cloud aws --region us-west-2
  Tip: You may choose any provider or region from the list generated by running confluent kafka region list.
- Verify your output resembles:
    +---------------+---------------------------------------------------------+
    | Id            | lkc-x6m01                                               |
    | Name          | demo-kafka-cluster                                      |
    | Type          | BASIC                                                   |
    | Ingress       | 100                                                     |
    | Egress        | 100                                                     |
    | Storage       | 5 TB                                                    |
    | Provider      | aws                                                     |
    | Availability  | single-zone                                             |
    | Region        | us-west-2                                               |
    | Status        | UP                                                      |
    | Endpoint      | SASL_SSL://pkc-4kgmg.us-west-2.aws.confluent.cloud:9092 |
    | API Endpoint  | https://pkac-ldgj1.us-west-2.aws.confluent.cloud        |
    | REST Endpoint | https://pkc-4kgmg.us-west-2.aws.confluent.cloud:443     |
    +---------------+---------------------------------------------------------+
  The value of the Kafka cluster ID, in this case lkc-x6m01, and the Kafka cluster endpoint, in this case pkc-4kgmg.us-west-2.aws.confluent.cloud:9092, will differ in your output.
- Specify lkc-x6m01 as the active Kafka cluster by running the following command:
    confluent kafka cluster use lkc-x6m01
- Verify your output resembles:
    Set Kafka cluster "lkc-x6m01" as the active cluster for environment "env-5qz2q".
Create a new API key/secret pair for user
- Run the following command to create a user API key and secret for your Kafka cluster lkc-x6m01:
    confluent api-key create --description "Demo credentials" --resource lkc-x6m01 -o json
- Verify your output resembles:
    {
      "key": "QX7X4VA4DFJTTOIA",
      "secret": "fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D"
    }
  The value of the API key, in this case QX7X4VA4DFJTTOIA, and the API secret, in this case fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D, will differ in your output.
- Specify the API key QX7X4VA4DFJTTOIA for the Kafka cluster lkc-x6m01:
    confluent api-key use QX7X4VA4DFJTTOIA --resource lkc-x6m01
  Your output should resemble:
    Set the API Key "QX7X4VA4DFJTTOIA" as the active API key for "lkc-x6m01".
Produce and consume records with Confluent CLI
- Run the following command to create a new Kafka topic demo-topic-1:
    confluent kafka topic create demo-topic-1
- Start producing to the topic demo-topic-1 by running the following command:
    confluent kafka topic produce demo-topic-1
- The CLI waits for you to type data at the prompt, so type a few characters, each on a new line. For example, type the numbers 1 through 5:
    1
    2
    3
    4
    5
- Type CTRL-C when you are finished.
- Run the following command to consume messages from the topic demo-topic-1. The -b flag allows the consumer to read from the beginning of the topic.
    confluent kafka topic consume demo-topic-1 -b
- Verify your output resembles the following. The records are expected to be out of order because of the round-robin partitioner:
    Starting Kafka Consumer. ^C or ^D to exit
    1
    3
    5
    2
    4
- Type CTRL-C to stop the consumer.
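The out-of-order result can be illustrated with a small simulation. This is only a sketch under stated assumptions: a hypothetical topic with two partitions, records assigned to partitions in strict rotation, and a consumer that drains one partition completely before the next. The real partition count and read interleaving of your topic will differ, and round_robin_read is a hypothetical helper, not a Confluent CLI feature.

```shell
# Simulate round-robin assignment of records to partitions, then print the
# records partition by partition, as a consumer draining each partition would.
# Usage: round_robin_read <partition-count> <record>...
round_robin_read() {
  partitions=$1
  shift
  out=""
  p=0
  while [ "$p" -lt "$partitions" ]; do
    i=0
    for record in "$@"; do
      # Record number i lands on partition (i mod partition-count).
      if [ $(( i % partitions )) -eq "$p" ]; then
        out="$out $record"
      fi
      i=$(( i + 1 ))
    done
    p=$(( p + 1 ))
  done
  echo "${out# }"
}

round_robin_read 2 1 2 3 4 5   # prints: 1 3 5 2 4
```

Order is preserved within each partition, which is why 1, 3, 5 and 2, 4 each stay in sequence even though the overall stream does not.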
Create a new service account with an API key/secret pair
- Run the following command to create a new service account:
    confluent iam service-account create demo-app-1 --description "Service account for demo application" -o json
- Verify your output resembles:
    {
      "id": "sa-123456",
      "name": "demo-app-1",
      "description": "Service account for demo application"
    }
  The value of the service account ID, in this case sa-123456, will differ in your output.
- Create an API key and secret for the service account sa-123456 for the Kafka cluster lkc-x6m01 by running the following command:
    confluent api-key create --service-account sa-123456 --resource lkc-x6m01 -o json
- Verify your output resembles:
    {
      "key": "ESN5FSNDHOFFSUEV",
      "secret": "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB"
    }
  The value of the service account's API key, in this case ESN5FSNDHOFFSUEV, and the API secret, in this case nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB, will differ in your output.
- Create a local configuration file /tmp/client.config with Confluent Cloud connection information, using the newly created Kafka cluster and the API key and secret for the service account. Substitute your own values for the bootstrap server and the credentials you just created.
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ESN5FSNDHOFFSUEV' password='nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB';
- Wait about 90 seconds for the Confluent Cloud cluster to be ready and for the service account credentials to propagate. 
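The client configuration file from this section can also be generated from shell variables, which avoids copy-and-paste mistakes in the long secret. The following is a minimal sketch; write_client_config and its argument order are hypothetical, and the four properties are the ones shown in the step above.

```shell
# Write a Kafka client configuration file for Confluent Cloud.
# Usage: write_client_config <bootstrap-server> <api-key> <api-secret> <output-file>
write_client_config() {
  bootstrap=$1
  api_key=$2
  api_secret=$3
  out_file=$4
  cat > "$out_file" <<EOF
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
bootstrap.servers=$bootstrap
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='$api_key' password='$api_secret';
EOF
}

# Example call with the sample values from this tutorial (yours will differ):
#   write_client_config pkc-4kgmg.us-west-2.aws.confluent.cloud:9092 \
#     ESN5FSNDHOFFSUEV "$API_SECRET" /tmp/client.config
```

Note that the file contains the API secret in plain text, so treat it like any other credential file.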
Run a Java producer without ACLs
- By default, no ACLs are configured for the service account, which means the service account has no access to any Confluent Cloud resources. Run the following command to verify no ACLs are configured:
    confluent kafka acl list --service-account sa-123456
  Your output should resemble:
      Principal | Permission | Operation | Resource Type | Resource Name | Pattern Type
    ------------+------------+-----------+---------------+---------------+---------------
- Compile the Java project at clients/cloud/java:
    mvn -f ../../clients/cloud/java/pom.xml compile
- Run a Java producer to demo-topic-1 before configuring ACLs (expected to fail). Note that you pass /tmp/client.config, which has the Confluent Cloud connection information, as an argument:
    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.1 2>&1
- Verify you see org.apache.kafka.common.errors.TopicAuthorizationException in the log file /tmp/log.1, as shown in the following example (expected because there are no ACLs to allow this client application):
    [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (default-cli) on project clients-example: An exception occured while executing the Java class. null: InvocationTargetException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed. -> [Help 1]
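The log check in the last step can be scripted with grep. This is a minimal sketch; the function name log_has_auth_error is hypothetical, and it only looks for the exception class name shown in the example log.

```shell
# Succeed (exit status 0) if the given log file mentions a
# TopicAuthorizationException, fail otherwise.
log_has_auth_error() {
  grep -q -F 'org.apache.kafka.common.errors.TopicAuthorizationException' "$1"
}

# Example call after running the producer without ACLs:
#   log_has_auth_error /tmp/log.1 && echo "Producer was denied, as expected"
```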
Run a Java producer with ACLs
- Run the following commands to create ACLs for the service account:
    confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
- Verify your output resembles:
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
- Run the following command and verify the ACLs were configured:
    confluent kafka acl list --service-account sa-123456
  Your output should resemble the following. Observe that the Pattern Type is LITERAL.
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
- Run the Java producer to demo-topic-1 after configuring the ACLs (expected to pass):
    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.2 2>&1
- Verify you see the "10 messages were produced to topic" message in the log file /tmp/log.2, as shown in the following example:
    Producing record: alice {"count":0}
    Producing record: alice {"count":1}
    Producing record: alice {"count":2}
    Producing record: alice {"count":3}
    Producing record: alice {"count":4}
    Producing record: alice {"count":5}
    Producing record: alice {"count":6}
    Producing record: alice {"count":7}
    Producing record: alice {"count":8}
    Producing record: alice {"count":9}
    Produced record to topic demo-topic-1 partition [3] @ offset 0
    Produced record to topic demo-topic-1 partition [3] @ offset 1
    Produced record to topic demo-topic-1 partition [3] @ offset 2
    Produced record to topic demo-topic-1 partition [3] @ offset 3
    Produced record to topic demo-topic-1 partition [3] @ offset 4
    Produced record to topic demo-topic-1 partition [3] @ offset 5
    Produced record to topic demo-topic-1 partition [3] @ offset 6
    Produced record to topic demo-topic-1 partition [3] @ offset 7
    Produced record to topic demo-topic-1 partition [3] @ offset 8
    Produced record to topic demo-topic-1 partition [3] @ offset 9
    10 messages were produced to topic demo-topic-1
- Delete the ACLs:
    confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
  You should see two "Deleted ACLs." messages.
Run a Java producer with a prefixed ACL
- Create a new Kafka topic demo-topic-2:
    confluent kafka topic create demo-topic-2
  Verify you see the Created topic "demo-topic-2" message.
- Run the following commands to create ACLs for the producer using a prefixed ACL, which matches any topic that starts with the prefix demo-topic:
    confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
- Verify your output resembles:
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
- Verify the ACLs were configured by running the following command:
    confluent kafka acl list --service-account sa-123456
  Your output should resemble the following. Observe that the Pattern Type is PREFIXED.
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
     User:sa-123456  | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED
- Run the Java producer to demo-topic-2, which should match the newly created prefixed ACLs:
    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-2" -Dlog4j.configuration=file:log4j.properties > /tmp/log.3 2>&1
- Verify you see the "10 messages were produced to topic" message in the log file /tmp/log.3, as shown in the following example:
    Producing record: alice {"count":0}
    Producing record: alice {"count":1}
    Producing record: alice {"count":2}
    Producing record: alice {"count":3}
    Producing record: alice {"count":4}
    Producing record: alice {"count":5}
    Producing record: alice {"count":6}
    Producing record: alice {"count":7}
    Producing record: alice {"count":8}
    Producing record: alice {"count":9}
    Produced record to topic demo-topic-2 partition [3] @ offset 0
    Produced record to topic demo-topic-2 partition [3] @ offset 1
    Produced record to topic demo-topic-2 partition [3] @ offset 2
    Produced record to topic demo-topic-2 partition [3] @ offset 3
    Produced record to topic demo-topic-2 partition [3] @ offset 4
    Produced record to topic demo-topic-2 partition [3] @ offset 5
    Produced record to topic demo-topic-2 partition [3] @ offset 6
    Produced record to topic demo-topic-2 partition [3] @ offset 7
    Produced record to topic demo-topic-2 partition [3] @ offset 8
    Produced record to topic demo-topic-2 partition [3] @ offset 9
    10 messages were produced to topic demo-topic-2
- Run the following commands to delete the ACLs:
    confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
  You should see two "Deleted ACLs." messages.
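The create and delete steps in this section differ only in the action and the operation, so they can be generated from a loop. The following is a dry-run sketch: acl_commands is a hypothetical helper that only prints the commands (using the same flags shown above) and does not execute them.

```shell
# Print one prefixed-ACL command per operation; nothing is executed.
# Usage: acl_commands <create|delete> <service-account-id> <topic-prefix> <operation>...
acl_commands() {
  action=$1
  service_account=$2
  topic_prefix=$3
  shift 3
  for operation in "$@"; do
    echo "confluent kafka acl $action --allow --service-account $service_account --operation $operation --topic $topic_prefix --prefix"
  done
}

# Example: print the two create commands used in this section.
#   acl_commands create sa-123456 demo-topic CREATE WRITE
```

Printing the commands first makes it easy to review them before piping the output to a shell.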
Run a fully managed Confluent Cloud connector
- Create a new Kafka topic demo-topic-3:
    confluent kafka topic create demo-topic-3
  You should see a Created topic "demo-topic-3" message.
- Run the following command to allow the service account sa-123456 to write to any topic:
    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic '*'
- Verify your output resembles:
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
- Verify the ACLs were configured by running the following command:
    confluent kafka acl list --service-account sa-123456
  Your output should resemble:
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
- Create a local configuration file datagen_ccloud_pageviews.json with Confluent Cloud connection information. Substitute your API key and secret for the service account in the kafka.api.key and kafka.api.secret fields. See below for an example:
    {
      "name" : "datagen_ccloud_pageviews",
      "connector.class": "DatagenSource",
      "kafka.api.key": "ESN5FSNDHOFFSUEV",
      "kafka.api.secret" : "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB",
      "kafka.topic" : "demo-topic-3",
      "output.data.format" : "JSON",
- Create a managed connector in Confluent Cloud with the configuration file you made in the previous step, using the following command:
    confluent connect create --config datagen_ccloud_pageviews.json
  Your output should resemble:
    Created connector "lcc-qrjxjd" (datagen_ccloud_pageviews).
- The connector may take up to 5 minutes to provision. Run the following command to check the connector status:
    confluent connect list
  Your output should resemble the following:
         ID      |           Name            |    Status    |  Type  | Trace
    -------------+---------------------------+--------------+--------+--------
      lcc-zno83  | datagen_ccloud_pageviews  | PROVISIONING | source |
  When the Status is RUNNING, you may move on to the next step.
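Rather than re-running the status command by hand, the check can be polled in a loop. This is a minimal sketch; wait_for_output is a hypothetical helper, and it matches the wanted text anywhere in the command output, so it assumes the listed connector is the only one in the cluster.

```shell
# Re-run a command until its output contains the wanted text.
# Returns 0 on a match, 1 if all attempts are used up.
# Usage: wait_for_output <text> <max-attempts> <delay-seconds> <command> [args...]
wait_for_output() {
  wanted=$1
  attempts=$2
  delay=$3
  shift 3
  n=0
  while [ "$n" -lt "$attempts" ]; do
    if "$@" 2>/dev/null | grep -q -F -- "$wanted"; then
      return 0
    fi
    n=$(( n + 1 ))
    sleep "$delay"
  done
  return 1
}

# Example: check every 10 seconds for up to 5 minutes.
#   wait_for_output RUNNING 30 10 confluent connect list
```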
Run a Java consumer with a Wildcard ACL
- Create ACLs for the consumer using a wildcard by running the following commands:
    confluent kafka acl create --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
    confluent kafka acl create --allow --service-account sa-123456 --operation READ --topic '*'
- Verify your output resembles:
        Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
    -----------------+------------+-----------+---------------+-----------------------+---------------
     User:sa-123456  | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
     User:sa-123456  | ALLOW      | READ      | TOPIC         | *             | LITERAL
- Verify the ACLs were configured by running the following command:
    confluent kafka acl list --service-account sa-123456
  Your output should resemble:
        Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
    -----------------+------------+-----------+---------------+-----------------------+---------------
     User:sa-123456  | ALLOW      | WRITE     | TOPIC         | *                     | LITERAL
     User:sa-123456  | ALLOW      | READ      | TOPIC         | *                     | LITERAL
     User:sa-123456  | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL
- Run the Java consumer from demo-topic-3, which is populated by the datagen_ccloud_pageviews connector, and wait 15 seconds for it to complete:
    timeout 15s mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ConsumerExamplePageviews" -Dexec.args="/tmp/client.config demo-topic-3" -Dlog4j.configuration=file:log4j.properties > /tmp/log.4 2>&1
- Verify you see "Consumed record with" messages in the log file /tmp/log.4, as shown in the following example:
    Consumed record with key 71 and value {"viewtime":71,"userid":"User_6","pageid":"Page_11"}
    Consumed record with key 51 and value {"viewtime":51,"userid":"User_7","pageid":"Page_24"}
    Consumed record with key 31 and value {"viewtime":31,"userid":"User_7","pageid":"Page_68"}
    Consumed record with key 81 and value {"viewtime":81,"userid":"User_5","pageid":"Page_25"}
    Consumed record with key 41 and value {"viewtime":41,"userid":"User_2","pageid":"Page_88"}
    Consumed record with key 91 and value {"viewtime":91,"userid":"User_2","pageid":"Page_74"}
- Delete the ACLs by running the following commands:
    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic '*'
    confluent kafka acl delete --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
    confluent kafka acl delete --allow --service-account sa-123456 --operation READ --topic '*'
  You should see three "Deleted ACLs." messages.
Clean up Confluent Cloud resources
- Complete the following steps to delete the managed connector:
  - Find the connector ID:
      confluent connect list
    The output should resemble the following. Locate your connector ID, in this case lcc-zno83.
           ID      |           Name            |  Status |  Type  | Trace
      -------------+---------------------------+---------+--------+--------
        lcc-zno83  | datagen_ccloud_pageviews  | RUNNING | source |
  - Delete the connector, referencing the connector ID from the previous step:
      confluent connect delete lcc-zno83
    You should see:
      Deleted connector "lcc-zno83".
 
- Run the following command to delete the service account:
    confluent iam service-account delete sa-123456
- Complete the following steps to delete all the Kafka topics:
  - Delete demo-topic-1:
      confluent kafka topic delete demo-topic-1
    You should see:
      Deleted topic "demo-topic-1".
  - Delete demo-topic-2:
      confluent kafka topic delete demo-topic-2
    You should see:
      Deleted topic "demo-topic-2".
  - Delete demo-topic-3:
      confluent kafka topic delete demo-topic-3
    You should see:
      Deleted topic "demo-topic-3".
 
- Run the following command to delete the user API key:
    confluent api-key delete QX7X4VA4DFJTTOIA
  Note that the service account API key was deleted when you deleted the service account.
- Delete the Kafka cluster:
    confluent kafka cluster delete lkc-x6m01
- Delete the environment:
    confluent environment delete env-5qz2q
  You should see:
    Deleted environment "env-5qz2q".
If the tutorial ends prematurely, you may receive the following error message
when trying to run the example again (confluent environment create
ccloud-stack-000000-beginner-cli):
Error: 1 error occurred:
   * error creating account: Account name is already in use
Failed to create environment ccloud-stack-000000-beginner-cli. Please troubleshoot and run again
In this case, run the following script to delete the example’s topics, Kafka cluster, and environment:
./cleanup.sh
Advanced usage
The example script provides variables that allow you to alter the default Kafka cluster name, cloud provider, and region. For example:
CLUSTER_NAME=my-demo-cluster CLUSTER_CLOUD=aws CLUSTER_REGION=us-west-2 ./start.sh
Here are the variables and their default values:
| Variable | Default |
|---|---|
| CLUSTER_NAME | demo-kafka-cluster |
| CLUSTER_CLOUD | aws |
| CLUSTER_REGION | us-west-2 |
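How start.sh reads these variables is not shown here. A common shell idiom for this kind of override is default parameter expansion, sketched below. The pairing of CLUSTER_NAME, CLUSTER_CLOUD, and CLUSTER_REGION with the defaults demo-kafka-cluster, aws, and us-west-2 is inferred from the example command above, and cluster_settings is a hypothetical helper.

```shell
# Print the effective cluster name, cloud provider, and region.
# Each value comes from the environment if set and non-empty,
# otherwise from the default after the ":-".
cluster_settings() {
  echo "${CLUSTER_NAME:-demo-kafka-cluster} ${CLUSTER_CLOUD:-aws} ${CLUSTER_REGION:-us-west-2}"
}

# Example: override only the cluster name.
#   CLUSTER_NAME=my-demo-cluster; cluster_settings
```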
Additional Resources
- For producing and consuming events, see the following Confluent CLI tutorials: 
- For a guide to configuring, monitoring, and optimizing your Kafka client applications when using Confluent Cloud, see Developing Client Applications on Confluent Cloud. 
- For an example that showcases how to monitor Kafka client application and Confluent Cloud metrics, and steps through various failure scenarios to see how they are reflected in the provided metrics, see the Observability for Apache Kafka® Clients to Confluent Cloud demo.