Constructor
new Producer(kJSConfig)
This method should not be used directly. See KafkaJS.Producer.
Parameters:
Name | Type | Description |
---|---|---|
kJSConfig | Object | The producer configuration; see KafkaJS.Producer. |
Example
const { Kafka } = require('@confluentinc/kafka-javascript');
const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });
const producer = kafka.producer();
await producer.connect();
await producer.send({
// send options
});
Methods
(async) abort() → {Promise.<void>}
Abort the current transaction.
Can only be called if there is an ongoing transaction.
Returns:
Resolves when the transaction is aborted.
- Type
- Promise.<void>
(async) commit() → {Promise.<void>}
Commit the current transaction.
Can only be called if there is an ongoing transaction.
Returns:
Resolves when the transaction is committed.
- Type
- Promise.<void>
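A minimal commit/abort flow, assuming a transactional producer (see transaction() below), might look like this sketch:
const transaction = await producer.transaction();
try {
  await transaction.send({
    topic: 'test-topic',
    messages: [{ value: 'v1' }],
  });
  await transaction.commit();
} catch (err) {
  // Roll back the transaction if anything failed before the commit.
  await transaction.abort();
  throw err;
}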
(async) connect() → {Promise.<void>}
Set up the client and connect to the bootstrap brokers.
This method can be called only once for a producer instance, and must be called before doing any other operations.
Returns:
Resolves when connection is complete, rejects on error.
- Type
- Promise.<void>
dependentAdmin() → {KafkaJS.Admin}
Create a new admin client using the underlying connections of the producer.
The producer must be connected before connecting the resulting admin client. The usage of the admin client is limited to the lifetime of the producer. The producer's logger is shared with the admin client.
Returns:
- Type
- KafkaJS.Admin
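For illustration, a sketch assuming the returned admin client exposes the usual KafkaJS.Admin methods (connect(), listTopics(), disconnect()):
await producer.connect();
const admin = producer.dependentAdmin();
await admin.connect();
// Reuses the producer's connections and logger.
const topics = await admin.listTopics();
await admin.disconnect();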
(async) disconnect() → {Promise.<void>}
Disconnect from the brokers, clean up, and tear down the client.
Returns:
Resolves when disconnect is complete, rejects on error.
- Type
- Promise.<void>
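A typical shutdown simply awaits the disconnect once all sends have settled, for example:
// Optionally flush any batched messages first (see flush() below).
await producer.flush({ timeout: 5000 });
await producer.disconnect();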
(async) flush(args) → {Promise.<void>}
Flushes any pending messages.
Messages are batched internally by librdkafka for performance reasons. Continuously sent messages are batched up to a timeout, or up to a maximum size. Calling flush sends any pending messages immediately without waiting for this size or timeout.
This is only useful when using asynchronous sends.
For example, the following code does not get any benefit from flushing, since awaiting the send waits for the delivery report, and the message has already been sent by the time we start flushing:
for (let i = 0; i < 100; i++) await send(...);
await flush(...) // Not useful.
However, using the following code may put these 5 messages into a batch, and then the subsequent flush will send the batch altogether (as long as batch size, etc. are conducive to batching):
for (let i = 0; i < 5; i++) send(...);
await flush({timeout: 5000});
Parameters:
Name | Type | Description |
---|---|---|
args | object | Flush options. |
args.timeout | number | Timeout for the flush, in milliseconds (as used in the example above). |
Throws:
If the flush times out.
- Type
- KafkaJSTimeout
Returns:
Resolves on successful flush.
- Type
- Promise.<void>
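Putting this together, a sketch of asynchronous sends followed by a flush (the topic and message values here are placeholders):
const pending = [];
for (let i = 0; i < 5; i++) {
  // Do not await each send; let librdkafka batch the messages.
  pending.push(producer.send({
    topic: 'test-topic',
    messages: [{ value: `v${i}` }],
  }));
}
// Push out anything still batched, then wait for the delivery reports.
await producer.flush({ timeout: 5000 });
await Promise.all(pending);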
isActive() → {boolean}
Check if there is an ongoing transaction.
Since the producer object itself is used to represent a transaction (there is no distinct transaction type), this method exists on the producer.
Returns:
true if there is an ongoing transaction, false otherwise.
- Type
- boolean
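For example, cleanup code can guard an abort with this check (a sketch):
// Roll back only if a transaction is still in progress.
if (producer.isActive()) {
  await producer.abort();
}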
logger() → {Object}
Get the logger associated with this producer instance.
- See: KafkaJS.logLevel for available log levels
Returns:
The logger instance.
- Type
- Object
Example
const logger = producer.logger();
logger.info('Hello world');
logger.setLogLevel(logLevel.ERROR);
(async) send(sendOptions) → {Promise.<Array.<Object>>}
Sends a record of messages to a specific topic.
Parameters:
Name | Type | Description |
---|---|---|
sendOptions | Object | The record to send, including the target topic and the messages (see the example below). |
Returns:
Resolves with the record metadata for the messages.
- Type
- Promise.<Array.<Object>>
Example
const deliveryReport = await producer.send({
topic: 'test-topic',
messages: [{ value: 'v1', partition: 0, key: 'x' }]
});
(async) sendBatch(sendOptions) → {Promise.<Array.<Object>>}
Sends a record of messages to various topics.
This method is identical to calling send() repeatedly and waiting on all the return values together.
Parameters:
Name | Type | Description |
---|---|---|
sendOptions | Object | The records to send, grouped by topic under topicMessages (see the example below). |
Returns:
Resolves with the record metadata for the messages.
- Type
- Promise.<Array.<Object>>
Example
const deliveryReports = await producer.sendBatch({
topicMessages: [
{ topic: 'test-topic1', messages: [{ key: 'hello', value: 'hello' }] },
{ topic: 'test-topic2', messages: [{ key: 'world', value: 'world' }] }
]
});
(async) sendOffsets(arg) → {Promise.<void>}
Send offsets for the transaction.
Can only be called if there is an ongoing transaction.
Parameters:
Name | Type | Description | |||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
arg |
object | The arguments to sendOffsets Properties
|
Returns:
Resolves when the offsets are sent.
- Type
- Promise.<void>
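The following is an illustrative sketch of the consume-transform-produce pattern; the argument shape shown (a consumer instance plus KafkaJS-style topics, partitions and offsets) is an assumption, not the documented signature:
// `consumer` is assumed to be a connected consumer instance.
await producer.sendOffsets({
  consumer,
  topics: [
    { topic: 'test-topic', partitions: [{ partition: 0, offset: '5' }] },
  ],
});
await producer.commit();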
setSaslCredentials(args)
Change SASL credentials to be sent on the next authentication attempt.
Only applicable if SASL authentication is being used.
Parameters:
Name | Type | Description | |||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
args |
object | The SASL credentials to set. Properties
|
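For illustration, rotating credentials might look like the following sketch; the username and password property names are assumptions:
// Takes effect on the next authentication attempt.
producer.setSaslCredentials({ username: 'new-user', password: 'new-secret' });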
(async) transaction() → {Promise.<KafkaJS.Producer>}
Starts a new transaction.
This can only be used with a transactional producer, where the transactional ID is set within the config.
No more than one transaction may be ongoing at a time.
Returns:
Resolves with the producer when the transaction is started.
- Type
- Promise.<KafkaJS.Producer>
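For example, a transactional producer might be created and used as in this sketch; the 'transactional.id' property name assumes the librdkafka-style configuration shown in the constructor example:
const producer = kafka.producer({ 'transactional.id': 'my-transactional-id' });
await producer.connect();
const transaction = await producer.transaction();
// Send messages via `transaction`, then commit() or abort() it.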