Confluent JavaScript Client for Apache Kafka¶
Confluent, a leading developer and maintainer of Apache Kafka®, offers the Confluent JavaScript Client for Apache Kafka®. This client integrates JavaScript applications with Kafka clusters and enables you to produce and consume messages from Kafka topics.
The JavaScript Client is a wrapper around the librdkafka C library, supporting various Kafka features with high performance and reliability.
There are two variants of the API offered by this client library: the promisified API and the callback-based API.
- If you’re starting afresh, use the promisified API.
- If you’re migrating from KafkaJS, you can use the KafkaJS migration guide to get started.
- If you’re migrating from node-rdkafka, you can use the node-rdkafka migration guide to get started.
Regardless of which API you choose, this document, along with the API reference and the examples, serves as a comprehensive guide to using the JavaScript Client.
Installation¶
Requirements
The following configurations are supported for the JavaScript Client:
- Supported versions of Node.js: The two LTS versions, 18 and 20, and the latest versions, 21 and 22.
- Linux (x64 and arm64): Both glibc and musl/alpine.
- macOS: arm64/m1. macOS (Intel) is supported on a best-effort basis.
- Windows: x64.
Installation on any of the above platforms is seamlessly supported and does not require C/C++ compilation.
To install the JavaScript Client on a system that is not listed above, a supported version of Python must be available on the system, because the node-gyp build tool requires it during installation.
Yarn and pnpm (Performant Node Package Manager) support is experimental.
Install on supported environments
To install the JavaScript Client, run the following command:
npm install @confluentinc/kafka-javascript
Install on unsupported environments
To install the client on an OS or an architecture that isn't supported by the prebuilt binaries:
Install a recent version of Python and a C++ compiler that supports C++20 or above. It's best to use a recent version of gcc or clang.
Install librdkafka. There are many ways to do this depending on your target machine. The easiest way is to grab it from your package manager, or if not available, you might need to build it from the source.
Example installation for Debian/Ubuntu based platforms:
sudo mkdir -p /etc/apt/keyrings
wget -qO - https://packages.confluent.io/deb/7.8/archive.key | gpg --dearmor | sudo tee /etc/apt/keyrings/confluent.gpg > /dev/null
sudo apt-get update
sudo apt install librdkafka-dev
It is recommended that you follow the detailed, up-to-date instructions available on the librdkafka repository.
Once librdkafka is installed, set the environment variables. An example of this for a Unix-like platform is shown below:
export CKJS_LINKING=dynamic
export BUILD_LIBRDKAFKA=0
Run npm install to build the library and link it against the librdkafka installed in the system:
npm install @confluentinc/kafka-javascript
This process sometimes requires troubleshooting. If you encounter any issues, please search for similar issues in the repository, or file one.
If npm install takes more than a couple of minutes and you're using macOS, check this issue for more information.
Promisified API¶
The promisified API is included in your code as:
const {
Kafka,
ErrorCodes, CompressionTypes, // You can specify additional optional properties for further configuration.
} = require('@confluentinc/kafka-javascript').KafkaJS;
Configure¶
The JavaScript Client supports many librdkafka configuration options.
Configuration keys that have the suffix _cb are designated as callbacks. Currently, the only supported callbacks are the rebalance_cb for the consumer, and oauthbearer_token_refresh_cb for all clients.
In addition, a kafkaJS configuration key can be used to specify an object whose keys can be a limited set of KafkaJS properties. This should only be used if you're migrating from KafkaJS.
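As an illustration, configuration keys are passed directly when creating a client; the values below are placeholders, and auto.offset.reset is shown only as one common choice:
const consumer = new Kafka().consumer({
    'bootstrap.servers': '<fill>',
    'group.id': 'test-group',
    // librdkafka configuration keys are passed as-is.
    'auto.offset.reset': 'earliest',
    // Callback-suffixed keys take functions; see the rebalance example later in this document.
    // 'rebalance_cb': (err, assignment) => { /* ... */ },
});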
Produce¶
A Producer sends messages to Kafka. The following example illustrates how to create an instance and send a message:
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
});
await producer.connect();
const deliveryReports = await producer.send({
topic: 'test-topic',
messages: [
{ value: 'v1', key: 'x' },
]
});
console.log({deliveryReports});
await producer.disconnect();
The send method is used to send messages to the Kafka cluster. It returns an array of delivery reports for each message sent. The producer must be connected to use this method.
Multiple messages can be sent to a topic in a single send. Besides the value, a message can contain several other fields. For example:
const message = {
// An optional key for the message. If a key is provided, all messages with the same key will be sent to the same partition. It can be a string, a Buffer, or null.
key: null,
// The value of the message. This can be a Buffer or a string, and must be set.
value: Buffer.from('Hello'),
// The partition to send the message to. It may be a number, or be unset. If not set, the partition will be chosen by the partitioner based on the key.
partition: 1,
// Headers to be sent along with the message. May be unset.
headers: {
// Header keys are strings.
// Header values can be strings.
'header-key-0': 'header-value',
// Header values can be buffers, too.
'header-key-1': Buffer.from('header-value'),
// One or more values can be sent for a single key.
'header-key-3': [Buffer.from('header-value-0'), Buffer.from('header-value-1')],
},
// The timestamp of the message in milliseconds since the Unix epoch, as a string. If not set, the broker will set the timestamp.
timestamp: '1734008723000',
}
Message batching, async sends, and flushing¶
Messages are batched internally by the library on a per-partition basis. Setting the linger.ms property allows the library to wait up to the specified amount of time to accumulate messages into a single batch. The maximum size of the batch is determined by batch.num.messages or batch.size.
Note that if send is awaited on, only the messages included within that send can be batched together. For the best possible throughput, it's recommended to use send calls without awaiting them, and then await all of them together. Additionally, linger.ms can be increased to accumulate a larger number of messages into the same batch to reduce network and CPU overhead.
const reportPromises = [];
for (const topicMessages of topicsMessages) {
reportPromises.push(
producer.send({topic: topicMessages.topic, messages: topicMessages.messages}));
}
await Promise.all(reportPromises);
On the other hand, if you want to ensure that the latency of each message is as low as possible, you can await each send individually and set linger.ms to 0 or 1.
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'linger.ms': 1,
});
await producer.connect();
await producer.send(/* ... */);
In case you are producing messages with a larger linger.ms, and want to send messages immediately, you can call flush to send all messages in the internal buffer without waiting for linger.ms to expire.
// Unawaited send.
const reportPromise = producer.send(/*...*/);
// Try to send everything, waiting up to 1s. If we weren't able to finish sending in 1s, throw an error.
await producer.flush({ timeout: 1000 });
Warning
If you increase the value of linger.ms, do not await your send calls, as every single await may take up to linger.ms to resolve. Instead, if you need the delivery report, flush after sending as shown above.
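Putting these pieces together, a sketch of a throughput-oriented pattern might look like the following (the topic name, message contents, and linger.ms value are placeholders):
const producer = new Kafka().producer({
    'bootstrap.servers': '<fill>',
    'linger.ms': 100, // allow larger batches to accumulate
});
await producer.connect();

// Fire off sends without awaiting each one individually.
const reportPromises = [];
for (let i = 0; i < 1000; i++) {
    reportPromises.push(producer.send({ topic: 'test-topic', messages: [{ value: `v${i}` }] }));
}

// Push out anything still sitting in the internal buffer, then collect the delivery reports.
await producer.flush({ timeout: 5000 });
const deliveryReports = await Promise.all(reportPromises);

await producer.disconnect();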
For further details, see compression and retries/reliability.
Producer methods¶
Method | Description |
---|---|
async connect() | Connects to the broker. Must be called exactly once before using the producer. |
async disconnect() | Disconnects producer (flushing it) and cleans up connections and resources. |
async send() | Sends message(s) to a Kafka topic. Returns the delivery reports on awaiting. |
async sendBatch() | Sends message(s) to various Kafka topics. Returns the delivery reports on awaiting. |
async flush() | Flushes any messages buffered in the library and attempts to send them immediately. |
async transaction() | Starts a new transaction. See Transactional producer and exactly-once semantics. |
async sendOffsets() | Commits offsets within a transaction. See Transactional producer and exactly-once semantics. |
async commit() | Commits an ongoing transaction. See Transactional producer and exactly-once semantics. |
async abort() | Aborts an ongoing transaction. See Transactional producer and exactly-once semantics. |
dependentAdmin() | Returns an Admin client which uses this producer for its underlying connections. |
setSaslCredentials() | Sets the SASL username and password to be used for the next login attempt. Used for SASL PLAIN or SASL SCRAM authentication. |
See the JavaScript Client API reference documentation for more details.
Consume¶
A Consumer receives messages from Kafka. The following example illustrates how to create an instance and start consuming messages:
const consumer = new Kafka().consumer({
'bootstrap.servers': '<fill>',
'group.id': 'test-group', // Mandatory property for a consumer - the consumer group id.
});
await consumer.connect();
await consumer.subscribe({ topics: ["test-topic"] });
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic,
partition,
headers: message.headers,
offset: message.offset,
key: message.key?.toString(),
value: message.value.toString(),
});
}
});
// Whenever we're done consuming, maybe after user input or a signal:
await consumer.disconnect();
The consumer must be connected before calling run. The run method starts the consumer loop, which takes care of polling the cluster, and will call the eachMessage callback for every message you get from the cluster.
The message may contain several other fields besides the value. For example:
{
// Key of the message - may not be set.
key: Buffer.from('key'),
// Value of the message - will be set.
value: Buffer.from('value'),
// The timestamp set by the producer or the broker in milliseconds since the Unix epoch.
timestamp: '1734008723000',
// The current epoch of the leader for this partition.
leaderEpoch: 2,
// Size of the message in bytes.
size: 6,
// Offset of the message on the partition.
offset: '42',
// Headers that were sent along with the message.
headers: {
'header-key-0': ['header-value-0', 'header-value-1'],
'header-key-1': Buffer.from('header-value'),
}
}
A message is considered to be processed successfully when eachMessage for that message runs to completion without throwing an error. In case an error is thrown, the message is marked unprocessed, and eachMessage will be called with the same message again.
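As a sketch of this behavior (processMessage is a hypothetical function standing in for your own processing logic):
consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        const ok = await processMessage(message); // hypothetical processing function
        if (!ok) {
            // Throwing marks the message as unprocessed, so eachMessage is called again with it.
            throw new Error(`processing failed for offset ${message.offset}`);
        }
    }
});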
Subscribe and rebalance¶
To consume messages, the consumer must be a part of a consumer group, and it must subscribe to one or more topics. The group is specified with the group.id property, and the subscribe method should be called after connecting to the cluster.
The consumer does not actually join the consumer group until run
is
called. Joining a consumer group causes a rebalance within all the
members of that consumer group, where each consumer is assigned a set of
partitions to consume from. Rebalances may also be caused when a
consumer leaves a group by disconnecting, or when new partitions are
added to a topic.
It is possible to add a callback to track rebalances:
const rebalance_cb = (err, assignment) => {
switch (err.code) {
case ErrorCodes.ERR__ASSIGN_PARTITIONS:
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
break;
case ErrorCodes.ERR__REVOKE_PARTITIONS:
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
break;
default:
console.error(err);
}
};
const consumer = new Kafka().consumer({
'bootstrap.servers': '<fill>',
'group.id': 'test-group',
'rebalance_cb': rebalance_cb,
});
It’s also possible to modify the assignment of partitions, or pause consumption of newly assigned partitions just after a rebalance.
const rebalance_cb = (err, assignment, assignmentFns) => {
switch (err.code) {
case ErrorCodes.ERR__ASSIGN_PARTITIONS:
// Change the assignment as needed - this mostly boils down to changing the offset to start consumption from, though
// you are free to do anything.
if (assignment.length > 0) {
assignment[0].offset = 34;
}
assignmentFns.assign(assignment);
// Can pause consumption of new partitions just after a rebalance.
break;
case ErrorCodes.ERR__REVOKE_PARTITIONS:
break;
default:
console.error(err);
}
};
Subscriptions can be changed anytime, and the running consumer triggers a rebalance whenever that happens. The current assignment of partitions to the consumer can be checked with the assignment method.
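For example, the subscription can be changed on a running consumer and the resulting assignment inspected afterwards (the topic name is a placeholder, and the rebalance completes asynchronously):
await consumer.subscribe({ topics: ['another-topic'] });
// Once the rebalance triggered by the new subscription completes,
// assignment() reflects the partitions owned by this consumer.
console.log(consumer.assignment());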
Commit offsets¶
As a part of a consumer group, the consumer can commit offsets to the broker, so any consumer from the same group can pick up from where it left off in case of a crash or a restart. By default, offsets are committed automatically by the library.
By default, every 5000ms, the last successfully processed offset (that is, for which eachMessage ran to completion) for each partition is committed to the broker. Disconnecting the consumer or a rebalance also causes offsets to be committed.
It's possible to control this interval using the auto.commit.interval.ms property.
It's possible to commit offsets manually, as well, by setting enable.auto.commit to false:
const consumer = new Kafka().consumer({
'bootstrap.servers': '<fill>',
'group.id': 'test-group',
'enable.auto.commit': false,
});
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(message);
if (isItTimeToCommit) {
await consumer.commitOffsets([
{
topic,
partition,
// Note how we are committing offset + 1, and not offset. This is because
// Kafka expects the offsets as 'the next offset to read', not 'the last offset processed'.
offset: (parseInt(message.offset, 10) + 1).toString(),
},
]);
}
},
});
// commitOffsets can also be called without an argument to commit all the successfully
// processed offsets up to that point in time.
setInterval(() => {
consumer.commitOffsets();
}, 1000);
Warning
If you are manually handling offsets, make sure to set up a rebalance_cb and commit offsets in case of ERR__REVOKE_PARTITIONS, as the library will not commit offsets automatically even in that case.
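A minimal sketch of such a callback, assuming manual commits are used elsewhere in eachMessage (whether a commit issued inside the callback completes before the partitions are actually revoked depends on timing, so treat this as illustrative):
const consumer = new Kafka().consumer({
    'bootstrap.servers': '<fill>',
    'group.id': 'test-group',
    'enable.auto.commit': false,
    'rebalance_cb': async (err, assignment) => {
        if (err.code === ErrorCodes.ERR__REVOKE_PARTITIONS) {
            // Commit whatever has been processed so far before giving up the partitions.
            await consumer.commitOffsets();
        }
    },
});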
Pause and seek¶
It’s possible to pause consumption of owned partitions. The library will not make any more requests to the broker to fetch messages for that partition until it’s resumed.
// Pause topic, partitions 0 and 1.
const resumeMethod = consumer.pause([{ topic, partitions: [0, 1] }]);
// Resume - after a while.
resumeMethod();
// Can also resume manually specifying partitions.
consumer.resume([{ topic, partitions: [1] }]);
Partitions not in the assignment cannot be paused (the pause method doesn’t have an effect).
It’s possible to seek to specific offsets of a partition. This is useful for reprocessing or skipping messages.
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (restartFromEarliest) {
// offset can be a positive number (in a string), or be '-2' representing the 'earliest' offset available on the broker.
consumer.seek({topic, partition, offset: '-2'});
}
},
});
If a partition belongs to the assignment, it’s “seeked” immediately. If not, the seek is persisted in the library, and if that partition is assigned to the consumer at a later point through a rebalance, it’s “seeked” then.
If enable.auto.commit is true (it is by default), then seeking to an offset also causes a commit to the broker with that offset.
Advanced consumer options¶
The consumer can consume messages concurrently. If your message processing involves async operations, you can pass a number to the partitionsConsumedConcurrently option of the run method. The library will attempt to dispatch up to that number of eachMessage calls at the same time. Note that, for a single partition, the order of messages is guaranteed, and thus no more than one message from a partition will trigger eachMessage at the same time.
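For instance, to let up to three partitions be processed at the same time (the concurrency value and the simulated work are arbitrary):
consumer.run({
    partitionsConsumedConcurrently: 3,
    eachMessage: async ({ topic, partition, message }) => {
        // Simulate asynchronous work; messages from the same partition are still handled in order.
        await new Promise((resolve) => setTimeout(resolve, 100));
        console.log(`processed offset ${message.offset} from ${topic}[${partition}]`);
    }
});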
Messages can also be consumed in batches. The eachBatch callback can be provided to the run method instead of the eachMessage callback. Currently, the batch size has a fixed upper limit of 32. The batches passed to the method do not correspond to the actual batches of messages on the broker, but are logical batches which the library creates by accumulating messages from the broker.
Since there is no way for the library to tell when individual messages in a batch are processed, the library will commit offsets for the entire batch when the eachBatch callback runs to completion. If an error is thrown from the callback, no offsets are committed, and the callback is dispatched again for the same batch.
For finer-grained control, the argument passed to eachBatch contains a resolveOffset method. Calling the method allows the library to commit offsets for the messages processed up to that point, even if there's an error in processing further messages within the batch. It's also possible to turn off auto-resolution completely, and only use the resolveOffset method, by passing eachBatchAutoResolve: false to the run method. partitionsConsumedConcurrently is also applicable to eachBatch.
consumer.run({
partitionsConsumedConcurrently,
eachBatchAutoResolve: isAutoResolve,
eachBatch: async event => {
console.log(event.batch.messages);
// If we're not auto-resolving, we need to resolve the offsets manually.
// Note that unlike commit, we don't need to add 1 here to the message offset.
if (!isAutoResolve)
event.resolveOffset(event.batch.messages[event.batch.messages.length - 1].offset);
}
});
Consumer methods¶
Method | Description |
---|---|
async connect() | Connects to the broker. Must be called exactly once before using the consumer. |
async disconnect() | Disconnects consumer, triggering one final rebalance, and cleans up connections and resources. |
async subscribe() | Subscribes consumer to one or more topics. May add to or replace current subscription. |
async run() | Starts the main consume loop, polling the broker, and triggering message process callbacks. |
assignment() | Gets the current assignment of partitions for this consumer. |
async commitOffsets() | Commits offsets to the broker, up to the last processed offset, or custom ranges specified in the argument. |
async committed() | Fetches committed offsets for the assigned topic partitions, or a custom topic partition list. |
pause() | Pauses consumption for a subset of assigned partitions. |
resume() | Resumes consumption for paused partitions. |
paused() | Allows checking which partitions are currently paused. |
seek() | Allows seeking to a specific offset in assigned partitions (immediately) or unassigned partitions (to be “seeked” when assigned). |
dependentAdmin() | Returns an Admin client which uses this consumer for its underlying connections. |
See the JavaScript Client API reference documentation for more details.
Transactional producer and exactly-once semantics¶
The JavaScript Client library supports idempotent producers, transactional producers, and exactly-once semantics (EOS).
To use an idempotent producer:
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'enable.idempotence': true,
});
More details about the guarantees provided by an idempotent producer can be found here, as well as the limitations and other configuration changes that an idempotent producer brings.
To use a transactional producer:
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'transactional.id': 'my-transactional-id', // Must be unique for each producer instance.
});
await producer.connect();
// Start transaction.
await producer.transaction();
await producer.send({topic: 'topic', messages: [{value: 'message'}]});
// Commit transaction.
await producer.commit();
Specifying a transactional.id makes the producer transactional. The transactional.id must be unique for each producer instance. The producer must be connected before starting a transaction with transaction(). A transactional producer cannot be used as a non-transactional producer, and every message must be within a transaction.
More details about the guarantees provided by a transactional producer can be found here.
Using a transactional producer also allows for exactly-once semantics (EOS) in the specific case of consuming from a Kafka cluster, processing the message, and producing to another topic on the same cluster.
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
let transaction;
try {
transaction = await producer.transaction();
await transaction.send({
topic: 'produceTopic',
messages: [
{ value: 'consumed a message: ' + message.value.toString() },
]
});
await transaction.sendOffsets({
consumer,
topics: [
{
topic,
partitions: [
{ partition, offset: String(Number(message.offset) + 1) },
],
}
],
});
// The transaction assures that the message sent and the offset committed
// are transactional, only reflecting on the broker on commit.
await transaction.commit();
} catch (e) {
console.error(e);
if (transaction) await transaction.abort();
}
},
});
Admin client¶
The JavaScript Client library includes an admin client to interact with the Kafka cluster. The admin client provides several methods to manage topics, groups, and other Kafka entities.
// An admin client can be created from configuration.
const admin = new Kafka().admin({
'bootstrap.servers': '<fill>',
});
// Or from a producer or consumer instance.
const depAdmin = producer.dependentAdmin();
await admin.connect();
await depAdmin.connect();
A complete list of methods available on the admin client can be found in the JavaScript Client API reference documentation.
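As an illustration, the method names below mirror the KafkaJS-style admin API offered by the promisified client; confirm the exact signatures against the API reference:
// Assumption: createTopics/listTopics follow KafkaJS-style signatures.
await admin.createTopics({
    topics: [{ topic: 'my-new-topic', numPartitions: 3, replicationFactor: 1 }],
});
console.log(await admin.listTopics());
await admin.disconnect();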
Error handling¶
Methods from the library throw a KafkaJSError when they encounter a problem. Each error object contains a code property, which should be compared against the ErrorCodes object exported by the library to decide how to handle the error.
const { Kafka, ErrorCodes, isKafkaJSError } = require('@confluentinc/kafka-javascript').KafkaJS;
try {
await producer.send({ /*... */ });
} catch (e) {
if (!isKafkaJSError(e)) throw e;
if (e.code === ErrorCodes.ERR__MSG_TIMED_OUT) {
// Some backoff and retry logic.
}
}
After the client has successfully connected, it stays usable despite any errors that it throws. The client will attempt to reconnect to the broker in case of a disconnection, and will continue to work as expected if the cluster is reachable. It is not recommended to terminate clients on errors indiscriminately; rather, it's better to check the error and take specific action.
There is a fatal attribute on the error. In very rare cases (relating to idempotent or transactional producers, and static consumer group membership, both of which are properties you would have configured yourself), the error may be fatal, and the client becomes unusable. In such cases, it's recommended to terminate the client and create a new one.
The error object also contains retriable and abortable properties, which are set only while using a transactional producer. The former indicates that the failed operation may be retried, and the latter indicates that the transaction should be aborted.
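A sketch of acting on these properties when committing a transaction (using a transaction obtained from producer.transaction() as in the earlier example; the retry policy itself is up to your application):
try {
    await transaction.commit();
} catch (e) {
    if (e.retriable) {
        // The failed operation may be retried.
    } else if (e.abortable) {
        // The transaction should be aborted; a new one can be started afterwards.
        await transaction.abort();
    } else {
        // Possibly fatal; check e.fatal and recreate the client if it is set.
        throw e;
    }
}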
Logging¶
The library provides a built-in logger, and it allows you to set a custom logger.
The default logger logs to console
.
const { logLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
// You can use the built-in logger.
const libraryLogger = producer.logger();
libraryLogger.info('This is an info message');
// And modify the log level on the fly.
libraryLogger.setLogLevel(logLevel.INFO);
Most of the internal workings of the library are logged by librdkafka at the logLevel.DEBUG level. It uses a component-wise logging system.
To turn on debug logs, set the debug property in the configuration object to a comma-separated list of components you want to debug, or all to turn on all debug logs.
A list of available components can be found here.
Setting debug in the configuration also changes the built-in logger to start with a default log level of logLevel.DEBUG.
Note
Because the debug configuration cannot be changed on the fly, a majority of the debug logs are not available in the library if you don't set the debug property before connecting. Therefore, using libraryLogger.setLogLevel(logLevel.DEBUG) is generally not very useful to debug library internals. However, going from logLevel.DEBUG to logLevel.INFO or logLevel.ERROR is still useful.
A custom logger may also be provided, as long as it conforms to the following interface:
class MyLogger {
constructor(); // You can freely change the signature of the constructor.
setLogLevel(logLevel); // logLevel is a member of the logLevel enum.
info(message, extra); // message is a string, extra is an object with additional information.
error(message, extra);
warn(message, extra);
debug(message, extra);
}
// Usage
const customLogger = new MyLogger();
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'debug': 'all',
"kafkaJS": {
"logger": customLogger,
}
});
Callback-based API¶
The callback-based API is included in your code as:
const Kafka = require('@confluentinc/kafka-javascript');
Configure¶
The JavaScript Client supports many librdkafka Configuration options.
If you pass in an unsupported configuration option, an error will be thrown.
Configuration keys that have the suffix _cb are designated as callbacks:
- Some of these keys are informational and you can choose to opt in (for example, dr_cb).
- Others are callbacks designed to return a value, such as partitioner_cb.
The JavaScript Client library currently supports the following callbacks:
- partitioner_cb
- dr_cb or dr_msg_cb
- event_cb
- rebalance_cb (see Rebalancing)
- offset_commit_cb (see Commits)
- oauthbearer_token_refresh_cb
librdkafka methods¶
The JavaScript Client library includes two utility functions for detecting the status of your installation. Where applicable, try to include the output of these functions when reporting issues:
Get the features supported by your compilation of the librdkafka library by reading the features variable on the root of the confluent-kafka-javascript object:
const Kafka = require('@confluentinc/kafka-javascript');
console.log(Kafka.features);
// #=> [ 'gzip', 'snappy', 'ssl', 'sasl', 'regex', 'lz4' ]
Get the version of librdkafka:
const Kafka = require('@confluentinc/kafka-javascript');
console.log(Kafka.librdkafkaVersion);
// #=> 2.3.0
Produce¶
Initialization¶
The Producer constructor takes a configuration object to send messages to Kafka. For example:
const producer = new Kafka.Producer({
'bootstrap.servers': 'kafka-host1:9092,kafka-host2:9092'
});
A Producer only requires bootstrap.servers (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the librdkafka Configuration options.
The following example illustrates a list with several librdkafka options set.
const producer = new Kafka.Producer({
'client.id': 'kafka',
'bootstrap.servers': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
Stream API¶
You can use the Producer
as a writable stream immediately after creation.
For example:
// Our producer with its Kafka brokers
// This call returns a new writable stream to our topic 'topic-name'
const stream = Kafka.Producer.createWriteStream({
'bootstrap.servers': 'kafka-host1:9092,kafka-host2:9092'
}, {}, {
topic: 'topic-name',
autoClose: false,
pollInterval: 1000,
});
// Writes a message to the stream
const queuedSuccess = stream.write(Buffer.from('Awesome message'));
if (queuedSuccess) {
// This only indicates that the message was queued, not sent to Kafka.
console.log('We queued our message!');
} else {
console.log('Too many messages in our queue already');
}
// NOTE: MAKE SURE TO LISTEN TO THIS IF YOU WANT THE STREAM TO BE DURABLE
// Otherwise, any error will bubble up as an uncaught exception.
stream.on('error', (err) => {
// Here's where we'll know if something went wrong sending to Kafka
console.error('Error in our producer stream');
console.error(err);
})
Note
If you do not want your code to crash when an error happens, ensure you have an error listener on the stream. Most errors are not necessarily fatal, but the ones that are will immediately destroy the stream. If you use autoClose, the stream will close itself at the first sign of a problem.
Standard API¶
The Standard API is more performant, particularly when handling high volumes of messages. However, it requires more manual setup to use. The following example illustrates its use:
const producer = new Kafka.Producer({
'bootstrap.servers': 'localhost:9092',
'dr_cb': true
});
// Connect to the broker manually
producer.connect();
// Wait for the ready event before proceeding
producer.on('ready', () => {
try {
producer.produce(
// Topic to send the message to
'topic',
// optionally we can manually specify a partition for the message
// this defaults to -1 - which will use librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages)
null,
// Message to send. Must be a buffer
Buffer.from('Awesome message'),
// for keyed messages, we also specify the key - note that this field is optional
'Stormwind',
// you can send a timestamp here. If your broker version supports it,
// it will get added. Otherwise, we default to 0
Date.now(),
// you can send an opaque token here, which gets passed along
// to your delivery reports
);
} catch (err) {
console.error('A problem occurred when sending our message');
console.error(err);
}
});
// Any errors we encounter, including connection errors
producer.on('event.error', (err) => {
console.error('Error from producer');
console.error(err);
})
// We must either call .poll() manually after sending messages
// or set the producer to poll on an interval (.setPollInterval).
// Without this, we do not get delivery events and the queue
// will eventually fill up.
producer.setPollInterval(100);
// You can also set up the producer to poll in the background thread which is
// spawned by the C code. It is more efficient for high-throughput producers.
// Calling this clears any interval set in setPollInterval.
producer.setPollInBackground(true);
To see the configuration options available to you, see the librdkafka Configuration options.
Producer methods¶
Method | Description |
---|---|
connect() | Connects to the broker. The connect() method emits the ready event when it connects successfully. If it does not, the error will be passed through the callback. |
disconnect() | Disconnects from the broker. The disconnect() method emits the disconnected event when it has disconnected. If it does not, the error will be passed through the callback. |
poll() | Polls the producer for delivery reports or other events to be transmitted via the emitter. In order to get the events in the librdkafka queue to emit, you must call this regularly or use one of the methods which automatically poll the producer for you. |
setPollInterval(interval) | Polls the producer on this interval, handling disconnections and reconnection. Set it to 0 to turn it off. |
setPollInBackground(set) | Polls the producer on the librdkafka background thread. Pass a truthy value to turn it on, a non-truthy value to turn it off. |
produce(topic, partition, msg, key, timestamp, opaque) | Sends a message. The produce() method throws when produce would return an error. Ordinarily, this is just if the queue is full. |
flush(timeout, callback) | Flushes the librdkafka internal queue, sending all messages. The default timeout is 500ms. |
initTransactions(timeout, callback) | Initializes the transactional producer. |
beginTransaction(callback) | Starts a new transaction. |
sendOffsetsToTransaction(offsets, consumer, timeout, callback) | Sends consumed topic-partition-offsets to the broker, which will get committed along with the transaction. |
abortTransaction(timeout, callback) | Aborts the ongoing transaction. |
commitTransaction(timeout, callback) | Commits the ongoing transaction. |
setSaslCredentials(username, password) | Change SASL credentials to be sent on the next authentication attempt. Only applicable if SASL authentication is being used. |
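For example, flush can be used with a callback before disconnecting, to make sure everything queued has actually been sent (the timeout value is arbitrary):
producer.flush(2000, (err) => {
    if (err) {
        console.error('Flush failed', err);
    } else {
        producer.disconnect();
    }
});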
Events¶
Some configuration properties that end in _cb indicate that an event should be generated for that option. You have two options:
- Provide a value of true and react to the event
- Provide a callback function directly
The following example illustrates an event:
const producer = new Kafka.Producer({
'client.id': 'my-client', // Specifies an identifier to use to help trace activity in Kafka
'bootstrap.servers': 'localhost:9092', // Connect to a Kafka instance on localhost
'dr_cb': true // Specifies that we want a delivery-report event to be generated
});
// Poll for events every 100 ms
producer.setPollInterval(100);
producer.on('delivery-report', (err, report) => {
// Report of delivery statistics here:
//
console.log(report);
});
The following table describes types of events.
Event | Description |
---|---|
disconnected | The disconnected event is emitted when the broker has disconnected. This event is emitted only when .disconnect is called. The wrapper will always try to reconnect otherwise. |
ready | The ready event is emitted when the Producer is ready to send messages. |
event | The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
event.log | The event.log event is emitted when logging events come in (if you opted into logging via the event_cb option). You will need to set a value for debug if you want to send information. |
event.stats | The event.stats event is emitted when librdkafka reports stats (if you opted in by setting the statistics.interval.ms to a non-zero value). |
event.error | The event.error event is emitted when librdkafka reports an error. |
event.throttle | The event.throttle event is emitted when librdkafka reports throttling. |
delivery-report | The delivery-report event is emitted when a delivery report has been found via polling. To use this event, you must set request.required.acks to 1 or -1 in topic configuration and dr_cb (or dr_msg_cb if you want the report to contain the message payload) to true in the Producer constructor options. |
Higher level producer¶
The higher level producer is a variant of the producer which can propagate callbacks to you upon message delivery. The rest of the class behavior is the same as the producer class.
const producer = new Kafka.HighLevelProducer({
'bootstrap.servers': 'localhost:9092',
});
This will enrich the produce call so it will have a callback to tell you when the message has been delivered. You lose the ability to specify opaque tokens.
producer.produce(topicName, null, Buffer.from('alliance4ever'), null, Date.now(), (err, offset) => {
// The offset if our acknowledgment level allows us to receive delivery offsets
console.log(offset);
});
Additionally, you can add serializers to modify the key or value of a produce before it is sent over to Kafka.
producer.setValueSerializer((value) => {
return Buffer.from(JSON.stringify(value));
});
producer.setTopicValueSerializer((topic, value) => {
// Process value based on topic it's being produced to.
return processedValue;
});
Otherwise the behaviour of the class is exactly the same.
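With a value serializer in place, a plain object can be passed to produce and is serialized before being sent (the topic name is a placeholder):
producer.produce('my-topic', null, { hello: 'world' }, null, Date.now(), (err, offset) => {
    if (err) console.error(err);
    else console.log('Delivered, offset:', offset);
});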
Consume¶
Initialization¶
To read messages from Kafka, instantiate a KafkaConsumer
object as follows:
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'bootstrap.servers': 'localhost:9092',
}, {});
The first parameter is the global config.
The group.id and bootstrap.servers properties are required for a consumer.
The second parameter is an optional topic configuration that gets applied to all subscribed topics.
To view a list of all supported configuration properties, see librdkafka Configurations. Look for the C and * keys.
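For example, a default topic configuration can be passed as the second parameter; auto.offset.reset shown here is one common topic-level property:
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'kafka',
    'bootstrap.servers': 'localhost:9092',
}, {
    // Topic configuration applied to all subscribed topics.
    'auto.offset.reset': 'earliest',
});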
Rebalancing¶
By default, rebalancing is managed internally by librdkafka. To override this functionality, you may provide your own logic as a rebalance callback.
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'bootstrap.servers': 'localhost:9092',
'rebalance_cb': (err, assignment) => {
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
// Note: this can throw when you are disconnected. Take care and wrap it in
// a try catch if that matters to you
this.assign(assignment);
} else if (err.code == ErrorCodes.ERR__REVOKE_PARTITIONS){
// Same as above
this.unassign();
} else {
// We had a real error
console.error(err);
}
}
})
this is bound to the KafkaConsumer you have created. By specifying a rebalance_cb you can also listen to the rebalance event as an emitted event. This event is not emitted when using the internal librdkafka rebalancer.
Commits¶
When you commit in confluent-kafka-javascript, the standard way is to queue the commit request up with the next librdkafka request to the broker. When doing this, there isn't a way to know the result of the commit.
To get the result of the commit, you can use another callback that you can listen to:
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'bootstrap.servers': 'localhost:9092',
'offset_commit_cb': (err, topicPartitions) => {
if (err) {
// There was an error committing
console.error(err);
} else {
// Commit went through. Let's log the topic partitions
console.log(topicPartitions);
}
}
})
this is bound to the KafkaConsumer you have created. By specifying an offset_commit_cb you can also listen to the offset.commit event as an emitted event. It receives an error and the list of topic partitions as arguments. This is not emitted unless opted in.
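For example, after processing a message delivered by the data event, commitMessage can queue a commit for that message's offset (the processing itself is a placeholder):
consumer.on('data', (message) => {
    // ... process the message ...
    consumer.commitMessage(message); // queues a commit for this message's offset
});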
Message structure¶
Messages that are returned by the KafkaConsumer
have the following
structure:
{
value: // message contents as a Buffer,
size: // size of the message, in bytes,
topic: // topic the message comes from,
offset: // offset the message was read from,
partition: // partition the message was on,
key: // key of the message if present,
timestamp: // timestamp of message creation if present
}
For example:
{
value: Buffer.from('hi'),
size: 2,
topic: 'librdtesting-01',
offset: 1337,
partition: 1,
key: 'someKey',
timestamp: 1510325354780
}
Stream API¶
The stream API is the easiest way to consume messages. The following example illustrates the use of the stream API:
// Read from the librdtesting-01 topic... Note that this creates a new stream on each call, so call it once and store it
const stream = KafkaConsumer.createReadStream(globalConfig, topicConfig, {
topics: ['librdtesting-01']
});
stream.on('data', (message) => {
console.log('Got message');
console.log(message.value.toString());
});
You can also get the consumer from the stream, in order to use consumer methods. For example:
stream.consumer.commit(); // Commits all locally stored offsets
Standard API¶
You can use the Standard API and manage callbacks and events yourself.
Two modes for consuming messages are supported: flowing mode and non-flowing mode.
Flowing mode. This mode flows all of the messages it can read by maintaining an infinite loop in a separate thread. It only stops when it detects the consumer has issued the unsubscribe or disconnect method. Messages are sent to you either with a callback that you provide to the consume method, or by listening to the data event.
The following example illustrates the flowing mode:
// Flowing mode
consumer.connect();

consumer
  .on('ready', () => {
    consumer.subscribe(['librdtesting-01']);
    // Consume from the librdtesting-01 topic. This is what determines
    // the mode we are running in.
    // In case consume() is called without an argument, or with only one argument
    // which is the callback function, it works in the flowing mode.
    consumer.consume();
  })
  .on('data', (data) => {
    // Output the actual message contents
    console.log(data.value.toString());
  });
Non-flowing mode. This mode reads up to a specific number of messages from Kafka at once, and does not create a separate thread for fetching. It only stops when it detects the consumer has issued the unsubscribe or disconnect method.
The following example illustrates the non-flowing mode:
// Non-flowing mode
consumer.connect();

consumer
  .on('ready', () => {
    // Subscribe to the librdtesting-01 topic
    // This makes subsequent consumes read from that topic.
    consumer.subscribe(['librdtesting-01']);

    // Read one message every 1000 milliseconds
    setInterval(() => {
      consumer.consume(1);
    }, 1000);
  })
  .on('data', (data) => {
    console.log('Message found! Contents below.');
    console.log(data.value.toString());
  });
Consumer methods¶
Method | Description |
---|---|
consumer.connect() | Connects to the broker. The connect() emits the event ready when it has successfully connected. If it does not, the error will be passed through the callback. |
consumer.disconnect() | Disconnects from the broker. The disconnect() method emits disconnected when it has disconnected. If it does not, the error will be passed through the callback. |
consumer.subscribe(topics) | Subscribes to an array of topics. |
consumer.unsubscribe() | Unsubscribes from the currently subscribed topics. You cannot subscribe to different topics without calling the unsubscribe() method first. |
consumer.consume(cb?) | Gets messages from the existing subscription as quickly as possible. If cb is specified, invokes cb(err, message). This method keeps a background thread running to do the work. Note that the number of worker threads in a NodeJS process is limited by UV_THREADPOOL_SIZE (default value is 4) and using up all of them blocks other parts of the application that need threads. If you need multiple consumers then consider increasing UV_THREADPOOL_SIZE or using consumer.consume(number, cb) instead. |
consumer.consume(number, cb?) | Gets number of messages from the existing subscription. If cb is specified, invokes cb(err, message). |
consumer.commit() | Commits all locally stored offsets. |
consumer.commit(topicPartition) | Commits offsets specified by the topic partition. |
consumer.commitMessage(message) | Commits the offsets specified by the message. |
Events¶
Event | Description |
---|---|
data | When using the Standard API consumed messages are emitted in this event. |
partition.eof | When using the Standard API and the configuration option enable.partition.eof is set, partition.eof events are emitted in this event. The event contains topic, partition, and offset properties. |
warning | The event is emitted in case of UNKNOWN_TOPIC_OR_PART or TOPIC_AUTHORIZATION_FAILED errors when consuming in flowing mode. Since the consumer will continue working if the error is still happening, the warning event should reappear after the next metadata refresh. To control the metadata refresh rate, set the topic.metadata.refresh.interval.ms property. Once you resolve the error, you can manually call getMetadata to speed up consumer recovery. |
disconnected | The disconnected event is emitted when the broker disconnects. This event is only emitted when .disconnect is called. The wrapper will always try to reconnect otherwise. |
ready | The ready event is emitted when the Consumer is ready to read messages. |
event | The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
event.log | The event.log event is emitted when logging events occur (if you opted in for logging via the event_cb option). You will need to set a value for debug if you want information to be sent. |
event.stats | The event.stats event is emitted when librdkafka reports stats (if you opted in by setting the statistics.interval.ms to a non-zero value). |
event.error | The event.error event is emitted when librdkafka reports an error. |
event.throttle | The event.throttle event is emitted when librdkafka reports throttling. |
Read current offsets from the broker for a topic¶
To query the latest (and earliest) offsets for one of your topics, use queryWatermarkOffsets. Both connected producers and consumers allow this:
const timeout = 5000, partition = 0;
consumer.queryWatermarkOffsets('my-topic', partition, timeout, (err, offsets) => {
const high = offsets.highOffset;
const low = offsets.lowOffset;
});
producer.queryWatermarkOffsets('my-topic', partition, timeout, (err, offsets) => {
const high = offsets.highOffset;
const low = offsets.lowOffset;
});
An error is returned if the client is not connected or the request times out within the specified interval.
Metadata¶
To retrieve metadata from Kafka, use the getMetadata method with Kafka.Producer or Kafka.KafkaConsumer.
When fetching metadata for a specific topic, if a topic reference does not exist, one is created using the default configuration.
See the documentation on Client.getMetadata
if you want to set configuration
parameters, for example, acks
on a topic to produce messages to.
The following example illustrates how to use the getMetadata
method.
const opts = {
topic: 'librdtesting-01',
timeout: 10000
};
producer.getMetadata(opts, (err, metadata) => {
if (err) {
console.error('Error getting metadata');
console.error(err);
} else {
console.log('Got metadata');
console.log(metadata);
}
});
Metadata on any connection is returned in the following data structure:
{
orig_broker_id: 1,
orig_broker_name: "broker_name",
brokers: [
{
id: 1,
host: 'localhost',
port: 40
}
],
topics: [
{
name: 'awesome-topic',
partitions: [
{
id: 1,
leader: 20,
replicas: [1, 2],
isrs: [1, 2]
}
]
}
]
}
Admin client¶
The library provides an admin client to interact with the Kafka cluster. The admin client provides several methods to manage topics, groups, and other Kafka entities.
The following code snippet instantiates the AdminClient:
const Kafka = require('@confluentinc/kafka-javascript');
const client = Kafka.AdminClient.create({
'client.id': 'kafka-admin',
'bootstrap.servers': 'broker01'
});
// From an existing producer or consumer
const depClient = Kafka.AdminClient.createFrom(producer);
These will instantiate and connect the AdminClient, which allows calling the admin methods.
A complete list of methods available on the admin client can be found in the API reference documentation.
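As an illustration, creating a topic with the callback-based admin client might look like the following sketch (the field names follow the node-rdkafka-style topic specification; confirm against the API reference):
client.createTopic({
    topic: 'my-new-topic',
    num_partitions: 3,
    replication_factor: 1,
}, (err) => {
    if (err) console.error(err);
    else console.log('Topic created');
});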
OAuthbearer callback authentication¶
The JavaScript Client library supports OAuthBearer token authentication for both the promisified and the callback-based API. The token is fetched using a callback provided by the user. The callback is called at 80% of the token expiry time, and the library uses the new token for the next login attempt.
async function token_refresh(oauthbearer_config /* string - passed from config */, cb /* can be used if function is not async */) {
// Some logic to fetch the token, before returning it.
return { tokenValue, lifetime, principal, extensions };
}
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'security.protocol': 'sasl_ssl', // or sasl_plaintext
'sasl.mechanisms': 'OAUTHBEARER',
'sasl.oauthbearer.config': 'someConfigPropertiesKey=value', // Just passed straight to token_refresh as a string, carries no other significance.
'oauthbearer_token_refresh_cb': token_refresh,
});
For a special case of OAuthBearer token authentication, where the token is fetched from an OIDC provider using the client_credentials grant type, the library provides a built-in callback, which can be set through just the configuration without any custom function required:
const producer = new Kafka().producer({
'bootstrap.servers': '<fill>',
'security.protocol': 'sasl_ssl', // or sasl_plaintext
'sasl.mechanisms': 'OAUTHBEARER',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.token.endpoint.url': issuerEndpointUrl,
'sasl.oauthbearer.scope': scope,
'sasl.oauthbearer.client.id': oauthClientId,
'sasl.oauthbearer.client.secret': oauthClientSecret,
'sasl.oauthbearer.extensions': `logicalCluster=${kafkaLogicalCluster},identityPoolId=${identityPoolId}`
});
These examples are for the promisified API, but the callback-based API can be used with the same configuration settings.
API documentation¶
To view the JavaScript Client API documentation, see the JavaScript Client API.