Schemas, Serializers, and Deserializers for Confluent Platform

There are multiple ways for applications to interact with Apache Kafka® through the Confluent Platform. Whichever method you choose for your application, the most important factor is to ensure that your application is coordinating with Schema Registry to manage schemas and guarantee data compatibility.

There are two ways to interact with Kafka: using a native client for your language combined with serializers compatible with Schema Registry, or using the REST Proxy. Most commonly you will use the serializers if your application is developed in a language with supported serializers, whereas you would use the REST Proxy for applications written in other languages.

Native Clients with Serializers

Java

Java applications can use the standard Kafka producers and consumers, but will substitute the default ByteArraySerializer with io.confluent.kafka.serializers.KafkaAvroSerializer (and the equivalent deserializer), allowing Avro data to be passed into the producer directly and allowing the consumer to deserialize and return Avro data.

For a Maven project, include dependencies within the dependencies tag for the Avro serializer and for your desired version of Kafka:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.7</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>7.5.7-ccs</version>
    <scope>provided</scope>
</dependency>

In your code, you can create Kafka producers and consumers just as you normally would, with two adjustments:

  1. The generic types for key and value should be Object. This allows you to pass in primitive types, Maps, and Records.

  2. Set the key/value serializer or deserializer and Schema Registry URL options. For example if you were configuring a producer directly in your code:

    Properties props = new Properties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    // Set any other properties
    KafkaProducer producer = new KafkaProducer(props);
    

    We recommend these values be set using a properties file that your application loads and passes to the producer constructor. The settings are similar for the deserializer.

Now your application code can send Avro data that will be automatically serialized and the schema will be registered or validated against Schema Registry. The application code is essentially the same as using Kafka without Confluent Platform:

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
Future<RecordAndMetadata> resultFuture = producer.send(user1);

This brief overview should get you started, and you can find more detailed information in the Formats, Serializers, and Deserializers section of the Schema Registry documentation.

C/C++

C and C++ applications can use the librdkafka client with libserdes for Avro and Schema Registry support. The Build Client Applications for Confluent Platform section discusses general usage of librdkafka for producing and consuming data in any format. In this section we cover how to integrate libserdes to serialize Avro and track schemas in Schema Registry.

First add the libserdes dependency to your project by including the headers and linking to the library. This process will vary by build system. Here is an example of how we can get the relevant values into Makefile variables:

SERDES_CFLAGS=$(shell pkg-config --cflags rdkafka)
SERDES_LIBS=$(shell pkg-config --libs rdkafka)

All operations in libserdes use a shared serdes_t object. Applications should start by creating a serdes_conf_t for configuration and then instantiating the serdes_t object:

#include <libserdes/serdes-avro.h>

char errstr[512];
serdes_conf_t *sconf = serdes_conf_new(NULL, 0,
                                       "schema.registry.url", "http://localhost:8081",
                                       NULL);

serdes_t *serdes = serdes_new(sconf, errstr, sizeof(errstr));
if (!serdes) {
    fprintf(stderr, "%% Failed to create serdes handle: %s\n", errstr);
    exit(1);
}

Using this context, you can interact with Schema Registry. For example, you can register a new schema and use it to serialize data:

const char *schema_name = "myschema";
const char *schema_def = "{\"type\": \"string\"}";
int schema_id = -1;
serdes_schema_t *schema = serdes_schema_add(serdes, schema_name, schema_id,
                                            schema_def, -1,
                                            errstr, sizeof(errstr));
if (!schema)
    FATAL("Failed to register schema: %s\n", errstr);

avro_value_t val;
// ... fill in the Avro value ...

void *ser_buf = NULL;
size_t ser_buf_size;
if (serdes_schema_serialize_avro(schema, &val, &ser_buf, &ser_buf_size,
                                 errstr, sizeof(errstr)))
    FATAL(stderr, "%% serialize_avro() failed: %s\n", errstr);

rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_FREE,
                 ser_buf, ser_buf_size, NULL, 0, NULL);
// ... check errors and free resources ...

If you already have a schema ID, you can look up the schema:

schema = serdes_schema_get(serdes, schema_name, schema_id,
                           errstr, sizeof(errstr));
if (!schema)
    FATAL("Failed to acquire schema \"%s\": %s\n", schema_name, errstr);

When consuming messages, you can deserialize an Avro message that was produced with the above code. This looks up the schema in Schema Registry automatically if it is not already cached locally and then deserializes the data:

