ワーカーの構成プロパティ¶
Connect ワーカーに関連する構成プロパティを以下に記載します。最初のセクションには共通のプロパティが記載されています。これらは、スタンドアロンモードと分散モードのどちらでも設定できます。これらのプロパティでは、通信する Apache Kafka® クラスターや、操作の対象とするフォーマットデータなど、基本的な機能を制御できます。その後の 2 つのセクションには、スタンドアロンモード、分散モードのそれぞれに固有のプロパティが記載されています。
その他の構成プロパティについては、以下のセクションを参照してください。
- Connect および スキーマレジストリ: 「Kafka Connect と スキーマレジストリ の使用」を参照。
- プロデューサーの構成プロパティ : 「Kafka プロデューサー」を参照。
- コンシューマーの構成プロパティ : 「Kafka コンシューマー」を参照。
- SSL 暗号化のプロパティ : 「SSL による暗号化」を参照。
- すべての Kafka 構成プロパティ : 「Confluent Platform 構成リファレンス」を参照。
Connect ワーカーの機能の詳細については、「ワーカーの構成と実行」を参照してください。
共通のワーカー構成¶
bootstrap.servers
Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。ここでブートストラップ用にどのサーバーが指定されているかにかかわらず、クライアントはすべてのサーバーを使用します。このリストは、すべてのサーバーを検出するために使用する最初のホストにのみ影響します。このリストは、
host1:port1,host2:port2,...
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。これは動的に変わる可能性があるので、このリストにすべてのサーバーセットを含める必要はありません(ただし、サーバーの障害に備えて、複数指定しておくこともできます)。- 型: list
- デフォルト : [localhost:9092]
- 重要度: 高
key.converter
キーの Connect データ用のコンバーターのクラス。このプロパティでは、ソースコネクターで Kafka に書き込まれるデータ、シンクコネクターで Kafka から読み取られるデータのフォーマットを指定します。一般的なフォーマットには、Avro や JSON があります。
- 型: class
- デフォルト:
- 重要度: 高
value.converter
値の Connect データ用のコンバーターのクラス。このプロパティでは、ソースコネクターで Kafka に書き込まれるデータ、シンクコネクターで Kafka から読み取られるデータのフォーマットを指定します。一般的なフォーマットには、Avro や JSON があります。
- 型: class
- デフォルト:
- 重要度: 高
internal.key.converter
Converter
インターフェイスが実装されている、内部キーの Connect データ用のコンバーターのクラス。オフセットや構成などのデータの変換に使用されます。- 型: class
- デフォルト:
- 重要度: 低
internal.value.converter
Converter
インターフェイスが実装されている、オフセット値の Connect データ用のコンバーターのクラス。オフセットや構成などのデータの変換に使用されます。- 型: class
- デフォルト:
- 重要度: 低
offset.flush.interval.ms
タスクのオフセットをコミットする際の試行間隔。
- 型: long
- デフォルト: 60000
- 重要度: 低
offset.flush.timeout.ms
レコードをフラッシュしてパーティションのオフセットデータをオフセットストレージにコミットするまでの最長の待機時間(単位: ミリ秒)。この時間が経過すると、処理がキャンセルされ、それ以降の試行でコミットされるオフセットデータが元に戻ります。
- 型: long
- デフォルト: 5000
- 重要度: 低
plugin.path
Kafka Connect プラグイン が保存されているディレクトリへのパスのコンマ区切りのリスト。
- 型: string
- デフォルト:
- 重要度: 低
rest.advertised.host.name
このプロパティが設定されている場合、これが、接続先として他のワーカーに公開されるホスト名になります。
- 型: string
- 重要度: 低
rest.advertised.listener
ワーカー間の通信に使用されるリスナーを構成します。指定可能な値は、
http
またはhttps
のいずれかです。リスナーのプロパティが定義されていない場合、またはプロパティに HTTP リスナーが含まれている場合、このフィールドのデフォルト値はhttp
となります。リスナーのプロパティが定義されていて、HTTPS リスナーのみが含まれている場合、デフォルト値はhttps
となります。- 型: string
- 重要度: 低
rest.advertised.port
このプロパティが設定されている場合、これが、接続先として他のワーカーに公開されるポートになります。
- 型: int
- 重要度: 低
rest.host.name
REST API 用のホスト名。このプロパティが設定されている場合、このインターフェイスにのみバインドされます。
- 型: string
- 重要度: 低
rest.port
REST API がリッスンするポート。
- 型: int
- デフォルト: 8083
- 重要度: 低
response.http.headers.config
Confluent Platform コンポーネントの HTTP 応答で返す HTTP ヘッダーを選択する目的で使用します。複数の値は、
[action][header name] :[header value]
形式のコンマ区切り文字列で指定します。ここで[action]
は、set
、add
、setDate
、addDate
のいずれかになります。ヘッダーの値にコンマが含まれている場合は、その値を引用符で囲む必要があります。以下に例を示します。response.http.headers.config="add Cache-Control: no-cache, no-store, must-revalidate", add X-XSS-Protection: 1; mode=block, add Strict-Transport-Security: max-age=31536000; includeSubDomains, add X-Content-Type-Options: nosniff
- 型: string
- デフォルト: ""
- 重要度: 低
task.shutdown.graceful.timeout.ms
タスクが正常にシャットダウンするまでの待機時間。タスクごとの時間ではなく、合計時間です。すべてのタスクでシャットダウンがトリガーされると、順番に待機となります。
- 型: long
- デフォルト: 5000
- 重要度: 低
スタンドアロンワーカーの構成¶
共通のワーカーの構成オプションに加え、スタンドアロンモードでは以下のオプションを使用できます。
offset.storage.file.filename
コネクターのオフセットを保存するファイル。オフセットをディスク上に保存することにより、単一ノード上のスタンドアロンプロセスを一旦停止して開始する場合に、前回終了時の状態から再開できます。
- 型: string
- デフォルト: ""
- 重要度: 高
分散ワーカーの構成¶
共通のワーカーの構成オプションに加え、分散モードでは以下のオプションを使用できます。Connect ワーカーの機能の詳細については、「ワーカーの構成と実行」を参照してください。
group.id
このワーカーが属する Connect クラスターグループを示す一意の文字列。
重要
本稼働環境では、この構成を明示的に設定する必要があります。Confluent CLI を使用する場合、この構成プロパティにはデフォルトで
connect-cluster
が設定されます。同じgroup.id
を持つワーカーはすべて同じ Connect クラスターに属します。たとえば、Worker-a でgroup.id=connect-cluster-a
と設定されていて、Worker-b のgroup.id
も同じである場合、Worker-a と Worker-b は、connect-cluster-a
というクラスターを構成します。注釈
group.id
構成プロパティは、シンクコネクターには適用されません。シンクコネクターでは、group.id
はプログラムによってconnect-
というプレフィックスとコネクター名を使用して作成されます。- 型: string
- デフォルト: ""
- 重要度: 高
config.storage.topic
コネクターおよびタスクの構成データが保存されるトピックの名前。これは、同じ
group.id
を持つすべてのワーカーで同じになっている必要があります。Kafka Connect では、起動時に、単一のパーティションで、データの損失を回避するための圧縮後のクリーンアップポリシーを使用して、このトピックの自動的な生成を試行します。ただし、トピックが既に存在する場合はそのトピックをそのまま使用します。このトピックを手動で作成する場合は 必ず、単一のパーティションでレプリケーション係数を高く(3x 以上)設定して、圧縮トピックとして作成します。- 型: string
- デフォルト: ""
- 重要度: 高
config.storage.replication.factor
Kafka Connect でコネクターおよびタスクの構成データを保存するために使用されるトピックを作成する際のレプリケーション係数。本稼働システムでは 必ず
3
以上に設定する必要があります。ただし、クラスター内の Kafka ブローカーの数よりも大きな数値は設定できません。Kafka ブローカーのデフォルトのレプリケーション係数を使用するには、-1
を入力します。- 型: short
- デフォルト: 3
- 重要度: 低
offset.storage.topic
コネクターおよびタスクの構成のオフセットが保存されるトピックの名前。これは、同じ
group.id
を持つすべてのワーカーで同じになっている必要があります。Kafka Connect では、起動時に、複数のパーティションで、データの損失を回避するための圧縮後のクリーンアップポリシーを使用して、このトピックの自動的な生成を試行します。ただし、トピックが既に存在する場合はそのトピックをそのまま使用します。このトピックを手動で作成する場合は 必ず、レプリケーション係数を高くし(3x 以上)、圧縮トピックとして作成します。また、パーティション数を大きくして(Kafka の組み込みの__consumer_offsets
トピックのように、25 または 50 など)、大規模な Kafka Connect クラスターに対応できるようにします。- 型: string
- デフォルト: ""
- 重要度: 高
offset.storage.replication.factor
Connect でコネクターのオフセットを保存するために使用されるトピックを作成する際のレプリケーション係数。本稼働システムでは 必ず
3
以上に設定する必要があります。ただし、クラスター内の Kafka ブローカーの数よりも大きな数値は設定できません。Kafka ブローカーのデフォルトのレプリケーション係数を使用するには、-1
を入力します。- 型: short
- デフォルト: 3
- 重要度: 低
offset.storage.partitions
Connect でコネクターのオフセットを保存するために使用されるトピックを作成する際のパーティション数。大規模な Kafka Connect クラスターに対応するには、大きな値にする必要があります(Kafka の組み込みの
__consumer_offsets
トピックのように、25
または50
など)。Kafka ブローカーで構成されているデフォルトのパーティション数を使用するには、-1
を入力します。- 型: int
- デフォルト: 25
- 重要度: 低
status.storage.topic
コネクターおよびタスクの構成のステータスの更新が保存されるトピックの名前。これは、同じ
group.id
を持つすべてのワーカーで同じになっている必要があります。Kafka Connect では、起動時に、複数のパーティションで、データの損失を回避するための圧縮後のクリーンアップポリシーを使用して、このトピックの自動的な生成を試行します。ただし、トピックが既に存在する場合はそのトピックをそのまま使用します。このトピックを手動で作成する場合は 必ず、複数のパーティションで、レプリケーション係数を高く(3x 以上)設定して、圧縮トピックとして作成します。- 型: string
- デフォルト: ""
- 重要度: 高
status.storage.replication.factor
Connect でコネクターおよびタスクのステータスの更新を保存するために使用されるトピックを作成する際のレプリケーション係数。本稼働システムでは 必ず
3
以上に設定する必要があります。ただし、クラスター内の Kafka ブローカーの数よりも大きな数値は設定できません。Kafka ブローカーのデフォルトのレプリケーション係数を使用するには、-1
を入力します。- 型: short
- デフォルト: 3
- 重要度: 低
status.storage.partitions
Connect でコネクターおよびタスクのステータスの更新を保存するために使用されるトピックを作成する際のパーティション数。Kafka ブローカーで構成されているデフォルトのパーティション数を使用するには、
-1
を入力します。- 型: int
- デフォルト: 5
- 重要度: 低
heartbeat.interval.ms
Kafka のグループ管理機構を使用している場合に想定される、グループコーディネーターに対するハートビートの間隔。ハートビートは、ワーカーのセッションがアクティブであることを確認するため、また、グループで新たにメンバーの増減があった場合のバランス調整を行うために使用されます。
session.timeout.ms
よりも小さい値を設定しなければなりませんが、通常は、その値の 1/3 以下に設定する必要があります。さらに小さい値に調整して、通常のバランス調整で想定される時間をコントロールすることができます。- 型: int
- デフォルト: 3000
- 重要度: 高
session.timeout.ms
Kafka のグループ管理機構の使用時に、障害を検出するためのタイムアウト時間。
- 型: int
- デフォルト: 30000
- 重要度: 高
ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- 重要度: 高
ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- 重要度: 高
ssl.keystore.password
キーストアファイル用のストアパスワード。これは、クライアントでは省略可能であり、ssl.keystore.location が構成されている場合にのみ必要です。
- 型: password
- 重要度: 高
ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- 重要度: 高
ssl.truststore.password
トラストストアファイルのパスワード。
- 型: password
- 重要度: 高
connections.max.idle.ms
アイドル状態の接続は、この構成で指定された時間(単位: ミリ秒)の経過後に終了されます。
- 型: long
- デフォルト: 540000
- 重要度: 中
receive.buffer.bytes
データを読み取る際に使用される TCP 受信バッファ(SO_RCVBUF)のサイズ。
- 型: int
- デフォルト: 32768
- 重要度: 中
request.timeout.ms
この構成では、クライアントがリクエストの応答を待つ最大の待機時間を指定します。タイムアウト時間が経過するまで応答を受信できなかった場合、クライアントは、必要に応じてリクエストを再送し、再試行回数が上限に達した場合は、リクエストを失敗とします。
- 型: int
- デフォルト: 40000
- 重要度: 中
sasl.kerberos.service.name
Kafka が実行される際の Kerberos プリンシパル名。これは、Kafka の JAAS 構成または Kafka の構成のどちらかで定義できます。
- 型: string
- 重要度: 中
security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: "PLAINTEXT"
- 重要度: 中
send.buffer.bytes
データを送信する際に使用される TCP 送信バッファ(SO_SNDBUF)のサイズ。
- 型: int
- デフォルト: 131072
- 重要度: 中
ssl.enabled.protocols
SSL 接続で有効なプロトコルのリスト。TLSv1.2、TLSv1.1、TLSv1 がデフォルトで有効になります。
- 型: list
- デフォルト: [TLSv1.2, TLSv1.1, TLSv1]
- 重要度: 中
ssl.keystore.type
キーストアファイルのファイルフォーマット。クライアントでは省略可能です。デフォルト値は JKS です。
- 型: string
- デフォルト: "JKS"
- 重要度: 中
ssl.protocol
SSLContext の生成に使用する SSL プロトコル。デフォルト設定は TLS であり、ほとんどのケースに適しています。最新の JVM で使用できる値は、TLS、TLSv1.1、TLSv1.2 です。古い JVM では SSL、SSLv2、および SSLv3 がサポートされている場合もありますが、セキュリティに関する既知の脆弱性があるため、使用しないことをお勧めします。
- 型: string
- デフォルト: "TLS"
- 重要度: 中
ssl.provider
SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。
- 型: string
- 重要度: 中
ssl.truststore.type
トラストストアファイルのファイルフォーマット。デフォルト値は JKS です。
- 型: string
- デフォルト: "JKS"
- 重要度: 中
worker.sync.timeout.ms
ワーカーが他のワーカーと同期されていない状態になり、構成を再同期する必要がある場合、ここで指定されている時間が経過するまで諦めずに待ってから、グループを離脱し、バックオフ時間待機して、再度参加します。
- 型: int
- デフォルト: 3000
- 重要度: 中
worker.unsync.backoff.ms
ワーカーが他のワーカーと同期されていない状態になり、Worker.sync.timeout.ms の時間内に同期が取れなかった場合、Connect クラスターを離脱して、ここで指定された時間待ってから、再度参加します。
- 型: int
- デフォルト: 300000
- 重要度: 中
client.id
リクエストを実行する際にサーバーに渡す ID 文字列。これは、論理アプリケーション名をサーバー側のリクエストログに含めることで、IP とポートだけでなく、リクエストのソースを追跡できるようにすることが目的です。
- 型: string
- デフォルト: ""
- 重要度: 低
metadata.max.age.ms
ここで指定された時間(単位: ミリ秒)が経過すると、メタデータが強制的に更新されます。これは、パーティションのリーダーシップに変更がなくても、新しいブローカーやパーティションがあれば積極的に検出できるようにするためです。
- 型: long
- デフォルト: 300000
- 重要度: 低
metric.reporters
Metrics Reporter として使用するクラスのリスト。
MetricReporter
インターフェイスを実装することで、新しいメトリック作成の通知を受けるクラスをプラグインすることができます。JMX 統計情報を登録するための JmxReporter が必ず含まれます。- 型: list
- デフォルト: []
- 重要度: 低
metrics.num.samples
メトリクスの計算用に維持されるサンプルの数。
- 型: int
- デフォルト: 2
- 重要度: 低
metrics.sample.window.ms
メトリクスの計算用に維持されるサンプルの数。
- 型: long
- デフォルト: 30000
- 重要度: 低
reconnect.backoff.ms
特定のホストへの再接続を試行するまでの待機時間。これにより、ホストへの接続が短時間に何度も繰り返されることを回避できます。このバックオフは、コンシューマーからブローカーに送信されるすべてのリクエストに適用されます。
- 型: long
- デフォルト: 50
- 重要度: 低
retry.backoff.ms
特定のトピックのパーティションに対するフェッチリクエストが失敗した場合に再試行を行うまでの待機時間。これにより、フェッチと失敗が短時間に何度も繰り返されることを回避できます。
- 型: long
- デフォルト: 100
- 重要度: 低
sasl.kerberos.kinit.cmd
Kerberos の kinit コマンドパス。デフォルトは /usr/bin/kinit です。
- 型: string
- デフォルト: "/usr/bin/kinit"
- 重要度: 低
sasl.kerberos.min.time.before.relogin
更新試行から次の更新試行までの、ログインスレッドのスリープ時間。
- 型: long
- デフォルト: 60000
- 重要度: 低
sasl.kerberos.ticket.renew.jitter
更新時間に追加されるランダムジッターのパーセンテージ。
- 型: double
- デフォルト: 0.05
- 重要度: 低
sasl.kerberos.ticket.renew.window.factor
最後の更新からチケットの有効期限までの時間が指定のウィンドウ係数に達するまでの間、ログインスレッドはスリープ状態になります。この時間の経過後、チケットの更新が試行されます。
- 型: double
- デフォルト: 0.8
- 重要度: 低
ssl.cipher.suites
暗号スイートのリスト。これは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。デフォルトでは、使用可能なすべての暗号スイートがサポートされます。
- 型: list
- 重要度: 低
ssl.endpoint.identification.algorithm
サーバー証明書を使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。
- 型: string
- 重要度: 低
ssl.keymanager.algorithm
SSL 接続のキーマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているキーマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: "SunX509"
- 重要度: 低
ssl.trustmanager.algorithm
SSL 接続のトラストマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているトラストマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: "PKIX"
- 重要度: 低
Confluent Platform ライセンスの構成¶
Confluent Platform 6.0 以降では、Confluent ライセンス関連のプロパティをスタンドアロンワーカーおよび分散ワーカーの構成に含めることができます。Connect ワーカーはそれらのライセンス関連プロパティをすべての Confluent の商用コネクターの構成に自動的に挿入します。ライセンス関連プロパティは、コネクターの構成に残しておくことも、コネクターの構成から削除することもできます。
confluent.license
Confluent では、各契約者にエンタープライズライセンスキーを発行します。ライセンスキーはテキストであるため、コピーアンドペーストで
confluent.license
の値として使用できます。試用ライセンスでは、コネクターを 30 日間使用できます。開発者ライセンスでは、ブローカーが 1 つの開発環境で、コネクターを無期限に使用できます。既にご契約されている場合は、詳細について Confluent サポートにお問い合わせください。
- 型: string
- デフォルト: ""
- 指定可能な値: Confluent Platform ライセンス
- 重要度: 高
confluent.topic.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.truststore.password
トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。ssl.keystore.location を構成した場合にのみ必要となります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: "PLAINTEXT"
- 重要度: 中
ライセンストピックの構成¶
Confluent エンタープライズライセンスは、_confluent-command
トピックに保管されます。このトピックは、デフォルトで作成され、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。
注釈
パブリックキーは Kafka のトピックには保管されません。
以下では、デフォルトの _confluent-command
トピックがさまざまなシナリオでどのように生成されるかを説明します。
confluent.license
プロパティを追加していない場合、またはこのプロパティを(confluent.license=
などで)空にした場合は、_confluent command
トピックに、30 日間の試用ライセンスが自動的に生成されます。- 有効なライセンスキーを(
confluent.license=<valid-license-key>
などで)追加すると、有効なライセンスが_confluent-command
トピックに追加されます。
以下は、開発およびテスト用の最小限のプロパティの例です。
_confluent-command
トピックの名前は confluent.topic
プロパティを使用して変更できます(たとえば、環境に厳格な命名規則がある場合など)。以下の例は、この変更と、構成される Kafka ブートストラップサーバーを示しています。
confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092
上の例は、開発およびテストで使用できるブートストラップサーバーの必要最小限のプロパティを示しています。本稼働環境の場合は、プレフィックスとして confluent.topic.
を付けて、通常のプロデューサー、コンシューマー、およびトピックの各構成プロパティをコネクターのプロパティに追加します。
ライセンストピックの ACL¶
_confluent-command
トピックには、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。このトピックはデフォルトで作成されます。このトピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
コネクターでトピックの作成が必要な場合は、リソースクラスターでの CREATE および DESCRIBE。
_confluent-command
トピックでの DESCRIBE、READ、および WRITE。重要
You may also use DESCRIBE and READ without WRITE to restrict access to read-only for license topic ACLs. If a topic exists, the LicenseManager will not try to create the topic.
ライセンスを使用する各プリンシパルに個々にアクセス許可を付与することも、ワイルドカード入力 を使用してすべてのクライアントを許可することもできます。以下の例は、リソースクラスターおよび _confluent-command
トピックの ACL を構成するために使用できるコマンドを示しています。
リソースクラスターの CREATE および DESCRIBE の ACL を設定します。
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation CREATE --operation DESCRIBE --cluster
_confluent-command
トピックでの DESCRIBE、READ、および WRITE の ACL を設定します。kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
デフォルトの構成プロパティのオーバーライド¶
confluent.topic.replication.factor
を使用することにより、レプリケーション係数をオーバーライドできます。たとえば、(開発およびテスト用の)ブローカー数 3 未満の環境で送信先として Kafka クラスターを使用する場合、confluent.topic.replication.factor
プロパティに 1
を設定する必要があります。
プロデューサー固有のプロパティは、confluent.topic.producer.
プレフィックスを使用することによりオーバーライドできます。コンシューマー固有のプロパティは、confluent.topic.consumer.
プレフィックスを使用することによりオーバーライドできます。
デフォルト値を使用することも、他のプロパティをカスタマイズすることもできます。たとえば、confluent.topic.client.id
プロパティのデフォルトは、コネクターの名前に -licensing
サフィックスを付加したものです。クライアント接続に SSL または SASL を必要とするブローカーの構成設定では、このプレフィックスを使用します。
トピックのクリーンアップのポリシーはオーバーライドできません。トピックは、常にパーティションが単一で、圧縮されるからです。また、このプレフィックスを使用してシリアライザーおよびデシリアライザーを指定しないでください。追加しても無視されます。
ワーカーの構成のオーバーライド¶
デフォルトでは、コースコネクターおよびシンクコネクターは、ワーカーの構成からクライアント構成を継承します。ワーカーの構成で、producer.
または consumer.
のプレフィックスが付いたプロパティは、それぞれ、すべてのソースコネクターまたはシンクコネクター用のクライアントを作成するために使用されます。
特定のコネクターのプロデューサーまたはコンシューマーのプロパティをオーバーライドするには、ワーカーの構成でクライアントのオーバーライドを有効にし、ソースコネクターの構成には producer.override.*
を、シンクコネクターの構成には consumer.override.*
を使用します。
コネクターごとの構成プロパティを有効にし、デフォルトのワーカープロパティをオーバーライドするには、次の connector.client.config.override.policy
構成パラメーターをワーカーのプロパティファイルに追加します。
connector.client.config.override.policy
ConnectorClientConfigOverridePolicy の実装のクラス名またはエイリアス。ここで定義する構成は、コネクターでオーバーライドされる可能性があります。デフォルトの実装は、
None
です。その他の使用可能なポリシーは、All
およびPrincipal
です。- 型: string
- デフォルト: None
- 指定可能な値: [All、Principal]
- 重要度: 中
この構成のプロパティを connector.client.config.override.policy=All
に設定すると、ワーカーに属する各コネクターがワーカーの構成をオーバーライドできるようになります。これを実装するには、次のオーバーライドプレフィックスのいずれかをソースコネクターおよびシンクコネクターの構成に追加します。
producer.override.<source-configuration-property>
consumer.override.<sink-configuration-property>
例¶
次の例では、デフォルトのワーカープロパティ compression.type
をオーバーライドする行が 1 行追加されています。コネクターの構成が更新されると、Replicator コネクターで gzip 圧縮が使用されるようになります。
{
"name": "Replicator",
"config": {
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"topic.whitelist": "_schemas",
"topic.rename.format": "\${topic}.replica",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.bootstrap.servers": "srcKafka1:10091",
"dest.kafka.bootstrap.servers": "destKafka1:11091",
"tasks.max": "1",
"producer.override.compression.type": "gzip",
"confluent.topic.replication.factor": "1",
"schema.subject.translator.class": "io.confluent.connect.replicator.schemas.DefaultSubjectTranslator",
"schema.registry.topic": "_schemas",
"schema.registry.url": "http://destSchemaregistry:8086"
}
次の例では、デフォルトのワーカープロパティ auto.offset.reset
をオーバーライドする行が 1 行追加されています。コネクターの構成が更新されると、Elasticsearch コネクターで、デフォルトの Connect ワーカーのプロパティ値である latest
ではなく、earliest
が使用されるようになります。
{
"name": "Elasticsearch",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders",
"consumer.override.auto.offset.reset": "latest",
"tasks.max": 1,
"connection.url": "http://elasticsearch:9200",
"type.name": "type.name=kafkaconnect",
"key.ignore": "true",
"schema.ignore": "false",
"transforms": "renameTopic",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex": "orders",
"transforms.renameTopic.replacement": "orders-latest"
}'
ワーカーのオーバーライドの構成のプロパティを connector.client.config.override.policy=Principal
と設定すると、コネクターごとに異なるサービスプリンシパルを使用できます。次の例は、 ロールベースアクセス制御(RBAC) を実装した場合のシンクコネクターのサービスプリンシパルのオーバーライドを示しています。
consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<username>" \
password="<password>" \
metadataServerUrls="<metadata_server_urls>";
注釈
All
を設定すると、コネクターごとのオーバーライド機能に、他のワーカー構成プロパティに加え、ワーカーサービスプリンシパルのオーバーライドが含まれます。Principal
を設定すると、コネクターごとのオーバーライド機能が、サービスプリンシパルのオーバーライドのみに制限されます。
オーバーライドのカスタマイズ¶
ワーカーのオーバーライドで All
の設定を使用して、ワーカーで定義されたすべてのコネクターの構成プロパティがオーバーライドされることを許可しないようにすることが必要な場合があります。これは一般的ではありませんが、カスタムのオーバーライドポリシーを作成して、オーバーライドできるコネクターの構成とそのプロパティ値を制限することができます。
たとえば、バッチサイズを 1 MB に制限する batch.size
に対するカスタムポリシーを作成する必要がある場合は、この構成のプロパティに対して ConnectorClientConfigOverridePolicy を実装します。このクラスの実装には、構成のプロパティとその値のリストを制限するために必要なすべてのロジックが含まれます。