Confluent モニタリングインターセプター¶
Control Center で生成および消費をモニタリングするには、Confluent Monitoring Interceptor をご使用の Apache Kafka® アプリケーションと一緒にインストールし、生成および消費される Kafka メッセージに対してこのインターセプターを使用するようにアプリケーションを構成します。これらのメッセージは、Control Center に送信されます。
インターセプターのインストール¶
Kafka 0.10.0.0 以降、Kafka クライアントでは、プラグ可能な インターセプター によるメッセージの調査(場合によっては変更)をサポートしています。Control Center では、生成および消費のモニタリング機能を提供するために、Confluent Monitoring Interceptor を使用して着信メッセージおよび送信メッセージに関する統計情報を収集することをクライアントに要求しています。
Control Center の生成および消費のモニタリング機能を使用するには、まず Confluent Monitoring Interceptor をクライアントマシンにインストールする必要があります。
Java クライアント¶
Java アプリケーションを開発するときは、Maven、Ivy、Gradle のいずれかを使用します。Maven 用 POM ファイルのコンテンツ例を以下に示します。まず、Confluent Maven リポジトリ を指定します。
<repositories>
...
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
...
</repositories>
次に、Kafka Java クライアントおよび Confluent Monitoring Interceptor の依存関係を追加します。
<dependencies>
...
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>6.1.5-ccs</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>monitoring-interceptors</artifactId>
<version>6.1.5</version>
</dependency>
...
</dependencies>
librdkafka ベースのクライアント¶
confluent-kafka-python、confluent-kafka-go、confluent-kafka-dotnet などの librdkafka ベースのクライアントの場合は、プラットフォームに応じて別々に配布される個別のモニタリングインターセプタープラグインを使用します。
- Linux(Debian および RedHat ベースのディストリビューション): Confluent リポジトリ から
confluent-librdkafka-plugins
パッケージをインストールします。 - macOS または Windows: モニタリングインターセプター zip ファイル をダウンロードしてから、次の操作を行います。
- macOS の場合: アプリケーションと同じディレクトリか、
/usr/local/lib
などのシステムライブラリの検索パスにあるディレクトリに、monitoring-interceptor.dylib
ファイルを解凍します。 - Windows の場合: アプリケーションのインストール先と同じ場所かアプリケーションの実行ディレクトリに、ご使用のアーキテクチャに適した
monitoring-interceptor.dll
を解凍します。
- macOS の場合: アプリケーションと同じディレクトリか、
注釈
librdkafka ベースのクライアント用のモニタリングインターセプタープラグインには、librdkafka バージョン 0.11.0 以降が必要です。
注釈
モニタリングインターセプタープラグインは実行時の依存関係であって、クライアントまたはアプリケーションのビルド時には必要ありません。このプラグインは構成プロパティ(plugin.library.paths
)を通じて直接参照され、ビルドホストではなくデプロイホスト上にインストールする必要があります。
インターセプターの有効化¶
Confluent Monitoring Interceptor のパッケージをご使用の Kafka アプリケーションにインストールした後で、実際にこのインターセプターを使用するようにクライアントを構成します。クライアントの構成方法はクライアントの種類によって異なります。
警告
プロデューサーとコンシューマーのインターセプターはクラスが異なります。プロデューサーおよびコンシューマーごとに適切なクラスを選択してください。
注釈
インターセプターを使用し、Confluent Platform を RHEL、CentOS、Debian または Ubuntu にインストールする場合は、コンポーネントのパッケージ( confluent-ksql
など)および Confluent Control Center (confluent-control-center
)を同じマシン上にインストールする必要があります。
Java プロデューサーおよびコンシューマー¶
Java プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。
プロデューサーの場合は、interceptor.classes
を io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
に設定します。
producerProps.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
コンシューマーの場合は、interceptor.classes
を io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
に設定します。
consumerProps.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
注釈
librdkafka ベースのクライアントの場合は、「librdkafka」を参照してください。
Kafka Streams¶
Kafka Streams は Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。
プロデューサーの場合は、producer.classes
を io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
に設定します。コンシューマーの場合は、consumer.interceptor.classes
を io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
に設定します。
streamsConfig.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
streamsConfig.put(
StreamsConfig.MAIN_CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
ksqlDB¶
ksqlDB は、Kafka Streams 同様、Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。
プロデューサーの場合は、producer.classes
を io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
に設定します。コンシューマーの場合は、consumer.interceptor.classes
を io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
に設定します。
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
librdkafka ベースのクライアント¶
librdkafka ベースのクライアントの場合は、plugin.library.paths
構成プロパティをインターセプターライブラリの名前である monitoring-interceptor
に設定します。
- C の例:
rd_kafka_conf_t *conf = rd_kafka_conf_new(); rd_kafka_conf_res_t r; char errstr[512]; r = rd_kafka_conf_set(conf, "bootstrap.servers", "mybroker", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); r = rd_kafka_conf_set(conf, "plugin.library.paths", "monitoring-interceptor", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) fatal("%s", errstr); rd_kafka_destroy(rk);
- Python の例:
p = confluent_kafka.Producer({'bootstrap.servers': 'mybroker', 'plugin.library.paths': 'monitoring-interceptor'})
注釈
モニタリングインターセプターライブラリがシステムの動的リンカーパス(dlopen(3)
または LoadLibrary
のドキュメントを参照)に含まれない、標準以外の場所にインストールされている場合は、完全なパスまたは相対パスを構成する必要があります。
注釈
プラットフォーム固有のライブラリファイル名拡張子(.so
、.dll
など)は省略できます。
Kafka Connect¶
Kafka Connect コネクターは Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。
ソースコネクター : ワーカー内で、Confluent Monitoring Interceptor を追加して
producer
プレフィックスを使用します。producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
シンクコネクター : ワーカー内で、Confluent Monitoring Interceptor を追加して
consumer
プレフィックスを使用します。consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
ちなみに
メトリクスプロデューサーおよびコンシューマー用のモニタリングインターセプターブートストラップサーバーをオーバーライドするには、producer.<*>
または consumer.<*>
のプレフィックスと、必要なセキュリティ構成プロパティを指定します。「SSL での暗号化と認証」および「SSL による暗号化」を参照してください。たとえば、メトリクスプロデューサーの場合は、producer.confluent.monitoring.interceptor.bootstrap.servers
と producer.confluent.monitoring.interceptor.security.protocol
のプロパティを入力することにより、ブートストラップサーバーのオーバーライドとセキュリティプロトコルの指定を行います。
Confluent Replicator¶
Replicator は Kafka のコンシューマーを内部で使用します。この内部コンシューマーで使用する Confluent Monitoring Interceptor を指定できます。Replicator の JSON 構成ファイルを変更します。追加する構成のサブセットの例を次に示します。
{
"name":"replicator",
"config":{
....
"src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
....
}
}
}
ちなみに
Replicator 用のモニタリングインターセプターブートストラップサーバーをオーバーライドするには、構成内でコンシューマーとして Replicator のプレフィックスを指定します。例: `` "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9091"``
詳細については、「Confluent モニタリングインターセプター」の「Kafka Connect 用のインターセプター」および「Replicator 用のインターセプター」、「SSL による暗号化」の「Confluent Monitoring Interceptor」、「Replicator のモニタリング」、「Replicator クイックスタートチュートリアル」の「Control Center を使用した Replicator のモニタリング」を参照してください。
Confluent REST Proxy¶
REST Proxy は Kafka のコンシューマーとプロデューサーを内部的に使用します。スタートアップ構成プロパティファイル(/etc/kafka-rest/kafka-rest.properties
)を変更することにより、これらの内部クライアント用の Confluent Monitoring Interceptor を設定することができます。インターセプターの JAR(share/java/monitoring-interceptors
に配置)を CLASSPATH に組み込む必要もあります。追加する構成のサブセットの例を次に示します。
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
REST Proxy インターセプタークラスの構成¶
REST Proxy では、Java の新規プロデューサーおよびコンシューマーの設定の一環としてインターセプター構成をサポートしています。
producer.interceptor.classes
プロデューサーインターセプタークラス。
- 型: string
- デフォルト: ""
- 重要度: 低
consumer.interceptor.classes
コンシューマーインターセプタークラス。
- 型: string
- デフォルト: ""
- 重要度: 低
インターセプターの構成¶
デフォルト¶
デフォルトでは、Confluent Monitoring Interceptor は、モニタリングしているものと同じ Kafka クラスターを使用してメッセージを送受信し、一連のデフォルトトピックを使用して情報を共有します。また、デフォルトで 15 秒の一定間隔によりデータのレポートも行います。
全般オプション¶
以下に、Confluent Monitoring Interceptor で使用する推奨構成パラメーターをいくつか示します。ただし、多くのアプリケーションでデフォルトを使用できます。
confluent.monitoring.interceptor.bootstrap.servers
- モニタリングデータの書き込み先である、クラスター内の Kafka ブローカーのリストです(デフォルトは
localhost:9092
)。
注釈
インターセプターの すべて の Kafka プロデューサー構成オプションは、confluent.monitoring.interceptor.
(末尾の .
を含む)をプレフィックスとすることによって変更できます。たとえば、インターセプターの timeout.ms
の値をプロパティ confluent.monitoring.interceptor.timeout.ms
によって変更できます。Kafka プロデューサーオプションの詳細については、「プロデューサーの構成」を参照してください。
Confluent Monitoring Interceptor によってのみ使用される構成パラメーターもいくつかあります。
confluent.monitoring.interceptor.topic
- モニタリングデータの書き込み先のトピックです(デフォルトは
_confluent-monitoring
)。 confluent.monitoring.interceptor.publishMs
- インターセプターがメッセージのパブリッシュで使用する必要のある期間です(デフォルトは 15 秒)。
confluent.monitoring.interceptor.client.id
- Confluent Control Center モニタリングデータに含める論理クライアント名です。指定していない場合は、
confluent.monitoring.interceptor
によってインターセプトされたクライアントのクライアント ID が使用されます。
セキュリティ¶
セキュアなクラスター用にモニタリングインターセプターを構成するときは、モニタリングインターセプター内の組み込みプロデューサー(ブローカーにモニタリングデータを送信する)に、confluent.monitoring.interceptor
をプレフィックスとする、正しい セキュリティ構成 がある必要があります。
Confluent Monitoring Interceptor 向けにセキュリティを有効にする方法の詳細については、以下を参照してください。
librdkafka ベースのクライアントの場合は、confluent.monitoring.interceptor.
プレフィックスは同じですが、実際のクライアント構成プロパティは基盤となるクライアントに応じて異なることがあります。Java クライアントおよび librdkafka ベースの複数のクライアント間で異なる可能性があります。
RBAC 用のインターセプターのセットアップ¶
RBAC が有効にされているときにインターセプターを使用するには、プロデューサーおよびコンシューマーに、モニタリングデータの送信先のクラスター上にあるモニタリングトピック(デフォルトは _confluent-monitoring
)に対するプリンシパル書き込みアクセスが必要です。デフォルトでは、モニタリング対象のクラスターは、生成先および消費元のクラスターと同じです。
以下の手順に従います。
Control Center を起動するか、手動で トピックの作成 を行って、
_confluent-monitoring
トピックを作成します。Confluent CLI を使用して、
_confluent-monitoring
トピックに対するDeveloperWrite
ロールをクライアントプリンシパルに付与します。confluent iam rolebinding create \ --principal User:<client-principal> \ --role DeveloperWrite \ --resource Topic:_confluent-monitoring --kafka-cluster-id <kafka-id>
クライアントを起動します。
Confluent Platform での RBAC の詳細については、「ロールベースアクセス制御を使用した認可」および「Control Center 用の RBAC の構成」を参照してください。
ログ記録¶
Kafka クライアントと Confluent インターセプターは、いずれも slf4j
を使用してエラーなどの情報をログに記録します。ログ記録を有効にするには、slf4j バインディングを構成する必要があります。構成しないと、"Failed to load class org.slf4j.impl.StaticLoggerBinder" などのエラーメッセージが表示されます。この問題を解決する最もシンプルな方法は、slf4j-simple.jar
をクラスパスに追加することです。詳細については、http://www.slf4j.org/codes.html#StaticLoggerBinder を参照してください。
librdkafka ベースのモニタリングインターセプターでは、重大なエラーが stderr にログ記録されます。インターセプター固有の(stderr への)デバッグを有効にするには、confluent.monitoring.interceptor.icdebug
構成プロパティを true
に設定します。
構成の例¶
以下に、Kafka に付属している組み込みパフォーマンステストツール用にストリームモニタリングをセットアップする方法を示します。この手順では、 クイックスタートガイド の説明と同様にクラスターがセットアップされていると想定しています。
Control Center が既に実行されている状態で、ターミナルを開いて以下のコマンドを実行し、
MonitoringProducerInterceptor
を使用して Producer Performance Test ツールを起動します。export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.1.5.jar ./bin/kafka-producer-perf-test --topic c3test --num-records 10000000 --record-size 1000 \ --throughput 10000 --producer-props bootstrap.servers=localhost:9092 \ interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor acks=all
また、ターミナルを開いて以下のコマンドを実行することで、
MonitoringProducerInterceptor
を使用してコンソールプロデューサーを起動することもできます。export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.1.5.jar echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" > /tmp/producer.config echo "acks=all" >> /tmp/producer.config seq 10000 | bin/kafka-console-producer --topic c3test --bootstrap-server localhost:9092 --producer.config /tmp/producer.config
kafkacat など librdkafka ベースのクライアントの場合:
(for i in $(seq 1 100000) ; echo $i ; sleep 0.01 ; done) | kafkacat -b localhost:9092 -P -t c3test -X acks=all \ -X plugin.library.paths=monitoring-interceptor
別のターミナルで、
MonitoringConsumerInterceptor
を使用してコンソールコンシューマーを起動します。export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.1.5.jar echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" > /tmp/consumer.config echo "group.id=c3_example_consumer_group" >> /tmp/consumer.config bin/kafka-console-consumer --topic c3test --bootstrap-server localhost:9092 --consumer.config /tmp/consumer.config
kafkacat など librdkafka ベースのクライアントの場合:
kafkacat -b localhost:9092 -G c3_example_consumer_group -X auto.offset.reset=earliest \ -X plugin.library.paths=monitoring-interceptor \ c3test
http://localhost:9021/ の Control Center UI を開き、Consumers をクリックして
c3_example_consumer_group
を表示します。コンシューマーグループ ID をクリックして詳細を表示します。
検証¶
インターセプターがデータを _confluent-monitoring
トピックに対して正常に送信していることを検証するには、コンソールコンシューマーを起動します。
bin/control-center-console-consumer <control-center.properties> --topic _confluent-monitoring --from-beginning
関連する clientId
を含むモニタリングメッセージが該当のトピック上に生成されているはずです。
注釈
インターセプターがプロデューサーとコンシューマーの両方に対して機能していることを確認してください。Control Center では現在、消費されていないメッセージは表示されません。
制限¶
- Confluent Monitoring Interceptor では現在、
isolation.level=read_committed
であるコンシューマーおよびtransactional.id
が設定されているプロデューサーをサポートして いません。 - Confluent Monitoring Interceptor では、タイムスタンプが欠落しているメッセージやタイムスタンプが無効であるメッセージをスキップします。すべてのブローカーおよびトピックが
log.message.format.version
およびmessage.format.version
に0.10.0
以上を指定して構成されていることを確認してください。 - Confluent Monitoring Interceptor は、Confluent Control Center で "厳密に 1 回" のセマンティクス(EOS)と組み合わせて構成することは できません。