librdkafka
The Apache Kafka C/C++ client library
RdKafka::Conf Class Reference (abstract)

Configuration interface.

#include <rdkafkacpp.h>

Public Types

enum  ConfType {
  CONF_GLOBAL ,
  CONF_TOPIC
}
 Configuration object type.
 
enum  ConfResult {
  CONF_UNKNOWN = -2 ,
  CONF_INVALID = -1 ,
  CONF_OK = 0
}
 RdKafka::Conf::set() result code.
 

Public Member Functions

virtual Conf::ConfResult set (const std::string &name, const std::string &value, std::string &errstr)=0
 Set configuration property name to value value.
 
virtual Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr)=0
 Use with name = "dr_cb".
 
virtual Conf::ConfResult set (const std::string &name, OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, std::string &errstr)=0
 Use with name = "oauthbearer_token_refresh_cb".
 
virtual Conf::ConfResult set (const std::string &name, EventCb *event_cb, std::string &errstr)=0
 Use with name = "event_cb".
 
virtual Conf::ConfResult set (const std::string &name, const Conf *topic_conf, std::string &errstr)=0
 Use with name = "default_topic_conf".
 
virtual Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr)=0
 Use with name = "partitioner_cb".
 
virtual Conf::ConfResult set (const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb, std::string &errstr)=0
 Use with name = "partitioner_key_pointer_cb".
 
virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb, std::string &errstr)=0
 Use with name = "socket_cb".
 
virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb, std::string &errstr)=0
 Use with name = "open_cb".
 
virtual Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr)=0
 Use with name = "rebalance_cb".
 
virtual Conf::ConfResult set (const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr)=0
 Use with name = "offset_commit_cb".
 
virtual Conf::ConfResult set (const std::string &name, SslCertificateVerifyCb *ssl_cert_verify_cb, std::string &errstr)=0
 Use with name = "ssl_cert_verify_cb".
 
virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type, RdKafka::CertificateEncoding cert_enc, const void *buffer, size_t size, std::string &errstr)=0
 Set certificate/key cert_type from the cert_enc encoded memory at buffer of size bytes.
 
