Class: ProducerStream

RdKafka.ProducerStream(producer, topics, options)

new ProducerStream(producer, topics, options)

Writable stream integrating with the Kafka Producer.

This class writes data to Kafka as a stream. It takes buffers of data and puts them into the appropriate Kafka topic. If you need finer control over partitions or keys, use the Producer itself instead.

The stream detects whether Kafka is already connected, so you can safely begin writing right away.

This stream does not operate in Object mode and can only be given buffers.
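
Because the stream only accepts buffers, strings and structured values have to be encoded before they are written. The following is a minimal sketch of one way to do that. The helper name `toMessageBuffer` is hypothetical and not part of the library, and the choice of JSON for objects is an assumption about what consumers expect.

```javascript
// Hypothetical helper (not part of the library): normalize a payload
// into a Buffer before handing it to the stream.
function toMessageBuffer(payload) {
  if (Buffer.isBuffer(payload)) {
    return payload; // already in the form the stream accepts
  }
  if (typeof payload === 'string') {
    return Buffer.from(payload, 'utf8');
  }
  // Assumption: structured values are serialized as JSON.
  return Buffer.from(JSON.stringify(payload), 'utf8');
}
```

With a stream obtained from createWriteStream, a call would then look like `stream.write(toMessageBuffer({ id: 1 }))`.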

Do not instantiate this class directly. Use the RdKafka.Producer.createWriteStream method instead.

Parameters:
Name      Type              Description
producer  RdKafka.Producer  The Kafka Producer object.
topics    array             Array of topics.
options   object            Topic configuration.

Example
// Assumes the node-rdkafka package, imported here as Kafka
const Kafka = require('node-rdkafka');

// This call returns a new writable stream to our topic 'topic-name'
const stream = Kafka.Producer.createWriteStream({
  'bootstrap.servers': 'localhost:9092'
}, {}, {
  topic: 'topic-name'
});

// Writes a message to the stream
const queuedSuccess = stream.write(Buffer.from('Awesome message'));

if (!queuedSuccess) {
  // Note that this only tells us if the stream's queue is full,
  // it does not tell us if there was an error getting the message to the
  // cluster.
  console.log('Too many messages in our queue already, retry later.');
}

// It is necessary to listen to errors.
// Otherwise, any error will bubble up as an uncaught exception.
stream.on('error', (err) => {
  console.error('Error in our kafka stream');
  console.error(err);
});
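
When write() returns false, one way to "retry later" is to wait for the 'drain' event before writing again. The sketch below assumes ProducerStream follows the standard stream.Writable backpressure semantics it inherits. The helper `writeAll` is hypothetical, and a plain Writable stands in for the Kafka stream so the sketch runs without a broker.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Hypothetical helper: write every buffer in order, pausing whenever
// write() reports that the stream's internal queue is full.
async function writeAll(stream, buffers) {
  for (const buf of buffers) {
    if (!stream.write(buf)) {
      // Standard Writable streams emit 'drain' once it is safe to
      // write again. once() also rejects if the stream emits 'error'.
      await once(stream, 'drain');
    }
  }
}

// Stand-in for a ProducerStream (an assumption made for illustration).
const received = [];
const fakeStream = new Writable({
  highWaterMark: 1, // tiny queue so backpressure triggers on every write
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  }
});

writeAll(fakeStream, [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')])
  .then(() => console.log('queued', received.length, 'messages'));
```

As with the return value of write(), a resolved promise here only means the messages were accepted by the stream's queue, not that they reached the cluster.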

Extends

  • stream.Writable

Methods

close(cb)

Close the ProducerStream.

This disconnects the underlying producer if required and closes the stream.

Parameters:
Name  Type      Description
cb    function  Callback to fire when the stream is closed.
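
In promise-based code it can be convenient to await the close. The following is a minimal sketch that wraps close(cb) in a Promise. The helper `closeStream` is hypothetical, and since the arguments passed to the callback are not documented here, the sketch ignores them. A stand-in object with the same close(cb) shape is used so the sketch runs without a broker.

```javascript
// Hypothetical helper: resolve once the stream reports it is closed.
function closeStream(stream) {
  return new Promise((resolve) => {
    // Callback arguments are ignored because they are not documented here.
    stream.close(() => resolve());
  });
}

// Stand-in with the same close(cb) shape (an assumption for illustration).
const fakeProducerStream = {
  closed: false,
  close(cb) {
    this.closed = true;
    setImmediate(cb);
  }
};

closeStream(fakeProducerStream).then(() => console.log('stream closed'));
```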