Confluent Replicator 構成プロパティ¶
このセクションは、Replicator 構成オプションについてのリファレンスとなっています。
Replicator の機能、ユースケース、およびフェイルオーバーシナリオでの使用の詳細については、「Replicator の概要」を参照してください。
このコネクターを使用するには、connector.class
構成プロパティで、そのコネクタークラスの名前を指定します。
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
コネクター固有の構成プロパティについて、以降で説明します。
ソーストピック¶
重要
__consumer_timestamps
トピックでは、Replicator のオフセット変換にのみ関係するため、送信元から送信先にコピーされません。オフセット変換の詳細については、「コンシューマーのオフセット変換について」を参照してください。
topic.regex
送信先クラスターにレプリケートするトピックの正規表現。正規表現の詳細については、「Java Regular Expressions」を参照してください。
- 型: string
- デフォルト: null
- 重要度: 高
重要
Replicator では、topic.regex
または topic.whitelist
に一致する場合でも、内部の __consumer_offsets
または __transaction_state
トピックを送信元クラスターからコピーしません。
注釈
topic.regex
プロパティと以下の 2 つのプロパティ topic.whitelist
と topic.blacklist
は、以下の順に適用されます。
topic.blacklist
で指定されているトピックはレプリケートされません。topic.whitelist
で指定されているすべてのトピックまたはtopic.regex
に一致するトピックはレプリケートされます(ホワイトリストに登録されているトピックは、正規表現に一致する必要はありません)。
3 つのプロパティすべてを 1 つの Replicator 構成で指定できます。これらが連携して、レプリケートするトピックが決定されます。たとえば、一連のトピック名 topic1
、topic2
、topic3
、topic4
がある場合、topic.whitelist
に topic1
、topic.regex
に topic[2,3,4]
、topic.blacklist
に topic4
を指定できます。Replicator はこれらの制約を組み合わせて、レプリケートされたトピックの最終的なリストとして、topic1
、topic2
、topic3
を含むリストを生成します。
3 つのすべてのプロパティがデフォルト値のままの場合、トピックはレプリケートされません。
topic.whitelist
レプリケートするトピックのホワイトリスト。
- 型: list
- デフォルト: ""
- 重要度: 高
重要
起動時に、Replicator はレプリケートするトピックをリストします。
topic.whitelist
のいずれかのトピックが存在しないか、その ACL が不十分であるためにトピックをリストできない場合、Replicator は失敗します。Replicator のプリンシパルには、リスト内のすべてのトピックの ACL を記述するためのアクセス許可が付与されている必要があります。詳細については、「セキュリティ」を参照してください。Replicator バージョン 5.5 以降の場合は、
topic.whitelist
にリストされているトピックは、送信元クラスターに存在している必要があります。Replicator の起動後にトピックを作成する場合は、代わりにtopic.regex
を使用します。topic.blacklist
レプリケーションから除外するトピック。
- 型: list
- デフォルト: ""
- 重要度: 高
topic.poll.interval.ms
topic.whitelist または topic.regex に一致する新しいトピックについて送信元クラスターをポーリングする頻度。
- 型: int
- デフォルト: 120000
- 指定可能な値: [0,…]
- 重要度: 低
重要
topic.regex
、topic.whitelist
、および topic.blacklist
構成は、以下の順に適用されます。
topic.blacklist
で指定されているトピックはレプリケートされません。topic.whitelist
で指定されているすべてのトピックまたはtopic.regex
に一致するトピックはレプリケートされます(ホワイトリストに登録されているトピックは、正規表現に一致する必要はありません)。
3 つのすべてのプロパティがデフォルト値のままの場合、トピックはレプリケートされません。
ソースデータ変換¶
src.key.converter
送信元クラスターから取得されたメッセージのキーフィールドのコンバーター。
- 型: class
- デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
- 重要度: 低
src.value.converter
送信元クラスターから取得したメッセージの値フィールドのコンバーター。
- 型: class
- デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
- 重要度: 低
src.header.converter
シリアル化された Kafka ヘッダーを Kafka Connect ヘッダーに変換するために使用される HeaderConverter クラス。デフォルト値は ByteArrayConverter で、単純に入力バイトを送信先レコードに渡します。
- 型: class
- デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
- 重要度: 低
ソース Kafka¶
以下の構成オプションは、Kafka クライアントで使用される共通のプロパティです。Replicator は、これらのオプションを使用して、送信元クラスター(コンシューマー、AdminClient)に接続します。src.kafka
プレフィックスを使用する有効なクライアントプロパティは、送信元クラスターに接続するクライアントに転送されます。コンシューマーと AdminClient の構成が同じである場合は、src.kafka
を使用する必要があります。
src.kafka.bootstrap.servers
ソース Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。クライアントは、ここでブートストラップ用に指定されるサーバーに関係なくすべてのサーバーを使用します。このリストは、一連のサーバーをすべて検出するために使用する初期ホストを制御するだけのものです。このリストは、
host1:port1,host2:port2,…
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。すべてのサーバーセットを指定する必要はありませんが、フェイルオーバーに備えて、複数指定しておくことができます。- 型: list
- 重要度: 高
src.kafka.client.id
リクエストを実行する際にサーバーに渡す ID 文字列。この文字列は、リクエストの送信元の追跡に使用されます。論理アプリケーション名をサーバー側のリクエストログに含めることができます。
- 型: string
- デフォルト: ""
- 重要度: 低
src.kafka.request.timeout.ms
クライアントがリクエストの応答を待つ最大の待機時間を指定します。タイムアウト時間が経過するまで応答を受信できなかった場合、クライアントは、リクエストを再送し、再試行回数が上限に達した場合は、リクエストを失敗とします。
- 型: int
- デフォルト: 30000
- 指定可能な値: [0,…]
- 重要度: 中
src.kafka.retry.backoff.ms
トピックのパーティションに対するリクエストが失敗した場合に再試行を行うまでの待機時間を指定します。これにより、失敗シナリオでリクエストが短時間に繰り返し送信されることを回避できます。
- 型: long
- デフォルト: 100
- 指定可能な値: [0,…]
- 重要度: 低
src.kafka.connections.max.idle.ms
アイドル接続を閉じるまでの待ち時間を指定します(ミリ秒)。
- 型: long
- デフォルト: 540000
- 重要度: 中
src.kafka.reconnect.backoff.ms
ホストへの再接続を試行する前の待機時間を指定します。これにより、ホストへの接続が短時間に何度も繰り返されることを回避できます。このバックオフは、コンシューマーからブローカーに送信されるすべてのリクエストに適用されます。
- 型: long
- デフォルト: 50
- 指定可能な値: [0,…]
- 重要度: 低
src.kafka.metric.reporters
Metrics Reporter として使用するクラスのリスト。
MetricReporter
インターフェイスを実装することで、新しいメトリック作成の通知を受けるクラスをプラグインすることができます。JMX 統計情報を登録するための JmxReporter が必ず含まれます。- 型: list
- デフォルト: ""
- 重要度: 低
src.kafka.metrics.num.samples
メトリクスの計算用に維持されるサンプルの数。
- 型: int
- デフォルト: 2
- 指定可能な値: [1,…]
- 重要度: 低
src.kafka.metrics.sample.window.ms
メトリクスサンプルが計算される時間枠。
- 型: long
- デフォルト: 30000
- 指定可能な値: [0,…]
- 重要度: 低
src.kafka.send.buffer.bytes
データを送信する際に使用される TCP 送信バッファ(SO_SNDBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。
- 型: int
- デフォルト: 131072
- 指定可能な値: [-1、...]
- 重要度: 中
src.kafka.receive.buffer.bytes
データを読み取る際に使用される TCP 受信バッファ(SO_RCVBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。
- 型: int
- デフォルト: 65536
- 指定可能な値: [-1、...]
- 重要度: 中
src.kafka.timestamps.topic.replication.factor
コンシューマータイムスタンプトピックのレプリケーション係数。
- 型: int
- デフォルト: 3
- 重要度: 高
src.kafka.timestamps.topic.num.partitions
コンシューマータイムスタンプトピックのパーティションの数。
- 型: int
- デフォルト: 50
- 重要度: 高
ソース Kafka: セキュリティ¶
ちなみに
さまざまなプロトコルで使用できる Replicator の暗号化と認証のオプションの詳細については、 Replicator セキュリティ の概要を参照してください。
src.kafka.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: PLAINTEXT
- 指定可能な値: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
- 重要度: 中
src.kafka.sasl.mechanism
クライアントの接続に使用される SASL メカニズム。セキュリティプロバイダーを利用できるものであれば、どのメカニズムでも構いません。GSSAPI がデフォルトのメカニズムです。
- 型: string
- デフォルト: GSSAPI
- 重要度: 中
src.kafka.sasl.kerberos.ticket.renew.window.factor
最後の更新からチケットの有効期限までの時間が指定のウィンドウ係数に達するまでの間、ログインスレッドはスリープ状態になります。この時間の経過後、チケットの更新が試行されます。
- 型: double
- デフォルト: 0.8
- 重要度: 低
src.kafka.sasl.kerberos.min.time.before.relogin
更新試行から次の更新試行までの、ログインスレッドのスリープ時間。
- 型: long
- デフォルト: 60000
- 重要度: 低
src.kafka.sasl.kerberos.kinit.cmd
Kerberos の kinit コマンドパス。
- 型: string
- デフォルト: /usr/bin/kinit
- 重要度: 低
src.kafka.sasl.kerberos.service.name
Kafka が実行される際の Kerberos プリンシパル名。これは、Kafka の JAAS 構成または Kafka の構成のどちらかで定義できます。
- 型: string
- デフォルト: null
- 重要度: 中
src.kafka.sasl.kerberos.ticket.renew.jitter
更新時間に追加されたランダムジッターのパーセンテージ。
- 型: double
- デフォルト: 0.05
- 重要度: 低
src.kafka.ssl.protocol
SSLContext の生成に使用する SSL プロトコルです。デフォルト設定は TLS であり、ほとんどのケースに適しています。最新の Java 仮想マシンで使用できる値は、TLS、TLSv1.1、TLSv1.2 です。古い JVM では SSL、SSLv2、および SSLv3 がサポートされている場合もありますが、セキュリティに関する既知の脆弱性があるため、使用しないことをお勧めします。
- 型: string
- デフォルト: TLS
- 重要度: 中
src.kafka.ssl.provider
SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。
- 型: string
- デフォルト: null
- 重要度: 中
src.kafka.ssl.enabled.protocols
SSL 接続で有効なプロトコルのリスト。
- 型: list
- デフォルト: TLSv1.2,TLSv1.1,TLSv1
- 重要度: 中
src.kafka.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
src.kafka.ssl.cipher.suites
暗号スイートのリストです。これは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。デフォルトでは、使用可能なすべての暗号スイートがサポートされます。
- 型: list
- デフォルト: null
- 重要度: 低
src.kafka.ssl.secure.random.implementation
SSL 暗号化操作に使用する SecureRandom PRNG 実装。
- 型: string
- デフォルト: null
- 重要度: 低
src.kafka.ssl.truststore.type
トラストストアファイルのファイルフォーマット。
- 型: string
- デフォルト: JKS
- 重要度: 中
src.kafka.ssl.keystore.type
キーストアファイルのファイルフォーマット。クライアントでは省略可能です。
- 型: string
- デフォルト: JKS
- 重要度: 中
src.kafka.ssl.trustmanager.algorithm
SSL 接続のトラストマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているトラストマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: PKIX
- 重要度: 低
src.kafka.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
src.kafka.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。
ssl.keystore.location
を構成した場合にのみ必要となります。- 型: password
- デフォルト: null
- 重要度: 高
src.kafka.ssl.keymanager.algorithm
SSL 接続のキーマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに構成されているキーマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: SunX509
- 重要度: 低
src.kafka.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
src.kafka.ssl.truststore.password
トラストストアファイルのパスワード。
- 型: password
- デフォルト: null
- 重要度: 高
src.kafka.ssl.endpoint.identification.algorithm
サーバー証明書を使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。
- 型: string
- デフォルト: https
- 重要度: 低
ソース Kafka: コンシューマー¶
以下の構成オプションは、Kafka コンシューマーに固有のプロパティです。これらのオプションは、src.kafka
プロパティと組み合わせて使用され、送信元クラスターに接続するコンシューマーに転送されます。
src.consumer.allow.auto.create.topics
トピックへのサブスクライブまたはトピックの割り当て時にブローカーでトピックを自動作成できるようにします。サブスクライブするトピックが自動作成されるのは、ブローカーで
auto.create.topics.enable
ブローカー構成の使用が許可されている場合のみです。0.11.0 より古いブローカーを使用する場合は、この構成をfalse
に設定する必要があります。- 型: ブール値
- デフォルト: false
- 重要度: 中
注釈
- 通常、コンシューマーではこの構成のデフォルトは
true
です。ただし、Replicator では、コンシューマーによるトピックの作成を許可しないように、この構成は内部的にデフォルトでfalse
に設定されています。Replicator の実行中にトピックを削除するには、topic.auto.create
をtrue
に設定している場合は、Replicator を停止してすべてのトピックを削除するか、Kafka クラスターでauto.create.topics.enable
をfalse
に設定する方法のいずれかで行います。 - Replicator では、送信元トピックが レプリカ配置制約(confluent.placement.constraints) で構成されているトピックの自動作成はサポートされていません。回避策として、送信先クラスターにトピックを手動で作成し、Replicator 構成で
topic.auto.create
を無効にすることができます(「送信先トピック」のtopic.auto.create
を参照してください)。
src.consumer.interceptor.classes
インターセプターとして使用するクラスのリスト。
ConsumerInterceptor
インターフェイスを実装すると、コンシューマーが受信したレコードをインターセプト(および場合によっては変換)することができます。デフォルトでは、インターセプターはありません。- 型: list
- デフォルト: null
- 重要度: 低
src.consumer.fetch.max.wait.ms
fetch.min.bytes
によって指定された要件を即座に満たすための十分なデータがない場合に、フェッチリクエストに応答するまでサーバーがブロックする最大時間。- 型: int
- デフォルト: 500
- 指定可能な値: [0,…]
- 重要度: 低
src.consumer.fetch.min.bytes
サーバーがフェッチリクエストに対して返す必要がある最小データ量。利用可能なデータが不十分な場合、リクエストはその量のデータが集積するのを待機してからリクエストに応答します。デフォルト設定の 1 バイトは、データが 1 バイトでもあればすぐにフェッチリクエストに応答することを意味します。データがない場合は、データの到着を待機した後にタイムアウトします。これを 1 より大きい値に設定すると、サーバーはさらに多くのデータが集積するのを待機します。これにより、サーバースループットは少し向上しますが、レイテンシは多少長くなります。
- 型: int
- デフォルト: 1
- 指定可能な値: [0,…]
- 重要度: 高
src.consumer.fetch.max.bytes
サーバーがフェッチリクエストに対して返す必要がある最大データ量。これは絶対最大値ではありません。フェッチの最初の空でないパーティション内の最初のメッセージがこの値より大きい場合でも、コンシューマーが先に進むことができるように、メッセージは返されます。ブローカーによって受け入れられる最大メッセージサイズは、
message.max.bytes
(ブローカー構成)またはmax.message.bytes
(トピック構成)で定義されます。コンシューマーは、複数のフェッチを並列処理します。- 型: int
- デフォルト: 52428800
- 指定可能な値: [0,…]
- 重要度: 中
src.consumer.max.partition.fetch.bytes
サーバーが返すパーティションごとの最大データ量。フェッチの最初の空でないパーティション内の最初のメッセージがこの制限より大きい場合でも、コンシューマーが先に進むことができるように、メッセージは返されます。ブローカーによって受け入れられる最大メッセージサイズは、
message.max.bytes
(ブローカー構成)またはmax.message.bytes
(トピック構成)で定義されます。コンシューマーリクエストのサイズの制限については、「fetch.max.bytes
」を参照してください- 型: int
- デフォルト: 1048576
- 指定可能な値: [0,…]
- 重要度: 高
src.consumer.max.poll.records
poll() への単一の呼び出しで返されるレコードの最大数。
- 型: int
- デフォルト: 500
- 指定可能な値: [1,…]
- 重要度: 中
src.consumer.check.crcs
消費されたレコードの CRC32 を自動的に確認します。これにより、メッセージに対するネットワーク上またはディスク上の破損が発生しなくなります。このチェックにより、オーバーヘッドがいくらか増えるため、非常に高いパフォーマンスが必要な場合は無効にすることができます。
- 型: ブール値
- デフォルト: true
- 重要度: 低
送信先トピック¶
topic.rename.format
送信先クラスター内のトピック名のフォーマット制御文字列。送信元のトピック名を表すプレースホルダーとして
${topic}
を含めることができます。たとえば、トピックorders
のdc_${topic}
は、送信先トピック名dc_orders
にマップされます。複数の送信元クラスターから Replicator を構成する場合は、トピック名が競合する可能性があるため注意してください。通常は、(上の例のように)各クラスターに個別のプレフィックスまたはサフィックスを付けることをお勧めします。
- 型: string
- デフォルト: ${topic}
- 重要度: 高
ちなみに
Confluent Platform 5.2.0 以降では、Confluent Replicator を使用して、スキーマ を別の Schema Registry に移行することができます。移行先は、セルフマネージド型クラスターでも Confluent Cloud 内のクラスターでもかまいません。
次のように、スキーマのデフォルトのサブジェクトトランスレーターを使用するように Confluent Replicator を構成したとします。
schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator
この場合、スキーマの移行中にターゲット Schema Registry 内のサブジェクトの名前を変更するには、
topic.rename.format
を使用します。たとえば、
topic.rename.format
が${topic}-replica
である場合、移行元の Schema Registry にあるmytopic-value
というサブジェクトが、移行先の "新しい" レジストリでは、mytopic-replica-value
という名前に変更されます。名前が変更されるのは、サブジェクト命名方法として
TopicNameStrategy
(-key
または-value
が付いたサブジェクト名)を使用するスキーマだけです。その他のサブジェクト(異なる命名方法を使用するサブジェクト)はすべて移行されますが、名前は変更されません。topic.auto.create
必要に応じて、送信先クラスターにトピックを自動的に作成するかどうかを指定します。
- 型: ブール値
- デフォルト: true
- 重要度: 低
注釈
Replicator では、送信元トピックが レプリカ配置制約(confluent.placement.constraints) で構成されているトピックの自動作成はサポートされていません。回避策として、送信先クラスターにトピックを手動で作成し、Replicator 構成で
topic.auto.create
を無効にすることができますtopic.preserve.partitions
送信元クラスターと一致するように送信先クラスター内のパーティション数を自動的に増やし、送信元クラスターからレプリケーションされたメッセージが送信先クラスター内の同じパーティションを使用するかどうかを指定します。
- 型: ブール値
- デフォルト: true
- 重要度: 低
注釈
topic.preserve.partitions
をfalse
に設定する場合は、offset.topic.commit
もfalse
に設定する必要があります。Replicator では、topic.preserve.partitions=false
の場合、offset.topic.commit=true
はサポートされません。offset.topic.commit
がfalse
に設定されている場合、送信元クラスターにコンシューマーグループは作成されません。
topic.create.backoff.ms
トピックの自動作成または展開を再試行するまでの待機時間。
- 型: int
- デフォルト: 120000
- 指定可能な値: [0,…]
- 重要度: 低
topic.config.sync
トピック構成を送信先クラスターと定期的に同期するかどうかを指定します。
- 型: ブール値
- デフォルト: true
- 重要度: 低
topic.config.sync.interval.ms
topic.config.sync
が有効な場合に、構成の変更を確認する頻度。- 型: int
- デフォルト: 120000
- 指定可能な値: [0,…]
- 重要度: 低
topic.timestamp.type
送信先クラスター内のトピックのタイムスタンプの種類です。
- 型: string
- デフォルト: CreateTime
- 指定可能な値: [CreateTime、LogAppendTime]
- 重要度: 低
dest.topic.replication.factor
送信先クラスターのトピックのレプリケーション係数を設定します。
- 型: int
- デフォルト: 送信元レプリケーション係数
- 指定可能な値: > 0
- 重要度: 低
送信先 Kafka¶
ちなみに
Replicator 5.4.0 から、dest.kafka.*
構成は必須ではなくなりました。これらの構成が指定されていない場合、値は Connect ワーカーのプロデューサー構成から推測されます。
以下の構成オプションは、Kafka クライアントで使用される共通のプロパティです。Replicator は、これらのオプションを使用して、送信先クラスター(AdminClient)に接続します。dest.kafka
プレフィックスを使用する有効なクライアントプロパティは、送信先クラスターに接続するクライアントに転送されて、管理操作に使用されます。
dest.kafka.bootstrap.servers
送信先 Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。クライアントは、ここでブートストラップ用に指定されるサーバーに関係なくすべてのサーバーを使用します。このリストは、一連のサーバーをすべて検出するために使用する初期ホストを制御するだけのものです。このリストは、
host1:port1,host2:port2,…
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。すべてのサーバーセットを指定する必要はありませんが、フェイルオーバーに備えて、複数指定しておくことができます。- 型: list
- 重要度: 高
dest.kafka.client.id
リクエストを実行する際にサーバーに渡す ID 文字列。この文字列は、リクエストの送信元の追跡に使用されます。論理アプリケーション名をサーバー側のリクエストログに含めることができます。
- 型: string
- デフォルト: ""
- 重要度: 低
dest.kafka.request.timeout.ms
クライアントがリクエストの応答を待つ最大の待機時間を指定します。タイムアウト時間が経過するまで応答を受信できなかった場合、クライアントは、リクエストを再送し、再試行回数が上限に達した場合は、リクエストを失敗とします。
- 型: int
- デフォルト: 30000
- 指定可能な値: [0,…]
- 重要度: 中
dest.kafka.retry.backoff.ms
トピックのパーティションに対するリクエストが失敗した場合に再試行を行うまでの待機時間を指定します。これにより、失敗シナリオでリクエストが短時間に繰り返し送信されることを回避できます。
- 型: long
- デフォルト: 100
- 指定可能な値: [0,…]
- 重要度: 低
dest.kafka.connections.max.idle.ms
アイドル接続を閉じるまでの待ち時間を指定します(ミリ秒)。
- 型: long
- デフォルト: 540000
- 重要度: 中
dest.kafka.reconnect.backoff.ms
ホストへの再接続を試行する前の待機時間を指定します。これにより、ホストへの接続が短時間に何度も繰り返されることを回避できます。このバックオフは、コンシューマーからブローカーに送信されるすべてのリクエストに適用されます。
- 型: long
- デフォルト: 50
- 指定可能な値: [0,…]
- 重要度: 低
dest.kafka.metric.reporters
Metrics Reporter として使用するクラスのリスト。
MetricReporter
インターフェイスを実装することで、新しいメトリック作成の通知を受けるクラスをプラグインすることができます。JMX 統計情報を登録するための JmxReporter が必ず含まれます。- 型: list
- デフォルト: ""
- 重要度: 低
dest.kafka.metrics.num.samples
メトリクスの計算用に維持されるサンプルの数。
- 型: int
- デフォルト: 2
- 指定可能な値: [1,…]
- 重要度: 低
dest.kafka.metrics.sample.window.ms
メトリクスサンプルが計算される時間枠。
- 型: long
- デフォルト: 30000
- 指定可能な値: [0,…]
- 重要度: 低
dest.kafka.send.buffer.bytes
データを送信する際に使用される TCP 送信バッファ(SO_SNDBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。
- 型: int
- デフォルト: 131072
- 指定可能な値: [-1、...]
- 重要度: 中
dest.kafka.receive.buffer.bytes
データを読み取る際に使用される TCP 受信バッファ(SO_RCVBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。
- 型: int
- デフォルト: 65536
- 指定可能な値: [-1、...]
- 重要度: 中
送信先 Kafka: セキュリティ¶
ちなみに
さまざまなプロトコルで使用できる Replicator の暗号化と認証のオプションの詳細については、 Replicator セキュリティ の概要を参照してください。
dest.kafka.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: PLAINTEXT
- 指定可能な値: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
- 重要度: 中
dest.kafka.sasl.mechanism
クライアントの接続に使用される SASL メカニズム。セキュリティプロバイダーを利用できるものであれば、どのメカニズムでも構いません。GSSAPI がデフォルトのメカニズムです。
- 型: string
- デフォルト: GSSAPI
- 重要度: 中
dest.kafka.sasl.kerberos.ticket.renew.window.factor
チケットが更新されるまでログインスレッドがスリープ状態になる時間のウィンドウ係数を指定します。時間のウィンドウ係数は、最後の更新からチケットの有効期限まで計算されます。
- 型: double
- デフォルト: 0.8
- 重要度: 低
dest.kafka.sasl.kerberos.min.time.before.relogin
更新試行から次の更新試行までの、ログインスレッドのスリープ時間。
- 型: long
- デフォルト: 60000
- 重要度: 低
dest.kafka.sasl.kerberos.kinit.cmd
Kerberos の kinit コマンドパス。
- 型: string
- デフォルト: /usr/bin/kinit
- 重要度: 低
dest.kafka.sasl.kerberos.service.name
Kafka が実行される際の Kerberos プリンシパル名。これは、Kafka の JAAS 構成または Kafka の構成のどちらかで定義できます。
- 型: string
- デフォルト: null
- 重要度: 中
dest.kafka.sasl.kerberos.ticket.renew.jitter
更新時間に追加されたランダムジッターのパーセンテージ。
- 型: double
- デフォルト: 0.05
- 重要度: 低
dest.kafka.ssl.protocol
SSLContext の生成に使用する SSL プロトコルです。デフォルト設定は TLS であり、ほとんどのケースに適しています。最新の Java 仮想マシンで使用できる値は、TLS、TLSv1.1、TLSv1.2 です。古い JVM では SSL、SSLv2、および SSLv3 がサポートされている場合もありますが、セキュリティに関する既知の脆弱性があるため、使用しないことをお勧めします。
- 型: string
- デフォルト: TLS
- 重要度: 中
dest.kafka.ssl.provider
SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。
- 型: string
- デフォルト: null
- 重要度: 中
dest.kafka.ssl.enabled.protocols
SSL 接続で有効なプロトコルのリスト。
- 型: list
- デフォルト: TLSv1.2,TLSv1.1,TLSv1
- 重要度: 中
dest.kafka.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
dest.kafka.ssl.cipher.suites
暗号スイートのリストです。これは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。デフォルトでは、使用可能なすべての暗号スイートがサポートされます。
- 型: list
- デフォルト: null
- 重要度: 低
dest.kafka.ssl.secure.random.implementation
SSL 暗号化操作に使用する SecureRandom PRNG 実装。
- 型: string
- デフォルト: null
- 重要度: 低
dest.kafka.ssl.truststore.type
トラストストアファイルのファイルフォーマット。
- 型: string
- デフォルト: JKS
- 重要度: 中
dest.kafka.ssl.keystore.type
キーストアファイルのファイルフォーマット。クライアントでは省略可能です。
- 型: string
- デフォルト: JKS
- 重要度: 中
dest.kafka.ssl.trustmanager.algorithm
SSL 接続のトラストマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているトラストマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: PKIX
- 重要度: 低
dest.kafka.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
dest.kafka.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。
ssl.keystore.location
を構成した場合にのみ必要となります。- 型: password
- デフォルト: null
- 重要度: 高
dest.kafka.ssl.keymanager.algorithm
SSL 接続のキーマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに構成されているキーマネージャーファクトリアルゴリズムです。
- 型: string
- デフォルト: SunX509
- 重要度: 低
dest.kafka.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
dest.kafka.ssl.truststore.password
トラストストアファイルのパスワード。
- 型: password
- デフォルト: null
- 重要度: 高
dest.kafka.ssl.endpoint.identification.algorithm
サーバー証明書を使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。
- 型: string
- デフォルト: null
- 重要度: 低
送信先データ変換¶
key.converter
送信先クラスターに書き込まれたメッセージのキーフィールドのコンバーター。データ変換の必要がない場合は、
io.confluent.connect.replicator.util.ByteArrayConverter
を使用します。- 型: class
- デフォルト: ワーカー構成から継承
- 重要度: 低
value.converter
送信先クラスターに書き込まれたメッセージの値フィールドのコンバーター。データ変換の必要がない場合は、
io.confluent.connect.replicator.util.ByteArrayConverter
を使用します。- 型: class
- デフォルト: ワーカー構成から継承
- 重要度: 低
header.converter
シリアル化された Kafka ヘッダーを Kafka Connect ヘッダーに変換するために使用される HeaderConverter クラス。現在、デフォルト値は
SimpleHeaderConverter
で、base64 でエンコードされたヘッダーを出力します。送信元クラスター名、トピック名、およびタイムスタンプを示す、人間が理解できるヘッダーを取得するには、これを明示的にByteArrayConverter
に設定する必要があります。たとえば、header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
のように設定します。- 型: class
- デフォルト: SimpleHeaderConverter
- 重要度: 低
コンシューマーオフセット変換¶
重要
オフセット変換は Java コンシューマー専用です。他のタイプのアプリケーションでは動作しません。
offset.translator.tasks.max
オフセット変換を実行する Replicator タスクの最大数。-1(デフォルト)の場合、すべてのタスクがオフセット変換を実行します。
- 型: int
- デフォルト: -1
- 重要度: 中
offset.translator.tasks.separate
トピックのレプリケーションを実行するタスクとは別のタスクでオフセットを変換するかどうかです。
- 型: ブール値
- デフォルト: false
- 重要度: 中
offset.translator.batch.period.ms
オフセット変換リクエストがバッチ処理される期間(ミリ秒)。
- 型: int
- デフォルト: 60000
- 重要度: 中
offset.translator.batch.size
オフセット変換リクエストのバッチの最大サイズ。
- 型: int
- デフォルト: 1
- 重要度: 中
offset.timestamps.commit
セカンダリクラスターへの切り替え時に正しく再開できるように、Replicator の内部オフセットタイムスタンプをコミットするかどうか。
- 型: ブール値
- デフォルト: true
- 重要度: 中
src.kafka.timestamps.topic.replication.factor
- 「ソース Kafka」の
src.kafka.timestamps.topic.replication.factor
を参照してください。 src.kafka.timestamps.topic.num.partitions
- 「ソース Kafka」の
src.kafka.timestamps.topic.num.partitions
を参照してください。 provenance.header.enable
レプリケーション中に来歴ヘッダーを使用できるようにするかどうかです。
- 型: ブール値
- デフォルト: false
- 重要度: 中
「重複または循環メッセージ反復を防ぐための来歴ヘッダーの使用」も参照してください。そのセクションで説明されているように、来歴ヘッダーを有効にする場合は、
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
を明示的に設定する必要があります。注釈
Kafka メッセージがカスタムヘッダーを持っており、来歴ヘッダーとともにレプリケートされている場合(
provenance.header.enable=true
)、ヘッダーを処理しているコンシューマーアプリケーションは、__replicator_id
キーを持つヘッダーをスキップする必要があります。コンシューマーがレコードを処理する方法によっては、そのためのコードの変更が不要な場合があります。コンシューマーが特定のヘッダーを要求している場合、変更は不要です。しかし、コンシューマーがすべてのヘッダーを取得するために Kafka クラス Interface Headers のtoArray
を使用する場合、コンシューマーは__replicator_id
を明示的にスキップする必要があります。そうしない場合は、レプリケーションによって、「could not decode JSON types ..」や「unrecognized tokens」などのエラーが発生します。以下に、ConsumerRecord を使用してヘッダーキーを読み取って
__replicator_id
をスキップする、コンシューマー用の Java コードの例を示します。ConsumerRecord<byte[], byte[]> record = ... Headers headers = record.headers(); if (headers != null) { for (Header header : headers) { if (!"__replicator_id".equals(header.key())) { ... process application header... } } }
provenance.header.filter.overrides
別の一時 Replicator インスタンスで構成し、メッセージの循環反復を防ぐ通常のレプリケーションフィルター処理ルールから除外するデータを指定します。以下のようなタプルのセミコロン区切りリストです。
- クラスター ID を来歴ヘッダー内のクラスター ID と照合するための正規表現
- トピックを来歴ヘッダー内のトピック名と照合するための正規表現
- タイムスタンプの範囲
このパラメーターは、元のクラスタークラウド内の Kafka トピックのデータを完全にリストアできない特定のシナリオで、ディザスターリカバリのために使用します。そのような場合は、リカバリーの開始点として、クラスター内の残りのデータを削除し、複数のデータセンターにまたがって Replicator を再実行します。このシナリオで Replicator を再実行する場合は、通常のフィルター処理ルールをオーバーライドします。そうしなければ、前にコピーされたデータのレプリケーションがスキップされます。詳細と例については、ホワイトペーパー「Disaster Recovery for Multi-Datacenter Apache Kafka Deployments」の「Manually Reset Consumer Offsets」にあるトピック「Data Synchronization in Active-Passive Design」を参照してください。
fetch.offset.expiry.ms
オフセット変換のフェッチオフセットリクエストが期限切れになって破棄されるまでの時間(単位: ミリ秒)です。
- 型: long
- デフォルト: 600000
- 重要度: 低
fetch.offset.retry.backoff.ms
オフセット変換中に失敗したフェッチオフセットリクエストを再試行するまでの待機時間(単位: ミリ秒)です。
- 型: long
- デフォルト: 100
- 重要度: 低
スキーマ変換¶
注釈
これらのプロパティは、Replicator のスキーマ変換機能を使用する場合にのみ必要です。詳細については、「スキーマの移行」を参照してください。
schema.registry.topic
Schema Registry の永続ログとしての役割を果たすトピック。
- 型: string
- デフォルト: null
- 重要度: 中
schema.registry.url
スキーマの登録または検索に使用できる Schema Registry インスタンスの URL のコンマ区切りリストです。
- 型: string
- デフォルト: null
- 重要度: 高
schema.registry.max.schemas.per.subject
ローカルにキャッシュできるスキーマの最大数。
- 型: int
- デフォルト: 1000
- 重要度: 低
schema.registry.client.basic.auth.credentials.source
基本認証ヘッダーの認証情報の選択方法を指定します。サポートされる値は URL、USER_INFO、SASL_INHERIT です。
- 型: string
- デフォルト: URL
- 重要度: 中
schema.registry.client.basic.auth.user.info
Basic Auth のユーザー情報は、{username}:{password} という形式で指定します。
- 型: string
- デフォルト: null
- 重要度: 中
schema.subject.translator.class
スキーマサブジェクトのトランスレーター、または、変換を実行しない場合は null。プレフィックス
schema.subject.translator.
が付加されているプロパティは、トランスレーターの configure() メソッドに渡されます。- 型: class
- デフォルト: null
- 重要度: 低
オフセット管理¶
offset.start
レプリケーションの開始ポイントを決定するための設定を指定します。接続オフセットが存在する場合はそれを優先し、存在しない場合はコンシューマーオフセットを優先するには
connect
を使用します。コンシューマーオフセットが存在する場合はそれを優先し、存在しない場合は接続オフセットを優先するにはconsumer
を使用します。接続オフセットもコンシューマーオフセットも存在しない場合は、開始オフセットがトピックの先頭になります。- 型: string
- デフォルト: connect
- 重要度: 低
注釈
Replicator を双方向セットアップで実行しない限り、デフォルトの offset.start=connect
に設定することをお勧めします。
offset.topic.commit
メッセージが送信先クラスターに書き込まれた後、Replicator のコンシューマーオフセットを送信元 Kafka クラスターにコミットするかどうかを指定します。これらのコンシューマーオフセットを使用して、Replicator のラグを簡単に追跡できます。
- 型: ブール値
- デフォルト: true
- 重要度: 中
注釈
topic.preserve.partitions
をfalse
に設定する場合は、offset.topic.commit
もfalse
に設定する必要があります。Replicator では、topic.preserve.partitions=false
の場合、offset.topic.commit=true
はサポートされません。offset.topic.commit
がfalse
に設定されている場合、コンシューマーグループは Replicator によって送信元クラスターに作成されません。
クラスター ID とグループ ID¶
cluster.id
パラメーターは、Replicator を 実行可能ファイルとして 実行する場合に必要です。これは、Replicator 実行可能ファイルのいくつかのインスタンスが同じ --cluster.id
で始まる場合に形成されるクラスターの一意の識別子を定義し、実行可能ファイルのみのプロパティです。cluster.id
にデフォルト値はありません。実行時に指定する必要があります。
同じ cluster.id
を持つ Replicator 実行可能ファイルは、自動的に互いを検出し、クラスターを形成します。
(Connect ワーカーを使用する)非実行可能デプロイの場合、group.id
パラメーターは、Connect ワーカーで指定された一意の文字列で、そのワーカーが属するクラスターグループを識別します。デフォルトは connect-cluster
です。同じ group.id
を持つ 分散ワーカーの構成 は、自動的に互いを検出し、クラスターを形成します。
複数のクラスターが必要な場合は、デプロイタイプに応じて、異なる ID を適切に指定する必要があります。
- Replicator を 実行可能ファイルとして 実行する場合は、各クラスターに一意の
cluster.id
が必要です。 - 非実行可能分散ワーカー の場合は、各クラスターに一意の
group.id
が必要です。
ちなみに
group.id
は、Replicator ではなく Connect ワーカーのプロパティです。非実行可能 Replicator デプロイで cluster.id
と同じ目的を果たすため、ここで説明します。
Confluent Platform ライセンス¶
重要
5.5.0 以降、Replicator は、Replicator が実行中の場合でも、ライセンスキーの有効期限が切れるとすぐに失敗します。
ちなみに
Confluent Platform のライセンス情報全般については、「Confluent Platform ライセンス」を参照してください。
confluent.topic.bootstrap.servers
ライセンス供与に使用される Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。初期接続で、クラスター内のすべてのサーバーが検出されます。このリストは、
host1:port1,host2:port2,…
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。これは動的に変わる可能性があるので、このリストにすべてのサーバーセットを含める必要はありません(ただし、サーバーの障害に備えて、複数指定しておくこともできます)。- 型: list
- 重要度: 高
confluent.topic
Confluent Platform の構成(ライセンス情報など)で使用される Kafka トピックの名前。
- 型: string
- デフォルト: _confluent-command
- 重要度: 低
confluent.topic.replication.factor
Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックのレプリケーション係数。
- 型: int
- デフォルト: 3
- 重要度: 低
Confluent ライセンスのプロパティ¶
注釈
デフォルトでは、ライセンス関連のプロパティ( confluent.topic.*
)は、dest.kafka.*
コネクタープロパティから継承されます。
confluent.license
Confluent では、各契約者にエンタープライズライセンスキーを発行します。ライセンスキーはテキストであるため、コピーアンドペーストで
confluent.license
の値として使用できます。試用ライセンスでは、コネクターを 30 日間使用できます。開発者ライセンスでは、ブローカーが 1 つの開発環境で、コネクターを無期限に使用できます。すでにご契約されている場合は、詳細について Confluent サポートにお問い合わせください。
- 型: string
- デフォルト: ""
- 指定可能な値: Confluent Platform ライセンス
- 重要度: 高
confluent.topic.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.truststore.password
トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。ssl.keystore.location を構成した場合にのみ必要となります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: "PLAINTEXT"
- 重要度: 中
ライセンストピックの構成¶
Confluent エンタープライズライセンスは、_confluent-command
トピックに格納されます。このトピックは、デフォルトで作成され、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。
注釈
パブリックキーは Kafka のトピックには保管されません。
以下では、デフォルトの _confluent-command
トピックがさまざまなシナリオでどのように生成されるかを説明します。
confluent.license
プロパティを追加していない場合、またはこのプロパティを(confluent.license=
などで)空にした場合は、_confluent command
トピックに、30 日間の試用ライセンスが自動的に生成されます。- 有効なライセンスキーを(
confluent.license=<valid-license-key>
などで)追加すると、有効なライセンスが_confluent-command
トピックに追加されます。
以下は、開発およびテストのための最小限のプロパティの例です。
_confluent-command
トピックの名前は confluent.topic
プロパティを使用して変更できます(たとえば、環境に厳格な命名規則がある場合など)。以下の例は、この変更と、構成される Kafka ブートストラップサーバーを示しています。
confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092
上の例は、開発およびテストで使用できるブートストラップサーバーの必要最小限のプロパティを示しています。本稼働環境の場合は、プレフィックスとして confluent.topic.
を付けて、通常のプロデューサー、コンシューマー、およびトピックの各構成プロパティをコネクターのプロパティに追加します。
ライセンストピックの ACL¶
_confluent-command
トピックには、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。このトピックはデフォルトで作成されます。このトピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
コネクターでトピックの作成が必要な場合は、リソースクラスターでの CREATE および DESCRIBE。
_confluent-command
トピックでの DESCRIBE、READ、および WRITE。重要
You may also use DESCRIBE and READ without WRITE to restrict access to read-only for license topic ACLs. If a topic exists, the LicenseManager will not try to create the topic.
ライセンスを使用する各プリンシパルに個々にアクセス許可を付与することも、 ワイルドカード入力 を使用してすべてのクライアントを許可することもできます。以下の例は、リソースクラスターおよび _confluent-command
トピックの ACL を構成するために使用できるコマンドを示しています。
リソースクラスターの CREATE および DESCRIBE の ACL を設定します。
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation CREATE --operation DESCRIBE --cluster
_confluent-command
トピックに対する DESCRIBE、READ、および WRITE の ACL を設定します。kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
デフォルトの構成プロパティのオーバーライド¶
confluent.topic.replication.factor
を使用することにより、レプリケーション係数をオーバーライドできます。たとえば、(開発およびテスト用の)ブローカー数 3 未満の環境で送信先として Kafka クラスターを使用する場合、confluent.topic.replication.factor
プロパティに 1
を設定する必要があります。
プロデューサー固有のプロパティは、confluent.topic.producer.
プレフィックスを使用してオーバーライドできます。コンシューマー固有のプロパティは、confluent.topic.consumer.
プレフィックスを使用してオーバーライドできます。
デフォルト値を使用することも、他のプロパティをカスタマイズすることもできます。たとえば、confluent.topic.client.id
プロパティのデフォルトは、コネクターの名前に -licensing
サフィックスを付けたものです。クライアント接続に SSL または SASL を必要とするブローカーの構成設定では、このプレフィックスを使用できます。
トピックのクリーンアップのポリシーはオーバーライドできません。トピックは、常にパーティションが単一で、圧縮されるからです。また、このプレフィックスを使用してシリアライザーおよびデシリアライザーを指定しないでください。追加しても無視されます。