Configure and Manage Confluent CLI

This topic illustrates how to configure Kafka brokers and clients for on-premises use of produce and consume using the Confluent CLI.

Produce and consume in Confluent Platform using Confluent CLI

For Kafka data encryption, the Confluent CLI supports the SSL protocol. For authenticating with Kafka, the Confluent CLI supports the SSL, SASL_SSL/PLAIN (SASL/PLAIN) and SASL_SSL/OAUTHBEARER (SASL/OAUTHBEARER) mechanisms.

Producer and consumer configuration flags

For the flags to specify when you produce to a topic using the Confluent CLI, see confluent kafka topic produce. For the consumer configuration flags, see confluent kafka topic consume.

Create a test topic

Create a topic for testing the scenarios in the remainder of this topic:

confluent kafka topic create test-topic \
  --ca-cert-path <ca-cert-path> \
  --url <kafka-url>

Produce and consume with SSL encryption and authentication

For more information about using SSL for encryption and authentication in Kafka, see Encryption and Authentication with SSL.

Step 1: Configure brokers

Set the following properties in the server.properties for each Kafka broker.

  1. Configure the listeners for the port:

    listeners=<LISTENER_NAME>://kafka1:18091
    advertised.listeners=<LISTENER_NAME>://localhost:18091            --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SSL --- [2]
    
    • [1] Optionally, specify the advertised listener if it’s different from the listener.
    • [2] If you already have a protocol map, you also need to append the SSL listener to the protocol map.
  2. Prepare and set the SSL key and certificates. When you use SSL for both encryption and authentication, separate pairs of SSL keys and certificates are required for encryption and authentication.

    ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  3. Enable 2-way authentication for brokers to authenticate the clients using SSL:

    ssl.client.auth=required
    

Step 2: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SSL \
  --bootstrap ":18091,:18092" \
  --ca-location scripts/security/snakeoil-ca-1.crt \
  --cert-location <client-certificate>.pem \
  --key-location <client-key>.key \
  --key-password <password>
  • Specify --protocol SSL to use SSL to produce and consume.
  • --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --ca-location is the path to the CA certificate verifying the broker’s key. It’s required for SSL verification. For information about setting this flag, see this document.
  • --cert-location, --key-location, and --key-password are optional flags. They can be generated as described in this guide.

Produce and consume with SASL_SSL/PLAIN authentication

The SASL_SSL/PLAIN mechanism allows you to use basic authentication with a username and password. It’s a protocol that uses SASL for authentication and SSL for encryption. For a detailed discussion, refer to SASL/PLAIN Overview.

Step 1: Configure brokers

