Confluent Monitoring Interceptor

Control Center で生成および消費をモニタリングするには、Confluent Monitoring Interceptor をご使用の Apache Kafka® アプリケーションと一緒にインストールし、生成および消費される Kafka メッセージに対してこのインターセプターを使用するようにアプリケーションを構成します。これらのメッセージは、Control Center に送信されます。たとえば、平均および最大のレイテンシに対して コンシューマーグループトリガー を構成するには、ターゲットのコンシューマーグループ内のクライアント用に Monitoring Interceptor を構成する必要があります。

インターセプターのインストール

Kafka 0.10.0.0 以降、Kafka クライアントでは、プラグ可能な インターセプター によるメッセージの調査(場合によっては変更)をサポートしています。Control Center では、生成および消費のモニタリング機能を提供するために、Confluent Monitoring Interceptor を使用して着信メッセージおよび送信メッセージに関する統計情報を収集することをクライアントに要求しています。

Control Center の生成および消費のモニタリング機能を使用するには、まず Confluent Monitoring Interceptor をクライアントマシンにインストールする必要があります。

インターセプターのディストリビューション

Confluent Platform をインストールするとき、以下の 2 種類のパッケージからいずれかを選択することができます。

  • Confluent Platform Server Package(-ce)。すべてのコミュニティおよび商用コンポーネントがインストールされます。モニタリングインターセプターも含まれます。
  • Confluent Community Package(-ccs)。コミュニティコンポーネントのみがインストールされます。このパッケージを使用する場合は、インターセプターが含まれている別のコンポーネントパッケージをインストールする必要があります。

詳細については Confluent Platform パッケージ を参照してください。

Java クライアント

Java アプリケーションを開発するときは、Maven、Ivy、Gradle のいずれかを使用します。Maven 用 POM ファイルのコンテンツ例を以下に示します。まず、Confluent Maven リポジトリ を指定します。

<repositories>
  ...
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
  ...
</repositories>

次に、Kafka Java クライアントおよび Confluent Monitoring Interceptor の依存関係を追加します。

<dependencies>
  ...
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>6.2.4-ccs</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>monitoring-interceptors</artifactId>
        <version>6.2.4</version>
    </dependency>
  ...
 </dependencies>

librdkafka ベースのクライアント

confluent-kafka-python、confluent-kafka-go、confluent-kafka-dotnet などの librdkafka ベースのクライアントの場合は、個別のモニタリングインターセプタープラグインが使用されます。Confluent Community を使用している場合、このパッケージはクライアントのプラットフォームに応じて異なる方法で配布されます。

  • Linux(Debian および RedHat ベースのディストリビューション): Confluent Community を使用している場合、Confluent リポジトリ から confluent-librdkafka-plugins パッケージをインストールします。
  • macOS または Windows: Confluent Community を使用している場合、モニタリングインターセプター zip ファイル をダウンロードしてから、次の操作を行います。
    • macOS の場合: アプリケーションと同じディレクトリか、/usr/local/lib などのシステムライブラリの検索パスにあるディレクトリに、monitoring-interceptor.dylib ファイルを抽出またはコピーします。
    • Windows の場合: アプリケーションのインストール先と同じ場所かアプリケーションの実行ディレクトリに、ご使用のアーキテクチャに適した monitoring-interceptor.dll を抽出またはコピーします。

注釈

librdkafka ベースのクライアント用のモニタリングインターセプタープラグインには、librdkafka バージョン 0.11.0 以降が必要です。

注釈

モニタリングインターセプタープラグインは実行時の依存関係であって、クライアントまたはアプリケーションのビルド時には必要ありません。このプラグインは構成プロパティ(plugin.library.paths)を通じて直接参照され、ビルドホストではなくデプロイホスト上にインストールする必要があります。

インターセプターの有効化

Confluent Monitoring Interceptor のパッケージをご使用の Kafka アプリケーションにインストールした後で、実際にこのインターセプターを使用するようにクライアントを構成します。クライアントの構成方法はクライアントの種類によって異なります。

警告

プロデューサーとコンシューマーのインターセプターはクラスが異なります。プロデューサーおよびコンシューマーごとに適切なクラスを選択してください。

注釈

