Replicator の構成および実行

Replicator を構成して実行する方法は 2 つあります。

参考

トラブルシューティングを行い、パフォーマンスを向上させるには、「Replicator の調整」と「データ負荷が増えた場合にパフォーマンスを改善するための圧縮の設定」の構成オプションを参照してください。

Replicator を実行可能ファイルとして実行する

Replicator 実行可能ファイルは、Replicator を Connect クラスター内で構成して起動するための迅速かつ簡単な方法です。これは、既存の Connect クラスターを使用できないデプロイで推奨されます。Confluent Platform のインストール後、インストールディレクトリ内にある bin/replicator (ZIP および TAR)が Replicator 実行可能ファイルです。bin/replicator を引数なしで実行すると、使用可能なコマンドライン引数のリストがターミナルに出力されます。

重要

Replicator 実行可能ファイルは、インストールされている Confluent Platform のバージョンと一致する Replicator コネクターのバージョンを使用します。Replicator コネクターは Confluent Hub から入手できますが、Confluent から指示のない限り、インストール後に Replicator コネクターを変更またはアップグレードしないでください。

送信元クラスターの構成

Replicator 実行可能ファイルでは、送信元クラスターのコンシューマーに対する適切なすべての構成がプロパティファイルに指定されている必要があります。--consumer.config を使用して、これらのプロパティが含まれているファイルを参照します。たとえば、consumer.properties という名前の、以下のコンテンツが含まれているファイルを指定します。

bootstrap.servers=localhost:9082

このファイル内のすべての Kafka コンシューマープロパティは有効です。これらの完全なリストについては、「コンシューマーの構成」を参照してください。

送信先クラスターの構成

Replicator 実行可能ファイルでは、送信先クラスターへのプロデューサーに対する適切なすべての構成がプロパティファイルに配置されている必要があります。--producer.config を使用して、これらのプロパティが含まれているファイルを参照します。たとえば、producer.properties という名前の、以下のコンテンツが含まれているファイルを指定します。

bootstrap.servers=localhost:9092

このファイル内のすべての Kafka プロデューサープロパティは有効です。これらの完全なリストについては、「プロデューサーの構成」を参照してください。

注釈

--consumer.config--producer.config に定義されているプロパティ名にはプレフィックスを付けてはならず、参照されるコンシューマー/プロデューサー構成のプロパティ名と一致する必要があります。

Replicator の構成

Replicator 実行可能ファイルでは、--replication.config によって参照されるプロパティファイルに定義されている、接続に関連しない Replicator プロパティをオーバーライドできます。Replicator プロパティの以下のセクションの構成をここに配置できます。

たとえば、replication.properties という名前の、以下のコンテンツが含まれているファイルを指定します。

confluent.topic.bootstrap.servers=localhost:9092
offset.start=consumer

注釈

group.id プロパティは Replicator 実行可能ファイルでは特殊なケースであるため指定しないでください。詳細については、「クラスター ID とグループ ID」を参照してください。

Connect クラスターの構成

Replicator 実行可能ファイルでは、--replication.config によって参照されるプロパティファイルに定義されている、Connect に関連するプロパティをオーバーライドできます。Connect 構成の完全なリストについては、「Connect の構成」を参照してください。

たとえば、replication.properties という名前の、以下のコンテンツが含まれているファイルを指定します。

offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.topic=connect-configs

重要

--replication.config によって参照されるファイルにはクライアント構成を組み込まないでください。以下のプレフィックスは許可されません。別個の構成ファイルにプレフィックスなしで指定します。

プレフィックス 実行可能ファイルの構成
src.kafka. --consumer.config
src.consumer. --consumer.config
dest.kafka. --producer.config

インターセプターモニタリングの構成

インターセプターのモニタリングを有効にする場合、そのプロパティを同じファイルまたは別個のファイルに組み込むことができます。このファイルは、それぞれ --consumer.monitoring.config および --producer.monitoring.config パラメーターを使用して Replicator 実行可能ファイルに渡します。これらのプロパティには、producer. または consumer. のプレフィックスは不要です。たとえば、producer.interceptor.classes ではなく interceptor.classes を使用できます。たとえば構成として、interceptors.properties という名前の、以下のコンテンツが含まれているファイルを指定できます。

interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
confluent.monitoring.interceptor.bootstrap.servers=localhost:9092

レプリケーションの構成および実行

Replicator 実行可能ファイルには、--cluster.id パラメーターが必要です。このパラメーターは、Replicator 実行可能ファイルが起動したときに作成される Connect クラスターの一意の識別子を定義します。同じ cluster.id を持つ Replicator 実行可能ファイルのインスタンスはクラスターを構成し、Replicator ワークロードを共有します。

注釈

  • 同じ --cluster.id を持つ Replicator 実行可能ファイルのすべてのインスタンスは、まったく同じ構成を使用して起動する必要があります。
  • 実行可能ファイルを使用しない(Connect ワーカーを使用する)デプロイの場合、Connect ワーカーの group.id はクラスターの一意の ID として使用され、--cluster.id と同じ目的を果たします。両方のパラメーターの詳細については、「クラスター ID とグループ ID」を参照してください。
  • Replicator の name は、組み込みの Connect クラスターに送られるコネクター名です。Replicator の name のデフォルトは "replicator" で、group.id が指定されていない場合はコンシューマー group.id として使用されます。Replicator を実行可能ファイルとして実行する場合、Replicator の name (指定した値またはデフォルト値)を使用してコンシューマーグループ名を設定します。

これで、データレプリケーションに関連する構成プロパティを指定できます。これを行うには以下の複数の方法があります。

  • すべての構成を 1 つのファイルに格納し、--replication.config パラメーターを使用してこのファイルを Replicator 実行可能ファイルに渡します。以下に例を示します。

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties
    
  • それぞれが 1 つのプロパティに対応する個別のパラメーターを使用して、コマンドラインからレプリケーションプロパティを渡します。たとえば、--whitelist を使用して元のトピックを指定します。Confluent ライセンスは、--confluent.license を使用して渡すことができます。以下に例を示します。

    replicator \
    --consumer.config ./consumer.properties \
    --producer.config ./producer.properties \
    --cluster.id replicator \
    --whitelist test-topic \
    --confluent.license "XYZ"
    
  • 一部のレプリケーションプロパティはファイルに格納し、残りはコマンドライン引数を使用して渡します。以下に例を示します。

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties \
     --whitelist test-topic
    

ログ記録の構成

Replicator 実行可能ファイルは、ログ記録設定を etc/kafka-connect-replicator/replicator-log4j.properties ファイルから読み込みます。デフォルトではコンソールに書き込みますが、本稼働環境のデプロイではログをファイルに記録する必要があります。Replicator 実行可能ファイルを起動する前に、以下の行を replicator-log4j.properties ファイルに追加します。

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

上記で作成したアペンダーファイルを log4j.rootLogger パラメーターに追加します。

# By default
log4j.rootLogger=INFO, stdout, file

Replicator をコネクターとして実行する

推奨される分散 Connect クラスターで Replicator をコネクターとして実行するには、「Kafka クラスターで Replicator を手動で構成および実行する」を参照してください。

Confluent Platform を AWS VM にデプロイする場合は、バースト可能な CPU タイプ(T2、T3、T3a、および T4g)を持つ VM は高スループットのストリーミングワークロードをサポートしないことに注意する必要があります。これらの VM 上で実行されている Replicator ワーカーノードは、クレジットの有効期限が切れるとスループットが低下します。そのため、高い CPU レベルで長時間実行されることが想定され、ベースラインのリソースレートを上回るワークロードをサポートする Confluent Platform ノードには、これらの VM は適していません。

Replicator のテスト

