Tutorial: Use Confluent CLI with Confluent Cloud

This tutorial shows you how to use the Confluent CLI to interact with your Confluent Cloud cluster. It uses real resources in Confluent Cloud, and it creates and deletes topics, service accounts, credentials, and ACLs.

Prerequisites

  • Confluent Cloud
  • Local install of Confluent CLI (version 3.0.0 or later)
  • timeout: used by the bash scripts to terminate a consumer process after a certain period of time. timeout is available on most Linux distributions but not on macOS. macOS users can install timeout via brew install coreutils.
  • mvn installed on your host
  • jq installed on your host

Run Tutorial

Start

  1. Log in to Confluent Cloud with the Confluent CLI:

    confluent login --save
    

    The --save flag will save your Confluent Cloud login credentials to the ~/.netrc file.

  2. Clone the confluentinc/examples GitHub repository.

    git clone https://github.com/confluentinc/examples.git
    
  3. Navigate to the examples/ccloud/beginner-cloud/ directory and switch to the Confluent Platform release branch:

    cd examples/ccloud/beginner-cloud/
    git checkout 4.13.0-post
    
  4. If you want to manually step through the tutorial, which is advised for new users who want to gain familiarity with the Confluent CLI, skip ahead to the next section. Alternatively, you can run the full tutorial end-to-end with the start.sh script, which automates all the steps in the tutorial:

    ./start.sh
    

Create a new Confluent Cloud environment

  1. Run the following command to create a new Confluent Cloud environment ccloud-stack-000000-beginner-cli:

    confluent environment create ccloud-stack-000000-beginner-cli -o json
    
  2. Verify your output resembles:

    {
      "id": "env-5q3z2q",
      "name": "ccloud-stack-000000-beginner-cli"
    }
    

    The value of the environment ID, in this case env-5qz2q, will differ in your output. In this tutorial, the values for certain variables, including your environment ID, Kafka cluster ID, API key, will be unique and will not match the output shown.

  3. Specify env-5qz2q as the active environment by running the following command:

    confluent environment use env-5q3z2q
    
  4. Verify your output resembles:

    Now using "env-5q3z2q" as the default (active) environment.
    

Create a new Confluent Cloud cluster

  1. Run the following command to create a new Confluent Cloud cluster demo-kafka-cluster. It takes up to 5 minutes for the Kafka cluster to be ready.

    confluent kafka cluster create demo-kafka-cluster --cloud aws --region us-west-2
    

    Tip

    You may choose any provider or region from the list generated by running confluent kafka region list.

  2. Verify your output resembles:

    +---------------+---------------------------------------------------------+
    | Id            | lkc-x6m01                                               |
    | Name          | demo-kafka-cluster                                      |
    | Type          | BASIC                                                   |
    | Ingress       |                                                     100 |
    | Egress        |                                                     100 |
    | Storage       | 5 TB                                                    |
    | Provider      | aws                                                     |
    | Availability  | single-zone                                             |
    | Region        | us-west-2                                               |
    | Status        | UP                                                      |
    | Endpoint      | SASL_SSL://pkc-4kgmg.us-west-2.aws.confluent.cloud:9092 |
    | API Endpoint  | https://pkac-ldgj1.us-west-2.aws.confluent.cloud        |
    | REST Endpoint | https://pkc-4kgmg.us-west-2.aws.confluent.cloud:443     |
    +---------------+---------------------------------------------------------+
    

    The value of the Kafka cluster ID, in this case lkc-x6m01, and Kafka cluster endpoint, in this case pkc-4kgmg.us-west-2.aws.confluent.cloud:9092, will differ in your output.

  3. Specify lkc-x6m01 as the active Kafka cluster by running the following command:

    confluent kafka cluster use lkc-x6m01
    
  4. Verify your output resembles:

    Set Kafka cluster "lkc-x6m01" as the active cluster for environment "env-5q3z2q".
    

