librdkafka
The Apache Kafka C/C++ client library
RdKafka::Handle Class Referenceabstract

Base handle, super class for specific clients. More...

#include <rdkafkacpp.h>

Inheritance diagram for RdKafka::Handle:

Public Member Functions

virtual std::string name () const =0
 
virtual std::string memberid () const =0
 Returns the client's broker-assigned group member id. More...
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events. More...
 
virtual int outq_len ()=0
 Returns the current out queue length. More...
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker. More...
 
virtual ErrorCode pause (std::vector< TopicPartition * > &partitions)=0
 Pause producing or consumption for the provided list of partitions. More...
 
virtual ErrorCode resume (std::vector< TopicPartition * > &partitions)=0
 Resume producing or consumption for the provided list of partitions. More...
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition. More...
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition. More...
 
virtual ErrorCode offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
 Look up the offsets for the given partitions by timestamp. More...
 
virtual Queueget_partition_queue (const TopicPartition *partition)=0
 Retrieve queue for a given partition. More...
 
virtual ErrorCode set_log_queue (Queue *queue)=0
 Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls. More...
 
virtual void yield ()=0
 Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc). More...
 
virtual std::string clusterid (int timeout_ms)=0
 Returns the ClusterId as reported in broker metadata. More...
 
virtual struct rd_kafka_s * c_ptr ()=0
 Returns the underlying librdkafka C rd_kafka_t handle. More...
 
virtual int32_t controllerid (int timeout_ms)=0
 Returns the current ControllerId (controller broker id) as reported in broker metadata. More...
 
virtual ErrorCode fatal_error (std::string &errstr) const =0
 Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred. More...
 
virtual ErrorCode oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
 Set SASL/OAUTHBEARER token and metadata. More...
 
virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr)=0
 SASL/OAUTHBEARER token refresh failure indicator. More...
 
virtual Errorsasl_background_callbacks_enable ()=0
 Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread. More...
 
virtual Queueget_sasl_queue ()=0
 
virtual Queueget_background_queue ()=0
 
virtual void * mem_malloc (size_t size)=0
 Allocate memory using the same allocator librdkafka uses. More...
 
virtual void mem_free (void *ptr)=0
 Free pointer returned by librdkafka. More...
 
virtual Errorsasl_set_credentials (const std::string &username, const std::string &password)=0
 Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client. More...
 

Detailed Description

Base handle, super class for specific clients.

Member Function Documentation

◆ name()

virtual std::string RdKafka::Handle::name ( ) const
pure virtual
Returns
the name of the handle

◆ memberid()

virtual std::string RdKafka::Handle::memberid ( ) const
pure virtual

Returns the client's broker-assigned group member id.

Remarks
This currently requires the high-level KafkaConsumer
Returns
Last assigned member id, or empty string if not currently a group member.

◆ poll()

virtual int RdKafka::Handle::poll ( int  timeout_ms)
pure virtual

Polls the provided kafka handle for events.

Events will trigger application provided callbacks to be called.

The timeout_ms argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinately for events, provide -1.

Events:

  • delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]
  • event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
Remarks
An application should make sure to call poll() at regular intervals to serve any queued callbacks waiting to be called.
Warning
This method MUST NOT be used with the RdKafka::KafkaConsumer, use its RdKafka::KafkaConsumer::consume() instead.
Returns
the number of events served.

◆ outq_len()

virtual int RdKafka::Handle::outq_len ( )
pure virtual

Returns the current out queue length.

The out queue contains messages and requests waiting to be sent to, or acknowledged by, the broker.

◆ metadata()

virtual ErrorCode RdKafka::Handle::metadata ( bool  all_topics,
const Topic only_rkt,
Metadata **  metadatap,
int  timeout_ms 
)
pure virtual

Request Metadata from broker.

Parameters: all_topics - if non-zero: request info about all topics in cluster, if zero: only request info about locally known topics. only_rkt - only request info about this topic metadatap - pointer to hold metadata result. The *metadatap pointer must be released with delete. timeout_ms - maximum response time before failing.

Returns
RdKafka::ERR_NO_ERROR on success (in which case *metadatap will be set), else RdKafka::ERR__TIMED_OUT on timeout or other error code on error.

◆ pause()

virtual ErrorCode RdKafka::Handle::pause ( std::vector< TopicPartition * > &  partitions)
pure virtual

Pause producing or consumption for the provided list of partitions.

Success or error is returned per-partition in the partitions list.

Returns
ErrorCode::NO_ERROR
See also
resume()

◆ resume()

virtual ErrorCode RdKafka::Handle::resume ( std::vector< TopicPartition * > &  partitions)
pure virtual

Resume producing or consumption for the provided list of partitions.

