Class: Producer

RdKafka.Producer(conf, topicConfnullable)

new Producer(conf, topicConfnullable)

Producer class for sending messages to Kafka

This is the main entry point for writing data to Kafka. You configure this like you do any other client, with a global configuration and default topic configuration.

Once you instantiate this object, you need to connect to it first. This allows you to get the metadata and make sure the connection can be made before you depend on it. After that, problems with the connection will by brought down by using poll, which automatically runs when a transaction is made on the object.

Methods on a Producer will throw a LibrdKafkaError on failure.

Parameters:
Name Type Attributes Description
conf object

Key value pairs to configure the producer

topicConf object <nullable>

Key value pairs to create a default topic configuration

See:

Extends

Methods

abortTransaction(timeout, cb) → {RdKafka.Producer}

Aborts the ongoing transaction (as started with beginTransaction).

Parameters:
Name Type Description
timeout number

Number of milliseconds to try to abort, defaults to 5 seconds.

cb function

Callback to fire when operation is completed.

Returns:
  • returns itself.
Type
RdKafka.Producer

beginTransaction() → {RdKafka.Producer}

Begin a transaction.

initTransaction must have been called successfully (once) before this function is called.

There can only be one ongoing transaction at a time.

Returns:
  • returns itself.
Type
RdKafka.Producer

commitTransaction(timeout, cb) → {RdKafka.Producer}

Commit the current transaction (as started with beginTransaction).

Parameters:
Name Type Description
timeout number

Number of milliseconds to try to commit before giving up, defaults to 5 seconds.

cb function

Callback to fire when operation is completed.

Returns:
  • returns itself.
Type
RdKafka.Producer

connect(metadataOptionsnullable, cbnullable) → {RdKafka.Client}

Connect to the broker and receive its metadata.

Connects to a broker by establishing the client and fetches its metadata.

Parameters:
Name Type Attributes Description
metadataOptions object <nullable>

Options to be sent to the metadata.

Properties
Name Type Description
topic string

Topic to fetch metadata for. Empty string is treated as empty.

allTopics boolean

Fetch metadata for all topics, not just the ones we know about.

timeout int

The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms

cb RdKafka.Client~connectionCallback <nullable>

Callback that indicates we are done connecting.

Inherited From:
Fires:
Returns:
  • Returns itself.
Type
RdKafka.Client

connectedTime() → {number}

Find out how long we have been connected to Kafka.

Inherited From:
Returns:
  • Milliseconds since the connection has been established.
Type
number

disconnect(timeoutnullable, cbnullable)

Disconnect the producer.

Flush everything on the internal librdkafka producer buffer. Then disconnect.

Parameters:
Name Type Attributes Description
timeout number <nullable>

Number of milliseconds to try to flush before giving up, defaults to 5 seconds.

cb function <nullable>

The callback to fire when disconnected.

Overrides:

flush(timeoutnullable, callbacknullable) → {RdKafka.Producer}

Flush the producer,

The producer accumulates messages upto linger.ms on an internal buffer. This method flushes on the buffer immediately. Do this before disconnects, usually.

Parameters:
Name Type Attributes Description
timeout number <nullable>

Number of milliseconds to try to flush before giving up.

callback function <nullable>

Callback to fire when the flush is done.

Returns:
  • returns itself.
Type
RdKafka.Producer

getClient() → {Connection}

Get the native Kafka client.

You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.

Inherited From:
See:
  • connection.cc
Returns:
  • The native Kafka client.
Type
Connection

getLastError() → (nullable) {LibrdKafkaError}

Get the last error emitted if it exists.

Inherited From:
Returns:
  • Returns the LibrdKafkaError or null if one hasn't been thrown.
Type
LibrdKafkaError

getMetadata(metadataOptions, cb)

Get client metadata.

Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.

Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka.

A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored.

To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).

Parameters:
Name Type Description
metadataOptions object

Metadata options to pass to the client.

Properties
Name Type Description
topic string

Topic string for which to fetch metadata

timeout number

Max time, in ms, to try to fetch metadata before timing out. Defaults to 3000.

cb function

Callback to fire with the metadata.

Inherited From:

initTransactions(timeout, cb) → {RdKafka.Producer}

Initialize transactions for this producer instance.

Initialize transactions, this must only be performed once per transactional producer, before it can be used.

Parameters:
Name Type Description
timeout number

Number of milliseconds to try to initialize before giving up, defaults to 5 seconds.

cb function

Callback to fire when operation is completed.

Returns:
  • returns itself.
