Constructor
new Consumer(kJSConfig)
This method should not be used directly. See KafkaJS.Consumer.
Parameters:
Name | Type | Description |
---|---|---|
kJSConfig |
Example
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

async function main() {
  const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
  const consumer = kafka.consumer({ 'group.id': 'test-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['test-topic'] });
  consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({ topic, partition, message });
    },
  });
}

main().catch(console.error);
Methods
assignment() → {Array.<{topic: string, partitions: Array.<number>}>}
Find the assigned topic partitions for the consumer.
Returns:
The current assignment.
- Type
- Array.<{topic: string, partitions: Array.<number>}>
(async) commitOffsets(topicPartitions) → {Promise.<void>}
Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets.
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
topicPartitions | Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}> | <nullable> | null | |
Returns:
A promise that resolves when the offsets have been committed.
- Type
- Promise.<void>
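As a minimal sketch of committing a single position explicitly, the helper below commits the offset just past a processed message. The helper name `commitProcessed` is hypothetical, and committing "last processed offset + 1" follows the usual Kafka convention that a committed offset names the next message to read; treat that as an assumption to verify for your setup.

```javascript
// Hypothetical helper (not part of the library): commit the position just
// past a message that has been fully processed.
// `consumer` is assumed to be a connected consumer created via kafka.consumer().
async function commitProcessed(consumer, topic, partition, message) {
  // Offsets are strings; BigInt avoids precision loss on very large offsets.
  const next = (BigInt(message.offset) + 1n).toString();
  await consumer.commitOffsets([{ topic, partition, offset: next }]);
  return next;
}
```

Calling `consumer.commitOffsets()` with no argument commits all offsets instead, as described above.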
(async) committed(topicPartitions, timeout) → {Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>}
Fetch committed offsets for the given topic partitions.
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
topicPartitions | Array.<{topic: string, partition: number}> | <nullable> | null | The topic partitions to check for committed offsets. Defaults to all assigned partitions. |
timeout | number | | | Timeout in ms. Defaults to infinite (-1). |
Returns:
A promise that resolves to the committed offsets.
- Type
- Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>
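The sketch below shows one way to turn the result of `committed()` into a lookup table. The helper names `flattenAssignment` and `committedByPartition` and the 5000 ms default timeout are illustrative choices, not part of the library. Passing `null` would also default to all assigned partitions; the explicit flattening is shown only to illustrate the `{topic, partition}` shape the method expects.

```javascript
// Convert assignment() output ({topic, partitions: [...]}) into the flat
// {topic, partition} pairs that committed() accepts.
function flattenAssignment(assignment) {
  return assignment.flatMap(({ topic, partitions }) =>
    partitions.map((partition) => ({ topic, partition })));
}

// Hypothetical helper: fetch committed offsets for the current assignment
// and index them by "topic:partition".
async function committedByPartition(consumer, timeoutMs = 5000) {
  const pairs = flattenAssignment(consumer.assignment());
  const offsets = await consumer.committed(pairs, timeoutMs);
  const result = new Map();
  for (const { topic, partition, offset } of offsets) {
    result.set(`${topic}:${partition}`, offset);
  }
  return result;
}
```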
(async) connect() → {Promise.<void>}
Set up the client and connect to the bootstrap brokers.
This method can be called only once for a consumer instance, and must be called before doing any other operations.
Returns:
A promise that resolves when the consumer is connected.
- Type
- Promise.<void>
dependentAdmin() → {KafkaJS.Admin}
Create a new admin client using the underlying connections of the consumer.
The consumer must be connected before connecting the resulting admin client. The usage of the admin client is limited to the lifetime of the consumer. The consumer's logger is shared with the admin client.
Returns:
- Type
- KafkaJS.Admin
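Because the admin client borrows the consumer's connections, ordering matters. The following sketch assumes the consumer is already connected and wraps the admin client's lifetime around a callback; `withDependentAdmin` is a hypothetical helper name, and it relies on the admin client exposing `connect()` and `disconnect()`.

```javascript
// Hypothetical helper: run `fn` with an admin client that shares the
// consumer's connections. The consumer must already be connected, and it
// must stay connected until this function settles.
async function withDependentAdmin(consumer, fn) {
  const admin = consumer.dependentAdmin();
  await admin.connect();
  try {
    return await fn(admin);
  } finally {
    // Release the admin client before the consumer itself is disconnected.
    await admin.disconnect();
  }
}
```

A call might look like `await withDependentAdmin(consumer, (admin) => admin.listTopics())`, assuming the admin client provides a `listTopics()` method.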
(async) disconnect() → {Promise.<void>}
Disconnects and cleans up the consumer.
Warning: This cannot be called from within the eachMessage or eachBatch callback of Consumer.run.
Returns:
A promise that resolves when the consumer has disconnected.
- Type
- Promise.<void>
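Since `disconnect()` cannot be called from inside the message callbacks, one option is to trigger it from outside, for example from a signal handler. The sketch below builds an idempotent shutdown function; `makeShutdown` is a hypothetical name and the signal wiring in the note is only one possible arrangement.

```javascript
// Hypothetical helper: returns a function that disconnects the consumer at
// most once, no matter how many times it is invoked.
function makeShutdown(consumer) {
  let pending = null;
  return function shutdown() {
    if (pending === null) {
      pending = consumer.disconnect();
    }
    // Later callers receive the same promise as the first caller.
    return pending;
  };
}
```

It could be wired up with `process.once('SIGINT', () => shutdown().finally(() => process.exit(0)))`, which keeps the disconnect outside of eachMessage and eachBatch.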
logger() → {Object}
Get the logger associated to this consumer instance.
- See:
- KafkaJS.logLevel for available log levels
Returns:
The logger instance.
- Type
- Object
Example
const { logLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const logger = consumer.logger();
logger.info('Hello world');
logger.setLogLevel(logLevel.ERROR);
pause(topics) → {function}
Pauses the given topic partitions. If partitions are not specified, pauses all partitions for the given topic.
If topic partition(s) are already paused this method has no effect.
Parameters:
Name | Type | Description |
---|---|---|
topics |
Array.<{topic: string, partitions: (Array.<number>|null)}> | Topics or topic partitions to pause. |
Returns:
A function that can be called to resume the topic partitions paused by this call.
- Type
- function
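The function returned by `pause()` makes temporary back-off straightforward. As a sketch, the hypothetical helper `pauseFor` below pauses one partition and resumes it after a delay by calling that returned function.

```javascript
// Hypothetical helper: pause a single partition for `ms` milliseconds.
// Resolves once the partition has been resumed.
function pauseFor(consumer, topic, partition, ms) {
  // pause() returns a function that resumes exactly what this call paused.
  const resume = consumer.pause([{ topic, partitions: [partition] }]);
  return new Promise((resolve) => {
    setTimeout(() => {
      resume();
      resolve();
    }, ms);
  });
}
```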
paused() → {Array.<{topic: string, partitions: Array.<number>}>}
Returns the list of paused topic partitions.
Returns:
A list of paused topic partitions.
- Type
- Array.<{topic: string, partitions: Array.<number>}>
rebalanceProtocol() → {string}
Get the type of rebalance protocol used in the consumer group.
Returns:
"NONE" (if not in a group yet), "COOPERATIVE" or "EAGER".
- Type
- string
resume(topics)
Resumes the given topic partitions. If partitions are not specified, resumes all partitions for the given topic.
If topic partition(s) are already resumed this method has no effect.
Parameters:
Name | Type | Description |
---|---|---|
topics |
Array.<{topic: string, partitions: (Array.<number>|null)}> | Topics or topic partitions to resume. |
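Because `paused()` returns entries in the same `{topic, partitions}` shape that `resume()` accepts, resuming everything can be sketched in a few lines. `resumeAll` is a hypothetical helper name.

```javascript
// Hypothetical helper: resume every partition that is currently paused and
// return the list of what was resumed.
function resumeAll(consumer) {
  const paused = consumer.paused();
  if (paused.length > 0) {
    consumer.resume(paused);
  }
  return paused;
}
```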
(async) run(config)
Starts consumer polling. This method returns immediately.
Parameters:
Name | Type | Description |
---|---|---|
config | object | The configuration for running the consumer. |
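As an illustration of an eachMessage callback, the sketch below wraps a plain handler so that it receives a decoded record. `makeEachMessage` is a hypothetical helper; it assumes `message.value` is a Buffer or null and that `message.offset` is a string, which matches the KafkaJS-style message shape but is not spelled out in this section.

```javascript
// Hypothetical helper: adapt a simple record handler to the
// ({ topic, partition, message }) payload passed to eachMessage.
function makeEachMessage(handle) {
  return async ({ topic, partition, message }) => {
    // Assumption: value is a Buffer, or null for a message without a value.
    const value = message.value === null ? null : message.value.toString('utf8');
    await handle({ topic, partition, offset: message.offset, value });
  };
}
```

The result can be passed as `consumer.run({ eachMessage: makeEachMessage(async (record) => console.log(record)) })`.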
seek(topicPartitionOffset)
Seek to the given offset for a topic partition.
This method is completely asynchronous and does not wait for the seek to complete. If a partition being sought is not part of the current assignment, the seek is stored internally.
If the consumer is later assigned that partition as part of a rebalance, the pending seek is performed.
Additionally, if the librdkafka property 'enable.auto.commit' or kafkaJS.autoCommit is true, the consumer commits the offset it seeks to.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitionOffset | object | |
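A sketch of seeking every assigned partition to the same offset is given below. The property names `topic`, `partition`, and `offset` (as a string) are an assumption based on the shapes used by the other methods on this page, since the property list for this parameter is not shown here; `seekAssigned` is a hypothetical helper.

```javascript
// Hypothetical helper: request a seek to `offset` on every partition in the
// current assignment. seek() does not wait for completion, so this only
// queues the requests and returns what was asked for.
function seekAssigned(consumer, offset) {
  const requested = [];
  for (const { topic, partitions } of consumer.assignment()) {
    for (const partition of partitions) {
      // Assumption: seek() takes { topic, partition, offset } with a string offset.
      const target = { topic, partition, offset: String(offset) };
      consumer.seek(target);
      requested.push(target);
    }
  }
  return requested;
}
```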
(async) subscribe(subscription)
Subscribes the consumer to the given topics.
Parameters:
Name | Type | Description |
---|---|---|
subscription | object | An object containing the topic(s) to subscribe to. |