Class: KafkaConsumer

RdKafka.KafkaConsumer(conf, topicConf)

new KafkaConsumer(conf, topicConf)

KafkaConsumer class for reading messages from Kafka.

This is the main entry point for reading data from Kafka. You configure this like you do any other client, with a global configuration and default topic configuration.

Data will not be read until you tell the consumer what topics you want to read from.

Methods on a KafkaConsumer will throw a LibrdKafkaError on failure.
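
As a rough sketch of the usual lifecycle, assuming `consumer` is an already constructed KafkaConsumer: connect, wait for the ready event, subscribe, then start consuming. The helper name `startConsuming` and the `onMessage` callback are illustrative and not part of the library.

```javascript
// Hypothetical helper: wires up a KafkaConsumer so that it starts
// delivering messages once the client is ready.
function startConsuming(consumer, topics, onMessage) {
  consumer.on('ready', function() {
    // Nothing is read until the consumer knows which topics to read from.
    consumer.subscribe(topics);
    // No arguments: messages are delivered through the 'data' event.
    consumer.consume();
  });
  consumer.on('data', onMessage);
  consumer.connect();
  return consumer;
}
```

Because methods on a KafkaConsumer throw on failure, calls such as `subscribe` may need a surrounding try/catch in real code.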

Parameters:
Name Type Attributes Description
conf object

Key value pairs to configure the consumer.

topicConf object <nullable>

Key value pairs to create a default topic configuration.

See:

Extends

Methods

assign(assignments) → {RdKafka.KafkaConsumer}

Assign the consumer specific partitions and topics. Used for eager (non-cooperative) rebalancing from within the rebalance callback.

Parameters:
Name Type Description
assignments array

Assignments array. Should contain objects with topic and partition set.

See:
Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

assignmentLost() → {boolean}

Check, from within the rebalance callback, whether the current assignment was lost.

Returns:

true if assignment was lost.

Type
boolean

assignments() → {array}

Get the assignments for the consumer.

Returns:

assignments - Array of topic partitions.

Type
array

commit(topicPartition) → {RdKafka.KafkaConsumer}

Commit specific topic partitions, or all the topic partitions that have been read.

This is asynchronous, and when this method returns, it's not guaranteed that the offsets have been committed. If you need that, use commitSync or commitCb.

If you provide a topic partition, or an array of topic partitions, it will commit those. Otherwise, it will commit all read offsets for all topic partitions.

Parameters:
Name Type Description
topicPartition object | array | null

Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

commitCb(toppars, cb) → {RdKafka.KafkaConsumer}

Commits a list of offsets per topic partition, using provided callback.

Parameters:
Name Type Description
toppars Array.<TopicPartition>

Topic partition list to commit offsets for. Defaults to the current assignment.

cb function

Callback method to execute when finished.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

commitMessage(msg) → {RdKafka.KafkaConsumer}

Commit a message.

This is a convenience method that maps a message onto a commit: the offset of the message + 1 is committed for the topic partition the message came from.

Parameters:
Name Type Description
msg object

Message object to commit.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer
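
The offset arithmetic behind commitMessage can be sketched as a small pure function. `toCommitPosition` is an illustrative name, not a library function; it builds the topic partition object that a plain commit call would need in order to have the same effect.

```javascript
// Illustrative helper (not part of the library): builds the topic partition
// that corresponds to committing `msg`. The committed offset is the offset
// of the next message to read, hence the + 1.
function toCommitPosition(msg) {
  return {
    topic: msg.topic,
    partition: msg.partition,
    offset: msg.offset + 1
  };
}

// A message read at offset 41 maps to commit position 42.
var position = toCommitPosition({ topic: 'test', partition: 0, offset: 41 });
// consumer.commit(position) is then expected to match consumer.commitMessage(msg).
```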

commitMessageSync(msg) → {RdKafka.KafkaConsumer}

Commit a message synchronously.

Parameters:
Name Type Description
msg object

A message object to commit.

See:
Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

commitSync(topicPartition) → {RdKafka.KafkaConsumer}

Commit a topic partition (or all topic partitions) synchronously.

Parameters:
Name Type Description
topicPartition object | array | null

Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

committed(toppars, timeout, cb) → {RdKafka.KafkaConsumer}

Get a current list of the committed offsets per topic partition.

Calls back with array of objects in the form of a topic partition list.

Parameters:
Name Type Attributes Description
toppars Array.<TopicPartition> <nullable>

Topic partition list to query committed offsets for. Defaults to the current assignment.

timeout number

Number of ms to block before calling back and erroring.

cb function

Callback method to execute when finished or timed out.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer
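
A minimal sketch of reading committed offsets for the current assignment. It assumes the callback is invoked Node-style as `(err, toppars)`; the helper name `committedOffsets` and the `topic[partition]` key format are made up for illustration.

```javascript
// Hypothetical helper: looks up committed offsets for the current assignment
// and hands back a "topic[partition]" -> offset lookup object.
function committedOffsets(consumer, timeoutMs, done) {
  // Passing null for toppars falls back to the current assignment.
  consumer.committed(null, timeoutMs, function(err, toppars) {
    if (err) {
      // Assumption: errors (including timeouts) arrive as the first argument.
      return done(err);
    }
    var byPartition = {};
    toppars.forEach(function(tp) {
      byPartition[tp.topic + '[' + tp.partition + ']'] = tp.offset;
    });
    done(null, byPartition);
  });
}
```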

connect(metadataOptions, cb) → {RdKafka.Client}

Connect to the broker and receive its metadata.

Connects to a broker by establishing the client and fetches its metadata.

Parameters:
Name Type Attributes Description
metadataOptions object <nullable>

Options for the metadata request.

Properties
Name Type Description
topic string

Topic to fetch metadata for. Empty string is treated as empty.

allTopics boolean

Fetch metadata for all topics, not just the ones we know about.

timeout int

The timeout, in ms, to allow for fetching metadata. Defaults to 30000 ms.

cb RdKafka.Client~connectionCallback <nullable>

Callback that indicates we are done connecting.

Inherited From:
Fires:
Returns:
  • Returns itself.
Type
RdKafka.Client

connectedTime() → {number}

Find out how long we have been connected to Kafka.

Inherited From:
Returns:
  • Milliseconds since the connection has been established.
Type
number

consume(sizeOrCb, cb)

Consume messages from the subscribed topics.

This method can be used in two ways.

  1. To read messages as fast as possible. A background thread is created to fetch the messages as quickly as it can, sleeping only in between EOF and broker timeouts. If called in this way, the method returns immediately. Messages will be sent via an optional callback that can be provided to the method, and via the data event.

  2. To read a certain number of messages. If the first argument provided is a number, this method reads multiple messages, the count of which doesn't exceed the number. No additional thread is created.

Configuring timeouts for consume:

  1. setDefaultConsumeTimeout - This is the time we wait for a message to be fetched from the underlying library. If this is exceeded, we either back off for a while (Way 1), or return with however many messages we've already fetched by that point (Way 2).

  2. setDefaultConsumeLoopTimeoutDelay - This is the time we wait before attempting another fetch after a message fetch has timed out. This is only used in Way 1.

Parameters:
Name Type Attributes Description
sizeOrCb number | function | null

Either the number of messages to read (if using the second way) or the callback (if using the first way).

cb RdKafka.KafkaConsumer~readCallback <nullable>

Callback to invoke when work is done, if using the second way.

Example
// Way 1:
consumer.consume(); // No callback is okay.
consumer.consume(processMessage); // Messages will be sent to the callback.

// Way 2:
consumer.consume(10); // First parameter must be number of messages.
consumer.consume(10, processMessage); // Callback can be given as the second parameter.
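
Building on Way 2, a batch read combined with a consume timeout might look like the sketch below. It assumes the read callback is invoked as `(err, messages)`; `consumeBatch`, `handleMessage`, and the one-second timeout are illustrative choices.

```javascript
// Hypothetical helper: reads up to `batchSize` messages (Way 2) and passes
// each one to `handleMessage`.
function consumeBatch(consumer, batchSize, handleMessage, done) {
  // Wait at most one second per fetch; on timeout the call returns with
  // however many messages were fetched up to that point.
  consumer.setDefaultConsumeTimeout(1000);
  consumer.consume(batchSize, function(err, messages) {
    if (err) {
      return done(err);
    }
    messages.forEach(function(message) {
      handleMessage(message);
    });
    // May be fewer than batchSize, including zero.
    done(null, messages.length);
  });
}
```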

disconnect(cb)

Disconnect from the Kafka client.

This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client.

It will also emit the disconnected event.

Parameters:
Name Type Description
cb function

Callback to call when disconnection is complete.

Inherited From:
Fires:

getClient() → {Connection}

Get the native Kafka client.

You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.

Inherited From:
See:
  • connection.cc
Returns:
  • The native Kafka client.
Type
Connection

getLastError() → (nullable) {LibrdKafkaError}

Get the last error emitted if it exists.

Inherited From:
Returns:
  • Returns the LibrdKafkaError or null if one hasn't been thrown.
Type
LibrdKafkaError

getMetadata(metadataOptions, cb)

Get client metadata.

Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.

Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka.

A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored.

To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).

Parameters:
Name Type Description
metadataOptions object

Metadata options to pass to the client.

Properties
Name Type Description
topic string

Topic string for which to fetch metadata

timeout number

Max time, in ms, to try to fetch metadata before timing out. Defaults to 3000.

cb function

Callback to fire with the metadata.

Inherited From:

getWatermarkOffsets(topic, partition) → {RdKafka.Client~watermarkOffsets}

Get last known offsets from the client.

The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker.

If there is no cached offset (either low or high, or both), then this will throw an error.

Parameters:
Name Type Description
topic string

Topic to receive offsets from.

partition number

Partition of the provided topic to receive offsets from.

Returns:
  • Returns an object with a high and low property, specifying the high and low offsets for the topic partition.
Type
RdKafka.Client~watermarkOffsets
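
Since this method throws when nothing is cached, callers may want to guard it. The sketch below treats any thrown error as "not cached yet", which is a simplification; `cachedWatermarks` is an illustrative name.

```javascript
// Hypothetical helper: returns the cached watermark offsets, or null when
// the lookup throws (for example because no offset has been cached yet).
function cachedWatermarks(consumer, topic, partition) {
  try {
    return consumer.getWatermarkOffsets(topic, partition);
  } catch (err) {
    // Simplification: every failure is treated as "no cached offsets".
    return null;
  }
}
```

When a fresh value is required rather than the cached one, queryWatermarkOffsets asks the broker directly.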

incrementalAssign(assignments) → {RdKafka.KafkaConsumer}

Assign the consumer specific partitions and topics. Used for cooperative rebalancing from within the rebalance callback.

Parameters:
Name Type Description
assignments array

Assignments array. Should contain objects with topic and partition set. Assignments are additive.

See:
Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

incrementalUnassign(assignments) → {RdKafka.KafkaConsumer}

Unassign the consumer specific partitions and topics. Used for cooperative rebalancing from within the rebalance callback.

Parameters:
Name Type Description
assignments array

Assignments array. Should contain objects with topic and partition set. Assignments are subtractive.

See:
Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer
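
The eager and cooperative methods can be combined in one rebalance handler by checking rebalanceProtocol(). This is a sketch under assumptions: `err` and `assignments` are taken to be the arguments the rebalance callback receives, the two numeric codes are the librdkafka values for assign and revoke events, and `onRebalance` is a made-up name.

```javascript
// librdkafka error codes signalling the two kinds of rebalance event.
var ERR__ASSIGN_PARTITIONS = -175;
var ERR__REVOKE_PARTITIONS = -174;

// Hypothetical rebalance handler; `err` and `assignments` are assumed to be
// the arguments handed to the rebalance callback.
function onRebalance(consumer, err, assignments) {
  var cooperative = consumer.rebalanceProtocol() === 'COOPERATIVE';
  if (err.code === ERR__ASSIGN_PARTITIONS) {
    if (cooperative) {
      consumer.incrementalAssign(assignments);   // additive
    } else {
      consumer.assign(assignments);              // replaces the assignment
    }
  } else if (err.code === ERR__REVOKE_PARTITIONS) {
    if (cooperative) {
      consumer.incrementalUnassign(assignments); // subtractive
    } else {
      consumer.unassign();                       // drops the whole assignment
    }
  } else {
    // Anything else is a genuine error.
    throw err;
  }
}
```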

isConnected() → {boolean}

Whether or not we are connected to Kafka.

Inherited From:
Returns:
  • Whether we are connected.
Type
boolean

offsetsForTimes(toppars, timeout, cb)

Query offsets for times from the broker.

This function makes a call to the broker to get the offsets for times specified.

Parameters:
Name Type Description
toppars Array.<TopicPartition>

Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for.

timeout number

Number of ms to wait to receive a response.

cb RdKafka.Client~offsetsForTimesCallback

Callback to fire with the filled in offsets.

Inherited From:
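
Because the `offset` field doubles as a timestamp here, building the input can be easy to get wrong. The sketch below shows one way to prepare it, assuming timestamps are expressed in milliseconds since the epoch; `topparsForTime` is an illustrative helper, not a library function.

```javascript
// Illustrative helper (not part of the library): builds the toppars argument
// for offsetsForTimes. The `offset` field carries a timestamp in
// milliseconds since the epoch rather than a real offset.
function topparsForTime(topic, partitions, date) {
  return partitions.map(function(partition) {
    return { topic: topic, partition: partition, offset: date.getTime() };
  });
}

var toppars = topparsForTime('test', [0, 1], new Date(0));
// The result would then be passed on, for example:
// consumer.offsetsForTimes(toppars, 5000, callback);
```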

offsetsStore(topicPartitions)

Store offset for topic partition.

The offset will be committed (written) to the offset store according to the auto commit interval if auto commit is on, or on the next manual commit if not.

enable.auto.offset.store must be set to false to use this API.

Parameters:
Name Type Description
topicPartitions Array.<TopicPartition>

Topic partitions, including the offsets to store.

See:
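
A sketch of storing an offset once a message has been processed. It assumes the consumer was configured with 'enable.auto.offset.store' set to false and that, as with commits, the stored offset should be the next offset to read; `markProcessed` is a made-up name.

```javascript
// Hypothetical helper: marks `msg` as processed so that the next commit
// (automatic or manual) picks it up.
function markProcessed(consumer, msg) {
  consumer.offsetsStore([{
    topic: msg.topic,
    partition: msg.partition,
    // Assumption: store the offset of the next message to read.
    offset: msg.offset + 1
  }]);
}
```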

pause(topicPartitions)

Pause consumption for the provided list of partitions.

Parameters:
Name Type Description
topicPartitions Array.<TopicPartition>

List of topic partitions to pause consumption on.

position(toppars) → {array}

Get the current offset position of the KafkaConsumer.

Returns a list of TopicPartitions on success, or throws an error on failure.

Parameters:
Name Type Description
toppars Array.<TopicPartition>

List of topic partitions to query position for. Defaults to the current assignment.

Returns:
  • TopicPartition array. Each item is an object with an offset, topic, and partition.
Type
array

queryWatermarkOffsets(topic, partition, timeout, cb)

Query offsets from the broker.

This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

Parameters:
Name Type Description
topic string

Topic to receive offsets from.

partition number

Partition of the provided topic to receive offsets from.

timeout number

Number of ms to wait to receive a response.

cb RdKafka.Client~watermarkOffsetsCallback

Callback to fire with the offsets.

Inherited From:

rebalanceProtocol() → {string}

Get the type of rebalance protocol used in the consumer group.

Returns:

"NONE" (if not in a group yet), "COOPERATIVE" or "EAGER".

Type
string

resume(topicPartitions)

Resume consumption for the provided list of partitions.

Parameters:
Name Type Description
topicPartitions Array.<TopicPartition>

List of topic partitions to resume consumption on.
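
pause and resume are commonly paired to apply backpressure. One possible shape, assuming the list returned by assignments() can be passed straight to both calls; `pauseWhile` and its `work` callback are illustrative.

```javascript
// Hypothetical helper: pauses everything currently assigned, runs `work`,
// and resumes the same partitions once `work` signals completion.
function pauseWhile(consumer, work) {
  var paused = consumer.assignments();
  consumer.pause(paused);
  work(function resume() {
    // Resume exactly the partitions that were paused above.
    consumer.resume(paused);
  });
}
```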

seek(toppar, timeout, cb) → {RdKafka.KafkaConsumer}

Seek the consumer for a topic+partition to an offset, which is either an absolute or a logical offset.

The consumer must already be assigned the topics and partitions that the call seeks to.

Parameters:
Name Type Description
toppar TopicPartition

Topic partition to seek, including offset.

timeout number

Number of ms to try seeking before erroring out.

cb function

Callback method to execute when finished or timed out. If the seek timed out, the internal state of the consumer is unknown.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer
Example
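consumer.seek({ topic: 'topic', partition: 0, offset: 1000 }, 2000, function(err) {
  if (err) {
    // The seek failed or timed out; on timeout the consumer's internal state is unknown.
    console.error(err);
  }
});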

setDefaultConsumeLoopTimeoutDelay(intervalMs)

Set the default sleep delay for the next consume loop after the previous one has timed out.

Parameters:
Name Type Description
intervalMs number

Number of milliseconds to sleep after a message fetch has timed out.

setDefaultConsumeTimeout(timeoutMs)

Set the default consume timeout passed to the underlying C++ layer.

Parameters:
Name Type Description
timeoutMs number

Number of milliseconds to wait for a message to be fetched.

setSaslCredentials(username, password)

Change SASL credentials to be sent on the next authentication attempt.

Only applicable if SASL authentication is being used.

Parameters:
Name Type Description
username string
password string
Inherited From:

subscribe(topics) → {KafkaConsumer}

Subscribe to an array of topics (synchronously).

This operation is pretty fast because it just sets an assignment in librdkafka. This is the recommended way to deal with subscriptions in a situation where you will be reading across multiple files or as part of your configure-time initialization.

This is also a good way to do it for streams.

This does not actually cause any reassignment of topic partitions until the consume loop is running or the consume method is called.

Parameters:
Name Type Description
topics array

An array of topics to listen to.

Fires:
Returns:
  • Returns itself.
Type
KafkaConsumer

subscription() → {array}

Get the current subscription of the KafkaConsumer.

Get a list of subscribed topics. Should generally match what you passed to subscribe.

See:
Returns:
  • Array of topic strings showing the current subscription.
Type
array

unassign() → {RdKafka.KafkaConsumer}

Unassign the consumer from its assigned partitions and topics. Used for eager (non-cooperative) rebalancing from within the rebalance callback.

See:
Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer

unsubscribe() → {RdKafka.KafkaConsumer}

Unsubscribe from all currently subscribed topics.

Before you subscribe to new topics you need to unsubscribe from the old ones, if there is an active subscription. Otherwise, you will get an error because there is an existing subscription.

Returns:
  • Returns itself.
Type
RdKafka.KafkaConsumer
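
The unsubscribe-before-subscribe rule can be wrapped in a small helper. This is a sketch; `resubscribe` is an illustrative name.

```javascript
// Hypothetical helper: switches the consumer to a new set of topics,
// unsubscribing first when a subscription is already active.
function resubscribe(consumer, topics) {
  if (consumer.subscription().length > 0) {
    consumer.unsubscribe();
  }
  return consumer.subscribe(topics);
}
```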

(static) createReadStream(conf, topicConf, streamOptions) → {RdKafka.KafkaConsumerStream}

Get a stream representation of this KafkaConsumer.

Parameters:
Name Type Attributes Description
conf object

Key value pairs to configure the consumer.

topicConf object <nullable>

Key value pairs to create a default topic configuration. May be null.

streamOptions object

Stream options.

Properties
Name Type Description
topics array

Array of topics to subscribe to.

Returns:
  • Readable stream that receives messages when new ones become available.
Type
RdKafka.KafkaConsumerStream
Example
// Assumes `Kafka` refers to the loaded client library module.
var consumerStream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false
}, {}, { topics: [ 'test' ] });

Type Definitions

Message

Type:
  • object
Properties:
Name Type Attributes Description
topic string

The topic the message was read from.

partition number

The partition the message was read from.

offset number

The offset of the message.

key Buffer <nullable>

The key of the message.

value Buffer <nullable>

The value of the message.

size number

The size of the message.

timestamp number

The timestamp of the message (in milliseconds since the epoch in UTC).

leaderEpoch number <nullable>

The leader epoch of the message if available.

headers Array.<RdKafka.KafkaConsumer~MessageHeader> <nullable>

The headers of the message.

MessageHeader

The key of this object denotes the header key, and the value of the key is the header value.

Type:
  • object
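
Given that shape, the values of a particular header can be collected as sketched below. The helper `headerValues` is illustrative, and decoding values with String() assumes they are strings or UTF-8 Buffers.

```javascript
// Illustrative helper (not part of the library): collects every value of the
// header named `name`. `msg.headers` may be null, and a key may repeat.
function headerValues(msg, name) {
  var values = [];
  (msg.headers || []).forEach(function(header) {
    if (Object.prototype.hasOwnProperty.call(header, name)) {
      // Assumption: values are strings or Buffers holding UTF-8 text.
      values.push(String(header[name]));
    }
  });
  return values;
}
```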

Events

data

Data event. Called whenever a message is received.

Type:

disconnected

Disconnect event. Called after disconnection is finished.

Type:
  • object
Properties:
Name Type Description
connectionOpened date

When the connection was opened.

Inherited From:

ready

Ready event. Called when the Client connects successfully.

Type:
  • object
Properties:
Name Type Description
name string

The name of the client.

Inherited From:

subscribed

Subscribe event. Called after changing subscription.

Type:
  • Array.<string>