Schemas, Serializers, and Deserializers for Confluent Platform
There are multiple ways for applications to interact with Apache Kafka® through the Confluent Platform. Whichever method you choose for your application, the most important factor is to ensure that your application is coordinating with Schema Registry to manage schemas and guarantee data compatibility.
There are two ways to interact with Kafka: using a native client for your language combined with serializers compatible with Schema Registry, or using the REST Proxy. Most commonly, use the serializers if the application is developed in a language with supported serializers. Use the REST Proxy for applications written in other languages.
Support Notes
Clients to Schema Registry track with Confluent Platform versions.
Schema Registry clients 7.7.4 and later include support for auto retries for 429 exceptions. A 429 exception is related to rate limiting by the Schema Registry server. When a client exceeds the set rate limit, the server responds with an HTTP 429 status code. This signals to the client that it needs to slow down its requests. Schema Registry clients built with Confluent Platform versions prior to 7.7.4 do not include support for auto retries for 429 exceptions.
Native Clients with Serializers
Java
Java applications can use the standard Kafka producers and consumers, but will substitute the default ByteArraySerializer with io.confluent.kafka.serializers.KafkaAvroSerializer (and the equivalent deserializer), allowing Avro data to be passed into the producer directly and allowing the consumer to deserialize and return Avro data.
For a Maven project, include dependencies within the dependencies tag for the Avro serializer and for your desired version of Kafka:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>8.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>8.2.0-ccs</version>
<scope>provided</scope>
</dependency>
In code, create Kafka producers and consumers with two adjustments:
The generic types for key and value should be Object. This allows you to pass in primitive types, Maps, and Records.
Set the key/value serializer or deserializer and Schema Registry URL options. For example, if you’re configuring a producer directly in your code:
Properties props = new Properties(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); props.put("schema.registry.url", "http://localhost:8081"); // Set any other properties KafkaProducer producer = new KafkaProducer(props);
Set these values using a properties file that the application loads and passes to the producer constructor. The settings are similar for the deserializer.
Now your application code can send Avro data that will be automatically serialized and the schema will be registered or validated against Schema Registry. The application code is essentially the same as using Kafka without Confluent Platform:
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
Future<RecordAndMetadata> resultFuture = producer.send(user1);
For more detailed information, see Formats, Serializers, and Deserializers in the Schema Registry documentation.
C/C++
Important
libserdes is discontinued: The libserdes library has been discontinued and replaced by libschemaregistry.
For RHEL 9 and later versions, pre-packaged RPM deliverables for Avro and libserdes are not available
Instead, integrate these libraries directly into your projects using CMake
See the libschemaregistry repository for integration instructions and migration guidance
The information below is maintained for legacy systems using RHEL 8 and earlier
C and C++ applications can use the librdkafka client with libserdes for Avro and Schema Registry support. The Build Streaming Applications on Confluent Platform section discusses general usage of librdkafka for producing and consuming data in any format. In this section we cover how to integrate libserdes to serialize Avro and track schemas in Schema Registry.
First, add the libserdes dependency to the project by including the headers and linking to the library. This process varies by build system. The following example shows how to get the relevant values into Makefile variables:
SERDES_CFLAGS=$(shell pkg-config --cflags rdkafka)
SERDES_LIBS=$(shell pkg-config --libs rdkafka)
All operations in libserdes use a shared serdes_t object. Applications should start by creating a serdes_conf_t for configuration and then instantiating the serdes_t object:
#include <libserdes/serdes-avro.h>
char errstr[512];
serdes_conf_t *sconf = serdes_conf_new(NULL, 0,
"schema.registry.url", "http://localhost:8081",
NULL);
serdes_t *serdes = serdes_new(sconf, errstr, sizeof(errstr));
if (!serdes) {
fprintf(stderr, "%% Failed to create serdes handle: %s\n", errstr);
exit(1);
}
Using this context, you can interact with Schema Registry. For example, you can register a new schema and use it to serialize data:
const char *schema_name = "myschema";
const char *schema_def = "{\"type\": \"string\"}";
int schema_id = -1;
serdes_schema_t *schema = serdes_schema_add(serdes, schema_name, schema_id,
schema_def, -1,
errstr, sizeof(errstr));
if (!schema)
FATAL("Failed to register schema: %s\n", errstr);
avro_value_t val;
// ... fill in the Avro value ...
void *ser_buf = NULL;
size_t ser_buf_size;
if (serdes_schema_serialize_avro(schema, &val, &ser_buf, &ser_buf_size,
errstr, sizeof(errstr)))
FATAL(stderr, "%% serialize_avro() failed: %s\n", errstr);
rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_FREE,
ser_buf, ser_buf_size, NULL, 0, NULL);
// ... check errors and free resources ...
If a schema ID already exists, you can look up the schema:
schema = serdes_schema_get(serdes, schema_name, schema_id,
errstr, sizeof(errstr));
if (!schema)
FATAL("Failed to acquire schema \"%s\": %s\n", schema_name, errstr);
When consuming messages, you can deserialize an Avro message that was produced with the above code. This looks up the schema in Schema Registry automatically if it is not already cached locally, and then deserializes the data:
rd_kafka_message_t *rkmessage = rd_kafka_consume(rkt, ...);
avro_value_t avro;
serdes_schema_t *schema;
serdes_err_t err = serdes_deserialize_avro(serdes, &avro, &schema,
rkmessage->payload, rkmessage->len,
errstr, sizeof(errstr));
if (err) {
fprintf(stderr, "%% serdes_deserialize_avro failed: %s\n", errstr);
exit(1);
}
// At this point, avro contains the data and schema contains the Avro schema
This brief overview explains how to integrate librdkafka and libserdes using their C APIs. Both libraries also include C++ APIs.
Go
Confluent’s Golang Client for Apache Kafka includes serializers and deserializers for schema-based data formats such as Avro, Protobuf, and JSON Schema. The following examples show how to use these for your Apache Kafka® Go application with Schema Registry and Avro-formatted data.
Producer
The following steps show the Go code snippets used to create a producer and serialize Avro data. For the full code sample used for this example, see the avro_generic_producer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.
Import the required packages.
package main import ( "fmt" "os" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro" )
Check if there are enough command-line arguments to run the program and then parse the arguments.
if len(os.Args) != 4 { fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <schema-registry> <topic>\n", os.Args[0]) os.Exit(1) } bootstrapServers := os.Args[1] url := os.Args[2] topic := os.Args[3]
This check is used when starting the program from the command line:
go run myprogram.go <bootstrap-servers> <schema-registry> <topic>
For example:
go run myprogram.go localhost:9092 http://localhost:8081 my-topic
Create a new Kafka producer.
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) os.Exit(1) }
Create the Schema Registry client.
client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url)) if err != nil { fmt.Printf("Failed to create schema registry client: %s\n", err) os.Exit(1) }
Create the Avro serializer.
ser, err := avro.NewGenericSerializer(client, serde.ValueSerde, avro.NewSerializerConfig()) if err != nil { fmt.Printf("Failed to create serializer: %s\n", err) os.Exit(1) }
Create a User struct.
value := User{ Name: "First user", FavoriteNumber: 42, FavoriteColor: "blue", }
Serialize the User struct using Avro.
payload, err := ser.Serialize(topic, &value) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) }
Produce a Kafka message.
err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: payload, Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, }, deliveryChan) if err != nil { fmt.Printf("Produce failed: %v\n", err) os.Exit(1) }
For the full code sample used for this example, see the avro_generic_producer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.
Consumer
The following steps show the Go code snippets used to create a consumer and deserialize Avro data. For the full code sample used for this example, see the avro_generic_consumer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.
Import the required packages.
package main import ( "fmt" "os" "os/signal" "syscall" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro" )
Check if there are enough command-line arguments to run the program and then parse the arguments.
if len(os.Args) < 5 { fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <schema-registry> <group> <topics..>\n", os.Args[0]) os.Exit(1) } bootstrapServers := os.Args[1] url := os.Args[2] group := os.Args[3] topics := os.Args[4:]
This check is used when starting the program from the command line:
go run myprogram.go <bootstrap-servers> <schema-registry> <group> <topic>
For example:
go run myprogram.go localhost:9092 http://localhost:8081 consumer-group-name my-topic
Set up a signal channel to catch SIGINT and SIGTERM signals.
sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
Create a new Kafka consumer.
// The session.timeout.ms is ignored when group.protocol=consumer is set. // When group.protocol=consumer is set, use the broker configuration // group.consumer.session.timeout.ms to control session timeout. // c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": bootstrapServers, "group.id": group, "session.timeout.ms": 6000, "auto.offset.reset": "earliest"}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) os.Exit(1) } fmt.Printf("Created Consumer %v\n", c)
Create the Schema Registry client.
client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url)) if err != nil { fmt.Printf("Failed to create schema registry client: %s\n", err) os.Exit(1) }
Create the Avro deserializer.
deser, err := avro.NewGenericDeserializer(client, serde.ValueSerde, avro.NewDeserializerConfig()) if err != nil { fmt.Printf("Failed to create deserializer: %s\n", err) os.Exit(1) }
Subscribe to Kafka topics.
err = c.SubscribeTopics(topics, nil)
Run the consumer loop. The consumer loop is annotated with comments (//).
// Define a control flag for the consuming loop. run := true // Start a loop to consume messages until an interrupt signal is received. for run { select { // Check if a termination signal was received to exit the loop. case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: // Poll Kafka for messages with a timeout. ev := c.Poll(100) if ev == nil { continue } // Type-switch to handle messages or errors received from Kafka. switch e := ev.(type) { case *kafka.Message: // If it's a Kafka message, deserialize the Avro data into a User struct. value := User{} err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { fmt.Printf("%% Message on %s:\n%+v\n", e.TopicPartition, value) } if e.Headers != nil { // Optionally print out any message headers. fmt.Printf("%% Headers: %v\n", e.Headers) } case kafka.Error: // Errors should generally be considered // informational, the client will try to // automatically recover. fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) default: // Ignore any other types of events. fmt.Printf("Ignored %v\n", e) } } } // When the loop ends, close the consumer. fmt.Printf("Closing consumer\n") c.Close() } // User struct represents the expected Avro data structure. type User struct { Name string `json:"name"` FavoriteNumber int64 `json:"favorite_number"` FavoriteColor string `json:"favorite_color"` }
For the full code sample used for this example, see the avro_generic_consumer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.
REST Proxy
Use the REST Proxy for any language that does not have native clients with serializers compatible with Schema Registry. It is a convenient, language-agnostic method for interacting with Kafka. Almost all standard libraries have good support for HTTP and JSON, so even if a wrapper of the API does not exist for a language, the API is still easy to use. The REST Proxy also automatically translates between Avro and JSON. This simplifies writing applications in languages that do not have good Avro support.
The REST Proxy API Reference describes the complete API in detail. The following highlights key interactions. First, produce data to Kafka by constructing a POST request to the /topics/{topicName} resource including the schema for the data (plain integers in this example) and a list of records, optionally including the partition for each record.
POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
{
"value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
"records": [
{
"value": 12
},
{
"value": 24,
"partition": 1
}
]
}
Note that REST Proxy relies on content type information to properly convert data to Avro, so you must specify the Content-Type header. The response includes the same information received from the Java clients API about the partition and offset of the published data (or errors in case of failure). Additionally, it includes the schema IDs it registered or looked up in Schema Registry.
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json
{
"key_schema_id": null,
"value_schema_id": 32,
"offsets": [
{
"partition": 2,
"offset": 103
},
{
"partition": 1,
"offset": 104
}
]
}
In future requests, you can use this schema ID instead of the full schema, reducing the overhead for each request. You can also produce data to specific partitions using a similar request format with the /topics/{topicName}/partitions/{partition} endpoint.
To achieve good throughput, batch produce requests so that each HTTP request contains many records. Depending on durability and latency requirements, this can be as simple as maintaining a queue of records and sending a request only when the queue reaches a certain size or a timeout is triggered.
Consuming data is a bit more complex because consumers are stateful. However, it still only requires two API calls to get started. See the API Reference for complete details and examples.
Finally, the API also provides metadata about the cluster, such as the set brokers, list of topics, and per-partition information. However, most applications will not need to use these endpoints.
It is also possible to use non-Java clients developed by the community and manage registration and schema validation manually using the Schema Registry API. However, as this is error-prone and must be duplicated across every application, use the REST Proxy unless you need features that are not exposed via the REST Proxy.