Set the following properties in the server.properties file.

  1. Enable SASL_SSL/PLAIN for each broker:

    sasl.enabled.mechanisms=PLAIN
    
  2. Configure the listener for the port:

    listeners=<LISTENER_NAME>://kafka1:19091
    advertised.listeners=<LISTENER_NAME>://localhost:19091                 --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
    
    • [1] Optionally, specify the advertised listener if it’s different from the listener.
    • [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
  3. Provide Java Authentication Service (JAAS) configuration. For example:

    listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="admin" \
        password="admin-secret" \
        user_admin="admin-secret" \
        user_alice="alice-secret";
    

Step 2: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism PLAIN \
  --bootstrap ":19091,:19092" \
  --username admin --password secret \
  --ca-location scripts/security/snakeoil-ca-1.crt
  • Specify --protocol SASL_SSL for the SASL_SSL/PLAIN authentication.
  • Specify --sasl-mechanism PLAIN is the mechanism used for SASL_SSL protocol. The default is PLAIN, so it can be omitted in this scenario.
  • --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --username and --password are the credentials you have set up in the JAAS configuration. They can be passed as flags, or you could wait for CLI to prompt for it. The second option is more secure.
  • --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more information about setting up this flag, refer to this document.

Produce and consume with SASL_SSL/OAUTHBEARER authentication

The OAUTHBEARER mechanism makes use of the unsecured JSON web tokens for authentication. For security considerations involved, see Configuring OAUTHBEARER.

Step 1: Configure brokers

  1. Enable SASL_SSL/OAUTHBEAER for each broker:

    sasl.enabled.mechanisms=OAUTHBEARER
    
  2. Configure the listener for the port:

    listeners=<LISTENER_NAME>://kafka1:19091
    advertised.listeners=<LISTENER_NAME>://localhost:19091                 --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
    
    • [1] Optionally, configure the advertised listener if it’s different from the listener.
    • [2] If you have a protocol map, also, you also need to append the SASL_SSL listener name to the protocol map.
  3. Provide JAAS configuration:

    listener.name.sasl_ssl.oauthbearer.sasl.jaas.config =
      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
      unsecuredLoginStringClaim_sub="admin"
      publicKeyPath="/tmp/conf/public.pem";
    

Step 2: Login to Kafka

This step is required when using SASL_SSL/OAUTHBEARER as the MDS token will be used as the OAUTHBEARER token for client to connect to brokers.

confluent login --ca-cert-path <ca-cert-path> \
  --url https://<host>:<port>

Step 3: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism OAUTHBEARER \
  --bootstrap ":19091,:19092" \
  --ca-location scripts/security/snakeoil-ca-1.crt
  • Specify --protocol SASL_SSL to use the SASL_SSL/OAUTHBEAER authentication.
  • Specify --sasl-mechanism OAUTHBEARER to enable the OAUTHBEARER mechanism.
  • --bootstrap is the list of hosts that the producer/consumer talks to. This list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more details about setting up this flag, see this document.

Produce and consume with Schema Registry

Confluent CLI supports producing and consuming with Schema Registry functionalities. You can register a schema with a local file as you produce (writing data in that schema), and then read data from the schema as you consume.

  1. In the Schema Registry cluster you registered, retrieve the following information the CLI requires:

    • Schema Registry endpoint.
    • The value format.
    • A local schema file.
  2. When using Schema Registry, you must log in to Kafka as the MDS token will be used to authenticate the Schema Registry client:

    confluent login --ca-cert-path <ca-cert-path> \
      --url https://<host>:<port>
    
  3. Produce and consume using the Confluent CLI commands.

    An example CLI command to produce to test-topic:

    confluent kafka topic produce test-topic \
      --protocol SASL_SSL \
      --bootstrap ":19091" \
      --username admin --password secret \
      --value-format avro \
      --schema ~/schema.avsc \
      --sr-endpoint https://localhost:8085 \
      --ca-location scripts/security/snakeoil-ca-1.crt
    

    An example CLI command to consume from test-topic:

    confluent kafka topic consume test-topic -b \
      --protocol SASL_SSL \
      --bootstrap ":19091" \
      --username admin --password secret \
      --value-format avro \
      --sr-endpoint https://localhost:8085 \
      --ca-location scripts/security/snakeoil-ca-1.crt
    
    • --schema is the path to your local schema file.
    • Specify --value-format according to the format of the schema file: avro, json or protobuf. When later consuming, it should also be set to the same value.
    • --sr-endpoint is the endpoint to the Schema Registry cluster.
    • --ca-location is required flag when working with schemas. It’s used to authenticate the Schema Registry client. It might be the same file that you use for SSL verification.

Troubleshooting

To debug producing and consuming, add the --debug flag in the command to see more information. The following options are available:

  • broker
  • topic
  • msg
  • protocol
  • consumer
  • cgrp
  • fetch

To use more than one option, separate the options with commas.

Issue: Unsupported protocol/mechanism

Check your broker configuration. SASL mechanism only applies when the security protocol is set to SASL_SSL.

Issue: Failed to resolve listener

Check the listener configuration. When using Docker, expose the Kafka ports in the Docker compose file. For example:

ports:
- 18091:18091

For more details, see this article.

Issue: No bootstrap.servers configured

A bootstrap server is required for producing and consuming messages. Check if it is set correctly.

Issue: Some messages are missing / not properly produced to the channel

Check if you have configured all the brokers. If not, some messages might be produced to an offset of a broker that’s not exposed and become non-consumable.

When setting the listener list and the protocol map, check if you have configured the brokers using the same listener name.