Class: Producer

KafkaJS.Producer(kJSConfig)

Producer for sending messages to Kafka (promise-based, async API).

The producer allows sending messages to the Kafka cluster, and provides methods to configure and control various aspects of that. This class should not be instantiated directly, and rather, an instance of Kafka should be used.

Constructor

new Producer(kJSConfig)

This method should not be used directly. See KafkaJS.Producer.

Parameters:
Name Type Description
kJSConfig
See:
Example
const { Kafka } = require('@confluentinc/kafka-javascript');
const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
const producer = kafka.producer();
await producer.connect();
await producer.send({
  // send options
});

Methods

(async) abort() → {Promise.<void>}

Abort the current transaction.

Can only be called if there is an ongoing transaction.

Returns:

Resolves when the transaction is aborted.

Type
Promise.<void>

(async) commit() → {Promise.<void>}

Commit the current transaction.

Can only be called if there is an ongoing transaction.

Returns:

Resolves when the transaction is committed.

Type
Promise.<void>

(async) connect() → {Promise.<void>}

Set up the client and connect to the bootstrap brokers.

This method can be called only once for a producer instance, and must be called before doing any other operations.

Returns:

Resolves when connection is complete, rejects on error.

Type
Promise.<void>

dependentAdmin() → {KafkaJS.Admin}

Create a new admin client using the underlying connections of the producer.

The producer must be connected before connecting the resulting admin client. The usage of the admin client is limited to the lifetime of the producer. The producer's logger is shared with the admin client.

Returns:
Type
KafkaJS.Admin

(async) disconnect() → {Promise.<void>}

Disconnect from the brokers, clean-up and tear down the client.

Returns:

Resolves when disconnect is complete, rejects on error.

Type
Promise.<void>

(async) flush(args) → {Promise.<void>}

Flushes any pending messages.

Messages are batched internally by librdkafka for performance reasons. Continously sent messages are batched upto a timeout, or upto a maximum size. Calling flush sends any pending messages immediately without waiting for this size or timeout.

This is only useful when using asynchronous sends. For example, the following code does not get any benefit from flushing, since awaiting the send waits for the delivery report, and the message has already been sent by the time we start flushing:

for (let i = 0; i < 100; i++) await send(...);
await flush(...) // Not useful.

However, using the following code may put these 5 messages into a batch and then the subsequent flush will send the batch altogether (as long as batch size, etc. are conducive to batching):

for (let i = 0; i < 5; i++) send(...);
await flush({timeout: 5000});
Parameters:
Name Type Description
args object
Properties
Name Type Description
timeout number

Time to try flushing for in milliseconds, 500ms by default.

Throws:

if the flush times out.

Type
KafkaJSTimeout
Returns:

Resolves on successful flush.

Type
Promise.<void>

isActive() → {boolean}

Check if there is an ongoing transaction.

Since a producer itself represents a transaction, and there is no distinct type for a transaction, this method exists on the producer.

Returns:

true if there is an ongoing transaction, false otherwise.

Type
boolean

logger() → {Object}

Get the logger associated to this producer instance.

See:
  • KafkaJS.logLevel for available log levels
Returns:

The logger instance.

Type
Object
Example
const logger = producer.logger();
logger.info('Hello world');
logger.setLogLevel(logLevel.ERROR);

(async) send(sendOptions) → {Promise.<Array.<Object>>}

Sends a record of messages to a specific topic.

Parameters:
Name Type Description
sendOptions Object

The record to send. The keys acks, timeout, and compression are not used, and should not be set, rather, they should be set in the global config.

Returns:

Resolves with the record metadata for the messages.

Type
Promise.<Array.<Object>>
Example
const deliveryReport = await producer.send({
  topic: 'test-topic',
  messages: [{ value: 'v1', partition: 0, key: 'x' }]
});

(async) sendBatch(sendOptions) → {Promise.<Array.<Object>>}

Sends a record of messages to various topics.

This method is identical to calling send() repeatedly and waiting on all the return values together.

Parameters:
Name Type Description
sendOptions Object

The record to send. The keys acks, timeout, and compression are not used, and should not be set, rather, they should be set in the global config.

Returns:

Resolves with the record metadata for the messages.

Type
Promise.<Array.<Object>>
Example
const deliveryReports = await producer.sendBatch({
  topicMessages: [
    { topic: 'test-topic1', messages: [{ key: 'hello', value: 'hello' }] },
    { topic: 'test-topic2', messages: [{ key: 'world', value: 'world' }] }
  ]
});

(async) sendOffsets(arg) → {Promise.<void>}

Send offsets for the transaction.

Can only be called if there is an ongoing transaction.

Parameters:
Name Type Description
arg object

The arguments to sendOffsets

Properties
Name Type Description
consumer KafkaJS.Consumer

The consumer to send offsets for.

topics Array.<{topic: string, partitions: Array.<{partition: number, offset: string}>}>

The topics, partitions and the offsets to send.

Returns:

Resolves when the offsets are sent.

Type
Promise.<void>

setSaslCredentials(args)

Change SASL credentials to be sent on the next authentication attempt.

Only applicable if SASL authentication is being used.

Parameters:
Name Type Description
args object

The SASL credentials to set.

Properties
Name Type Description
username string

SASL username.

password string

SASL password.

(async) transaction() → {Promise.<KafkaJS.Producer>}

Starts a new transaction.

This can only be used with a transactional producer, where the transactional ID is set within the config.

No more than one transaction may be ongoing at a time.

Returns:

Resolves with the producer when the transaction is started.

Type
Promise.<KafkaJS.Producer>