Class: KafkaConsumerStream

RdKafka.KafkaConsumerStream(consumer, options)

new KafkaConsumerStream(consumer, options)

ReadableStream integrating with the Kafka Consumer.

This class is used to read data off of Kafka in a streaming way. It is useful if you'd like to have a way to pipe Kafka into other systems. You should generally not make this class yourself, as it is not even exposed as part of module.exports. Instead, you should use RdKafka.KafkaConsumer.createReadStream.

The stream implementation is slower than the continuous subscribe callback. If you don't care so much about backpressure and would rather squeeze out performance, use that method. Using the stream will ensure you read only as fast as you write.

The stream detects if Kafka is already connected. If it is, it will begin reading. If it is not, it will connect and read when it is ready.

This stream operates in objectMode. It streams RdKafka.KafkaConsumer~Message.

Parameters:
Name Type Description
consumer RdKafka.KafkaConsumer

The Kafka Consumer object.

options object

Options to configure the stream.

Properties
Name Type Description
waitInterval number

Number of ms to wait if Kafka reports that it has timed out or that we are out of messages (right now).

topics array

Array of topics, or a function that parses metadata into an array of topics.

See:
  • Consumer~Message
Example
const stream = KafkaConsumer.createReadStream(globalConfig, null, {
  topics: ['topic']
});

stream.on('data', (message) => {
  console.log('Got message');
  console.log(message.value.toString());
});

Extends

  • stream.Readable

Members

consumer :RdKafka.KafkaConsumer

The KafkaConsumer that is being streamed from. Can be used to access any of the consumer methods directly.

Type: