Application Development

Applications can interact with Kafka through Confluent Platform in several ways. Whichever method you choose, the most important requirement is that your application coordinates with Schema Registry to manage schemas and guarantee data compatibility.

There are two ways to interact with Kafka: using a native client for your language combined with serializers compatible with Schema Registry, or using the REST Proxy. Most commonly you will use the serializers if your application is developed in a language with supported serializers, whereas you would use the REST Proxy for applications written in other languages.

Native Clients with Serializers

Java

Java applications can use the standard Kafka producers and consumers, but will substitute the default ByteArraySerializer with io.confluent.kafka.serializers.KafkaAvroSerializer (and the equivalent deserializer), allowing Avro data to be passed into the producer directly and allowing the consumer to deserialize and return Avro data.

For a Maven project, include dependencies within the dependencies tag for the Avro serializer and for your desired version of Kafka:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>7.2.8-ccs</version>
    <scope>provided</scope>
</dependency>

In your code, you can create Kafka producers and consumers just as you normally would, with two adjustments:

  1. The generic types for key and value should be Object. This allows you to pass in primitive types, Maps, and Records.

  2. Set the key/value serializer or deserializer and Schema Registry URL options. For example if you were configuring a producer directly in your code:

    Properties props = new Properties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    // Set any other properties
    KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

    We recommend these values be set using a properties file that your application loads and passes to the producer constructor. The settings are similar for the deserializer.
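As a minimal sketch of the properties-file approach on the consumer side, the following parses deserializer settings with java.util.Properties. The class name ConsumerSettings, the broker address, and the group ID are assumptions made for illustration, and the file contents are inlined as a string only to keep the example self-contained; a real application would read them from a file. The property keys are the standard Kafka consumer configuration names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ConsumerSettings {
    // Contents of a hypothetical consumer.properties file, inlined so the
    // sketch runs without touching the file system.
    static final String EXAMPLE_FILE = String.join("\n",
        "bootstrap.servers=localhost:9092",
        "group.id=example-group",
        "key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
        "value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
        "schema.registry.url=http://localhost:8081");

    // Parses properties text into a Properties object that could then be
    // passed to a KafkaConsumer<Object, Object> constructor.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(EXAMPLE_FILE);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Keeping these values outside the code means the Schema Registry URL and serializer classes can change per environment without recompiling.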

Your application code can now send Avro data. The serializer serializes each record automatically and either registers its schema with Schema Registry or validates it against a schema that is already registered. The application code is essentially the same as when using Kafka without Confluent Platform:

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// "users" is a placeholder topic name
Future<RecordMetadata> resultFuture =
    producer.send(new ProducerRecord<Object, Object>("users", user1));

This brief overview should get you started, and you can find more detailed information in the Formats, Serializers, and Deserializers section of the Schema Registry documentation.

C/C++

C and C++ applications can use the librdkafka client with libserdes for Avro and Schema Registry support. The Build Client Applications for Confluent Platform section discusses general usage of librdkafka for producing and consuming data in any format. In this section we cover how to integrate libserdes to serialize Avro and track schemas in Schema Registry.

First add the libserdes dependency to your project by including the headers and linking to the library. This process will vary by build system. Here is an example of how we can get the relevant values into Makefile variables:

SERDES_CFLAGS=$(shell pkg-config --cflags serdes)
SERDES_LIBS=$(shell pkg-config --libs serdes)

All operations in libserdes use a shared serdes_t object. Applications should start by creating a serdes_conf_t for configuration and then instantiating the serdes_t object:

#include <stdio.h>
#include <stdlib.h>
#include <libserdes/serdes-avro.h>

char errstr[512];
serdes_conf_t *sconf = serdes_conf_new(NULL, 0,
                                       "schema.registry.url", "http://localhost:8081",
                                       NULL);

serdes_t *serdes = serdes_new(sconf, errstr, sizeof(errstr));
if (!serdes) {
    fprintf(stderr, "%% Failed to create serdes handle: %s\n", errstr);
    exit(1);
}

Using this context, you can interact with Schema Registry. For example, you can register a new schema and use it to serialize data:

const char *schema_name = "myschema";
const char *schema_def = "{\"type\": \"string\"}";
int schema_id = -1;
serdes_schema_t *schema = serdes_schema_add(serdes, schema_name, schema_id,
                                            schema_def, -1,
                                            errstr, sizeof(errstr));
// FATAL() stands for an application-defined macro that logs the message and exits
if (!schema)
    FATAL("Failed to register schema: %s\n", errstr);

avro_value_t val;
// ... fill in the Avro value ...

void *ser_buf = NULL;
size_t ser_buf_size;
if (serdes_schema_serialize_avro(schema, &val, &ser_buf, &ser_buf_size,
                                 errstr, sizeof(errstr)))
    FATAL("serialize_avro() failed: %s\n", errstr);

rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_FREE,
                 ser_buf, ser_buf_size, NULL, 0, NULL);
// ... check errors and free resources ...

If you already have a schema ID, you can look up the schema:

schema = serdes_schema_get(serdes, schema_name, schema_id,
                           errstr, sizeof(errstr));
if (!schema)
    FATAL("Failed to acquire schema \"%s\": %s\n", schema_name, errstr);

When consuming messages, you can deserialize an Avro message that was produced with the above code. This looks up the schema in Schema Registry automatically if it is not already cached locally and then deserializes the data:

rd_kafka_message_t *rkmessage = rd_kafka_consume(rkt, ...);
avro_value_t avro;
serdes_schema_t *schema;
serdes_err_t err = serdes_deserialize_avro(serdes, &avro, &schema,
                                           rkmessage->payload, rkmessage->len,
                                           errstr, sizeof(errstr));
if (err) {
    fprintf(stderr, "%% serdes_deserialize_avro failed: %s\n", errstr);
    exit(1);
}

// At this point, avro contains the data and schema contains the Avro schema

This brief overview explains how to integrate librdkafka and libserdes using their C APIs. Both libraries also include C++ APIs.

REST Proxy

The REST Proxy should be used for any language that does not have native clients with serializers compatible with Schema Registry. It is a convenient, language-agnostic method for interacting with Kafka. Almost all standard libraries have good support for HTTP and JSON, so even if no wrapper of the API exists for your language, the API should still be easy to use. The REST Proxy also translates automatically between Avro and JSON, which simplifies writing applications in languages that do not have good Avro support.

The REST Proxy API Reference describes the complete API in detail, but we will highlight some key interactions here. First, you will want to produce data to Kafka. To do so, construct a POST request to the /topics/{topicName} resource including the schema for the data (plain integers in this example) and a list of records, optionally including the partition for each record.

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

Note that REST Proxy relies on content type information to properly convert data to Avro, so you must specify the Content-Type header. The response includes the same information you would receive from the Java clients API about the partition and offset of the published data (or errors in case of failure). Additionally, it includes the schema IDs it registered or looked up in Schema Registry.

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

In future requests, you can use this schema ID instead of the full schema, reducing the overhead for each request. You can also produce data to specific partitions using a similar request format with the /topics/{topicName}/partitions/{partition} endpoint.
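A produce request that reuses a schema ID might be built as in the following sketch, which uses the JDK's java.net.http API and only constructs the request without sending it. The class and method names are hypothetical, the host and the ID 32 are taken from the examples above, and the value_schema_id field name should be checked against the REST Proxy API Reference for your version.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ProduceRequests {
    // Builds the JSON body: a schema ID in place of the full schema text,
    // followed by the records array (already encoded as JSON).
    static String body(int valueSchemaId, String recordsJson) {
        return "{\"value_schema_id\": " + valueSchemaId
            + ", \"records\": " + recordsJson + "}";
    }

    // Builds (but does not send) a POST to /topics/{topicName}.
    static HttpRequest withSchemaId(String proxyUrl, String topic,
                                    int valueSchemaId, String recordsJson) {
        return HttpRequest.newBuilder(URI.create(proxyUrl + "/topics/" + topic))
            // The proxy relies on the content type to convert JSON to Avro.
            .header("Content-Type", "application/vnd.kafka.avro.v1+json")
            .header("Accept", "application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body(valueSchemaId, recordsJson)))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = withSchemaId(
            "http://kafkaproxy.example.com", "test", 32, "[{\"value\": 12}]");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Passing the resulting request to java.net.http.HttpClient's send method with a string body handler would submit it and return the JSON response.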

To achieve good throughput, it is important to batch your produce requests so that each HTTP request contains many records. Depending on durability and latency requirements, this can be as simple as maintaining a queue of records and sending a request only when the queue reaches a certain size or a timeout is triggered.
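The queue-with-size-or-timeout idea can be sketched as follows. This is an illustration under stated assumptions, not part of any Confluent API: RecordBatcher and its methods are hypothetical names, records are assumed to be pre-encoded JSON strings, and the caller supplies the current time so that the timeout logic stays easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RecordBatcher {
    private final int maxRecords;
    private final long maxAgeMillis;
    private final Consumer<List<String>> sender;
    private final List<String> pending = new ArrayList<>();
    private long oldestMillis;

    // sender receives each completed batch, for example to issue one
    // HTTP produce request containing all of its records.
    RecordBatcher(int maxRecords, long maxAgeMillis, Consumer<List<String>> sender) {
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAgeMillis;
        this.sender = sender;
    }

    // Queues one record and flushes if the batch has reached its size limit.
    synchronized void add(String recordJson, long nowMillis) {
        if (pending.isEmpty()) {
            oldestMillis = nowMillis; // start the timeout clock for this batch
        }
        pending.add(recordJson);
        if (pending.size() >= maxRecords) {
            flush();
        }
    }

    // Call periodically (for example from a timer) to enforce the timeout.
    synchronized void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - oldestMillis >= maxAgeMillis) {
            flush();
        }
    }

    // Hands the queued records to the sender and empties the queue.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        sender.accept(batch);
    }
}
```

A larger size limit improves throughput, while a shorter timeout bounds how long a record can wait before it is sent; records still in the queue are lost if the application crashes, which is the durability trade-off mentioned above.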

Consuming data is a bit more complex because consumers are stateful. However, it still only requires two API calls to get started. See the API Reference for complete details and examples.
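The two calls might look like the sketch below, which only constructs the requests with the JDK's java.net.http API. It assumes the v1 API used in the produce examples above, where a POST to /consumers/{group} creates a consumer instance and a GET on that instance's base URI plus /topics/{topicName} reads records. The class and method names are hypothetical, and the endpoint paths, headers, and request body should be verified against the API Reference for your REST Proxy version.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ConsumerRequests {
    // Call 1: create a consumer instance in the given group. The response
    // is assumed to include a base URI to use for subsequent reads.
    static HttpRequest createInstance(String proxyUrl, String group) {
        return HttpRequest.newBuilder(URI.create(proxyUrl + "/consumers/" + group))
            .header("Content-Type", "application/vnd.kafka.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString("{\"format\": \"avro\"}"))
            .build();
    }

    // Call 2: read records from a topic through the instance created above.
    // The Accept header asks for Avro data translated to JSON.
    static HttpRequest readTopic(String instanceBaseUri, String topic) {
        return HttpRequest.newBuilder(URI.create(instanceBaseUri + "/topics/" + topic))
            .header("Accept", "application/vnd.kafka.avro.v1+json")
            .GET()
            .build();
    }
}
```

Because the instance holds state on one proxy node, the second request is built from the base URI returned by the first rather than from the general proxy address.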

Finally, the API also provides metadata about the cluster, such as the set of brokers, the list of topics, and per-partition information. However, most applications will not need to use these endpoints.

Note that it is also possible to use non-Java clients developed by the community and manage registration and schema validation manually using the Schema Registry API. However, as this is error-prone and must be duplicated across every application, we recommend using the REST Proxy unless you need features that are not exposed via the REST Proxy.