Success or error is returned per-partition in the partitions list.

Returns
ErrorCode::NO_ERROR
See also
pause()

◆ query_watermark_offsets()

virtual ErrorCode RdKafka::Handle::query_watermark_offsets ( const std::string &  topic,
int32_t  partition,
int64_t *  low,
int64_t *  high,
int  timeout_ms 
)
pure virtual

Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.

Offsets are returned in *low and *high respectively.

Returns
RdKafka::ERR_NO_ERROR on success or an error code on failure.

◆ get_watermark_offsets()

virtual ErrorCode RdKafka::Handle::get_watermark_offsets ( const std::string &  topic,
int32_t  partition,
int64_t *  low,
int64_t *  high 
)
pure virtual

Get last known low (oldest/beginning) and high (newest/end) offsets for partition.

The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker.

If there is no cached offset (either low or high, or both) then OFFSET_INVALID will be returned for the respective offset.

Offsets are returned in *low and *high respectively.

Returns
RdKafka::ERR_NO_ERROR on success or an error code on failure.
Remarks
Shall only be used with an active consumer instance.

◆ offsetsForTimes()

virtual ErrorCode RdKafka::Handle::offsetsForTimes ( std::vector< TopicPartition * > &  offsets,
int  timeout_ms 
)
pure virtual

Look up the offsets for the given partitions by timestamp.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps to query are represented as offset in offsets on input, and offset() will return the closest earlier offset for the timestamp on output.

Timestamps are expressed as milliseconds since epoch (UTC).

The function will block for at most timeout_ms milliseconds.

Remarks
Duplicate Topic+Partitions are not supported.
Errors are also returned per TopicPartition, see err()
Returns
an error code for general errors, else RdKafka::ERR_NO_ERROR in which case per-partition errors might be set.

◆ get_partition_queue()

virtual Queue* RdKafka::Handle::get_partition_queue ( const TopicPartition partition)
pure virtual

Retrieve queue for a given partition.

Returns
The fetch queue for the given partition if successful. Else, NULL is returned.
Remarks
This function only works on consumers.

◆ set_log_queue()

virtual ErrorCode RdKafka::Handle::set_log_queue ( Queue queue)
pure virtual

Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.

This allows an application to serve log callbacks (log_cb) in its thread of choice.

Parameters
queueQueue to forward logs to. If the value is NULL the logs are forwarded to the main queue.
Remarks
The configuration property log.queue MUST also be set to true.
librdkafka maintains its own reference to the provided queue.
Returns
ERR_NO_ERROR on success or an error code on error.

◆ yield()

virtual void RdKafka::Handle::yield ( )
pure virtual

Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc).

A callback may use this to force an immediate return to the calling code (caller of e.g. Handle::poll()) without processing any further events.

Remarks
This function MUST ONLY be called from within a librdkafka callback.

◆ clusterid()

virtual std::string RdKafka::Handle::clusterid ( int  timeout_ms)
pure virtual

Returns the ClusterId as reported in broker metadata.

Parameters
timeout_msIf there is no cached value from metadata retrieval then this specifies the maximum amount of time (in milliseconds) the call will block waiting for metadata to be retrieved. Use 0 for non-blocking calls.
Remarks
Requires broker version >=0.10.0 and api.version.request=true.
Returns
Last cached ClusterId, or empty string if no ClusterId could be retrieved in the allotted timespan.

◆ c_ptr()

virtual struct rd_kafka_s* RdKafka::Handle::c_ptr ( )
pure virtual

Returns the underlying librdkafka C rd_kafka_t handle.

Warning
Calling the C API on this handle is not recommended and there is no official support for it, but for cases where the C++ does not provide the proper functionality this C handle can be used to interact directly with the core librdkafka API.
Remarks
The lifetime of the returned pointer is the same as the Topic object this method is called on.
Include <rdkafka/rdkafka.h> prior to including <rdkafka/rdkafkacpp.h>
Returns
rd_kafka_t*

◆ controllerid()

virtual int32_t RdKafka::Handle::controllerid ( int  timeout_ms)
pure virtual

Returns the current ControllerId (controller broker id) as reported in broker metadata.

Parameters
timeout_msIf there is no cached value from metadata retrieval then this specifies the maximum amount of time (in milliseconds) the call will block waiting for metadata to be retrieved. Use 0 for non-blocking calls.
Remarks
Requires broker version >=0.10.0 and api.version.request=true.
Returns
Last cached ControllerId, or -1 if no ControllerId could be retrieved in the allotted timespan.

◆ fatal_error()

virtual ErrorCode RdKafka::Handle::fatal_error ( std::string &  errstr) const
pure virtual

Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred.

