Confluent Replicator 構成プロパティ

このセクションは、Replicator 構成オプションについてのリファレンスとなっています。

Replicator の機能、ユースケース、およびフェイルオーバーシナリオでの使用の詳細については、「Replicator の概要」を参照してください。

このコネクターを使用するには、connector.class 構成プロパティで、そのコネクタークラスの名前を指定します。

connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

コネクター固有の構成プロパティについて、以降で説明します。

ソーストピック

重要

__consumer_timestamps トピックでは、Replicator のオフセット変換にのみ関係するため、送信元から送信先にコピーされません。オフセット変換の詳細については、「コンシューマーのオフセット変換について」を参照してください。

topic.regex

送信先クラスターにレプリケートするトピックの正規表現。正規表現の詳細については、「Java Regular Expressions」を参照してください。

  • 型: string
  • デフォルト: null
  • 重要度: 高

重要

Replicator では、topic.regex または topic.whitelist に一致する場合でも、内部の __consumer_offsets または __transaction_state トピックを送信元クラスターからコピーしません。

注釈

topic.regex プロパティと以下の 2 つのプロパティ topic.whitelisttopic.blacklist は、以下の順に適用されます。

  1. topic.blacklist で指定されているトピックはレプリケートされません。
  2. topic.whitelist で指定されているすべてのトピックまたは topic.regex に一致するトピックはレプリケートされます(ホワイトリストに登録されているトピックは、正規表現に一致する必要はありません)。

3 つのプロパティすべてを 1 つの Replicator 構成で指定できます。これらが連携して、レプリケートするトピックが決定されます。たとえば、一連のトピック名 topic1topic2topic3topic4 がある場合、topic.whitelisttopic1topic.regextopic[2,3,4]topic.blacklisttopic4 を指定できます。Replicator はこれらの制約を組み合わせて、レプリケートされたトピックの最終的なリストとして、topic1topic2topic3 を含むリストを生成します。

3 つのすべてのプロパティがデフォルト値のままの場合、トピックはレプリケートされません。

topic.whitelist

レプリケートするトピックのホワイトリスト。

  • 型: list
  • デフォルト: ""
  • 重要度: 高

重要

起動時に、Replicator はレプリケートするトピックをリストします。topic.whitelist のいずれかのトピックが存在しないか、その ACL が不十分であるためにトピックをリストできない場合、Replicator は失敗します。Replicator のプリンシパルには、リスト内のすべてのトピックの ACL を記述するためのアクセス許可が付与されている必要があります。詳細については、「セキュリティ」を参照してください。

Replicator バージョン 5.5 以降の場合は、topic.whitelist にリストされているトピックは、送信元クラスターに存在している必要があります。Replicator の起動後にトピックを作成する場合は、代わりに topic.regex を使用します。

topic.blacklist

レプリケーションから除外するトピック。

  • 型: list
  • デフォルト: ""
  • 重要度: 高
topic.poll.interval.ms

topic.whitelist または topic.regex に一致する新しいトピックについて送信元クラスターをポーリングする頻度。

  • 型: int
  • デフォルト: 120000
  • 指定可能な値: [0,…]
  • 重要度: 低

重要

topic.regextopic.whitelist、および topic.blacklist 構成は、以下の順に適用されます。

  1. topic.blacklist で指定されているトピックはレプリケートされません。
  2. topic.whitelist で指定されているすべてのトピックまたは topic.regex に一致するトピックはレプリケートされます(ホワイトリストに登録されているトピックは、正規表現に一致する必要はありません)。

3 つのすべてのプロパティがデフォルト値のままの場合、トピックはレプリケートされません。

ソースデータ変換

src.key.converter

送信元クラスターから取得されたメッセージのキーフィールドのコンバーター。

  • 型: class
  • デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
  • 重要度: 低
src.value.converter

送信元クラスターから取得したメッセージの値フィールドのコンバーター。

  • 型: class
  • デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
  • 重要度: 低
src.header.converter

シリアル化された Kafka ヘッダーを Kafka Connect ヘッダーに変換するために使用される HeaderConverter クラス。デフォルト値は ByteArrayConverter で、単純に入力バイトを送信先レコードに渡します。

  • 型: class
  • デフォルト: io.confluent.connect.replicator.util.ByteArrayConverter
  • 重要度: 低

ソース Kafka

以下の構成オプションは、Kafka クライアントで使用される共通のプロパティです。Replicator は、これらのオプションを使用して、送信元クラスター(コンシューマー、AdminClient)に接続します。src.kafka プレフィックスを使用する有効なクライアントプロパティは、送信元クラスターに接続するクライアントに転送されます。コンシューマーと AdminClient の構成が同じである場合は、src.kafka を使用する必要があります。

src.kafka.bootstrap.servers

