Kafka Connect の FAQ

SinkConnector の出力データフォーマットを変更するにはどうしたらよいですか。

出力システムに書き込まれるフォーマットは、SinkConnector 自体に依存します。SinkConnector は、key.converter および value.converter で指定されているコンバーターで生成されたフォーマットから出力データフォーマットにレコードを変換します。サポートされている出力フォーマットを確認するには、対象のコネクターのドキュメントを参照してください。

コネクターの構成の更新によりタスクのバランス調整がトリガーされるのはなぜですか。

すべてのコネクターの構成の更新によってタスクのバランス調整がトリガーされるわけではありませんが、多くの場合はトリガーされます。タスクのバランス調整がトリガーされるのは、実際には、バランス調整を行わないとその Connect で安全な実行ができないため、タスクの再構成が必要であるためです。バランス調整が行われる基本的な理由は、次の 2 つです。

  1. コネクターまたはタスクの総数が変わったことにより、アクティブなワーカー内での再割り当てが必要になった。
  2. タスクの構成が変更され、Connect では、正常な動作を確保するためにタスクの調整が必要になるかどうかを判断できない。たとえば、パーティションの再割り当てが行われた場合にメッセージがスキップされたり繰り返されたりすることを避ける必要がある場合など。

スタンドアロンモードではなく分散モードを使用する必要があるのはなぜですか。

We recommend using distributed mode for most production use cases. Please see here for details.

Kafka Connect を使用するにはカスタムコードを記述する必要がありますか。

Connector Hub には、多数のユースケースに対応するさまざまなコネクターが用意されており、多くの場合、コードを記述する必要はありません。対象のユースケースに対応していない場合は、用意されているコネクターのいずれかを拡張するか、新しいコネクターを記述することが必要になることがあります。そのような場合は、「開発者ガイド」を参照してください。

Schema Registry は、Kafka Connect を実行するうえで必須のサービスですか。

いいえ。必須ではありません。ただし、データフォーマットとして Avro を使用する予定がある場合は、使用することが推奨されています。これは、シリアル化やスキーマの進化で役立つためです。詳しくは、 こちら を参照してください。

Connect ワーカーからプロデューサーおよびコンシューマーの JMX メトリクスにアクセスするにはどうしたらよいですか。

Connect ワーカーの構成ファイルに以下のプロパティを追加します。ConfluentMetricsReporter Metrics Reporter は各ワーカー JVM からプロデューサーおよびコンシューマーの JMX メトリクスを取得して、Apache Kafka® トピックに書き込みます( _confluent-connect-metrics)。

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=metrics-kafka:9092
confluent.metrics.reporter.topic=_confluent-connect-metrics
confluent.metrics.reporter.topic.replicas=1
confluent.metrics.reporter.whitelist=.*
confluent.metrics.reporter.publish.ms=60000

_confluent-connect-metrics のメッセージを消費するには、次のコマンドを使用します。

kafka-console-consumer --topic _confluent-connect-metrics \
                       --bootstrap-server metrics-kafka:9092 \
                       --formatter io.confluent.metrics.reporter.ConfluentMetricsFormatter

Connect でプレーン JSON データを使用するにはどうしたらよいですか。

Connect でプレーン JSON データを使用すると、次のようなエラーメッセージがユーザーに表示されます。

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields

You will need to set the schemas.enable parameters for the converter to false as described here.

ソースコネクター X で出力フォーマット Y はサポートされていますか。

ソースコネクターはシステムからデータを読み取り、コンバーターを使用して、使用されている Connect データフォーマットからバイトアレイにデータを変更します。これは、ソースコネクターでは、Kafka へのデータの書き込みには、対応するコンバーターが存在するあらゆるフォーマットを使用できることを意味します。コンバーターの詳細については、 こちら を参照してください。

ちなみに

Confluent にはこの内容に関する詳細な記事が用意されています。「Kafka Connect の詳説 – コンバーターとシリアル化について」を参照してください。

コネクターをデプロイしていない状態で、Connect ワーカーの CPU 使用率が高いのはなぜですか。

これは、ワーカーの起動時に、使用可能なコネクタープラグインを検索するために、CLASSPATH にあるすべてのファイルが読み取られるためです。この問題を回避するには、ワーカーの CLASSPATH にファイルシステムの大部分を指定しないようにします。

Connect シンクコネクターでは、カスタムクライアントなど、他のクライアントによって書き込まれたデータを読み取ることはできますか。

データがどのように書き込まれたかによります。書き込まれるデータに、key.converter および value.converter で指定されているコンバーターとの互換性が必要です。たとえば、Schema Registry で指定されているコンバーターを使用して Avro データを書き込むアプリケーションが生成するデータは、同じコンバーターを使用しているシンクコネクターで読み取ることができる、互換性のあるデータになります。

スタンドアロンモードでコネクターをテストした後、コネクターを再起動しても、再度データの書き込みが行われません。

スタンドアロンのコネクターは、offset.storage.file.filename で指定されているローカルファイルにオフセットを保存します。コネクターは通常、データの再処理は行わないようになっています。コネクターでデータの再処理が行われるようにするには、この構成で指定されているファイルを削除します。

新しいバージョンの Connect と以前のバージョンのブローカーを組み合わせて使用できますか。

Connect は、Kafka 用 Java クライアントと同じ 互換性ルール に従います。Confluent Platform 3.2.0 および Kafka 0.10.2 の時点では一般に、Kafka ブローカーと Kafka Connect ワーカーのいずれも古いバージョンと新しいバージョンを混在させることができます。ただし、Kafka Connect の新機能(ヘッダーのサポートなど)は、Connect と併用するブローカーでもその機能がサポートされていないと、使用できません。