confluent local services kafka consume

Important

  • The confluent local commands are intended for a single-node development environment and are not suitable for production. The data they produce is transient and intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.

Description

Consume data from topics. By default, this command consumes binary data from the Apache Kafka® cluster on localhost.

confluent local services kafka consume <topic> [flags]

Tip

You must export the path as an environment variable for each terminal session, or set the path to your Confluent Platform installation in your shell profile. For example:

cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
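
As a quick sanity check for the setup described in this tip, the following sketch tests whether CONFLUENT_HOME is set and contains a bin directory. The function name check_confluent_home is hypothetical and is not part of the Confluent CLI; it only inspects the environment.

```shell
# Hypothetical helper (not a Confluent CLI feature): verify that
# CONFLUENT_HOME is exported and points at a directory containing bin/.
check_confluent_home() {
  if [ -z "${CONFLUENT_HOME:-}" ]; then
    echo "CONFLUENT_HOME is not set" >&2
    return 1
  fi
  if [ ! -d "${CONFLUENT_HOME}/bin" ]; then
    echo "No bin directory under ${CONFLUENT_HOME}" >&2
    return 1
  fi
  echo "CONFLUENT_HOME looks usable: ${CONFLUENT_HOME}"
}
```

Running check_confluent_home in a new terminal session shows whether the export from your shell profile took effect.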

Flags

--cloud                       Consume from Confluent Cloud.
--config string               Change the Confluent Cloud configuration file. (default "$HOME/.confluent/config.json")
--value-format string         Format output data: avro, json, or protobuf.

--bootstrap-server string     The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--consumer-property string    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config string      Consumer config properties file. Note that --consumer-property takes precedence over this config.
--enable-systest-events       Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter string            The name of a class to use for formatting Kafka messages for display. (default "kafka.tools.DefaultMessageFormatter")
--from-beginning              If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group string                The consumer group id of the consumer.
--isolation-level string      Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default "read_uncommitted")
--key-deserializer string
--max-messages int            The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset string               The offset id to consume from (a non-negative number), or "earliest" which means from beginning, or "latest" which means from end. (default "latest")
--partition int               The partition to consume from. Consumption starts from the end of the partition unless "--offset" is specified.
--property strings            The properties to initialize the message formatter. Default properties include:
                                  print.timestamp=true|false
                                  print.key=true|false
                                  print.value=true|false
                                  key.separator=<key.separator>
                                  line.separator=<line.separator>
                                  key.deserializer=<key.deserializer>
                                  value.deserializer=<value.deserializer>
                              Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with "key.deserializer." and "value.deserializer." prefixes to configure their deserializers.
--skip-message-on-error       If there is an error when processing a message, skip it instead of halting.
--timeout-ms int              If specified, exit if no messages are available for consumption for the specified interval.
--value-deserializer string
--whitelist string            Regular expression specifying whitelist of topics to include for consumption.
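
Several of the flags above can be combined to take a bounded sample of a topic. The sketch below wraps such a call in a shell function; the function name consume_sample, the CONFLUENT_CMD override (useful for dry runs with echo), the default count of 10, and the 10000 ms timeout are illustrative assumptions, not CLI features or recommended values.

```shell
# Sketch: read a limited number of messages from the start of a topic,
# printing each key before its value, separated by a colon.
# CONFLUENT_CMD is a hypothetical override; set it to "echo" to print
# the arguments instead of contacting a broker.
consume_sample() {
  topic="$1"
  count="${2:-10}"   # assumed default sample size
  ${CONFLUENT_CMD:-confluent} local services kafka consume "$topic" \
    --from-beginning \
    --max-messages "$count" \
    --timeout-ms 10000 \
    --property print.key=true \
    --property key.separator=:
}
```

For example, consume_sample mytopic1 5 would stop after five messages, or after 10 seconds with no new messages, whichever comes first.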

Global Flags

-h, --help            Show help for this command.
    --unsafe-trace    Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Examples

Consume Avro data from the beginning of a topic called mytopic1 on a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka consume mytopic1 --value-format avro --from-beginning

Consume newly arriving non-Avro data from a topic called mytopic2 on a development Kafka cluster on localhost.

confluent local services kafka consume mytopic2
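
A further sketch, using only flags listed above: replay a single partition from a chosen offset. It pairs --offset with --partition, since the underlying console consumer expects a partition whenever an offset is given. The function name replay_partition, the CONFLUENT_CMD dry-run override, and the limit of 100 messages are hypothetical choices for illustration.

```shell
# Sketch: consume from one partition starting at a given offset, then exit.
# CONFLUENT_CMD is a hypothetical override; set it to "echo" for a dry run.
replay_partition() {
  topic="$1"
  partition="$2"
  offset="$3"        # a non-negative number, "earliest", or "latest"
  ${CONFLUENT_CMD:-confluent} local services kafka consume "$topic" \
    --partition "$partition" \
    --offset "$offset" \
    --max-messages 100
}
```

For example, replay_partition mytopic2 0 earliest would read up to 100 messages from the start of partition 0.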