ソース Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。クライアントは、ここでブートストラップ用に指定されるサーバーに関係なくすべてのサーバーを使用します。このリストは、一連のサーバーをすべて検出するために使用する初期ホストを制御するだけのものです。このリストは、host1:port1,host2:port2,… という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。すべてのサーバーセットを指定する必要はありませんが、フェイルオーバーに備えて、複数指定しておくことができます。

  • 型: list
  • 重要度: 高
src.kafka.client.id

リクエストを実行する際にサーバーに渡す ID 文字列。この文字列は、リクエストの送信元の追跡に使用されます。論理アプリケーション名をサーバー側のリクエストログに含めることができます。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
src.kafka.request.timeout.ms

クライアントがリクエストの応答を待つ最大の待機時間を指定します。タイムアウト時間が経過するまで応答を受信できなかった場合、クライアントは、リクエストを再送し、再試行回数が上限に達した場合は、リクエストを失敗とします。

  • 型: int
  • デフォルト: 30000
  • 指定可能な値: [0,…]
  • 重要度: 中
src.kafka.retry.backoff.ms

トピックのパーティションに対するリクエストが失敗した場合に再試行を行うまでの待機時間を指定します。これにより、失敗シナリオでリクエストが短時間に繰り返し送信されることを回避できます。

  • 型: long
  • デフォルト: 100
  • 指定可能な値: [0,…]
  • 重要度: 低
src.kafka.connections.max.idle.ms

アイドル接続を閉じるまでの待ち時間を指定します(ミリ秒)。

  • 型: long
  • デフォルト: 540000
  • 重要度: 中
src.kafka.reconnect.backoff.ms

ホストへの再接続を試行する前の待機時間を指定します。これにより、ホストへの接続が短時間に何度も繰り返されることを回避できます。このバックオフは、コンシューマーからブローカーに送信されるすべてのリクエストに適用されます。

  • 型: long
  • デフォルト: 50
  • 指定可能な値: [0,…]
  • 重要度: 低
src.kafka.metric.reporters

Metrics Reporter として使用するクラスのリスト。MetricReporter インターフェイスを実装することで、新しいメトリック作成の通知を受けるクラスをプラグインすることができます。JMX 統計情報を登録するための JmxReporter が必ず含まれます。

  • 型: list
  • デフォルト: ""
  • 重要度: 低
src.kafka.metrics.num.samples

メトリクスの計算用に維持されるサンプルの数。

  • 型: int
  • デフォルト: 2
  • 指定可能な値: [1,…]
  • 重要度: 低
src.kafka.metrics.sample.window.ms

メトリクスサンプルが計算される時間枠。

  • 型: long
  • デフォルト: 30000
  • 指定可能な値: [0,…]
  • 重要度: 低
src.kafka.send.buffer.bytes

