new HighLevelProducer(conf, topicConf)
Producer class for sending messages to Kafka in a higher level fashion.
This is the main entry point for writing data to Kafka if you want more functionality than librdkafka supports out of the box. You configure this like you do any other client, with a global configuration and an optional default topic configuration.
Once you instantiate this object, you must connect it before use. Connecting fetches the metadata and confirms that a connection can be made before you depend on it. After that, connection problems are emitted when polling, which the HighLevelProducer does on its own.
This convenience comes with a few restrictions:
- You may not define opaque tokens. The higher level producer is powered by opaque tokens.
- Every message ack will dispatch an event on the node thread.
- A ref counter is used to determine whether there are outgoing produces.
This will return the new object you should use instead when doing your produce calls.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
conf | object | | Key value pairs to configure the producer. |
topicConf | object | <nullable> | Key value pairs to create a default topic configuration. |
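A minimal construction sketch, assuming the library module is passed in as `Kafka` and exposes `HighLevelProducer` as documented above. The helper name `createHighLevelProducer`, the broker address, and the chosen values are illustrative only; `bootstrap.servers` and `acks` are standard librdkafka property names.

```javascript
// Hypothetical helper: builds a HighLevelProducer from a library module
// that exposes the constructor documented above.
function createHighLevelProducer(Kafka, brokers) {
  // Global configuration, using librdkafka property names.
  const conf = { 'bootstrap.servers': brokers };
  // Optional default topic configuration; this argument is nullable.
  const topicConf = { acks: -1 };
  return new Kafka.HighLevelProducer(conf, topicConf);
}
```

The returned object still has to be connected before any produce call is made.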
Extends
Methods
abortTransaction(timeout, cb) → {RdKafka.Producer}
Aborts the ongoing transaction (as started with beginTransaction).
Parameters:
Name | Type | Description |
---|---|---|
timeout | number | Number of milliseconds to try to abort, defaults to 5 seconds. |
cb | function | Callback to fire when operation is completed. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
beginTransaction() → {RdKafka.Producer}
Begin a transaction.
initTransactions must have been called successfully (once) before this function is called.
There can only be one ongoing transaction at a time.
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
commitTransaction(timeout, cb) → {RdKafka.Producer}
Commit the current transaction (as started with beginTransaction).
Parameters:
Name | Type | Description |
---|---|---|
timeout | number | Number of milliseconds to try to commit before giving up, defaults to 5 seconds. |
cb | function | Callback to fire when operation is completed. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
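The commit and abort calls are often paired. The sketch below is one possible arrangement, not the library's prescribed flow: it assumes both callbacks are Node-style (an error as the first argument, falsy on success), and the helper name `commitOrAbort` is hypothetical.

```javascript
// Hypothetical helper: commit the ongoing transaction and, if the commit
// reports an error, attempt to abort it instead.
// Assumes Node-style callbacks whose first argument is an error or falsy.
function commitOrAbort(producer, timeoutMs, done) {
  producer.commitTransaction(timeoutMs, (commitErr) => {
    if (!commitErr) {
      done(null);
      return;
    }
    producer.abortTransaction(timeoutMs, (abortErr) => {
      // Report the abort failure if there is one, else the commit error.
      done(abortErr || commitErr);
    });
  });
}
```

Real code would likely inspect the commit error before deciding whether to retry, abort, or give up.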
connect(metadataOptions, cb) → {RdKafka.Client}
Connect to the broker and receive its metadata.
Establishes the client connection to a broker and fetches its metadata.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
metadataOptions | object | <nullable> | Options to be sent to the metadata. |
cb | RdKafka.Client~connectionCallback | <nullable> | Callback that indicates we are done connecting. |
- Inherited From:
Fires:
Returns:
- Returns itself.
- Type
- RdKafka.Client
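As a sketch of how connect could be wrapped for promise-based code: the helper `connectAsync` is hypothetical, and it assumes the connection callback is Node-style, receiving an error first and the metadata second.

```javascript
// Hypothetical helper: wrap connect() in a Promise.
// Assumes the connection callback is called as (err, metadata).
function connectAsync(producer, metadataOptions = null) {
  return new Promise((resolve, reject) => {
    // metadataOptions is nullable, so null requests the defaults.
    producer.connect(metadataOptions, (err, metadata) => {
      if (err) {
        reject(err);
      } else {
        resolve(metadata);
      }
    });
  });
}
```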
connectedTime() → {number}
Find out how long we have been connected to Kafka.
- Inherited From:
Returns:
- Milliseconds since the connection has been established.
- Type
- number
disconnect(timeout, cb)
Disconnect the producer.
Flush everything on the internal librdkafka producer buffer. Then disconnect.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
timeout | number | <nullable> | Number of milliseconds to try to flush before giving up, defaults to 5 seconds. |
cb | function | <nullable> | The callback to fire when disconnected. |
- Inherited From:
flush(timeout, callback) → {RdKafka.Producer}
Flush the producer.
The producer accumulates messages for up to linger.ms on an internal buffer.
This method flushes the buffer immediately. You will usually want to do this before disconnecting.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
timeout | number | <nullable> | Number of milliseconds to try to flush before giving up. |
callback | function | <nullable> | Callback to fire when the flush is done. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
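A possible shutdown sequence is sketched below: an explicit flush followed by disconnect, so that a flush failure can be reported separately. The helper name `shutdown` is hypothetical, and both callbacks are assumed to be Node-style with an error as the first argument.

```javascript
// Hypothetical helper: flush outstanding messages, then disconnect.
// Assumes both callbacks receive an error (or a falsy value) first.
function shutdown(producer, timeoutMs, done) {
  producer.flush(timeoutMs, (flushErr) => {
    // Disconnect even if the flush failed or timed out.
    producer.disconnect(timeoutMs, (disconnectErr) => {
      done(flushErr || disconnectErr || null);
    });
  });
}
```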
getClient() → {Connection}
Get the native Kafka client.
You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.
- Inherited From:
- See:
- connection.cc
Returns:
- The native Kafka client.
- Type
- Connection
getLastError() → (nullable) {LibrdKafkaError}
Get the last error emitted if it exists.
- Inherited From:
Returns:
- Returns the LibrdKafkaError or null if one hasn't been thrown.
- Type
- LibrdKafkaError
getMetadata(metadataOptions, cb)
Get client metadata.
Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.
Note: using a metadataOptions.topic parameter has a potential side-effect.
A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka.
A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored.
To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Metadata options to pass to the client. |
cb | function | Callback to fire with the metadata. |
- Inherited From:
initTransactions(timeout, cb) → {RdKafka.Producer}
Initialize transactions for this producer instance.
This must be performed only once per transactional producer, before the producer can be used.
Parameters:
Name | Type | Description |
---|---|---|
timeout | number | Number of milliseconds to try to initialize before giving up, defaults to 5 seconds. |
cb | function | Callback to fire when operation is completed. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
isConnected() → {boolean}
Whether or not we are connected to Kafka.
- Inherited From:
Returns:
- Whether we are connected.
- Type
- boolean
offsetsForTimes(toppars, timeout, cb)
Query offsets for times from the broker.
This function makes a call to the broker to get the offsets for times specified.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~offsetsForTimesCallback | Callback to fire with the filled in offsets. |
- Inherited From:
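Because the offset field carries a timestamp in this call, building the toppars argument is easy to get wrong. The sketch below assumes each entry is a plain object with `topic`, `partition`, and `offset` fields and that timestamps are in milliseconds since the epoch; the helper name `buildTimestampQuery` is hypothetical.

```javascript
// Hypothetical helper: build the `toppars` argument for offsetsForTimes.
// Assumes each entry is a plain { topic, partition, offset } object.
function buildTimestampQuery(topic, partitions, timestampMs) {
  return partitions.map((partition) => ({
    topic,
    partition,
    // Not a real offset: the timestamp to look up offsets for.
    offset: timestampMs,
  }));
}
```

The resulting array would be passed as the first argument, followed by the timeout and the callback.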
poll() → {RdKafka.Producer}
Poll for events.
We need to run poll in order to learn about new events that have occurred. This is not done automatically when we produce, so we need to run it manually, or set the producer to automatically poll using RdKafka.Producer#setPollInterval or RdKafka.Producer#setPollInBackground.
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
produce(topic, partition, message, key, timestamp, headers, callback) → {boolean}
Produce a message to Kafka asynchronously.
This is the method mainly used in this class. Use it to produce a message to Kafka.
When this is sent off, and you receive your callback, the assurances afforded to you will be equal to those provided by your ack level.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | The topic name to produce to. |
partition | number | null | The partition number to produce to. |
message | Buffer | null | The message to produce. |
key | string | The key associated with the message. |
timestamp | number | null | Timestamp to send with the message. |
headers | object | A list of custom key value pairs that provide message metadata. |
callback | function | Callback to call when the delivery report is received. |
- Overrides:
- See:
Throws:
- A librdkafka error if producing failed.
Returns:
- true if it did not fail; otherwise an error is thrown.
- Type
- boolean
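The seven positional arguments can be awkward to call directly, so a promise wrapper is one way to use produce. This is a sketch under assumptions: the helper `produceAsync` is hypothetical, and the delivery callback is assumed to be Node-style, with an error first and a delivery result second.

```javascript
// Hypothetical helper: wrap produce() in a Promise.
// Assumes the delivery callback is called as (err, result).
function produceAsync(producer, { topic, partition = null, value, key, timestamp = null, headers }) {
  return new Promise((resolve, reject) => {
    // A synchronous throw from produce() inside this executor
    // also turns into a rejection of the returned Promise.
    producer.produce(topic, partition, value, key, timestamp, headers, (err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result);
      }
    });
  });
}
```

Leaving `partition` and `timestamp` as null uses the null values the parameter table allows.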
queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker.
This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~watermarkOffsetsCallback | Callback to fire with the offsets. |
- Inherited From:
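One use of the low and high watermarks is to estimate how many offsets a partition currently spans. The sketch below assumes a Node-style callback and assumes the offsets object exposes `lowOffset` and `highOffset`; those field names, and the helper name `offsetSpan`, are not confirmed by this page.

```javascript
// Hypothetical helper: report how many offsets lie between the low and
// high watermarks of one partition.
// Assumes the callback is (err, offsets) and that `offsets` has
// `lowOffset` and `highOffset` fields (assumed names).
function offsetSpan(producer, topic, partition, timeoutMs, done) {
  producer.queryWatermarkOffsets(topic, partition, timeoutMs, (err, offsets) => {
    if (err) {
      done(err);
      return;
    }
    done(null, offsets.highOffset - offsets.lowOffset);
  });
}
```

The span is an upper bound on the number of retained messages, since not every offset necessarily holds a message.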
sendOffsetsToTransaction(offsets, consumer, timeout, cb) → {RdKafka.Producer}
Send the current offsets of the consumer to the ongoing transaction.
Parameters:
Name | Type | Description |
---|---|---|
offsets | Array.<RdKafka.TopicPartition> | An array of topic-partitions with offsets filled in. |
consumer | RdKafka.KafkaConsumer | An instance of the consumer to send offsets for. |
timeout | number | Number of milliseconds to try to send offsets, defaults to 5 seconds. |
cb | function | Callback to return when operation is completed. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
setKeySerializer(serializer)
Set the key serializer.
This allows the key inside the produce call to differ from the value of the key actually produced to Kafka. Useful if, for example, you want to serialize it to a particular format.
Parameters:
Name | Type | Description |
---|---|---|
serializer | function | |
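As an illustration of a key serializer, the sketch below turns a structured key into a string. It assumes the serializer receives whatever key was passed to produce and may return a string (or null for a missing key); the function name and the `tenant`/`id` fields are hypothetical.

```javascript
// Hypothetical key serializer: converts a structured key into the string
// that is actually sent. Assumes the serializer is called with the key
// given to produce() and may return a string or null.
function compositeKeySerializer(key) {
  if (key === null || key === undefined) {
    return null;
  }
  if (typeof key === 'string') {
    return key;
  }
  // Illustrative key shape: { tenant, id }.
  return `${key.tenant}:${key.id}`;
}
```

It would be registered with `producer.setKeySerializer(compositeKeySerializer)`.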
setPollInBackground(set) → {RdKafka.Producer}
Set automatic polling for events on the librdkafka background thread.
This provides several advantages over setPollInterval, as the polling does not happen on the event loop, but on the C thread spawned by librdkafka, and can be more efficient for high-throughput producers.
If set = true, this will disable any polling interval set by setPollInterval.
Parameters:
Name | Type | Description |
---|---|---|
set | boolean | Whether to poll in the background or not. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
setPollInterval(interval) → {RdKafka.Producer}
Set automatic polling for events.
We need to run poll in order to learn about new events that have occurred. If you would like this done on an interval with disconnects and reconnections managed, you can do it here.
Parameters:
Name | Type | Description |
---|---|---|
interval | number | Interval, in milliseconds, to poll for. |
- Inherited From:
Returns:
- returns itself.
- Type
- RdKafka.Producer
setSaslCredentials(username, password)
Change SASL credentials to be sent on the next authentication attempt.
Only applicable if SASL authentication is being used.
Parameters:
Name | Type | Description |
---|---|---|
username | string | |
password | string | |
- Inherited From:
setTopicKeySerializer(serializer)
Set the topic-key serializer.
A serializer that takes the topic name in addition to the key.
Parameters:
Name | Type | Description |
---|---|---|
serializer | function | |
setTopicValueSerializer(serializer)
Set the topic-value serializer.
A serializer that takes the topic name in addition to the value.
Parameters:
Name | Type | Description |
---|---|---|
serializer | function | |
setValueSerializer(serializer)
Set the value serializer.
This allows the value inside the produce call to differ from the value actually produced to Kafka. Useful if, for example, you want to serialize it to a particular format.
Parameters:
Name | Type | Description |
---|---|---|
serializer | function | |
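A value serializer could, for example, encode plain objects as JSON. This is a sketch, assuming the serializer is called with the value passed to produce and that returning a Buffer (or null) is acceptable, in line with the `Buffer | null` message type; the names `jsonValueSerializer` and `useJsonValues` are hypothetical.

```javascript
// Hypothetical value serializer: encodes any JSON-compatible value.
// Assumes the serializer is called with the value given to produce()
// and may return a Buffer or null.
function jsonValueSerializer(value) {
  if (value === null || value === undefined) {
    return null;
  }
  return Buffer.from(JSON.stringify(value), 'utf8');
}

// Registers the serializer on a producer and returns the producer.
function useJsonValues(producer) {
  producer.setValueSerializer(jsonValueSerializer);
  return producer;
}
```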
Events
disconnected
Disconnect event. Called after disconnection is finished.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
connectionOpened | date | When the connection was opened. |
- Inherited From:
ready
Ready event. Called when the Client connects successfully.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | The name of the client. |
- Inherited From:
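Both events can be observed with ordinary listeners. The sketch below assumes the producer behaves like a Node.js EventEmitter with an `on` method and reads only the `name` and `connectionOpened` properties listed above; the helper name `trackLifecycle` and the `log` parameter are hypothetical.

```javascript
// Hypothetical helper: log the ready and disconnected events.
// Assumes the producer exposes an EventEmitter-style `on` method.
function trackLifecycle(producer, log) {
  producer.on('ready', (info) => {
    log(`ready: ${info.name}`);
  });
  producer.on('disconnected', (info) => {
    log(`disconnected; connection was opened at ${info.connectionOpened}`);
  });
  return producer;
}
```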