confluent local services kafka produce

Important

The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data produced are transient and intended to be temporary. For production-ready workflows, see Confluent Platform.

Description

Produce data to topics. By default this command produces non-Avro data to the Apache Kafka® cluster on localhost.

confluent local services kafka produce <topic> [flags]

Tip

You must export the path as an environment variable for each terminal session, or set the path to your Confluent Platform installation in your shell profile. For example:

cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"

Flags

--batch-size int                   Number of messages to send in a single batch if they are not being sent synchronously. (default 200)
--bootstrap-server string          The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--cloud                            Produce to Confluent Cloud.
--compression-codec string         The compression codec: either "none", "gzip", "snappy", "lz4", or "zstd". If specified without a value, it defaults to "gzip".
--config string                    Change the Confluent Cloud configuration file. (default "$HOME/.confluent/config")
--line-reader string               The class to use for reading lines from stdin. By default each line is read as a separate message. (default "kafka.tools.ConsoleProducer$LineMessageReader")
--max-block-ms int                 The maximum time in milliseconds that the producer blocks during a send request. (default 60000)
--max-memory-bytes int             The total memory used by the producer to buffer records waiting to be sent to the server. (default 33554432)
--max-partition-memory-bytes int   The buffer size allocated for a partition. When records smaller than this size are received, the producer attempts to optimistically group them together until this size is reached. (default 16384)
--message-send-max-retries int     The number of retries before the producer gives up and drops this message. Brokers can fail to receive a message for multiple reasons, and transient unavailability is just one of them. (default 3)
--metadata-expiry-ms int           The amount of time in milliseconds before a forced metadata refresh. This will occur independent of any leadership changes. (default 300000)
--producer-property string         A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config string           Producer config properties file. Note that [producer-property] takes precedence over this config.
--property stringArray             A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. Default properties include:
                                          parse.key=true|false
                                          key.separator=<key.separator>
                                          ignore.error=true|false
--request-required-acks string     The required ACKs of the producer requests (default 1)
--request-timeout-ms int           The ACK timeout of the producer requests. Value must be positive (default 1500)
--retry-backoff-ms int             Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default 100)
--socket-buffer-size int           The size of the TCP receive buffer. (default 102400)
--sync                             If set, message send requests to brokers arrive synchronously.
--timeout int                      If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default 1000)
--value-format string              Format of the message data: avro, json, or protobuf.

Global Flags

-h, --help            Show help for this command.
-v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Examples

Produce Avro data to a topic called mytopic1 on a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka produce mytopic1 --value-format avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Produce non-Avro data to a topic called mytopic2 on a development Kafka cluster on localhost:

confluent local services kafka produce mytopic2
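The parse.key and key.separator properties listed under --property can also be exercised locally by piping records to stdin. The following is a sketch only; the record contents are illustrative, and it assumes a running local cluster with an existing topic named mytopic2:

```shell
# Write two comma-separated key,value records to a file (illustrative data).
printf 'user1,{"action":"login"}\nuser2,{"action":"logout"}\n' > /tmp/records.txt
# Feed them to the producer (assumes a running local cluster and an existing
# topic named mytopic2):
#   confluent local services kafka produce mytopic2 \
#     --property parse.key=true --property key.separator=, < /tmp/records.txt
```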

Create a customized Confluent Cloud configuration file with connection details for your Confluent Cloud cluster, using the format shown in this example, and save it as /tmp/myconfig.properties. You can specify the file location using --config <filename>.

bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
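One way to create this file is with a heredoc; the following is a sketch, and the angle-bracket placeholders must be replaced with your cluster's real endpoints and credentials before producing:

```shell
# Write the Confluent Cloud client configuration to /tmp/myconfig.properties.
# The <...> placeholders are stand-ins; substitute real values before use.
cat > /tmp/myconfig.properties <<'EOF'
bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
EOF
```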

Produce non-Avro data to a topic called mytopic3 in Confluent Cloud. Assumes the topic has already been created.

confluent local services kafka produce mytopic3 --cloud --config /tmp/myconfig.properties

Produce messages with keys and non-Avro values to a topic called mytopic4 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties. Assumes the topic has already been created.

confluent local services kafka produce mytopic4 --cloud --config /tmp/myconfig.properties --property parse.key=true --property key.separator=,

Produce Avro data to a topic called mytopic5 in Confluent Cloud. Assumes the topic has already been created, and Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka produce mytopic5 --cloud --config /tmp/myconfig.properties --value-format avro \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
--property schema.registry.url=http://localhost:8081

Produce Avro data to a topic called mytopic6 in Confluent Cloud. Assumes the topic has already been created and you are using Confluent Cloud Schema Registry.

confluent local services kafka produce mytopic6 --cloud --config /tmp/myconfig.properties --value-format avro \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
--property schema.registry.url=https://<SR ENDPOINT> \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>

See Also