new KafkaConsumer(conf, topicConf)
KafkaConsumer class for reading messages from Kafka.
This is the main entry point for reading data from Kafka. You configure this like you do any other client, with a global configuration and default topic configuration.
Data will not be read until you tell the consumer what topics you want to read from.
Methods on a KafkaConsumer will throw a LibrdKafkaError on failure.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
conf | object | | Key value pairs to configure the consumer. |
topicConf | object | <nullable> | Key value pairs to create a default topic configuration. |
Extends
Methods
assign(assignments) → {RdKafka.KafkaConsumer}
Assign the consumer specific partitions and topics. Used for eager (non-cooperative) rebalancing from within the rebalance callback.
Parameters:
Name | Type | Description |
---|---|---|
assignments | array | Assignments array. Should contain objects with topic and partition set. |
Returns:
- Returns itself.
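The following is a minimal sketch of the dispatch a rebalance callback might perform under the eager protocol. The helper name `handleEagerRebalance` is hypothetical. The two error codes are assumed to be librdkafka's `RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS` (-175) and `RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS` (-174); with the real library, prefer its exported error-code constants over hard-coded numbers. The consumer is passed in explicitly so the function can be exercised with a stand-in object.

```javascript
// Assumption: these mirror librdkafka's RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
// and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS codes.
const ERR__ASSIGN_PARTITIONS = -175;
const ERR__REVOKE_PARTITIONS = -174;

// Hypothetical helper: handles one eager (non-cooperative) rebalance event.
function handleEagerRebalance(consumer, err, assignment) {
  if (err.code === ERR__ASSIGN_PARTITIONS) {
    // Eager protocol: the new assignment replaces the old one wholesale.
    consumer.assign(assignment);
  } else if (err.code === ERR__REVOKE_PARTITIONS) {
    // Eager protocol: drop every partition; unassign() takes no arguments.
    consumer.unassign();
  } else {
    // Any other code is a genuine error raised during rebalancing.
    throw err;
  }
}
```

In node-rdkafka-style clients, a function like this is typically called from the `rebalance_cb` configuration callback, where `this` refers to the consumer. Treat that wiring as something to confirm against your library version.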
assignmentLost() → {boolean}
Check whether the current assignment was lost, when called from within the rebalance callback.
Returns:
true if assignment was lost.
- Type
- boolean
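One common use of `assignmentLost()` is deciding whether to commit when partitions are revoked. If the assignment was lost, the partitions may already belong to another group member, and committing could overwrite that member's progress. The sketch below assumes only `assignmentLost()` and `commitSync(null)` as documented on this page. The helper name `commitOnRevoke` is hypothetical.

```javascript
// Hypothetical helper: commit read offsets on revocation, unless the
// assignment was lost. Returns true if a commit was issued.
function commitOnRevoke(consumer) {
  if (consumer.assignmentLost()) {
    // Partitions may already be owned by another member; skip the commit.
    return false;
  }
  // Passing null commits all read offsets, per the commitSync docs.
  consumer.commitSync(null);
  return true;
}
```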
assignments() → {array}
Get the assignments for the consumer.
Returns:
assignments - Array of topic partitions.
- Type
- array
commit(topicPartition) → {RdKafka.KafkaConsumer}
Commit specific topic partitions, or all the topic partitions that have been read.
This is asynchronous, and when this method returns, it's not guaranteed that the offsets have been committed. If you need that, use commitSync or commitCb.
If you provide a topic partition, or an array of topic partitions, it will commit those. Otherwise, it will commit all read offsets for all topic partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartition | object \| array \| null | Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets. |
Returns:
- returns itself.
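When committing after processing a batch, you usually want one entry per topic partition rather than one per message. The sketch below reduces a batch to the list `commit` accepts. It uses the same "offset + 1" convention that `commitMessage` applies, so the committed value is the next offset to read. The helper name `offsetsToCommit` is hypothetical. It assumes each message carries `topic`, `partition`, and `offset`, as in the Message type.

```javascript
// Hypothetical helper: collapse a batch of messages into one commit entry per
// topic partition, committing (highest processed offset + 1).
function offsetsToCommit(messages) {
  const latest = new Map();
  for (const msg of messages) {
    // Kafka topic names cannot contain ':', so this key is unambiguous.
    const key = `${msg.topic}:${msg.partition}`;
    const prev = latest.get(key);
    if (!prev || msg.offset > prev.offset) {
      latest.set(key, { topic: msg.topic, partition: msg.partition, offset: msg.offset });
    }
  }
  // The committed offset is the next message to read, hence + 1.
  return [...latest.values()].map((tp) => ({ ...tp, offset: tp.offset + 1 }));
}
```

The result can then be passed as the `topicPartition` argument, for example `consumer.commit(offsetsToCommit(batch))`.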
commitCb(toppars, cb) → {RdKafka.KafkaConsumer}
Commits a list of offsets per topic partition and invokes the provided callback when the commit completes.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Topic partition list to commit offsets for. Defaults to the current assignment. |
cb | function | Callback method to execute when finished. |
Returns:
- returns itself.
commitMessage(msg) → {RdKafka.KafkaConsumer}
Commit a message.
This is a convenience method that maps a message to the correct commit: the message's offset + 1 is committed for the topic partition the message came from.
Parameters:
Name | Type | Description |
---|---|---|
msg | object | Message object to commit. |
Returns:
- returns itself.
commitMessageSync(msg) → {RdKafka.KafkaConsumer}
Commit a message synchronously.
Parameters:
Name | Type | Description |
---|---|---|
msg | object | A message object to commit. |
Returns:
- returns itself.
commitSync(topicPartition) → {RdKafka.KafkaConsumer}
Commit a topic partition (or all topic partitions) synchronously.
Parameters:
Name | Type | Description |
---|---|---|
topicPartition | object \| array \| null | Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets. |
Returns:
- returns itself.
committed(toppars, timeout, cb) → {RdKafka.KafkaConsumer}
Get a current list of the committed offsets per topic partition.
Calls back with an array of objects in the form of a topic partition list.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
toppars | Array.<TopicPartition> | <nullable> | Topic partition list to query committed offsets for. Defaults to the current assignment. |
timeout | number | | Number of ms to block before calling back and erroring. |
cb | function | | Callback method to execute when finished or timed out. |
Returns:
- Returns itself.
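Several methods on this page follow Node's `(args..., cb)` callback convention, so they can be adapted to Promises with Node's built-in `util.promisify`. The sketch below wraps `committed(toppars, timeout, cb)`. It assumes the callback is invoked as `cb(err, toppars)`. The helper name `committedAsync` is hypothetical. `bind` keeps `this` pointing at the consumer.

```javascript
const { promisify } = require('util');

// Hypothetical helper: Promise-returning wrapper around the callback-style
// committed(toppars, timeout, cb). Resolves with the topic partition list,
// or rejects with the error passed to the callback.
function committedAsync(consumer, toppars, timeoutMs) {
  const committed = promisify(consumer.committed.bind(consumer));
  return committed(toppars, timeoutMs);
}
```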
connect(metadataOptions, cb) → {RdKafka.Client}
Connect to the broker and receive its metadata.
Establishes the client connection to a broker and fetches its metadata.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
metadataOptions | object | <nullable> | Options for the metadata request. |
cb | RdKafka.Client~connectionCallback | <nullable> | Callback that indicates we are done connecting. |
- Inherited From:
Fires:
Returns:
- Returns itself.
- Type
- RdKafka.Client
connectedTime() → {number}
Find out how long we have been connected to Kafka.
- Inherited From:
Returns:
- Milliseconds since the connection has been established.
- Type
- number
consume(sizeOrCb, cb)
Consume messages from the subscribed topics.
This method can be used in two ways.
- Way 1: To read messages as fast as possible. A background thread is created to fetch the messages as quickly as it can, sleeping only in between EOF and broker timeouts. If called in this way, the method returns immediately. Messages are delivered via an optional callback that can be provided to the method, and via the data event.
- Way 2: To read a certain number of messages. If the first argument provided is a number, this method reads up to that many messages. No additional thread is created.
Configuring timeouts for consume:
- setDefaultConsumeTimeout - This is the time we wait for a message to be fetched from the underlying library. If this is exceeded, we either back off for a while (Way 1), or return with however many messages we've already fetched by that point (Way 2).
- setDefaultConsumeLoopTimeoutDelay - This is the time we wait before attempting another fetch after a message fetch has timed out. This is only used in Way 1.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
sizeOrCb | number \| function \| null | | Either the number of messages to read (if using the second way), or the callback (if using the first way). |
cb | RdKafka.KafkaConsumer~readCallback | <nullable> | Callback to call when work is done, if using the second way. |
Example
// Way 1:
consumer.consume(); // No callback is okay.
consumer.consume(processMessage); // Messages will be sent to the callback.
// Way 2:
consumer.consume(10); // First parameter must be number of messages.
consumer.consume(10, processMessage); // Callback can be given as the second parameter.
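Way 2 returns after each batch, so a caller that wants continuous, bounded reads has to schedule the next call itself. The sketch below is one way to do that. It assumes the callback is invoked as `cb(err, messages)` with an array. The names `pollBatches`, `handleBatch`, and `shouldStop` are hypothetical.

```javascript
// Hypothetical helper: repeatedly pulls batches with consume(batchSize, cb)
// (the second way) and hands each batch to handleBatch, until shouldStop()
// returns true. Resolves when stopped; rejects on a consume or handler error.
function pollBatches(consumer, batchSize, handleBatch, shouldStop) {
  return new Promise((resolve, reject) => {
    const next = () => {
      if (shouldStop()) return resolve();
      consumer.consume(batchSize, (err, messages) => {
        if (err) return reject(err);
        try {
          handleBatch(messages); // an empty batch just means the timeout elapsed
        } catch (e) {
          return reject(e);
        }
        // Schedule the next fetch on a later tick instead of recursing directly.
        setImmediate(next);
      });
    };
    next();
  });
}
```

Because each batch is fully handled before the next `consume` call, this loop applies natural backpressure. The trade-off is lower throughput than Way 1.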
disconnect(cb)
Disconnect from the Kafka client.
This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client.
It will also emit the disconnected event.
Parameters:
Name | Type | Description |
---|---|---|
cb | function | Callback to call when disconnection is complete. |
- Inherited From:
Fires:
getClient() → {Connection}
Get the native Kafka client.
You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.
- Inherited From:
- See: connection.cc
Returns:
- The native Kafka client.
- Type
- Connection
getLastError() → (nullable) {LibrdKafkaError}
Get the last error emitted if it exists.
- Inherited From:
Returns:
- Returns the LibrdKafkaError or null if one hasn't been thrown.
- Type
- LibrdKafkaError
getMetadata(metadataOptions, cb)
Get client metadata.
Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.
Note: using a metadataOptions.topic parameter has a potential side effect. A Topic object will be created with default options, if it does not exist yet, and it will be cached by librdkafka. A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance, and the specific options will be silently ignored.
To avoid this side effect, create the topic object with the expected options before requesting metadata, or perform the metadata request for all topics (by omitting metadataOptions.topic).
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Metadata options to pass to the client. |
cb | function | Callback to fire with the metadata. |
- Inherited From:
getWatermarkOffsets(topic, partition) → {RdKafka.Client~watermarkOffsets}
Get last known offsets from the client.
The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker.
If there is no cached offset (either low or high, or both), then this will throw an error.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
Returns:
- Returns an object with a high and low property, specifying the high and low offsets for the topic partition.
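A typical use of the high watermark is estimating consumer lag together with the offset returned by `committed`. The committed offset is the next offset to read, so lag is the high watermark minus that value. This page does not spell out the exact property names on the returned object, so the hypothetical `partitionLag` helper below takes plain numbers. It treats negative committed values as "no committed offset" (librdkafka uses logical offsets such as -1001, `RD_KAFKA_OFFSET_INVALID`, for this).

```javascript
// Hypothetical helper: estimate lag for one partition.
// highOffset: high watermark; committedOffset: next offset the group will read.
// Returns null when there is no valid committed offset (logical offsets are negative).
function partitionLag(highOffset, committedOffset) {
  if (committedOffset < 0) return null;
  // The cached high watermark can trail the committed offset, so clamp at 0.
  return Math.max(0, highOffset - committedOffset);
}
```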
incrementalAssign(assignments) → {RdKafka.KafkaConsumer}
Assign the consumer specific partitions and topics. Used for cooperative rebalancing from within the rebalance callback.
Parameters:
Name | Type | Description |
---|---|---|
assignments | array | Assignments array. Should contain objects with topic and partition set. Assignments are additive. |
Returns:
- Returns itself.
incrementalUnassign(assignments) → {RdKafka.KafkaConsumer}
Unassign the consumer specific partitions and topics. Used for cooperative rebalancing from within the rebalance callback.
Parameters:
Name | Type | Description |
---|---|---|
assignments | array | Assignments array. Should contain objects with topic and partition set. Assignments are subtractive. |
Returns:
- Returns itself.
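Under the cooperative protocol, the rebalance callback receives only the partitions that changed. It should therefore add or remove exactly those, rather than replacing the whole assignment. The sketch below assumes the same librdkafka error codes as in eager rebalancing (-175 for assign, -174 for revoke). The helper name `handleCooperativeRebalance` is hypothetical.

```javascript
// Assumption: librdkafka's ASSIGN_PARTITIONS / REVOKE_PARTITIONS codes.
const ERR__ASSIGN_PARTITIONS = -175;
const ERR__REVOKE_PARTITIONS = -174;

// Hypothetical helper: only the partitions in `assignment` change; the rest
// of the current assignment is kept.
function handleCooperativeRebalance(consumer, err, assignment) {
  if (err.code === ERR__ASSIGN_PARTITIONS) {
    consumer.incrementalAssign(assignment); // add the newly granted partitions
  } else if (err.code === ERR__REVOKE_PARTITIONS) {
    consumer.incrementalUnassign(assignment); // remove only the revoked ones
  } else {
    throw err;
  }
}
```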
isConnected() → {boolean}
Whether or not we are connected to Kafka.
- Inherited From:
Returns:
- Whether we are connected.
- Type
- boolean
offsetsForTimes(toppars, timeout, cb)
Query offsets for times from the broker.
This function makes a call to the broker to get the offsets for the specified times.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Array of topic partitions. The offset field of each should instead hold the timestamp (in milliseconds since the epoch) you want offsets for. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~offsetsForTimesCallback | Callback to fire with the filled-in offsets. |
- Inherited From:
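Because `offsetsForTimes` reuses the `offset` field to carry a timestamp, building the input is easy to get wrong. The hypothetical `timestampToppars` helper below constructs it for a set of partitions. In Kafka, the broker answers each entry with the earliest offset whose timestamp is greater than or equal to the requested one.

```javascript
// Hypothetical helper: build the toppars argument for offsetsForTimes.
// The `offset` field holds a timestamp (ms since the epoch, UTC), not an offset.
function timestampToppars(topic, partitions, timestampMs) {
  return partitions.map((partition) => ({ topic, partition, offset: timestampMs }));
}
```

For example, `timestampToppars('test', [0, 1], Date.now() - 60 * 60 * 1000)` asks for the offsets from one hour ago. The offsets returned in the callback can then be passed to `seek` for each partition.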
offsetsStore(topicPartitions)
Store offset for topic partition.
The offset will be committed (written) to the offset store according to the auto commit interval if auto commit is on, or on the next manual commit if not.
enable.auto.offset.store must be set to false to use this API.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | Topic partitions with the offsets to store. |
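With `enable.auto.offset.store` set to false, storing the offset only after a message has been processed gives at-least-once delivery. A crash mid-processing leads to re-reading the message, not skipping it. The sketch assumes offsets are stored exactly as given (as librdkafka's `rd_kafka_offsets_store` does), so it stores `offset + 1`, the next message to read. The helper name `processThenStore` is hypothetical.

```javascript
// Hypothetical helper for at-least-once processing.
// The offset is stored only if `process` returns without throwing.
function processThenStore(consumer, msg, process) {
  process(msg);
  // Store the offset of the next message to read (this message + 1).
  consumer.offsetsStore([{ topic: msg.topic, partition: msg.partition, offset: msg.offset + 1 }]);
}
```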
pause(topicPartitions)
Pause consumption for the provided list of partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | List of topic partitions to pause consumption on. |
position(toppars) → {array}
Get the current offset position of the KafkaConsumer.
Returns a list of TopicPartitions on success, or throws an error on failure.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | List of topic partitions to query position for. Defaults to the current assignment. |
Returns:
- TopicPartition array. Each item is an object with an offset, topic, and partition.
- Type
- array
queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker.
This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~watermarkOffsetsCallback | Callback to fire with the offsets. |
- Inherited From:
rebalanceProtocol() → {string}
Get the type of rebalance protocol used in the consumer group.
Returns:
"NONE" (if not in a group yet), "COOPERATIVE" or "EAGER".
- Type
- string
resume(topicPartitions)
Resume consumption for the provided list of partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | List of topic partitions to resume consumption on. |
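`pause` and `resume` are commonly paired to apply backpressure when downstream processing falls behind. The hypothetical `BackpressureGate` class below pauses partitions when a local buffer grows past a high mark. It resumes them only after the buffer drains to a lower mark. The thresholds are illustrative, and the buffer itself is left to the caller.

```javascript
// Hypothetical backpressure helper built on pause()/resume().
class BackpressureGate {
  constructor(consumer, partitions, high, low) {
    this.consumer = consumer;
    this.partitions = partitions; // Array of { topic, partition }
    this.high = high;             // pause above this many buffered messages
    this.low = low;               // resume at or below this many
    this.paused = false;
  }

  // Call after every change to the buffer size.
  update(bufferSize) {
    if (!this.paused && bufferSize > this.high) {
      this.consumer.pause(this.partitions);
      this.paused = true;
    } else if (this.paused && bufferSize <= this.low) {
      this.consumer.resume(this.partitions);
      this.paused = false;
    }
  }
}
```

Using two thresholds instead of one avoids rapidly toggling between paused and resumed when the buffer hovers around a single limit.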
seek(toppar, timeout, cb) → {RdKafka.KafkaConsumer}
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
The consumer must already be assigned the topic partitions it is seeking on.
Parameters:
Name | Type | Description |
---|---|---|
toppar | TopicPartition | Topic partition to seek, including offset. |
timeout | number | Number of ms to try seeking before erroring out. |
cb | function | Callback method to execute when finished or timed out. If the seek timed out, the internal state of the consumer is unknown. |
Returns:
- Returns itself.
Example
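consumer.seek({ topic: 'topic', partition: 0, offset: 1000 }, 2000, function(err) {
  if (err) {
    // The seek failed or timed out; the consumer's position is now unknown.
    console.error('seek failed:', err);
  }
});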
setDefaultConsumeLoopTimeoutDelay(intervalMs)
Set the default sleep delay for the next consume loop after the previous one has timed out.
Parameters:
Name | Type | Description |
---|---|---|
intervalMs | number | Number of milliseconds to sleep after a message fetch has timed out. |
setDefaultConsumeTimeout(timeoutMs)
Set the default consume timeout passed to the native (C++) layer.
Parameters:
Name | Type | Description |
---|---|---|
timeoutMs | number | Number of milliseconds to wait for a message to be fetched. |
setSaslCredentials(username, password)
Change SASL credentials to be sent on the next authentication attempt.
Only applicable if SASL authentication is being used.
Parameters:
Name | Type | Description |
---|---|---|
username | string | SASL username. |
password | string | SASL password. |
- Inherited From:
subscribe(topics) → {KafkaConsumer}
Subscribe to an array of topics (synchronously).
This operation is fast because it only records the subscription in librdkafka. This is the recommended way to handle subscriptions when your reading logic is spread across multiple files, or as part of your configure-time initialization.
This is also a good way to subscribe when using streams.
Subscribing does not cause any reassignment of topic partitions until the consume loop is running or the consume method is called.
Parameters:
Name | Type | Description |
---|---|---|
topics | array | An array of topics to listen to. |
Fires:
Returns:
- Returns itself.
- Type
- KafkaConsumer
subscription() → {array}
Get the current subscription of the KafkaConsumer.
Get a list of subscribed topics. This should generally match what you passed to subscribe.
Returns:
- Array of topic strings showing the current subscription.
- Type
- array
unassign() → {RdKafka.KafkaConsumer}
Unassign the consumer from its assigned partitions and topics. Used for eager (non-cooperative) rebalancing from within the rebalance callback.
Returns:
- Returns itself.
unsubscribe() → {RdKafka.KafkaConsumer}
Unsubscribe from all currently subscribed topics.
Before you subscribe to new topics you need to unsubscribe from the old ones, if there is an active subscription. Otherwise, you will get an error because there is an existing subscription.
Returns:
- Returns itself.
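Because subscribing on top of an active subscription produces an error, switching topics means unsubscribing first. The hypothetical `changeSubscription` helper below does that. It assumes `subscription()` returns an empty array when nothing is subscribed. Verify that assumption against your client, since this page does not state it.

```javascript
// Hypothetical helper: replace the current subscription with `topics`.
function changeSubscription(consumer, topics) {
  // Assumption: subscription() returns [] when there is no active subscription.
  if (consumer.subscription().length > 0) {
    consumer.unsubscribe(); // required before subscribing again
  }
  consumer.subscribe(topics);
}
```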
(static) createReadStream(conf, topicConf, streamOptions) → {RdKafka.KafkaConsumerStream}
Get a stream representation of this KafkaConsumer.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
conf | object | | Key value pairs to configure the consumer. |
topicConf | object | <nullable> | Key value pairs to create a default topic configuration. May be null. |
streamOptions | object | | Stream options. |
Returns:
- Readable stream that receives messages when new ones become available.
Example
var consumerStream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'localhost:9092',
'group.id': 'librd-test',
'socket.keepalive.enable': true,
'enable.auto.commit': false
}, {}, { topics: [ 'test' ] });
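A readable stream like this can be piped into standard Node transforms. The sketch below assumes the consumer stream runs in object mode and emits one message object (see the Message type) per chunk. Check that assumption against your stream options. The hypothetical `messageValues` transform turns each message into the UTF-8 string of its value and skips messages whose value is null.

```javascript
const { Transform } = require('stream');

// Hypothetical transform: message objects in, UTF-8 value strings out.
// Assumes object-mode input where each chunk is a Message object.
function messageValues() {
  return new Transform({
    objectMode: true,
    transform(msg, _encoding, callback) {
      // Skip tombstones (null values) instead of emitting them.
      if (msg.value === null) return callback();
      callback(null, msg.value.toString('utf8'));
    },
  });
}
```

With the stream from the example above, this would be used as `consumerStream.pipe(messageValues())`.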
Type Definitions
Message
Type:
- object
Properties:
Name | Type | Attributes | Description |
---|---|---|---|
topic | string | | The topic the message was read from. |
partition | number | | The partition the message was read from. |
offset | number | | The offset of the message. |
key | Buffer | <nullable> | The key of the message. |
value | Buffer | <nullable> | The value of the message. |
size | number | | The size of the message. |
timestamp | number | | The timestamp of the message (in milliseconds since the epoch in UTC). |
leaderEpoch | number | <nullable> | The leader epoch of the message if available. |
headers | Array.<RdKafka.KafkaConsumer~MessageHeader> | <nullable> | The headers of the message. |
MessageHeader
The key of this object denotes the header key, and the value of the key is the header value.
Type:
- object
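To make the two types above concrete, the hypothetical `decodeMessage` helper below converts a Message into plain strings, for example for logging. It treats `headers` as an array of single-key objects (header key mapped to header value), per the MessageHeader description. It assumes key, value, and header values are Buffers or strings.

```javascript
// Hypothetical helper: turn a Message into a plain, printable object.
function decodeMessage(msg) {
  // Buffers and strings both support toString(); null stays null.
  const text = (v) => (v === null || v === undefined ? null : v.toString());
  const headers = {};
  for (const header of msg.headers || []) {
    for (const [k, v] of Object.entries(header)) {
      headers[k] = text(v); // a repeated header key keeps its last value
    }
  }
  return {
    topic: msg.topic,
    partition: msg.partition,
    offset: msg.offset,
    key: text(msg.key),
    value: text(msg.value),
    timestamp: new Date(msg.timestamp).toISOString(),
    headers,
  };
}
```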
Events
data
Data event. Called whenever a message is received.
Type:
disconnected
Disconnect event. Called after disconnection is finished.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
connectionOpened | date | When the connection was opened. |
- Inherited From:
ready
Ready event. Called when the client connects successfully.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | The name of the client. |
- Inherited From:
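The events above are typically combined in a fixed order: wait for `ready`, then subscribe and start consuming, then receive messages through `data`. The hypothetical `startConsuming` helper below wires that flow using only `on`, `subscribe`, and `consume` as documented on this page. It uses the first way of `consume`, with no size argument.

```javascript
// Hypothetical helper: subscribe once the client is ready, then stream
// messages to onMessage through the 'data' event.
function startConsuming(consumer, topics, onMessage) {
  consumer.on('ready', () => {
    consumer.subscribe(topics);
    consumer.consume(); // flowing mode: messages arrive via 'data'
  });
  consumer.on('data', onMessage);
}
```

With a real client, attach these handlers before calling `consumer.connect()`, so that the `ready` event is not missed.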
subscribed
Subscribe event. Called after changing subscription.
Type:
- Array.<string>