Create a new API key/secret pair for user

  1. Run the following command to create a user API key and secret for your Kafka cluster lkc-x6m01:

    confluent api-key create --description "Demo credentials" --resource lkc-x6m01 -o json
    
  2. Verify your output resembles:

    {
       "key": "QX7X4VA4DFJTTOIA",
       "secret": "fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D"
    }
    

    The value of the API key, in this case QX7X4VA4DFJTTOIA, and API secret, in this case fjcDDyr0Nm84zZr77ku/AQqCKQOOmb35Ql68HQnb60VuU+xLKiu/n2UNQ0WYXp/D will differ in your output.

  3. Specify the API key QX7X4VA4DFJTTOIA for the Kafka cluster lkc-x6m01:

    confluent api-key use QX7X4VA4DFJTTOIA --resource lkc-x6m01
    

    Your output should resemble:

    Set the API Key "QX7X4VA4DFJTTOIA" as the active API key for "lkc-x6m01".
    

Produce and consume records with Confluent CLI

  1. Run the following command to create a new Kafka topic demo-topic-1:

    confluent kafka topic create demo-topic-1
    
  2. Start producing to this topic demo-topic-1 by running the following command:

    confluent kafka topic produce demo-topic-1
    
  3. The CLI waits for you to type data at the prompt, so type a few characters each on a new line. For example, type the numbers 1 through 5:

    1
    2
    3
    4
    5
    
  4. Type CTRL-C when you are finished.

  5. Run the following command to consume messages from topic demo-topic-1. The flag -b allows the consumer to read from the beginning of the topic.

    confluent kafka topic consume demo-topic-1 -b
    
  6. Verify your output resembles the following. It is expected to be out of order because of round-robin partitioner:

    Starting Kafka Consumer. ^C or ^D to exit
    1
    3
    5
    2
    4
    
  7. Type CTRL-C to stop the consumer.

Create a new service account with an API key/secret pair

  1. Run the following command to create a new service account:

    confluent iam service-account create demo-app-1 --description "Service account for demo application" -o json
    
  2. Verify your output resembles:

    {
       "id": "sa-123456",
       "name": "demo-app-1",
       "description": "Service account for demo application"
    }
    

    The value of the service account ID, in this case sa-123456, will differ in your output.

  3. Create an API key and secret for the service account sa-123456 for the Kafka cluster lkc-x6m01 by running the following command:

    confluent api-key create --service-account sa-123456 --resource lkc-x6m01 -o json
    
  4. Verify your output resembles:

    {
      "key": "ESN5FSNDHOFFSUEV",
      "secret": "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB"
    }
    

    The value of the service account’s API key, in this case ESN5FSNDHOFFSUEV, and API secret, in this case nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB, will differ in your output.

  5. Create a local configuration file /tmp/client.config with Confluent Cloud connection information using the newly created Kafka cluster and the API key and secret for the service account. Substitute your values for the bootstrap server and credentials just created.

    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ESN5FSNDHOFFSUEV' password='nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB';
    
  6. Wait about 90 seconds for the Confluent Cloud cluster to be ready and for the service account credentials to propagate.

