confluent local services kafka consume

重要

confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一時的であり、暫定的なものです。本稼働環境用のワークフローについては、「Confluent Platform」を参照してください。

説明

トピックからのデータを消費します。デフォルトでは、このコマンドは localhost の Apache Kafka® クラスターからのバイナリデータを消費します。

confluent local services kafka consume <topic> [flags]

ちなみに

各ターミナルのセッションでパスを環境変数としてエクスポートするか、シェルのプロファイルで Confluent Platform のインストール先へのパスを設定する必要があります。以下に例を示します。

cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"

フラグ

--cloud                       Consume from Confluent Cloud.
--config string               Change the Confluent Cloud configuration file. (default "$HOME/.confluent/config")
--value-format string         Format output data: avro, json, or protobuf.

--bootstrap-server string     The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--consumer-property string    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config string      Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events       Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter string            The name of a class to use for formatting kafka messages for display. (default "kafka.tools.DefaultMessageFormatter")
--from-beginning              If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group string                The consumer group id of the consumer.
--isolation-level string      Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default "read_uncommitted")
--key-deserializer string
--max-messages int            The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset string               The offset id to consume from (a non-negative number), or "earliest" which means from beginning, or "latest" which means from end (default "latest")
--partition int               The partition to consume from. Consumption starts from the end of the partition unless "--offset" is specified.
--property stringArray        The properties to initialize the message formatter. Default properties include:
                                  print.timestamp=true|false
                                  print.key=true|false
                                  print.value=true|false
                                  key.separator=<key.separator>
                                  line.separator=<line.separator>
                                  key.deserializer=<key.deserializer>
                                  value.deserializer=<value.deserializer>
                              Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with "key.deserializer." and "value.deserializer." prefixes to configure their deserializers.
--skip-message-on-error       If there is an error when processing a message, skip it instead of halting.
--timeout-ms int              If specified, exit if no messages are available for consumption for the specified interval.
--value-deserializer string
--whitelist string            Regular expression specifying whitelist of topics to include for consumption.

グローバルフラグ

-h, --help            Show help for this command.
-v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

localhost の開発 Kafka クラスターの mytopic1 という名前のトピックの先頭から Avro データを消費します。Confluent Schema Registry は http://localhost:8081 でリッスンしていることを想定しています。

confluent local services kafka consume mytopic1 --value-format avro --from-beginning

localhost の開発 Kafka クラスターの mytopic2 という名前のトピックから新着の非 Avro データを消費します。

confluent local services kafka consume mytopic2

関連情報