...

Package kafka

import "github.com/confluentinc/confluent-kafka-go/kafka"

Overview

Package kafka provides high-level Apache Kafka producers and consumers using bindings on top of the librdkafka C library.

High-level Consumer

* Decide if you want to read messages and events from the `.Events()` channel (set `"go.events.channel.enable": true`) or by calling `.Poll()`.

* Create a Consumer with `kafka.NewConsumer()` providing at least the `bootstrap.servers` and `group.id` configuration properties.

* Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics) to join the group with the specified subscription set. Subscriptions are atomic; calling `.Subscribe*()` again will leave the group and rejoin with the new set of topics.

* Start reading events and messages from either the `.Events` channel or by calling `.Poll()`.

* When the group has rebalanced, each client member is assigned a (sub-)set of topic+partitions. By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get insight into what the assigned partitions are, as well as to set the initial offsets. To do this you need to pass `"go.application.rebalance.enable": true` to the `NewConsumer()` call mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event with the assigned partition set. You can optionally modify the initial offsets (they default to stored offsets, falling back to `"auto.offset.reset"`, which defaults to the `latest` message, if there are no previously stored offsets) and then call `.Assign(partitions)` to start consuming. If you don't need to modify the initial offsets you do not need to call `.Assign()`; the client will do so automatically for you, unless you are using the channel-based consumer, in which case you MUST call `.Assign()` when receiving the `AssignedPartitions` and `RevokedPartitions` events.

* As messages are fetched they will be made available on either the `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.

* Handle messages, events and errors to your liking.

* When you are done consuming call `.Close()` to commit final offsets and leave the consumer group.
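
A minimal poll-based consumer following the steps above might look like this (a sketch; the broker address, group id and topic name are placeholders):

    package main

    import (
        "fmt"
        "os"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092",
            "group.id":          "example-group",
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            panic(err)
        }
        defer c.Close()

        if err := c.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
            panic(err)
        }

        run := true
        for run {
            switch e := c.Poll(100).(type) {
            case *kafka.Message:
                fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "consumer error: %v\n", e)
                if e.Code() == kafka.ErrAllBrokersDown {
                    run = false
                }
            case nil:
                // Poll() timed out; nothing to do.
            }
        }
    }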

Producer

* Create a Producer with `kafka.NewProducer()` providing at least the `bootstrap.servers` configuration property.

* Messages may now be produced either by sending a `*kafka.Message` on the `.ProduceChannel()` or by calling `.Produce()`.

* Producing is an asynchronous operation, so the client notifies the application of per-message produce success or failure through delivery reports. Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`, and you should check `msg.TopicPartition.Error` for `nil` to find out whether the message was successfully delivered. It is also possible to direct delivery reports to an alternate channel by providing a non-nil `chan Event` channel to `.Produce()`. If no delivery reports are wanted they can be completely disabled by setting the configuration property `"go.delivery.reports": false`.

* When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed); remember that this is an asynchronous client, so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function `.Flush()`, which will block until all message deliveries are done or the provided timeout elapses.

* Finally call `.Close()` to decommission the producer.
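
A minimal producer following the steps above might look like this (a sketch; the broker address and topic name are placeholders):

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
        if err != nil {
            panic(err)
        }
        defer p.Close()

        // Handle delivery reports emitted on the Events() channel.
        go func() {
            for ev := range p.Events() {
                if m, ok := ev.(*kafka.Message); ok {
                    if m.TopicPartition.Error != nil {
                        fmt.Printf("delivery failed: %v\n", m.TopicPartition.Error)
                    } else {
                        fmt.Printf("delivered to %v\n", m.TopicPartition)
                    }
                }
            }
        }()

        topic := "example-topic"
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte("hello"),
        }, nil)
        if err != nil {
            panic(err)
        }

        // Block for up to 15 seconds waiting for outstanding delivery reports.
        p.Flush(15 * 1000)
    }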

Events

Apart from emitting messages and delivery reports the client also communicates with the application through a number of different event types. An application may choose to handle or ignore these events.

Consumer events

* `*kafka.Message` - a fetched message.

* `AssignedPartitions` - The assigned partition set for this client following a rebalance. Requires `go.application.rebalance.enable`.

* `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance. `AssignedPartitions` and `RevokedPartitions` are symmetrical. Requires `go.application.rebalance.enable`.

* `PartitionEOF` - Consumer has reached the end of a partition. NOTE: The consumer will keep trying to fetch new messages for the partition.

* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).

Producer events

* `*kafka.Message` - delivery report for produced message. Check `.TopicPartition.Error` for delivery result.

Generic events for both Consumer and Producer

* `KafkaError` - client (error codes are prefixed with _) or broker error. These errors are normally just informational since the client will try its best to automatically recover (eventually).

* `OAuthBearerTokenRefresh` - retrieval of a new SASL/OAUTHBEARER token is required. This event only occurs with sasl.mechanism=OAUTHBEARER. Invoke SetOAuthBearerToken() on the Producer/Consumer/AdminClient instance when a successful token retrieval is completed; otherwise invoke SetOAuthBearerTokenFailure() to indicate that retrieval failed (or that setting the token failed, which could happen if an extension doesn't meet the required regular expression). Invoking SetOAuthBearerTokenFailure() will schedule a new event for 10 seconds later so another retrieval can be attempted. A sketch of handling these events follows this list.
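
A minimal sketch of handling these generic events from a poll loop, assuming `ev` is the event returned by Poll(), `c` is the client handle, and `retrieveToken()` is a hypothetical application function returning a kafka.OAuthBearerToken:

    switch e := ev.(type) {
    case kafka.Error:
        // Normally informational; the client will attempt to recover on its own.
        fmt.Printf("client/broker error: %v\n", e)
    case kafka.OAuthBearerTokenRefresh:
        token, err := retrieveToken() // hypothetical: obtain a token from your OAuth provider
        if err != nil {
            // Schedules another refresh event roughly 10 seconds later.
            c.SetOAuthBearerTokenFailure(err.Error())
            break
        }
        if err := c.SetOAuthBearerToken(token); err != nil {
            c.SetOAuthBearerTokenFailure(err.Error())
        }
    }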

Hint: If your application registers a signal notification (signal.Notify), make sure the signals channel is buffered to avoid possible complications with blocking Poll() calls.

Note: The Confluent Kafka Go client is safe for concurrent use.

Index

Constants
func LibraryVersion() (int, string)
type AdminClient
func NewAdminClient(conf *ConfigMap) (*AdminClient, error)
func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)
func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)
func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
func (a *AdminClient) Close()
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)
func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
func (a *AdminClient) String() string
type AdminOption
type AdminOptionOperationTimeout
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)
type AdminOptionRequestTimeout
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)
type AdminOptionValidateOnly
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)
type AlterConfigsAdminOption
type AlterOperation
func (o AlterOperation) String() string
type AssignedPartitions
func (e AssignedPartitions) String() string
type BrokerMetadata
type ConfigEntry
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry
func (c ConfigEntry) String() string
type ConfigEntryResult
func (c ConfigEntryResult) String() string
type ConfigMap
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)
func (m ConfigMap) Set(kv string) error
func (m ConfigMap) SetKey(key string, value ConfigValue) error
type ConfigResource
func (c ConfigResource) String() string
type ConfigResourceResult
func (c ConfigResourceResult) String() string
type ConfigSource
func (t ConfigSource) String() string
type ConfigValue
type Consumer
func NewConsumer(conf *ConfigMap) (*Consumer, error)
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
func (c *Consumer) Close() (err error)
func (c *Consumer) Commit() ([]TopicPartition, error)
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
func (c *Consumer) Events() chan Event
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
func (c *Consumer) Pause(partitions []TopicPartition) (err error)
func (c *Consumer) Poll(timeoutMs int) (event Event)
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
func (c *Consumer) Resume(partitions []TopicPartition) (err error)
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
func (c *Consumer) String() string
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
func (c *Consumer) Subscription() (topics []string, err error)
func (c *Consumer) Unassign() (err error)
func (c *Consumer) Unsubscribe() (err error)
type CreatePartitionsAdminOption
type CreateTopicsAdminOption
type DeleteTopicsAdminOption
type DescribeConfigsAdminOption
type Error
func NewError(code ErrorCode, str string, fatal bool) (err Error)
func (e Error) Code() ErrorCode
func (e Error) Error() string
func (e Error) IsFatal() bool
func (e Error) String() string
type ErrorCode
func (c ErrorCode) String() string
type Event
type Handle
type Header
func (h Header) String() string
type Message
func (m *Message) String() string
type Metadata
type OAuthBearerToken
type OAuthBearerTokenRefresh
func (o OAuthBearerTokenRefresh) String() string
type Offset
func NewOffset(offset interface{}) (Offset, error)
func OffsetTail(relativeOffset Offset) Offset
func (o *Offset) Set(offset interface{}) error
func (o Offset) String() string
type OffsetsCommitted
func (o OffsetsCommitted) String() string
type PartitionEOF
func (p PartitionEOF) String() string
type PartitionMetadata
type PartitionsSpecification
type Producer
func NewProducer(conf *ConfigMap) (*Producer, error)
func (p *Producer) Close()
func (p *Producer) Events() chan Event
func (p *Producer) Flush(timeoutMs int) int
func (p *Producer) GetFatalError() error
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
func (p *Producer) Len() int
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
func (p *Producer) ProduceChannel() chan *Message
func (p *Producer) Purge(flags int) error
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error
func (p *Producer) String() string
func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode
type RebalanceCb
type ResourceType
func ResourceTypeFromString(typeString string) (ResourceType, error)
func (t ResourceType) String() string
type RevokedPartitions
func (e RevokedPartitions) String() string
type Stats
func (e Stats) String() string
type TimestampType
func (t TimestampType) String() string
type TopicMetadata
type TopicPartition
func (p TopicPartition) String() string
type TopicPartitions
func (tps TopicPartitions) Len() int
func (tps TopicPartitions) Less(i, j int) bool
func (tps TopicPartitions) Swap(i, j int)
type TopicResult
func (t TopicResult) String() string
type TopicSpecification

Package files

00version.go adminapi.go adminoptions.go build_dynamic.go config.go consumer.go context.go error.go event.go generated_errors.go handle.go header.go kafka.go message.go metadata.go misc.go offset.go producer.go testhelpers.go time.go

Constants

const (
    // ResourceUnknown - Unknown
    ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
    // ResourceAny - match any resource type (DescribeConfigs)
    ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
    // ResourceTopic - Topic
    ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
    // ResourceGroup - Group
    ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
    // ResourceBroker - Broker
    ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
)
const (
    // ConfigSourceUnknown is the default value
    ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
    // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
    ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
    // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
    ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
    // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
    ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
    // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
    ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
    // ConfigSourceDefault is built-in default configuration for configs that have a default value
    ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
)
const (
    // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
    TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
    // TimestampCreateTime indicates timestamp set by producer (source time)
    TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
    // TimestampLogAppendTime indicates timestamp set by broker (store time)
    TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
)
const (
    // PurgeInFlight purges messages in-flight to or from the broker.
    // Purging these messages will void any future acknowledgements from the
    // broker, making it impossible for the application to know if these
    // messages were successfully delivered or not.
    // Retrying these messages may lead to duplicates.
    PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)

    // PurgeQueue purges messages in internal queues.
    PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)

    // PurgeNonBlocking makes the purge operation not wait for background thread queue purging to finish.
    PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
)
const (
    // AlterOperationSet sets/overwrites the configuration setting.
    AlterOperationSet = iota
)

OffsetBeginning represents the earliest offset (logical)

const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)

OffsetEnd represents the latest offset (logical)

const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)

OffsetInvalid represents an invalid/unspecified offset

const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)

OffsetStored represents a stored offset

const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)

PartitionAny represents any partition (for partitioning), or unspecified value (for all other cases)

const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

func LibraryVersion

func LibraryVersion() (int, string)

LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.

type AdminClient

AdminClient is derived from an existing Producer or Consumer

type AdminClient struct {
    // contains filtered or unexported fields
}

func NewAdminClient

func NewAdminClient(conf *ConfigMap) (*AdminClient, error)

NewAdminClient creates a new AdminClient instance with a new underlying client instance

func NewAdminClientFromConsumer

func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)

NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.

func NewAdminClientFromProducer

func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)

NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.

func (*AdminClient) AlterConfigs

func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)

AlterConfigs alters/updates cluster resource configuration.

Updates are not transactional so they may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigEntrys and reverting unspecified ConfigEntrys to their default values.

Requires broker version >=0.11.0.0

AlterConfigs will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration to their default values.

Multiple resources and resource types may be set, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
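
A minimal sketch, assuming `a` is an existing *AdminClient (imports omitted; the topic name and property value are placeholders, and note that properties not listed are reverted to their defaults):

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    res := kafka.ConfigResource{
        Type: kafka.ResourceTopic,
        Name: "example-topic",
        Config: kafka.StringMapToConfigEntries(map[string]string{
            "retention.ms": "86400000",
        }, kafka.AlterOperationSet),
    }

    results, err := a.AlterConfigs(ctx, []kafka.ConfigResource{res})
    if err != nil {
        // Handle the request-level error.
    }
    for _, r := range results {
        fmt.Printf("%s %s: %v\n", r.Type, r.Name, r.Error)
    }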

func (*AdminClient) Close

func (a *AdminClient) Close()

Close an AdminClient instance.

func (*AdminClient) ClusterID

func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)

ClusterID returns the cluster ID as reported in broker metadata.

Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.

Requires broker version >= 0.10.0.

func (*AdminClient) ControllerID

func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)

ControllerID returns the broker ID of the current controller as reported in broker metadata.

Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.

Requires broker version >= 0.10.0.

func (*AdminClient) CreatePartitions

func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)

CreatePartitions creates additional partitions for topics.

func (*AdminClient) CreateTopics

func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)

CreateTopics creates topics in the cluster.

The list of TopicSpecification objects defines the per-topic partition count, replicas, etc.

Topic creation is non-atomic and may succeed for some topics but fail for others; make sure to check the result for topic-specific errors.

Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
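
A minimal sketch, assuming `a` is an existing *AdminClient (imports omitted; topic name, partition count and replication factor are placeholders):

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    results, err := a.CreateTopics(ctx,
        []kafka.TopicSpecification{{
            Topic:             "example-topic",
            NumPartitions:     3,
            ReplicationFactor: 1,
        }},
        // Wait up to 30s for the creation to propagate to the brokers.
        kafka.SetAdminOperationTimeout(30*time.Second))
    if err != nil {
        // Handle the request-level error.
    }
    for _, r := range results {
        fmt.Printf("create %s: %v\n", r.Topic, r.Error)
    }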

func (*AdminClient) DeleteTopics

func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)

DeleteTopics deletes a batch of topics.

This operation is not transactional and may succeed for a subset of topics while failing others. It may take several seconds after the DeleteTopics result returns success for all the brokers to become aware that the topics are gone. During this time, topic metadata and configuration may continue to return information about deleted topics.

Requires broker version >= 0.10.1.0
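
A minimal sketch, assuming `a` is an existing *AdminClient and `ctx` a context with a deadline (the topic name is a placeholder):

    results, err := a.DeleteTopics(ctx, []string{"example-topic"},
        kafka.SetAdminOperationTimeout(30*time.Second))
    if err != nil {
        // Handle the request-level error.
    }
    for _, r := range results {
        fmt.Printf("delete %s: %v\n", r.Topic, r.Error)
    }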

func (*AdminClient) DescribeConfigs

func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)

DescribeConfigs retrieves configuration for cluster resources.

The returned configuration includes default values; use ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish default values from manually configured settings.

The value of config entries where .IsSensitive is true will always be nil to avoid disclosing sensitive information, such as security settings.

Configuration entries where .IsReadOnly is true can't be modified (with AlterConfigs).

Synonym configuration entries are returned if the broker supports it (broker version >= 1.1.0). See .Synonyms.

Requires broker version >=0.11.0.0

Multiple resources and resource types may be requested, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
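
A minimal sketch, assuming `a` is an existing *AdminClient and `ctx` a context with a deadline (the topic name is a placeholder):

    results, err := a.DescribeConfigs(ctx, []kafka.ConfigResource{
        {Type: kafka.ResourceTopic, Name: "example-topic"},
    })
    if err != nil {
        // Handle the request-level error.
    }
    for _, r := range results {
        for _, entry := range r.Config {
            fmt.Printf("%s = %q (source: %s, sensitive: %v)\n",
                entry.Name, entry.Value, entry.Source, entry.IsSensitive)
        }
    }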

func (*AdminClient) GetMetadata

func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.

func (*AdminClient) SetOAuthBearerToken

func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error

SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1 ); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
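
A minimal sketch of setting a token obtained out of band, assuming `a` is an existing handle and that OAuthBearerToken carries the token value, expiration time and principal (the field values below are placeholders; imports omitted):

    token := kafka.OAuthBearerToken{
        TokenValue: "eyJhbGciOi...",           // opaque token obtained from your OAuth provider
        Expiration: time.Now().Add(time.Hour), // must be in the future
        Principal:  "admin",
    }
    if err := a.SetOAuthBearerToken(token); err != nil {
        // Tell the client the refresh failed; another refresh event is scheduled ~10s later.
        a.SetOAuthBearerTokenFailure(err.Error())
    }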

func (*AdminClient) SetOAuthBearerTokenFailure

func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error

SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

func (*AdminClient) String

func (a *AdminClient) String() string

String returns a human readable name for an AdminClient instance

type AdminOption

AdminOption is a generic type not to be used directly.

See CreateTopicsAdminOption et.al.

type AdminOption interface {
    // contains filtered or unexported methods
}

type AdminOptionOperationTimeout

AdminOptionOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.

CreateTopics, DeleteTopics, CreatePartitions: a value of 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in the cluster.

Default: 0 (return immediately).

Valid for CreateTopics, DeleteTopics, CreatePartitions.

type AdminOptionOperationTimeout struct {
    // contains filtered or unexported fields
}

func SetAdminOperationTimeout

func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)

SetAdminOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.

CreateTopics, DeleteTopics, CreatePartitions: a value of 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in the cluster.

Default: 0 (return immediately).

Valid for CreateTopics, DeleteTopics, CreatePartitions.

type AdminOptionRequestTimeout

AdminOptionRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.

Default: `socket.timeout.ms`.

Valid for all Admin API methods.

type AdminOptionRequestTimeout struct {
    // contains filtered or unexported fields
}

func SetAdminRequestTimeout

func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)

SetAdminRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.

Default: `socket.timeout.ms`.

Valid for all Admin API methods.

type AdminOptionValidateOnly

AdminOptionValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).

Default: false.

Valid for CreateTopics, CreatePartitions, AlterConfigs

type AdminOptionValidateOnly struct {
    // contains filtered or unexported fields
}

func SetAdminValidateOnly

func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)

SetAdminValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).

Default: false.

Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs

type AlterConfigsAdminOption

AlterConfigsAdminOption - see setters.

See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.

type AlterConfigsAdminOption interface {
    // contains filtered or unexported methods
}

type AlterOperation

AlterOperation specifies the operation to perform on the ConfigEntry. Currently only AlterOperationSet.

type AlterOperation int

func (AlterOperation) String

func (o AlterOperation) String() string

String returns the human-readable representation of an AlterOperation

type AssignedPartitions

AssignedPartitions consumer group rebalance event: assigned partition set

type AssignedPartitions struct {
    Partitions []TopicPartition
}

func (AssignedPartitions) String

func (e AssignedPartitions) String() string

type BrokerMetadata

BrokerMetadata contains per-broker metadata

type BrokerMetadata struct {
    ID   int32
    Host string
    Port int
}

type ConfigEntry

ConfigEntry holds parameters for altering a resource's configuration.

type ConfigEntry struct {
    // Name of configuration entry, e.g., topic configuration property name.
    Name string
    // Value of configuration entry.
    Value string
    // Operation to perform on the entry.
    Operation AlterOperation
}

func StringMapToConfigEntries

func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry

StringMapToConfigEntries creates a new map of ConfigEntry objects from the provided string map. The AlterOperation is set on each created entry.

func (ConfigEntry) String

func (c ConfigEntry) String() string

String returns a human-readable representation of a ConfigEntry.

type ConfigEntryResult

ConfigEntryResult contains the result of a single configuration entry from a DescribeConfigs request.

type ConfigEntryResult struct {
    // Name of configuration entry, e.g., topic configuration property name.
    Name string
    // Value of configuration entry.
    Value string
    // Source indicates the configuration source.
    Source ConfigSource
    // IsReadOnly indicates whether the configuration entry can be altered.
    IsReadOnly bool
    // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
    IsSensitive bool
    // IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
    IsSynonym bool
    // Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
    Synonyms map[string]ConfigEntryResult
}

func (ConfigEntryResult) String

func (c ConfigEntryResult) String() string

String returns a human-readable representation of a ConfigEntryResult.

type ConfigMap

ConfigMap is a map containing standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md

The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.

The use of "default.topic.config" is deprecated, topic configuration properties shall be specified in the standard ConfigMap. For backwards compatibility, "default.topic.config" (if supplied) takes precedence.

type ConfigMap map[string]ConfigValue
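
A ConfigMap is typically built as a literal and passed by pointer to NewConsumer(), NewProducer() or NewAdminClient(). A minimal sketch (property values are placeholders):

    conf := kafka.ConfigMap{
        "bootstrap.servers":  "localhost:9092",
        "group.id":           "example-group",
        "session.timeout.ms": 6000,
    }
    // Properties can also be set programmatically.
    if err := conf.SetKey("enable.auto.commit", true); err != nil {
        // Handle the error.
    }
    c, err := kafka.NewConsumer(&conf)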

func (ConfigMap) Get

func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)

Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. If the key is found but the type does not match that of `defval` (unless nil) an ErrInvalidArg error is returned.

func (ConfigMap) Set

func (m ConfigMap) Set(kv string) error

Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.

func (ConfigMap) SetKey

func (m ConfigMap) SetKey(key string, value ConfigValue) error

SetKey sets configuration property key to value.

For user convenience, a key prefixed with {topic}. will be set on the "default.topic.config" sub-map; this usage is deprecated.

type ConfigResource

ConfigResource holds parameters for altering an Apache Kafka configuration resource

type ConfigResource struct {
    // Type of resource to set.
    Type ResourceType
    // Name of resource to set.
    Name string
    // Config entries to set.
    // Configuration updates are atomic, any configuration property not provided
    // here will be reverted (by the broker) to its default value.
    // Use DescribeConfigs to retrieve the list of current configuration entry values.
    Config []ConfigEntry
}

func (ConfigResource) String

func (c ConfigResource) String() string

String returns a human-readable representation of a ConfigResource

type ConfigResourceResult

ConfigResourceResult provides the result for a resource from an AlterConfigs or DescribeConfigs request.

type ConfigResourceResult struct {
    // Type of returned result resource.
    Type ResourceType
    // Name of returned result resource.
    Name string
    // Error, if any, of returned result resource.
    Error Error
    // Config entries, if any, of returned result resource.
    Config map[string]ConfigEntryResult
}

func (ConfigResourceResult) String

func (c ConfigResourceResult) String() string

String returns a human-readable representation of a ConfigResourceResult.

type ConfigSource

ConfigSource represents an Apache Kafka config source

type ConfigSource int

func (ConfigSource) String

func (t ConfigSource) String() string

String returns the human-readable representation of a ConfigSource type

type ConfigValue

ConfigValue supports the following types:

bool, int, string, any type with the standard String() interface

type ConfigValue interface{}

type Consumer

Consumer implements a High-level Apache Kafka Consumer instance

type Consumer struct {
    // contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf *ConfigMap) (*Consumer, error)

NewConsumer creates a new high-level Consumer instance.

Supported special configuration properties:

go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
                                     If set to true the app must handle the AssignedPartitions and
                                     RevokedPartitions events and call Assign() and Unassign()
                                     respectively.
go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
go.events.channel.size (int, 1000) - Events() channel size

WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.
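
A minimal sketch of a consumer that handles rebalances itself via go.application.rebalance.enable (broker address, group id and topic are placeholders; error handling abbreviated):

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               "localhost:9092",
        "group.id":                        "example-group",
        "go.application.rebalance.enable": true,
    })
    if err != nil {
        panic(err)
    }
    c.SubscribeTopics([]string{"example-topic"}, nil)

    for {
        switch e := c.Poll(100).(type) {
        case kafka.AssignedPartitions:
            // Optionally adjust the initial offsets in e.Partitions before assigning.
            c.Assign(e.Partitions)
        case kafka.RevokedPartitions:
            c.Unassign()
        case *kafka.Message:
            fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
        case kafka.Error:
            fmt.Printf("error: %v\n", e)
        }
    }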

func (*Consumer) Assign

func (c *Consumer) Assign(partitions []TopicPartition) (err error)

Assign an atomic set of partitions to consume. This replaces the current assignment.

func (*Consumer) Assignment

func (c *Consumer) Assignment() (partitions []TopicPartition, err error)

Assignment returns the current partition assignments

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close Consumer instance. The object is no longer usable after this call.

func (*Consumer) Commit

func (c *Consumer) Commit() ([]TopicPartition, error)

Commit offsets for currently assigned partitions. This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitMessage

func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)

CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)

CommitOffsets commits the provided list of offsets. This is a blocking call. Returns the committed offsets on success.

func (*Consumer) Committed

func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

Committed retrieves committed offsets for the given set of partitions

func (*Consumer) Events

func (c *Consumer) Events() chan Event

Events returns the Events channel (if enabled)

func (*Consumer) GetMetadata

func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.

func (*Consumer) GetWatermarkOffsets

func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)

GetWatermarkOffsets returns the cached low and high offsets for the given topic and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. The low offset is populated every statistics.interval.ms if that value is set. OffsetInvalid will be returned if there is no cached offset for either value.

func (*Consumer) OffsetsForTimes

func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

OffsetsForTimes looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.

The function will block for at most timeoutMs milliseconds.

Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.
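
A minimal sketch, assuming `c` is an existing *Consumer; it looks up the first offset in partition 0 whose timestamp is at or after a given time (topic and timestamp are placeholders; imports omitted):

    topic := "example-topic"
    t := time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
    times := []kafka.TopicPartition{{
        Topic:     &topic,
        Partition: 0,
        // The timestamp to query, in milliseconds, goes in the Offset field.
        Offset: kafka.Offset(t.UnixNano() / int64(time.Millisecond)),
    }}

    offsets, err := c.OffsetsForTimes(times, 10000)
    if err != nil {
        // Handle the error.
    }
    fmt.Printf("partition 0: start at offset %v\n", offsets[0].Offset)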

func (*Consumer) Pause

func (c *Consumer) Pause(partitions []TopicPartition) (err error)

Pause consumption for the provided list of partitions

Note that messages already enqueued on the consumer's Event channel (if `go.events.channel.enable` has been set) will NOT be purged by this call, set `go.events.channel.size` accordingly.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event Event)

Poll the consumer for messages or events.

Will block for at most timeoutMs milliseconds.

The following callbacks may be triggered:

Subscribe()'s rebalanceCb

Returns nil on timeout, else an Event

func (*Consumer) QueryWatermarkOffsets

func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)

ReadMessage polls the consumer for a message.

This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.

The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.

Timeout is returned as (nil, err) where err satisfies `err.(kafka.Error).Code() == kafka.ErrTimedOut`.

Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).

All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
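
A minimal sketch of a consume loop built on ReadMessage, assuming `c` is a subscribed *Consumer:

    for {
        msg, err := c.ReadMessage(5 * time.Second)
        if err != nil {
            // Timeouts are expected whenever no message arrives within the interval.
            if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                continue
            }
            fmt.Printf("consumer error: %v\n", err)
            continue
        }
        fmt.Printf("%s: %s\n", msg.TopicPartition, msg.Value)
    }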

func (*Consumer) Resume

func (c *Consumer) Resume(partitions []TopicPartition) (err error)

Resume consumption for the provided list of partitions

func (*Consumer) Seek

func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error

Seek seeks the given topic partitions using the offset from the TopicPartition.

If timeoutMs is not 0 the call will wait this long for the seek to be performed. If the timeout is reached the internal state will be unknown and this function returns ErrTimedOut. If timeoutMs is 0 it will initiate the seek but return immediately without any error reporting (e.g., async).

Seek() may only be used for partitions already being consumed (through Assign() or implicitly through a self-rebalanced Subscribe()). To set the starting offset it is preferred to use Assign() and provide a starting offset for each partition.

Returns an error on failure or nil otherwise.

func (*Consumer) SetOAuthBearerToken

func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error

SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1 ); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

func (*Consumer) SetOAuthBearerTokenFailure

func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error

SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

func (*Consumer) StoreOffsets

func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)

StoreOffsets stores the provided list of offsets that will be committed to the offset store according to `auto.commit.interval.ms` or manual offset-less Commit().

Returns the stored offsets on success. If at least one offset couldn't be stored, an error and a list of offsets is returned. Each offset can be checked for specific errors via its `.Error` member.

func (*Consumer) String

func (c *Consumer) String() string

String returns a human-readable name for a Consumer instance

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

Subscribe to a single topic. This replaces the current subscription.

func (*Consumer) SubscribeTopics

func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)

SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.

func (*Consumer) Subscription

func (c *Consumer) Subscription() (topics []string, err error)

Subscription returns the current subscription as set by Subscribe()

func (*Consumer) Unassign

func (c *Consumer) Unassign() (err error)

Unassign the current set of partitions to consume.

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe() (err error)

Unsubscribe from the current subscription, if any.

type CreatePartitionsAdminOption

CreatePartitionsAdminOption - see setters.

See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.

type CreatePartitionsAdminOption interface {
    // contains filtered or unexported methods
}

type CreateTopicsAdminOption

CreateTopicsAdminOption - see setters.

See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.

type CreateTopicsAdminOption interface {
    // contains filtered or unexported methods
}

type DeleteTopicsAdminOption

DeleteTopicsAdminOption - see setters.

See SetAdminRequestTimeout, SetAdminOperationTimeout.

type DeleteTopicsAdminOption interface {
    // contains filtered or unexported methods
}

type DescribeConfigsAdminOption

DescribeConfigsAdminOption - see setters.

See SetAdminRequestTimeout.

type DescribeConfigsAdminOption interface {
    // contains filtered or unexported methods
}

type Error

Error provides a Kafka-specific error container

type Error struct {
    // contains filtered or unexported fields
}

func NewError

func NewError(code ErrorCode, str string, fatal bool) (err Error)

NewError creates a new Error.

func (Error) Code

func (e Error) Code() ErrorCode

Code returns the ErrorCode of an Error

func (Error) Error

func (e Error) Error() string

Error returns a human-readable representation of an Error. Same as Error.String().

func (Error) IsFatal

func (e Error) IsFatal() bool

IsFatal returns true if the error is a fatal error. A fatal error indicates the client instance is no longer operable and should be terminated. Typical causes include non-recoverable idempotent producer errors.

func (Error) String

func (e Error) String() string

String returns a human readable representation of an Error

type ErrorCode

ErrorCode is the integer representation of local and broker error codes

type ErrorCode int
const (
    // ErrBadMsg Local: Bad message format
    ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
    // ErrBadCompression Local: Invalid compressed data
    ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
    // ErrDestroy Local: Broker handle destroyed
    ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
    // ErrFail Local: Communication failure with broker
    ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
    // ErrTransport Local: Broker transport failure
    ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
    // ErrCritSysResource Local: Critical system resource failure
    ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
    // ErrResolve Local: Host resolution failure
    ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
    // ErrMsgTimedOut Local: Message timed out
    ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
    // ErrPartitionEOF Broker: No more messages
    ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
    // ErrUnknownPartition Local: Unknown partition
    ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
    // ErrFs Local: File or filesystem error
    ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
    // ErrUnknownTopic Local: Unknown topic
    ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
    // ErrAllBrokersDown Local: All broker connections are down
    ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
    // ErrInvalidArg Local: Invalid argument or configuration
    ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
    // ErrTimedOut Local: Timed out
    ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
    // ErrQueueFull Local: Queue full
    ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
    // ErrIsrInsuff Local: ISR count insufficient
    ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
    // ErrNodeUpdate Local: Broker node update
    ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
    // ErrSsl Local: SSL error
    ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
    // ErrWaitCoord Local: Waiting for coordinator
    ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
    // ErrUnknownGroup Local: Unknown group
    ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
    // ErrInProgress Local: Operation in progress
    ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
    // ErrPrevInProgress Local: Previous operation in progress
    ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
    // ErrExistingSubscription Local: Existing subscription
    ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
    // ErrAssignPartitions Local: Assign partitions
    ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    // ErrRevokePartitions Local: Revoke partitions
    ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    // ErrConflict Local: Conflicting use
    ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
    // ErrState Local: Erroneous state
    ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
    // ErrUnknownProtocol Local: Unknown protocol
    ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
    // ErrNotImplemented Local: Not implemented
    ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
    // ErrAuthentication Local: Authentication failure
    ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
    // ErrNoOffset Local: No offset stored
    ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
    // ErrOutdated Local: Outdated
    ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
    // ErrTimedOutQueue Local: Timed out in queue
    ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
    // ErrUnsupportedFeature Local: Required feature not supported by broker
    ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE)
    // ErrWaitCache Local: Awaiting cache update
    ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE)
    // ErrIntr Local: Operation interrupted
    ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR)
    // ErrKeySerialization Local: Key serialization error
    ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION)
    // ErrValueSerialization Local: Value serialization error
    ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION)
    // ErrKeyDeserialization Local: Key deserialization error
    ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION)
    // ErrValueDeserialization Local: Value deserialization error
    ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION)
    // ErrPartial Local: Partial response
    ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL)
    // ErrReadOnly Local: Read-only object
    ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY)
    // ErrNoent Local: No such entry
    ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
    // ErrUnderflow Local: Read underflow
    ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
    // ErrInvalidType Local: Invalid type
    ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE)
    // ErrRetry Local: Retry operation
    ErrRetry ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RETRY)
    // ErrPurgeQueue Local: Purged in queue
    ErrPurgeQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_QUEUE)
    // ErrPurgeInflight Local: Purged in flight
    ErrPurgeInflight ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_INFLIGHT)
    // ErrFatal Local: Fatal error
    ErrFatal ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FATAL)
    // ErrInconsistent Local: Inconsistent state
    ErrInconsistent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INCONSISTENT)
    // ErrGaplessGuarantee Local: Gap-less ordering would not be guaranteed if proceeding
    ErrGaplessGuarantee ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE)
    // ErrMaxPollExceeded Local: Maximum application poll interval (max.poll.interval.ms) exceeded
    ErrMaxPollExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
    // ErrUnknown Unknown broker error
    ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
    // ErrNoError Success
    ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
    // ErrOffsetOutOfRange Broker: Offset out of range
    ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
    // ErrInvalidMsg Broker: Invalid message
    ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
    // ErrUnknownTopicOrPart Broker: Unknown topic or partition
    ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
    // ErrInvalidMsgSize Broker: Invalid message size
    ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
    // ErrLeaderNotAvailable Broker: Leader not available
    ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
    // ErrNotLeaderForPartition Broker: Not leader for partition
    ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
    // ErrRequestTimedOut Broker: Request timed out
    ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
    // ErrBrokerNotAvailable Broker: Broker not available
    ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
    // ErrReplicaNotAvailable Broker: Replica not available
    ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
    // ErrMsgSizeTooLarge Broker: Message size too large
    ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
    // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
    ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
    // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
    ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
    // ErrNetworkException Broker: Broker disconnected before response received
    ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
    // ErrGroupLoadInProgress Broker: Group coordinator load in progress
    ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
    // ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
    ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
    // ErrNotCoordinatorForGroup Broker: Not coordinator for group
    ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
    // ErrTopicException Broker: Invalid topic
    ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
    // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
    ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
    // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
    ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
    // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
    ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
    // ErrInvalidRequiredAcks Broker: Invalid required acks value
    ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
    // ErrIllegalGeneration Broker: Specified group generation id is not valid
    ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
    // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
    ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
    // ErrInvalidGroupID Broker: Invalid group.id
    ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
    // ErrUnknownMemberID Broker: Unknown member
    ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
    // ErrInvalidSessionTimeout Broker: Invalid session timeout
    ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
    // ErrRebalanceInProgress Broker: Group rebalance in progress
    ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
    // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
    ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
    // ErrTopicAuthorizationFailed Broker: Topic authorization failed
    ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
    // ErrGroupAuthorizationFailed Broker: Group authorization failed
    ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
    // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
    ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
    // ErrInvalidTimestamp Broker: Invalid timestamp
    ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
    // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
    ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
    // ErrIllegalSaslState Broker: Request not valid in current SASL state
    ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
    // ErrUnsupportedVersion Broker: API version not supported
    ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
    // ErrTopicAlreadyExists Broker: Topic already exists
    ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
    // ErrInvalidPartitions Broker: Invalid number of partitions
    ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS)
    // ErrInvalidReplicationFactor Broker: Invalid replication factor
    ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR)
    // ErrInvalidReplicaAssignment Broker: Invalid replica assignment
    ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT)
    // ErrInvalidConfig Broker: Configuration is invalid
    ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG)
    // ErrNotController Broker: Not controller for cluster
    ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER)
    // ErrInvalidRequest Broker: Invalid request
    ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST)
    // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request
    ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT)
    // ErrPolicyViolation Broker: Policy violation
    ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION)
    // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number
    ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER)
    // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number
    ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER)
    // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch
    ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH)
    // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state
    ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE)
    // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
    ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING)
    // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms
    ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT)
    // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
    ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS)
    // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
    ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED)
    // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed
    ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
    // ErrSecurityDisabled Broker: Security features are disabled
    ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED)
    // ErrOperationNotAttempted Broker: Operation not attempted
    ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED)
    // ErrKafkaStorageError Disk error when trying to access log file on the disk
    ErrKafkaStorageError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR)
    // ErrLogDirNotFound The user-specified log directory is not found in the broker config
    ErrLogDirNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND)
    // ErrSaslAuthenticationFailed SASL Authentication failed
    ErrSaslAuthenticationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED)
    // ErrUnknownProducerID Unknown Producer Id
    ErrUnknownProducerID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID)
    // ErrReassignmentInProgress Partition reassignment is in progress
    ErrReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS)
    // ErrDelegationTokenAuthDisabled Delegation Token feature is not enabled
    ErrDelegationTokenAuthDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED)
    // ErrDelegationTokenNotFound Delegation Token is not found on server
    ErrDelegationTokenNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND)
    // ErrDelegationTokenOwnerMismatch Specified Principal is not valid Owner/Renewer
    ErrDelegationTokenOwnerMismatch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH)
    // ErrDelegationTokenRequestNotAllowed Delegation Token requests are not allowed on this connection
    ErrDelegationTokenRequestNotAllowed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED)
    // ErrDelegationTokenAuthorizationFailed Delegation Token authorization failed
    ErrDelegationTokenAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED)
    // ErrDelegationTokenExpired Delegation Token is expired
    ErrDelegationTokenExpired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED)
    // ErrInvalidPrincipalType Supplied principalType is not supported
    ErrInvalidPrincipalType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE)
    // ErrNonEmptyGroup The group is not empty
    ErrNonEmptyGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP)
    // ErrGroupIDNotFound The group id does not exist
    ErrGroupIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND)
    // ErrFetchSessionIDNotFound The fetch session ID was not found
    ErrFetchSessionIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND)
    // ErrInvalidFetchSessionEpoch The fetch session epoch is invalid
    ErrInvalidFetchSessionEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH)
    // ErrListenerNotFound No matching listener
    ErrListenerNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND)
    // ErrTopicDeletionDisabled Topic deletion is disabled
    ErrTopicDeletionDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED)
    // ErrFencedLeaderEpoch Leader epoch is older than broker epoch
    ErrFencedLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH)
    // ErrUnknownLeaderEpoch Leader epoch is newer than broker epoch
    ErrUnknownLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH)
    // ErrUnsupportedCompressionType Unsupported compression type
    ErrUnsupportedCompressionType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE)
    // ErrStaleBrokerEpoch Broker epoch has changed
    ErrStaleBrokerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH)
    // ErrOffsetNotAvailable Leader high watermark is not caught up
    ErrOffsetNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE)
    // ErrMemberIDRequired Group member needs a valid member ID
    ErrMemberIDRequired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED)
    // ErrPreferredLeaderNotAvailable Preferred leader was not available
    ErrPreferredLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE)
    // ErrGroupMaxSizeReached Consumer group has reached maximum size
    ErrGroupMaxSizeReached ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED)
)

func (ErrorCode) String

func (c ErrorCode) String() string

String returns a human readable representation of an error code

type Event

Event generic interface

type Event interface {
    // String returns a human-readable representation of the event
    String() string
}
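
All concrete events received from the Events() channel or from Poll() satisfy this interface and are usually dispatched with a type switch. A minimal sketch, assuming `c` is a *kafka.Consumer created elsewhere and the fmt import is present:

for ev := range c.Events() {
    switch e := ev.(type) {
    case *kafka.Message:
        // Regular fetched message.
        fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        // Only emitted if enable.partition.eof is set to true.
        fmt.Printf("reached end of partition: %s\n", e)
    case kafka.Error:
        fmt.Printf("client error: %v\n", e)
    default:
        fmt.Printf("ignored event: %s\n", e.String())
    }
}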

type Handle

Handle represents a generic client handle containing common parts for both Producer and Consumer.

type Handle interface {
    // SetOAuthBearerToken sets the data to be transmitted
    // to a broker during SASL/OAUTHBEARER authentication. It will return nil
    // on success, otherwise an error if:
    // 1) the token data is invalid (meaning an expiration time in the past
    // or either a token value or an extension key or value that does not meet
    // the regular expression requirements as per
    // https://tools.ietf.org/html/rfc7628#section-3.1);
    // 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
    // 3) SASL/OAUTHBEARER is supported but is not configured as the client's
    // authentication mechanism.
    SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error

    // SetOAuthBearerTokenFailure sets the error message describing why token
    // retrieval/setting failed; it also schedules a new token refresh event for 10
    // seconds later so the attempt may be retried. It will return nil on
    // success, otherwise an error if:
    // 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
    // 2) SASL/OAUTHBEARER is supported but is not configured as the client's
    // authentication mechanism.
    SetOAuthBearerTokenFailure(errstr string) error
    // contains filtered or unexported methods
}

type Header

Header represents a single Kafka message header.

Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.

Key is a human readable string identifying the header. Value is the key's binary value; Kafka does not put any restrictions on the format of the Value, but it should be kept relatively compact. The value may be a byte array, empty, or nil.

NOTE: Message headers are not available on producer delivery report messages.

type Header struct {
    Key   string // Header name (utf-8 string)
    Value []byte // Header value (nil, empty, or binary)
}

func (Header) String

func (h Header) String() string

String returns the Header Key and data in a human-readable, possibly truncated form suitable for displaying to the user.

type Message

Message represents a Kafka message

type Message struct {
    TopicPartition TopicPartition
    Value          []byte
    Key            []byte
    Timestamp      time.Time
    TimestampType  TimestampType
    Opaque         interface{}
    Headers        []Header
}

func (*Message) String

func (m *Message) String() string

String returns a human readable representation of a Message. Key and payload are not represented.

type Metadata

Metadata contains broker and topic metadata for all (matching) topics

type Metadata struct {
    Brokers []BrokerMetadata
    Topics  map[string]TopicMetadata

    OriginatingBroker BrokerMetadata
}

type OAuthBearerToken

OAuthBearerToken represents the data to be transmitted to a broker during SASL/OAUTHBEARER authentication.

type OAuthBearerToken struct {
    // Token value, often (but not necessarily) a JWS compact serialization
    // as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet
    // the regular expression for a SASL/OAUTHBEARER value defined at
    // https://tools.ietf.org/html/rfc7628#section-3.1
    TokenValue string
    // Metadata about the token indicating when it expires (local time);
    // it must represent a time in the future
    Expiration time.Time
    // Metadata about the token indicating the Kafka principal name
    // to which it applies (for example, "admin")
    Principal string
    // SASL extensions, if any, to be communicated to the broker during
    // authentication (all keys and values of which must meet the regular
    // expressions defined at https://tools.ietf.org/html/rfc7628#section-3.1,
    // and it must not contain the reserved "auth" key)
    Extensions map[string]string
}

type OAuthBearerTokenRefresh

OAuthBearerTokenRefresh indicates token refresh is required

type OAuthBearerTokenRefresh struct {
    // Config is the value of the sasl.oauthbearer.config property
    Config string
}

func (OAuthBearerTokenRefresh) String

func (o OAuthBearerTokenRefresh) String() string

type Offset

Offset type (int64) with support for canonical names

type Offset int64

func NewOffset

func NewOffset(offset interface{}) (Offset, error)

NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"

func OffsetTail

func OffsetTail(relativeOffset Offset) Offset

OffsetTail returns the logical offset relativeOffset from current end of partition

func (*Offset) Set

func (o *Offset) Set(offset interface{}) error

Set offset value, see NewOffset()
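
A small sketch of the Offset helpers above (topic name and numbers are illustrative; error handling and imports elided):

topic := "mytopic"

start, err := kafka.NewOffset("earliest") // logical offset from a canonical name
if err != nil {
    panic(err)
}

tail := kafka.OffsetTail(kafka.Offset(100)) // 100 messages before the current end of the partition

var o kafka.Offset
_ = o.Set(int64(42)) // Set accepts the same inputs as NewOffset

fmt.Println(kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: start}, tail, o)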

func (Offset) String

func (o Offset) String() string

type OffsetsCommitted

OffsetsCommitted reports committed offsets

type OffsetsCommitted struct {
    Error   error
    Offsets []TopicPartition
}

func (OffsetsCommitted) String

func (o OffsetsCommitted) String() string

type PartitionEOF

PartitionEOF consumer reached end of partition. Needs to be explicitly enabled by setting the `enable.partition.eof` configuration property to true.

type PartitionEOF TopicPartition

func (PartitionEOF) String

func (p PartitionEOF) String() string

type PartitionMetadata

PartitionMetadata contains per-partition metadata

type PartitionMetadata struct {
    ID       int32
    Error    Error
    Leader   int32
    Replicas []int32
    Isrs     []int32
}

type PartitionsSpecification

PartitionsSpecification holds parameters for creating additional partitions for a topic. PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.

type PartitionsSpecification struct {
    // Topic to create more partitions for.
    Topic string
    // New partition count for topic, must be higher than current partition count.
    IncreaseTo int
    // (Optional) Explicit replica assignment. The outer array is
    // indexed by the new partition index (i.e., 0 for the first added
    // partition), while the inner per-partition array
    // contains the replica broker ids. The first broker in each
    // broker id list will be the preferred replica.
    ReplicaAssignment [][]int32
}
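
For illustration only, a PartitionsSpecification literal as it might be handed to the admin client's CreatePartitions call; the topic name and partition count are assumptions:

spec := kafka.PartitionsSpecification{
    Topic:      "mytopic", // existing topic (assumed name)
    IncreaseTo: 6,         // new total partition count, must exceed the current count
}
_ = spec // passed to the admin client in a real program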

type Producer

Producer implements a High-level Apache Kafka Producer instance

type Producer struct {
    // contains filtered or unexported fields
}

func NewProducer

func NewProducer(conf *ConfigMap) (*Producer, error)

NewProducer creates a new high-level Producer instance.

conf is a *ConfigMap with standard librdkafka configuration properties; see librdkafka's CONFIGURATION.md for the full list.

Supported special configuration properties:

go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
                                  These batches do not relate to Kafka message batches in any way.
                                  Note: timestamps and headers are not supported with this interface.
go.delivery.reports (bool, true) - Forward per-message delivery reports to the
                                   Events() channel.
go.events.channel.size (int, 1000000) - Events() channel size
go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
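
A minimal sketch of creating a producer; the broker address is a placeholder:

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092", // placeholder broker address
})
if err != nil {
    panic(err)
}
defer p.Close()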

func (*Producer) Close

func (p *Producer) Close()

Close a Producer instance. The Producer object or its channels are no longer usable after this call.

func (*Producer) Events

func (p *Producer) Events() chan Event

Events returns the Events channel (read)

func (*Producer) Flush

func (p *Producer) Flush(timeoutMs int) int

Flush and wait for outstanding messages and requests to complete delivery. Includes messages on ProduceChannel. Runs until the outstanding count reaches zero or timeoutMs elapses. Returns the number of outstanding events still un-flushed.

func (*Producer) GetFatalError

func (p *Producer) GetFatalError() error

GetFatalError returns an Error object if the client instance has raised a fatal error, else nil.

func (*Producer) GetMetadata

func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
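
A sketch of a metadata query, assuming `p` is a *kafka.Producer and the fmt import is present:

md, err := p.GetMetadata(nil, true, 5000) // all topics, 5 second timeout
if err != nil {
    panic(err)
}
for name, t := range md.Topics {
    fmt.Printf("topic %s has %d partitions\n", name, len(t.Partitions))
}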

func (*Producer) Len

func (p *Producer) Len() int

Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. Includes messages on ProduceChannel.

func (*Producer) OffsetsForTimes

func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

OffsetsForTimes looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.

The function will block for at most timeoutMs milliseconds.

Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.
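
A sketch of an offsets-for-times lookup; the topic and timestamp are illustrative, `p` is an assumed producer, and imports (fmt, time) are elided:

topic := "mytopic"
ts := time.Now().Add(-time.Hour).UnixNano() / int64(time.Millisecond) // ms since epoch

times := []kafka.TopicPartition{
    {Topic: &topic, Partition: 0, Offset: kafka.Offset(ts)}, // query timestamp goes in .Offset
}
offsets, err := p.OffsetsForTimes(times, 5000)
if err != nil {
    panic(err)
}
for _, tp := range offsets {
    fmt.Printf("%s [%d] => offset %v (err: %v)\n", *tp.Topic, tp.Partition, tp.Offset, tp.Error)
}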

func (*Producer) Produce

func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error

Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, or on the Producer object's Events() channel if not. msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.10.0.0. msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.11.0.0. Returns an error if message could not be enqueued.
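
A sketch of producing a single message and waiting for its delivery report on a dedicated channel; the producer `p`, topic name, and payload are assumptions:

topic := "mytopic"
deliveryChan := make(chan kafka.Event, 1)

err := p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Key:            []byte("key"),
    Value:          []byte("hello"),
    Headers:        []kafka.Header{{Key: "origin", Value: []byte("example")}},
}, deliveryChan)
if err != nil {
    panic(err) // enqueue failed
}

m := (<-deliveryChan).(*kafka.Message)
if m.TopicPartition.Error != nil {
    fmt.Printf("delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("delivered to %v\n", m.TopicPartition)
}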

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *Message

ProduceChannel returns the produce *Message channel (write)

func (*Producer) Purge

func (p *Producer) Purge(flags int) error

Purge messages currently handled by this producer instance.

flags is a combination of PurgeQueue, PurgeInFlight and PurgeNonBlocking.

The application will need to call Poll(), Flush() or read the Events() channel after this call to serve delivery reports for the purged messages.

Messages purged from internal queues fail with the delivery report error code set to ErrPurgeQueue, while purged messages that are in-flight to or from the broker will fail with the error code set to ErrPurgeInflight.

Warning: Purging messages that are in-flight to or from the broker will ignore any subsequent acknowledgement for these messages received from the broker, effectively making it impossible for the application to know if the messages were successfully produced or not. This may result in duplicate messages if the application retries these messages at a later time.

Note: This call may block for a short time while background thread queues are purged.

Returns nil on success, ErrInvalidArg if the purge flags are invalid or unknown.
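
A sketch of purging both queued and in-flight messages and then serving the resulting delivery reports (producer `p` is assumed):

if err := p.Purge(kafka.PurgeQueue | kafka.PurgeInFlight); err != nil {
    panic(err)
}
// Serve the delivery reports generated for the purged messages.
remaining := p.Flush(1000)
fmt.Printf("%d events still outstanding after flush\n", remaining)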

func (*Producer) QueryWatermarkOffsets

func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.

func (*Producer) SetOAuthBearerToken

func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error

SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

func (*Producer) SetOAuthBearerTokenFailure

func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error

SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
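
A sketch of serving OAuthBearerTokenRefresh events from the producer's Events() channel; how the token is actually obtained is outside this package, so fetchToken below is a hypothetical application-supplied function:

for ev := range p.Events() {
    refresh, ok := ev.(kafka.OAuthBearerTokenRefresh)
    if !ok {
        continue // delivery reports, errors, etc.
    }
    // fetchToken is hypothetical: it uses refresh.Config (the
    // sasl.oauthbearer.config value) to build a kafka.OAuthBearerToken.
    token, err := fetchToken(refresh.Config)
    if err != nil {
        _ = p.SetOAuthBearerTokenFailure(err.Error())
        continue
    }
    _ = p.SetOAuthBearerToken(token)
}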

func (*Producer) String

func (p *Producer) String() string

String returns a human readable name for a Producer instance

func (*Producer) TestFatalError

func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode

TestFatalError triggers a fatal error in the underlying client. This is to be used strictly for testing purposes.

type RebalanceCb

RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions

type RebalanceCb func(*Consumer, Event) error
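
A sketch of a rebalance callback as it is typically passed to the consumer's SubscribeTopics call; `c` is an assumed *kafka.Consumer and the topic name is a placeholder:

rebalanceCb := func(consumer *kafka.Consumer, ev kafka.Event) error {
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        // Optionally adjust initial offsets here before assigning.
        return consumer.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        return consumer.Unassign()
    }
    return nil
}

if err := c.SubscribeTopics([]string{"mytopic"}, rebalanceCb); err != nil {
    panic(err)
}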

type ResourceType

ResourceType represents an Apache Kafka resource type

type ResourceType int

func ResourceTypeFromString

func ResourceTypeFromString(typeString string) (ResourceType, error)

ResourceTypeFromString translates a resource type name/string to a ResourceType value.

func (ResourceType) String

func (t ResourceType) String() string

String returns the human-readable representation of a ResourceType

type RevokedPartitions

RevokedPartitions consumer group rebalance event: revoked partition set

type RevokedPartitions struct {
    Partitions []TopicPartition
}

func (RevokedPartitions) String

func (e RevokedPartitions) String() string

type Stats

Stats statistics event

type Stats struct {
    // contains filtered or unexported fields
}

func (Stats) String

func (e Stats) String() string

type TimestampType

TimestampType is the Message timestamp type or source

type TimestampType int

func (TimestampType) String

func (t TimestampType) String() string

type TopicMetadata

TopicMetadata contains per-topic metadata

type TopicMetadata struct {
    Topic      string
    Partitions []PartitionMetadata
    Error      Error
}

type TopicPartition

TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.

type TopicPartition struct {
    Topic     *string
    Partition int32
    Offset    Offset
    Metadata  *string
    Error     error
}

func (TopicPartition) String

func (p TopicPartition) String() string

type TopicPartitions

TopicPartitions is a slice of TopicPartition that also implements the sort interface

type TopicPartitions []TopicPartition

func (TopicPartitions) Len

func (tps TopicPartitions) Len() int

func (TopicPartitions) Less

func (tps TopicPartitions) Less(i, j int) bool

func (TopicPartitions) Swap

func (tps TopicPartitions) Swap(i, j int)
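
Because TopicPartitions satisfies sort.Interface, a slice can be ordered with the standard library. A small sketch (topic names are placeholders; fmt and sort imports elided):

t1, t2 := "alpha", "beta"
parts := kafka.TopicPartitions{
    {Topic: &t2, Partition: 1},
    {Topic: &t1, Partition: 0},
}
sort.Sort(parts) // orders using the package's Less implementation
fmt.Println(parts)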

type TopicResult

TopicResult provides per-topic operation result (error) information.

type TopicResult struct {
    // Topic name
    Topic string
    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
    Error Error
}

func (TopicResult) String

func (t TopicResult) String() string

String returns a human-readable representation of a TopicResult.

type TopicSpecification

TopicSpecification holds parameters for creating a new topic. TopicSpecification is analogous to NewTopic in the Java Topic Admin API.

type TopicSpecification struct {
    // Topic name to create.
    Topic string
    // Number of partitions in topic.
    NumPartitions int
    // Default replication factor for the topic's partitions, or zero
    // if an explicit ReplicaAssignment is set.
    ReplicationFactor int
    // (Optional) Explicit replica assignment. The outer array is
    // indexed by the partition number, while the inner per-partition array
    // contains the replica broker ids. The first broker in each
    // broker id list will be the preferred replica.
    ReplicaAssignment [][]int32
    // Topic configuration.
    Config map[string]string
}
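
For illustration only, a TopicSpecification literal as it might be handed to the admin client's CreateTopics call; the topic name, counts, and config value are assumptions:

spec := kafka.TopicSpecification{
    Topic:             "mytopic",
    NumPartitions:     3,
    ReplicationFactor: 1,
    Config: map[string]string{
        "retention.ms": "3600000", // 1 hour, illustrative
    },
}
_ = spec // passed to the admin client in a real program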