const ( // PurgeInFlight purges messages in-flight to or from the broker. // Purging these messages will void any future acknowledgements from the // broker, making it impossible for the application to know if these // messages were successfully delivered or not. // Retrying these messages may lead to duplicates. PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT) // PurgeQueue Purge messages in internal queues. PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE) // PurgeNonBlocking Don't wait for background thread queue purging to finish. PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING) )
const ( // AlterOperationSet sets/overwrites the configuration setting. AlterOperationSet = iota )
LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client
const LibrdkafkaLinkInfo = "static glibc_linux_amd64 from librdkafka-static-bundle-v2.6.0.tgz"
OffsetBeginning represents the earliest offset (logical)
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
OffsetEnd represents the latest offset (logical)
const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
OffsetInvalid represents an invalid/unspecified offset
const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
OffsetStored represents a stored offset
const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
PartitionAny represents any partition (for partitioning), or unspecified value (for all other cases)
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
func LibraryVersion() (int, string)
LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.
func WriteErrorCodes(f *os.File)
WriteErrorCodes writes Go error code constants to file from the librdkafka error codes. This function is not intended for public use.
ACLBinding specifies the operation and permission type for a specific principal over one or more resources of the same type. Used by `AdminClient.CreateACLs`, returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
type ACLBinding struct { Type ResourceType // The resource type. // The resource name, which depends on the resource type. // For ResourceBroker the resource name is the broker id. Name string ResourcePatternType ResourcePatternType // The resource pattern, relative to the name. Principal string // The principal this ACLBinding refers to. Host string // The host that the call is allowed to come from. Operation ACLOperation // The operation/s specified by this binding. PermissionType ACLPermissionType // The permission type for the specified operation. }
ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
type ACLBindingFilter = ACLBinding
ACLBindingFilters is a slice of ACLBindingFilter that also implements the sort interface
type ACLBindingFilters []ACLBindingFilter
ACLBindings is a slice of ACLBinding that also implements the sort interface
type ACLBindings []ACLBinding
func (a ACLBindings) Len() int
func (a ACLBindings) Less(i, j int) bool
func (a ACLBindings) Swap(i, j int)
ACLOperation enumerates the different types of ACL operation.
type ACLOperation int
const ( // ACLOperationUnknown represents an unknown or unset operation ACLOperationUnknown ACLOperation = C.RD_KAFKA_ACL_OPERATION_UNKNOWN // ACLOperationAny in a filter, matches any ACLOperation ACLOperationAny ACLOperation = C.RD_KAFKA_ACL_OPERATION_ANY // ACLOperationAll represents all the operations ACLOperationAll ACLOperation = C.RD_KAFKA_ACL_OPERATION_ALL // ACLOperationRead a read operation ACLOperationRead ACLOperation = C.RD_KAFKA_ACL_OPERATION_READ // ACLOperationWrite represents a write operation ACLOperationWrite ACLOperation = C.RD_KAFKA_ACL_OPERATION_WRITE // ACLOperationCreate represents a create operation ACLOperationCreate ACLOperation = C.RD_KAFKA_ACL_OPERATION_CREATE // ACLOperationDelete represents a delete operation ACLOperationDelete ACLOperation = C.RD_KAFKA_ACL_OPERATION_DELETE // ACLOperationAlter represents an alter operation ACLOperationAlter ACLOperation = C.RD_KAFKA_ACL_OPERATION_ALTER // ACLOperationDescribe represents a describe operation ACLOperationDescribe ACLOperation = C.RD_KAFKA_ACL_OPERATION_DESCRIBE // ACLOperationClusterAction represents a cluster action operation ACLOperationClusterAction ACLOperation = C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION // ACLOperationDescribeConfigs represents a describe configs operation ACLOperationDescribeConfigs ACLOperation = C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS // ACLOperationAlterConfigs represents an alter configs operation ACLOperationAlterConfigs ACLOperation = C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS // ACLOperationIdempotentWrite represents an idempotent write operation ACLOperationIdempotentWrite ACLOperation = C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE )
func ACLOperationFromString(aclOperationString string) (ACLOperation, error)
ACLOperationFromString translates an ACL operation name to an ACLOperation value.
func (o ACLOperation) String() string
String returns the human-readable representation of an ACLOperation
ACLPermissionType enumerates the different types of ACL permission types.
type ACLPermissionType int
const ( // ACLPermissionTypeUnknown represents an unknown ACLPermissionType ACLPermissionTypeUnknown ACLPermissionType = C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN // ACLPermissionTypeAny in a filter, matches any ACLPermissionType ACLPermissionTypeAny ACLPermissionType = C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY // ACLPermissionTypeDeny disallows access ACLPermissionTypeDeny ACLPermissionType = C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY // ACLPermissionTypeAllow grants access ACLPermissionTypeAllow ACLPermissionType = C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW )
func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error)
ACLPermissionTypeFromString translates an ACL permission type name to an ACLPermissionType value.
func (o ACLPermissionType) String() string
String returns the human-readable representation of an ACLPermissionType
AdminClient is derived from an existing Producer or Consumer
type AdminClient struct {
// contains filtered or unexported fields
}
func NewAdminClient(conf *ConfigMap) (*AdminClient, error)
NewAdminClient creates a new AdminClient instance with a new underlying client instance
func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)
NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.
func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)
NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.
func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
AlterConfigs alters/updates cluster resource configuration.
Updates are not transactional so they may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigEntrys and reverting unspecified ConfigEntrys to their default values.
Requires broker version >=0.11.0.0
AlterConfigs will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration to their default values.
Multiple resources and resource types may be set, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
Deprecated: AlterConfigs is deprecated in favour of IncrementalAlterConfigs.
func (a *AdminClient) AlterConsumerGroupOffsets( ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, options ...AlterConsumerGroupOffsetsAdminOption) (acgor AlterConsumerGroupOffsetsResult, err error)
AlterConsumerGroupOffsets alters the offsets for topic partition(s) for consumer group(s).
Parameters:
Returns an AlterConsumerGroupOffsetsResult, containing a slice of ConsumerGroupTopicPartitions corresponding to the input slice, plus an error that is not `nil` for client level errors. Individual TopicPartitions inside each of the ConsumerGroupTopicPartitions should also be checked for errors. This will succeed at the partition level only if the group is not actively subscribed to the corresponding topic(s).
func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, options ...AlterUserScramCredentialsAdminOption) (result AlterUserScramCredentialsResult, err error)
AlterUserScramCredentials alters SASL/SCRAM credentials. The pair (user, mechanism) must be unique among upsertions and deletions.
Parameters:
Returns a map from user name to the corresponding Error, with error code ErrNoError when the request succeeded.
func (a *AdminClient) Close()
Close an AdminClient instance.
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
ClusterID returns the cluster ID as reported in broker metadata.
Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.
Requires broker version >= 0.10.0.
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
ControllerID returns the broker ID of the current controller as reported in broker metadata.
Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.
Requires broker version >= 0.10.0.
func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)
CreateACLs creates one or more ACL bindings.
Parameters:
Returns a slice of CreateACLResult with an ErrNoError ErrorCode when the operation was successful, plus an error that is not nil for client level errors.
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)
CreatePartitions creates additional partitions for topics.
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
CreateTopics creates topics in cluster.
The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
Topic creation is non-atomic and may succeed for some topics but fail for others; make sure to check the result for topic-specific errors.
Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error)
DeleteACLs deletes ACL bindings matching one or more ACL binding filters.
Parameters:
Returns a slice of ACLBinding for each filter when the operation was successful plus an error that is not `nil` for client level errors
func (a *AdminClient) DeleteConsumerGroups( ctx context.Context, groups []string, options ...DeleteConsumerGroupsAdminOption) (result DeleteConsumerGroupsResult, err error)
DeleteConsumerGroups deletes a batch of consumer groups. Parameters:
Returns a DeleteConsumerGroupsResult containing a slice of ConsumerGroupResult, with group-level errors, (if any) contained inside; and an error that is not nil for client level errors.
func (a *AdminClient) DeleteRecords(ctx context.Context, recordsToDelete []TopicPartition, options ...DeleteRecordsAdminOption) (result DeleteRecordsResults, err error)
DeleteRecords deletes records (messages) in topic partitions older than the offsets provided.
Parameters:
Returns a DeleteRecordsResults, which contains a slice of DeleteRecordsResult, each representing the result for one topic partition. Individual TopicPartitions inside the DeleteRecordsResult should be checked for errors. If successful, the DeletedRecords within the DeleteRecordsResult will be non-nil, and contain the low-watermark offset (smallest available offset of all live replicas).
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
DeleteTopics deletes a batch of topics.
This operation is not transactional and may succeed for a subset of topics while failing others. It may take several seconds after the DeleteTopics result returns success for all the brokers to become aware that the topics are gone. During this time, topic metadata and configuration may continue to return information about deleted topics.
Requires broker version >= 0.10.1.0
func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error)
DescribeACLs matches ACL bindings by filter.
Parameters:
Returns a DescribeACLsResult holding the matching ACL bindings when the operation was successful, plus an error that is not `nil` for client level errors.
func (a *AdminClient) DescribeCluster( ctx context.Context, options ...DescribeClusterAdminOption) (result DescribeClusterResult, err error)
DescribeCluster describes the cluster
Parameters:
Returns a DescribeClusterResult, which contains the current cluster ID and controller along with a slice of Nodes. It also has a slice of allowed ACLOperations.
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)
DescribeConfigs retrieves configuration for cluster resources.
The returned configuration includes default values; use ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish default values from manually configured settings.
The value of config entries where .IsSensitive is true will always be unset to avoid disclosing sensitive information, such as security settings.
Configuration entries where .IsReadOnly is true can't be modified (with AlterConfigs).
Synonym configuration entries are returned if the broker supports it (broker version >= 1.1.0). See .Synonyms.
Requires broker version >=0.11.0.0
Multiple resources and resource types may be requested, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
func (a *AdminClient) DescribeConsumerGroups( ctx context.Context, groups []string, options ...DescribeConsumerGroupsAdminOption) (result DescribeConsumerGroupsResult, err error)
DescribeConsumerGroups describes groups from cluster as specified by the groups list.
Parameters:
Returns DescribeConsumerGroupsResult, which contains a slice of ConsumerGroupDescriptions corresponding to the input groups, plus an error that is not `nil` for client level errors. Individual ConsumerGroupDescriptions inside the slice should also be checked for errors.
func (a *AdminClient) DescribeTopics( ctx context.Context, topics TopicCollection, options ...DescribeTopicsAdminOption) (result DescribeTopicsResult, err error)
DescribeTopics describes topics from cluster as specified by the topics list.
Parameters:
Returns DescribeTopicsResult, which contains a slice of TopicDescriptions corresponding to the input topics, plus an error that is not `nil` for client level errors. Individual TopicDescriptions inside the slice should also be checked for errors. Individual TopicDescriptions also have a slice of allowed ACLOperations.
func (a *AdminClient) DescribeUserScramCredentials( ctx context.Context, users []string, options ...DescribeUserScramCredentialsAdminOption) (result DescribeUserScramCredentialsResult, err error)
DescribeUserScramCredentials describes SASL/SCRAM credentials for the specified user names.
Parameters:
Returns a map from user name to user SCRAM credentials description. Each description can have an individual error.
func (a *AdminClient) ElectLeaders(ctx context.Context, electLeaderRequest ElectLeadersRequest, options ...ElectLeadersAdminOption) (result ElectLeadersResult, err error)
ElectLeaders performs Preferred or Unclean Elections for the specified topic Partitions or for all of them.
Parameters:
Returns ElectLeadersResult, which contains a slice of TopicPartitions containing the partitions for which the leader election was performed. If we are passing partitions as nil, the broker will perform leader elections for all partitions, but the results will only contain partitions for which there was an election or resulted in an error. Individual TopicPartitions inside the ElectLeadersResult should be checked for errors. Additionally, an error that is not nil for client-level errors is returned.
func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (a *AdminClient) IncrementalAlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
IncrementalAlterConfigs alters/updates cluster resource configuration.
Updates are not transactional so they may succeed for some resources while failing for others. The configs for a particular resource are updated atomically, executing the corresponding incremental operations on the provided configurations.
Requires broker version >=2.3.0
IncrementalAlterConfigs will only change configurations for provided resources with the new configuration given.
Multiple resources and resource types may be set, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
func (a *AdminClient) IsClosed() bool
IsClosed returns boolean representing if client is closed or not
func (a *AdminClient) ListConsumerGroupOffsets( ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, options ...ListConsumerGroupOffsetsAdminOption) (lcgor ListConsumerGroupOffsetsResult, err error)
ListConsumerGroupOffsets fetches the offsets for topic partition(s) for consumer group(s).
Parameters:
Returns a ListConsumerGroupOffsetsResult, containing a slice of ConsumerGroupTopicPartitions corresponding to the input slice, plus an error that is not `nil` for client level errors. Individual TopicPartitions inside each of the ConsumerGroupTopicPartitions should also be checked for errors.
func (a *AdminClient) ListConsumerGroups( ctx context.Context, options ...ListConsumerGroupsAdminOption) (result ListConsumerGroupsResult, err error)
ListConsumerGroups lists the consumer groups available in the cluster.
Parameters:
Returns a ListConsumerGroupsResult, which contains a slice corresponding to each group in the cluster and a slice of errors encountered while listing. Additionally, an error that is not nil for client-level errors is returned. Both the returned error, and the errors slice should be checked.
func (a *AdminClient) ListOffsets( ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec, options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error)
ListOffsets describes offsets for the specified TopicPartition based on an OffsetSpec.
Parameters:
Returns a ListOffsetsResult. Each TopicPartition's ListOffset can have an individual error.
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1 ); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (a *AdminClient) SetSaslCredentials(username, password string) error
SetSaslCredentials sets the SASL credentials used for this admin client. The new credentials will overwrite the old ones (which were set when creating the admin client or by a previous call to SetSaslCredentials). The new credentials will be used the next time the admin client needs to authenticate to a broker. This method will not disconnect existing broker connections that were established with the old credentials. This method applies only to the SASL PLAIN and SCRAM mechanisms.
func (a *AdminClient) String() string
String returns a human readable name for an AdminClient instance
AdminOption is a generic type not to be used directly.
See CreateTopicsAdminOption et al.
type AdminOption interface {
// contains filtered or unexported methods
}
AdminOptionIncludeAuthorizedOperations decides if the broker should return authorized operations.
Default: false
Valid for DescribeConsumerGroups, DescribeTopics, DescribeCluster.
type AdminOptionIncludeAuthorizedOperations struct {
// contains filtered or unexported fields
}
func SetAdminOptionIncludeAuthorizedOperations(val bool) (ao AdminOptionIncludeAuthorizedOperations)
SetAdminOptionIncludeAuthorizedOperations decides if the broker should return authorized operations.
Default: false
Valid for DescribeConsumerGroups, DescribeTopics, DescribeCluster.
AdminOptionIsolationLevel sets the overall request IsolationLevel.
Default: `ReadUncommitted`.
Valid for ListOffsets.
type AdminOptionIsolationLevel struct {
// contains filtered or unexported fields
}
func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel)
SetAdminIsolationLevel sets the overall IsolationLevel for a request.
Default: `ReadUncommitted`.
Valid for ListOffsets.
AdminOptionMatchConsumerGroupStates decides groups in which state(s) should be listed.
Default: nil (lists groups in all states).
Valid for ListConsumerGroups.
type AdminOptionMatchConsumerGroupStates struct {
// contains filtered or unexported fields
}
func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates)
SetAdminMatchConsumerGroupStates sets the state(s) that must be listed.
Default: nil (lists groups in all states).
Valid for ListConsumerGroups.
AdminOptionMatchConsumerGroupTypes decides the type(s) that must be listed.
Default: nil (lists groups of all types).
Valid for ListConsumerGroups.
type AdminOptionMatchConsumerGroupTypes struct {
// contains filtered or unexported fields
}
func SetAdminMatchConsumerGroupTypes(val []ConsumerGroupType) (ao AdminOptionMatchConsumerGroupTypes)
SetAdminMatchConsumerGroupTypes sets the type(s) that must be listed.
Default: nil (lists groups of all types).
Valid for ListConsumerGroups.
AdminOptionOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.
CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in cluster.
Default: 0 (return immediately).
Valid for CreateTopics, DeleteTopics, CreatePartitions.
type AdminOptionOperationTimeout struct {
// contains filtered or unexported fields
}
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)
SetAdminOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.
CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in cluster.
Default: 0 (return immediately).
Valid for CreateTopics, DeleteTopics, CreatePartitions.
AdminOptionRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.
Default: `socket.timeout.ms`.
Valid for all Admin API methods.
type AdminOptionRequestTimeout struct {
// contains filtered or unexported fields
}
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)
SetAdminRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.
Default: `socket.timeout.ms`.
Valid for all Admin API methods.
AdminOptionRequireStableOffsets decides if the broker should return stable offsets (transaction-committed).
Default: false
Valid for ListConsumerGroupOffsets.
type AdminOptionRequireStableOffsets struct {
// contains filtered or unexported fields
}
func SetAdminRequireStableOffsets(val bool) (ao AdminOptionRequireStableOffsets)
SetAdminRequireStableOffsets decides if the broker should return stable offsets (transaction-committed).
Default: false
Valid for ListConsumerGroupOffsets.
AdminOptionValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).
Default: false.
Valid for CreateTopics, CreatePartitions, AlterConfigs
type AdminOptionValidateOnly struct {
// contains filtered or unexported fields
}
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)
SetAdminValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).
Default: false.
Valid for CreateTopics, CreatePartitions, AlterConfigs
AlterConfigOpType specifies the operation to perform on the ConfigEntry for IncrementalAlterConfigs
type AlterConfigOpType int
const ( // AlterConfigOpTypeSet sets/overwrites the configuration // setting. AlterConfigOpTypeSet AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET // AlterConfigOpTypeDelete sets the configuration setting // to default or NULL. AlterConfigOpTypeDelete AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE // AlterConfigOpTypeAppend appends the value to existing // configuration settings. AlterConfigOpTypeAppend AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND // AlterConfigOpTypeSubtract subtracts the value from // existing configuration settings. AlterConfigOpTypeSubtract AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT )
func (o AlterConfigOpType) String() string
String returns the human-readable representation of an AlterConfigOpType
AlterConfigsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.
type AlterConfigsAdminOption interface {
// contains filtered or unexported methods
}
AlterConsumerGroupOffsetsAdminOption - see setter.
See SetAdminRequestTimeout.
type AlterConsumerGroupOffsetsAdminOption interface {
// contains filtered or unexported methods
}
AlterConsumerGroupOffsetsResult represents the result of an AlterConsumerGroupOffsets operation.
type AlterConsumerGroupOffsetsResult struct { // A slice of ConsumerGroupTopicPartitions, each element represents a group's // TopicPartitions and Offsets. ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions }
AlterOperation specifies the operation to perform on the ConfigEntry. Currently only AlterOperationSet.
type AlterOperation int
func (o AlterOperation) String() string
String returns the human-readable representation of an AlterOperation
AlterUserScramCredentialsAdminOption - see setter.
See SetAdminRequestTimeout.
type AlterUserScramCredentialsAdminOption interface {
// contains filtered or unexported methods
}
AlterUserScramCredentialsResult represents the result of an AlterUserScramCredentials call.
type AlterUserScramCredentialsResult struct { // Errors - Map from user name // to an Error, with ErrNoError code on success. Errors map[string]Error }
AssignedPartitions consumer group rebalance event: assigned partition set
type AssignedPartitions struct { Partitions []TopicPartition }
func (e AssignedPartitions) String() string
BrokerMetadata contains per-broker metadata
type BrokerMetadata struct { ID int32 Host string Port int }
ConfigEntry holds parameters for altering a resource's configuration.
type ConfigEntry struct { // Name of configuration entry, e.g., topic configuration property name. Name string // Value of configuration entry. Value string // Deprecated: Operation to perform on the entry. Operation AlterOperation // Operation to perform on the entry incrementally. IncrementalOperation AlterConfigOpType }
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry
StringMapToConfigEntries creates a new slice of ConfigEntry objects from the provided string map. The AlterOperation is set on each created entry.
func StringMapToIncrementalConfigEntries(stringMap map[string]string, operationMap map[string]AlterConfigOpType) []ConfigEntry
StringMapToIncrementalConfigEntries creates a new slice of ConfigEntry objects from the provided string map and operation map. The AlterConfigOpType is set on each created entry.
func (c ConfigEntry) String() string
String returns a human-readable representation of a ConfigEntry.
ConfigEntryResult contains the result of a single configuration entry from a DescribeConfigs request.
type ConfigEntryResult struct { // Name of configuration entry, e.g., topic configuration property name. Name string // Value of configuration entry. Value string // Source indicates the configuration source. Source ConfigSource // IsReadOnly indicates whether the configuration entry can be altered. IsReadOnly bool // IsDefault indicates whether the value is at its default. IsDefault bool // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset. IsSensitive bool // IsSynonym indicates whether the configuration entry is a synonym for another configuration property. IsSynonym bool // Synonyms contains a map of configuration entries that are synonyms to this configuration entry. Synonyms map[string]ConfigEntryResult }
func (c ConfigEntryResult) String() string
String returns a human-readable representation of a ConfigEntryResult.
ConfigMap is a map containing standard librdkafka configuration properties as documented in: https://github.com/confluentinc/librdkafka/tree/master/CONFIGURATION.md
The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.
The use of "default.topic.config" is deprecated; topic configuration properties shall be specified in the standard ConfigMap. For backwards compatibility, "default.topic.config" (if supplied) takes precedence.
type ConfigMap map[string]ConfigValue
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)
Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. If the key is found but the type does not match that of `defval` (unless nil) an ErrInvalidArg error is returned.
func (m ConfigMap) Set(kv string) error
Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.
func (m ConfigMap) SetKey(key string, value ConfigValue) error
SetKey sets configuration property key to value.
For user convenience a key prefixed with {topic}. will be set on the "default.topic.config" sub-map, this use is deprecated.
ConfigResource holds parameters for altering an Apache Kafka configuration resource
type ConfigResource struct { // Type of resource to set. Type ResourceType // Name of resource to set. Name string // Config entries to set. // Configuration updates are atomic, any configuration property not provided // here will be reverted (by the broker) to its default value. // Use DescribeConfigs to retrieve the list of current configuration entry values. Config []ConfigEntry }
func (c ConfigResource) String() string
String returns a human-readable representation of a ConfigResource
ConfigResourceResult provides the result for a resource from an AlterConfigs or DescribeConfigs request.
type ConfigResourceResult struct { // Type of returned result resource. Type ResourceType // Name of returned result resource. Name string // Error, if any, of returned result resource. Error Error // Config entries, if any, of returned result resource. Config map[string]ConfigEntryResult }
func (c ConfigResourceResult) String() string
String returns a human-readable representation of a ConfigResourceResult.
ConfigSource represents an Apache Kafka config source
type ConfigSource int
const ( // ConfigSourceUnknown is the default value ConfigSourceUnknown ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic ConfigSourceDynamicTopic ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker ConfigSourceDynamicBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster ConfigSourceDynamicDefaultBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file) ConfigSourceStaticBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG // ConfigSourceDefault is built-in default configuration for configs that have a default value ConfigSourceDefault ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG )
func (t ConfigSource) String() string
String returns the human-readable representation of a ConfigSource type
ConfigValue supports the following types:
bool, int, string, any type with the standard String() interface
type ConfigValue interface{}
Consumer implements a High-level Apache Kafka Consumer instance
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer(conf *ConfigMap) (*Consumer, error)
NewConsumer creates a new high-level Consumer instance.
conf is a *ConfigMap with standard librdkafka configuration properties.
Supported special configuration properties:
go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel. If set to true the app must handle the AssignedPartitions and RevokedPartitions events and call Assign() and Unassign() respectively. go.events.channel.enable (bool, false) - [deprecated] Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. go.events.channel.size (int, 1000) - Events() channel size go.logs.channel.enable (bool, false) - Forward log to Logs() channel. go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
Assign an atomic set of partitions to consume.
The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.
This replaces the current assignment.
func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
Assignment returns the current partition assignments
func (c *Consumer) AssignmentLost() bool
AssignmentLost returns true if current partition assignment has been lost. This method is only applicable for use with a subscribing consumer when handling a rebalance event or callback. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
func (c *Consumer) Close() (err error)
Close Consumer instance. The object is no longer usable after this call.
func (c *Consumer) Commit() ([]TopicPartition, error)
Commit offsets for currently assigned partitions. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
CommitOffsets commits the provided list of offsets. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
Committed retrieves committed offsets for the given set of partitions
func (c *Consumer) Events() chan Event
Events returns the Events channel (if enabled)
Deprecated: Events (channel based consumer) is deprecated in favour of Poll().
func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
GetConsumerGroupMetadata returns the consumer's current group metadata. This object should be passed to the transactional producer's SendOffsetsToTransaction() API.
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (c *Consumer) GetRebalanceProtocol() string
GetRebalanceProtocol returns the current consumer group rebalance protocol, which is either "EAGER" or "COOPERATIVE". If the rebalance protocol is not known in the current state an empty string is returned. Should typically only be called during rebalancing.
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
GetWatermarkOffsets returns the cached low and high offsets for the given topic and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. The low offset is populated every statistics.interval.ms if that value is set. OffsetInvalid will be returned if there is no cached offset for either value.
func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
IncrementalAssign adds the specified partitions to the current set of partitions to consume.
The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.
The new partitions must not be part of the current assignment.
func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
IncrementalUnassign removes the specified partitions from the current set of partitions to consume.
The .Offset field of the TopicPartition is ignored.
The removed partitions must be part of the current assignment.
func (c *Consumer) IsClosed() bool
IsClosed returns boolean representing if client is closed or not
func (c *Consumer) Logs() chan LogEvent
Logs returns the log channel if enabled, or nil otherwise.
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
OffsetsForTimes looks up offsets by timestamp for the given partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.
The function will block for at most timeoutMs milliseconds.
Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.
func (c *Consumer) Pause(partitions []TopicPartition) (err error)
Pause consumption for the provided list of partitions
Note that messages already enqueued on the consumer's Event channel (if `go.events.channel.enable` has been set) will NOT be purged by this call, set `go.events.channel.size` accordingly.
func (c *Consumer) Poll(timeoutMs int) (event Event)
Poll the consumer for messages or events.
The following callbacks may be triggered:
Subscribe()'s rebalanceCb
Returns nil on timeout, else an Event
func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
Position returns the current consume position for the given partitions. Typical use is to call Assignment() to get the partition list and then pass it to Position() to get the current consume position for each of the assigned partitions. The consume position is the next message to read from the partition. i.e., the offset of the last message seen by the application + 1.
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
ReadMessage polls the consumer for a message.
This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.
The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.
Timeout is returned as (nil, err) where `err.(kafka.Error).IsTimeout() == true`.
Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
func (c *Consumer) Resume(partitions []TopicPartition) (err error)
Resume consumption for the provided list of partitions
func (c *Consumer) Seek(partition TopicPartition, ignoredTimeoutMs int) error
Seek seeks the given topic partitions using the offset from the TopicPartition.
The ignoredTimeoutMs parameter is ignored. Instead, this method blocks until the fetcher state is updated for the given partition with the new offset. This guarantees that no previously fetched messages for the old offset (or fetch position) will be passed to the application once this call returns. It will still take some time after the method returns until messages are fetched at the new offset.
Seek() may only be used for partitions already being consumed (through Assign() or implicitly through a self-rebalanced Subscribe()). To set the starting offset it is preferred to use Assign() and provide a starting offset for each partition.
Returns an error on failure or nil otherwise. Deprecated: Seek is deprecated in favour of SeekPartitions().
func (c *Consumer) SeekPartitions(partitions []TopicPartition) ([]TopicPartition, error)
SeekPartitions seeks the given topic partitions to the per-partition offset stored in the .Offset field of each partition.
The offset may be either absolute (>= 0) or a logical offset (e.g. OffsetEnd).
SeekPartitions() may only be used for partitions already being consumed (through Assign() or implicitly through a self-rebalanced Subscribe()). To set the starting offset it is preferred to use Assign() in a kafka.AssignedPartitions handler and provide a starting offset for each partition.
Returns an error on failure or nil otherwise. Individual partition errors should be checked in the per-partition .Error field.
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1 ); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (c *Consumer) SetSaslCredentials(username, password string) error
SetSaslCredentials sets the SASL credentials used for this consumer. The new credentials will overwrite the old ones (which were set when creating the consumer or by a previous call to SetSaslCredentials). The new credentials will be used the next time the consumer needs to authenticate to a broker. This method will not disconnect existing broker connections that were established with the old credentials. This method applies only to the SASL PLAIN and SCRAM mechanisms.
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error)
StoreMessage stores offset based on the provided message. This is a convenience method that uses StoreOffsets to do the actual work.
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
StoreOffsets stores the provided list of offsets that will be committed to the offset store according to `auto.commit.interval.ms` or manual offset-less Commit().
Returns the stored offsets on success. If at least one offset couldn't be stored, an error and a list of offsets is returned. Each offset can be checked for specific errors via its `.Error` member.
func (c *Consumer) String() string
String returns a human readable name for a Consumer instance
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
Subscribe to a single topic. This replaces the current subscription.
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.
func (c *Consumer) Subscription() (topics []string, err error)
Subscription returns the current subscription as set by Subscribe()
func (c *Consumer) Unassign() (err error)
Unassign the current set of partitions to consume.
func (c *Consumer) Unsubscribe() (err error)
Unsubscribe from the current subscription, if any.
ConsumerGroupDescription represents the result of DescribeConsumerGroups for a single group.
type ConsumerGroupDescription struct { // Group id. GroupID string // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error // Is a simple consumer group. IsSimpleConsumerGroup bool // Partition assignor identifier. PartitionAssignor string // Consumer group state. State ConsumerGroupState // Consumer group coordinator (has ID == -1 if not known). Coordinator Node // Members list. Members []MemberDescription // Operations allowed for the group (nil if not available or not requested) AuthorizedOperations []ACLOperation }
ConsumerGroupListing represents the result of ListConsumerGroups for a single group.
type ConsumerGroupListing struct { // Group id. GroupID string // Is a simple consumer group. IsSimpleConsumerGroup bool // Group state. State ConsumerGroupState // Group type. Type ConsumerGroupType }
ConsumerGroupMetadata reflects the current consumer group member metadata.
type ConsumerGroupMetadata struct {
// contains filtered or unexported fields
}
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)
NewTestConsumerGroupMetadata creates a new consumer group metadata instance mainly for testing use. Use GetConsumerGroupMetadata() to retrieve the real metadata.
ConsumerGroupResult provides per-group operation result (error) information.
type ConsumerGroupResult struct { // Group name Group string // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error }
func (g ConsumerGroupResult) String() string
String returns a human-readable representation of a ConsumerGroupResult.
ConsumerGroupState represents a consumer group state
type ConsumerGroupState int
const ( // ConsumerGroupStateUnknown - Unknown ConsumerGroupState ConsumerGroupStateUnknown ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN // ConsumerGroupStatePreparingRebalance - preparing rebalance ConsumerGroupStatePreparingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE // ConsumerGroupStateCompletingRebalance - completing rebalance ConsumerGroupStateCompletingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE // ConsumerGroupStateStable - stable ConsumerGroupStateStable ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_STABLE // ConsumerGroupStateDead - dead group ConsumerGroupStateDead ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_DEAD // ConsumerGroupStateEmpty - empty group ConsumerGroupStateEmpty ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY )
func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error)
ConsumerGroupStateFromString translates a consumer group state name/string to a ConsumerGroupState value.
func (t ConsumerGroupState) String() string
String returns the human-readable representation of a ConsumerGroupState
ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions.
type ConsumerGroupTopicPartitions struct { // Group name Group string // Partitions list Partitions []TopicPartition }
func (gtp ConsumerGroupTopicPartitions) String() string
ConsumerGroupType represents a consumer group type
type ConsumerGroupType int
const ( // ConsumerGroupTypeUnknown - Unknown ConsumerGroupType ConsumerGroupTypeUnknown ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN // ConsumerGroupTypeConsumer - Consumer ConsumerGroupType ConsumerGroupTypeConsumer ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER // ConsumerGroupTypeClassic - Classic ConsumerGroupType ConsumerGroupTypeClassic ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC )
func ConsumerGroupTypeFromString(typeString string) ConsumerGroupType
ConsumerGroupTypeFromString translates a consumer group type name/string to a ConsumerGroupType value.
func (t ConsumerGroupType) String() string
String returns the human-readable representation of a ConsumerGroupType
CreateACLResult provides create ACL error information.
type CreateACLResult struct { // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error }
CreateACLsAdminOption - see setter.
See SetAdminRequestTimeout
type CreateACLsAdminOption interface {
// contains filtered or unexported methods
}
CreatePartitionsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type CreatePartitionsAdminOption interface {
// contains filtered or unexported methods
}
CreateTopicsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type CreateTopicsAdminOption interface {
// contains filtered or unexported methods
}
DeleteACLsAdminOption - see setter.
See SetAdminRequestTimeout
type DeleteACLsAdminOption interface {
// contains filtered or unexported methods
}
DeleteACLsResult provides delete ACLs result or error information.
type DeleteACLsResult = DescribeACLsResult
DeleteConsumerGroupsAdminOption - see setters.
See SetAdminRequestTimeout.
type DeleteConsumerGroupsAdminOption interface {
// contains filtered or unexported methods
}
DeleteConsumerGroupsResult represents the result of a DeleteConsumerGroups call.
type DeleteConsumerGroupsResult struct { // Slice of ConsumerGroupResult. ConsumerGroupResults []ConsumerGroupResult }
DeleteRecordsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminOperationTimeout.
type DeleteRecordsAdminOption interface {
// contains filtered or unexported methods
}
DeleteRecordsResult represents the result of a DeleteRecords call for a single partition.
type DeleteRecordsResult struct { // One of requested partitions. // The Error field is set if any occurred for that partition. TopicPartition TopicPartition // Deleted records information, or nil if an error occurred. DeletedRecords *DeletedRecords }
DeleteRecordsResults represents the results of a DeleteRecords call.
type DeleteRecordsResults struct { // A slice of DeleteRecordsResult, one for each requested topic partition. DeleteRecordsResults []DeleteRecordsResult }
DeleteTopicsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout.
type DeleteTopicsAdminOption interface {
// contains filtered or unexported methods
}
DeletedRecords contains information about deleted records of a single partition
type DeletedRecords struct { // Low-watermark offset after deletion LowWatermark Offset }
DescribeACLsAdminOption - see setter.
See SetAdminRequestTimeout
type DescribeACLsAdminOption interface {
// contains filtered or unexported methods
}
DescribeACLsResult provides describe ACLs result or error information.
type DescribeACLsResult struct { // Slice of ACL bindings matching the provided filter ACLBindings ACLBindings // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error }
DescribeClusterAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations.
type DescribeClusterAdminOption interface {
// contains filtered or unexported methods
}
DescribeClusterResult represents the result of DescribeCluster.
type DescribeClusterResult struct { // Cluster id for the cluster (always available if broker version >= 0.10.1.0, otherwise nil). ClusterID *string // Current controller broker for the cluster (nil if there is none). Controller *Node // List of brokers in the cluster. Nodes []Node // Operations allowed for the cluster (nil if not available or not requested). AuthorizedOperations []ACLOperation }
DescribeConfigsAdminOption - see setters.
See SetAdminRequestTimeout.
type DescribeConfigsAdminOption interface {
// contains filtered or unexported methods
}
DescribeConsumerGroupsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations.
type DescribeConsumerGroupsAdminOption interface {
// contains filtered or unexported methods
}
DescribeConsumerGroupsResult represents the result of a DescribeConsumerGroups call.
type DescribeConsumerGroupsResult struct { // Slice of ConsumerGroupDescription. ConsumerGroupDescriptions []ConsumerGroupDescription }
DescribeTopicsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations.
type DescribeTopicsAdminOption interface {
// contains filtered or unexported methods
}
DescribeTopicsResult represents the result of a DescribeTopics call.
type DescribeTopicsResult struct { // Slice of TopicDescription. TopicDescriptions []TopicDescription }
DescribeUserScramCredentialsAdminOption - see setter.
See SetAdminRequestTimeout.
type DescribeUserScramCredentialsAdminOption interface {
// contains filtered or unexported methods
}
DescribeUserScramCredentialsResult represents the result of a DescribeUserScramCredentials call.
type DescribeUserScramCredentialsResult struct { // Descriptions - Map from user name // to UserScramCredentialsDescription Descriptions map[string]UserScramCredentialsDescription }
ElectLeadersAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminOperationTimeout.
type ElectLeadersAdminOption interface {
// contains filtered or unexported methods
}
ElectLeadersRequest holds parameters for the type of election to be performed and the topic partitions for which election has to be performed
type ElectLeadersRequest struct {
// contains filtered or unexported fields
}
func NewElectLeadersRequest(electionType ElectionType, partitions []TopicPartition) ElectLeadersRequest
NewElectLeadersRequest creates a new ElectLeadersRequest with the given election type and topic partitions
ElectLeadersResult holds the result of the election performed
type ElectLeadersResult struct { // TopicPartitions for which election has been performed and the per-partition error, if any // that occurred while running the election for the specific TopicPartition. TopicPartitions []TopicPartition }
ElectionType represents the type of election to be performed
type ElectionType int
const ( // ElectionTypePreferred - Preferred election type ElectionTypePreferred ElectionType = C.RD_KAFKA_ELECTION_TYPE_PREFERRED // ElectionTypeUnclean - Unclean election type ElectionTypeUnclean ElectionType = C.RD_KAFKA_ELECTION_TYPE_UNCLEAN )
func ElectionTypeFromString(electionTypeString string) (ElectionType, error)
ElectionTypeFromString translates an election type name to an ElectionType value.
Error provides a Kafka-specific error container
type Error struct {
// contains filtered or unexported fields
}
func NewError(code ErrorCode, str string, fatal bool) (err Error)
NewError creates a new Error.
func (e Error) Code() ErrorCode
Code returns the ErrorCode of an Error
func (e Error) Error() string
Error returns a human readable representation of an Error. Same as Error.String()
func (e Error) IsFatal() bool
IsFatal returns true if the error is a fatal error. A fatal error indicates the client instance is no longer operable and should be terminated. Typical causes include non-recoverable idempotent producer errors.
func (e Error) IsRetriable() bool
IsRetriable returns true if the operation that caused this error may be retried. This flag is currently only set by the Transactional producer API.
func (e Error) IsTimeout() bool
IsTimeout returns true if the error is a timeout error. A timeout error indicates that the operation timed out locally.
func (e Error) String() string
String returns a human readable representation of an Error
func (e Error) TxnRequiresAbort() bool
TxnRequiresAbort returns true if the error is an abortable transaction error that requires the application to abort the current transaction with AbortTransaction() and start a new transaction with BeginTransaction() if it wishes to proceed with transactional operations. This flag is only set by the Transactional producer API.
ErrorCode is the integer representation of local and broker error codes
type ErrorCode int
const ( // ErrBadMsg Local: Bad message format ErrBadMsg ErrorCode = C.RD_KAFKA_RESP_ERR__BAD_MSG // ErrBadCompression Local: Invalid compressed data ErrBadCompression ErrorCode = C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION // ErrDestroy Local: Broker handle destroyed ErrDestroy ErrorCode = C.RD_KAFKA_RESP_ERR__DESTROY // ErrFail Local: Communication failure with broker ErrFail ErrorCode = C.RD_KAFKA_RESP_ERR__FAIL // ErrTransport Local: Broker transport failure ErrTransport ErrorCode = C.RD_KAFKA_RESP_ERR__TRANSPORT // ErrCritSysResource Local: Critical system resource failure ErrCritSysResource ErrorCode = C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE // ErrResolve Local: Host resolution failure ErrResolve ErrorCode = C.RD_KAFKA_RESP_ERR__RESOLVE // ErrMsgTimedOut Local: Message timed out ErrMsgTimedOut ErrorCode = C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT // ErrPartitionEOF Broker: No more messages ErrPartitionEOF ErrorCode = C.RD_KAFKA_RESP_ERR__PARTITION_EOF // ErrUnknownPartition Local: Unknown partition ErrUnknownPartition ErrorCode = C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION // ErrFs Local: File or filesystem error ErrFs ErrorCode = C.RD_KAFKA_RESP_ERR__FS // ErrUnknownTopic Local: Unknown topic ErrUnknownTopic ErrorCode = C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC // ErrAllBrokersDown Local: All broker connections are down ErrAllBrokersDown ErrorCode = C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN // ErrInvalidArg Local: Invalid argument or configuration ErrInvalidArg ErrorCode = C.RD_KAFKA_RESP_ERR__INVALID_ARG // ErrTimedOut Local: Timed out ErrTimedOut ErrorCode = C.RD_KAFKA_RESP_ERR__TIMED_OUT // ErrQueueFull Local: Queue full ErrQueueFull ErrorCode = C.RD_KAFKA_RESP_ERR__QUEUE_FULL // ErrIsrInsuff Local: ISR count insufficient ErrIsrInsuff ErrorCode = C.RD_KAFKA_RESP_ERR__ISR_INSUFF // ErrNodeUpdate Local: Broker node update ErrNodeUpdate ErrorCode = C.RD_KAFKA_RESP_ERR__NODE_UPDATE // ErrSsl Local: SSL error ErrSsl ErrorCode = C.RD_KAFKA_RESP_ERR__SSL // ErrWaitCoord Local: Waiting for 
coordinator ErrWaitCoord ErrorCode = C.RD_KAFKA_RESP_ERR__WAIT_COORD // ErrUnknownGroup Local: Unknown group ErrUnknownGroup ErrorCode = C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP // ErrInProgress Local: Operation in progress ErrInProgress ErrorCode = C.RD_KAFKA_RESP_ERR__IN_PROGRESS // ErrPrevInProgress Local: Previous operation in progress ErrPrevInProgress ErrorCode = C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS // ErrExistingSubscription Local: Existing subscription ErrExistingSubscription ErrorCode = C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION // ErrAssignPartitions Local: Assign partitions ErrAssignPartitions ErrorCode = C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS // ErrRevokePartitions Local: Revoke partitions ErrRevokePartitions ErrorCode = C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS // ErrConflict Local: Conflicting use ErrConflict ErrorCode = C.RD_KAFKA_RESP_ERR__CONFLICT // ErrState Local: Erroneous state ErrState ErrorCode = C.RD_KAFKA_RESP_ERR__STATE // ErrUnknownProtocol Local: Unknown protocol ErrUnknownProtocol ErrorCode = C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL // ErrNotImplemented Local: Not implemented ErrNotImplemented ErrorCode = C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED // ErrAuthentication Local: Authentication failure ErrAuthentication ErrorCode = C.RD_KAFKA_RESP_ERR__AUTHENTICATION // ErrNoOffset Local: No offset stored ErrNoOffset ErrorCode = C.RD_KAFKA_RESP_ERR__NO_OFFSET // ErrOutdated Local: Outdated ErrOutdated ErrorCode = C.RD_KAFKA_RESP_ERR__OUTDATED // ErrTimedOutQueue Local: Timed out in queue ErrTimedOutQueue ErrorCode = C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE // ErrUnsupportedFeature Local: Required feature not supported by broker ErrUnsupportedFeature ErrorCode = C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE // ErrWaitCache Local: Awaiting cache update ErrWaitCache ErrorCode = C.RD_KAFKA_RESP_ERR__WAIT_CACHE // ErrIntr Local: Operation interrupted ErrIntr ErrorCode = C.RD_KAFKA_RESP_ERR__INTR // ErrKeySerialization Local: Key serialization error ErrKeySerialization 
ErrorCode = C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION // ErrValueSerialization Local: Value serialization error ErrValueSerialization ErrorCode = C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION // ErrKeyDeserialization Local: Key deserialization error ErrKeyDeserialization ErrorCode = C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION // ErrValueDeserialization Local: Value deserialization error ErrValueDeserialization ErrorCode = C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION // ErrPartial Local: Partial response ErrPartial ErrorCode = C.RD_KAFKA_RESP_ERR__PARTIAL // ErrReadOnly Local: Read-only object ErrReadOnly ErrorCode = C.RD_KAFKA_RESP_ERR__READ_ONLY // ErrNoent Local: No such entry ErrNoent ErrorCode = C.RD_KAFKA_RESP_ERR__NOENT // ErrUnderflow Local: Read underflow ErrUnderflow ErrorCode = C.RD_KAFKA_RESP_ERR__UNDERFLOW // ErrInvalidType Local: Invalid type ErrInvalidType ErrorCode = C.RD_KAFKA_RESP_ERR__INVALID_TYPE // ErrRetry Local: Retry operation ErrRetry ErrorCode = C.RD_KAFKA_RESP_ERR__RETRY // ErrPurgeQueue Local: Purged in queue ErrPurgeQueue ErrorCode = C.RD_KAFKA_RESP_ERR__PURGE_QUEUE // ErrPurgeInflight Local: Purged in flight ErrPurgeInflight ErrorCode = C.RD_KAFKA_RESP_ERR__PURGE_INFLIGHT // ErrFatal Local: Fatal error ErrFatal ErrorCode = C.RD_KAFKA_RESP_ERR__FATAL // ErrInconsistent Local: Inconsistent state ErrInconsistent ErrorCode = C.RD_KAFKA_RESP_ERR__INCONSISTENT // ErrGaplessGuarantee Local: Gap-less ordering would not be guaranteed if proceeding ErrGaplessGuarantee ErrorCode = C.RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE // ErrMaxPollExceeded Local: Maximum application poll interval (max.poll.interval.ms) exceeded ErrMaxPollExceeded ErrorCode = C.RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED // ErrUnknownBroker Local: Unknown broker ErrUnknownBroker ErrorCode = C.RD_KAFKA_RESP_ERR__UNKNOWN_BROKER // ErrNotConfigured Local: Functionality not configured ErrNotConfigured ErrorCode = C.RD_KAFKA_RESP_ERR__NOT_CONFIGURED // ErrFenced Local: This instance has been fenced by a 
newer instance ErrFenced ErrorCode = C.RD_KAFKA_RESP_ERR__FENCED // ErrApplication Local: Application generated error ErrApplication ErrorCode = C.RD_KAFKA_RESP_ERR__APPLICATION // ErrAssignmentLost Local: Group partition assignment lost ErrAssignmentLost ErrorCode = C.RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST // ErrNoop Local: No operation performed ErrNoop ErrorCode = C.RD_KAFKA_RESP_ERR__NOOP // ErrAutoOffsetReset Local: No offset to automatically reset to ErrAutoOffsetReset ErrorCode = C.RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET // ErrLogTruncation Local: Partition log truncation detected ErrLogTruncation ErrorCode = C.RD_KAFKA_RESP_ERR__LOG_TRUNCATION // ErrInvalidDifferentRecord Local: an invalid record in the same batch caused the failure of this message too. ErrInvalidDifferentRecord ErrorCode = C.RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD // ErrUnknown Unknown broker error ErrUnknown ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN // ErrNoError Success ErrNoError ErrorCode = C.RD_KAFKA_RESP_ERR_NO_ERROR // ErrOffsetOutOfRange Broker: Offset out of range ErrOffsetOutOfRange ErrorCode = C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE // ErrInvalidMsg Broker: Invalid message ErrInvalidMsg ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_MSG // ErrUnknownTopicOrPart Broker: Unknown topic or partition ErrUnknownTopicOrPart ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART // ErrInvalidMsgSize Broker: Invalid message size ErrInvalidMsgSize ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE // ErrLeaderNotAvailable Broker: Leader not available ErrLeaderNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE // ErrNotLeaderForPartition Broker: Not leader for partition ErrNotLeaderForPartition ErrorCode = C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION // ErrRequestTimedOut Broker: Request timed out ErrRequestTimedOut ErrorCode = C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT // ErrBrokerNotAvailable Broker: Broker not available ErrBrokerNotAvailable ErrorCode = 
C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE // ErrReplicaNotAvailable Broker: Replica not available ErrReplicaNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE // ErrMsgSizeTooLarge Broker: Message size too large ErrMsgSizeTooLarge ErrorCode = C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode ErrStaleCtrlEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large ErrOffsetMetadataTooLarge ErrorCode = C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE // ErrNetworkException Broker: Broker disconnected before response received ErrNetworkException ErrorCode = C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION // ErrCoordinatorLoadInProgress Broker: Coordinator load in progress ErrCoordinatorLoadInProgress ErrorCode = C.RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS // ErrCoordinatorNotAvailable Broker: Coordinator not available ErrCoordinatorNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE // ErrNotCoordinator Broker: Not coordinator ErrNotCoordinator ErrorCode = C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR // ErrTopicException Broker: Invalid topic ErrTopicException ErrorCode = C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size ErrRecordListTooLarge ErrorCode = C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE // ErrNotEnoughReplicas Broker: Not enough in-sync replicas ErrNotEnoughReplicas ErrorCode = C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas ErrNotEnoughReplicasAfterAppend ErrorCode = C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND // ErrInvalidRequiredAcks Broker: Invalid required acks value ErrInvalidRequiredAcks ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS // ErrIllegalGeneration Broker: Specified group generation id is not valid ErrIllegalGeneration ErrorCode = 
C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol ErrInconsistentGroupProtocol ErrorCode = C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL // ErrInvalidGroupID Broker: Invalid group.id ErrInvalidGroupID ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID // ErrUnknownMemberID Broker: Unknown member ErrUnknownMemberID ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID // ErrInvalidSessionTimeout Broker: Invalid session timeout ErrInvalidSessionTimeout ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT // ErrRebalanceInProgress Broker: Group rebalance in progress ErrRebalanceInProgress ErrorCode = C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid ErrInvalidCommitOffsetSize ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE // ErrTopicAuthorizationFailed Broker: Topic authorization failed ErrTopicAuthorizationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED // ErrGroupAuthorizationFailed Broker: Group authorization failed ErrGroupAuthorizationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED // ErrClusterAuthorizationFailed Broker: Cluster authorization failed ErrClusterAuthorizationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED // ErrInvalidTimestamp Broker: Invalid timestamp ErrInvalidTimestamp ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism ErrUnsupportedSaslMechanism ErrorCode = C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM // ErrIllegalSaslState Broker: Request not valid in current SASL state ErrIllegalSaslState ErrorCode = C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE // ErrUnsupportedVersion Broker: API version not supported ErrUnsupportedVersion ErrorCode = C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION // ErrTopicAlreadyExists Broker: Topic already exists ErrTopicAlreadyExists ErrorCode = 
C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS // ErrInvalidPartitions Broker: Invalid number of partitions ErrInvalidPartitions ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS // ErrInvalidReplicationFactor Broker: Invalid replication factor ErrInvalidReplicationFactor ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR // ErrInvalidReplicaAssignment Broker: Invalid replica assignment ErrInvalidReplicaAssignment ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT // ErrInvalidConfig Broker: Configuration is invalid ErrInvalidConfig ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_CONFIG // ErrNotController Broker: Not controller for cluster ErrNotController ErrorCode = C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER // ErrInvalidRequest Broker: Invalid request ErrInvalidRequest ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_REQUEST // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request ErrUnsupportedForMessageFormat ErrorCode = C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT // ErrPolicyViolation Broker: Policy violation ErrPolicyViolation ErrorCode = C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number ErrOutOfOrderSequenceNumber ErrorCode = C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number ErrDuplicateSequenceNumber ErrorCode = C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch ErrInvalidProducerEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state ErrInvalidTxnState ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id ErrInvalidProducerIDMapping ErrorCode = 
C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms ErrInvalidTransactionTimeout ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing ErrConcurrentTransactions ErrorCode = C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer ErrTransactionCoordinatorFenced ErrorCode = C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed ErrTransactionalIDAuthorizationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED // ErrSecurityDisabled Broker: Security features are disabled ErrSecurityDisabled ErrorCode = C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED // ErrOperationNotAttempted Broker: Operation not attempted ErrOperationNotAttempted ErrorCode = C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED // ErrKafkaStorageError Broker: Disk error when trying to access log file on disk ErrKafkaStorageError ErrorCode = C.RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR // ErrLogDirNotFound Broker: The user-specified log directory is not found in the broker config ErrLogDirNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND // ErrSaslAuthenticationFailed Broker: SASL Authentication failed ErrSaslAuthenticationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED // ErrUnknownProducerID Broker: Unknown Producer Id ErrUnknownProducerID ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID // ErrReassignmentInProgress Broker: Partition reassignment is in progress ErrReassignmentInProgress ErrorCode = 
C.RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS // ErrDelegationTokenAuthDisabled Broker: Delegation Token feature is not enabled ErrDelegationTokenAuthDisabled ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED // ErrDelegationTokenNotFound Broker: Delegation Token is not found on server ErrDelegationTokenNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND // ErrDelegationTokenOwnerMismatch Broker: Specified Principal is not valid Owner/Renewer ErrDelegationTokenOwnerMismatch ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH // ErrDelegationTokenRequestNotAllowed Broker: Delegation Token requests are not allowed on this connection ErrDelegationTokenRequestNotAllowed ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED // ErrDelegationTokenAuthorizationFailed Broker: Delegation Token authorization failed ErrDelegationTokenAuthorizationFailed ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED // ErrDelegationTokenExpired Broker: Delegation Token is expired ErrDelegationTokenExpired ErrorCode = C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED // ErrInvalidPrincipalType Broker: Supplied principalType is not supported ErrInvalidPrincipalType ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE // ErrNonEmptyGroup Broker: The group is not empty ErrNonEmptyGroup ErrorCode = C.RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP // ErrGroupIDNotFound Broker: The group id does not exist ErrGroupIDNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND // ErrFetchSessionIDNotFound Broker: The fetch session ID was not found ErrFetchSessionIDNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND // ErrInvalidFetchSessionEpoch Broker: The fetch session epoch is invalid ErrInvalidFetchSessionEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH // ErrListenerNotFound Broker: No matching listener ErrListenerNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND // ErrTopicDeletionDisabled 
Broker: Topic deletion is disabled ErrTopicDeletionDisabled ErrorCode = C.RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED // ErrFencedLeaderEpoch Broker: Leader epoch is older than broker epoch ErrFencedLeaderEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH // ErrUnknownLeaderEpoch Broker: Leader epoch is newer than broker epoch ErrUnknownLeaderEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH // ErrUnsupportedCompressionType Broker: Unsupported compression type ErrUnsupportedCompressionType ErrorCode = C.RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE // ErrStaleBrokerEpoch Broker: Broker epoch has changed ErrStaleBrokerEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH // ErrOffsetNotAvailable Broker: Leader high watermark is not caught up ErrOffsetNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE // ErrMemberIDRequired Broker: Group member needs a valid member ID ErrMemberIDRequired ErrorCode = C.RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED // ErrPreferredLeaderNotAvailable Broker: Preferred leader was not available ErrPreferredLeaderNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE // ErrGroupMaxSizeReached Broker: Consumer group has reached maximum size ErrGroupMaxSizeReached ErrorCode = C.RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED // ErrFencedInstanceID Broker: Static consumer fenced by other consumer with same group.instance.id ErrFencedInstanceID ErrorCode = C.RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID // ErrEligibleLeadersNotAvailable Broker: Eligible partition leaders are not available ErrEligibleLeadersNotAvailable ErrorCode = C.RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE // ErrElectionNotNeeded Broker: Leader election not needed for topic partition ErrElectionNotNeeded ErrorCode = C.RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED // ErrNoReassignmentInProgress Broker: No partition reassignment is in progress ErrNoReassignmentInProgress ErrorCode = C.RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS // 
ErrGroupSubscribedToTopic Broker: Deleting offsets of a topic while the consumer group is subscribed to it ErrGroupSubscribedToTopic ErrorCode = C.RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC // ErrInvalidRecord Broker: Broker failed to validate record ErrInvalidRecord ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_RECORD // ErrUnstableOffsetCommit Broker: There are unstable offsets that need to be cleared ErrUnstableOffsetCommit ErrorCode = C.RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT // ErrThrottlingQuotaExceeded Broker: Throttling quota has been exceeded ErrThrottlingQuotaExceeded ErrorCode = C.RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED // ErrProducerFenced Broker: There is a newer producer with the same transactionalId which fences the current one ErrProducerFenced ErrorCode = C.RD_KAFKA_RESP_ERR_PRODUCER_FENCED // ErrResourceNotFound Broker: Request illegally referred to resource that does not exist ErrResourceNotFound ErrorCode = C.RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND // ErrDuplicateResource Broker: Request illegally referred to the same resource twice ErrDuplicateResource ErrorCode = C.RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE // ErrUnacceptableCredential Broker: Requested credential would not meet criteria for acceptability ErrUnacceptableCredential ErrorCode = C.RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL // ErrInconsistentVoterSet Broker: Indicates that the either the sender or recipient of a voter-only request is not one of the expected voters ErrInconsistentVoterSet ErrorCode = C.RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET // ErrInvalidUpdateVersion Broker: Invalid update version ErrInvalidUpdateVersion ErrorCode = C.RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION // ErrFeatureUpdateFailed Broker: Unable to update finalized features due to server error ErrFeatureUpdateFailed ErrorCode = C.RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED // ErrPrincipalDeserializationFailure Broker: Request principal deserialization failed during forwarding ErrPrincipalDeserializationFailure ErrorCode = 
C.RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE // ErrUnknownTopicID Broker: Unknown topic id ErrUnknownTopicID ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID // ErrFencedMemberEpoch Broker: The member epoch is fenced by the group coordinator ErrFencedMemberEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH // ErrUnreleasedInstanceID Broker: The instance ID is still used by another member in the consumer group ErrUnreleasedInstanceID ErrorCode = C.RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID // ErrUnsupportedAssignor Broker: The assignor or its version range is not supported by the consumer group ErrUnsupportedAssignor ErrorCode = C.RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR // ErrStaleMemberEpoch Broker: The member epoch is stale ErrStaleMemberEpoch ErrorCode = C.RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH // ErrUnknownSubscriptionID Broker: Client sent a push telemetry request with an invalid or outdated subscription ID ErrUnknownSubscriptionID ErrorCode = C.RD_KAFKA_RESP_ERR_UNKNOWN_SUBSCRIPTION_ID // ErrTelemetryTooLarge Broker: Client sent a push telemetry request larger than the maximum size the broker will accept ErrTelemetryTooLarge ErrorCode = C.RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE )
func (c ErrorCode) String() string
String returns a human readable representation of an error code
Event generic interface
type Event interface { // String returns a human-readable representation of the event String() string }
Handle represents a generic client handle containing common parts for both Producer and Consumer.
type Handle interface { // SetOAuthBearerToken sets the data to be transmitted // to a broker during SASL/OAUTHBEARER authentication. It will return nil // on success, otherwise an error if: // 1) the token data is invalid (meaning an expiration time in the past // or either a token value or an extension key or value that does not meet // the regular expression requirements as per // https://tools.ietf.org/html/rfc7628#section-3.1); // 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; // 3) SASL/OAUTHBEARER is supported but is not configured as the client's // authentication mechanism. SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error // SetOAuthBearerTokenFailure sets the error message describing why token // retrieval/setting failed; it also schedules a new token refresh event for 10 // seconds later so the attempt may be retried. It will return nil on // success, otherwise an error if: // 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; // 2) SASL/OAUTHBEARER is supported but is not configured as the client's // authentication mechanism. SetOAuthBearerTokenFailure(errstr string) error // IsClosed() returns the bool to check if the client is closed IsClosed() bool // contains filtered or unexported methods }
Header represents a single Kafka message header.
Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.
Key is a human readable string identifying the header. Value is the key's binary value; Kafka does not put any restrictions on the format of the Value, but it should be made relatively compact. The value may be a byte array, empty, or nil.
NOTE: Message headers are not available on producer delivery report messages.
type Header struct { Key string // Header name (utf-8 string) Value []byte // Header value (nil, empty, or binary) }
func (h Header) String() string
String returns the Header Key and data in a human-readable, possibly truncated form suitable for displaying to the user.
IsolationLevel is a type which is used for AdminOptions to set the IsolationLevel.
type IsolationLevel int
const ( // IsolationLevelReadUncommitted - read uncommitted isolation level IsolationLevelReadUncommitted IsolationLevel = C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED // IsolationLevelReadCommitted - read committed isolation level IsolationLevelReadCommitted IsolationLevel = C.RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED )
ListConsumerGroupOffsetsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminRequireStableOffsets.
type ListConsumerGroupOffsetsAdminOption interface {
// contains filtered or unexported methods
}
ListConsumerGroupOffsetsResult represents the result of a ListConsumerGroupOffsets operation.
type ListConsumerGroupOffsetsResult struct { // A slice of ConsumerGroupTopicPartitions, each element represents a group's // TopicPartitions and Offsets. ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions }
ListConsumerGroupsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates, SetAdminMatchConsumerGroupTypes.
type ListConsumerGroupsAdminOption interface {
// contains filtered or unexported methods
}
ListConsumerGroupsResult represents ListConsumerGroups results and errors.
type ListConsumerGroupsResult struct { // List of valid ConsumerGroupListings. Valid []ConsumerGroupListing // List of errors. Errors []error }
ListOffsetsAdminOption - see setter.
See SetAdminRequestTimeout, SetAdminIsolationLevel.
type ListOffsetsAdminOption interface {
// contains filtered or unexported methods
}
ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct { ResultInfos map[TopicPartition]ListOffsetsResultInfo }
ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition.
type ListOffsetsResultInfo struct { Offset Offset Timestamp int64 LeaderEpoch *int32 Error Error }
LogEvent represents the log from librdkafka's internal log queue
type LogEvent struct { Name string // Name of client instance Tag string // Log tag that provides context to the log Message (e.g., "METADATA" or "GRPCOORD") Message string // Log message Level int // Log syslog level, lower is more critical. Timestamp time.Time // Log timestamp }
func (logEvent LogEvent) String() string
MemberAssignment represents the assignment of a consumer group member.
type MemberAssignment struct { // Partitions assigned to current member. TopicPartitions []TopicPartition }
MemberDescription represents the description of a consumer group member.
type MemberDescription struct { // Client id. ClientID string // Group instance id. GroupInstanceID string // Consumer id. ConsumerID string // Group member host. Host string // Member assignment. Assignment MemberAssignment }
Message represents a Kafka message
type Message struct { TopicPartition TopicPartition Value []byte Key []byte Timestamp time.Time TimestampType TimestampType Opaque interface{} Headers []Header LeaderEpoch *int32 // Deprecated: LeaderEpoch or nil if not available. Use m.TopicPartition.LeaderEpoch instead. }
func (m *Message) String() string
String returns a human readable representation of a Message. Key and payload are not represented.
Metadata contains broker and topic metadata for all (matching) topics
type Metadata struct { Brokers []BrokerMetadata Topics map[string]TopicMetadata OriginatingBroker BrokerMetadata }
MockCluster represents a Kafka mock cluster instance which can be used for testing.
type MockCluster struct {
// contains filtered or unexported fields
}
func NewMockCluster(brokerCount int) (*MockCluster, error)
NewMockCluster provides a mock Kafka cluster with a configurable number of brokers that support a reasonable subset of Kafka protocol operations, error injection, etc.
The broker ids will start at 1 up to and including brokerCount.
Mock clusters provide localhost listeners that can be used as the bootstrap servers by multiple Kafka client instances.
Currently supported functionality:
  - Producer
  - Idempotent Producer
  - Transactional Producer
  - Low-level consumer
  - High-level balanced consumer groups with offset commits
  - Topic Metadata and auto creation
Warning: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
func (mc *MockCluster) BootstrapServers() string
BootstrapServers returns the bootstrap.servers property for this MockCluster
func (mc *MockCluster) Close()
Close and destroy the MockCluster
func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error
CreateTopic creates a topic without having to use a producer
func (mc *MockCluster) SetBrokerDown(brokerID int) error
SetBrokerDown disconnects the broker and disallows any new connections. This does NOT trigger leader change. Use brokerID -1 for all brokers, or >= 0 for a specific broker.
func (mc *MockCluster) SetBrokerUp(brokerID int) error
SetBrokerUp makes the broker accept connections again. This does NOT trigger leader change. Use brokerID -1 for all brokers, or >= 0 for a specific broker.
func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error
SetRoundtripDuration sets the broker round-trip-time delay for the given broker. Use brokerID -1 for all brokers, or >= 0 for a specific broker.
Node represents a Kafka broker.
type Node struct { // Node id. ID int // Node host. Host string // Node port. Port int // Node rack (may be nil) Rack *string }
func (n Node) String() string
OAuthBearerToken represents the data to be transmitted to a broker during SASL/OAUTHBEARER authentication.
type OAuthBearerToken struct { // Token value, often (but not necessarily) a JWS compact serialization // as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet // the regular expression for a SASL/OAUTHBEARER value defined at // https://tools.ietf.org/html/rfc7628#section-3.1 TokenValue string // Metadata about the token indicating when it expires (local time); // it must represent a time in the future Expiration time.Time // Metadata about the token indicating the Kafka principal name // to which it applies (for example, "admin") Principal string // SASL extensions, if any, to be communicated to the broker during // authentication (all keys and values of which must meet the regular // expressions defined at https://tools.ietf.org/html/rfc7628#section-3.1, // and it must not contain the reserved "auth" key) Extensions map[string]string }
OAuthBearerTokenRefresh indicates token refresh is required
type OAuthBearerTokenRefresh struct { // Config is the value of the sasl.oauthbearer.config property Config string }
func (o OAuthBearerTokenRefresh) String() string
Offset type (int64) with support for canonical names
type Offset int64
func NewOffset(offset interface{}) (Offset, error)
NewOffset creates a new Offset using the provided logical string, an absolute int64 offset value, or a concrete Offset type. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
func OffsetTail(relativeOffset Offset) Offset
OffsetTail returns the logical offset relativeOffset from current end of partition
func (o *Offset) Set(offset interface{}) error
Set offset value, see NewOffset()
func (o Offset) String() string
OffsetSpec specifies desired offsets while using ListOffsets.
type OffsetSpec int64
const ( // MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different than LatestOffsetSpec as Timestamp can be set client side. MaxTimestampOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP // EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition. EarliestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_EARLIEST // LatestOffsetSpec is used to describe the latest offset for the TopicPartition. LatestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_LATEST )
func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec
NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp.
OffsetsCommitted reports committed offsets
type OffsetsCommitted struct { Error error Offsets []TopicPartition }
func (o OffsetsCommitted) String() string
PartitionEOF indicates that the consumer reached the end of a partition. It needs to be explicitly enabled by setting the `enable.partition.eof` configuration property to true.
type PartitionEOF TopicPartition
func (p PartitionEOF) String() string
PartitionMetadata contains per-partition metadata
type PartitionMetadata struct { ID int32 Error Error Leader int32 Replicas []int32 Isrs []int32 }
PartitionsSpecification holds parameters for creating additional partitions for a topic. PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
type PartitionsSpecification struct { // Topic to create more partitions for. Topic string // New partition count for topic, must be higher than current partition count. IncreaseTo int // (Optional) Explicit replica assignment. The outer array is // indexed by the new partition index (i.e., 0 for the first added // partition), while the inner per-partition array // contains the replica broker ids. The first broker in each // broker id list will be the preferred replica. ReplicaAssignment [][]int32 }
Producer implements a High-level Apache Kafka Producer instance
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer(conf *ConfigMap) (*Producer, error)
NewProducer creates a new high-level Producer instance.
conf is a *ConfigMap with standard librdkafka configuration properties.
Supported special configuration properties (type, default):
go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance). These batches do not relate to Kafka message batches in any way. Note: timestamps and headers are not supported with this interface. go.delivery.reports (bool, true) - Forward per-message delivery reports to the Events() channel. go.delivery.report.fields (string, "key,value") - Comma separated list of fields to enable for delivery reports. Allowed values: all, none (or empty string), key, value, headers Warning: There is a performance penalty to include headers in the delivery report. go.events.channel.size (int, 1000000) - Events() channel buffer size (in number of events) go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages) go.logs.channel.enable (bool, false) - Forward log to Logs() channel. go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
func (p *Producer) AbortTransaction(ctx context.Context) error
AbortTransaction aborts the ongoing transaction.
This function should also be used to recover from non-fatal abortable transaction errors.
Any outstanding messages will be purged and fail with `ErrPurgeInflight` or `ErrPurgeQueue`.
Parameters: `ctx` - the context that bounds how long this call may block.
Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call `Purge()` and `Flush()` to ensure all queued and in-flight messages are purged before attempting to abort the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time.
Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`.
func (p *Producer) BeginTransaction() error
BeginTransaction starts a new transaction.
`InitTransactions()` must have been called successfully (once) before this function is called.
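Upon successful return from this function the application has to perform at least one of the following operations within `transaction.timeout.ms` to avoid timing out the transaction on the broker: `Produce()` (et al.), `SendOffsetsToTransaction()`, `CommitTransaction()`, or `AbortTransaction()`.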
Any messages produced, offsets sent (`SendOffsetsToTransaction()`), etc., after the successful return of this function will be part of the transaction and committed or aborted atomically.
Finish the transaction by calling `CommitTransaction()` or abort the transaction by calling `AbortTransaction()`.
Returns nil on success or an error object on failure. Check whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`.
Note: With the transactional producer, `Produce()`, et al., are only allowed during an on-going transaction, as started with this function. Any produce call outside an on-going transaction, or for a failed transaction, will fail.
func (p *Producer) Close()
Close a Producer instance. The Producer object or its channels are no longer usable after this call.
func (p *Producer) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state and this function will return an abortable error; in this case the application must call `AbortTransaction()` before attempting a new transaction with `BeginTransaction()`.
Parameters: `ctx` - the context that bounds how long this call may block.
Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call `Flush()` to ensure all queued messages are delivered before attempting to commit the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time.
Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether an abortable or fatal error has been raised by calling `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()` respectively.