Produce and Consume in Confluent Platform using Confluent CLI
This guide illustrates how to configure Kafka brokers and clients for on-prem use of produce and consume using the Confluent CLI.
For Kafka data encryption, the Confluent CLI supports the SSL protocol.
For authenticating with Kafka, the Confluent CLI supports the SSL, SASL_SSL/PLAIN (a.k.a. SASL/PLAIN) and SASL_SSL/OAUTHBEARER (a.k.a. SASL/OAUTHBEARER) mechanisms.
Producer and consumer configuration flags
The following are the flags you specify in the  confluent kafka topic
produce and confluent kafka topic consume commands when you  produce to a
topic or consume from a topic using Confluent CLI.
| Flag | Accepted Values | Default | Description | 
|---|---|---|---|
| 
 | Any | None | The list of broker hosts. | 
| 
 | 
 | 
 | The security protocol used to communicate. This should match the broker configuration. | 
| 
 | 
 | 
 | SASL mechanism used for authentication. | 
| 
 | Any | None | SASL username when using the PLAIN authentication. | 
| 
 | Any | None | SASL password when using the PLAIN authentication. | 
| 
 | Any | None | The path to CA certificates used for verifying broker’s key. | 
| 
 | Any | None | The path to the client’s public key used for the SSL authentication. | 
| 
 | Any | None | The path to the client’s private key used for the SSL authentication. | 
| 
 | Any | None | The private key passphrase for the SSL authentication. | 
Create a test topic
Create a topic for testing the scenarios in the remainder of this topic:
confluent kafka topic create test-topic \
  --ca-cert-path <ca-cert-path> \
  --url <kafka-url>
Produce and consume with SSL encryption and authentication
For more information about using SSL for encryption and authentication in Kafka, see Encryption and Authentication with SSL.
Step 1: Configure brokers
Set the following properties in the server.properties for each Kafka broker.
- Configure the listeners for the port: - listeners=<LISTENER_NAME>://kafka1:18091 advertised.listeners=<LISTENER_NAME>://localhost:18091 --- [1] listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SSL --- [2] - [1] Optionally, specify the advertised listener if it’s different from the listener. 
- [2] If you already have a protocol map, you also need to append the - SSLlistener to the protocol map.
 
- Prepare and set the SSL key and certificates. When you use SSL for both encryption and authentication, separate pairs of SSL keys and certificates are required for encryption and authentication. - ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks ssl.truststore.password=test1234 ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 
- Enable 2-way authentication for brokers to authenticate the clients using SSL: - ssl.client.auth=required 
Step 2: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
  --protocol SSL \
  --bootstrap ":18091,:18092" \
  --ca-location scripts/security/snakeoil-ca-1.crt \
  --cert-location <client-certificate>.pem \
  --key-location <client-key>.key \
  --key-password <password>
- Specify - --protocol SSLto use SSL to produce and consume.
- --bootstrapis the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-locationis the path to the CA certificate verifying the broker’s key. It’s required for SSL verification. For information about setting this flag, see this document.
- --cert-location,- --key-location, and- --key-passwordare optional flags. They can be generated as described in this guide.
Produce and consume with SASL_SSL/PLAIN authentication
The SASL_SSL/PLAIN mechanism allows you to use basic authentication with a username and password. It’s a protocol that uses SASL for authentication and SSL for encryption. For a detailed discussion, refer to SASL/PLAIN Overview.
Step 1: Configure brokers
Set the following properties in the server.properties file.
- Enable SASL_SSL/PLAIN for each broker: - sasl.enabled.mechanisms=PLAIN 
- Configure the listener for the port: - listeners=<LISTENER_NAME>://kafka1:19091 advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1] listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2] - [1] Optionally, specify the advertised listener if it’s different from the listener. 
- [2] If you have a protocol map, you also need to append the - SASL_SSLlistener name to the protocol map.
 
- Provide Java Authentication Service (JAAS) configuration. For example: - listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret" \ user_admin="admin-secret" \ user_alice="alice-secret"; 
Step 2: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism PLAIN \
  --bootstrap ":19091,:19092" \
  --username admin --password secret \
  --ca-location scripts/security/snakeoil-ca-1.crt