rd_kafka_message_t *rkmessage = rd_kafka_consume(rkt, ...);
avro_value_t avro;
serdes_schema_t *schema;
serdes_err_t err = serdes_deserialize_avro(serdes, &avro, &schema,
                                           rkmessage->payload, rkmessage->len,
                                           errstr, sizeof(errstr));
if (err) {
    fprintf(stderr, "%% serdes_deserialize_avro failed: %s\n", errstr);
    exit(1);
}

// At this point, avro contains the data and schema contains the Avro schema

This brief overview explains how to integrate librdkafka and libserdes using their C APIs. Both libraries also include C++ APIs.

Go

Confluent’s Golang Client for Apache Kafka includes serializers and deserializers for schema-based data formats such as Avro, Protobuf, and JSON Schema. The following examples show how to use these for your Apache Kafka® Go application with Schema Registry and Avro-formatted data.

Producer

The following steps show the Go code snippets used to create a producer and serialize Avro data. For the full code sample used for this example, see the avro_generic_producer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.

  1. Import the required packages.

    package main
    
    import (
            "fmt"
            "os"
    
            "github.com/confluentinc/confluent-kafka-go/v2/kafka"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro"
    )
    
  2. Check if there are enough command-line arguments to run the program and then parse the arguments.

    if len(os.Args) != 4 {
            fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <schema-registry> <topic>\n",
                    os.Args[0])
            os.Exit(1)
    }
    
    bootstrapServers := os.Args[1]
    url := os.Args[2]
    topic := os.Args[3]
    

    This check is used when starting the program from the command-line like this:

    go run myprogram.go <bootstrap-servers> <schema-registry> <topic>
    

    For example:

    go run myprogram.go localhost:9092 http://localhost:8081 my-topic
    
  3. Create a new Kafka producer.

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
    
    if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            os.Exit(1)
    }
    
  4. Create the Schema Registry client.

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))
    
    if err != nil {
            fmt.Printf("Failed to create schema registry client: %s\n", err)
            os.Exit(1)
    }
    
  5. Create the Avro serializer.

    ser, err := avro.NewGenericSerializer(client, serde.ValueSerde, avro.NewSerializerConfig())
    
    if err != nil {
            fmt.Printf("Failed to create serializer: %s\n", err)
            os.Exit(1)
    }
    
  6. Create a User struct.

    value := User{
            Name:           "First user",
            FavoriteNumber: 42,
            FavoriteColor:  "blue",
    }
    
  7. Serialize the User struct using Avro.

    payload, err := ser.Serialize(topic, &value)
    
    if err != nil {
            fmt.Printf("Failed to serialize payload: %s\n", err)
            os.Exit(1)
    }
    
  8. Produce a Kafka message.

    err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          payload,
            Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
    }, deliveryChan)
    
    if err != nil {
            fmt.Printf("Produce failed: %v\n", err)
            os.Exit(1)
    }
    

For the full code sample used for this example, see the avro_generic_producer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.

Consumer

