Kafka Connect と Schema Registry の使用¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
Kafka Connect と Schema Registry は連携して、コネクターからスキーマ情報を取り込みます。Kafka Connect で内部的に使用される型のデータを、Avro や Protobuf、JSON Schema として表されるデータ型に変換するメカニズムは、 Kafka Connect コンバーター が提供します。ソースコネクターによって生成されたスキーマは、AvroConverter
、ProtobufConverter
、JsonSchemaConverter
によって自動的に登録されます。シンクコネクターは、各メッセージのデータに加えてスキーマ情報を受け取ります。これによってシンクコネクターは、データの構造を把握し、データベーステーブルの構造を保持したり、検索インデックスを作成したりといった追加的な機能を提供することができます。スキーマデータは、それぞれのコンバーターによって、Kafka Connect で内部的に使用されるデータ型に変更されます。
For additional information about converters and how they work, see Configuring Key and Value Converters.
コンバーターのプロパティの例¶
To use Kafka Connect with Schema Registry, you must specify the key.converter
or
value.converter
properties in the connector or in the Connect worker configuration. The converters need an additional
configuration for the Schema Registry URL, which is specified by providing the URL converter prefix as shown in the following property examples.
Avro¶
次に示したのは、Avro コンバーターのプロパティの例です。
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Avro コンバーター(io.confluent.connect.avro.AvroConverter
)とともに、以下の構成プロパティを追加できます。こうした Avro 固有のプロパティは、Avro コンバーターのプロパティが記述されているワーカーまたはコネクターの構成に追加されます。ワーカーまたはコネクターの構成にこれらのプロパティを追加する際は、プレフィックスとして key.converter.
と value.converter.
を指定する必要があるので注意してください。以下に例を示します。
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
セキュアな環境で Avro を使用する場合は、value.converter.schema.registry.ssl.
プロパティを追加する必要があります。追加するプロパティの例を次に示します。
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.ssl.truststore=<location>
value.converter.schema.registry.ssl.truststore=<trustore-password>
value.converter.schema.registry.ssl.keystore=<keystore-location>
value.converter.schema.registry.ssl.keystore=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>
value.converter.enhanced.avro.schema.support=true
以下、前出の例に示した Avro 固有のプロパティの定義を列挙します。Connect Schema Registry の構成オプションの網羅的なリストについては、「構成オプション」を参照してください。
schema.cache.config
Avro コンバーターで使用されるスキーマキャッシュのサイズです。
- 型: int
- デフォルト: 1000
- 重要度: 低
enhanced.avro.schema.support
強化された Avro スキーマを Avro コンバーターで使用できるようにします。
true
に設定すると、Avro スキーマから Connect スキーマへの変換時に、Avro スキーマのパッケージ情報と Enum が維持されます。この情報は、Connect スキーマから Avro スキーマへの変換時に再度追加されます。- 型: ブール値
- デフォルト: false
- 重要度: 低
connect.meta.data
Connect コンバーターのメタデータを出力スキーマに追加できるようにします。
- 型: ブール値
- デフォルト: true
- 重要度: 低
connect.meta.data
プロパティを設定すると、Connect スキーマから Avro スキーマへの変換時に、次に示す Connect スキーマのメタデータが維持されます。Avro スキーマから Connect スキーマへの変換時には、次のメタデータが再度追加されます。- doc
- version
- parameters
- default value
- name
- type
Protobuf¶
次に示したのは、Protobuf コンバーターのプロパティの例です。
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
JSON スキーマ¶
次に示したのは、JSON Schema コンバーターのプロパティの例です。
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
互いに独立したキーコンバーターと値コンバーターの使用¶
キーコンバーターと値コンバーターは、相互に依存することなく使用できます。たとえば、キーには StringConverter
を使用する一方で、値には、Schema Registry で用いられているコンバーターを使用することができます。キーと値のプロパティが独立している例を次に示します。
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
コンバータープロパティの場所と継承¶
Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.
- すべてのコンバータープロパティ(Schema Registry の URL プレフィックスを含む)を各コネクターの構成に指定します。
- すべてのコンバータープロパティをワーカーの構成にのみ指定します。このケースでは、ワーカーのコンバータープロパティがすべてのコネクターに継承されます。
- すべてのコンバータープロパティをワーカーの構成に指定し、コネクターの構成にコンバーターのオーバーライドを追加します。
重要
- コンバーターの値および関連する Schema Registry の URL がワーカーとコネクターの両方に定義されている場合、ワーカーの設定がコネクターの設定によって 上書き されます。
- コネクターまたはワーカーにコンバーターを(オーバーライドとして、または単独の設定として)指定する場合は、コンバーターと Schema Registry URL の両方を 必ず含めて ください。そのようにしないと、コネクターまたはワーカーでエラーが発生します。
- ワーカーには定義されていないコンバーターをコネクターに指定する場合、コネクターの構成には、すべてのコンバータープロパティ (キーコンバーター、値コンバーター、Schema Registry のホストとポート)を指定する必要があります。
サンプルシナリオ¶
以下に示したのは、このサンプルシナリオで使用されているワーカーの構成プロパティです。
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter
前述したワーカーのプロパティを使用して以下の 3 つのコネクターを起動します。コネクターの構成プロパティは次のとおりです。
connector-1 の構成:
name=connector-1 <no converter configuration properties used>
connector-2 の構成:
name=connector-2 key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://host-2:port
connector-3 の構成:
name=connector-3 key.converter=io.confluent.connect.avro.AvroConverter
デプロイの結果は次のとおりです。
- connector-1 では、ワーカーの構成プロパティが使用されます。ワーカーの構成プロパティには、Avro コンバーター(
io.confluent.connect.avro.AvroConverter
)と Schema Registry のhost:port
が指定されています。 - connector-2 は、Avro コンバーター(
io.confluent.connect.avro.AvroConverter
)と Schema Registry のhost-2:port
を使います。 - connector-3 はコネクターの構成を使おうと試みますが、Schema Registry の URL の構成プロパティが見つからないためにエラーが発生します。Avro、Protobuf、JSON Schema では、Schema Registry の URL の構成プロパティが必須です。
- すべてのコネクター が、ワーカーの
value.converter
プロパティであるorg.apache.kafka.connect.storage.StringConverter
を使用します。
ちなみに
コンバーターの詳細については、「コンバーターとシリアル化について」を参照してください。
構成オプション¶
schema.registry.url
スキーマの登録または検索に使用できる Schema Registry インスタンスの URL のコンマ区切りリストです。
- 型: list
- デフォルト: ""
- 重要度: 高
auto.register.schemas
Schema Registry へのスキーマの登録をシリアライザーに試行させるかどうかを指定します。
- 型: ブール値
- デフォルト: true
- 重要度: 中
use.latest.version
auto.register.schemas
がfalse
に設定されている場合にのみ適用されます。auto.register.schemas
がfalse
に設定されていて、なおかつuse.latest.version
がtrue
に設定されている場合、Schema Registry は、クライアントに渡されたオブジェクトのスキーマを導出してシリアル化するのではなく、サブジェクトにおける最新バージョンのスキーマをシリアル化に使用します。- 型: ブール値
- デフォルト: false
- 重要度: 中
注釈
スキーマ参照を使用して、Avro、JSON スキーマ、または Protobuf で 同じトピックに複数のイベントタイプ を組み合わせる方法も確認してください。
latest.compatibility.strict
use.latest.version
がtrue
に設定されている場合にのみ適用されます。latest.compatibility.strict
がtrue
(デフォルト)の場合、シリアル化中にuse.latest.version=true
が指定されていると、最新のサブジェクトバージョンが、シリアル化されるオブジェクトのスキーマとの後方互換性を持っているかどうかを検証するチェックが実行されます。チェックに失敗すると、エラーが発生します。チェックに成功すると、シリアル化が実行されます。latest.compatibility.strict
がfalse
の場合、最新のサブジェクトバージョンがシリアル化に使用され、互換性チェックは行われません。この場合、シリアル化に失敗することがあります。Kafka Connect コンバーター や スキーマ参照 の実装時には、たとえば、互換性要件を緩和する(latest.compatibility.strict
をfalse
に設定する)ことが役に立つ可能性があります。- 型: ブール値
- デフォルト: true
- 重要度: 中
注釈
「スキーマ進化と互換性」も参照してください。
max.schemas.per.subject
ローカルにキャッシュまたは作成できるスキーマの最大数。
- 型: int
- デフォルト: 1000
- 重要度: 低
key.subject.name.strategy
キースキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。詳細については、「Schema Registry サブジェクト命名方法」を参照してください。
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
の任意の実装を指定できます。デフォルトでは、<topic>-key
がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy
の実装を指定することは4.1.3
時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。- 型: class
- デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- 重要度: 中
value.subject.name.strategy
値スキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。詳細については、「Schema Registry サブジェクト命名方法」を参照してください。
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
の任意の実装を指定できます。デフォルトでは、<topic>-value
がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy
の実装を指定することは4.1.3
時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。- 型: class
- デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- 重要度: 中
basic.auth.credentials.source
基本認証ヘッダーの認証情報の選択方法を指定します。サポートされる値は URL、USER_INFO、SASL_INHERIT です。
- 型: string
- デフォルト: "URL"
- 重要度: 中
basic.auth.user.info
Basic Auth のユーザー情報を {username}:{password} という形式で指定します。schema.registry.basic.auth.user.info は、この構成のエイリアスとしては非推奨となっています。
- 型: password
- デフォルト: ""
- 重要度: 中
以降の Schema Registry 専用プロパティはクライアントで構成できます。Confluent Platform バージョン 5.4.0(以降)で使用できます。詳細については、「HTTPS 用の追加構成」でクライアントの構成に関する情報を参照してください。
schema.registry.ssl.truststore.location
トラストストアファイルの場所。(例:
schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
)。- 型: string
- デフォルト: ""
- 重要度: 中
schema.registry.ssl.truststore.password
トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスすることはできますが、整合性チェックが無効になります。
- 型: password
- デフォルト: ""
- 重要度: 中
schema.registry.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。たとえば、「
schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks
」のように指定します。- 型: string
- デフォルト: ""
- 重要度: 中
schema.registry.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。
ssl.keystore.location
を構成した場合にのみ必要となります。- 型: password
- デフォルト: ""
- 重要度: 中
schema.registry.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: ""
- 重要度: 中