- Specify - --protocol SASL_SSLfor the SASL_SSL/PLAIN authentication.
- Specify - --sasl-mechanism PLAINis the mechanism used for SASL_SSL protocol. The default is- PLAIN, so it can be omitted in this scenario.
- --bootstrapis the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --usernameand- --passwordare the credentials you have set up in the JAAS configuration. They can be passed as flags, or you could wait for CLI to prompt for it. The second option is more secure.
- --ca-locationis the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more information about setting up this flag, refer to this document.
Produce and consume with SASL_SSL/OAUTHBEARER authentication
The OAUTHBEARER mechanism makes use of the unsecured JSON web tokens for authentication. For security considerations involved, refer to Configuring OAUTHBEARER.
Step 1: Configure brokers
- Enable SASL_SSL/OAUTHBEAER for each broker: - sasl.enabled.mechanisms=OAUTHBEARER 
- Configure the listener for the port: - listeners=<LISTENER_NAME>://kafka1:19091 advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1] listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2] - [1] Optionally, configure the advertised listener if it’s different from the listener. 
- [2] If you have a protocol map, also, you also need to append the - SASL_SSLlistener name to the protocol map.
 
- Provide JAAS configuration: - listener.name.sasl_ssl.oauthbearer.sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="admin" publicKeyPath="/tmp/conf/public.pem"; 
Step 2: Login to Kafka
This step is required when using SASL_SSL/OAUTHBEARER as the MDS token will be used as the OAUTHBEARER token for client to connect to brokers.
confluent login --ca-cert-path <ca-cert-path> \
  --url https://<host>:<port>
Step 3: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
  --protocol SASL_SSL \
  --sasl-mechanism OAUTHBEARER \
  --bootstrap ":19091,:19092" \
  --ca-location scripts/security/snakeoil-ca-1.crt
- Specify - --protocol SASL_SSLto use the SASL_SSL/OAUTHBEAER authentication.
- Specify - --sasl-mechanism OAUTHBEARERto enable the OAUTHBEARER mechanism.
- --bootstrapis the list of hosts that the producer/consumer talks to. This list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-locationis the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more details about setting up this flag, see this document.
Produce and consume with Schema Registry
Confluent CLI supports producing and consuming with Schema Registry functionalities. You can register a schema with a local file as you produce, writing data in that schema. And you can read data from the schema as you consume.
- In the Schema Registry cluster you registered, get the following information that the Confluent CLI needs: - Schema Registry endpoint 
- The value format 
- A local schema file 
 
- When using Schema Registry, you must log in to Kafka as the MDS token will be used to authenticate the Schema Registry client: - confluent login --ca-cert-path <ca-cert-path> \ --url https://<host>:<port> 
- Produce and consume using the Confluent CLI commands. - An example CLI command to produce to - test-topic:- confluent kafka topic produce test-topic \ --protocol SASL_SSL \ --bootstrap ":19091" \ --username admin --password secret \ --value-format avro \ --schema ~/schema.avsc \ --sr-endpoint https://localhost:8085 \ --ca-location scripts/security/snakeoil-ca-1.crt - An example CLI command to consume from - test-topic:- confluent kafka topic consume test-topic -b \ --protocol SASL_SSL \ --bootstrap ":19091" \ --username admin --password secret \ --value-format avro \ --sr-endpoint https://localhost:8085 \ --ca-location scripts/security/snakeoil-ca-1.crt - --schemais the path to your local schema file.
- Specify - --value-formataccording to the format of the schema file:- avro,- jsonor- protobuf. When later consuming, it should also be set to the same value.
- --sr-endpointis the endpoint to the Schema Registry cluster.
- --ca-locationis required flag when working with schemas. It’s used to authenticate the Schema Registry client. It might be the same file that you use for SSL verification.
 
Troubleshooting
To debug producing and consuming, you may add --debug flag in the command to see
more info. Available options are: broker, topic, msg, protocol,
consumer, cgrp, fetch. Separate multiple options with commas.
- Issue: Unsupported protocol/mechanism
- Check your broker configuration. SASL mechanism only applies when the security protocol is set to SASL_SSL. 
- Issue: Failed to resolve listener
- Check the listener configuration. - When using Docker, expose the Kafka ports in the the Docker compose file. - For example: - ports: - 18091:18091 - Refer to this article for more details. 
- Issue: No bootstrap.serversconfigured
- A bootstrap server is required for producing and consuming messages. Check if it is correctly set. 
- Issue: Some messages are missing / not properly produced to the channel
- Check if you have configured all the brokers. If not, some messages could be produced to an offset of a broker that’s not exposed and become not consume-able. - When setting the listener list and the protocol map, check if you have configured the brokers using the same listener name.