Kafka Go Client¶
Confluent develops and maintains a Go client for Apache Kafka® that offers a producer and a consumer.
Go Client Installation¶
The Go client, called confluent-kafka-go, is distributed via GitHub and gopkg.in, which lets you pin to specific versions. The Go client uses librdkafka, the C client, internally and exposes it as a Go library using cgo. To install the Go client, first install the C client, including its development package, as well as a C build toolchain including pkg-config. On Red Hat-based Linux distributions, install the following packages in addition to librdkafka:
sudo yum groupinstall "Development Tools"
On Debian-based distributions, install the following in addition to librdkafka:
sudo apt-get install build-essential pkg-config git
On macOS using Homebrew, install the following:
brew install pkg-config git
Next, use go get to install the library:
go get gopkg.in/confluentinc/confluent-kafka-go.v0/kafka
Your Go code can now import and use the client. You can also build and run a small command-line utility, go-kafkacat, to ensure the installation was successful:
go get gopkg.in/confluentinc/confluent-kafka-go.v0/examples/go-kafkacat
$GOPATH/bin/go-kafkacat --help
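As a quick smoke test, a minimal program such as the following (a sketch; the import path matches the go get command above) should compile and print the bundled librdkafka version:

package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v0/kafka"
)

func main() {
    // LibraryVersion reports the librdkafka version as an integer and a string.
    fmt.Println(kafka.LibraryVersion())
}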
If you would like to statically link librdkafka, add the flag -tags static to the go get commands. This will statically link librdkafka so its dynamic library will not be required on the target deployment system. Note, however, that librdkafka's dependencies (such as ssl, sasl2, lz4, etc.) will still be linked dynamically and required on the target system. You can use ldd your-program (Linux) or otool -L your-program (macOS) to see which dynamic libraries a binary depends on.
An experimental option for creating a completely statically linked binary is available as well: use the flag -tags static_all. This requires all dependencies to be available as static libraries (e.g., libsasl2.a). Static libraries are typically not installed by default but are available in the corresponding -dev or -devel packages (e.g., libsasl2-dev).
See the clients documentation for code examples showing how to use the library. The source code is also available in the ZIP and TAR archives under the src/ directory.
Kafka Producer¶
Initialization¶
The Go client uses a ConfigMap object to pass configuration to the producer:
import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// Use the local hostname as the client ID.
hostname, _ := os.Hostname()

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers":    "host1:9092,host2:9092",
    "client.id":            hostname,
    "default.topic.config": kafka.ConfigMap{"acks": "all"},
})
if err != nil {
    fmt.Printf("Failed to create producer: %s\n", err)
    os.Exit(1)
}
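When the producer is no longer needed, close it to release its resources. A common pattern is to defer the call right after the error check:

// Release the producer's resources on function exit.
defer p.Close()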
Asynchronous writes¶
In Go, you initiate a send by calling the Produce() method, passing a Message object and an optional chan Event that can be used to listen for the result of the send. The Message object contains an opaque interface{} field, Opaque, that can be used to pass arbitrary data with the message to the subsequent event handler.
topic := "topic"
deliveryChan := make(chan kafka.Event, 10000)

err = p.Produce(&kafka.Message{
    // Topic is a *string, so pass a pointer; PartitionAny lets the
    // configured partitioner pick the partition.
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
}, deliveryChan)
To invoke some code after the write has completed, receive from the delivery report channel passed to Produce() and inspect the result of the send:
e := <-deliveryChan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

close(deliveryChan)
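The Opaque field mentioned above is useful for correlating a delivery report with application state. A minimal sketch, where requestID is an assumed application value:

// Attach arbitrary per-message data via the Opaque field.
err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
    Opaque:         requestID, // assumed application value
}, deliveryChan)

// The same value is available again on the delivery report.
e := <-deliveryChan
m := e.(*kafka.Message)
fmt.Printf("Delivery report for request %v: %v\n", m.Opaque, m.TopicPartition)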
Synchronous writes¶
Making writes synchronous is typically a bad idea since it severely limits throughput. Nevertheless, you can make writes synchronous by receiving from the delivery channel passed to the Produce() method call:
deliveryChan := make(chan kafka.Event, 10000)

err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
}, deliveryChan)

// Block until the delivery report for this message arrives.
e := <-deliveryChan
m := e.(*kafka.Message)
Or, to wait for all outstanding messages to be acknowledged, use the Flush() method, which takes a timeout in milliseconds:

p.Flush(15 * 1000)
Important

Flush() will only serve the producer's Events() channel, not application-specified delivery channels. If Flush() is called and no goroutine is processing the delivery channel, its buffer may fill up and cause the flush to time out.
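One way to avoid this, sketched below, is to pass nil as the delivery channel so that reports are emitted on the producer's Events() channel, which Flush() does serve; the handler goroutine here is illustrative:

// Receive delivery reports on Events() so that Flush() can serve them.
go func() {
    for ev := range p.Events() {
        if m, ok := ev.(*kafka.Message); ok && m.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        }
    }
}()

err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
}, nil) // nil: report on the Events() channel instead

p.Flush(15 * 1000) // wait up to 15 seconds for outstanding deliveries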
Kafka Consumer¶
Initialization¶
The Go client uses a ConfigMap object to pass configuration to the consumer:
import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":    "host1:9092,host2:9092",
    "group.id":             "foo",
    "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
if err != nil {
    fmt.Printf("Failed to create consumer: %s\n", err)
    os.Exit(1)
}
Basic usage¶
The Go client uses librdkafka internally, so it follows a multi-threaded approach to Kafka consumption. The API returns only a single message or event at a time:
run := true
for run {
    // Poll for at most 100ms; Poll returns nil if no event arrives in time.
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
Go Client code examples¶
Basic poll loop¶
The consumer API is centered around the Consumer.Poll() method, which is used to retrieve records from the brokers. The Consumer.SubscribeTopics() method controls which topics will be fetched in poll. This consumer example shows typical usage, which involves an initial call to Consumer.SubscribeTopics() to set up the topics of interest and then a loop that calls Consumer.Poll() until the application is shut down.
err = consumer.SubscribeTopics(topics, nil)

run := true
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()
If no event is available within the poll timeout, Consumer.Poll() returns nil; the loop above simply polls again.
Note that you should always call Consumer.Close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. In this example, Close() is called before the program exits so that the consumer has time to finish shutting down cleanly.
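A common way to arrange this, sketched below assuming the os, os/signal, and syscall packages are imported, is to flip the run flag on SIGINT or SIGTERM so the poll loop exits and reaches Close():

// Stop the poll loop on SIGINT/SIGTERM so that Close() is reached.
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

run := true
for run {
    select {
    case sig := <-sigchan:
        fmt.Printf("Caught signal %v: terminating\n", sig)
        run = false
    default:
        ev := consumer.Poll(100)
        if ev == nil {
            continue
        }
        // handle ev as in the loop above
    }
}

consumer.Close()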
Synchronous commits¶
The Go client provides a synchronous Commit() method call. Other variants of commit methods also accept a list of offsets to commit or a Message in order to commit offsets relative to a consumed message. When using manual offset commit, be sure to disable the enable.auto.commit configuration by setting it to false.
msgCount := 0
run := true

for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case *kafka.Message:
        msgCount++
        // Commit synchronously every MIN_COMMIT_COUNT messages.
        if msgCount%MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. You could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly.
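A sketch of that timer-based variant, assuming the time package is imported, a commitInterval chosen by the application, and a process() handler:

// Also commit when commitInterval has elapsed since the last commit.
commitInterval := 5 * time.Second // assumed application setting
lastCommit := time.Now()

for run {
    ev := consumer.Poll(100)
    if m, ok := ev.(*kafka.Message); ok {
        process(m) // assumed application handler
    }
    if time.Since(lastCommit) >= commitInterval {
        consumer.Commit()
        lastCommit = time.Now()
    }
}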
Delivery guarantees¶
In the previous example, you get "at least once" delivery since the commit follows the message processing. By changing the order, however, you can get "at most once" delivery; in that case you must handle commit failures carefully.
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case *kafka.Message:
        // Commit before processing for "at most once" delivery.
        _, err := consumer.CommitMessage(e)
        if err == nil {
            msg_process(e)
        }
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
For simplicity in this example, Consumer.CommitMessage() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
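A sketch of that batched pattern, where batchSize is an assumed application setting and msg_process is the handler from the example above:

// Collect a batch, commit synchronously, then process only on success.
batch := make([]*kafka.Message, 0, batchSize)

for run {
    ev := consumer.Poll(100)
    m, ok := ev.(*kafka.Message)
    if !ok {
        continue
    }
    batch = append(batch, m)
    if len(batch) < batchSize {
        continue
    }
    // Commit() commits the current consumed positions, covering the batch.
    if _, err := consumer.Commit(); err == nil {
        for _, bm := range batch {
            msg_process(bm)
        }
    }
    batch = batch[:0]
}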
Asynchronous Commits¶
To commit asynchronously, execute the commit in a goroutine:
msgCount := 0
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case *kafka.Message:
        msgCount++
        if msgCount%MIN_COMMIT_COUNT == 0 {
            // Commit in a goroutine so the poll loop is not blocked.
            go func() {
                offsets, err := consumer.Commit()
                if err != nil {
                    fmt.Printf("%% Commit failed: %v\n", err)
                } else {
                    fmt.Printf("%% Committed offsets: %v\n", offsets)
                }
            }()
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
Rebalance events are exposed as events returned by the Poll() method. To see these events you must create the consumer with the go.application.rebalance.enable configuration set to true and handle AssignedPartitions and RevokedPartitions events by explicitly calling Assign() and Unassign(), respectively:
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               "host1:9092,host2:9092",
    "group.id":                        "foo",
    "go.application.rebalance.enable": true})
if err != nil {
    fmt.Printf("Failed to create consumer: %s\n", err)
    os.Exit(1)
}

msgCount := 0
run := true
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue
    }

    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Unassign()
    case *kafka.Message:
        msgCount++
        if msgCount%MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
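If offsets are committed manually, a common refinement, sketched here as a drop-in replacement for the RevokedPartitions case above, is to commit synchronously before calling Unassign() so that offsets of already-processed messages survive the rebalance:

case kafka.RevokedPartitions:
    fmt.Fprintf(os.Stderr, "%% %v\n", e)
    // Flush outstanding commits before giving up the partitions.
    // Note: Commit() returns an error if there is nothing to commit;
    // that case can be ignored here.
    if _, err := consumer.Commit(); err != nil {
        fmt.Fprintf(os.Stderr, "%% Commit on revoke failed: %v\n", err)
    }
    consumer.Unassign()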