Kafka Connect と Schema Registry の使用

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

Kafka Connect と Schema Registry は連携して、コネクターからスキーマ情報を取り込みます。Kafka Connect で内部的に使用される型のデータを、Avro や Protobuf、JSON Schema として表されるデータ型に変換するメカニズムは、 Kafka Connect コンバーター が提供します。ソースコネクターによって生成されたスキーマは、AvroConverterProtobufConverterJsonSchemaConverter によって自動的に登録されます。シンクコネクターは、各メッセージのデータに加えてスキーマ情報を受け取ります。これによってシンクコネクターは、データの構造を把握し、データベーステーブルの構造を保持したり、検索インデックスを作成したりといった追加的な機能を提供することができます。スキーマデータは、それぞれのコンバーターによって、Kafka Connect で内部的に使用されるデータ型に変更されます。

For additional information about converters and how they work, see Configuring Key and Value Converters.

コンバーターのプロパティの例

To use Kafka Connect with Schema Registry, you must specify the key.converter or value.converter properties in the connector or in the Connect worker configuration. The converters need an additional configuration for the Schema Registry URL, which is specified by providing the URL converter prefix as shown in the following property examples.

Avro

次に示したのは、Avro コンバーターのプロパティの例です。

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Avro コンバーター(io.confluent.connect.avro.AvroConverter )とともに、以下の構成プロパティを追加できます。こうした Avro 固有のプロパティは、Avro コンバーターのプロパティが記述されているワーカーまたはコネクターの構成に追加されます。ワーカーまたはコネクターの構成にこれらのプロパティを追加する際は、プレフィックスとして key.converter.value.converter. を指定する必要があるので注意してください。以下に例を示します。

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true

セキュアな環境で Avro を使用する場合は、value.converter.schema.registry.ssl. プロパティを追加する必要があります。追加するプロパティの例を次に示します。

key.converter.schema.registry.ssl.truststore.location=<location>
key.converter.schema.registry.ssl.truststore.password=<trustore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>

value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<trustore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>

以下、前出の例に示した Avro 固有のプロパティの定義を列挙します。Connect Schema Registry の構成オプションの網羅的なリストについては、「構成オプション」を参照してください。

schema.cache.config

Avro コンバーターで使用されるスキーマキャッシュのサイズです。

  • 型: int
  • デフォルト: 1000
  • 重要度: 低
enhanced.avro.schema.support

強化された Avro スキーマを Avro コンバーターで使用できるようにします。true に設定すると、Avro スキーマから Connect スキーマへの変換時に、Avro スキーマのパッケージ情報と Enum が維持されます。この情報は、Connect スキーマから Avro スキーマへの変換時に再度追加されます。

  • 型: ブール値
  • デフォルト: false
  • 重要度: 低
connect.meta.data

Connect コンバーターのメタデータを出力スキーマに追加できるようにします。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低

connect.meta.data プロパティを設定すると、Connect スキーマから Avro スキーマへの変換時に、次に示す Connect スキーマのメタデータが維持されます。Avro スキーマから Connect スキーマへの変換時には、次のメタデータが再度追加されます。

  • doc
  • version
  • parameters
  • default value
  • name
  • type

Protobuf

次に示したのは、Protobuf コンバーターのプロパティの例です。

key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081

JSON スキーマ

次に示したのは、JSON Schema コンバーターのプロパティの例です。

key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081

互いに独立したキーコンバーターと値コンバーターの使用

キーコンバーターと値コンバーターは、相互に依存することなく使用できます。たとえば、キーには StringConverter を使用する一方で、値には、Schema Registry で用いられているコンバーターを使用することができます。キーと値のプロパティが独立している例を次に示します。

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

コンバータープロパティの場所と継承

Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.

  • すべてのコンバータープロパティ(Schema Registry の URL プレフィックスを含む)を各コネクターの構成に指定します。
  • すべてのコンバータープロパティをワーカーの構成にのみ指定します。このケースでは、ワーカーのコンバータープロパティがすべてのコネクターに継承されます。
  • すべてのコンバータープロパティをワーカーの構成に指定し、コネクターの構成にコンバーターのオーバーライドを追加します。

重要

  • コンバーターの値および関連する Schema Registry の URL がワーカーとコネクターの両方に定義されている場合、ワーカーの設定がコネクターの設定によって 上書き されます。
  • コネクターまたはワーカーにコンバーターを(オーバーライドとして、または単独の設定として)指定する場合は、コンバーターと Schema Registry URL の両方を 必ず含めて ください。そのようにしないと、コネクターまたはワーカーでエラーが発生します。
  • ワーカーには定義されていないコンバーターをコネクターに指定する場合、コネクターの構成には、すべてのコンバータープロパティ (キーコンバーター、値コンバーター、Schema Registry のホストとポート)を指定する必要があります。

サンプルシナリオ

以下に示したのは、このサンプルシナリオで使用されているワーカーの構成プロパティです。

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter

前述したワーカーのプロパティを使用して以下の 3 つのコネクターを起動します。コネクターの構成プロパティは次のとおりです。

  • connector-1 の構成:

    name=connector-1
    <no converter configuration properties used>
    
  • connector-2 の構成:

    name=connector-2
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://host-2:port
    
  • connector-3 の構成:

    name=connector-3
    key.converter=io.confluent.connect.avro.AvroConverter
    