Run a Java producer without ACLs

  1. By default, no ACLs are configured for the service account, which means the service account has no access to any Confluent Cloud resources. Run the following command to verify no ACLs are configured:

    confluent kafka acl list --service-account sa-123456
    

    Your output should resemble:

      Principal | Permission | Operation | Resource Type | Resource Name | Pattern Type
    ------------+------------+-----------+---------------+---------------+---------------
    
  2. Compile the Java project at clients/cloud/java

    mvn  -f ../../clients/cloud/java/pom.xml compile
    
  3. Run a Java producer to demo-topic-1 before configuring ACLs (expected to fail). Note that you pass in an argument to /tmp/client.config which has the Confluent Cloud connection information:

    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.1 2>&1
    
  4. Verify you see org.apache.kafka.common.errors.TopicAuthorizationException in the log file /tmp/log.1 as shown in the following example (expected because there are no ACLs to allow this client application):

    [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (default-cli) on project clients-example: An exception occured while executing the Java class. null: InvocationTargetException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed. -> [Help 1]
    

Run a Java producer with ACLs

  1. Run the following commands to create ACLs for the service account:

    confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
    
  2. Verify your output resembles:

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL
    
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
    
  3. Run the following command and verify the ACLs were configured:

    confluent kafka acl list --service-account sa-123456
    

    Your output should resemble below. Observe that the ACL Type is LITERAL.

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic-1  | LITERAL
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic-1  | LITERAL
    
  4. Run the Java producer to demo-topic-1 after configuring the ACLs (expected to pass):

    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-1" -Dlog4j.configuration=file:log4j.properties > /tmp/log.2 2>&1
    
  5. Verify you see the 10 messages were produced to topic message in the log file /tmp/log.2 as shown in the following example:

    Producing record: alice        {"count":0}
    Producing record: alice        {"count":1}
    Producing record: alice        {"count":2}
    Producing record: alice        {"count":3}
    Producing record: alice        {"count":4}
    Producing record: alice        {"count":5}
    Producing record: alice        {"count":6}
    Producing record: alice        {"count":7}
    Producing record: alice        {"count":8}
    Producing record: alice        {"count":9}
    Produced record to topic demo-topic-1 partition [3] @ offset 0
    Produced record to topic demo-topic-1 partition [3] @ offset 1
    Produced record to topic demo-topic-1 partition [3] @ offset 2
    Produced record to topic demo-topic-1 partition [3] @ offset 3
    Produced record to topic demo-topic-1 partition [3] @ offset 4
    Produced record to topic demo-topic-1 partition [3] @ offset 5
    Produced record to topic demo-topic-1 partition [3] @ offset 6
    Produced record to topic demo-topic-1 partition [3] @ offset 7
    Produced record to topic demo-topic-1 partition [3] @ offset 8
    Produced record to topic demo-topic-1 partition [3] @ offset 9
    10 messages were produced to topic demo-topic-1
    
  6. Delete the ACLs:

    confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic-1
    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic-1
    

    You should see two Deleted ACLs. messages.

Run a Java producer with a prefixed ACL

  1. Create a new Kafka topic demo-topic-2:

    confluent kafka topic create demo-topic-2
    

    Verify you see the Created topic "demo-topic-2" message.

  2. Run the following command to create ACLs for the producer using a prefixed ACL which matches any topic that starts with the prefix demo-topic:

    confluent kafka acl create --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
    
  3. Verify your output resembles:

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED
    
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
    
  4. Verify the ACLs were configured by running the following command:

    confluent kafka acl list --service-account sa-123456
    

    Your output should resemble below. Observe that the ACL Type is PREFIXED.

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | demo-topic    | PREFIXED
      User:sa-123456 | ALLOW      | CREATE    | TOPIC         | demo-topic    | PREFIXED
    
  5. Run the Java producer to demo-topic-2, which should match the newly created prefixed ACLs.

    mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" -Dexec.args="/tmp/client.config demo-topic-2" -Dlog4j.configuration=file:log4j.properties > /tmp/log.3 2>&1
    
  6. Verify you see the 10 messages were produced to topic message in the log file /tmp/log.3 as shown in the following example:

    Producing record: alice   {"count":0}
    Producing record: alice   {"count":1}
    Producing record: alice   {"count":2}
    Producing record: alice   {"count":3}
    Producing record: alice   {"count":4}
    Producing record: alice   {"count":5}
    Producing record: alice   {"count":6}
    Producing record: alice   {"count":7}
    Producing record: alice   {"count":8}
    Producing record: alice   {"count":9}
    Produced record to topic demo-topic-2 partition [3] @ offset 0
    Produced record to topic demo-topic-2 partition [3] @ offset 1
    Produced record to topic demo-topic-2 partition [3] @ offset 2
    Produced record to topic demo-topic-2 partition [3] @ offset 3
    Produced record to topic demo-topic-2 partition [3] @ offset 4
    Produced record to topic demo-topic-2 partition [3] @ offset 5
    Produced record to topic demo-topic-2 partition [3] @ offset 6
    Produced record to topic demo-topic-2 partition [3] @ offset 7
    Produced record to topic demo-topic-2 partition [3] @ offset 8
    Produced record to topic demo-topic-2 partition [3] @ offset 9
    10 messages were produced to topic demo-topic-2
    
  7. Run the following commands to delete ACLs:

    confluent kafka acl delete --allow --service-account sa-123456 --operation CREATE --topic demo-topic --prefix
    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic demo-topic --prefix
    

    You should see two Deleted ACLs. messages.

Run a fully managed Confluent Cloud connector

  1. Create a new Kafka topic demo-topic-3:

    confluent kafka topic create demo-topic-3
    

    You should see a Created topic "demo-topic-3" message.

  2. Run the following command to allow service account ID sa-123456 to write to any topic:

    confluent kafka acl create --allow --service-account sa-123456 --operation WRITE --topic '*'
    
  3. Verify your output resembles:

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
    
  4. Verify the ACLs were configured by running the following command:

    confluent kafka acl list --service-account sa-123456
    

    Your output should resemble:

        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *             | LITERAL
    
  5. Create a local configuration file datagen_ccloud_pageviews.json with Confluent Cloud connection information. Substitute your API key and secret for the service account, in the kafka.api.key and kafka.api.secret fields. See below for an example:

    {
        "name" : "datagen_ccloud_pageviews",
        "connector.class": "DatagenSource",
        "kafka.api.key": "ESN5FSNDHOFFSUEV",
        "kafka.api.secret" : "nzBEyC1k7zfLvVON3vhBMQrNRjJR7pdMc2WLVyyPscBhYHkMwP6VpPVDTqhctamB",
        "kafka.topic" : "demo-topic-3",
        "output.data.format" : "JSON",
    
  6. Create a managed connector in Confluent Cloud with the configuration file you made in the previous step using the following commands:

    confluent connect create --config datagen_ccloud_pageviews.json
    

    Your output should resemble:

    Created connector "lcc-qrjxjd" (datagen_ccloud_pageviews).
    
  7. The connector may take up to 5 minutes to provision. Run the following command to check the connector status

    confluent connect list
    

    Your output should resemble the following:

          ID     |           Name            |    Status    |  Type  | Trace
    -------------+---------------------------+--------------+--------+--------
       lcc-zno83 | datagen_ccloud_pageviews  | PROVISIONING | source |
    

    When the Status is RUNNING you may move on to the next step.

Run a Java consumer with a Wildcard ACL

  1. Create ACLs for the consumer using a wildcard by running the following commands:

    confluent kafka acl create --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
    confluent kafka acl create --allow --service-account sa-123456 --operation READ --topic '*'
    
  2. Verify your output resembles:

        Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
    -----------------+------------+-----------+---------------+-----------------------+---------------
      User:sa-123456 | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL
    
        Principal    | Permission | Operation | Resource Type | Resource Name | Pattern Type
    -----------------+------------+-----------+---------------+---------------+---------------
      User:sa-123456 | ALLOW      | READ      | TOPIC         | *             | LITERAL
    
  3. Verify the ACLs were configured by running the following command:

    confluent kafka acl list --service-account sa-123456
    

    Your output should resemble:

        Principal    | Permission | Operation | Resource Type |     Resource Name     | Pattern Type
    -----------------+------------+-----------+---------------+-----------------------+---------------
      User:sa-123456 | ALLOW      | WRITE     | TOPIC         | *                     | LITERAL
      User:sa-123456 | ALLOW      | READ      | TOPIC         | *                     | LITERAL
      User:sa-123456 | ALLOW      | READ      | GROUP         | demo-beginner-cloud-1 | LITERAL
    
  4. Run the Java consumer from demo-topic-3 which is populated by the datagen_ccloud_pageviews connector, and wait 15 seconds for it to complete.

    timeout 15s mvn -q -f ../../clients/cloud/java/pom.xml exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ConsumerExamplePageviews" -Dexec.args="/tmp/client.config demo-topic-3" -Dlog4j.configuration=file:log4j.properties > /tmp/log.4 2>&1
    
  5. Verify you see Consumed record with messages in the log file /tmp/log.4 as shown in the following example:

    Consumed record with key 71 and value {"viewtime":71,"userid":"User_6","pageid":"Page_11"}
    Consumed record with key 51 and value {"viewtime":51,"userid":"User_7","pageid":"Page_24"}
    Consumed record with key 31 and value {"viewtime":31,"userid":"User_7","pageid":"Page_68"}
    Consumed record with key 81 and value {"viewtime":81,"userid":"User_5","pageid":"Page_25"}
    Consumed record with key 41 and value {"viewtime":41,"userid":"User_2","pageid":"Page_88"}
    Consumed record with key 91 and value {"viewtime":91,"userid":"User_2","pageid":"Page_74"}
    
  6. Delete the ACLs by running the following command:

    confluent kafka acl delete --allow --service-account sa-123456 --operation WRITE --topic '*'
    confluent kafka acl delete --allow --service-account sa-123456 --operation READ --consumer-group demo-beginner-cloud-1
    confluent kafka acl delete --allow --service-account sa-123456 --operation READ --topic '*'
    

    You should see Deleted ACLs. messages.

Clean up Confluent Cloud resources

  1. Complete the following steps to delete the managed connector:

    1. Find the connector ID:

      confluent connect list
      

      Which should display a something similar to below. Locate your connector ID, in this case the connector ID is lcc-zno83.

            ID     |           Name            | Status  |  Type  | Trace
      -------------+---------------------------+---------+--------+--------
         lcc-zno83 | datagen_ccloud_pageviews  | RUNNING | source |
      
    2. Delete the connector, referencing the connector ID from the previous step:

      confluent connect delete lcc-zno83
      

      You should see: Deleted connector "lcc-zno83"..

  2. Run the following command to delete the service account:

    confluent iam service-account delete sa-123456
    
  3. Complete the following steps to delete all the Kafka topics:

    1. Delete demo-topic-1:

      confluent kafka topic delete demo-topic-1
      

      You should see: Deleted topic "demo-topic-1".

    2. Delete demo-topic-2:

      confluent kafka topic delete demo-topic-2
      

      You should see: Deleted topic "demo-topic-2".

    3. Delete demo-topic-3:

      confluent kafka topic delete demo-topic-3
      

      You should see: Deleted topic "demo-topic-3".

  4. Run the following command to delete the user API key:

    confluent api-key delete QX7X4VA4DFJTTOIA
    

    Note that the service account API key was deleted when you deleted the service account.

  5. Delete the Kafka cluster:

    confluent kafka cluster delete lkc-x6m01
    
  6. Delete the environment:

    confluent environment delete env-5qz2q
    

    You should see: Deleted environment "env-5qz2q".

If the tutorial ends prematurely, you may receive the following error message when trying to run the example again (confluent environment create ccloud-stack-000000-beginner-cli):

Error: 1 error occurred:
   * error creating account: Account name is already in use

Failed to create environment ccloud-stack-000000-beginner-cli. Please troubleshoot and run again

In this case, run the following script to delete the example’s topics, Kafka cluster, and environment:

./cleanup.sh

Advanced usage

The example script provides variables that allow you to alter the default Kafka cluster name, cloud provider, and region. For example:

CLUSTER_NAME=my-demo-cluster CLUSTER_CLOUD=aws CLUSTER_REGION=us-west-2 ./start.sh

Here are the variables and their default values:

Variable Default
CLUSTER_NAME demo-kafka-cluster
CLUSTER_CLOUD aws
CLUSTER_REGION us-west-2

Additional Resources