Configure and Manage Confluent CLI¶
This topic shows how to configure Kafka brokers and clients so that you can produce and consume messages on-premises with the Confluent CLI.
Produce and consume in Confluent Platform using Confluent CLI¶
For Kafka data encryption, the Confluent CLI supports the SSL protocol. For authenticating with Kafka, the Confluent CLI supports the SSL, SASL_SSL/PLAIN (SASL/PLAIN) and SASL_SSL/OAUTHBEARER (SASL/OAUTHBEARER) mechanisms.
Producer and consumer configuration flags¶
For the flags to specify when you produce to a topic using the Confluent CLI, see confluent kafka topic produce. For the consumer configuration flags, see confluent kafka topic consume.
Create a test topic¶
Create a topic for testing the scenarios in the remainder of this topic:
confluent kafka topic create test-topic \
--ca-cert-path <ca-cert-path> \
--url <kafka-url>
Produce and consume with SSL encryption and authentication¶
For more information about using SSL for encryption and authentication in Kafka, see Encryption and Authentication with SSL.
Step 1: Configure brokers¶
Set the following properties in the server.properties file for each Kafka broker.
Configure the listeners for the port:
listeners=<LISTENER_NAME>://kafka1:18091
advertised.listeners=<LISTENER_NAME>://localhost:18091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SSL --- [2]
- [1] Optionally, specify the advertised listener if it’s different from the listener.
- [2] If you already have a protocol map, you also need to append the SSL listener to the protocol map.
Prepare and set the SSL key and certificates. When you use SSL for both encryption and authentication, separate pairs of SSL keys and certificates are required for encryption and authentication.
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Enable 2-way authentication for brokers to authenticate the clients using SSL:
ssl.client.auth=required
Step 2: Produce and consume with Confluent CLI¶
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SSL \
--bootstrap ":18091,:18092" \
--ca-location scripts/security/snakeoil-ca-1.crt \
--cert-location <client-certificate>.pem \
--key-location <client-key>.key \
--key-password <password>
- Specify --protocol SSL to use SSL to produce and consume.
- --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-location is the path to the CA certificate verifying the broker’s key. It’s required for SSL verification. For information about setting this flag, see this document.
- --cert-location, --key-location, and --key-password are optional flags that point to the client certificate, the client key, and the key’s password. Because Step 1 sets ssl.client.auth=required, the broker expects the client to present a certificate in this scenario. The certificate and key can be generated as described in this guide.
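The same connection flags apply when you consume. The following is a minimal sketch rather than a verified command: it assumes that confluent kafka topic consume accepts the SSL flags shown above for produce, and it reuses the -b flag from the Schema Registry consume example later in this topic. The wrapper function consume_ssl and its arguments are hypothetical names chosen for illustration; see confluent kafka topic consume for the authoritative flag list.

```shell
# Hypothetical wrapper: consume from test-topic over SSL.
# Arguments: <ca-certificate> <client-certificate> <client-key>
consume_ssl() {
  ca_cert=$1
  client_cert=$2
  client_key=$3
  # Assumption: consume accepts the same SSL flags as produce.
  # --key-password is left out; add it if the client key is encrypted.
  confluent kafka topic consume test-topic -b \
    --protocol SSL \
    --bootstrap ":18091,:18092" \
    --ca-location "$ca_cert" \
    --cert-location "$client_cert" \
    --key-location "$client_key"
}
```

For example, consume_ssl scripts/security/snakeoil-ca-1.crt client.pem client.key would read test-topic with the same CA certificate used for producing, where client.pem and client.key stand in for your own client certificate and key.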
Produce and consume with SASL_SSL/PLAIN authentication¶
The SASL_SSL/PLAIN mechanism allows you to use basic authentication with a username and password. It’s a protocol that uses SASL for authentication and SSL for encryption. For a detailed discussion, refer to SASL/PLAIN Overview.
Step 1: Configure brokers¶
Set the following properties in the server.properties file.
Enable SASL_SSL/PLAIN for each broker:
sasl.enabled.mechanisms=PLAIN
Configure the listener for the port:
listeners=<LISTENER_NAME>://kafka1:19091
advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
- [1] Optionally, specify the advertised listener if it’s different from the listener.
- [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
Provide the Java Authentication and Authorization Service (JAAS) configuration. For example:
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
Step 2: Produce and consume with Confluent CLI¶
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--sasl-mechanism PLAIN \
--bootstrap ":19091,:19092" \
--username admin --password secret \
--ca-location scripts/security/snakeoil-ca-1.crt
- Specify --protocol SASL_SSL for the SASL_SSL/PLAIN authentication.
- --sasl-mechanism PLAIN is the mechanism used for the SASL_SSL protocol. The default is PLAIN, so it can be omitted in this scenario.
- --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --username and --password are the credentials you have set up in the JAAS configuration. You can pass them as flags, or omit them and wait for the CLI to prompt for them. The second option is more secure because the password does not end up in your shell history.
- --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more information about setting up this flag, refer to this document.
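As a minimal sketch of the prompt-based option, the command below leaves out --username and --password so that the CLI asks for them, as described in this topic. The wrapper function produce_plain_prompt is a hypothetical name used only for illustration, and the prompt behavior itself has not been verified here.

```shell
# Hypothetical wrapper: produce to test-topic with SASL_SSL/PLAIN,
# omitting --username and --password so the CLI prompts for them.
produce_plain_prompt() {
  # --sasl-mechanism is omitted because PLAIN is the documented default.
  confluent kafka topic produce test-topic \
    --protocol SASL_SSL \
    --bootstrap ":19091,:19092" \
    --ca-location scripts/security/snakeoil-ca-1.crt
}
```

Keeping the credentials out of the command line also keeps them out of scripts that you might check in to version control.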
Produce and consume with SASL_SSL/OAUTHBEARER authentication¶
The OAUTHBEARER mechanism uses unsecured JSON Web Tokens (JWTs) for authentication. For the security considerations involved, see Configuring OAUTHBEARER.
Step 1: Configure brokers¶
Enable SASL_SSL/OAUTHBEARER for each broker:
sasl.enabled.mechanisms=OAUTHBEARER
Configure the listener for the port:
listeners=<LISTENER_NAME>://kafka1:19091
advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
- [1] Optionally, configure the advertised listener if it’s different from the listener.
- [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
Provide JAAS configuration:
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="admin" \
publicKeyPath="/tmp/conf/public.pem";
Step 2: Log in to Kafka¶
This step is required when using SASL_SSL/OAUTHBEARER because the MDS token is used as the OAUTHBEARER token that the client presents when it connects to brokers.
confluent login --ca-cert-path <ca-cert-path> \
--url https://<host>:<port>
Step 3: Produce and consume with Confluent CLI¶
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--sasl-mechanism OAUTHBEARER \
--bootstrap ":19091,:19092" \
--ca-location scripts/security/snakeoil-ca-1.crt
- Specify --protocol SASL_SSL to use the SASL_SSL/OAUTHBEARER authentication.
- Specify --sasl-mechanism OAUTHBEARER to enable the OAUTHBEARER mechanism.
- --bootstrap is the list of hosts that the producer/consumer talks to. This list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more details about setting up this flag, see this document.
Produce and consume with Schema Registry¶
The Confluent CLI supports producing and consuming with Schema Registry. You can register a schema from a local file as you produce (writing data in that schema), and then read data using that schema as you consume.
From the Schema Registry cluster you set up, gather the following information that the CLI requires:
- Schema Registry endpoint.
- The value format.
- A local schema file.
When using Schema Registry, you must log in to Kafka as the MDS token will be used to authenticate the Schema Registry client:
confluent login --ca-cert-path <ca-cert-path> \
--url https://<host>:<port>
Produce and consume using the Confluent CLI commands.
An example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--bootstrap ":19091" \
--username admin --password secret \
--value-format avro \
--schema ~/schema.avsc \
--sr-endpoint https://localhost:8085 \
--ca-location scripts/security/snakeoil-ca-1.crt
An example CLI command to consume from test-topic:
confluent kafka topic consume test-topic -b \
--protocol SASL_SSL \
--bootstrap ":19091" \
--username admin --password secret \
--value-format avro \
--sr-endpoint https://localhost:8085 \
--ca-location scripts/security/snakeoil-ca-1.crt
- --schema is the path to your local schema file.
- Specify --value-format according to the format of the schema file: avro, json or protobuf. When you later consume, set it to the same value.
- --sr-endpoint is the endpoint of the Schema Registry cluster.
- --ca-location is a required flag when working with schemas. It’s used to authenticate the Schema Registry client. It might be the same file that you use for SSL verification.
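To make the local schema file concrete, the following sketch writes a small Avro schema and prints a record that conforms to it. The file name schema.avsc matches the example command above, but the record name TestRecord and the fields id and name are illustrative assumptions, not part of any Confluent example.

```shell
# Write a small Avro schema to a local file.
# The record name and fields are illustrative only.
cat > schema.avsc <<'EOF'
{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}
EOF

# A JSON-encoded record that conforms to the schema above.
sample_record='{"id": 1, "name": "alice"}'
printf '%s\n' "$sample_record"
```

You would then pass the file with --schema schema.avsc and --value-format avro. This sketch assumes that the produce command reads each message as one line of JSON on standard input; check confluent kafka topic produce to confirm the expected input format.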
Troubleshooting¶
To debug producing and consuming, add the --debug flag to the command to see more information. The following options are available:
broker
topic
msg
protocol
consumer
cgrp
fetch
To use more than one option, separate the options with commas.
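For example, the sketch below enables three of the options listed above on the SSL produce command from earlier in this topic. It assumes that --debug takes the comma-separated list as its value, which is how this topic describes it. The wrapper function produce_debug is a hypothetical name, and the exact flag syntax may differ between CLI versions.

```shell
# Hypothetical wrapper: produce over SSL with several debug options enabled.
produce_debug() {
  # Assumption: --debug takes the comma-separated option list as its value.
  confluent kafka topic produce test-topic \
    --protocol SSL \
    --bootstrap ":18091,:18092" \
    --ca-location scripts/security/snakeoil-ca-1.crt \
    --debug broker,topic,msg
}
```

Starting with a small set such as broker and topic tends to keep the output readable; add more options only when the first pass does not show the problem.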
Issue: Unsupported protocol/mechanism¶
Check your broker configuration. The SASL mechanism applies only when the security protocol is set to SASL_SSL.
Issue: Failed to resolve listener¶
Check the listener configuration. When using Docker, expose the Kafka ports in the Docker compose file. For example:
ports:
- 18091:18091
For more details, see this article.
Issue: No bootstrap.servers configured¶
A bootstrap server is required for producing and consuming messages. Check that the --bootstrap flag is set correctly.
Issue: Some messages are missing / not properly produced to the channel¶
Check that you have configured all the brokers. If not, some messages might be produced to a partition on a broker that’s not exposed, which makes them impossible to consume.
When setting the listener list and the protocol map, check if you have configured the brokers using the same listener name.
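One way to check the listener names is to compare the two properties directly. The following is a rough sketch, not a Confluent tool: the function check_listener_names is a hypothetical helper that reads a server.properties file and reports every name in listeners that has no entry in listener.security.protocol.map. It assumes both properties are written on single lines without spaces around the separators, and it does not account for the default protocol map that Kafka applies when the property is absent.

```shell
# check_listener_names FILE
# Prints each listener name that appears in `listeners` but has no entry in
# `listener.security.protocol.map`; returns non-zero if any are missing.
check_listener_names() {
  props=$1
  # Listener names are the part before "://" in each comma-separated entry.
  names=$(sed -n 's/^listeners=//p' "$props" | tr ',' '\n' | sed 's|://.*||')
  # Map entries look like NAME:PROTOCOL; keep only NAME.
  mapped=$(sed -n 's/^listener\.security\.protocol\.map=//p' "$props" \
    | tr ',' '\n' | sed 's/:.*//')
  missing=0
  for name in $names; do
    if ! printf '%s\n' "$mapped" | grep -qxF "$name"; then
      echo "listener $name has no entry in listener.security.protocol.map"
      missing=1
    fi
  done
  return "$missing"
}
```

Run it against each broker's file, for example check_listener_names /path/to/server.properties, and compare the results across brokers.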