Class: Client

(package) RdKafka.Client(globalConf, SubClientType, topicConf, existingClientnullable)

(package) new Client(globalConf, SubClientType, topicConf, existingClientnullable)

Base class for Consumer and Producer.

This should not be created independently, but rather is the base class on which both producer and consumer get their common functionality.

Parameters:
Name Type Attributes Description
globalConf object

Global configuration in key value pairs.

SubClientType function

The function representing the subclient type. In C++ land this needs to be a class that inherits from Connection.

topicConf object

Topic configuration in key value pairs

existingClient object <nullable>

a producer or a consumer to derive this client from. Only used by the AdminClient. Must be connected.

Extends

  • Emitter

Methods

connect(metadataOptionsnullable, cbnullable) → {RdKafka.Client}

Connect to the broker and receive its metadata.

Connects to a broker by establishing the client and fetches its metadata.

Parameters:
Name Type Attributes Description
metadataOptions object <nullable>

Options to be sent to the metadata.

Properties
Name Type Description
topic string

Topic to fetch metadata for. Empty string is treated as empty.

allTopics boolean

Fetch metadata for all topics, not just the ones we know about.

timeout int

The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms

cb RdKafka.Client~connectionCallback <nullable>

Callback that indicates we are done connecting.

Fires:
Returns:
  • Returns itself.
Type
RdKafka.Client

connectedTime() → {number}

Find out how long we have been connected to Kafka.

Returns:
  • Milliseconds since the connection has been established.
Type
number

disconnect(cb)

Disconnect from the Kafka client.

This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client.

It will also emit the disconnected event.

Parameters:
Name Type Description
cb function

Callback to call when disconnection is complete.

Fires:

getClient() → {Connection}

Get the native Kafka client.

You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.

See:
  • connection.cc
Returns:
  • The native Kafka client.
Type
Connection

getLastError() → (nullable) {LibrdKafkaError}

Get the last error emitted if it exists.

Returns:
  • Returns the LibrdKafkaError or null if one hasn't been thrown.
Type
LibrdKafkaError

getMetadata(metadataOptions, cb)

Get client metadata.

Use RdKafka.AdminClient#describeTopics instead for fetching metadata for specific topics.

Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka.

A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored.

To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).

Parameters:
Name Type Description
metadataOptions object

Metadata options to pass to the client.

Properties
Name Type Description
topic string

Topic string for which to fetch metadata

timeout number

Max time, in ms, to try to fetch metadata before timing out. Defaults to 3000.

cb function

Callback to fire with the metadata.

isConnected() → {boolean}

Whether or not we are connected to Kafka.

Returns:
  • Whether we are connected.
Type
boolean

offsetsForTimes(toppars, timeout, cb)

Query offsets for times from the broker.

This function makes a call to the broker to get the offsets for times specified.

Parameters:
Name Type Description
toppars Array.<TopicPartition>

Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for

timeout number

Number of ms to wait to recieve a response.

cb RdKafka.Client~offsetsForTimesCallback

Callback to fire with the filled in offsets.

queryWatermarkOffsets(topic, partition, timeout, cb)

Query offsets from the broker.

This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

Parameters:
Name Type Description
topic string

Topic to recieve offsets from.

partition number

Partition of the provided topic to recieve offsets from

timeout number

Number of ms to wait to recieve a response.

cb RdKafka.ClientClient~watermarkOffsetsCallback

Callback to fire with the offsets.

setSaslCredentials(username, password)

Change SASL credentials to be sent on the next authentication attempt.

Only applicable if SASL authentication is being used.

Parameters:
Name Type Description
username string
password string

Type Definitions

Metadata

Metadata object.

This is the representation of Kafka metadata in JavaScript.

Type:
  • object
Properties:
Name Type Description
orig_broker_id number

The broker ID of the original bootstrap broker.

orig_broker_name string

The name of the original bootstrap broker.

brokers Array.<RdKafka.Client~MetadataBroker>

An array of broker objects

topics Array.<RdKafka.Client~MetadataTopic>

An array of topics.

MetadataBroker

Type:
  • object
Properties:
Name Type Description
id number

Broker ID

host string

Broker host

port number

Broker port.

MetadataPartition

Type:
  • object
Properties:
Name Type Description
id number

Partition id

leader number

Broker ID for the partition leader

replicas Array.<number>

Array of replica IDs

isrs Array.<number>

Array of ISRS ids

MetadataTopic

Type:
  • object
Properties:
Name Type Description
name string

Topic name

partitions Array.<RdKafka.Client~MetadataPartition>

Array of partitions

connectionCallback(err, metadata)

This callback is used to pass metadata or an error after a successful connection

Parameters:
Name Type Description
err Error

An error, if one occurred while connecting.

metadata RdKafka.Client~Metadata

Metadata object.

offsetsForTimesCallback(err, toppars)

This callback is used to pass toppars or an error after a successful times query

Parameters:
Name Type Description
err Error

An error, if one occurred while connecting.

toppars Array.<TopicPartition>

Topic partitions with offsets filled in

watermarkOffsets

Type:
  • object
Properties:
Name Type Description
high number

High (newest/end) offset

low number

Low (oldest/beginning) offset

watermarkOffsetsCallback(err, offsets)

This callback is used to pass offsets or an error after a successful query

Parameters:
Name Type Description
err Error

An error, if one occurred while connecting.

offsets RdKafka.Client~watermarkOffsets

Watermark offsets

Events

disconnected

Disconnect event. Called after disconnection is finished.

Type:
  • object
Properties:
Name Type Description
connectionOpened date

when the connection was opened.

ready

Ready event. Called when the Client connects successfully

Type:
  • object
Properties:
Name Type Description
name string

the name of the client.