(package) new Client(globalConf, SubClientType, topicConf, existingClient (nullable))
Base class for Consumer and Producer.
Do not instantiate this class directly. It is the base class from which both Producer and Consumer inherit their common functionality.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
globalConf | object | | Global configuration in key value pairs. |
SubClientType | function | | The function representing the subclient type. In C++ land this needs to be a class that inherits from Connection. |
topicConf | object | | Topic configuration in key value pairs. |
existingClient | object | <nullable> | A producer or a consumer to derive this client from. Only used by the AdminClient. Must be connected. |
Extends
- Emitter
Methods
connect(metadataOptions (nullable), cb (nullable)) → {RdKafka.Client}
Connect to the broker and receive its metadata.
Establishes the underlying client connection to a broker, then fetches that broker's metadata.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
metadataOptions | object | <nullable> | Options to be sent with the metadata request. |
cb | RdKafka.Client~connectionCallback | <nullable> | Callback that indicates we are done connecting. |
Fires:
- ready
Returns:
- Returns itself.
- Type
- RdKafka.Client
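The callback form of connect() can be wrapped in a Promise for use with async/await. The following is a minimal sketch, not part of the library: `connectAsync` is a hypothetical helper name, and it assumes only the documented signature, where the callback receives an error or the metadata.

```javascript
// Hypothetical helper: wraps the documented connect(metadataOptions, cb)
// in a Promise. `client` is any object exposing that method.
function connectAsync(client, metadataOptions) {
  return new Promise((resolve, reject) => {
    // metadataOptions is nullable, so pass null when none was given.
    client.connect(metadataOptions || null, (err, metadata) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(metadata);
    });
  });
}
```

With a connected Producer or Consumer instance this could be used as `const metadata = await connectAsync(client);`.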
connectedTime() → {number}
Find out how long we have been connected to Kafka.
Returns:
- Milliseconds since the connection has been established.
- Type
- number
disconnect(cb)
Disconnect from the Kafka client.
This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client.
It will also emit the disconnected event.
Parameters:
Name | Type | Description |
---|---|---|
cb | function | Callback to call when disconnection is complete. |
Fires:
- disconnected
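A shutdown routine usually checks the connection state before disconnecting. The sketch below combines isConnected() and disconnect(cb); `shutdown` is a hypothetical helper name, and treating the first callback argument as an optional error is an assumption, since the table above only says the callback runs when disconnection is complete.

```javascript
// Hypothetical helper: disconnect only when a connection exists.
// Resolves to true if a disconnect was performed, false if there was nothing to do.
function shutdown(client) {
  return new Promise((resolve, reject) => {
    if (!client.isConnected()) {
      resolve(false);
      return;
    }
    // Assumption: the callback's first argument is an error, if any.
    client.disconnect((err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(true);
    });
  });
}
```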
getClient() → {Connection}
Get the native Kafka client.
You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.
See:
- connection.cc
Returns:
- The native Kafka client.
- Type
- Connection
getLastError() → (nullable) {LibrdKafkaError}
Get the last error emitted if it exists.
Returns:
- Returns the LibrdKafkaError or null if one hasn't been thrown.
- Type
- LibrdKafkaError
getMetadata(metadataOptions, cb)
Get client metadata.
Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.
Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka.
A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored.
To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Metadata options to pass to the client. |
cb | function | Callback to fire with the metadata. |
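The side effect described in the note can be avoided by requesting metadata for all topics. The sketch below does that by passing an options object without a `topic` key and then reading topic names from the result; `listTopicNames` is a hypothetical helper name, and the shape of the result follows the Metadata type definition on this page.

```javascript
// Hypothetical helper: fetch metadata for all topics (no metadataOptions.topic),
// so no Topic object is created with default options as a side effect.
function listTopicNames(client, cb) {
  client.getMetadata({}, (err, metadata) => {
    if (err) {
      cb(err);
      return;
    }
    // metadata.topics is an array of MetadataTopic objects.
    cb(null, metadata.topics.map((topic) => topic.name));
  });
}
```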
isConnected() → {boolean}
Whether or not we are connected to Kafka.
Returns:
- Whether we are connected.
- Type
- boolean
offsetsForTimes(toppars, timeout, cb)
Query offsets for times from the broker.
This function makes a call to the broker to get the offsets for times specified.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Array of topic partitions. The offset field of each entry should hold the timestamp you want an offset for, rather than an actual offset. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~offsetsForTimesCallback | Callback to fire with the filled in offsets. |
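Because the offset field carries a timestamp in this call, it can help to build the input array explicitly. The sketch below is illustrative only: `topparsForTimestamp` is a hypothetical helper, and it assumes that plain objects with `topic`, `partition`, and `offset` keys are accepted where TopicPartition entries are expected.

```javascript
// Hypothetical helper: build one entry per partition, placing the
// timestamp (ms since epoch) in the offset field as described above.
// Assumption: plain objects are accepted in place of TopicPartition instances.
function topparsForTimestamp(topic, partitions, timestampMs) {
  return partitions.map((partition) => ({
    topic: topic,
    partition: partition,
    offset: timestampMs,
  }));
}
```

A call could then look like `client.offsetsForTimes(topparsForTimestamp('my-topic', [0, 1], Date.now() - 3600000), 5000, cb)`, where the topic name and the 5000 ms timeout are placeholder values.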
queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker.
This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
timeout | number | Number of ms to wait to receive a response. |
cb | RdKafka.Client~watermarkOffsetsCallback | Callback to fire with the offsets. |
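One common use of the watermarks is to estimate how much data a partition currently holds. The sketch below subtracts the low watermark from the high one; `offsetSpan` is a hypothetical helper name, and the `high` and `low` fields follow the watermarkOffsets type definition on this page.

```javascript
// Hypothetical helper: query the watermarks for one partition and report
// the number of offsets between low (oldest) and high (newest).
function offsetSpan(client, topic, partition, timeoutMs, cb) {
  client.queryWatermarkOffsets(topic, partition, timeoutMs, (err, offsets) => {
    if (err) {
      cb(err);
      return;
    }
    cb(null, offsets.high - offsets.low);
  });
}
```

The span is an upper bound rather than an exact message count: on a compacted topic, fewer messages may be retained than there are offsets in the range.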
setSaslCredentials(username, password)
Change SASL credentials to be sent on the next authentication attempt.
Only applicable if SASL authentication is being used.
Parameters:
Name | Type | Description |
---|---|---|
username | string | |
password | string | |
Type Definitions
Metadata
Metadata object.
This is the representation of Kafka metadata in JavaScript.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
orig_broker_id | number | The broker ID of the original bootstrap broker. |
orig_broker_name | string | The name of the original bootstrap broker. |
brokers | Array.<RdKafka.Client~MetadataBroker> | An array of broker objects. |
topics | Array.<RdKafka.Client~MetadataTopic> | An array of topics. |
MetadataBroker
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
id | number | Broker ID. |
host | string | Broker host. |
port | number | Broker port. |
MetadataPartition
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
id | number | Partition ID. |
leader | number | Broker ID for the partition leader. |
replicas | Array.<number> | Array of replica broker IDs. |
isrs | Array.<number> | Array of in-sync replica (ISR) broker IDs. |
MetadataTopic
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | Topic name. |
partitions | Array.<RdKafka.Client~MetadataPartition> | Array of partitions. |
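The nested Metadata, MetadataTopic, and MetadataPartition shapes above can be walked with ordinary array methods. As an illustration, the sketch below lists partitions whose in-sync replica set is smaller than their replica set; `underReplicatedPartitions` is a hypothetical helper name, and it relies only on the properties documented in these type definitions.

```javascript
// Hypothetical helper: return { topic, partition } for every partition
// that has fewer in-sync replicas than assigned replicas.
function underReplicatedPartitions(metadata) {
  const result = [];
  for (const topic of metadata.topics) {
    for (const partition of topic.partitions) {
      if (partition.isrs.length < partition.replicas.length) {
        result.push({ topic: topic.name, partition: partition.id });
      }
    }
  }
  return result;
}
```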
connectionCallback(err, metadata)
This callback receives either the metadata or an error once a connection attempt has completed.
Parameters:
Name | Type | Description |
---|---|---|
err | Error | An error, if one occurred while connecting. |
metadata | RdKafka.Client~Metadata | Metadata object. |
offsetsForTimesCallback(err, toppars)
This callback receives either the topic partitions with their offsets filled in or an error once an offsets-for-times query has completed.
Parameters:
Name | Type | Description |
---|---|---|
err | Error | An error, if one occurred during the query. |
toppars | Array.<TopicPartition> | Topic partitions with offsets filled in. |
watermarkOffsets
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
high | number | High (newest/end) offset. |
low | number | Low (oldest/beginning) offset. |
watermarkOffsetsCallback(err, offsets)
This callback receives either the watermark offsets or an error once a watermark query has completed.
Parameters:
Name | Type | Description |
---|---|---|
err | Error | An error, if one occurred during the query. |
offsets | RdKafka.Client~watermarkOffsets | Watermark offsets. |
Events
disconnected
Disconnect event. Called after disconnection is finished.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
connectionOpened | date | When the connection was opened. |
ready
Ready event. Emitted when the Client connects successfully.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | The name of the client. |
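Since Client extends Emitter, both events can be observed with ordinary listeners. The sketch below attaches handlers for `ready` and `disconnected` and reads the documented payload properties; `watchConnection` is a hypothetical helper name, and a plain Node.js EventEmitter stands in for a real client so the example runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: log connection lifecycle events of a client.
// `log` is any function that accepts a string.
function watchConnection(client, log) {
  client.on('ready', (info) => {
    log(`client ${info.name} is ready`);
  });
  client.on('disconnected', (info) => {
    log(`disconnected; connection was opened at ${info.connectionOpened}`);
  });
  return client;
}

// Stand-in for a real client: emits the same event with a made-up payload.
const standIn = watchConnection(new EventEmitter(), console.log);
standIn.emit('ready', { name: 'example-client' });
```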