Produce and Consume in Confluent Platform using Confluent CLI
This guide shows how to configure Kafka brokers and clients so that you can produce to and consume from topics in an on-premises deployment using the Confluent CLI.
For Kafka data encryption, the Confluent CLI supports the SSL protocol.
For authenticating with Kafka, the Confluent CLI supports the SSL, SASL_SSL/PLAIN (a.k.a. SASL/PLAIN) and SASL_SSL/OAUTHBEARER (a.k.a. SASL/OAUTHBEARER) mechanisms.
Producer and consumer configuration flags
The following are the flags you specify in the confluent kafka topic produce and confluent kafka topic consume commands when you produce to or consume from a topic using the Confluent CLI.
Flag | Accepted Values | Default | Description |
---|---|---|---|
bootstrap | Any | None | The list of broker hosts. |
protocol | SSL, SASL_SSL | SSL | The security protocol used to communicate. This should match the broker configuration. |
sasl-mechanism | PLAIN, OAUTHBEARER | PLAIN | The SASL mechanism used for authentication. |
username | Any | None | The SASL username when using PLAIN authentication. |
password | Any | None | The SASL password when using PLAIN authentication. |
ca-location | Any | None | The path to the CA certificates used for verifying the broker’s key. |
cert-location | Any | None | The path to the client’s public key used for SSL authentication. |
key-location | Any | None | The path to the client’s private key used for SSL authentication. |
key-password | Any | None | The private key passphrase for SSL authentication. |
Create a test topic
Create a topic for testing the scenarios in the remainder of this guide:
confluent kafka topic create test-topic \
--ca-cert-path <ca-cert-path> \
--url <kafka-url>
Produce and consume with SSL encryption and authentication
For more information about using SSL for encryption and authentication in Kafka, see Encryption and Authentication with SSL.
Step 1: Configure brokers
Set the following properties in the server.properties file for each Kafka broker.
Configure the listeners for the port:
listeners=<LISTENER_NAME>://kafka1:18091
advertised.listeners=<LISTENER_NAME>://localhost:18091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SSL --- [2]
- [1] Optionally, specify the advertised listener if it’s different from the listener.
- [2] If you already have a protocol map, you also need to append the SSL listener to the protocol map.
Prepare and set the SSL key and certificates. When you use SSL for both encryption and authentication, separate pairs of SSL keys and certificates are required for encryption and authentication.
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Enable 2-way authentication for brokers to authenticate the clients using SSL:
ssl.client.auth=required
Step 2: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SSL \
--bootstrap ":18091,:18092" \
--ca-location scripts/security/snakeoil-ca-1.crt \
--cert-location <client-certificate>.pem \
--key-location <client-key>.key \
--key-password <password>
- Specify --protocol SSL to use SSL to produce and consume.
- --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-location is the path to the CA certificate verifying the broker’s key. It’s required for SSL verification. For information about setting this flag, see this document.
- --cert-location, --key-location, and --key-password are optional flags. The client certificate and key they refer to can be generated as described in this guide.
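The flags above apply to confluent kafka topic consume as well, so a consume counterpart of the produce command might look like the following. This is a minimal sketch rather than an authoritative reference: the function name consume_ssl and the file names client.pem and client.key are hypothetical, and -b (read from the beginning of the topic) is borrowed from the Schema Registry consume example later in this guide.

```shell
# Hypothetical values; replace them with your own hosts and files.
BOOTSTRAP=":18091,:18092"
CA_CERT="scripts/security/snakeoil-ca-1.crt"
CLIENT_CERT="client.pem"
CLIENT_KEY="client.key"

# Consume test-topic from the beginning over SSL.
# The key passphrase is taken from the first argument so that it is
# not hard-coded in the script.
consume_ssl() {
  confluent kafka topic consume test-topic -b \
    --protocol SSL \
    --bootstrap "$BOOTSTRAP" \
    --ca-location "$CA_CERT" \
    --cert-location "$CLIENT_CERT" \
    --key-location "$CLIENT_KEY" \
    --key-password "$1"
}
```

Call it as consume_ssl "<password>". Wrapping the command in a function keeps the broker list and certificate paths in one place if you also script the produce side.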
Produce and consume with SASL_SSL/PLAIN authentication
The SASL_SSL/PLAIN mechanism allows you to use basic authentication with a username and password. It’s a protocol that uses SASL for authentication and SSL for encryption. For a detailed discussion, refer to SASL/PLAIN Overview.
Step 1: Configure brokers
Set the following properties in the server.properties file.
Enable SASL_SSL/PLAIN for each broker:
sasl.enabled.mechanisms=PLAIN
Configure the listener for the port:
listeners=<LISTENER_NAME>://kafka1:19091
advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
- [1] Optionally, specify the advertised listener if it’s different from the listener.
- [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
Provide Java Authentication and Authorization Service (JAAS) configuration. For example:
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
Step 2: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--sasl-mechanism PLAIN \
--bootstrap ":19091,:19092" \
--username admin --password admin-secret \
--ca-location scripts/security/snakeoil-ca-1.crt
- Specify --protocol SASL_SSL for SASL_SSL/PLAIN authentication.
- --sasl-mechanism PLAIN is the mechanism used with the SASL_SSL protocol. The default is PLAIN, so it can be omitted in this scenario.
- --bootstrap is the list of hosts that the producer/consumer talks to. The list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --username and --password are the credentials you set up in the JAAS configuration. You can pass them as flags, or omit them and wait for the CLI to prompt for them. The second option is more secure.
- --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more information about setting up this flag, refer to this document.
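Two of the points above (PLAIN is the default mechanism, and the CLI can prompt for credentials) can be combined in a consume command. The following is a sketch under those assumptions, not a definitive invocation: the function name consume_sasl_plain is hypothetical, and -b is borrowed from the Schema Registry consume example later in this guide.

```shell
# Hypothetical values; replace them with your own hosts and files.
BOOTSTRAP=":19091,:19092"
CA_CERT="scripts/security/snakeoil-ca-1.crt"

# Consume test-topic over SASL_SSL, relying on the default PLAIN mechanism.
# --username and --password are left out on purpose so that the CLI
# prompts for them instead of reading them from the command line.
consume_sasl_plain() {
  confluent kafka topic consume test-topic -b \
    --protocol SASL_SSL \
    --bootstrap "$BOOTSTRAP" \
    --ca-location "$CA_CERT"
}
```

Leaving the credentials off the command line means they are not written to the script itself.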
Produce and consume with SASL_SSL/OAUTHBEARER authentication
The OAUTHBEARER mechanism uses unsecured JSON Web Tokens (JWTs) for authentication. For the security considerations involved, refer to Configuring OAUTHBEARER.
Step 1: Configure brokers
Enable SASL_SSL/OAUTHBEARER for each broker:
sasl.enabled.mechanisms=OAUTHBEARER
Configure the listener for the port:
listeners=<LISTENER_NAME>://kafka1:19091
advertised.listeners=<LISTENER_NAME>://localhost:19091 --- [1]
listener.security.protocol.map=<listener.map>,<LISTENER_NAME>:SASL_SSL --- [2]
- [1] Optionally, configure the advertised listener if it’s different from the listener.
- [2] If you have a protocol map, you also need to append the SASL_SSL listener name to the protocol map.
Provide JAAS configuration:
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="admin" \
publicKeyPath="/tmp/conf/public.pem";
Step 2: Log in to Kafka
This step is required when using SASL_SSL/OAUTHBEARER because the Metadata Service (MDS) token is used as the OAUTHBEARER token that the client presents when it connects to the brokers.
confluent login --ca-cert-path <ca-cert-path> \
--url https://<host>:<port>
Step 3: Produce and consume with Confluent CLI
The following is an example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--sasl-mechanism OAUTHBEARER \
--bootstrap ":19091,:19092" \
--ca-location scripts/security/snakeoil-ca-1.crt
- Specify --protocol SASL_SSL to use SASL_SSL/OAUTHBEARER authentication.
- Specify --sasl-mechanism OAUTHBEARER to enable the OAUTHBEARER mechanism.
- --bootstrap is the list of hosts that the producer/consumer talks to. This list should be the same as what you configured in Step 1. Hosts should be separated by commas.
- --ca-location is the path to the CA certificate verifying the broker’s key, and it’s required for SSL verification. For more details about setting up this flag, see this document.
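Because the OAUTHBEARER flow depends on a successful login, Steps 2 and 3 can be chained so that the topic command only runs after confluent login succeeds. The following is a sketch for the consume side under stated assumptions: the function name login_then_consume, the MDS address, and the mds-ca.crt file name are hypothetical, and -b is borrowed from the Schema Registry consume example later in this guide.

```shell
# Hypothetical values; replace them with your own hosts and files.
MDS_URL="https://mds.example.com:8090"
MDS_CA_CERT="mds-ca.crt"
BOOTSTRAP=":19091,:19092"
CA_CERT="scripts/security/snakeoil-ca-1.crt"

login_then_consume() {
  # Step 2: log in first; stop here if the login fails.
  confluent login --ca-cert-path "$MDS_CA_CERT" --url "$MDS_URL" || return 1

  # Step 3: consume using the OAUTHBEARER mechanism.
  confluent kafka topic consume test-topic -b \
    --protocol SASL_SSL \
    --sasl-mechanism OAUTHBEARER \
    --bootstrap "$BOOTSTRAP" \
    --ca-location "$CA_CERT"
}
```

Returning early on a failed login avoids a confusing authentication error from the consume command.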
Produce and consume with Schema Registry
The Confluent CLI supports producing and consuming with Schema Registry. When you produce, you can register a schema from a local file and write data in that schema. When you consume, you can read data that was written with that schema.
In the Schema Registry cluster you registered, get the following information that the Confluent CLI needs:
- Schema Registry endpoint
- The value format
- A local schema file
When using Schema Registry, you must log in to Kafka as the MDS token will be used to authenticate the Schema Registry client:
confluent login --ca-cert-path <ca-cert-path> \
--url https://<host>:<port>
Produce and consume using the Confluent CLI commands.
An example CLI command to produce to test-topic:
confluent kafka topic produce test-topic \
--protocol SASL_SSL \
--bootstrap ":19091" \
--username admin --password secret \
--value-format avro \
--schema ~/schema.avsc \
--sr-endpoint https://localhost:8085 \
--ca-location scripts/security/snakeoil-ca-1.crt
An example CLI command to consume from test-topic:
confluent kafka topic consume test-topic -b \
--protocol SASL_SSL \
--bootstrap ":19091" \
--username admin --password secret \
--value-format avro \
--sr-endpoint https://localhost:8085 \
--ca-location scripts/security/snakeoil-ca-1.crt
- --schema is the path to your local schema file.
- Specify --value-format according to the format of the schema file: avro, json, or protobuf. When you consume later, set it to the same value.
- --sr-endpoint is the endpoint of the Schema Registry cluster.
- --ca-location is a required flag when working with schemas. It’s used to authenticate the Schema Registry client. It might be the same file that you use for SSL verification.
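If you do not yet have a local schema file to pass to --schema, one can be created from the shell. The following is an illustrative sketch only: the record name TestRecord and its two fields are hypothetical and not part of this guide, and the file is written to the current directory rather than to ~/schema.avsc.

```shell
# Hypothetical location; the produce example above uses ~/schema.avsc.
SCHEMA_FILE="schema.avsc"

# Write a small Avro record schema with two illustrative fields.
cat > "$SCHEMA_FILE" <<'EOF'
{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "message", "type": "string"}
  ]
}
EOF
```

Pass the resulting path to --schema together with --value-format avro.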
Troubleshooting
To debug producing and consuming, add the --debug flag to the command to see more information. Available options are: broker, topic, msg, protocol, consumer, cgrp, and fetch. Separate multiple options with commas.
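When the --debug value is assembled in a script, it can help to check the list against the options named above before running the command. The following helper is a hypothetical sketch (valid_debug_list and VALID_DEBUG are not part of the Confluent CLI); it only validates the comma-separated string and does not call the CLI.

```shell
# Debug options listed in this guide.
VALID_DEBUG="broker topic msg protocol consumer cgrp fetch"

# Return 0 if $1 is a comma-separated list made up only of listed options.
valid_debug_list() {
  case "$1" in
    ""|,*|*,|*,,*|*" "*) return 1 ;;   # reject empty lists, empty items, spaces
  esac
  rest="$1,"
  while [ -n "$rest" ]; do
    item=${rest%%,*}    # text before the first comma
    rest=${rest#*,}     # text after the first comma
    case " $VALID_DEBUG " in
      *" $item "*) ;;   # known option, keep going
      *) return 1 ;;    # unknown option
    esac
  done
  return 0
}
```

For example, valid_debug_list "broker,topic" succeeds, while valid_debug_list "broker,foo" fails because foo is not in the list.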
- Issue: Unsupported protocol/mechanism
  Check your broker configuration. The SASL mechanism only applies when the security protocol is set to SASL_SSL.
- Issue: Failed to resolve listener
  Check the listener configuration.
  When using Docker, expose the Kafka ports in the Docker Compose file. For example:
  ports:
    - 18091:18091
  Refer to this article for more details.
- Issue: No bootstrap.servers configured
  A bootstrap server is required for producing and consuming messages. Check that the --bootstrap flag is set correctly.
- Issue: Some messages are missing / not properly produced to the channel
  Check that you have configured all the brokers. If not, some messages could be produced to a broker that’s not exposed and then cannot be consumed.
  When setting the listener list and the protocol map, check that you have configured the brokers using the same listener name.
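The listener-name check described above can be partly automated. The following is a rough sketch, assuming a server.properties file in which listeners and listener.security.protocol.map are each written on a single line with no spaces around the equals sign; the function names are hypothetical and the comparison is case-sensitive.

```shell
# Print the listener names from the "listeners" property, one per line.
listener_names() {
  sed -n 's/^listeners=//p' "$1" | tr ',' '\n' | sed 's|://.*$||'
}

# Print the listener names that appear in the protocol map, one per line.
mapped_names() {
  sed -n 's/^listener\.security\.protocol\.map=//p' "$1" | tr ',' '\n' | sed 's/:.*$//'
}

# Report every listener that has no entry in the protocol map.
# Returns 0 when all listeners are mapped, 1 otherwise.
check_listeners() {
  rc=0
  for name in $(listener_names "$1"); do
    if ! mapped_names "$1" | grep -qx "$name"; then
      echo "listener $name has no entry in listener.security.protocol.map"
      rc=1
    fi
  done
  return $rc
}
```

Run it as check_listeners /path/to/server.properties on each broker. It does not verify that different brokers use the same names, so compare the output across brokers as well.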