confluent local services kafka consume
Important
The confluent local commands are intended for a single-node development environment and are not suitable for production. The data they produce is transient and intended to be temporary. For production-ready workflows, see Confluent Platform.
Description
Consume data from topics. By default, this command consumes binary data from the Apache Kafka® cluster on localhost.
confluent local services kafka consume <topic> [flags]
Tip
You must either export the path to your Confluent Platform installation as an environment variable in each terminal session, or set it once in your shell profile. For example:
cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
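To confirm the setup in a new terminal, a quick check along the following lines can help. This is a minimal sketch: the fallback path `$HOME/confluent` is a placeholder assumption, not a location the installer guarantees.

```shell
# Placeholder install path (an assumption); replace with your actual location.
export CONFLUENT_HOME="${CONFLUENT_HOME:-$HOME/confluent}"
export PATH="${CONFLUENT_HOME}/bin:${PATH}"

# Report whether the shell can now resolve the confluent binary.
if command -v confluent >/dev/null 2>&1; then
  echo "confluent resolved to: $(command -v confluent)"
else
  echo "confluent not found; check CONFLUENT_HOME=${CONFLUENT_HOME}"
fi
```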
Flags
--cloud Consume from Confluent Cloud.
--config string Change the Confluent Cloud configuration file. (default "$HOME/.confluent/config")
--value-format string Format output data: avro, json, or protobuf.
--bootstrap-server string The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--consumer-property string A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config string Consumer config properties file. Note that --consumer-property takes precedence over this config.
--enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter string The name of a class to use for formatting Kafka messages for display. (default "kafka.tools.DefaultMessageFormatter")
--from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group string The consumer group id of the consumer.
--isolation-level string Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default "read_uncommitted")
--key-deserializer string
--max-messages int The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset string The offset id to consume from (a non-negative number), or "earliest" which means from beginning, or "latest" which means from end. (default "latest")
--partition int The partition to consume from. Consumption starts from the end of the partition unless "--offset" is specified.
--property strings The properties to initialize the message formatter. Default properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with "key.deserializer." and "value.deserializer." prefixes to configure their deserializers.
--skip-message-on-error If there is an error when processing a message, skip it instead of halting.
--timeout-ms int If specified, exit if no messages are available for consumption for the specified interval.
--value-deserializer string
--whitelist string Regular expression specifying whitelist of topics to include for consumption.
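As an illustration of how --consumer.config and --consumer-property interact, the sketch below writes a properties file and then overrides one of its values on the command line. The file contents, the client.id values, and the topic name are assumptions chosen for the example. The script only prints the assembled command so it can be reviewed before running.

```shell
# Create a temporary consumer properties file (contents are illustrative).
props_file="$(mktemp)"
printf '%s\n' 'client.id=from-file' 'fetch.max.wait.ms=500' > "$props_file"

# client.id appears in both places; per the --consumer.config description,
# the --consumer-property value should take precedence over the file.
set -- mytopic1 \
  --consumer.config "$props_file" \
  --consumer-property client.id=from-flag \
  --max-messages 10

# Print the assembled command for review.
preview="confluent local services kafka consume $*"
echo "$preview"

# To execute against a running local cluster, uncomment:
# confluent local services kafka consume "$@"
```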
Global Flags
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Examples
Consume Avro data from the beginning of a topic called mytopic1 on a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.
confluent local services kafka consume mytopic1 --value-format avro --from-beginning
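For scripted checks, it can be useful to bound the read so the command exits on its own. The helper below is a sketch that wraps the Avro example with --max-messages and --timeout-ms from the flag list. The function name `consume_avro_sample`, the default of 5 messages, and the 10-second timeout are all assumptions made for illustration, and the same local Schema Registry is assumed.

```shell
# Hypothetical helper: read a bounded sample of Avro records from a topic.
#   $1 = topic name
#   $2 = maximum number of messages (defaults to 5)
consume_avro_sample() {
  confluent local services kafka consume "$1" \
    --value-format avro \
    --from-beginning \
    --max-messages "${2:-5}" \
    --timeout-ms 10000
}
```

With a local cluster running, `consume_avro_sample mytopic1 20` would read up to 20 records and exit, or stop earlier if no message arrives within the timeout.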
Consume newly arriving non-Avro data from a topic called mytopic2 on a development Kafka cluster on localhost.
confluent local services kafka consume mytopic2
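To inspect keys and timestamps alongside values, the formatter properties listed under --property can be combined with --partition and --offset. The following is a sketch under stated assumptions: the helper name `consume_with_keys`, the defaults (partition 0, offset earliest), the `:` separator, and the limit of 10 messages are illustrative choices, not recommended settings.

```shell
# Hypothetical helper: print timestamp, key, and value for records
# read from one partition of a topic, starting at a chosen offset.
#   $1 = topic name
#   $2 = partition (defaults to 0)
#   $3 = offset: a non-negative number, "earliest", or "latest"
consume_with_keys() {
  topic="$1"
  partition="${2:-0}"
  offset="${3:-earliest}"
  confluent local services kafka consume "$topic" \
    --partition "$partition" \
    --offset "$offset" \
    --property print.timestamp=true \
    --property print.key=true \
    --property key.separator=: \
    --max-messages 10
}
```

For example, `consume_with_keys mytopic2 0 42` would start at offset 42 of partition 0 and exit after at most 10 records.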
See Also
- confluent local services kafka - Manage Apache Kafka®.