Type
RdKafka.Producer

isConnected() → {boolean}

Whether or not we are connected to Kafka.

Inherited From:
Returns:
  • Whether we are connected.
Type
boolean

offsetsForTimes(toppars, timeout, cb)

Query offsets for times from the broker.

This function makes a call to the broker to get the offsets for times specified.

Parameters:
Name Type Description
toppars Array.<TopicPartition>

Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for

timeout number

Number of ms to wait to recieve a response.

cb RdKafka.Client~offsetsForTimesCallback

Callback to fire with the filled in offsets.

Inherited From:

poll() → {RdKafka.Producer}

Poll for events.

We need to run poll in order to learn about new events that have occurred. This is not done automatically when we produce, so we need to run it manually, or set the producer to automatically poll using RdKafka.Producer#setPollInterval or RdKafka.Producer#setPollInBackground.

Returns:
  • returns itself.
Type
RdKafka.Producer

produce(topic, partition, message, key, timestamp, opaque, headers) → {boolean}

Produce a message to Kafka asynchronously.

This is the method mainly used in this class. Use it to produce a message to Kafka.

When this is sent off, there is no guarantee it is delivered, as the method returns immediately after queuing the message in the library.

If you need guaranteed delivery, change your acks settings and use delivery reports.

Parameters:
Name Type Description
topic string

The topic name to produce to.

partition number | null

The partition number to produce to.

message Buffer | null

The message to produce.

key string

The key associated with the message.

timestamp number | null

Timestamp to send with the message.

opaque object

An object you want passed along with this message, if provided. This will be available in the delivery report

headers object

A list of custom key value pairs that provide message metadata.

Returns:
  • throws an error if it failed, or returns true if not.
Type
boolean

queryWatermarkOffsets(topic, partition, timeout, cb)

Query offsets from the broker.

This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

Parameters:
Name Type Description
topic string

Topic to recieve offsets from.

partition number

Partition of the provided topic to recieve offsets from

timeout number

Number of ms to wait to recieve a response.

cb RdKafka.ClientClient~watermarkOffsetsCallback

Callback to fire with the offsets.

Inherited From:

sendOffsetsToTransaction(offsets, consumer, timeout, cb) → {RdKafka.Producer}

Send the current offsets of the consumer to the ongoing transaction.

Parameters:
Name Type Description
offsets Array.<RdKafka.TopicPartition>

An array of topic-partitions with offsets filled in.

consumer RdKafka.KafkaConsumer

An instance of the consumer to send offsets for.

timeout number

Number of milliseconds to try to send offsets, defaults to 5 seconds.

cb function

Callback to return when operation is completed.

Returns:
  • returns itself.
Type
RdKafka.Producer

setPollInBackground(set) → {RdKafka.Producer}

Set automatic polling for events on the librdkafka background thread.

This provides several advantages over setPollInterval, as the polling does not happen on the event loop, but on the C thread spawned by librdkafka, and can be more efficient for high-throughput producers.

If set = true, this will disable any polling interval set by setPollInterval.

Parameters:
Name Type Description
set boolean

Whether to poll in the background or not.

Returns:
  • returns itself.
Type
RdKafka.Producer

setPollInterval(interval) → {RdKafka.Producer}

Set automatic polling for events.

We need to run poll in order to learn about new events that have occurred. If you would like this done on an interval with disconnects and reconnections managed, you can do it here.

Parameters:
Name Type Description
interval number

Interval, in milliseconds, to poll for.

Returns:
  • returns itself.
Type
RdKafka.Producer

setSaslCredentials(username, password)

Change SASL credentials to be sent on the next authentication attempt.

Only applicable if SASL authentication is being used.

Parameters:
Name Type Description
username string
password string
Inherited From:

(static) createWriteStream(conf, topicConf, streamOptions) → {RdKafka.ProducerStream}

Create a write stream interface for a producer.

This stream does not run in object mode. It only takes buffers of data.

Parameters:
Name Type Description
conf object

Key value pairs to configure the producer.

topicConf object

Key value pairs to create a default topic configuration.

streamOptions object

Stream options.

Returns:
  • returns the write stream for writing to Kafka.
Type
RdKafka.ProducerStream

Events

disconnected

Disconnect event. Called after disconnection is finished.

Type:
  • object
Properties:
Name Type Description
connectionOpened date

when the connection was opened.

Inherited From:

ready

Ready event. Called when the Client connects successfully

Type:
  • object
Properties:
Name Type Description
name string

the name of the client.

Inherited From: