Class: Admin

KafkaJS.Admin(config, existingClient)

Admin client for administering Kafka clusters (promise-based, async API).

Use this client to work with the Kafka Admin APIs. Do not instantiate this class directly: create an instance from a Kafka object with its admin method, or call the dependentAdmin method of an existing Producer or Consumer.

Constructor

new Admin(config, existingClient)

Parameters:

- config
- existingClient
Example
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

// From a Kafka object.
const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
const admin = kafka.admin();
await admin.connect();
// 'example-topic' is a placeholder topic name.
await admin.createTopics({ topics: [{ topic: 'example-topic' }] });

// From an existing producer or consumer.
const dependentAdmin = preExistingProducer.dependentAdmin();
await dependentAdmin.connect();
await dependentAdmin.createTopics({ topics: [{ topic: 'example-topic' }] });

Methods

(async) connect() → {Promise.<void>}

Set up the client and connect to the bootstrap brokers.

Returns:

Resolves when connection is complete, rejects on error.

Type
Promise.<void>

(async) createTopics(options) → {Promise.<boolean>}

Create topics with the given configuration.

Parameters:

- options (object), with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds (default: 5000).
  - topics (Array.<{topic: string, numPartitions: (number|null), replicationFactor: (number|null), configEntries: (Array.<{name: string, value: string}>|null)}>): The topics to create and, optionally, the configuration for each topic.

Returns:

Resolves to true when all topics are created, and to false if at least one of the topics already exists. Rejects on error.

Type
Promise.<boolean>
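The boolean result is easy to misread, so the following is a minimal sketch of one way to wrap it. It assumes `admin` is an already connected Admin instance; `ensureTopics` and the returned labels are hypothetical names chosen for illustration, not part of the library.

```javascript
// Sketch only: `ensureTopics` is a hypothetical helper, not a library API.
// `admin` is assumed to be a connected Admin instance.
async function ensureTopics(admin, topics, timeout = 5000) {
  const created = await admin.createTopics({ timeout, topics });
  // A false result means at least one requested topic already existed.
  return created ? 'created' : 'already-exists';
}
```

A call might look like `await ensureTopics(admin, [{ topic: 'example-topic', numPartitions: 3 }])`, where the topic name is a placeholder.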

(async) deleteGroups(groups, options <nullable>) → {Promise.<Array.<{groupId: string, errorCode: (number|null), error: (RdKafka.LibrdKafkaError|null)}>>}

Delete consumer groups.

Parameters:

- groups (Array.<string>): The names of the groups to delete.
- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).

Returns:

Resolves with the list of deletion reports (including per-group errors).

Type
Promise.<Array.<{groupId: string, errorCode: (number|null), error: (RdKafka.LibrdKafkaError|null)}>>
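Because errors are reported per group rather than through a rejection, callers usually need to inspect each report. A minimal sketch, assuming `admin` is a connected Admin instance; `deleteGroupsWithReport` is a hypothetical helper name.

```javascript
// Sketch only: `deleteGroupsWithReport` is a hypothetical helper.
// Splits the deletion reports into groups that were deleted and groups that failed.
async function deleteGroupsWithReport(admin, groupIds) {
  const reports = await admin.deleteGroups(groupIds, { timeout: 5000 });
  const deleted = [];
  const failed = [];
  for (const report of reports) {
    if (report.error) {
      failed.push({ groupId: report.groupId, errorCode: report.errorCode });
    } else {
      deleted.push(report.groupId);
    }
  }
  return { deleted, failed };
}
```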

(async) deleteTopicRecords(options) → {Promise.<Array.<{topic: string, partition: number, lowWatermark: number, error: (RdKafka.LibrdKafkaError|null)}>>}

Deletes records (messages) in topic partitions older than the offsets provided.

Provide -1 as offset to delete all records in the partition.

Parameters:

- options (object), with properties:
  - topic (string): The topic to delete records from.
  - partitions (Array.<{partition: number, offset: string}>): The partitions and, for each, the offset to delete up until.
  - operationTimeout (number) <nullable>: The operation timeout in milliseconds. May be unset (default: 60000).
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).

Returns:

A list of results for each partition.

Type
Promise.<Array.<{topic: string, partition: number, lowWatermark: number, error: (RdKafka.LibrdKafkaError|null)}>>
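As an illustration of the -1 offset described above, the sketch below purges every record from a set of partitions. It assumes `admin` is a connected Admin instance; `purgePartitions` is a hypothetical helper name. Note that the offset is passed as a string, matching the documented parameter type.

```javascript
// Sketch only: `purgePartitions` is a hypothetical helper.
// Deletes all records in the given partitions by passing '-1' as the offset.
async function purgePartitions(admin, topic, partitionIds) {
  const partitions = partitionIds.map((partition) => ({ partition, offset: '-1' }));
  const results = await admin.deleteTopicRecords({ topic, partitions });
  // Keep only the fields needed to tell which partitions succeeded.
  return results.map(({ partition, lowWatermark, error }) => ({
    partition,
    lowWatermark,
    ok: !error,
  }));
}
```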

(async) deleteTopics(options) → {Promise.<void>}

Deletes given topics.

Parameters:

- options (object), with properties:
  - topics (Array.<string>): The topics to delete.
  - timeout (number) <nullable>: The request timeout in milliseconds (default: 5000).

Returns:

Resolves when the topics are deleted, rejects on error.

Type
Promise.<void>
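This page does not say how unknown topic names are handled, so one cautious pattern is to delete only topics that listTopics reports. A minimal sketch, assuming `admin` is a connected Admin instance; `deleteExistingTopics` is a hypothetical helper name.

```javascript
// Sketch only: `deleteExistingTopics` is a hypothetical helper.
// Deletes only those candidate topics that currently exist on the cluster.
async function deleteExistingTopics(admin, candidates) {
  const existing = new Set(await admin.listTopics());
  const topics = candidates.filter((name) => existing.has(name));
  if (topics.length > 0) {
    await admin.deleteTopics({ topics, timeout: 5000 });
  }
  // Return the names that were actually submitted for deletion.
  return topics;
}
```

The check and the deletion are separate requests, so a topic could still disappear in between; callers should keep handling rejections.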

(async) describeGroups(groups, options <nullable>) → {Promise.<{groups: Array.<object>}>}

Describe consumer groups.

Parameters:

- groups (Array.<string>): The names of the groups to describe.
- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - includeAuthorizedOperations (boolean) <nullable>: If true, include operations allowed on the group by the calling client (default: false).

Returns:

The descriptions of the requested groups.

Type
Promise.<{groups: Array.<object>}>

(async) disconnect() → {Promise.<void>}

Disconnect from the brokers, clean-up and tear down the client.

Returns:

Resolves when disconnect is complete, rejects on error.

Type
Promise.<void>
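Since connect and disconnect bracket every other call, it can help to pair them in one place. The sketch below shows one possible pattern using try/finally; `withAdmin` and `work` are hypothetical names, and `admin` is assumed to be an Admin instance that has not been connected yet.

```javascript
// Sketch only: `withAdmin` is a hypothetical helper.
// Connects, runs the supplied async callback, and always disconnects afterwards.
async function withAdmin(admin, work) {
  await admin.connect();
  try {
    return await work(admin);
  } finally {
    // Runs whether `work` resolved or rejected.
    await admin.disconnect();
  }
}
```

For example, `await withAdmin(kafka.admin(), (admin) => admin.listTopics())` would list topics and tear the client down in one step.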

(async) fetchOffsets(options <nullable>) → {Promise.<Array.<{topic: string, partitions: Array.<object>}>>}

Fetch the offsets for topic partition(s) for consumer group(s).

Parameters:

- options (object) <nullable>, with properties:
  - groupId (string): The group ID to fetch offsets for.
  - topics (Array.<string> | Array.<{topic: string, partitions: Array.<number>}>): The topics to fetch offsets for. Can be specified as a list of topic names (in which case offsets for all partitions of those topics are fetched), or as a list of objects, each containing a topic and a list of partitions.
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - requireStableOffsets (boolean) <nullable>: Whether the broker should return stable (transaction-committed) offsets (default: false).

Returns:

The list of requested offsets.

Type
Promise.<Array.<{topic: string, partitions: Array.<object>}>>
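To illustrate the object form of `topics`, the sketch below builds it from a plain topic-to-partitions map and indexes the result by topic. It assumes `admin` is a connected Admin instance; `fetchOffsetsByTopic` and the shape of its `topicPartitions` argument are hypothetical. The per-partition objects are passed through untouched because their fields are not specified on this page.

```javascript
// Sketch only: `fetchOffsetsByTopic` is a hypothetical helper.
// `topicPartitions` maps a topic name to an array of partition numbers.
async function fetchOffsetsByTopic(admin, groupId, topicPartitions) {
  const topics = Object.entries(topicPartitions).map(([topic, partitions]) => ({
    topic,
    partitions,
  }));
  const offsets = await admin.fetchOffsets({ groupId, topics, timeout: 5000 });
  // Index the returned entries by topic name for easier lookup.
  const byTopic = {};
  for (const { topic, partitions } of offsets) {
    byTopic[topic] = partitions;
  }
  return byTopic;
}
```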

(async) fetchTopicMetadata(options <nullable>) → {Promise.<{topics: Array.<object>}>}

Describe topics.

Parameters:

- options (object) <nullable>, with properties:
  - topics (Array.<string>): The topics to describe. If unset, all topics will be described.
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - includeAuthorizedOperations (boolean) <nullable>: If true, include operations allowed on the topic by the calling client (default: false).

Returns:
Type
Promise.<{topics: Array.<object>}>

(async) fetchTopicOffsets(topic, options <nullable>) → {Promise.<Array.<{partition: number, offset: string, high: string, low: string}>>}

List offsets for the specified topic partition(s).

Parameters:

- topic (string): The topic to fetch offsets for.
- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - isolationLevel (KafkaJS.IsolationLevel) <nullable>: The isolation level for reading the offsets (default: READ_UNCOMMITTED).

Returns:
Type
Promise.<Array.<{partition: number, offset: string, high: string, low: string}>>
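The `high` and `low` values are strings, so arithmetic on them needs a conversion. The sketch below sums `high - low` across partitions with BigInt; it assumes `admin` is a connected Admin instance, and `offsetSpan` is a hypothetical helper name. The result is the width of the retained offset range, which is only an upper bound on the record count (compaction, for instance, can leave gaps).

```javascript
// Sketch only: `offsetSpan` is a hypothetical helper.
// Sums (high - low) over all partitions of a topic using BigInt arithmetic.
async function offsetSpan(admin, topic) {
  const partitions = await admin.fetchTopicOffsets(topic);
  return partitions.reduce(
    (total, { high, low }) => total + (BigInt(high) - BigInt(low)),
    0n,
  );
}
```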

(async) fetchTopicOffsetsByTimestamp(topic, timestamp <nullable>, options <nullable>) → {Promise.<Array.<{partition: number, offset: string}>>}

List offsets for the topic partition(s) by timestamp.

The returned topic partitions contain the earliest offset whose timestamp is greater than or equal to the given timestamp. If there is no such offset, or if the timestamp is unset, the latest offset is returned instead.

Parameters:

- topic (string): The topic to fetch offsets for.
- timestamp (number) <nullable>: The timestamp to fetch offsets for.
- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - isolationLevel (KafkaJS.IsolationLevel) <nullable>: The isolation level for reading the offsets (default: READ_UNCOMMITTED).

Returns:
Type
Promise.<Array.<{partition: number, offset: string}>>
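A common use of this lookup is finding where to start reading from a point in the recent past. The sketch below is one way to do that, assuming `admin` is a connected Admin instance and that the timestamp is expressed in milliseconds since the Unix epoch (the usual Kafka convention, not stated on this page). `offsetsSince` and its `now` parameter are hypothetical.

```javascript
// Sketch only: `offsetsSince` is a hypothetical helper.
// Assumes timestamps are milliseconds since the Unix epoch.
async function offsetsSince(admin, topic, millisAgo, now = Date.now()) {
  const timestamp = now - millisAgo;
  const results = await admin.fetchTopicOffsetsByTimestamp(topic, timestamp);
  // Map partition number -> offset (offsets stay strings, as returned).
  return new Map(results.map(({ partition, offset }) => [partition, offset]));
}
```

Passing `now` explicitly keeps the helper deterministic in tests.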

(async) listGroups(options <nullable>) → {Promise.<{groups: Array.<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array.<RdKafka.LibrdKafkaError>}>}

List consumer groups.

Parameters:

- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).
  - matchConsumerGroupStates (Array.<KafkaJS.ConsumerGroupStates>) <nullable>: A list of consumer group states to match. If unset, groups in all states are returned (default: unset).
  - matchConsumerGroupTypes (Array.<KafkaJS.ConsumerGroupTypes>) <nullable>: A list of consumer group types to match. If unset, groups of all types are returned (default: unset).

Returns:

Resolves with the list of consumer groups, rejects on error.

Type
Promise.<{groups: Array.<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array.<RdKafka.LibrdKafkaError>}>
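The resolved value carries both `groups` and `errors`, so a caller may want to surface both. A minimal sketch, assuming `admin` is a connected Admin instance; `listGroupIds` is a hypothetical helper name, and `states` is expected to hold KafkaJS.ConsumerGroupStates values supplied by the caller.

```javascript
// Sketch only: `listGroupIds` is a hypothetical helper.
// Optionally filters by state and returns group IDs alongside any reported errors.
async function listGroupIds(admin, states) {
  const options = { timeout: 5000 };
  if (states && states.length > 0) {
    // Only set the filter when the caller asked for specific states.
    options.matchConsumerGroupStates = states;
  }
  const { groups, errors } = await admin.listGroups(options);
  return { groupIds: groups.map((group) => group.groupId), errors };
}
```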

(async) listTopics(options <nullable>) → {Promise.<Array.<string>>}

List topics.

Parameters:

- options (object) <nullable>, with properties:
  - timeout (number) <nullable>: The request timeout in milliseconds. May be unset (default: 5000).

Returns:

The list of all topics.

Type
Promise.<Array.<string>>
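Since the result is a plain array of names, filtering is left to the caller. The sketch below drops names that start with a double underscore, on the assumption that this prefix marks Kafka-internal topics such as `__consumer_offsets`. `listUserTopics` is a hypothetical helper name, and `admin` is assumed to be a connected Admin instance.

```javascript
// Sketch only: `listUserTopics` is a hypothetical helper.
// Assumes internal topics are identifiable by a leading double underscore.
async function listUserTopics(admin) {
  const topics = await admin.listTopics({ timeout: 5000 });
  return topics.filter((name) => !name.startsWith('__')).sort();
}
```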