アプリケーション開発

テスト用のローカル環境 または 本稼働環境 で Confluent Platform をセットアップして起動すると、データの生成と消費に Apache Kafka® を使用できるようになります。このドキュメントでは、Kafka の操作に使用できるツールについて、また、アプリケーションに最適なツールの選択方法について説明します。実際に開始する前に、 Confluent Platform のコンポーネント、特に Schema Registry について理解しておいてください。

アプリケーションで Confluent Platform を介して Kafka とやり取りするには、いくつかの方法があります。どの方法をアプリケーションで採用するとしても、アプリケーションが Schema Registry と調整を行って、スキーマを管理し、データの互換性を確保することが最も重要な要素となります。幸い、Confluent Platform ではこれが容易になっており、アプリケーション開発者から見てほぼ透過的になっています。

Kafka とやり取りするには、使用する言語用のネイティブクライアントを Schema Registry と互換性のあるシリアライザーと組み合わせて使用する方法と、REST Proxy を使用する方法の 2 つの方法があります。アプリケーションの開発に使用する言語で、サポートされているシリアライザーが存在する場合は、シリアライザーを使用するのが一般的です。それ以外の言語で記述されたアプリケーション用には、REST Proxy を使用します。

ネイティブクライアントとシリアライザー

Java

Java アプリケーションでは、標準の Kafka プロデューサーおよびコンシューマーを使用できますが、デフォルトの ByteArraySerializerio.confluent.kafka.serializers.KafkaAvroSerializer (および対応するデシリアライザー)に置き換えられ、Avro データをプロデューサーに直接渡すこと、およびコンシューマーで逆シリアル化して Avro データを返すことができます。

Maven プロジェクトでは、Avro シリアライザー用および使用するバージョンの Kafka 用に依存関係を dependencies タグに含めます。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>6.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>6.2.4-ccs</version>
    <scope>provided</scope>
</dependency>

コードでは、2 つの調整を行うことで、Kafka のプロデューサーおよびコンシューマーを通常どおりに作成できます。

  1. キーと値の汎用的な型は Object とする必要があります。これにより、プリミティブ型、Map、Records を渡すことができます。

  2. キー/値のシリアライザーまたはデシリアライザーと、Schema Registry URL オプションを設定します。たとえば、コード内で直接プロデューサーを構成していた場合は、次のようにします。

    Properties props = new Properties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    // Set any other properties
    KafkaProducer producer = new KafkaProducer(props);
    

    これらの値はプロパティファイルを使用して設定することをお勧めします。プロパティファイルをアプリケーションで読み込み、プロデューサーコンストラクターに渡します。デシリアライザーでも設定は同様です。

これで、アプリケーションコードで Avro データを送信できます。送信されたデータは自動的にシリアル化され、スキーマの登録や Schema Registry での検証が行われます。アプリケーションコードは、Confluent Platform なしで Kafka を使用する場合と基本的に同じです。

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
Future<RecordAndMetadata> resultFuture = producer.send(user1);

ここでは、開始にあたって参考になる簡単な概要を説明していますが、より詳細な情報については、Schema Registry のドキュメントの「フォーマット、シリアライザー、逆シリアライザー」セクションを参照してください。

C/C++

C および C++ のアプリケーションでは、librdkafka クライアントを Avro および Schema Registry のサポート用の libserdes とともに使用できます。「Kafka クライアント」セクションでは、任意のフォーマットのデータの生成および消費を行うための、librdkafka の一般的な使用方法について説明します。このセクションでは、libserdes との連携により、Avro をシリアル化し、Schema Registry のスキーマを追跡する方法について説明します。

まず、プロジェクトに libserdes の依存関係を追加します。それには、ヘッダーをインクルードし、ライブラリをリンクします。このプロセスは、ビルドシステムによって異なります。この例は、Makefile の変数に適切な値を設定する方法を示しています。

SERDES_CFLAGS=$(shell pkg-config --cflags rdkafka)
SERDES_LIBS=$(shell pkg-config --libs rdkafka)

libserdes のすべての処理で、共有の serdes_t オブジェクトが使用されます。アプリケーションでは最初に構成用の serdes_conf_t を作成してから、serdes_t オブジェクトをインスタンス化する必要があります。

#include <libserdes/serdes-avro.h>

char errstr[512];
serdes_conf_t *sconf = serdes_conf_new(NULL, 0,
                                       "schema.registry.url", "http://localhost:8081",
                                       NULL);

serdes_t *serdes = serdes_new(sconf, errstr, sizeof(errstr));
if (!serdes) {
    fprintf(stderr, "%% Failed to create serdes handle: %s\n", errstr);
    exit(1);
}

このコンテキストを使用して、Schema Registry とやり取りできます。たとえば、新しいスキーマを登録し、そのスキーマを使用してデータをシリアル化できます。