以下は、Replicator の汎用的なテストシナリオです。同様のテスト戦略は、「Replicator の構成および実行」セクションの Replicator チュートリアルの一部で、さらに詳しく取り上げています。

  1. テストトピックを作成します。

    まだ作成していない場合は、以下のコマンドを使用して送信元クラスターで test-topic という名前のトピックを作成します。

    ./bin/kafka-topics --create --topic test-topic --replication-factor \
    1 --partitions 4 --bootstrap-server localhost:9082
    
    ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
    

    上記のコマンドの kafka-topics --describe --topic ステップでは、test-topic.replica が存在するかどうかをチェックします。トピックが存在する場合、4 つのパーティションが作成されたことを確認します。一般に、Replicator は送信先トピックに送信元トピックと少なくとも同数のパーティションがあることを確認します。それ以上存在しても構いませんが、Replicator は送信元データのパーティション割り当てを保持するため、それ以外のパーティションは利用されません。

  2. 送信元クラスターにデータを送信します。

    送信元クラスターにトピックを作成したら、Kafka プロデューサーを使用してそのトピックにデータを送信し、送信元クラスターの test-topic に書き込むことができます。その後、送信先クラスターの test-topic.replica から消費することでデータがレプリケートされたことを確認できます。

    たとえば、Kafka のコンソールプロデューサーを使用して一連の数字を送信するには、以下のコマンドを使用します。

    seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082
    
  3. コンシューマーを実行して、送信先クラスターがデータを取得したことを確認します。

    その後、コンソールコンシューマーを使用して送信先クラスターへのデリバリーを確認できます。

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
    --bootstrap-server localhost:9092
    

Docker イメージからの Replicator 実行可能ファイルの実行

Replicator を実行する最も簡単な方法は、スクリプトまたは Docker イメージ から実行可能ファイルとして実行することです。このドキュメントには、Replicator 用の docker run コマンドと構成パラメーターが記載されています。

Replicator 実行可能ファイルのコマンドラインパラメーター

使用できるコマンドラインパラメーターは以下のとおりです。

コマンドラインパラメーター 説明
--blacklist <トピックブラックリスト> ホワイトリストに含まれているか正規表現と一致している場合であっても、複製されるべきではないトピックのコンマ区切りリスト。
--cluster.id <Replicator クラスター ID>(必須) Replicator クラスターの一意の識別子を指定します。
--cluster.threads <Replicator スレッドの合計数> Replicator クラスター内のすべてのワーカー全体でのスレッドの合計数です。このコマンドにより既存のクラスターにある別の Replicator ワーカーが起動する場合、これを使用してクラスター全体のスレッド数を変更できます。
--confluent.license <Confluent ライセンスキー> Replicator を使用できるようにする Confluent ライセンスキーです。ライセンスキーがない場合、Replicator は 30 日間試用できます。すでにご契約されている場合は、詳細について Confluent サポートにお問い合わせください。
--consumer.config <consumer.properties>(必須) 送信元クラスターからコンシューマーが読み取るための構成設定を含むファイルの場所を指定します。
--consumer.monitoring.config <consumer-monitoring.properties> Replicator コンシューマーに関するモニタリング情報が送信される Kafka クラスターのプロデューサー設定を含むファイルの場所を指定します。これはモニタリングを有効にする場合に指定する必要がありますが、送信元クラスターまたは送信先クラスターとは異なる Kafka クラスターを参照することができます。メトリクスを送信先クラスターに書き込むため、--producer-config として同じファイルを使用します。
-h--help   ヘルプ情報を表示します。
--producer.config <producer.properties>(必須) 送信先クラスターにプロデューサーが書き込むための構成設定を含むファイルの場所を指定します。
--producer.monitoring.config <producer-monitoring.properties> Replicator プロデューサーに関するモニタリング情報が送信される Kafka クラスターのプロデューサー設定を含むファイルの場所を指定します。これはモニタリングを有効にする場合に指定する必要がありますが、送信元クラスターまたは送信先クラスターとは異なる Kafka クラスターを参照することができます。メトリクスを送信先クラスターに書き込むため、--producer-config として同じファイルを使用します。
--replication.config <replication.properties> レプリケーションの構成設定を含むファイルの場所を指定します。使用する場合、このファイル内のすべてのプロパティはコマンドラインパラメーターによってオーバーライドできます。指定しない場合、トピックがレプリケートされる方法を定義するすべてのプロパティをコマンドラインで指定する必要があります。
--topic.auto.create true/false 必要に応じて、送信先クラスターにトピックを自動的に作成するかどうかを指定します。
--topic.config.sync true/false トピック構成を送信先クラスターと定期的に同期するかどうかを指定します。
--topic.config.sync.interval.ms <トピック構成の同期間隔(ミリ秒)> 'topic.config.sync' が有効にされている場合に構成の変更を確認する頻度です。
--topic.create.backoff.ms <トピック作成バックオフ(ミリ秒)> トピックの自動作成または展開を再試行するまでの待機時間。
--topic.poll.interval.ms <トピック構成の同期間隔(ミリ秒)> 新しいトピックについて送信元クラスターをポーリングする頻度を指定します。
--topic.preserve.partitions true/false 送信元クラスターと一致するように送信先クラスター内のパーティション数を自動的に増やし、送信元クラスターからレプリケーションされたメッセージが送信先クラスター内の同じパーティションを使用するかどうかを指定します。
--topic.regex <レプリケートされるトピックと一致する正規表現> レプリケーションされるトピックの名前と一致する正規表現。この表現に一致する(またはホワイトリストに記載されている)と同時にブラックリストに記載されていないトピックはレプリケーションされます。
--topic.rename.format <名前変更フォーマット> 送信先クラスター内のトピック名のフォーマット制御文字列。送信元のトピック名を表すプレースホルダーとして ${topic} を含めることができます。たとえば、トピック 'orders' の ${topic}_dc1 は、送信先トピック名 'orders_dc1' にマップされます。--replication.config で指定されているファイルの内部に配置できます。
--topic.timestamp.type <トピックタイムスタンプタイプ> 送信先クラスター内のトピックのタイムスタンプの種類です。
--whitelist <トピックホワイトリスト> レプリケーションするトピックの名前のコンマ区切りリスト。このリストにあり、ブラックリストにないトピックはレプリケーションされます。

Replicator コネクター構成を Replicator 実行可能ファイル構成に変換する

Replicator Connect 構成を Replicator 実行可能ファイル構成に変換できます。2 つの主な違いは、Connect 構成には 2 つの構成ファイル(ワーカープロパティファイルとコネクタープロパティまたは JSON ファイル)があるのに対し、Replicator 実行可能ファイルには 3 つの構成ファイル(コンシューマー、プロデューサー、レプリケーションプロパティファイル)があることです。これについては以下のように考えると役立ちます。

  • コンシューマー構成ファイルには、送信元クラスターから消費する、Replicator 内に組み込まれたコンシューマーを構成するために必要なすべてのプロパティが含まれています。これには、送信元コンシューマーをチューニングするために使用する特別な構成に加えて、コンシューマーが送信元クラスターに接続するために必要なセキュリティと接続の詳細が含まれます。
  • プロデューサー構成ファイルには、送信先クラスターに対して生成する、Replicator 内に組み込まれたプロデューサーを構成するために必要なすべてのプロパティが含まれています。これには、送信先プロデューサーをチューニングするために使用する特別な構成に加えて、プロデューサーが送信先クラスターに接続するために必要なセキュリティと接続の詳細が含まれます。
  • レプリケーション構成ファイルには、送信元コンシューマーからデータを取得して送信先プロデューサーに渡す作業を行う実際の Replicator を構成するために必要なすべてのプロパティが含まれています。これには、Replicator で必要な Connect 固有の構成に加え、必要なすべての Replicator 構成が含まれます。

以下のワーカープロパティがあるとします。

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
connector.client.config.override.policy=All
bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";

また、以下の Replicator JSON があるとします。

{
  "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
  "tasks.max":4,
  "topic.whitelist":"test-topic",
  "topic.rename.format":"${topic}.replica",
  "confluent.license":"XYZ"
  "name": "replicator",
  "header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "src.consumer.max.poll.records":"10000",
  "producer.override.linger.ms":"10",
  "producer.override.compression.type":"lz4",
  "src.kafka.bootstrap.servers": "source-cluster:9092",
  "src.kafka.ssl.endpoint.identification.algorithm": "https",
  "src.kafka.security.protocol": "SASL_SSL",
  "src.kafka.sasl.mechanism": "PLAIN",
  "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";",
  "dest.kafka.bootstrap.servers": "destination-cluster:9092",
  "dest.kafka.ssl.endpoint.identification.algorithm": "https",
  "dest.kafka.security.protocol": "SASL_SSL",
  "dest.kafka.sasl.mechanism": "PLAIN",
  "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";"
}

