Kafka Go Client

Confluent develops and maintains a Go client for Apache Kafka® that offers a producer and a consumer.

For a step-by-step guide on building a Go client application for Kafka, see Getting Started with Apache Kafka and Go.

Go Client installation

The Go client, called confluent-kafka-go, is distributed via GitHub and gopkg.in to pin to specific versions. The Changelog showing release updates is available in that same repo. The Go client uses librdkafka, the C client, internally and exposes it as a Go library using cgo. Starting with confluent-kafka-go v1.4.0, librdkafka is bundled with the Go client, and no separate installation of librdkafka is required on the supported platforms: Linux (glibc-based and musl-based) and macOS.

For other platforms, the following instructions still apply. To install the Go client, first install the C client, including its development package, as well as a C build toolchain that includes pkg-config. On Red Hat-based Linux distributions, install the following packages in addition to librdkafka:

sudo yum groupinstall "Development Tools"

On Debian-based distributions, install the following in addition to librdkafka:

sudo apt-get install build-essential pkg-config git

On macOS using Homebrew, install the following:

brew install pkg-config git

Next, use go get to install the library:

go get gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Your Go code can now import and use the client. You can also build and run a small command line utility, go-kafkacat, to ensure the installation was successful:

go get gopkg.in/confluentinc/confluent-kafka-go.v1/examples/go-kafkacat
$GOPATH/bin/go-kafkacat --help

See the clients documentation for code examples showing how to use the library.

The source code is also available in the ZIP and TAR archives under the directory src/.

Go Client example code

For a step-by-step tutorial using the Go client including code samples for the producer and consumer see this guide.

There are also additional examples here.

Kafka Producer

Initialization

The Go client uses a ConfigMap object to pass configuration to the producer:

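import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// Use the local hostname as the client ID.
hostname, err := os.Hostname()
if err != nil {
    fmt.Printf("Failed to get hostname: %s\n", err)
    os.Exit(1)
}

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "host1:9092,host2:9092",
    "client.id":         hostname,
    "acks":              "all"})

if err != nil {
    fmt.Printf("Failed to create producer: %s\n", err)
    os.Exit(1)
}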

Asynchronous writes

In Go, you initiate a send by calling the Produce() method, passing a Message object and an optional chan Event that can be used to listen for the result of the send. The Message object contains an opaque interface{} field that can be used to pass arbitrary data with the message to the subsequent event handler:

// topic is assumed to be a string; TopicPartition.Topic expects a *string.
delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value: []byte(value)},
    delivery_chan,
)

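To produce asynchronously, you can use a goroutine to handle message delivery reports and possibly other event types (errors, stats, and so on) concurrently. Delivery reports arrive on the producer's Events() channel when nil is passed to Produce() as the delivery channel: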

go func() {
    for e := range p.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
            } else {
                fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
                    *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
            }
        }
    }
}()

Synchronous writes

Making writes synchronous is typically a bad idea since it decreases throughput. Nevertheless, you can make writes synchronous by receiving from the delivery channel passed to the Produce() method call as shown below:

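// topic is assumed to be a string; TopicPartition.Topic expects a *string.
delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value: []byte(value)},
    delivery_chan,
)
if err != nil {
    // No delivery report arrives if the message was never enqueued.
    fmt.Printf("Produce failed: %v\n", err)
    os.Exit(1)
}

// Block until the delivery report for this message arrives.
e := <-delivery_chan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(delivery_chan)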

Alternatively, you can use the Flush() method to wait for all outstanding messages to be acknowledged. Flush() takes a timeout in milliseconds:

p.Flush(15 * 1000)

Important

Flush() will only serve the producer’s Events() channel, not application-specified delivery channels. If Flush() is called and no goroutine is processing the delivery channel, its buffer may fill up and cause the flush to time out.

Kafka Consumer

Initialization

The Go client uses a ConfigMap object to pass configuration to the consumer:

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
     "bootstrap.servers":    "host1:9092,host2:9092",
     "group.id":             "foo",
     "auto.offset.reset":    "earliest"})

Basic usage

The Go client uses librdkafka internally, so it follows a multithreaded approach to Kafka consumption. The API returns only a single message or event at a time:

for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

The poll timeout, given in milliseconds, controls how long Poll() waits for an event. A timeout of 0 returns immediately, which leads to high CPU usage when Poll() is called in a loop. A timeout of -1 blocks indefinitely until an event arrives, but the application may then be unable to react to a SIGINT while execution is blocked inside librdkafka. A value of 100 is a happy medium: it polls consistently while avoiding both high CPU usage and the SIGINT scenario.

Go Client code examples

Basic poll loop

The consumer API is centered around the Consumer.Poll() method, which is used to retrieve records from the brokers. The Consumer.SubscribeTopics() method controls which topics will be fetched in poll. This consumer example shows typical usage, which involves an initial call to Consumer.SubscribeTopics() to set up the topics of interest and then a loop that calls Consumer.Poll() until the application is shut down.

err = consumer.SubscribeTopics(topics, nil)

for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()

If no event is received before the timeout expires, Consumer.Poll() returns nil.

Note that you should always call Consumer.Close() after you are finished using the consumer. Doing so ensures that active sockets are closed and internal state is cleaned up. It also triggers a group rebalance immediately, which ensures that any partitions owned by the consumer are reassigned to another member in the group. If the consumer is not closed properly, the broker triggers the rebalance only after the session timeout has expired.

Synchronous commits

The Go client provides a synchronous Commit() method call. Other variants of commit methods also accept a list of offsets to commit or a Message in order to commit offsets relative to a consumed message. When using manual offset commit, be sure to disable the enable.auto.commit configuration.

msg_count := 0
for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. You could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly.

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must be a little careful with the commit failure.

for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        err = consumer.CommitMessage(e)
        if err == nil {
            msg_process(e)
        }

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

For simplicity in this example, Consumer.CommitMessage() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
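The batch-oriented variant described above can be sketched as follows. The `committer` interface and `stubCommitter` type are hypothetical stand-ins for the consumer's synchronous commit call, introduced only so the example runs without a broker; messages are represented as plain strings.

```go
package main

import (
	"fmt"
)

// committer is a hypothetical stand-in for a consumer's synchronous commit.
type committer interface {
	Commit() error
}

// stubCommitter is an assumption made for this sketch; it fails on demand.
type stubCommitter struct{ fail bool }

func (s stubCommitter) Commit() error {
	if s.fail {
		return fmt.Errorf("commit failed")
	}
	return nil
}

// processAtMostOnce commits first and processes the batch only if the
// commit succeeded. It returns the number of messages processed.
func processAtMostOnce(c committer, batch []string, process func(string)) (int, error) {
	if err := c.Commit(); err != nil {
		// Skip processing: the offsets were not committed.
		return 0, err
	}
	for _, m := range batch {
		process(m)
	}
	return len(batch), nil
}

// run processes one batch with a commit that succeeds and one that fails.
func run() {
	batch := []string{"m1", "m2", "m3"}
	print := func(m string) { fmt.Println("processing", m) }

	n, err := processAtMostOnce(stubCommitter{}, batch, print)
	fmt.Println("processed:", n, "error:", err)

	n, err = processAtMostOnce(stubCommitter{fail: true}, batch, print)
	fmt.Println("processed:", n, "error:", err)
}

func main() {
	run()
}
```

Committing once per batch amortizes the commit overhead, at the cost of losing up to a whole batch if the application crashes after the commit but before processing finishes.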

Asynchronous commits

To commit asynchronously, execute the commit in a goroutine:

msg_count := 0
for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
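            go func() {
                // Commit in the background and log failures without blocking the poll loop.
                if _, err := consumer.Commit(); err != nil {
                    fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
                }
            }()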
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Rebalance events are exposed as events returned by the Poll() method. To see these events you must create the consumer with the go.application.rebalance.enable configuration and handle AssignedPartitions and RevokedPartitions events by explicitly calling Assign() and Unassign() for AssignedPartitions and RevokedPartitions respectively:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
     "bootstrap.servers":    "host1:9092,host2:9092",
     "group.id":             "foo",
     "go.application.rebalance.enable": true})

msg_count := 0
for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Unassign()
    case *kafka.Message:
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }

        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

API documentation

See the Go Client API documentation.

Suggested resources