データを送信する際に使用される TCP 送信バッファ(SO_SNDBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。

  • 型: int
  • デフォルト: 131072
  • 指定可能な値: [-1、...]
  • 重要度: 中
src.kafka.receive.buffer.bytes

データを読み取る際に使用される TCP 受信バッファ(SO_RCVBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。

  • 型: int
  • デフォルト: 65536
  • 指定可能な値: [-1、...]
  • 重要度: 中
src.kafka.timestamps.topic.replication.factor

コンシューマータイムスタンプトピックのレプリケーション係数。

  • 型: int
  • デフォルト: 3
  • 重要度: 高
src.kafka.timestamps.topic.num.partitions

コンシューマータイムスタンプトピックのパーティションの数。

  • 型: int
  • デフォルト: 50
  • 重要度: 高

ソース Kafka: セキュリティ

ちなみに

さまざまなプロトコルで使用できる Replicator の暗号化と認証のオプションの詳細については、 Replicator セキュリティ の概要を参照してください。

src.kafka.security.protocol

ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

  • 型: string
  • デフォルト: PLAINTEXT
  • 指定可能な値: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
  • 重要度: 中
src.kafka.sasl.mechanism

クライアントの接続に使用される SASL メカニズム。セキュリティプロバイダーを利用できるものであれば、どのメカニズムでも構いません。GSSAPI がデフォルトのメカニズムです。

  • 型: string
  • デフォルト: GSSAPI
  • 重要度: 中
src.kafka.sasl.kerberos.ticket.renew.window.factor

最後の更新からチケットの有効期限までの時間が指定のウィンドウ係数に達するまでの間、ログインスレッドはスリープ状態になります。この時間の経過後、チケットの更新が試行されます。

  • 型: double
  • デフォルト: 0.8
  • 重要度: 低
src.kafka.sasl.kerberos.min.time.before.relogin

更新試行から次の更新試行までの、ログインスレッドのスリープ時間。

  • 型: long
  • デフォルト: 60000
  • 重要度: 低
src.kafka.sasl.kerberos.kinit.cmd

Kerberos の kinit コマンドパス。

  • 型: string
  • デフォルト: /usr/bin/kinit
  • 重要度: 低
src.kafka.sasl.kerberos.service.name

Kafka が実行される際の Kerberos プリンシパル名。これは、Kafka の JAAS 構成または Kafka の構成のどちらかで定義できます。

  • 型: string
  • デフォルト: null
  • 重要度: 中
src.kafka.sasl.kerberos.ticket.renew.jitter

更新時間に追加されたランダムジッターのパーセンテージ。

  • 型: double
  • デフォルト: 0.05
  • 重要度: 低
src.kafka.ssl.protocol

SSLContext の生成に使用する SSL プロトコルです。デフォルト設定は TLS であり、ほとんどのケースに適しています。最新の Java 仮想マシンで使用できる値は、TLS、TLSv1.1、TLSv1.2 です。古い JVM では SSL、SSLv2、および SSLv3 がサポートされている場合もありますが、セキュリティに関する既知の脆弱性があるため、使用しないことをお勧めします。

  • 型: string
  • デフォルト: TLS
  • 重要度: 中
src.kafka.ssl.provider

SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。

  • 型: string
  • デフォルト: null
  • 重要度: 中
src.kafka.ssl.enabled.protocols

SSL 接続で有効なプロトコルのリスト。

  • 型: list
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中
src.kafka.ssl.keystore.location

キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。

  • 型: string
  • デフォルト: null
  • 重要度: 高
src.kafka.ssl.cipher.suites

暗号スイートのリストです。これは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。デフォルトでは、使用可能なすべての暗号スイートがサポートされます。

  • 型: list
  • デフォルト: null
  • 重要度: 低
src.kafka.ssl.secure.random.implementation

SSL 暗号化操作に使用する SecureRandom PRNG 実装。

  • 型: string
  • デフォルト: null
  • 重要度: 低
src.kafka.ssl.truststore.type

トラストストアファイルのファイルフォーマット。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中
src.kafka.ssl.keystore.type

キーストアファイルのファイルフォーマット。クライアントでは省略可能です。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中
src.kafka.ssl.trustmanager.algorithm

SSL 接続のトラストマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているトラストマネージャーファクトリアルゴリズムです。

  • 型: string
  • デフォルト: PKIX
  • 重要度: 低
src.kafka.ssl.truststore.location

トラストストアファイルの場所。

  • 型: string
  • デフォルト: null
  • 重要度: 高
src.kafka.ssl.keystore.password

キーストアファイルのストアパスワード。クライアントでは省略可能です。 ssl.keystore.location を構成した場合にのみ必要となります。

  • 型: password
  • デフォルト: null
  • 重要度: 高
src.kafka.ssl.keymanager.algorithm

SSL 接続のキーマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに構成されているキーマネージャーファクトリアルゴリズムです。

  • 型: string
  • デフォルト: SunX509
  • 重要度: 低
src.kafka.ssl.key.password

キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。

  • 型: password
  • デフォルト: null
  • 重要度: 高
src.kafka.ssl.truststore.password

トラストストアファイルのパスワード。

  • 型: password
  • デフォルト: null
  • 重要度: 高
src.kafka.ssl.endpoint.identification.algorithm

サーバー証明書を使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。

  • 型: string
  • デフォルト: https
  • 重要度: 低

ソース Kafka: コンシューマー

以下の構成オプションは、Kafka コンシューマーに固有のプロパティです。これらのオプションは、src.kafka プロパティと組み合わせて使用され、送信元クラスターに接続するコンシューマーに転送されます。

src.consumer.allow.auto.create.topics

トピックへのサブスクライブまたはトピックの割り当て時にブローカーでトピックを自動作成できるようにします。サブスクライブするトピックが自動作成されるのは、ブローカーで auto.create.topics.enable ブローカー構成の使用が許可されている場合のみです。0.11.0 より古いブローカーを使用する場合は、この構成を false に設定する必要があります。

  • 型: ブール値
  • デフォルト: false
  • 重要度: 中

注釈

通常、コンシューマーではこの構成のデフォルトは true です。ただし、Replicator では、コンシューマーによるトピックの作成を許可しないように、この構成は内部的にデフォルトで false に設定されています。Replicator の実行中にトピックを削除するには、topic.auto.createtrue に設定されている場合、以下の 2 つの方法のいずれかで行います。

  • Replicator を停止し、すべてのトピックを削除します。

または

  • Kafka クラスター上で auto.create.topics.enablefalse に設定します。
src.consumer.interceptor.classes

インターセプターとして使用するクラスのリスト。ConsumerInterceptor インターフェイスを実装すると、コンシューマーが受信したレコードをインターセプト(および場合によっては変換)することができます。デフォルトでは、インターセプターはありません。

  • 型: list
  • デフォルト: null
  • 重要度: 低
src.consumer.fetch.max.wait.ms

fetch.min.bytes によって指定された要件を即座に満たすための十分なデータがない場合に、フェッチリクエストに応答するまでサーバーがブロックする最大時間。

  • 型: int
  • デフォルト: 500
  • 指定可能な値: [0,…]
  • 重要度: 低
src.consumer.fetch.min.bytes

サーバーがフェッチリクエストに対して返す必要がある最小データ量。利用可能なデータが不十分な場合、リクエストはその量のデータが集積するのを待機してからリクエストに応答します。デフォルト設定の 1 バイトは、データが 1 バイトでもあればすぐにフェッチリクエストに応答することを意味します。データがない場合は、データの到着を待機した後にタイムアウトします。これを 1 より大きい値に設定すると、サーバーはさらに多くのデータが集積するのを待機します。これにより、サーバースループットは少し向上しますが、レイテンシは多少長くなります。

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [0,…]
  • 重要度: 高
src.consumer.fetch.max.bytes

サーバーがフェッチリクエストに対して返す必要がある最大データ量。これは絶対最大値ではありません。フェッチの最初の空でないパーティション内の最初のメッセージがこの値より大きい場合でも、コンシューマーが先に進むことができるように、メッセージは返されます。ブローカーによって受け入れられる最大メッセージサイズは、message.max.bytes (ブローカー構成)または max.message.bytes (トピック構成)で定義されます。コンシューマーは、複数のフェッチを並列処理します。

  • 型: int
  • デフォルト: 52428800
  • 指定可能な値: [0,…]
  • 重要度: 中
src.consumer.max.partition.fetch.bytes

サーバーが返すパーティションごとの最大データ量。フェッチの最初の空でないパーティション内の最初のメッセージがこの制限より大きい場合でも、コンシューマーが先に進むことができるように、メッセージは返されます。ブローカーによって受け入れられる最大メッセージサイズは、message.max.bytes (ブローカー構成)または max.message.bytes (トピック構成)で定義されます。コンシューマーリクエストのサイズの制限については、「fetch.max.bytes」を参照してください

  • 型: int
  • デフォルト: 1048576
  • 指定可能な値: [0,…]
  • 重要度: 高
src.consumer.max.poll.records

poll() への単一の呼び出しで返されるレコードの最大数。

  • 型: int
  • デフォルト: 500
  • 指定可能な値: [1,…]
  • 重要度: 中
src.consumer.check.crcs

消費されたレコードの CRC32 を自動的に確認します。これにより、メッセージに対するネットワーク上またはディスク上の破損が発生しなくなります。このチェックにより、オーバーヘッドがいくらか増えるため、非常に高いパフォーマンスが必要な場合は無効にすることができます。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低

送信先トピック

topic.rename.format

送信先クラスター内のトピック名のフォーマット制御文字列。送信元のトピック名を表すプレースホルダーとして ${topic} を含めることができます。たとえば、トピック ordersdc_${topic} は、送信先トピック名 dc_orders にマップされます。

複数の送信元クラスターから Replicator を構成する場合は、トピック名が競合する可能性があるため注意してください。通常は、(上の例のように)各クラスターに個別のプレフィックスまたはサフィックスを付けることをお勧めします。

  • 型: string
  • デフォルト: ${topic}
  • 重要度: 高

ちなみに

Confluent Platform 5.2.0 以降では、Confluent Replicator を使用して、スキーマ を別の Schema Registry に移行することができます。移行先は、セルフマネージド型クラスターでも Confluent Cloud 内のクラスターでもかまいません。

次のように、スキーマのデフォルトのサブジェクトトランスレーターを使用するように Confluent Replicator を構成したとします。

schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator

この場合、スキーマの移行中にターゲット Schema Registry 内のサブジェクトの名前を変更するには、topic.rename.format を使用します。

たとえば、topic.rename.format${topic}-replica である場合、移行元の Schema Registry にある mytopic-value というサブジェクトが、移行先の "新しい" レジストリでは、mytopic-replica-value という名前に変更されます。

名前が変更されるのは、サブジェクト命名方法として TopicNameStrategy-key または -value が付いたサブジェクト名)を使用するスキーマだけです。その他のサブジェクト(異なる命名方法を使用するサブジェクト)はすべて移行されますが、名前は変更されません。

topic.auto.create

必要に応じて、送信先クラスターにトピックを自動的に作成するかどうかを指定します。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低
topic.preserve.partitions

送信元クラスターと一致するように送信先クラスター内のパーティション数を自動的に増やし、送信元クラスターからレプリケーションされたメッセージが送信先クラスター内の同じパーティションを使用するかどうかを指定します。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低

注釈

  • topic.preserve.partitionsfalse に設定する場合は、offset.topic.commitfalse に設定する必要があります。Replicator では、topic.preserve.partitions=false の場合、offset.topic.commit=true はサポートされません。
  • offset.topic.commitfalse に設定されている場合、送信元クラスターにコンシューマーグループは作成されません。
topic.create.backoff.ms

トピックの自動作成または展開を再試行するまでの待機時間。

  • 型: int
  • デフォルト: 120000
  • 指定可能な値: [0,…]
  • 重要度: 低
topic.config.sync

トピック構成を送信先クラスターと定期的に同期するかどうかを指定します。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低
topic.config.sync.interval.ms

topic.config.sync が有効な場合に、構成の変更を確認する頻度。

  • 型: int
  • デフォルト: 120000
  • 指定可能な値: [0,…]
  • 重要度: 低
topic.timestamp.type

送信先クラスター内のトピックのタイムスタンプの種類です。

  • 型: string
  • デフォルト: CreateTime
  • 指定可能な値: [CreateTime、LogAppendTime]
  • 重要度: 低
dest.topic.replication.factor

送信先クラスターのトピックのレプリケーション係数を設定します。

  • 型: int
  • デフォルト: 送信元レプリケーション係数
  • 指定可能な値: > 0
  • 重要度: 低

送信先 Kafka

ちなみに

Replicator 5.4.0 から、dest.kafka.* 構成は必須ではなくなりました。これらの構成が指定されていない場合、値は Connect ワーカーのプロデューサー構成から推測されます。

以下の構成オプションは、Kafka クライアントで使用される共通のプロパティです。Replicator は、これらのオプションを使用して、送信先クラスター(AdminClient)に接続します。dest.kafka プレフィックスを使用する有効なクライアントプロパティは、送信先クラスターに接続するクライアントに転送されて、管理操作に使用されます。

dest.kafka.bootstrap.servers

送信先 Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。クライアントは、ここでブートストラップ用に指定されるサーバーに関係なくすべてのサーバーを使用します。このリストは、一連のサーバーをすべて検出するために使用する初期ホストを制御するだけのものです。このリストは、host1:port1,host2:port2,… という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。すべてのサーバーセットを指定する必要はありませんが、フェイルオーバーに備えて、複数指定しておくことができます。

  • 型: list
  • 重要度: 高
dest.kafka.client.id

リクエストを実行する際にサーバーに渡す ID 文字列。この文字列は、リクエストの送信元の追跡に使用されます。論理アプリケーション名をサーバー側のリクエストログに含めることができます。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
dest.kafka.request.timeout.ms

クライアントがリクエストの応答を待つ最大の待機時間を指定します。タイムアウト時間が経過するまで応答を受信できなかった場合、クライアントは、リクエストを再送し、再試行回数が上限に達した場合は、リクエストを失敗とします。

  • 型: int
  • デフォルト: 30000
  • 指定可能な値: [0,…]
  • 重要度: 中
dest.kafka.retry.backoff.ms

トピックのパーティションに対するリクエストが失敗した場合に再試行を行うまでの待機時間を指定します。これにより、失敗シナリオでリクエストが短時間に繰り返し送信されることを回避できます。

  • 型: long
  • デフォルト: 100
  • 指定可能な値: [0,…]
  • 重要度: 低
dest.kafka.connections.max.idle.ms

アイドル接続を閉じるまでの待ち時間を指定します(ミリ秒)。

  • 型: long
  • デフォルト: 540000
  • 重要度: 中
dest.kafka.reconnect.backoff.ms

ホストへの再接続を試行する前の待機時間を指定します。これにより、ホストへの接続が短時間に何度も繰り返されることを回避できます。このバックオフは、コンシューマーからブローカーに送信されるすべてのリクエストに適用されます。

  • 型: long
  • デフォルト: 50
  • 指定可能な値: [0,…]
  • 重要度: 低
dest.kafka.metric.reporters

Metrics Reporter として使用するクラスのリスト。MetricReporter インターフェイスを実装することで、新しいメトリック作成の通知を受けるクラスをプラグインすることができます。JMX 統計情報を登録するための JmxReporter が必ず含まれます。

  • 型: list
  • デフォルト: ""
  • 重要度: 低
dest.kafka.metrics.num.samples

メトリクスの計算用に維持されるサンプルの数。

  • 型: int
  • デフォルト: 2
  • 指定可能な値: [1,…]
  • 重要度: 低
dest.kafka.metrics.sample.window.ms

メトリクスサンプルが計算される時間枠。

  • 型: long
  • デフォルト: 30000
  • 指定可能な値: [0,…]
  • 重要度: 低
dest.kafka.send.buffer.bytes

データを送信する際に使用される TCP 送信バッファ(SO_SNDBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。

  • 型: int
  • デフォルト: 131072
  • 指定可能な値: [-1、...]
  • 重要度: 中
dest.kafka.receive.buffer.bytes

データを読み取る際に使用される TCP 受信バッファ(SO_RCVBUF)のサイズ。値が -1 である場合は、オペレーティングシステムのデフォルトが使用されます。

  • 型: int
  • デフォルト: 65536
  • 指定可能な値: [-1、...]
  • 重要度: 中

送信先 Kafka: セキュリティ

ちなみに

さまざまなプロトコルで使用できる Replicator の暗号化と認証のオプションの詳細については、 Replicator セキュリティ の概要を参照してください。

dest.kafka.security.protocol

ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

  • 型: string
  • デフォルト: PLAINTEXT
  • 指定可能な値: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
  • 重要度: 中
dest.kafka.sasl.mechanism

クライアントの接続に使用される SASL メカニズム。セキュリティプロバイダーを利用できるものであれば、どのメカニズムでも構いません。GSSAPI がデフォルトのメカニズムです。

  • 型: string
  • デフォルト: GSSAPI
  • 重要度: 中
dest.kafka.sasl.kerberos.ticket.renew.window.factor

チケットが更新されるまでログインスレッドがスリープ状態になる時間のウィンドウ係数を指定します。時間のウィンドウ係数は、最後の更新からチケットの有効期限まで計算されます。

  • 型: double
  • デフォルト: 0.8
  • 重要度: 低
dest.kafka.sasl.kerberos.min.time.before.relogin

更新試行から次の更新試行までの、ログインスレッドのスリープ時間。

  • 型: long
  • デフォルト: 60000
  • 重要度: 低
dest.kafka.sasl.kerberos.kinit.cmd

Kerberos の kinit コマンドパス。

  • 型: string
  • デフォルト: /usr/bin/kinit
  • 重要度: 低
dest.kafka.sasl.kerberos.service.name

Kafka が実行される際の Kerberos プリンシパル名。これは、Kafka の JAAS 構成または Kafka の構成のどちらかで定義できます。

  • 型: string
  • デフォルト: null
  • 重要度: 中
dest.kafka.sasl.kerberos.ticket.renew.jitter

更新時間に追加されたランダムジッターのパーセンテージ。

  • 型: double
  • デフォルト: 0.05
  • 重要度: 低
dest.kafka.ssl.protocol

SSLContext の生成に使用する SSL プロトコルです。デフォルト設定は TLS であり、ほとんどのケースに適しています。最新の Java 仮想マシンで使用できる値は、TLS、TLSv1.1、TLSv1.2 です。古い JVM では SSL、SSLv2、および SSLv3 がサポートされている場合もありますが、セキュリティに関する既知の脆弱性があるため、使用しないことをお勧めします。

  • 型: string
  • デフォルト: TLS
  • 重要度: 中
dest.kafka.ssl.provider

SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。

  • 型: string
  • デフォルト: null
  • 重要度: 中
dest.kafka.ssl.enabled.protocols

SSL 接続で有効なプロトコルのリスト。

  • 型: list
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中
dest.kafka.ssl.keystore.location

キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。

  • 型: string
  • デフォルト: null
  • 重要度: 高
dest.kafka.ssl.cipher.suites

暗号スイートのリストです。これは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。デフォルトでは、使用可能なすべての暗号スイートがサポートされます。

  • 型: list
  • デフォルト: null
  • 重要度: 低
dest.kafka.ssl.secure.random.implementation

SSL 暗号化操作に使用する SecureRandom PRNG 実装。

  • 型: string
  • デフォルト: null
  • 重要度: 低
dest.kafka.ssl.truststore.type

トラストストアファイルのファイルフォーマット。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中
dest.kafka.ssl.keystore.type

キーストアファイルのファイルフォーマット。クライアントでは省略可能です。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中
dest.kafka.ssl.trustmanager.algorithm

SSL 接続のトラストマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシン用に構成されているトラストマネージャーファクトリアルゴリズムです。

  • 型: string
  • デフォルト: PKIX
  • 重要度: 低
dest.kafka.ssl.truststore.location

トラストストアファイルの場所。

  • 型: string
  • デフォルト: null
  • 重要度: 高
dest.kafka.ssl.keystore.password

キーストアファイルのストアパスワード。クライアントでは省略可能です。 ssl.keystore.location を構成した場合にのみ必要となります。

  • 型: password
  • デフォルト: null
  • 重要度: 高
dest.kafka.ssl.keymanager.algorithm

SSL 接続のキーマネージャーファクトリで使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに構成されているキーマネージャーファクトリアルゴリズムです。

  • 型: string
  • デフォルト: SunX509
  • 重要度: 低
dest.kafka.ssl.key.password

キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。

  • 型: password
  • デフォルト: null
  • 重要度: 高
dest.kafka.ssl.truststore.password

トラストストアファイルのパスワード。

  • 型: password
  • デフォルト: null
  • 重要度: 高
dest.kafka.ssl.endpoint.identification.algorithm

サーバー証明書を使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。

  • 型: string
  • デフォルト: null
  • 重要度: 低

送信先データ変換

key.converter

送信先クラスターに書き込まれたメッセージのキーフィールドのコンバーター。データ変換の必要がない場合は、io.confluent.connect.replicator.util.ByteArrayConverter を使用します。

  • 型: class
  • デフォルト: ワーカー構成から継承
  • 重要度: 低
value.converter

送信先クラスターに書き込まれたメッセージの値フィールドのコンバーター。データ変換の必要がない場合は、io.confluent.connect.replicator.util.ByteArrayConverter を使用します。

  • 型: class
  • デフォルト: ワーカー構成から継承
  • 重要度: 低
header.converter

シリアル化された Kafka ヘッダーを Kafka Connect ヘッダーに変換するために使用される HeaderConverter クラス。現在、デフォルト値は SimpleHeaderConverter で、base64 でエンコードされたヘッダーを出力します。送信元クラスター名、トピック名、およびタイムスタンプを示す、人間が理解できるヘッダーを取得するには、これを明示的に ByteArrayConverter に設定する必要があります。たとえば、header.converter=io.confluent.connect.replicator.util.ByteArrayConverter のように設定します。

  • 型: class
  • デフォルト: SimpleHeaderConverter
  • 重要度: 低

コンシューマーオフセット変換

重要

オフセット変換は Java コンシューマー専用です。他のタイプのアプリケーションでは動作しません。

offset.translator.tasks.max

オフセット変換を実行する Replicator タスクの最大数。-1(デフォルト)の場合、すべてのタスクがオフセット変換を実行します。

  • 型: int
  • デフォルト: -1
  • 重要度: 中
offset.translator.tasks.separate

トピックのレプリケーションを実行するタスクとは別のタスクでオフセットを変換するかどうかです。

  • 型: ブール値
  • デフォルト: false
  • 重要度: 中
offset.translator.batch.period.ms

オフセット変換リクエストがバッチ処理される期間(ミリ秒)。

  • 型: int
  • デフォルト: 60000
  • 重要度: 中
offset.translator.batch.size

オフセット変換リクエストのバッチの最大サイズ。

  • 型: int
  • デフォルト: 1
  • 重要度: 中
offset.timestamps.commit

セカンダリクラスターへの切り替え時に正しく再開できるように、Replicator の内部オフセットタイムスタンプをコミットするかどうか。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 中
src.kafka.timestamps.topic.replication.factor
ソース Kafka」の src.kafka.timestamps.topic.replication.factor を参照してください。
src.kafka.timestamps.topic.num.partitions
ソース Kafka」の src.kafka.timestamps.topic.num.partitions を参照してください。
provenance.header.enable

レプリケーション中に来歴ヘッダーを使用できるようにするかどうかです。

  • 型: ブール値
  • デフォルト: false
  • 重要度: 中

重複または循環メッセージ反復を防ぐための来歴ヘッダーの使用」も参照してください。そのセクションで説明されているように、来歴ヘッダーを有効にする場合は、header.converter=io.confluent.connect.replicator.util.ByteArrayConverter を明示的に設定する必要があります。

注釈

Kafka メッセージがカスタムヘッダーを持っており、来歴ヘッダーとともにレプリケートされている場合(provenance.header.enable=true)、ヘッダーを処理しているコンシューマーアプリケーションは、__replicator_id キーを持つヘッダーをスキップする必要があります。コンシューマーがレコードを処理する方法によっては、そのためのコードの変更が不要な場合があります。コンシューマーが特定のヘッダーを要求している場合、変更は不要です。しかし、コンシューマーがすべてのヘッダーを取得するために Kafka クラス Interface HeaderstoArray を使用する場合、コンシューマーは __replicator_id を明示的にスキップする必要があります。そうしない場合は、レプリケーションによって、「could not decode JSON types ..」や「unrecognized tokens」などのエラーが発生します。

以下に、ConsumerRecord を使用してヘッダーキーを読み取って __replicator_id をスキップする、コンシューマー用の Java コードの例を示します。

ConsumerRecord<byte[], byte[]> record = ...
  Headers headers = record.headers();
  if (headers != null) {
    for (Header header : headers) {
      if (!"__replicator_id".equals(header.key())) {
        ... process application header...
      }
    }
  }
provenance.header.filter.overrides

別の一時 Replicator インスタンスで構成し、メッセージの循環反復を防ぐ通常のレプリケーションフィルター処理ルールから除外するデータを指定します。以下のようなタプルのセミコロン区切りリストです。

  • クラスター ID を来歴ヘッダー内のクラスター ID と照合するための正規表現
  • トピックを来歴ヘッダー内のトピック名と照合するための正規表現
  • タイムスタンプの範囲

このパラメーターは、元のクラスタークラウド内の Kafka トピックのデータを完全にリストアできない特定のシナリオで、ディザスターリカバリのために使用します。そのような場合は、リカバリーの開始点として、クラスター内の残りのデータを削除し、複数のデータセンターにまたがって Replicator を再実行します。このシナリオで Replicator を再実行する場合は、通常のフィルター処理ルールをオーバーライドします。そうしなければ、前にコピーされたデータのレプリケーションがスキップされます。詳細と例については、ホワイトペーパー「Disaster Recovery for Multi-Datacenter Apache Kafka Deployments」の「Manually Reset Consumer Offsets」にあるトピック「Data Synchronization in Active-Passive Design」を参照してください。

fetch.offset.expiry.ms

オフセット変換のフェッチオフセットリクエストが期限切れになって破棄されるまでの時間(単位: ミリ秒)です。

  • 型: long
  • デフォルト: 600000
  • 重要度: 低
fetch.offset.retry.backoff.ms

オフセット変換中に失敗したフェッチオフセットリクエストを再試行するまでの待機時間(単位: ミリ秒)です。

  • 型: long
  • デフォルト: 100
  • 重要度: 低

スキーマ変換

注釈

これらのプロパティは、Replicator のスキーマ変換機能を使用する場合にのみ必要です。詳細については、「スキーマの移行」を参照してください。

schema.registry.topic

Schema Registry の永続ログとしての役割を果たすトピック。

  • 型: string
  • デフォルト: null
  • 重要度: 中
schema.registry.url

スキーマの登録または検索に使用できる Schema Registry インスタンスの URL のコンマ区切りリストです。

  • 型: string
  • デフォルト: null
  • 重要度: 高
schema.registry.max.schemas.per.subject

ローカルにキャッシュできるスキーマの最大数。

  • 型: int
  • デフォルト: 1000
  • 重要度: 低
schema.registry.client.basic.auth.credentials.source

基本認証ヘッダーの認証情報の選択方法を指定します。サポートされる値は URL、USER_INFO、SASL_INHERIT です。

  • 型: string
  • デフォルト: URL
  • 重要度: 中
schema.registry.client.basic.auth.user.info

Basic Auth のユーザー情報は、{username}:{password} という形式で指定します。

  • 型: string
  • デフォルト: null
  • 重要度: 中
schema.subject.translator.class

スキーマサブジェクトのトランスレーター、または、変換を実行しない場合は null。プレフィックス schema.subject.translator. が付加されているプロパティは、トランスレーターの configure() メソッドに渡されます。

  • 型: class
  • デフォルト: null
  • 重要度: 低

オフセット管理

offset.start

レプリケーションの開始ポイントを決定するための設定を指定します。接続オフセットが存在する場合はそれを優先し、存在しない場合はコンシューマーオフセットを優先するには connect を使用します。コンシューマーオフセットが存在する場合はそれを優先し、存在しない場合は接続オフセットを優先するには consumer を使用します。接続オフセットもコンシューマーオフセットも存在しない場合は、開始オフセットがトピックの先頭になります。

  • 型: string
  • デフォルト: connect
  • 重要度: 低

注釈

Replicator を双方向セットアップで実行しない限り、デフォルトの offset.start=connect に設定することをお勧めします。

offset.topic.commit

メッセージが送信先クラスターに書き込まれた後、Replicator のコンシューマーオフセットを送信元 Kafka クラスターにコミットするかどうかを指定します。これらのコンシューマーオフセットを使用して、Replicator のラグを簡単に追跡できます。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 中

注釈

  • topic.preserve.partitionsfalse に設定する場合は、offset.topic.commitfalse に設定する必要があります。Replicator では、topic.preserve.partitions=false の場合、offset.topic.commit=true はサポートされません。
  • offset.topic.commitfalse に設定されている場合、コンシューマーグループは Replicator によって送信元クラスターに作成されません。

クラスター ID とグループ ID

cluster.id パラメーターは、Replicator を 実行可能ファイルとして 実行する場合に必要です。これは、Replicator 実行可能ファイルのいくつかのインスタンスが同じ --cluster.id で始まる場合に形成されるクラスターの一意の識別子を定義し、実行可能ファイルのみのプロパティです。cluster.id にデフォルト値はありません。実行時に指定する必要があります。

同じ cluster.id を持つ Replicator 実行可能ファイルは、自動的に互いを検出し、クラスターを形成します。

(Connect ワーカーを使用する)非実行可能デプロイの場合、group.id パラメーターは、Connect ワーカーで指定された一意の文字列で、そのワーカーが属するクラスターグループを識別します。デフォルトは connect-cluster です。同じ group.id を持つ 分散ワーカーの構成 は、自動的に互いを検出し、クラスターを形成します。

複数のクラスターが必要な場合は、デプロイタイプに応じて、異なる ID を適切に指定する必要があります。

ちなみに

group.id は、Replicator ではなく Connect ワーカーのプロパティです。非実行可能 Replicator デプロイで cluster.id と同じ目的を果たすため、ここで説明します。

Confluent Platform ライセンス

confluent.topic.bootstrap.servers

ライセンス供与に使用される Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。初期接続で、クラスター内のすべてのサーバーが検出されます。このリストは、host1:port1,host2:port2,… という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。これは動的に変わる可能性があるので、このリストにすべてのサーバーセットを含める必要はありません(ただし、サーバーの障害に備えて、複数指定しておくこともできます)。

  • 型: list
  • 重要度: 高
confluent.topic

Confluent Platform の構成(ライセンス情報など)で使用される Kafka トピックの名前。

  • 型: string
  • デフォルト: _confluent-command
  • 重要度: 低
confluent.topic.replication.factor

Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックのレプリケーション係数。

  • 型: int
  • デフォルト: 3
  • 重要度: 低

Confluent ライセンスのプロパティ

注釈

デフォルトでは、ライセンス関連のプロパティ( confluent.topic.* )は、dest.kafka.* コネクタープロパティから継承されます。