This function is to be used with the Idempotent Producer and the Event class for EVENT_ERROR events to detect fatal errors.

Generally all errors raised by the error event are to be considered informational and temporary, the client will try to recover from all errors in a graceful fashion (by retrying, etc).

However, some errors should logically be considered fatal to retain consistency; in particular a set of errors that may occur when using the Idempotent Producer and the in-order or exactly-once producer guarantees can't be satisfied.

Parameters
errstrA human readable error string if a fatal error was set.
Returns
ERR_NO_ERROR if no fatal error has been raised, else any other error code.

◆ oauthbearer_set_token()

virtual ErrorCode RdKafka::Handle::oauthbearer_set_token ( const std::string &  token_value,
int64_t  md_lifetime_ms,
const std::string &  md_principal_name,
const std::list< std::string > &  extensions,
std::string &  errstr 
)
pure virtual

Set SASL/OAUTHBEARER token and metadata.

Parameters
token_valuethe mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1.
md_lifetime_mswhen the token expires, in terms of the number of milliseconds since the epoch.
md_principal_namethe Kafka principal name associated with the token.
extensionspotentially empty SASL extension keys and values where element [i] is the key and [i+1] is the key's value, to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. The number of SASL extension keys plus values must be a non-negative multiple of 2. Any provided keys and values are copied.
errstrA human readable error string is written here, only if there is an error.

The SASL/OAUTHBEARER token refresh callback should invoke this method upon success. The extension keys must not include the reserved key "`auth`", and all extension keys and values must conform to the required format as per https://tools.ietf.org/html/rfc7628#section-3.1:

key            = 1*(ALPHA)
value          = *(VCHAR / SP / HTAB / CR / LF )
Returns
RdKafka::ERR_NO_ERROR on success, otherwise errstr set and:
RdKafka::ERR__INVALID_ARG if any of the arguments are invalid;
RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not supported by this build;
RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
See also
RdKafka::oauthbearer_set_token_failure
RdKafka::Conf::set() "oauthbearer_token_refresh_cb"

◆ oauthbearer_set_token_failure()

virtual ErrorCode RdKafka::Handle::oauthbearer_set_token_failure ( const std::string &  errstr)
pure virtual

SASL/OAUTHBEARER token refresh failure indicator.

Parameters
errstrhuman readable error reason for failing to acquire a token.

The SASL/OAUTHBEARER token refresh callback should invoke this method upon failure to refresh the token.

Returns
RdKafka::ERR_NO_ERROR on success, otherwise:
RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not supported by this build;
RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
See also
RdKafka::oauthbearer_set_token
RdKafka::Conf::set() "oauthbearer_token_refresh_cb"

◆ sasl_background_callbacks_enable()

virtual Error* RdKafka::Handle::sasl_background_callbacks_enable ( )
pure virtual

Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.

This serves as an alternative for applications that do not call RdKafka::Handle::poll() (et.al.) at regular intervals.

◆ get_sasl_queue()

virtual Queue* RdKafka::Handle::get_sasl_queue ( )
pure virtual
Returns
the SASL callback queue, if enabled, else NULL.
See also
RdKafka::Conf::enable_sasl_queue()

◆ get_background_queue()

virtual Queue* RdKafka::Handle::get_background_queue ( )
pure virtual
Returns
the librdkafka background thread queue.

◆ mem_malloc()

virtual void* RdKafka::Handle::mem_malloc ( size_t  size)
pure virtual

Allocate memory using the same allocator librdkafka uses.

This is typically an abstraction for the malloc(3) call and makes sure the application can use the same memory allocator as librdkafka for allocating pointers that are used by librdkafka.

Remarks
Memory allocated by mem_malloc() must be freed using mem_free().

◆ mem_free()

virtual void RdKafka::Handle::mem_free ( void *  ptr)
pure virtual

Free pointer returned by librdkafka.

This is typically an abstraction for the free(3) call and makes sure the application can use the same memory allocator as librdkafka for freeing pointers returned by librdkafka.

In standard setups it is usually not necessary to use this interface rather than the free(3) function.

Remarks
mem_free() must only be used for pointers returned by APIs that explicitly mention using this function for freeing.

◆ sasl_set_credentials()

virtual Error* RdKafka::Handle::sasl_set_credentials ( const std::string &  username,
const std::string &  password 
)
pure virtual

Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client.

This function sets or resets the SASL username and password credentials used by this Kafka client. The new credentials will be used the next time this client needs to authenticate to a broker. will not disconnect existing connections that might have been made using the old credentials.

Remarks
This function only applies to the SASL PLAIN and SCRAM mechanisms.
Returns
NULL on success or an error object on error.

The documentation for this class was generated from the following file: