Produce and Consume in Confluent Platform using Confluent CLI

This guide shows how to configure Kafka brokers and clients so that you can produce and consume messages in an on-premises deployment using the Confluent CLI.

For Kafka data encryption, the Confluent CLI supports the SSL protocol.

For authenticating with Kafka, the Confluent CLI supports the SSL, SASL_SSL/PLAIN (a.k.a. SASL/PLAIN) and SASL_SSL/OAUTHBEARER (a.k.a. SASL/OAUTHBEARER) mechanisms.

Producer and consumer configuration flags

Specify the following flags with the confluent kafka topic produce and confluent kafka topic consume commands when you produce to or consume from a topic using the Confluent CLI.

Flag            Accepted values     Default  Description
bootstrap       Any                 None     The list of broker hosts.
protocol        SSL, SASL_SSL       SSL      The security protocol used to communicate with the brokers. This should match the broker configuration.
sasl-mechanism  PLAIN, OAUTHBEARER  PLAIN    The SASL mechanism used for authentication.
username        Any                 None     The SASL username when using PLAIN authentication.
password        Any                 None     The SASL password when using PLAIN authentication.
ca-location     Any                 None     The path to the CA certificates used to verify the broker’s key.
cert-location   Any                 None     The path to the client’s public key used for SSL authentication.
key-location    Any                 None     The path to the client’s private key used for SSL authentication.
key-password    Any                 None     The private key passphrase for SSL authentication.

Create a test topic

Create a Kafka topic to use for testing the scenarios in the rest of this guide:

confluent kafka topic create test-topic \
  --ca-cert-path <ca-cert-path> \
  --url <kafka-url>

Produce and consume with SSL encryption and authentication

For more information about using SSL for encryption and authentication in Kafka, see Encryption and Authentication with SSL.

Step 1: Configure brokers

Set the following properties in the server.properties for each Kafka broker.

  1. Configure the listeners for the port:

    listeners=<LISTENER_NAME>://kafka1:18091
    advertised.listeners=<LISTENER_NAME>://localhost:18091            --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SSL --- [2]
    
    • [1] Optionally, specify the advertised listener if it’s different from the listener.
    • [2] If you already have a protocol map, you also need to append the SSL listener to the protocol map.
  2. Prepare and set the SSL key and certificates. When you use SSL for both encryption and authentication, separate pairs of SSL keys and certificates are required for encryption and authentication.

    ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  3. Enable 2-way authentication for brokers to authenticate the clients using SSL:

    ssl.client.auth=required
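
With ssl.client.auth=required, each client has to present a certificate that the broker trusts. As a minimal sketch for a test environment, assuming OpenSSL is installed, the following creates a throwaway CA plus a client key and certificate in PEM format. The file names (ca.key, ca.crt, client.key, client.pem) and the subject names are hypothetical, and the sketch assumes the broker truststore is set up to trust ca.crt.

```shell
# Sketch for test environments only; file names and subjects are hypothetical.
# 1. Create a self-signed test CA with an unencrypted key, valid for 365 days.
openssl req -new -x509 -nodes -newkey rsa:2048 \
  -keyout ca.key -out ca.crt -days 365 -subj "/CN=test-ca"

# 2. Create the client's private key and a certificate signing request.
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr -subj "/CN=test-client"

# 3. Sign the request with the test CA to produce the client certificate.
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out client.pem -days 365

# 4. Confirm that the client certificate chains to the test CA.
openssl verify -CAfile ca.crt client.pem
```

The resulting client.pem and client.key could then be passed to the CLI with --cert-location and --key-location. Because the key in this sketch is unencrypted, --key-password would not be needed.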
    

Step 2: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SSL \
  --bootstrap ":18091,:18092" \
  --ca-location scripts/security/snakeoil-ca-1.crt \
  --cert-location <client-certificate>.pem \
  --key-location <client-key>.key \
  --key-password <password>
  • Specify --protocol SSL to use SSL to produce and consume.
  • --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --ca-location is the path to the CA certificate verifying the broker’s key. It’s required for SSL verification. For information about setting this flag, see this document.
  • --cert-location, --key-location, and --key-password are optional flags. The client certificate and key that they point to can be generated as described in this guide.
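
The same flags apply when you consume. As a sketch, the consume counterpart of the produce command above can be wrapped in a shell function so that the connection flags are typed only once. The function name consume_ssl and the CLIENT_CERT and CLIENT_KEY variables are hypothetical; the other values are copied from the produce example, and the client key is assumed to be unencrypted, so --key-password is left out.

```shell
# Hypothetical wrapper: consume from a topic over SSL with client authentication.
# CLIENT_CERT and CLIENT_KEY are assumed to hold the paths to the client's PEM files.
consume_ssl() {
  topic="$1"
  confluent kafka topic consume "$topic" \
    --protocol SSL \
    --bootstrap ":18091,:18092" \
    --ca-location scripts/security/snakeoil-ca-1.crt \
    --cert-location "$CLIENT_CERT" \
    --key-location "$CLIENT_KEY"
}
```

After setting CLIENT_CERT and CLIENT_KEY, running consume_ssl test-topic would start a consumer on the test topic.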

Produce and consume with SASL_SSL/PLAIN authentication

The SASL_SSL/PLAIN mechanism allows you to use basic authentication with a username and password. It’s a protocol that uses SASL for authentication and SSL for encryption. For a detailed discussion, refer to SASL/PLAIN Overview.

Step 1: Configure brokers

Set the following properties in the server.properties file.

  1. Enable SASL_SSL/PLAIN for each broker:

    sasl.enabled.mechanisms=PLAIN
    
  2. Configure the listener for the port:

    listeners=<LISTENER_NAME>://kafka1:19091
    advertised.listeners=<LISTENER_NAME>://localhost:19091                 --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
    
    • [1] Optionally, specify the advertised listener if it’s different from the listener.
    • [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
  3. Provide the Java Authentication and Authorization Service (JAAS) configuration. For example:

    listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="admin" \
        password="admin-secret" \
        user_admin="admin-secret" \
        user_alice="alice-secret";
    

Step 2: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism PLAIN \
  --bootstrap ":19091,:19092" \
  --username admin --password admin-secret \
  --ca-location scripts/security/snakeoil-ca-1.crt
  • Specify --protocol SASL_SSL for the SASL_SSL/PLAIN authentication.
  • --sasl-mechanism PLAIN is the mechanism used with the SASL_SSL protocol. The default is PLAIN, so you can omit this flag in this scenario.
  • --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --username and --password are the credentials you set up in the JAAS configuration. You can pass them as flags, or omit them and let the CLI prompt for them. Prompting is more secure.
  • --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more information about setting up this flag, refer to this document.
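
Because the CLI can prompt for the credentials, a consume command does not have to carry them on the command line. The following sketch wraps such a command in a shell function; the function name consume_plain is hypothetical, and the flag values are copied from the produce example above.

```shell
# Hypothetical wrapper: consume over SASL_SSL/PLAIN without credentials on the command line.
# --username and --password are omitted so that the CLI prompts for them.
consume_plain() {
  topic="$1"
  confluent kafka topic consume "$topic" \
    --protocol SASL_SSL \
    --sasl-mechanism PLAIN \
    --bootstrap ":19091,:19092" \
    --ca-location scripts/security/snakeoil-ca-1.crt
}
```

Running consume_plain test-topic would then ask for the username and password interactively, which keeps the password out of the shell history.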

Produce and consume with SASL_SSL/OAUTHBEARER authentication

The OAUTHBEARER mechanism uses unsecured JSON Web Tokens (JWTs) for authentication. For the security considerations involved, refer to Configuring OAUTHBEARER.

Step 1: Configure brokers

  1. Enable SASL_SSL/OAUTHBEARER for each broker:

    sasl.enabled.mechanisms=OAUTHBEARER
    
  2. Configure the listener for the port:

    listeners=<LISTENER_NAME>://kafka1:19091
    advertised.listeners=<LISTENER_NAME>://localhost:19091                 --- [1]
    listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
    
    • [1] Optionally, configure the advertised listener if it’s different from the listener.
    • [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
  3. Provide JAAS configuration:

    listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        unsecuredLoginStringClaim_sub="admin" \
        publicKeyPath="/tmp/conf/public.pem";
    

Step 2: Log in to Kafka

This step is required when using SASL_SSL/OAUTHBEARER because the MDS token is used as the OAUTHBEARER token when the client connects to the brokers.

confluent login --ca-cert-path <ca-cert-path> \
  --url https://<host>:<port>

Step 3: Produce and consume with Confluent CLI

The following is an example CLI command to produce to test-topic:

confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism OAUTHBEARER \
  --bootstrap ":19091,:19092" \
  --ca-location scripts/security/snakeoil-ca-1.crt
  • Specify --protocol SASL_SSL to use the SASL_SSL/OAUTHBEARER authentication.
  • Specify --sasl-mechanism OAUTHBEARER to enable the OAUTHBEARER mechanism.
  • --bootstrap is the list of hosts that the producer/consumer talks to. This list should be the same as what you configured in Step 1. Hosts should be separated by commas.
  • --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more details about setting up this flag, see this document.
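
Because the login has to succeed before the client can connect, the two steps can be chained. The following sketch does that for the consume side. The function name consume_oauth and the CA_CERT and MDS_URL variables are hypothetical stand-ins for the <ca-cert-path> and https://<host>:<port> placeholders in Step 2; the remaining flag values are copied from the produce example above.

```shell
# Hypothetical wrapper: log in to MDS, then consume with the OAUTHBEARER mechanism.
# CA_CERT (CA certificate for the login) and MDS_URL are assumed to be set by the caller.
consume_oauth() {
  topic="$1"
  # Only start the consumer if the login succeeded.
  confluent login --ca-cert-path "$CA_CERT" --url "$MDS_URL" &&
  confluent kafka topic consume "$topic" \
    --protocol SASL_SSL \
    --sasl-mechanism OAUTHBEARER \
    --bootstrap ":19091,:19092" \
    --ca-location scripts/security/snakeoil-ca-1.crt
}
```

With CA_CERT and MDS_URL set, consume_oauth test-topic would log in first and then start the consumer.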

Produce and consume with Schema Registry

The Confluent CLI supports producing and consuming with Schema Registry. When you produce, you can register a schema from a local file and write data in that schema. When you consume, you can read data that was written with that schema.

  1. Gather the following information that the Confluent CLI needs to work with your Schema Registry cluster:

    • Schema Registry endpoint
    • The value format
    • A local schema file
  2. When using Schema Registry, you must log in to Kafka because the MDS token is used to authenticate the Schema Registry client:

    confluent login --ca-cert-path <ca-cert-path> \
      --url https://<host>:<port>
    
  3. Produce and consume using the Confluent CLI commands.

    An example CLI command to produce to test-topic:

    confluent kafka topic produce test-topic \
      --protocol SASL_SSL \
      --bootstrap ":19091" \
      --username admin --password secret \
      --value-format avro \
      --schema ~/schema.avsc \
      --sr-endpoint https://localhost:8085 \
      --ca-location scripts/security/snakeoil-ca-1.crt
    

    An example CLI command to consume from test-topic:

    confluent kafka topic consume test-topic -b \
      --protocol SASL_SSL \
      --bootstrap ":19091" \
      --username admin --password secret \
      --value-format avro \
      --sr-endpoint https://localhost:8085 \
      --ca-location scripts/security/snakeoil-ca-1.crt
    
    • --schema is the path to your local schema file.
    • Specify --value-format according to the format of the schema file: avro, json, or protobuf. Set the same value when you consume.
    • --sr-endpoint is the endpoint of the Schema Registry cluster.
    • --ca-location is a required flag when working with schemas. It’s used to authenticate the Schema Registry client. It might be the same file that you use for SSL verification.
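
For trying out the commands above, a local schema file is needed. The following sketch writes a small Avro schema to schema.avsc in the current directory. The record name, namespace, and fields are hypothetical and only serve as an illustration.

```shell
# Write a small, hypothetical Avro schema to a local file.
cat > schema.avsc <<'EOF'
{
  "type": "record",
  "name": "Payment",
  "namespace": "example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
EOF
```

The file would then be passed to the produce command as --schema schema.avsc together with --value-format avro.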

Troubleshooting

To debug producing and consuming, add the --debug flag to the command to see more information. The available options are: broker, topic, msg, protocol, consumer, cgrp, fetch. Separate multiple options with commas.

Issue: Unsupported protocol/mechanism

Check your broker configuration. The SASL mechanism applies only when the security protocol is set to SASL_SSL.

Issue: Failed to resolve listener

Check the listener configuration.

When using Docker, expose the Kafka ports in the Docker Compose file.

For example:

ports:
- 18091:18091

Refer to this article for more details.

Issue: No bootstrap.servers configured

A bootstrap server is required for producing and consuming messages. Check that the --bootstrap flag is set correctly.

Issue: Some messages are missing / not properly produced to the channel

Check that you have configured all the brokers. If you have not, some messages might be produced to a broker that’s not exposed, and those messages cannot be consumed.

When setting the listener list and the protocol map, check that all brokers are configured with the same listener name.
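
One way to compare listener names across brokers is to extract them from each broker's server.properties file. The following sketch assumes each file contains a single listeners= line in the NAME://host:port format shown in the broker configuration steps; the function name listener_names is hypothetical.

```shell
# Hypothetical helper: list the listener names declared in a server.properties file.
listener_names() {
  # Take the value of the listeners= line, split it on commas,
  # and strip everything from "://" onward so only the names remain.
  grep '^listeners=' "$1" | cut -d= -f2- | tr ',' '\n' | sed 's|://.*||' | sort
}
```

Running listener_names against the server.properties file of each broker and comparing the output shows whether any broker uses a different listener name.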