この場合、上記の 2 つの構成ファイル内の構成を、Replicator 実行可能ファイルを使用するために必要な以下のコンシューマー、プロデューサー、レプリケーション構成に変換できます。

コンシューマーの構成:

bootstrap.servers=source-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";
max.poll.records=10000

コンシューマー構成の場合、src.kafka および src.consumer プレフィックスを削除し、送信元コンシューマー用の実際の構成をリストにします。Replicator 実行可能ファイルは、これがコンシューマー構成に配置されているため、送信元クラスターをポーリングする送信元コンシューマーにこれらの構成を適用する必要があることを認識します。

プロデューサーの構成

bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";
linger.ms=10
compression.type=lz4

プロデューサー構成の場合、dest.kafka および producer.overrides プレフィックスを削除し、送信先プロデューサー用の実際の構成をリストにします。Replicator 実行可能ファイルは、これがプロデューサー構成に配置されているため、送信先クラスターに書き込む送信先プロデューサーにこれらの構成を適用する必要があることを認識します。

レプリケーションの構成

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
tasks.max=4
topic.whitelist=test-topic
topic.rename.format=${topic}.replica
confluent.license=XYZ
name=replicator
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

レプリケーション構成の場合、Replicator または Connect 用の重要な構成のみを組み込みます。ここでは connector.client.config.override.policy 構成は不要です。これは、Replicator 実行可能ファイルは構成ファイルに指定されているプロデューサー構成を直接渡すためです。これにより、追加の Connect 構成を組み込むのではなく、レプリケーションのために重要なコンシューマーおよびプロデューサーの構成について考えることが容易になります。

Kafka クラスターで Replicator を手動で構成および実行する

Replicator のダウンロードおよびインストール」に従って Confluent Platform をダウンロードしてインストールしたら、Kafka ブローカーの 2 つのクラスターを開始します。

詳細については、「チュートリアル: クラスターの境界を越えたデータのレプリケーション」を参照してください。

Replicator は Kafka Connect プラグインです。Replicator を実行するには、以下の手順に従う必要があります。

  • Connect クラスターをインストールして構成する。
  • Confluent Replicator を Connect クラスターとして構成して実行する。

このセクションでは、これらの手順について詳しく説明し、使用できる Replicator の構成オプションを確認します。

Replicator 用の Connect クラスターの構成

Replicator runs as a plugin (Connector) in Connect, so you'll need to run Connect Workers before you can run Replicator. The quick start, shows how to run Replicator in a single node Connect cluster.

Refer to Connect documentation to learn how to run Connect in :distributed mode.

Replicator 用の分散 Connect ワーカーを構成する際は、以下の推奨事項とベストプラクティスを考慮してください。

送信元ブローカーと送信先ブローカーの構成

Connect クラスターは、Kafka ブローカーのクラスターに関連付けられます。Kafka クラスターのブローカーは、Connect ワーカーの bootstrap.servers 構成パラメーターで指定されます。Replicator を実行するために新しい Connect ワーカークラスターを構成する場合、このパラメーターに 送信先 Kafka ブローカークラスターが含まれていることを確認してください。Replicator を既存の Connect クラスターで実行する場合、これが既に 送信先 ブローカーに関連付けられていることを確認してください。

注釈

Replicator は、送信元クラスターからイベントを読み取ります。その後、送信先クラスターにイベントを書き込む Connect ワーカーにイベントを渡します。したがって、Replicator の構成には送信元に関する情報を組み込み、ワーカーの構成には送信先に関する情報を組み込みます。

Connect ワーカーのインストール場所

同じデータセンター内の 2 つの Kafka クラスター間ではなく、異なるデータセンター間でイベントをレプリケートする場合のベストプラクティスは、Connect ワーカーを 送信先 データセンターで実行することです。たとえば、ニューヨークからサンフランシスコにデータを送信する場合、Replicator をサンフランシスコで実行し、ニューヨークから米国全体のデータを消費するようにします。なぜなら、長距離ネットワークはデータセンター内より信頼性がやや低い場合があるためです。ネットワークパーティションが存在し、データセンター間の接続が失われた場合、コンシューマーがクラスターに接続できない状況の方が、プロデューサーが接続できない状況より悪影響が小さくなります。リモートでの消費は、リモートでの生成より優れたモデルです。つまり、Replicator を送信元データセンターで実行する際の固有のリスクはありません。Replicator は接続が失われている場合を含め、すべてのイベントを取得して転送します。

