Class: Admin

KafkaJS.Admin(config, existingClient)

Admin client for administering Kafka clusters (promise-based, async API).

This client is the interface to the Kafka Admin APIs. Do not instantiate this class directly. Instead, create an instance from a Kafka object through its admin method, or call the dependentAdmin method of an existing Producer or Consumer.

Constructor

new Admin(config, existingClient)

Parameters:
Name Type Description
config
existingClient
Example
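// The promise-based API is exposed under the KafkaJS namespace.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

// From a Kafka object.
const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
const admin = kafka.admin();
await admin.connect();
// 'example-topic' is a placeholder topic name.
await admin.createTopics({ topics: [{ topic: 'example-topic' }] });

// From an existing producer or consumer. A distinct variable name avoids
// redeclaring `admin` in the same scope.
const dependentAdmin = preExistingProducer.dependentAdmin();
await dependentAdmin.connect();
await dependentAdmin.createTopics({ topics: [{ topic: 'another-topic' }] });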

Methods

(async) connect() → {Promise.<void>}

Set up the client and connect to the bootstrap brokers.

Returns:

Resolves when connection is complete, rejects on error.

Type
Promise.<void>

(async) createTopics(options) → {Promise.<boolean>}

Create topics with the given configuration.

Parameters:
Name Type Description
options object
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds (default: 5000).

topics Array.<{topic: string, numPartitions: (number|null), replicationFactor: (number|null), configEntries: (Array.<{name: string, value: string}>|null)}>

The topics to create and optionally, the configuration for each topic.

Returns:

Resolves to true when all topics are created, or to false if at least one of the topics already exists. Rejects on error.

Type
Promise.<boolean>
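The boolean result is easy to misread, so here is a minimal sketch of one way to handle it. The helper name `ensureTopics` and its return labels are hypothetical, and `admin` is assumed to be an already-connected Admin instance.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function ensureTopics(admin, topics, timeout = 5000) {
  // createTopics resolves false when at least one topic already exists.
  const created = await admin.createTopics({ timeout, topics });
  return created ? 'created' : 'some-already-existed';
}

// Possible usage (topic settings are placeholders):
// await ensureTopics(admin, [{ topic: 'orders', numPartitions: 3, replicationFactor: 1 }]);
```

Because a false result does not say which topic already existed, a caller that needs that detail would have to check the topic list separately.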

(async) deleteGroups(groups, options) → {Promise.<Array.<{groupId: string, errorCode: (number|null), error: (RdKafka.LibrdKafkaError|null)}>>}

Delete consumer groups.

Parameters:
Name Type Attributes Description
groups Array.<string>

The names of the groups to delete.

options object <nullable>
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

Returns:

Resolves with the list of deletion reports (including per-group errors).

Type
Promise.<Array.<{groupId: string, errorCode: (number|null), error: (RdKafka.LibrdKafkaError|null)}>>
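Since the result carries one report per group, a caller will usually want to separate successes from failures. The sketch below shows one way to do that. The helper name `deleteGroupsAndReport` is hypothetical, `admin` is assumed to be a connected Admin instance, and it is an assumption that a null `error` means the group was deleted.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function deleteGroupsAndReport(admin, groupIds) {
  const reports = await admin.deleteGroups(groupIds, { timeout: 5000 });
  // Assumption: a null `error` means the group was deleted successfully.
  const deleted = reports.filter((r) => r.error == null).map((r) => r.groupId);
  const failed = reports.filter((r) => r.error != null);
  return { deleted, failed };
}
```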

(async) deleteTopicRecords(options) → {Promise.<Array.<{topic: string, partition: number, lowWatermark: number, error: (RdKafka.LibrdKafkaError|null)}>>}

Deletes records (messages) in topic partitions older than the offsets provided.

Provide -1 as offset to delete all records in the partition.

Parameters:
Name Type Description
options object
Properties
Name Type Attributes Description
topic string

The topic to delete records from.

partitions Array.<{partition: number, offset: string}>

The partitions and, for each one, the offset before which records are deleted.

operationTimeout number <nullable>

The operation timeout in milliseconds. May be unset (default: 60000).

timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

Returns:

A list of results for each partition.

Type
Promise.<Array.<{topic: string, partition: number, lowWatermark: number, error: (RdKafka.LibrdKafkaError|null)}>>
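As a rough illustration of the options shape, the sketch below deletes every record in a set of partitions by passing '-1' as the offset. The helper name `purgePartitions` is hypothetical, `admin` is assumed to be a connected Admin instance, and it is an assumption that a non-null `error` marks a partition whose deletion failed.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function purgePartitions(admin, topic, partitionIds) {
  // Offsets are passed as strings; '-1' requests deletion of all records.
  const partitions = partitionIds.map((partition) => ({ partition, offset: '-1' }));
  const results = await admin.deleteTopicRecords({ topic, partitions, timeout: 5000 });
  // Assumption: a non-null `error` marks a partition whose deletion failed.
  return {
    ok: results.filter((r) => r.error == null),
    failed: results.filter((r) => r.error != null),
  };
}
```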

(async) deleteTopics(options) → {Promise.<void>}

Deletes given topics.

Parameters:
Name Type Description
options object
Properties
Name Type Attributes Description
topics Array.<string>

The topics to delete.

timeout number <nullable>

The request timeout in milliseconds (default: 5000).

Returns:

Resolves when the topics are deleted, rejects on error.

Type
Promise.<void>

(async) describeGroups(groups, options) → {Promise.<{groups: Array.<object>}>}

Describe consumer groups.

Parameters:
Name Type Attributes Description
groups Array.<string>

The names of the groups to describe.

options object <nullable>
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

includeAuthorizedOperations boolean <nullable>

If true, include operations allowed on the group by the calling client (default: false).

Returns:

The descriptions of the requested groups.

Type
Promise.<{groups: Array.<object>}>

(async) disconnect() → {Promise.<void>}

Disconnect from the brokers, clean-up and tear down the client.

Returns:

Resolves when disconnect is complete, rejects on error.

Type
Promise.<void>
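Because connect and disconnect bracket every other call, it can help to pair them in one place. The sketch below is one possible pattern, not something the library provides. The helper name `withAdmin` is hypothetical, and `admin` is assumed to be an Admin instance that has not yet been connected.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be an Admin instance that is not yet connected.
async function withAdmin(admin, task) {
  await admin.connect();
  try {
    // Run the caller's work while the client is connected.
    return await task(admin);
  } finally {
    // Always tear the client down, even if the task throws.
    await admin.disconnect();
  }
}

// Possible usage:
// const topics = await withAdmin(kafka.admin(), (a) => a.listTopics());
```

If connect itself rejects, the error propagates and disconnect is not called in this sketch.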

(async) fetchOffsets(options) → {Promise.<Array.<{topic: string, partitions: Array.<object>}>>}

Fetch the offsets for topic partition(s) for consumer group(s).

Parameters:
Name Type Attributes Description
options object <nullable>
Properties
Name Type Attributes Description
groupId string

The group ID to fetch offsets for.

topics Array.<string> | Array.<{topic: string, partitions: Array.<number>}>

The topics to fetch offsets for. Can be specified as a list of topic names (in which case offsets for all partitions of those topics are fetched), or as a list of objects, each containing a topic and a list of partitions.

timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

requireStableOffsets boolean <nullable>

Whether the broker should return only stable (transaction-committed) offsets (default: false).

Returns:

The list of requested offsets.

Type
Promise.<Array.<{topic: string, partitions: Array.<object>}>>
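The two accepted shapes of `topics` can be sketched as follows. The helper name `fetchOffsetsFor` is hypothetical, and `admin` is assumed to be a connected Admin instance. The helper accepts either an array of topic names or a plain object that maps each topic name to its partition numbers.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function fetchOffsetsFor(admin, groupId, topicsOrMap) {
  const topics = Array.isArray(topicsOrMap)
    ? topicsOrMap // form 1: plain topic names
    : Object.entries(topicsOrMap).map(([topic, partitions]) => ({ topic, partitions })); // form 2
  return admin.fetchOffsets({ groupId, topics, timeout: 5000 });
}

// Possible usage (group and topic names are placeholders):
// await fetchOffsetsFor(admin, 'my-group', ['orders']);
// await fetchOffsetsFor(admin, 'my-group', { orders: [0, 1] });
```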

(async) fetchTopicMetadata(options) → {Promise.<{topics: Array.<object>}>}

Describe topics.

Parameters:
Name Type Attributes Description
options object <nullable>
Properties
Name Type Attributes Description
topics Array.<string>

The topics to describe. If unset, all topics will be described.

timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

includeAuthorizedOperations boolean <nullable>

If true, include operations allowed on the topic by the calling client (default: false).

Returns:
Type
Promise.<{topics: Array.<object>}>

(async) fetchTopicOffsets(topic, options) → {Promise.<Array.<{partition: number, offset: string, high: string, low: string}>>}

List offsets for the specified topic partition(s).

Parameters:
Name Type Attributes Description
topic string

The topic to fetch offsets for.

options object <nullable>
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

isolationLevel KafkaJS.IsolationLevel <nullable>

The isolation level for reading the offsets. (default: READ_UNCOMMITTED)

Returns:
Type
Promise.<Array.<{partition: number, offset: string, high: string, low: string}>>
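Because the offsets are returned as strings, arithmetic on them needs an explicit conversion. The sketch below computes the span between `high` and `low` for each partition. It assumes these fields are the high and low watermarks as decimal strings. The helper name `offsetSpans` is hypothetical, and `admin` is assumed to be a connected Admin instance.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function offsetSpans(admin, topic) {
  const offsets = await admin.fetchTopicOffsets(topic, { timeout: 5000 });
  return offsets.map(({ partition, high, low }) => ({
    partition,
    // BigInt avoids precision loss for offsets beyond Number.MAX_SAFE_INTEGER.
    span: BigInt(high) - BigInt(low),
  }));
}
```

The span is only an upper bound on the number of retained records, since compaction and transaction markers can leave gaps in the offset sequence.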

(async) listGroups(options) → {Promise.<{groups: Array.<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array.<RdKafka.LibrdKafkaError>}>}

List consumer groups.

Parameters:
Name Type Attributes Description
options object <nullable>
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

matchConsumerGroupStates Array.<KafkaJS.ConsumerGroupStates> <nullable>

A list of consumer group states to match. If unset, groups in all states are returned (default: unset).

Returns:

Resolves with the list of consumer groups, rejects on error.

Type
Promise.<{groups: Array.<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array.<RdKafka.LibrdKafkaError>}>
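The resolved value carries both `groups` and `errors`, so a caller may want to look at both rather than only the group list. A minimal sketch follows. The helper name `listGroupIds` is hypothetical, `admin` is assumed to be a connected Admin instance, and `states` is assumed to be an array of KafkaJS.ConsumerGroupStates values supplied by the caller.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function listGroupIds(admin, states) {
  const options = { timeout: 5000 };
  // Leave matchConsumerGroupStates unset to match groups in every state.
  if (states && states.length > 0) {
    options.matchConsumerGroupStates = states;
  }
  const { groups, errors } = await admin.listGroups(options);
  // Return the errors alongside the IDs so the caller can decide how to react.
  return { groupIds: groups.map((g) => g.groupId), errors: errors || [] };
}
```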

(async) listTopics(options) → {Promise.<Array.<string>>}

List topics.

Parameters:
Name Type Attributes Description
options object <nullable>
Properties
Name Type Attributes Description
timeout number <nullable>

The request timeout in milliseconds. May be unset (default: 5000).

Returns:

The list of all topics.

Type
Promise.<Array.<string>>
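One common use of the topic list is an existence check, sketched below. The helper name `topicExists` is hypothetical, and `admin` is assumed to be a connected Admin instance.

```javascript
// Hypothetical helper, not part of the library.
// `admin` is assumed to be a connected Admin instance.
async function topicExists(admin, topic) {
  const topics = await admin.listTopics({ timeout: 5000 });
  return topics.includes(topic);
}
```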