Class: Consumer

KafkaJS.Consumer(kJSConfig)

Consumer for reading messages from Kafka (promise-based, async API).

The consumer allows reading messages from the Kafka cluster, and provides methods to configure and control various aspects of that. This class should not be instantiated directly, and rather, an instance of Kafka should be used.

Constructor

new Consumer(kJSConfig)

This method should not be used directly. See KafkaJS.Consumer.

Parameters:
Name Type Description
kJSConfig
See:
Example
const { Kafka } = require('@confluentinc/kafka-javascript');
const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
const consumer = kafka.consumer({ 'group.id': 'test-group' });
await consumer.connect();
await consumer.subscribe({ topics: ["test-topic"] });
consumer.run({
  eachMessage: async ({ topic, partition, message }) => { console.log({topic, partition, message}); }
});

Methods

assignment() → {Array.<{topic: string, partitions: Array.<number>}>}

Find the assigned topic partitions for the consumer.

Returns:

the current assignment.

Type
Array.<{topic: string, partitions: Array.<number>}>

(async) commitOffsets(topicPartitionsnullable) → {Promise.<void>}

Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets.

Parameters:
Name Type Attributes Default Description
topicPartitions Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}> <nullable>
null
Returns:

A promise that resolves when the offsets have been committed.

Type
Promise.<void>

(async) committed(topicPartitionsnullable, timeout) → {Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>}

Fetch committed offsets for the given topic partitions.

Parameters:
Name Type Attributes Default Description
topicPartitions Array.<{topic: string, partition: number}> <nullable>
null

The topic partitions to check for committed offsets. Defaults to all assigned partitions.

timeout number

Timeout in ms. Defaults to infinite (-1).

Returns:

A promise that resolves to the committed offsets.

Type
Promise.<Array.<{topic: string, partition: number, offset: string, leaderEpoch: (number|null), metadata: (string|null)}>>

(async) connect() → {Promise.<void>}

Set up the client and connect to the bootstrap brokers.

This method can be called only once for a consumer instance, and must be called before doing any other operations.

Returns:

a promise that resolves when the consumer is connected.

Type
Promise.<void>

dependentAdmin() → {KafkaJS.Admin}

Create a new admin client using the underlying connections of the consumer.

The consumer must be connected before connecting the resulting admin client. The usage of the admin client is limited to the lifetime of the consumer. The consumer's logger is shared with the admin client.

Returns:
Type
KafkaJS.Admin

(async) disconnect() → {Promise.<void>}

Disconnects and cleans up the consumer.

Warning: This cannot be called from within eachMessage or eachBatch callback of Consumer.run.

Returns:

A promise that resolves when the consumer has disconnected.

Type
Promise.<void>

logger() → {Object}

Get the logger associated to this consumer instance.

See:
  • KafkaJS.logLevel for available log levels
Returns:

The logger instance.

Type
Object
Example
const logger = consumer.logger();
logger.info('Hello world');
logger.setLogLevel(logLevel.ERROR);

pause(topics) → {function}

Pauses the given topic partitions. If partitions are not specified, pauses all partitions for the given topic.

If topic partition(s) are already paused this method has no effect.

Parameters:
Name Type Description
topics Array.<{topic: string, partitions: (Array.<number>|null)}>

Topics or topic partitions to pause.

Returns:

A function that can be called to resume the topic partitions paused by this call.

Type
function

paused() → {Array.<{topic: string, partitions: Array.<number>}>}

Returns the list of paused topic partitions.

Returns:

A list of paused topic partitions.

Type
Array.<{topic: string, partitions: Array.<number>}>

rebalanceProtocol() → {string}

Get the type of rebalance protocol used in the consumer group.

Returns:

"NONE" (if not in a group yet), "COOPERATIVE" or "EAGER".

Type
string

resume(topics)

Resumes the given topic partitions. If partitions are not specified, resumes all partitions for the given topic.

If topic partition(s) are already resumed this method has no effect.

Parameters:
Name Type Description
topics Array.<{topic: string, partitions: (Array.<number>|null)}>

Topics or topic partitions to resume.

(async) run(config)

Starts consumer polling. This method returns immediately.

Parameters:
Name Type Description
config object

The configuration for running the consumer.

Properties
Name Type Attributes Description
eachMessage function <nullable>

The function to call for processing each message.

eachBatch function <nullable>

The function to call for processing each batch of messages - can only be set if eachMessage is not set.

eachBatchAutoResolve boolean <nullable>

Whether to automatically resolve offsets for each batch (only applicable if eachBatch is set, true by default).

partitionsConsumedConcurrently number <nullable>

The limit to the number of partitions consumed concurrently (1 by default).

seek(topicPartitionOffset)

Seek to the given offset for a topic partition.

This method is completely asynchronous, and does not wait for the seek to complete. In case any partitions that are seeked to, are not a part of the current assignment, they are stored internally.

If at any later time, the consumer is assigned the partition, as a part of a rebalance, the pending seek will be performed.

Additionally, if the librdkafka property 'enable.auto.commit' or kafkaJS.autoCommit is true, the consumer will commit the offset seeked.

Parameters:
Name Type Description
topicPartitionOffset object
Properties
Name Type Description
topic string
partition number
offset string

The offset to seek to.

(async) subscribe(subscription)

Subscribes the consumer to the given topics.

Parameters:
Name Type Description
subscription object

An object containing the topic(s) to subscribe to - one of topic or topics must be present.

Properties
Name Type Attributes Description
topic string <nullable>

The topic to subscribe to.

topics Array.<string> <nullable>

The topics to subscribe to.

replace boolean <nullable>

Whether to replace the existing subscription, or to add to it. Adds by default.