Class: Consumer

KafkaJS.Consumer(kJSConfig)

Consumer for reading messages from Kafka (promise-based, async API).

The consumer reads messages from the Kafka cluster and provides methods to configure and control consumption. Do not instantiate this class directly; create a consumer from a Kafka instance with kafka.consumer() instead.

Constructor

new Consumer(kJSConfig)

This method should not be used directly. See KafkaJS.Consumer.

Parameters:
Name: kJSConfig
Example
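// The promise-based API is exported under the KafkaJS namespace.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

async function main() {
  const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
  const consumer = kafka.consumer({ 'group.id': 'test-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['test-topic'] });
  // run() returns immediately; eachMessage is invoked as messages arrive.
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({ topic, partition, message });
    },
  });
}

main().catch(console.error);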

Methods

assignment() → {Array.<{topic: string, partitions: Array.<number>}>}

Find the assigned topic partitions for the consumer.

Returns:

The current assignment.

Type
Array.<{topic: string, partitions: Array.<number>}>
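
The grouped shape returned by assignment() differs from the per-partition { topic, partition } entries that committed() accepts. The following is a minimal sketch of converting one into the other; flattenAssignment is a hypothetical helper, not part of the library.

```javascript
// Hypothetical helper: expand [{ topic, partitions: [0, 1] }] into
// [{ topic, partition: 0 }, { topic, partition: 1 }].
function flattenAssignment(assignment) {
  return assignment.flatMap(({ topic, partitions }) =>
    partitions.map((partition) => ({ topic, partition })));
}
```

With a connected consumer, this could be called as flattenAssignment(consumer.assignment()).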

(async) commitOffsets(topicPartitions <nullable>) → {Promise.<void>}

Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets.

Parameters:
Name: topicPartitions
Type: Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>
Attributes: <nullable>
Default: null

Returns:

A promise that resolves when the offsets have been committed.

Type
Promise.<void>
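
As an illustration of committing explicitly, the sketch below commits the position just past a processed message. It assumes automatic commits are disabled, follows the usual Kafka convention that the committed offset is the next offset to read, and assumes leaderEpoch and metadata may be left out. offsetAfter and commitProcessed are hypothetical helper names.

```javascript
// Build the entry to commit once a message has been processed.
// Offsets are strings, so BigInt is used to avoid precision loss.
function offsetAfter({ topic, partition, message }) {
  const next = (BigInt(message.offset) + 1n).toString();
  return { topic, partition, offset: next };
}

// Commit a single processed message; `consumer` is a connected KafkaJS.Consumer.
async function commitProcessed(consumer, payload) {
  await consumer.commitOffsets([offsetAfter(payload)]);
}
```

commitProcessed could be called at the end of an eachMessage handler with the same { topic, partition, message } payload the handler received.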

(async) committed(topicPartitions <nullable>, timeout) → {Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>}

Fetch committed offsets for the given topic partitions.

Parameters:
Name: topicPartitions
Type: Array.<{topic: string, partition: number}>
Attributes: <nullable>
Default: null
Description: The topic partitions to check for committed offsets. Defaults to all assigned partitions.

Name: timeout
Type: number
Description: Timeout in ms. Defaults to infinite (-1).

Returns:

A promise that resolves to the committed offsets.

Type
Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>
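
A short sketch of reading the committed positions for everything currently assigned, indexed for easy lookup. committedByPartition is a hypothetical helper and the 5000 ms default is an arbitrary choice for the example.

```javascript
// Hypothetical helper: index committed offsets by "topic:partition".
async function committedByPartition(consumer, timeoutMs = 5000) {
  // Passing null asks for every currently assigned partition.
  const committed = await consumer.committed(null, timeoutMs);
  const byPartition = new Map();
  for (const { topic, partition, offset } of committed) {
    byPartition.set(`${topic}:${partition}`, offset);
  }
  return byPartition;
}
```

A finite timeout keeps the call from waiting indefinitely, which is what the documented default of -1 would do.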

(async) connect() → {Promise.<void>}

Set up the client and connect to the bootstrap brokers.

This method can be called only once per consumer instance, and it must be called before any other operation.

Returns:

A promise that resolves when the consumer is connected.

Type
Promise.<void>
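
Because connect() may be called only once per instance, code with several entry points may want a guard. The sketch below is one possible approach, not something the library provides; connectOnce and the WeakMap are hypothetical.

```javascript
// Hypothetical guard: remember the connect() promise per consumer so that
// repeated callers share a single connection attempt.
const connecting = new WeakMap();

function connectOnce(consumer) {
  if (!connecting.has(consumer)) {
    connecting.set(consumer, consumer.connect());
  }
  return connecting.get(consumer);
}
```

Every caller awaits the same promise, so a rejected connection attempt is also seen by all of them.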

dependentAdmin() → {KafkaJS.Admin}

Create a new admin client using the underlying connections of the consumer.

The consumer must be connected before connecting the resulting admin client. The usage of the admin client is limited to the lifetime of the consumer. The consumer's logger is shared with the admin client.

Returns:
Type
KafkaJS.Admin
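
The lifetime rule above can be sketched as a small wrapper that connects the dependent admin client, runs some work, and releases it again. It assumes the consumer is already connected and that KafkaJS.Admin exposes disconnect() alongside connect(); withDependentAdmin is a hypothetical helper.

```javascript
// Hypothetical helper: run `work(admin)` with an admin client that shares
// the consumer's connections, then disconnect the admin client.
async function withDependentAdmin(consumer, work) {
  const admin = consumer.dependentAdmin();
  await admin.connect();
  try {
    return await work(admin);
  } finally {
    await admin.disconnect();
  }
}
```

The wrapper should finish before the consumer itself is disconnected, since the admin client is only usable during the consumer's lifetime.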

(async) disconnect() → {Promise.<void>}

Disconnects and cleans up the consumer.

Warning: This method cannot be called from within the eachMessage or eachBatch callbacks of Consumer.run.

Returns:

A promise that resolves when the consumer has disconnected.

Type
Promise.<void>
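
Since disconnect() must not be called from inside the run callbacks, one common pattern is to trigger it from a process signal handler instead. The sketch below assumes a Node.js process; installShutdown and its parameters are hypothetical.

```javascript
// Hypothetical helper: disconnect the consumer on the first shutdown signal.
// `proc` defaults to the Node.js process object and can be replaced in tests.
function installShutdown(consumer, signals = ['SIGINT', 'SIGTERM'], proc = process) {
  let pending = null;
  const shutdown = () => {
    // Reuse the first disconnect() promise so repeated signals are harmless.
    if (pending === null) {
      pending = consumer.disconnect();
    }
    return pending;
  };
  for (const signal of signals) {
    proc.once(signal, shutdown);
  }
  return shutdown;
}
```

The returned function can also be awaited directly, for example at the end of a test or a batch job.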

logger() → {Object}

Get the logger associated with this consumer instance.

See:
  • KafkaJS.logLevel for available log levels
Returns:

The logger instance.

Type
Object
Example
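const { logLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const logger = consumer.logger();
logger.info('Hello world');
// Only log errors from now on.
logger.setLogLevel(logLevel.ERROR);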

pause(topics) → {function}

Pauses the given topic partitions. If partitions are not specified, pauses all partitions for the given topic.

If the topic partitions are already paused, this method has no effect.

Parameters:
Name: topics
Type: Array.<{topic: string, partitions: (Array.<number>|null)}>
Description: Topics or topic partitions to pause.

Returns:

A function that can be called to resume the topic partitions paused by this call.

Type
function
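
The returned function makes temporary pauses straightforward, for example to back off while a downstream system recovers. A minimal sketch follows; pauseFor is a hypothetical helper and the delay is chosen by the caller.

```javascript
// Hypothetical helper: pause the given topic partitions, then resume exactly
// those partitions after `delayMs` milliseconds.
function pauseFor(consumer, topics, delayMs) {
  const resume = consumer.pause(topics);
  return new Promise((resolve) => {
    setTimeout(() => {
      resume();
      resolve();
    }, delayMs);
  });
}
```

A call such as pauseFor(consumer, [{ topic: 'test-topic' }], 5000) would pause every partition of that topic for about five seconds.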

paused() → {Array.<{topic: string, partitions: Array.<number>}>}

Returns the list of paused topic partitions.

Returns:

A list of paused topic partitions.

Type
Array.<{topic: string, partitions: Array.<number>}>

rebalanceProtocol() → {string}

Get the type of rebalance protocol used in the consumer group.

Returns:

"NONE" (if not in a group yet), "COOPERATIVE" or "EAGER".

Type
string
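
Code that reacts to rebalances may branch on this value. The sketch below maps the three documented return values to labels; rebalanceKind and the labels themselves are hypothetical and only illustrate the branching.

```javascript
// Hypothetical helper: classify the rebalance behaviour of the group.
function rebalanceKind(consumer) {
  switch (consumer.rebalanceProtocol()) {
    case 'COOPERATIVE':
      return 'incremental'; // partitions are added and removed in steps
    case 'EAGER':
      return 'full'; // all partitions are revoked, then reassigned
    default:
      return 'none'; // "NONE": not in a group yet
  }
}
```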

resume(topics)

Resumes the given topic partitions. If partitions are not specified, resumes all partitions for the given topic.

If the topic partitions are already resumed, this method has no effect.

Parameters:
Name: topics
Type: Array.<{topic: string, partitions: (Array.<number>|null)}>
Description: Topics or topic partitions to resume.
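
Because paused() returns the same shape that resume() accepts, resuming everything that is currently paused is a short composition. resumeAll below is a hypothetical helper sketching that idea.

```javascript
// Hypothetical helper: resume every partition that is currently paused and
// return the list that was resumed.
function resumeAll(consumer) {
  const paused = consumer.paused();
  if (paused.length > 0) {
    consumer.resume(paused);
  }
  return paused;
}
```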

(async) run(config)

Starts consumer polling. This method returns immediately.

Parameters:
Name: config
Type: object
Description: The configuration for running the consumer.

Properties:

Name: eachMessage
Type: function
Attributes: <nullable>
Description: The function to call for processing each message.

Name: eachBatch
Type: function
Attributes: <nullable>
Description: The function to call for processing each batch of messages - can only be set if eachMessage is not set.

Name: eachBatchAutoResolve
Type: boolean
Attributes: <nullable>
Description: Whether to automatically resolve offsets for each batch (only applicable if eachBatch is set, true by default).

Name: partitionsConsumedConcurrently
Type: number
Attributes: <nullable>
Description: The limit to the number of partitions consumed concurrently (1 by default).
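
The sketch below shows one way these properties might be combined: a per-message handler with a raised concurrency limit. It assumes message.value is a Buffer or null and message.offset is a string; startProcessing and handle are hypothetical names.

```javascript
// Hypothetical helper: start polling and pass decoded messages to `handle`.
async function startProcessing(consumer, handle, concurrency = 3) {
  await consumer.run({
    // Process up to `concurrency` partitions at the same time.
    partitionsConsumedConcurrently: concurrency,
    eachMessage: async ({ topic, partition, message }) => {
      // Assumption: value is a Buffer, or null for a message without a value.
      const value = message.value === null ? null : message.value.toString();
      await handle({ topic, partition, offset: message.offset, value });
    },
  });
}
```

Only eachMessage is set here, since eachBatch can only be set when eachMessage is not.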

seek(topicPartitionOffset)

Seek to the given offset for a topic partition.

This method is fully asynchronous and does not wait for the seek to complete. If a partition that is seeked to is not part of the current assignment, the seek is stored internally.

If the consumer is later assigned that partition as part of a rebalance, the pending seek is performed.

Additionally, if the librdkafka property 'enable.auto.commit' or kafkaJS.autoCommit is true, the consumer commits the offset that was seeked to.

Parameters:
Name: topicPartitionOffset
Type: object

Properties:

Name: topic
Type: string

Name: partition
Type: number

Name: offset
Type: string
Description: The offset to seek to.
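
As a sketch of how seek() might be combined with assignment(), the helper below moves every currently assigned partition of one topic to the same offset. seekAssigned is a hypothetical helper, and the offset is converted to a string because that is the documented type.

```javascript
// Hypothetical helper: seek all assigned partitions of `topic` to `offset`
// and return the partitions that were touched.
function seekAssigned(consumer, topic, offset) {
  const target = String(offset);
  const entry = consumer.assignment().find((a) => a.topic === topic);
  const partitions = entry ? entry.partitions : [];
  for (const partition of partitions) {
    // seek() does not wait for completion, so there is nothing to await.
    consumer.seek({ topic, partition, offset: target });
  }
  return partitions;
}
```

With automatic commits enabled, each of these seeks also results in the seeked offset being committed, as described above.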

(async) subscribe(subscription)

Subscribes the consumer to the given topics.

Parameters:
Name: subscription
Type: object
Description: An object containing the topic(s) to subscribe to - one of topic or topics must be present.

Properties:

Name: topic
Type: string
Attributes: <nullable>
Description: The topic to subscribe to.

Name: topics
Type: Array.<string>
Attributes: <nullable>
Description: The topics to subscribe to.

Name: replace
Type: boolean
Attributes: <nullable>
Description: Whether to replace the existing subscription, or to add to it. Adds by default.
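
The rule that exactly one of topic or topics carries the subscription can be sketched with a small builder. toSubscription is a hypothetical helper; it only constructs the argument object and leaves the call to subscribe() to the caller.

```javascript
// Hypothetical helper: build a subscribe() argument from a topic name or a
// non-empty array of topic names.
function toSubscription(target, replace = false) {
  if (typeof target === 'string') {
    return { topic: target, replace };
  }
  if (Array.isArray(target) && target.length > 0) {
    return { topics: target, replace };
  }
  throw new TypeError('expected a topic name or a non-empty array of topic names');
}
```

For example, await consumer.subscribe(toSubscription(['test-topic'], true)) would replace the existing subscription rather than add to it.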