インターセプターを使用し、Confluent Platform を RHEL、CentOS、Debian または Ubuntu にインストールする場合は、コンポーネントのパッケージ( confluent-ksql など)および Confluent Control Center (confluent-control-center )を同じマシン上にインストールする必要があります。

Java プロデューサーおよびコンシューマー

Java プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。

プロデューサーの場合は、interceptor.classesio.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor に設定します。

producerProps.put(
   ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
   "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

コンシューマーの場合は、interceptor.classesio.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor に設定します。

consumerProps.put(
   ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
   "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

注釈

librdkafka ベースのクライアントの場合は、「librdkafka」を参照してください。

Kafka Streams

Kafka Streams は Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。

プロデューサーの場合は、producer.classesio.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor に設定します。コンシューマーの場合は、consumer.interceptor.classesio.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor に設定します。

streamsConfig.put(
   StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
   "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

streamsConfig.put(
   StreamsConfig.MAIN_CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
   "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

ksqlDB

ksqlDB は、Kafka Streams 同様、Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。

プロデューサーの場合は、producer.classesio.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor に設定します。コンシューマーの場合は、consumer.interceptor.classesio.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor に設定します。

producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

librdkafka ベースのクライアント

librdkafka ベースのクライアントの場合は、plugin.library.paths 構成プロパティをインターセプターライブラリの名前である monitoring-interceptor に設定します。

C の例:
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_res_t r;
char errstr[512];

r = rd_kafka_conf_set(conf, "bootstrap.servers", "mybroker", errstr, sizeof(errstr));
if (r != RD_KAFKA_CONF_OK)
  fatal("%s", errstr);

r = rd_kafka_conf_set(conf, "plugin.library.paths", "monitoring-interceptor", errstr, sizeof(errstr));
if (r != RD_KAFKA_CONF_OK)
  fatal("%s", errstr);

rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
  fatal("%s", errstr);

rd_kafka_destroy(rk);
Python の例:
p = confluent_kafka.Producer({'bootstrap.servers': 'mybroker',
                              'plugin.library.paths': 'monitoring-interceptor'})

注釈

モニタリングインターセプターライブラリがシステムの動的リンカーパス(dlopen(3) または LoadLibrary のドキュメントを参照)に含まれない、標準以外の場所にインストールされている場合は、完全なパスまたは相対パスを構成する必要があります。

注釈

プラットフォーム固有のライブラリファイル名拡張子(.so.dll など)は省略できます。

Kafka Connect

Kafka Connect コネクターは Kafka のプロデューサーとコンシューマーを内部で使用します。この内部プロデューサーとコンシューマーで使用する Confluent Monitoring Interceptor を指定できます。

  • ソースコネクター : ワーカー内で、Confluent Monitoring Interceptor を追加して producer プレフィックスを使用します。

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
  • シンクコネクター : ワーカー内で、Confluent Monitoring Interceptor を追加して consumer プレフィックスを使用します。

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    

ちなみに

メトリクスプロデューサーおよびコンシューマー用のモニタリングインターセプターブートストラップサーバーをオーバーライドするには、producer.<*> または consumer.<*> のプレフィックスと、必要なセキュリティ構成プロパティを指定します。「TLS/SSL での暗号化と認証」および「TLS/SSL による暗号化」を参照してください。たとえば、メトリクスプロデューサーの場合は、producer.confluent.monitoring.interceptor.bootstrap.serversproducer.confluent.monitoring.interceptor.security.protocol のプロパティを入力することにより、ブートストラップサーバーのオーバーライドとセキュリティプロトコルの指定を行います。

Confluent Replicator

Replicator は Kafka のコンシューマーを内部で使用します。この内部コンシューマーで使用する Confluent Monitoring Interceptor を指定できます。Replicator の JSON 構成ファイルを変更します。追加する構成のサブセットの例を次に示します。

{
  "name":"replicator",
    "config":{
      ....
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      ....
    }
  }
}

ちなみに

Replicator 用のモニタリングインターセプターブートストラップサーバーをオーバーライドするには、構成内でコンシューマーとして Replicator のプレフィックスを指定します。例: `` "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9091"``

詳細については、「Confluent モニタリングインターセプター」の「Kafka Connect 用のインターセプター」および「Replicator 用のインターセプター」、「TLS/SSL による暗号化」の「Confluent Monitoring Interceptor」、「Replicator のモニタリング」、「Replicator クイックスタートチュートリアル」の「Control Center を使用した Replicator のモニタリング」を参照してください。

Confluent REST Proxy

REST Proxy は Kafka のコンシューマーとプロデューサーを内部的に使用します。スタートアップ構成プロパティファイル(/etc/kafka-rest/kafka-rest.properties)を変更することにより、これらの内部クライアント用の Confluent Monitoring Interceptor を設定することができます。インターセプターの JAR(share/java/monitoring-interceptors に配置)を CLASSPATH に組み込む必要もあります。追加する構成のサブセットの例を次に示します。

producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

REST Proxy インターセプタークラスの構成

REST Proxy では、Java の新規プロデューサーおよびコンシューマーの設定の一環としてインターセプター構成をサポートしています。

producer.interceptor.classes

プロデューサーインターセプタークラス。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
consumer.interceptor.classes

コンシューマーインターセプタークラス。

  • 型: string
  • デフォルト: ""
  • 重要度: 低

インターセプターの構成

デフォルト

デフォルトでは、Confluent Monitoring Interceptor は、モニタリングしているものと同じ Kafka クラスターを使用してメッセージを送受信し、一連のデフォルトトピックを使用して情報を共有します。また、デフォルトで 15 秒の一定間隔によりデータのレポートも行います。

全般オプション

以下に、Confluent Monitoring Interceptor で使用する推奨構成パラメーターをいくつか示します。ただし、多くのアプリケーションでデフォルトを使用できます。

confluent.monitoring.interceptor.bootstrap.servers
モニタリングデータの書き込み先である、クラスター内の Kafka ブローカーのリストです(デフォルトは localhost:9092)。

注釈

インターセプターの すべて の Kafka プロデューサー構成オプションは、confluent.monitoring.interceptor. (末尾の . を含む)をプレフィックスとすることによって変更できます。たとえば、インターセプターの timeout.ms の値をプロパティ confluent.monitoring.interceptor.timeout.ms によって変更できます。Kafka プロデューサーオプションの詳細については、「プロデューサーの構成」を参照してください。

Confluent Monitoring Interceptor によってのみ使用される構成パラメーターもいくつかあります。

confluent.monitoring.interceptor.topic
モニタリングデータの書き込み先のトピックです(デフォルトは _confluent-monitoring)。
confluent.monitoring.interceptor.publishMs
インターセプターがメッセージのパブリッシュで使用する必要のある期間です(デフォルトは 15 秒)。
confluent.monitoring.interceptor.client.id
Confluent Control Center モニタリングデータに含める論理クライアント名です。指定していない場合は、confluent.monitoring.interceptor によってインターセプトされたクライアントのクライアント ID が使用されます。

セキュリティ

セキュアなクラスター用にモニタリングインターセプターを構成するときは、モニタリングインターセプター内の組み込みプロデューサー(ブローカーにモニタリングデータを送信する)に、confluent.monitoring.interceptor をプレフィックスとする、正しい セキュリティ構成 がある必要があります。

Confluent Monitoring Interceptor 向けにセキュリティを有効にする方法の詳細については、以下を参照してください。

librdkafka ベースのクライアントの場合は、confluent.monitoring.interceptor. プレフィックスは同じですが、実際のクライアント構成プロパティは基盤となるクライアントに応じて異なることがあります。Java クライアントおよび librdkafka ベースの複数のクライアント間で異なる可能性があります。

RBAC 用のインターセプターのセットアップ

RBAC が有効にされているときにインターセプターを使用するには、プロデューサーおよびコンシューマーに、モニタリングデータの送信先のクラスター上にあるモニタリングトピック(デフォルトは _confluent-monitoring )に対するプリンシパル書き込みアクセスが必要です。デフォルトでは、モニタリング対象のクラスターは、生成先および消費元のクラスターと同じです。

以下の手順に従います。

  1. Control Center を起動するか、手動で トピックの作成 を行って、_confluent-monitoring トピックを作成します。

  2. Confluent CLI を使用して、_confluent-monitoring トピックに対する DeveloperWrite ロールをクライアントプリンシパルに付与します。

    confluent iam rolebinding create \
    --principal User:<client-principal> \
    --role DeveloperWrite \
    --resource Topic:_confluent-monitoring
    --kafka-cluster-id <kafka-id>
    
  3. クライアントを起動します。

Confluent Platform での RBAC の詳細については、「ロールベースアクセス制御を使用した認可」および「Control Center 用の RBAC の構成」を参照してください。

ログ記録

Kafka クライアントと Confluent インターセプターは、いずれも slf4j を使用してエラーなどの情報をログに記録します。ログ記録を有効にするには、slf4j バインディングを構成する必要があります。構成しないと、"Failed to load class org.slf4j.impl.StaticLoggerBinder" などのエラーメッセージが表示されます。この問題を解決する最もシンプルな方法は、slf4j-simple.jar をクラスパスに追加することです。詳細については、http://www.slf4j.org/codes.html#StaticLoggerBinder を参照してください。

librdkafka ベースのモニタリングインターセプターでは、重大なエラーが stderr にログ記録されます。インターセプター固有の(stderr への)デバッグを有効にするには、confluent.monitoring.interceptor.icdebug 構成プロパティを true に設定します。

構成の例

以下に、Kafka に付属している組み込みパフォーマンステストツール用にストリームモニタリングをセットアップする方法を示します。この手順では、 クイックスタートガイド の説明と同様にクラスターがセットアップされていると想定しています。

  1. Control Center が既に実行されている状態で、ターミナルを開いて以下のコマンドを実行し、MonitoringProducerInterceptor を使用して Producer Performance Test ツールを起動します。

    export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.2.4.jar
    ./bin/kafka-producer-perf-test --topic c3test --num-records 10000000 --record-size 1000 \
    --throughput 10000 --producer-props bootstrap.servers=localhost:9092 \
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor acks=all
    

    また、ターミナルを開いて以下のコマンドを実行することで、MonitoringProducerInterceptor を使用してコンソールプロデューサーを起動することもできます。

    export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.2.4.jar
    echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" > /tmp/producer.config
    echo "acks=all" >> /tmp/producer.config
    seq 10000 | bin/kafka-console-producer --topic c3test --bootstrap-server localhost:9092 --producer.config /tmp/producer.config
    

    kcat (旧 kafkacat)ユーティリティ など librdkafka ベースのクライアントの場合:

    (for i in $(seq 1 100000) ; echo $i ; sleep 0.01 ; done) | kafkacat -b localhost:9092 -P -t c3test -X acks=all \
    -X plugin.library.paths=monitoring-interceptor
    
  2. 別のターミナルで、MonitoringConsumerInterceptor を使用してコンソールコンシューマーを起動します。

    export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-6.2.4.jar
    echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" > /tmp/consumer.config
    echo "group.id=c3_example_consumer_group" >> /tmp/consumer.config
    bin/kafka-console-consumer --topic c3test --bootstrap-server localhost:9092 --consumer.config /tmp/consumer.config
    

    kafkacat など librdkafka ベースのクライアントの場合:

    kafkacat -b localhost:9092 -G c3_example_consumer_group -X auto.offset.reset=earliest \
    -X plugin.library.paths=monitoring-interceptor \
    c3test
    
  3. http://localhost:9021/ の Control Center UI を開き、Consumers をクリックして c3_example_consumer_group を表示します。コンシューマーグループ ID をクリックして詳細を表示します。

検証

インターセプターがデータを _confluent-monitoring トピックに対して正常に送信していることを検証するには、コンソールコンシューマーを起動します。

bin/control-center-console-consumer <control-center.properties> --topic _confluent-monitoring --from-beginning

関連する clientId を含むモニタリングメッセージが該当のトピック上に生成されているはずです。

注釈

インターセプターがプロデューサーとコンシューマーの両方に対して機能していることを確認してください。Control Center では現在、消費されていないメッセージは表示されません。

制限

  • Confluent Monitoring Interceptor では現在、isolation.level=read_committed であるコンシューマーおよび transactional.id が設定されているプロデューサーをサポートして いません
  • Confluent Monitoring Interceptor では、タイムスタンプが欠落しているメッセージやタイムスタンプが無効であるメッセージをスキップします。すべてのブローカーおよびトピックが log.message.format.version および message.format.version0.10.0 以上を指定して構成されていることを確認してください。
  • Confluent Monitoring Interceptor は、Confluent Control Center で "厳密に 1 回" のセマンティクス(EOS)と組み合わせて構成することは できません