既存の Connect クラスターでの Replicator の実行

Replicator は他のコネクターと同じ Connect クラスターで実行できますが、場合によってはこれは推奨されません。

  • 距離が遠く、レイテンシが高い 2 つのデータセンター間でデータをレプリケートする場合、Connect ワーカーと Replicator の両方を適切にチューニングする必要があります。このため、データセンター内のチューニングはデータセンター間のチューニングとは異なります。Replicator に固有の Connect クラスターを設定することで、他の接続への影響を心配せずに Connect ワーカーを Replicator 用にチューニングできます。
  • コネクターに変更を加えると、コネクターが Connect ワーカーに再割り当てされている間、Replicator は一時停止します。コネクターを頻繁に開始および停止する場合、Replicator を固有のクラスターで実行し、中断なしで実行できるようにします。

Connect クラスター用のログ記録の構成

Connect のログ記録は、etc/kafka/connect-log4j.properties ファイルに構成します。デフォルトではコンソールに書き込みますが、本稼働環境のデプロイではログをファイルに記録する必要があります。Replicator を起動する前に、以下の行を connect-log4j.properties ファイルに追加します。

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

作成したアペンダーファイルを log4j.rootLogger パラメーターに追加します。

# By default
log4j.rootLogger=INFO, stdout, file

Connect クラスター上での Replicator の構成および実行

You should have at least one distributed mode Connect Worker already up and running. To learn more, review the distributed mode documentation .

Connect ワーカーが稼働しているかどうかは、その REST API をチェックすることで確認できます。

curl http://localhost:8083/
{"version":"7.1.1-ccs","commit":"078e7dc02a100018"}

すべてが正常であれば、実行されている Connect ワーカーのバージョン番号とコミットハッシュが表示されます。

Replicator を実行するには、Connect REST API にその構成ファイルを JSON フォーマットで送信します。構成の例を以下に示します。

{
        "name":"replicator",
        "config":{
                "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
                "tasks.max":4,
                "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "src.kafka.bootstrap.servers":"localhost:9082",
                "topic.whitelist":"test-topic",
                "topic.rename.format":"${topic}.replica",
                "confluent.license":"XYZ"
        }
}

これは curl を使用して Replicator に送信できます。以下の例は、上記の JSON が example-replicator.json という名前のファイルに含まれていることを前提としています。

curl -X POST -d @example-replicator.json  http://localhost:8083/connectors --header "content-Type:application/json"

この例では、いくつかの重要な構成パラメーターが使用されています。すべての構成パラメーターの説明については、Confluent Replicator 構成プロパティ を参照してください。

  • key.converter および value.converter: Kafka レコードを Connect の内部フォーマットに変換するために使用するクラスです。この Connect ワーカー構成ではグローバルコンバーターを指定し、Replicator 構成に何も指定しない場合はこれらのコンバーターが使用されます。ただし、レプリケーションの場合、変換は必要ありません。送信元クラスターからバイトを読み取り、変更なしで送信先クラスターに書き込むことができます。したがって、グローバルコンバーターを ByteArrayConverter でオーバーライドできます。この場合、レコードはそのまま残されます。

  • src.kafka.bootstrap.servers: 送信元 クラスターのブローカーのリストです。

  • topic.whitelist: レプリケートするトピックの明示的なリストです。クイックスタートでは、test-topic という名前のトピックをレプリケートします。

    ちなみに

    また、topic.regex パラメーター で正規表現を使用して、レプリケートするトピックを Replicator に指示することもできます。特定のパターンに一致した場合に新しいトピックのレプリケーションが Replicator で自動的に開始されるようにする場合は、正規表現を使用します。たとえば、本稼働環境のすべてのトピックをレプリケートするには、prod.* に一致するトピックをレプリケートするように Replicator を構成します。新しいトピックをリストに追加する場合は、変更を有効にするために Replicator を再起動する必要があります。

  • topic.rename.format: 送信先クラスター内のトピック名を変更するために使用される置換文字列です。上記のスニペットでは、${topic}.replica が使用されています。 ${topic} は送信元クラスターのトピック名で置き換えられます。そのため、送信元クラスターからレプリケートされる test-topic の名前は送信先クラスターで test-topic.replica に変更されます。

  • confluent.license: Replicator は、ライセンスがなくても 30 日間試用できます。Confluent のお客様は、カスタマーサポートにお問い合わせのうえ、Replicator ライセンスを要求してください。ライセンスを受け取ったら、この例に示されているように使用します。