The following steps show the Go code snippets used to create a consumer and deserialize Avro data. For the full code sample used for this example, see the avro_generic_consumer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.

  1. Import the required packages.

    package main
    
    import (
            "fmt"
            "os"
            "os/signal"
            "syscall"
    
            "github.com/confluentinc/confluent-kafka-go/v2/kafka"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
            "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro"
    )
    
  2. Check if there are enough command-line arguments to run the program and then parse the arguments.

    if len(os.Args) < 5 {
            fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <schema-registry> <group> <topics..>\n",
              os.Args[0])
            os.Exit(1)
    }
    
    bootstrapServers := os.Args[1]
    url := os.Args[2]
    group := os.Args[3]
    topics := os.Args[4:]
    

    This check is used when starting the program from the command-line like this:

    go run myprogram.go <bootstrap-servers> <schema-registry> <group> <topic>
    

    For example:

    go run myprogram.go localhost:9092 http://localhost:8081 consumer-group-name my-topic
    
  3. Set up a signal channel to catch SIGINT and SIGTERM signals.

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    
  4. Create a new Kafka consumer.

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":  bootstrapServers,
            "group.id":           group,
            "session.timeout.ms": 6000,
            "auto.offset.reset":  "earliest"})
    
    if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
            os.Exit(1)
    }
    
    fmt.Printf("Created Consumer %v\n", c)
    
  5. Create the Schema Registry client.

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))
    
    if err != nil {
            fmt.Printf("Failed to create schema registry client: %s\n", err)
            os.Exit(1)
    }
    
  6. Create the Avro deserializer.

    deser, err := avro.NewGenericDeserializer(client, serde.ValueSerde, avro.NewDeserializerConfig())
    
    if err != nil {
            fmt.Printf("Failed to create deserializer: %s\n", err)
            os.Exit(1)
    }
    
  7. Subscribe to Kafka topics.

    err = c.SubscribeTopics(topics, nil)
    
  8. Run the consumer loop. The consumer loop is annotated with comments (//).

      // Define a control flag for the consuming loop.
      run := true
    
      // Start a loop to consume messages until an interrupt signal is received.
      for run {
              select {
              // Check if a termination signal was received to exit the loop.
              case sig := <-sigchan:
                        fmt.Printf("Caught signal %v: terminating\n", sig)
                        run = false
              default:
                      // Poll Kafka for messages with a timeout.
                      ev := c.Poll(100)
                      if ev == nil {
                              continue
                      }
    
                      // Type-switch to handle messages or errors received from Kafka.
                      switch e := ev.(type) {
                      case *kafka.Message:
                             // If it's a Kafka message, deserialize the Avro data into a User struct.
                              value := User{}
                              err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value)
                              if err != nil {
                                      fmt.Printf("Failed to deserialize payload: %s\n", err)
                              } else {
                                      fmt.Printf("%% Message on %s:\n%+v\n", e.TopicPartition, value)
                              }
                              if e.Headers != nil {
                                    // Optionally print out any message headers.
                                    fmt.Printf("%% Headers: %v\n", e.Headers)
                              }
                      case kafka.Error:
                              // Errors should generally be considered
                              // informational, the client will try to
                              // automatically recover.
                              fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                      default:
                              // Ignore any other types of events.
                              fmt.Printf("Ignored %v\n", e)
                      }
              }
      }
    
      // When the loop ends, close the consumer.
      fmt.Printf("Closing consumer\n")
      c.Close()
    }
    
    // User struct represents the expected Avro data structure.
    type User struct {
        Name           string `json:"name"`
        FavoriteNumber int64  `json:"favorite_number"`
        FavoriteColor  string `json:"favorite_color"`
    }
    

For the full code sample used for this example, see the avro_generic_consumer_example file provided at Confluent’s Golang Client for Apache Kafka examples repository.

REST Proxy

The REST Proxy should be used for any language that does not have native clients with serializers compatible with Schema Registry. It is a convenient, language-agnostic method for interacting with Kafka. Almost all standard libraries have good support for HTTP and JSON, so even if a wrapper of the API does not exist for your language it should still be easy to use the API. It also automatically translates between Avro and JSON. This simplifies writing applications in languages that do not have good Avro support.

The REST Proxy API Reference describes the complete API in detail, but we will highlight some key interactions here. First, you will want to produce data to Kafka. To do so, construct a POST request to the /topics/{topicName} resource including the schema for the data (plain integers in this example) and a list of records, optionally including the partition for each record.

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

Note that REST Proxy relies on content type information to properly convert data to Avro, so you must specify the Content-Type header. The response includes the same information you would receive from the Java clients API about the partition and offset of the published data (or errors in case of failure). Additionally, it includes the schema IDs it registered or looked up in Schema Registry.

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

In future requests, you can use this schema ID instead of the full schema, reducing the overhead for each request. You can also produce data to specific partitions using a similar request format with the /topics/{topicName}/partitions/{partition} endpoint.

To achieve good throughput, it is important to batch your produce requests so that each HTTP request contains many records. Depending on durability and latency requirements, this can be as simple as maintaining a queue of records and only send a request when the queue has reached a certain size or a timeout is triggered.

Consuming data is a bit more complex because consumers are stateful. However, it still only requires two API calls to get started. See the API Reference for complete details and examples.

Finally, the API also provides metadata about the cluster, such as the set brokers, list of topics, and per-partition information. However, most applications will not need to use these endpoints.

Note that it is also possible to use non-Java clients developed by the community and manage registration and schema validation manually using the Schema Registry API. However, as this is error-prone and must be duplicated across every application, we recommend using the REST Proxy unless you need features that are not exposed via the REST Proxy.