confluent local services kafka consume¶
Important
The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data produced are transient and intended to be temporary. For production-ready workflows, see Confluent Platform.
Description¶
Consume data from topics. By default, this command consumes binary data from the Apache Kafka® cluster on localhost.
confluent local services kafka consume <topic> [flags]
Tip
You must export the path as an environment variable for each terminal session, or set the path to your Confluent Platform installation in your shell profile. For example:
cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
Flags¶
--bootstrap-server string The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--cloud Consume from Confluent Cloud.
--config string Change the Confluent Cloud configuration file. (default "$HOME/.ccloud/config")
--consumer-property string A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config string Consumer configuration properties file. Note that --consumer-property takes precedence over this file.
--enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter string The name of a class to use for formatting kafka messages for display. (default "kafka.tools.DefaultMessageFormatter")
--from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group string The consumer group id of the consumer.
--isolation-level string Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default "read_uncommitted")
--key-deserializer string
--max-messages int The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset string The offset to consume from: a non-negative number, "earliest" (from the beginning), or "latest" (from the end). (default "latest")
--partition int The partition to consume from. Consumption starts from the end of the partition unless "--offset" is specified.
--property stringArray The properties to initialize the message formatter. Default properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
You can also pass custom properties to the formatter; in particular, properties prefixed with "key.deserializer." and "value.deserializer." configure the deserializers.
--skip-message-on-error If there is an error when processing a message, skip it instead of halting.
--timeout-ms int If specified, exit if no messages are available for consumption for the specified interval.
--value-deserializer string
--value-format string Format output data: avro, json, or protobuf.
--whitelist string Regular expression specifying whitelist of topics to include for consumption.
Global Flags¶
-h, --help Show help for this command.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Examples¶
Consume Avro data from the beginning of a topic called mytopic1 on a development Kafka cluster on localhost. This assumes Confluent Schema Registry is listening at http://localhost:8081.
confluent local services kafka consume mytopic1 --value-format avro --from-beginning
Consume newly arriving non-Avro data from a topic called mytopic2 on a development Kafka cluster on localhost.
confluent local services kafka consume mytopic2
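The following command is a sketch only (the partition, offset, and message count shown are illustrative, not part of the original examples): it combines the --partition, --offset, and --max-messages flags to read ten messages from partition 0 of the same local topic, starting at offset 100.
confluent local services kafka consume mytopic2 --partition 0 --offset 100 --max-messages 10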
Create a Confluent Cloud configuration file with connection details for the Confluent Cloud cluster, using the format shown in this example, and save it as /tmp/myconfig.properties. You can specify the file location using --config <filename>.
bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
Consume non-Avro data from the beginning of a topic named mytopic3 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties.
confluent local services kafka consume mytopic3 --cloud --config /tmp/myconfig.properties --from-beginning
Consume messages with keys and non-Avro values from the beginning of a topic called mytopic4 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties. See the sample Confluent Cloud configuration file above.
confluent local services kafka consume mytopic4 --cloud --config /tmp/myconfig.properties --from-beginning --property print.key=true
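As a further sketch of the formatter properties listed under --property (the topic name and separator are illustrative), the following command also prints message timestamps and uses a custom key separator, here against the local development cluster.
confluent local services kafka consume mytopic2 --from-beginning --property print.key=true --property print.timestamp=true --property "key.separator=:"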
Consume Avro data from a topic called mytopic5 in Confluent Cloud. Assumes Confluent Schema Registry is listening at http://localhost:8081.
confluent local services kafka consume mytopic5 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=http://localhost:8081
Consume Avro data from a topic called mytopic6 in Confluent Cloud. Assumes you are using Confluent Cloud Schema Registry.
confluent local services kafka consume mytopic6 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=https://<SR ENDPOINT> \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
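As a final sketch (the file path, topic name, and property value are illustrative), consumer settings can be loaded from a properties file with --consumer.config, while --consumer-property overrides individual settings and takes precedence over the file.
confluent local services kafka consume mytopic2 --consumer.config /tmp/consumer.properties --consumer-property client.id=local-consumer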
See Also¶
- confluent local services kafka - Manage Apache Kafka®.