送信元クラスターでの Replicator の実行

Replicator は、可能な場合は送信先クラスターで実行する必要があります。それができない場合は、Confluent Platform 5.4.0 以降で送信元クラスター内で Replicator を実行することができます。Replicator をこのように実行するには、以下のように変更を加えます。

  • Connect ワーカー構成または --replication.config (Replicator 実行可能ファイルを使用する場合)の connector.client.config.override.policyAll に設定します。
  • Connect ワーカー構成の bootstrap.servers は送信元クラスターを参照している必要があります(Replicator 実行可能ファイルの場合はこれを --producer.config で指定します)。
  • 送信元クラスター用のすべてのクライアント構成(セキュリティなど)は、Connect ワーカー構成で指定されている必要があります(Replicator 実行可能ファイルの場合はこれらを --producer.config で指定します)。
  • コネクター構成の producer.override.bootstrap.servers は送信先クラスターを参照している必要があります(Replicator 実行可能ファイルの場合はこれを --replication.config で指定します)。
  • 送信先クラスター用のすべてのクライアント構成(セキュリティなど)は、コネクター構成でプレフィックス producer.override. を使用して指定されている必要があります(Replicator 実行可能ファイルの場合はこれらを --replication.config で指定します)。
  • プレフィックス src.kafka. および dest.kafka を使用した構成は、通常どおり指定する必要があります。

送信元クラスターでコネクターとして実行されている Replicator の構成例を以下に示します。

{
  "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
  "name": "replicator",
  "producer.override.ssl.endpoint.identification.algorithm": "https",
  "producer.override.sasl.mechanism": "PLAIN",
  "producer.override.request.timeout.ms": 20000,
  "producer.override.bootstrap.servers": "destination-cluster:9092",
  "producer.override.retry.backoff.ms": 500,
  "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";",
  "producer.override.security.protocol": "SASL_SSL",
  "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "topic.whitelist": "someTopic",
  "src.kafka.bootstrap.servers": "source-cluster:9092",
  "dest.kafka.bootstrap.servers": "destination-cluster:9092",
  "dest.kafka.ssl.endpoint.identification.algorithm": "https",
  "dest.kafka.security.protocol": "SASL_SSL",
  "dest.kafka.sasl.mechanism": "PLAIN",
  "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";"
}

この構成では、Replicator は消費ではなくクラスター間で生成を行っており、この場合はデフォルトのプロデューサー構成は適切ではありません。以下の構成を調節して、プロデューサーフローのスループットを増やしてください。

  • producer.override.linger.ms=500
  • producer.override.batch.size=600000

これらの値は、開始点として提供されているに過ぎず、環境とユースケースに合わせてさらにチューニングする必要があります。

送信先が Confluent Cloud の場合に Replicator を送信元クラスターで実行する方法の詳細については、「Confluent Cloud の構成に対する Confluent Replicator」を参照してください。

ライセンスキー

ライセンスキーがない場合、Replicator 実行可能ファイルは 30 日間試用できます。Confluent のお客様は、カスタマーサポートにお問い合わせのうえ、Replicator ライセンスキーを要求してください。その後、Confluent サポートから受け取ったキーを --confluent.license コマンドラインパラメーターで使用するか、--replication.config に渡すレプリケーション構成ファイル内の confluent.license プロパティに追加します。

重要

5.5.0 以降、Replicator は、Replicator が実行中の場合でも、ライセンスキーの有効期限が切れるとすぐに失敗します。

おすすめの関連情報