Migrate to JavaScript Client
Follow the process described in this topic to migrate from the KafkaJS or node-rdkafka client to the JavaScript Client.
Migrate to JavaScript Client from KafkaJS
Below is a simple produce example for users migrating from KafkaJS.
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
async function producerStart() {
  const kafka = new Kafka({
    kafkaJS: {
      brokers: ['<fill>'],
      ssl: true,
      sasl: {
        mechanism: 'plain',
        username: '<fill>',
        password: '<fill>',
      },
    }
  });

  const producer = kafka.producer();
  await producer.connect();
  console.log("Connected successfully");

  const res = [];
  for (let i = 0; i < 50; i++) {
    res.push(producer.send({
      topic: 'test-topic',
      messages: [
        { value: 'v222', partition: 0 },
        { value: 'v11', partition: 0, key: 'x' },
      ]
    }));
  }
  await Promise.all(res);

  await producer.disconnect();
  console.log("Disconnected successfully");
}

producerStart();
To migrate to the JavaScript Client from KafkaJS:
Change the import statement, and add a kafkaJS block around your configs.
From:
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['kafka1:9092', 'kafka2:9092'], /* ... */ });
const producer = kafka.producer({ /* ... */ });
To:
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const kafka = new Kafka({ kafkaJS: { brokers: ['kafka1:9092', 'kafka2:9092'], /* ... */ } });
const producer = kafka.producer({ kafkaJS: { /* ... */ } });
Try running your program. If a migration is needed, an informative error is thrown. If you’re using TypeScript, some of these changes are caught at compile time.
The most common expected changes to the code are:
For the producer:
acks, compression and timeout are not set per send(). They must be configured in the top-level configuration while creating the producer.
For the consumer:
fromBeginning is not set per subscribe(). It must be configured in the top-level configuration while creating the consumer.
autoCommit and autoCommitInterval are not set per run(). They must be configured in the top-level configuration while creating the consumer.
autoCommitThreshold is not supported.
eachBatch’s batch size never exceeds 1.
For errors:
Check the error.code rather than the error name or type.
A more exhaustive list of semantic and configuration differences is presented below.
An example migration:
-const { Kafka } = require('kafkajs');
+const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

 const kafka = new Kafka({
+  kafkaJS: {
     clientId: 'my-app',
     brokers: ['kafka1:9092', 'kafka2:9092']
+  }
 })

 const producerRun = async () => {
-  const producer = kafka.producer();
+  const producer = kafka.producer({ kafkaJS: { acks: 1 } });
   await producer.connect();
   await producer.send({
     topic: 'test-topic',
-    acks: 1,
     messages: [
       { value: 'Hello confluent-kafka-javascript user!' },
     ],
   });
 };

 const consumerRun = async () => {
   // Consuming
-  const consumer = kafka.consumer({ groupId: 'test-group' });
+  const consumer = kafka.consumer({ kafkaJS: { groupId: 'test-group', fromBeginning: true } });
   await consumer.connect();
-  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
+  await consumer.subscribe({ topic: 'test-topic' });
   await consumer.run({
     eachMessage: async ({ topic, partition, message }) => {
       console.log({
         partition,
         offset: message.offset,
         value: message.value.toString(),
       })
     },
   });
 };

 producerRun().then(consumerRun).catch(console.error);
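The producer-side change in the migration above, moving acks out of send() and into the producer configuration, can be applied mechanically when auditing a larger codebase. The following is a minimal sketch, not part of either library: splitSendArgs is a hypothetical helper name, and it assumes that acks, compression and timeout are the only per-send options that need to move.

```javascript
// Hypothetical helper (not part of KafkaJS or the JavaScript Client).
// Options that KafkaJS accepted per send() but that must now be set
// once, inside the kafkaJS block of the producer configuration.
const PRODUCER_LEVEL_OPTIONS = ['acks', 'compression', 'timeout'];

function splitSendArgs(kafkaJsSendArgs) {
  const producerOptions = {};
  const sendArgs = {};
  for (const [key, value] of Object.entries(kafkaJsSendArgs)) {
    if (PRODUCER_LEVEL_OPTIONS.includes(key)) {
      producerOptions[key] = value; // belongs in kafka.producer({ kafkaJS: ... })
    } else {
      sendArgs[key] = value; // stays in producer.send(...)
    }
  }
  return { producerConfig: { kafkaJS: producerOptions }, sendArgs };
}

const { producerConfig, sendArgs } = splitSendArgs({
  topic: 'test-topic',
  acks: 1,
  messages: [{ value: 'Hello confluent-kafka-javascript user!' }],
});
console.log(JSON.stringify(producerConfig)); // {"kafkaJS":{"acks":1}}
console.log(Object.keys(sendArgs).join(',')); // topic,messages
```

Because these options now apply to the whole producer, call sites that used different acks values on the same KafkaJS producer would presumably need separate producers after migration.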
Common configuration changes
const kafka = new Kafka({ kafkaJS: { /* common configuration changes */ } });
Each allowed config property is discussed below. If there is any change in semantics or the default values, the property and the change are highlighted in bold.
| Property | Default Value | Comment |
|---|---|---|
| brokers | none | A list of strings, representing the bootstrap brokers. A function is no longer allowed as an argument for this. |
| ssl | false | A boolean, set to true if ssl needs to be enabled. Additional properties like CA, certificate, key, etc. need to be specified outside the kafkaJS block. |
| sasl | none | An optional object of the form |
| clientId | rdkafka | An optional string used to identify the client. |
| connectionTimeout | 1000 | This timeout is not enforced individually, but a sum of connectionTimeout and authenticationTimeout is enforced together. |
| authenticationTimeout | 10000 | This timeout is not enforced individually, but a sum of connectionTimeout and authenticationTimeout is enforced together. |
| reauthenticationThreshold | 80% of connections.max.reauth.ms | No longer checked, the default is always used. |
| requestTimeout | 30000 | Number of milliseconds for a network request to time out. |
| enforceRequestTimeout | true | When set to false, requestTimeout is set to 5 minutes. This cannot be completely disabled. |
| retry | object | Properties individually discussed below. |
| retry.maxRetryTime | 30000 | Maximum time to backoff a retry, in milliseconds. |
| retry.initialRetryTime | 300 | Minimum time to backoff a retry, in milliseconds. |
| retry.retries | 5 | Total cap on the number of retries. Applicable only to Produce requests. |
| retry.factor | 0.2 | Randomization factor (jitter) for backoff. Cannot be changed. |
| retry.multiplier | 2 | Multiplier for exponential factor of backoff. Cannot be changed. |
| retry.restartOnFailure | true | Consumer only. Cannot be changed. Consumer will always make an attempt to restart. |
| logLevel | logLevel.INFO | Decides the severity level of the logger created by the underlying library. A logger created with the INFO level will not be able to log DEBUG messages later. |
| socketFactory | null | No longer supported. |
| outer config | {} | The configuration outside the kafkaJS block can contain any of the keys present in the librdkafka CONFIGURATION table. |
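To make the split between the kafkaJS block and the outer configuration concrete, here is a small sketch of a client configuration object that enables SSL inside the block and supplies certificate material outside it. The ssl.ca.location, ssl.certificate.location and ssl.key.location keys are taken from the librdkafka CONFIGURATION table; the broker address and file paths are placeholders, and which keys you actually need depends on your cluster.

```javascript
// Placeholder broker address and file paths; substitute your own.
const clientConfig = {
  // KafkaJS-compatible properties live inside the kafkaJS block.
  kafkaJS: {
    clientId: 'my-app',
    brokers: ['kafka1:9092'],
    ssl: true,
  },
  // librdkafka properties are siblings of the kafkaJS block.
  'ssl.ca.location': '/path/to/ca.pem',
  'ssl.certificate.location': '/path/to/client.pem',
  'ssl.key.location': '/path/to/client.key',
};

// The object would then be passed to the constructor:
// const kafka = new Kafka(clientConfig);
console.log(Object.keys(clientConfig).join(', '));
```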
Producer configuration changes
const producer = kafka.producer({ kafkaJS: { /* producer-specific configuration changes. */ } });
Each allowed config property is discussed below. If there is any change in semantics or the default values, the property and the change are highlighted in bold.
| Property | Default Value | Comment |
|---|---|---|
| createPartitioner | DefaultPartitioner (murmur2_random) - Java client compatible | Custom partitioner support is not yet provided. The default partitioner’s behaviour is retained, and a number of partitioners are provided via the partitioner property, which is specified outside the kafkaJS block. |
| retry | object | Identical to retry in the common configuration. This takes precedence over the common config retry. |
| metadataMaxAge | 5 minutes | Time in milliseconds after which to refresh metadata for known topics. |
| allowAutoTopicCreation | true | Determines if a topic should be created if it doesn’t exist while producing. |
| transactionTimeout | 60000 | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. Only applicable when transactionalId is set. |
| idempotent | false | If set to |
| maxInFlightRequests | null | Maximum number of in-flight requests per broker connection. If not set, it is practically unbounded (same as KafkaJS). |
| transactionalId | null | If set, turns this into a transactional producer with this identifier. This also automatically sets idempotent to |
| acks | -1 | The number of required acks before a Produce succeeds. This is set on a per-producer level, not on a per send level. -1 denotes it will wait for all brokers in the in-sync replica set. |
| compression | CompressionTypes.NONE | Compression codec for Produce messages. This is set on a per-producer level, not on a per send level. It must be a key of CompressionTypes, namely GZIP, SNAPPY, LZ4, ZSTD or NONE. |
| timeout | 30000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker. This is set on a per-producer level, not on a per send level. |
| outer config | {} | The configuration outside the kafkaJS block can contain any of the keys present in the librdkafka CONFIGURATION table. |
Semantic and per-method changes
send and sendBatch:
While sending multiple messages, even if one of the messages fails, the method throws an error.
While sendBatch is available, it acts as a wrapper around send, and the actual batching is handled by librdkafka.
acks, compression and timeout are not set per send(). They must be configured in the top-level configuration. See configuration changes. Additionally, there are several more compression types available by default besides GZIP.
Before:
const kafka = new Kafka({/* ... */});
const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: 'test',
  messages: [ /* ... */ ],
  acks: 1,
  compression: CompressionTypes.GZIP,
  timeout: 30000,
});
After:
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
const producer = kafka.producer({
  kafkaJS: {
    acks: 1,
    compression: CompressionTypes.GZIP, // or SNAPPY, LZ4, ZSTD, NONE
    timeout: 30000,
  }
});
await producer.connect();
await producer.send({
  topic: 'test',
  messages: [ /* ... */ ],
});
It’s recommended to send a number of messages without awaiting them, and then call flush to ensure all messages are sent, rather than awaiting each message. This is more efficient. Example:
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
const producer = kafka.producer();
await producer.connect();
for (/*...*/) producer.send({ /* ... */});
await producer.flush({timeout: 5000});
However, if it is desired to await every message, linger.ms should be set to 0, to ensure that the default batching behavior does not cause a delay in awaiting messages. Example:
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
const producer = kafka.producer({ 'linger.ms': 0 });
A transactional producer (with a transactionalId set) cannot send messages without initiating a transaction using producer.transaction().
While using sendOffsets from a transactional producer, the consumerGroupId argument must be omitted, and the consumer object itself must be passed instead.
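The send-then-flush recommendation above leaves open how to notice a send that failed when nothing awaited it. One possible approach, sketched below, keeps the promises returned by send() and inspects them after flush() returns. sendAllThenFlush is a hypothetical helper name; it only relies on the send() and flush({ timeout }) calls shown above and otherwise makes no claims about the client's internals.

```javascript
// Hypothetical helper: `producer` is any object exposing send() and
// flush() as described above, for example a connected producer.
async function sendAllThenFlush(producer, records, flushTimeoutMs = 5000) {
  // Start every send without awaiting it individually.
  const pending = records.map((record) => producer.send(record));
  // Attach handlers right away so that a send which rejects while
  // flush() is still running is not reported as an unhandled rejection.
  const settled = Promise.allSettled(pending);
  // Ask the producer to push out everything that is queued.
  await producer.flush({ timeout: flushTimeoutMs });
  // Inspect each outcome so a failed send is not silently dropped.
  const results = await settled;
  const failed = results.filter((r) => r.status === 'rejected').length;
  return { sent: results.length - failed, failed };
}
```

A caller would use it as `const { failed } = await sendAllThenFlush(producer, records);` and decide how to handle a non-zero failure count.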
Consumer configuration changes
const consumer = kafka.consumer({ kafkaJS: { /* consumer-specific configuration changes. */ } });
Each allowed config property is discussed below. If there is any change in semantics or the default values, the property and the change are highlighted in bold.
| Property | Default Value | Comment |
|---|---|---|
| groupId | none | A mandatory string denoting the name of the consumer group that this consumer is a part of. |
| partitionAssigners | [PartitionAssigners.roundRobin] | Support for range, roundRobin, and cooperativeSticky assignors is provided. Custom assignors are not supported. |
| partitionAssignors | [PartitionAssignors.roundRobin] | Alias for partitionAssigners. |
| rebalanceTimeout | 300000 | The maximum allowed time for each member to join the group once a rebalance has begun. Note that setting this value also changes the max poll interval. Message processing in eachMessage/eachBatch must not take more than this time. |
| heartbeatInterval | 3000 | The expected time in milliseconds between heartbeats to the consumer coordinator. |
| metadataMaxAge | 5 minutes | Time in milliseconds after which to refresh metadata for known topics. |
| allowAutoTopicCreation | | Determines if a topic should be created if it doesn’t exist while consuming. |
| maxBytesPerPartition | 1048576 (1MB) | Determines how many bytes can be fetched in one request from a single partition. There is a change in semantics: this size grows dynamically if a single message larger than this is encountered, and the client does not get stuck. |
| minBytes | 1 | Minimum number of bytes the broker responds with (or wait until maxWaitTimeInMs). |
| maxBytes | 10485760 (10MB) | Maximum number of bytes the broker responds with. |
| retry | object | Identical to retry in the common configuration. This takes precedence over the common config retry. |
| readUncommitted | false | If |
| maxInFlightRequests | null | Maximum number of in-flight requests per broker connection. If not set, it is practically unbounded (same as KafkaJS). |
| rackId | null | Can be set to an arbitrary string which will be used for fetch-from-follower if set up on the cluster. |
| fromBeginning | false | If there is no initial offset in the offset store or the desired offset is out of range, and this is true, we consume the earliest possible offset. This is set on a per-consumer level, not on a per subscribe level. |
| autoCommit | | Whether to periodically auto-commit offsets to the broker while consuming. This is set on a per-consumer level, not on a per run level. |
| autoCommitInterval | 5000 | Offsets are committed periodically at this interval, if autoCommit is true. This is set on a per-consumer level, not on a per run level. The default value is changed to 5 seconds. |
| outer config | {} | The configuration outside the kafkaJS block can contain any of the keys present in the librdkafka CONFIGURATION table. |
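Since fromBeginning, autoCommit and autoCommitInterval are now consumer-level properties, existing subscribe() and run() arguments have to be split up during migration. The sketch below shows one way to do that split for a single consumer. migrateConsumerCalls is a hypothetical helper, not part of either library, and it assumes these three are the only options that move, with autoCommitThreshold reported as dropped because it is not supported.

```javascript
// Hypothetical helper (not part of either library): merges options that
// KafkaJS took in subscribe() and run() into the consumer configuration.
function migrateConsumerCalls(consumerConfig, subscribeArgs, runArgs) {
  const { fromBeginning, ...newSubscribeArgs } = subscribeArgs;
  const { autoCommit, autoCommitInterval, autoCommitThreshold, ...newRunArgs } = runArgs;

  // Copy only the options that were actually present at the call sites.
  const kafkaJS = { ...consumerConfig };
  const hoisted = { fromBeginning, autoCommit, autoCommitInterval };
  for (const [key, value] of Object.entries(hoisted)) {
    if (value !== undefined) kafkaJS[key] = value;
  }

  return {
    consumerConfig: { kafkaJS },
    subscribeArgs: newSubscribeArgs,
    runArgs: newRunArgs,
    // autoCommitThreshold has no equivalent, so report it instead of moving it.
    dropped: autoCommitThreshold === undefined ? [] : ['autoCommitThreshold'],
  };
}
```

The returned consumerConfig would be passed to kafka.consumer(), and the trimmed subscribeArgs and runArgs to subscribe() and run() respectively.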
Semantic and per-method changes
subscribe:
Regex flags (like i or g) are ignored while passing a topic subscription. Regexes must start with ^; otherwise, an error is thrown.
subscribe must be called only after connect.
An optional parameter, replace, is provided. If set to true, the current subscription is replaced with the new one. If set to false, the new subscription is added to the current one, for example, consumer.subscribe({ topics: ['topic1'], replace: true});. The default value is false to retain existing behaviour.
While passing a list of topics to subscribe(), fromBeginning is not set per subscribe(). It must be configured in the top-level configuration.
Before:
const consumer = kafka.consumer({
  groupId: 'test-group',
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"], fromBeginning: true });
After:
const consumer = kafka.consumer({
  kafkaJS: {
    groupId: 'test-group',
    fromBeginning: true,
  }
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
run:
For auto-committing using a consumer, the properties autoCommit and autoCommitInterval are not set per run(). They must be configured in the top-level configuration. autoCommitThreshold is not supported. If autoCommit is set to true, messages are not committed per-message, but periodically at the interval specified by autoCommitInterval (default 5 seconds).
Before:
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({ /* ... */ });
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
  eachMessage: someFunc,
  autoCommit: true,
  autoCommitInterval: 5000,
});
After:
const kafka = new Kafka({ kafkaJS: { /* ... */ } });
const consumer = kafka.consumer({
  kafkaJS: {
    /* ... */
    autoCommit: true,
    autoCommitInterval: 5000,
  },
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
  eachMessage: someFunc,
});
heartbeat() no longer needs to be called by the user in the eachMessage/eachBatch callback. Heartbeats are automatically managed by librdkafka.
partitionsConsumedConcurrently is supported by both eachMessage and eachBatch.
An API compatible version of eachBatch is available, but the batch size calculation is not as per configured parameters. The batch size has a constant maximum and is configured internally. This is subject to change. The property eachBatchAutoResolve is supported. Within the eachBatch callback, use of uncommittedOffsets is unsupported, and within the returned batch, offsetLag and offsetLagLow are unsupported.
commitOffsets:
Does not yet support sending metadata for topic partitions being committed.
If called with no arguments, it commits all offsets passed to the user (or the stored offsets, if manually handling offset storage using consumer.storeOffsets).
seek:
The restriction to call seek only after run is removed. It can be called any time.
pause and resume:
These methods MUST be called after the consumer group is joined. In practice, this means they can be called whenever consumer.assignment() has a non-zero size, or within the eachMessage/eachBatch callback.
stop:
stop is not yet supported, and the user must disconnect the consumer.
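Two of the rules above lend themselves to small pre-flight checks: the constraints on regex subscriptions, and the requirement that pause and resume are called only once partitions are assigned. The following sketch uses hypothetical helper names (checkTopicPattern, hasAssignment) and assumes that consumer.assignment() returns an array, which is how the "non-zero size" wording above is read here.

```javascript
// Hypothetical pre-flight check for a topic passed to subscribe().
// Returns a list of human-readable problems; an empty list means the
// topic should be acceptable under the rules described above.
function checkTopicPattern(topic) {
  const problems = [];
  if (!(topic instanceof RegExp)) return problems; // plain strings are not checked
  if (!topic.source.startsWith('^')) {
    problems.push('regex must start with ^ (subscribe throws otherwise)');
  }
  if (topic.flags !== '') {
    problems.push(`flags "${topic.flags}" are ignored`);
  }
  return problems;
}

// Hypothetical guard for pause()/resume(): true once partitions are assigned.
// Assumes assignment() returns an array of assigned topic partitions.
function hasAssignment(consumer) {
  return consumer.assignment().length > 0;
}

console.log(checkTopicPattern(/^orders-.*/).length); // 0
console.log(checkTopicPattern(/orders/i).length); // 2
```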
Admin client
The admin-client supports a limited subset of methods:
The createTopics method does not yet support the validateOnly or waitForLeaders properties, and the per-topic configuration does not support replicaAssignment.
The deleteTopics method is fully supported.
The listTopics method is supported with an additional timeout option.
The listGroups method is supported with additional timeout and matchConsumerGroupStates options. A number of additional properties have been added to the returned groups, and a list of errors within the returned object.
The describeGroups method is supported with additional timeout and includeAuthorizedOperations options. A number of additional properties have been added to the returned groups.
The deleteGroups method is supported with an additional timeout option.
The fetchOffsets method is supported with additional timeout and requireStableOffsets options, but the resolveOffsets option is not yet supported.
The deleteTopicRecords method is supported with additional timeout and operationTimeout options.
The fetchTopicMetadata method is supported with additional timeout and includeAuthorizedOperations options. Fetching for all topics is not advisable.
Schema Registry client
If you are using the Schema Registry client at
kafkajs/confluent-schema-registry, you do not need to make any changes to
the usage.
An example is available here.
Error handling
Convert any checks based on instanceof and error.name to checks based on error.code or error.type.
Example:
try {
  await producer.send(/* args */);
} catch (error) {
  if (!Kafka.isKafkaJSError(error)) { /* unrelated err handling */ }
  else if (error.fatal) { /* fatal error, abandon producer */ }
  else if (error.code === Kafka.ErrorCode.ERR__QUEUE_FULL) { /*...*/ }
  else if (error.type === 'ERR_MSG_SIZE_TOO_LARGE') { /*...*/ }
  /* and so on for specific errors */
}
Error Type Changes:
Some possible subtypes of KafkaJSError have been removed, and
additional information has been added into KafkaJSError. Fields have
been added denoting if the error is fatal, retriable, or abortable (the
latter two only relevant for a transactional producer). Some
error-specific fields have also been removed.
An exhaustive list of changes is at the bottom of this section.
For compatibility, as many error types as possible have been retained,
but it is better to switch to checking the error.code.
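As an illustration of code-based checks, the sketch below turns the fields described above into a single decision function. decideAction and the action labels are hypothetical, the error-code table is passed in rather than imported so the function stays independent of how it is exported, and the abortable field name is assumed from the description of the added fields.

```javascript
// Hypothetical helper: maps an error to a coarse action using only the
// fields described above. `errorCodes` is the client's error-code table
// (the object the earlier example reaches through Kafka.ErrorCode).
function decideAction(error, errorCodes) {
  if (error.fatal) return 'recreate-client';
  if (error.code === errorCodes.ERR__QUEUE_FULL) return 'back-off-and-resend';
  // Assumed field name; relevant only for a transactional producer.
  if (error.abortable) return 'abort-transaction';
  if (error.retriable) return 'retry';
  return 'report';
}
```

Within a catch block this might be called as `decideAction(error, Kafka.ErrorCode)` after confirming that the error is a KafkaJSError.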
Exhaustive list of error types and error fields removed:
| Error | Change |
|---|---|
| KafkaJSNonRetriableError | Removed. Retriable errors are automatically retried by librdkafka, so there’s no need for this type. Note that error.retriable still exists, but it’s applicable only for transactional producer, where users are expected to retry an action themselves. All error types using this as a superclass now use KafkaJSError as their superclass. |
| KafkaJSOffsetOutOfRange | topic and partition are removed from this object. |
| KafkaJSMemberIdRequired | Removed. Automatically handled by librdkafka. |
| KafkaJSNumberOfRetriesExceeded | Removed. Retries are handled by librdkafka. |
| KafkaJSNumberOfRetriesExceeded | broker, correlationId, createdAt, sentAt and pendingDuration are removed from this object. |
| KafkaJSMetadataNotLoaded | Removed. Metadata is automatically reloaded by librdkafka. |
| KafkaJSTopicMetadataNotLoaded | Removed. Topic metadata is automatically reloaded by librdkafka. |
| KafkaJSStaleTopicMetadataAssignment | Removed, as it’s automatically refreshed by librdkafka. |
| KafkaJSDeleteGroupsError | Removed, as the Admin Client doesn’t have this yet. May be added back again, or changed. |
| KafkaJSServerDoesNotSupportApiKey | Removed, as this error isn’t generally exposed to user in librdkafka. If raised, it is subsumed into KafkaJSError where error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION. |
| KafkaJSBrokerNotFound | Removed. This error isn’t exposed directly to the user in librdkafka. |
| KafkaJSLockTimeout | Removed. This error is not applicable while using librdkafka. |
| KafkaJSUnsupportedMagicByteInMessageSet | Removed. It is subsumed into KafkaJSError where error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION. |
| KafkaJSDeleteTopicRecordsError | Removed, as the Admin Client doesn’t have this yet. May be added back again, or changed. |
| KafkaJSInvariantViolation | Removed, as it’s not applicable to librdkafka. Errors in internal state are subsumed into KafkaJSError where error.code === Kafka.ErrorCode.ERR__STATE. |
| KafkaJSInvalidVarIntError | Removed. This error isn’t exposed directly to the user in librdkafka. |
| KafkaJSInvalidLongError | Removed. This error isn’t exposed directly to the user in librdkafka. |
| KafkaJSCreateTopicError | Removed, as the Admin Client doesn’t have this yet. May be added back again, or changed. |
| KafkaJSAlterPartitionReassignmentsError | Removed, as the RPC is not used in librdkafka. |
| KafkaJSFetcherRebalanceError | Removed. This error isn’t exposed directly to the user in librdkafka. |
| KafkaJSConnectionError | broker is removed from this object. |
| KafkaJSConnectionClosedError | Removed. Subsumed into KafkaJSConnectionError as librdkafka treats them equivalently. |
Migrate to JavaScript Client from node-rdkafka
Migration from v2.18.0 and below should only require changing the import statement:
From:
const Kafka = require('node-rdkafka');
To:
const Kafka = require('@confluentinc/kafka-javascript');
The rest of the functionality should work as usual.
For future releases, the node-rdkafka API diverges from this library. If you encounter any issues migrating, refer to the Overview for a guide to using this library.