デプロイの結果は次のとおりです。

  • connector-1 では、ワーカーの構成プロパティが使用されます。ワーカーの構成プロパティには、Avro コンバーター(io.confluent.connect.avro.AvroConverter)と Schema Registry の host-1:port が指定されています。
  • connector-2 は、Avro コンバーター(io.confluent.connect.avro.AvroConverter)と Schema Registry の host-2:port を使います。
  • connector-3 はコネクターの構成を使おうと試みますが、Schema Registry の URL の構成プロパティが見つからないためにエラーが発生します。Avro、Protobuf、JSON Schema では、Schema Registry の URL の構成プロパティが必須です。
  • すべてのコネクター が、ワーカーの value.converter プロパティである org.apache.kafka.connect.storage.StringConverter を使用します。

ちなみに

コンバーターの詳細については、「コンバーターとシリアル化について」を参照してください。

構成オプション

schema.registry.url

スキーマの登録または検索に使用できる Schema Registry インスタンスの URL のコンマ区切りリストです。

  • 型: list
  • デフォルト: ""
  • 重要度: 高
auto.register.schemas

Schema Registry へのスキーマの登録をシリアライザーに試行させるかどうかを指定します。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 中
use.latest.version

auto.register.schemasfalse に設定されている場合にのみ適用されます。auto.register.schemasfalse に設定されていて、なおかつ use.latest.versiontrue に設定されている場合、Schema Registry は、クライアントに渡されたオブジェクトのスキーマを導出してシリアル化するのではなく、サブジェクトにおける最新バージョンのスキーマをシリアル化に使用します。

  • 型: ブール値
  • デフォルト: false
  • 重要度: 中

注釈

スキーマ参照を使用して、AvroJSON スキーマ、または Protobuf同じトピックに複数のイベントタイプ を組み合わせる方法も確認してください。

latest.compatibility.strict

use.latest.versiontrue に設定されている場合にのみ適用されます。

latest.compatibility.stricttrue (デフォルト)の場合、シリアル化中に use.latest.version=true が指定されていると、最新のサブジェクトバージョンが、シリアル化されるオブジェクトのスキーマとの後方互換性を持っているかどうかを検証するチェックが実行されます。チェックに失敗すると、エラーが発生します。チェックに成功すると、シリアル化が実行されます。

latest.compatibility.strictfalse の場合、最新のサブジェクトバージョンがシリアル化に使用され、互換性チェックは行われません。この場合、シリアル化に失敗することがあります。Kafka Connect コンバータースキーマ参照 の実装時には、たとえば、互換性要件を緩和する(latest.compatibility.strictfalse に設定する)ことが役に立つ可能性があります。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 中

注釈

スキーマ進化と互換性」も参照してください。

max.schemas.per.subject

ローカルにキャッシュまたは作成できるスキーマの最大数。

  • 型: int
  • デフォルト: 1000
  • 重要度: 低
key.subject.name.strategy

キースキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。詳細については、「Schema Registry サブジェクト命名方法」を参照してください。

io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy の任意の実装を指定できます。デフォルトでは、<topic>-key がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy の実装を指定することは 4.1.3 時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。

  • 型: class
  • デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • 重要度: 中
value.subject.name.strategy

値スキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。詳細については、「Schema Registry サブジェクト命名方法」を参照してください。

io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy の任意の実装を指定できます。デフォルトでは、<topic>-value がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy の実装を指定することは 4.1.3 時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。

  • 型: class
  • デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • 重要度: 中
basic.auth.credentials.source

基本認証ヘッダーの認証情報の選択方法を指定します。サポートされる値は URL、USER_INFO、SASL_INHERIT です。

  • 型: string
  • デフォルト: "URL"
  • 重要度: 中
basic.auth.user.info

基本認証のユーザー情報を {username}:{password} という形式で指定します。schema.registry.basic.auth.user.info は、この構成のエイリアスとしては非推奨となっています。

  • 型: password
  • デフォルト: ""
  • 重要度: 中

以降の Schema Registry 専用プロパティはクライアントで構成できます。Confluent Platform バージョン 5.4.0(以降)で使用できます。詳細については、「HTTPS 用の追加構成」でクライアントの構成に関する情報を参照してください。

schema.registry.ssl.truststore.location

トラストストアファイルの場所。(例: schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks)。

  • 型: string
  • デフォルト: ""
  • 重要度: 中
schema.registry.ssl.truststore.password

トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスすることはできますが、整合性チェックが無効になります。

  • 型: password
  • デフォルト: ""
  • 重要度: 中
schema.registry.ssl.keystore.location

キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。たとえば、「schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks」のように指定します。

  • 型: string
  • デフォルト: ""
  • 重要度: 中
schema.registry.ssl.keystore.password

キーストアファイルのストアパスワード。クライアントでは省略可能です。 ssl.keystore.location を構成した場合にのみ必要となります。

  • 型: password
  • デフォルト: ""
  • 重要度: 中
schema.registry.ssl.key.password

キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。

  • 型: password
  • デフォルト: ""
  • 重要度: 中