confluent local services kafka produce

Important

The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Confluent Platform.

Description

Produce data to topics. By default, this command produces non-Avro data to the Apache Kafka® cluster on localhost.

confluent local services kafka produce <topic> [flags]

Tip

You must export the path as an environment variable for each terminal session, or set the path to your Confluent Platform installation in your shell profile. For example:

cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
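
After updating the profile, you can reload it in the current session and confirm that the CLI resolves from the expected location. A quick check, assuming a bash shell:

source ~/.bash_profile
echo $CONFLUENT_HOME
which confluent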

Flags

--cloud                            Produce to Confluent Cloud.
--config string                    Change the Confluent Cloud configuration file. (default "$HOME/.confluent/config")
--value-format string              Format output data: avro, json, or protobuf.

--batch-size int                   Number of messages to send in a single batch if they are not being sent synchronously. (default 200)
--bootstrap-server string          The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--compression-codec string         The compression codec: either "none", "gzip", "snappy", "lz4", or "zstd". If specified without a value, it defaults to "gzip".
--line-reader string               The name of the class to use for reading lines from stdin. By default, each line is read as a separate message. (default "kafka.tools.ConsoleProducer$LineMessageReader")
--max-block-ms int                 The maximum time, in milliseconds, that the producer will block during a send request. (default 60000)
--max-memory-bytes int             The total memory used by the producer to buffer records waiting to be sent to the server. (default 33554432)
--max-partition-memory-bytes int   The buffer size allocated for a partition. When records are received which are smaller than this size, the producer will attempt to optimistically group them together until this size is reached. (default 16384)
--message-send-max-retries int     This property specifies the number of retries before the producer gives up and drops this message. Brokers can fail receiving a message for multiple reasons, and being unavailable transiently is just one of them. (default 3)
--metadata-expiry-ms int           The amount of time in milliseconds before a forced metadata refresh. This will occur independent of any leadership changes. (default 300000)
--producer-property string         A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config string           Producer config properties file. Note that [producer-property] takes precedence over this config.
--property stringArray             A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. Default properties include:
                                          parse.key=true|false
                                          key.separator=<key.separator>
                                          ignore.error=true|false
--request-required-acks string     The required ACKs of the producer requests (default 1)
--request-timeout-ms int           The ACK timeout of the producer requests. Value must be positive (default 1500)
--retry-backoff-ms int             Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default 100)
--socket-buffer-size int           The size of the TCP receive buffer. (default 102400)
--sync                             If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.
--timeout int                      If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default 1000)
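
The parse.key and key.separator properties listed under --property can be combined to produce keyed messages. A minimal sketch, assuming the development cluster is running and using an illustrative topic name mytopic3:

confluent local services kafka produce mytopic3 --property parse.key=true --property key.separator=":"

Each line typed at the prompt is then split at the first ":" into a key and a value, for example:

key1:value1
key2:value2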

Global Flags

-h, --help            Show help for this command.
-v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Examples

Produce Avro data to a topic called mytopic1 in a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka produce mytopic1 --value-format avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
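
Once the producer starts, each line entered on stdin is expected to be a JSON-encoded record that matches the schema above, for example:

{"f1": "value1"}
{"f1": "value2"}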

Produce non-Avro data to a topic called mytopic2 in a development Kafka cluster on localhost:

confluent local services kafka produce mytopic2
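
Messages can also be piped from a file or another command instead of being typed interactively. A short sketch, assuming a file named messages.txt (the file name is illustrative) with one message per line:

cat messages.txt | confluent local services kafka produce mytopic2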

See Also