const char *schema_name = "myschema";
const char *schema_def = "{\"type\": \"string\"}";
int schema_id = -1;
serdes_schema_t *schema = serdes_schema_add(serdes, schema_name, schema_id,
                                            schema_def, -1,
                                            errstr, sizeof(errstr));
if (!schema)
    FATAL("Failed to register schema: %s\n", errstr);

avro_value_t val;
// ... fill in the Avro value ...

void *ser_buf = NULL;
size_t ser_buf_size;
if (serdes_schema_serialize_avro(schema, &val, &ser_buf, &ser_buf_size,
                                 errstr, sizeof(errstr)))
    FATAL(stderr, "%% serialize_avro() failed: %s\n", errstr);

rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_FREE,
                 ser_buf, ser_buf_size, NULL, 0, NULL);
// ... check errors and free resources ...

既にスキーマ ID がある場合は、スキーマの検索ができます。

schema = serdes_schema_get(serdes, schema_name, schema_id,
                           errstr, sizeof(errstr));
if (!schema)
    FATAL("Failed to acquire schema \"%s\": %s\n", schema_name, errstr);

メッセージを消費する際には、上記のコードで生成された Avro メッセージを逆シリアル化できます。これにより、Schema Registry 内のスキーマがローカルにキャッシュされていない場合は、自動的に検索され、データが逆シリアル化されます。

rd_kafka_message_t *rkmessage = rd_kafka_consume(rkt, ...);
avro_value_t avro;
serdes_schema_t *schema;
serdes_err_t err = serdes_deserialize_avro(serdes, &avro, &schema,
                                           rkmessage->payload, rkmessage->len,
                                           errstr, sizeof(errstr));
if (err) {
    fprintf(stderr, "%% serdes_deserialize_avro failed: %s\n", errstr);
    exit(1);
}

// At this point, avro contains the data and schema contains the Avro schema

この簡単な概要では、C の API を使用して、librdkafka および libserdes と連携する方法を説明しています。いずれのライブラリでも C++ API をインクルードします。

REST Proxy

REST Proxy は、Schema Registry と互換性のあるシリアライザーを持つネイティブクライアントのない言語で使用する必要があります。Kafka とやり取りするための、言語に依存しない便利な方法です。ほぼすべての標準ライブラリで HTTP および JSON が適切にサポートされているため、使用する言語用の API のラッパーが存在しない場合でも、簡単に API を使用できます。また、Avro と JSON の間の変換が自動的に行われます。これにより、Avro のサポートが十分でない言語でのアプリケーションの記述が容易になります。

API の詳細については、「REST Proxy API リファレンス」に包括的に記載されていますが、ここでは、いくつかの重要なやり取りについて説明します。まず、Kafka へのデータを生成する必要があります。それには、/topics/{topicName} リソースへの POST リクエストを構築します。リクエストには、データ用のスキーマ(この例ではプレーンな整数)およびレコードのリストを含めます。また、任意で各レコードのパーティションを含めます。

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

REST Proxy は、データを適切に Avro に変換するためにコンテンツタイプ情報を必要とするため、Content-Type ヘッダーを指定する必要があります。応答には、Java クライアント API から受け取るのと同様の、パブリッシュされたデータのパーティションとオフセット(または失敗の場合はエラー)に関する情報が含まれます。また、Schema Registry で登録または検索したスキーマ ID も含まれます。

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

以降のリクエストでは、完全なスキーマではなく、このスキーマ ID を使用できるので、各リクエストのオーバーヘッドを減らすことができます。また、同様のリクエストフォーマットを使用し、/topics/{topicName}/partitions/{partition} エンドポイントを指定して、特定のパーティションへのデータを生成できます。

快適なスループットを実現するには、各 HTTP リクエストに複数のレコードを含めて、生成リクエストを一括処理することが重要です。持続性とレイテンシの要件によっては、レコードのキューを保持し、キューが一定のサイズに達するかタイムアウトが生じた場合にのみリクエストを送信するシンプルな実装にすることができます。

コンシューマーはステートフルであるため、データの消費はもう少し複雑になります。ただし、最初に必要なのは 2 つの API 呼び出しのみであることは変わりません。詳細と例については、「API リファレンス」を参照してください。

最後に、API では、設定されたブローカー、トピックのリスト、パーティションごとの情報など、クラスターに関するメタデータも提供されます。ただし、ほとんどのアプリケーションでは、これらのエンドポイントを使用する必要はありません。

コミュニティで開発された Java 以外のクライアント を使用し、 Schema Registry API を使用して、登録とスキーマ検証を手動で管理することもできます。ただし、この方法はエラーが発生しやすく、アプリケーションごとに複製する必要があるため、REST Proxy で公開されていない機能が必要な場合を除いて、REST Proxy を使用することをお勧めします。