virtual Conf::ConfResult get (const std::string &name, std::string &value) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (DeliveryReportCb *&dr_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (EventCb *&event_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (PartitionerCb *&partitioner_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (PartitionerKeyPointerCb *&partitioner_kp_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (SocketCb *&socket_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (OpenCb *&open_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (RebalanceCb *&rebalance_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (OffsetCommitCb *&offset_commit_cb) const =0
 Query single configuration value.
 
virtual Conf::ConfResult get (SslCertificateVerifyCb *&ssl_cert_verify_cb) const =0
 Query single configuration value.
 
virtual std::list< std::string > * dump ()=0
 Dump configuration names and values to list containing name,value tuples.
 
virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb, std::string &errstr)=0
 Use with name = "consume_cb".
 
virtual struct rd_kafka_conf_s * c_ptr_global ()=0
 Returns the underlying librdkafka C rd_kafka_conf_t handle.
 
virtual struct rd_kafka_topic_conf_s * c_ptr_topic ()=0
 Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
 
virtual Conf::ConfResult set_engine_callback_data (void *value, std::string &errstr)=0
 Set callback_data for ssl engine.
 
virtual Conf::ConfResult enable_sasl_queue (bool enable, std::string &errstr)=0
 Enable/disable creation of a queue specific to SASL events and callbacks.
 

Static Public Member Functions

static Conf * create (ConfType type)
 Create configuration object.
 

Detailed Description

Configuration interface.

Holds either global or topic configuration, which is passed to RdKafka::Consumer::create(), RdKafka::Producer::create(), RdKafka::KafkaConsumer::create(), etc.

See also
CONFIGURATION.md for the full list of supported properties.

Member Enumeration Documentation

◆ ConfType

Configuration object type.

Enumerator
CONF_GLOBAL 

Global configuration

CONF_TOPIC 

Topic specific configuration

◆ ConfResult

RdKafka::Conf::set() result code.

Enumerator
CONF_UNKNOWN 

Unknown configuration property

CONF_INVALID 

Invalid configuration value

CONF_OK 

Configuration property was successfully set

Member Function Documentation

◆ set() [1/3]

virtual Conf::ConfResult RdKafka::Conf::set ( const std::string &  name,
const std::string &  value,
std::string &  errstr 
)
pure virtual

Set configuration property name to value value.

Fallthrough: Topic-level configuration properties may be set using this interface, in which case they are applied to the default_topic_conf. If no default_topic_conf has been set, one will be created. Any subsequent set("default_topic_conf", ..) calls will replace the current default topic configuration.

Returns
CONF_OK on success; on failure, a different result code is returned and a human-readable error description is written to errstr.

◆ set() [2/3]

virtual Conf::ConfResult RdKafka::Conf::set ( const std::string &  name,
const Conf *  topic_conf,
std::string &  errstr 
)
pure virtual

Use with name = "default_topic_conf".

Sets the default topic configuration to use for automatically subscribed topics.

See also
RdKafka::KafkaConsumer::subscribe()

◆ set() [3/3]

virtual Conf::ConfResult RdKafka::Conf::set ( const std::string &  name,
SslCertificateVerifyCb *  ssl_cert_verify_cb,
std::string &  errstr 
)
pure virtual

Use with name = "ssl_cert_verify_cb".

Returns
CONF_OK on success or CONF_INVALID if SSL is not supported in this build.

◆ set_ssl_cert()

virtual Conf::ConfResult RdKafka::Conf::set_ssl_cert ( RdKafka::CertificateType  cert_type,
RdKafka::CertificateEncoding  cert_enc,
const void *  buffer,
size_t  size,
std::string &  errstr 
)
pure virtual

Set certificate/key cert_type from the cert_enc encoded memory at buffer of size bytes.

Parameters
cert_type  Certificate or key type to configure.
cert_enc  Buffer encoding type.
buffer  Memory pointer to encoded certificate or key. The memory is not referenced after this function returns.
size  Size of memory at buffer.
errstr  A human-readable error string will be written to this string on failure.
Returns
CONF_OK on success or CONF_INVALID if the memory in buffer is of incorrect encoding, or if librdkafka was not built with SSL support.
Remarks
Calling this method multiple times with the same cert_type will replace the previous value.
Calling this method with buffer set to NULL will clear the configuration for cert_type.
The private key may require a password, which must be specified with the ssl.key.password configuration property prior to calling this function.
Private and public keys in PEM format may also be set with the ssl.key.pem and ssl.certificate.pem configuration properties.
CA certificate in PEM format may also be set with the ssl.ca.pem configuration property.
When librdkafka is linked to OpenSSL 3.0 and the certificate is encoded using an obsolete cipher, it might be necessary to set up an OpenSSL configuration file to load the "legacy" provider and set the OPENSSL_CONF environment variable. See https://github.com/openssl/openssl/blob/master/README-PROVIDERS.md for more information.

◆ get() [1/10]

virtual Conf::ConfResult RdKafka::Conf::get ( const std::string &  name,
std::string &  value 
) const
pure virtual

Query single configuration value.

Do not use this method to get callbacks registered by the configuration file. Instead use the specific get() methods with the specific callback parameter in the signature.

Fallthrough: Topic-level configuration properties from the default_topic_conf may be retrieved using this interface.

Returns
CONF_OK if the property was previously set, in which case its value is returned in value.

◆ get() [2/10]

virtual Conf::ConfResult RdKafka::Conf::get ( DeliveryReportCb *&  dr_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in dr_cb.

◆ get() [3/10]

virtual Conf::ConfResult RdKafka::Conf::get ( OAuthBearerTokenRefreshCb *&  oauthbearer_token_refresh_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in oauthbearer_token_refresh_cb.

◆ get() [4/10]

virtual Conf::ConfResult RdKafka::Conf::get ( EventCb *&  event_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in event_cb.

◆ get() [5/10]

virtual Conf::ConfResult RdKafka::Conf::get ( PartitionerCb *&  partitioner_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in partitioner_cb.

◆ get() [6/10]

virtual Conf::ConfResult RdKafka::Conf::get ( PartitionerKeyPointerCb *&  partitioner_kp_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in partitioner_kp_cb.

◆ get() [7/10]

virtual Conf::ConfResult RdKafka::Conf::get ( SocketCb *&  socket_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in socket_cb.

◆ get() [8/10]

virtual Conf::ConfResult RdKafka::Conf::get ( OpenCb *&  open_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in open_cb.

◆ get() [9/10]

virtual Conf::ConfResult RdKafka::Conf::get ( RebalanceCb *&  rebalance_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in rebalance_cb.

◆ get() [10/10]

virtual Conf::ConfResult RdKafka::Conf::get ( OffsetCommitCb *&  offset_commit_cb) const
pure virtual

Query single configuration value.

Returns
CONF_OK if the property was previously set, in which case its value is returned in offset_commit_cb.

◆ c_ptr_global()

virtual struct rd_kafka_conf_s* RdKafka::Conf::c_ptr_global ( )
pure virtual

Returns the underlying librdkafka C rd_kafka_conf_t handle.

Warning
Calling the C API on this handle is not recommended and there is no official support for it. For cases where the C++ API does not provide the required functionality, this C handle can be used to interact directly with the core librdkafka API.
Remarks
The lifetime of the returned pointer is the same as the Conf object this method is called on.
Include <rdkafka/rdkafka.h> prior to including <rdkafka/rdkafkacpp.h>
Returns
rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL.

◆ c_ptr_topic()

virtual struct rd_kafka_topic_conf_s* RdKafka::Conf::c_ptr_topic ( )
pure virtual

Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.

Warning
Calling the C API on this handle is not recommended and there is no official support for it. For cases where the C++ API does not provide the required functionality, this C handle can be used to interact directly with the core librdkafka API.
Remarks
The lifetime of the returned pointer is the same as the Conf object this method is called on.
Include <rdkafka/rdkafka.h> prior to including <rdkafka/rdkafkacpp.h>
Returns
rd_kafka_topic_conf_t* if this is a CONF_TOPIC object, else NULL.

◆ set_engine_callback_data()

virtual Conf::ConfResult RdKafka::Conf::set_engine_callback_data ( void *  value,
std::string &  errstr 
)
pure virtual

Set callback_data for ssl engine.

Remarks
The ssl.engine.location configuration must be set for this to have effect.
The memory pointed to by value must remain valid for the lifetime of the configuration object and any Kafka clients that use it.
Returns
CONF_OK on success, else CONF_INVALID.

◆ enable_sasl_queue()

virtual Conf::ConfResult RdKafka::Conf::enable_sasl_queue ( bool  enable,
std::string &  errstr 
)
pure virtual

Enable/disable creation of a queue specific to SASL events and callbacks.

For SASL mechanisms that trigger callbacks (currently OAUTHBEARER) this configuration API allows an application to get a dedicated queue for the SASL events/callbacks. After enabling the queue with this API the application can retrieve the queue by calling RdKafka::Handle::get_sasl_queue() on the client instance. This queue may then be served directly by the application (RdKafka::Queue::poll()) or forwarded to another queue, such as the background queue.

A convenience function is available to automatically forward the SASL queue to librdkafka's background thread, see RdKafka::Handle::sasl_background_callbacks_enable().

By default (enable = false) the main queue (as served by RdKafka::Handle::poll(), et al.) is used for SASL callbacks.

Remarks
The SASL queue is currently only used by the SASL OAUTHBEARER mechanism's token refresh callback.

The documentation for this class was generated from the following file: