Confluent Metrics Reporter

Confluent Metrics Reporter は、さまざまなメトリクスを Apache Kafka® クラスターから収集します。Confluent Control Center のシステム正常性モニタリングおよび Confluent Auto Data Balancer が動作するためには Confluent Metrics Reporter が必要です。

メトリクスは Kafka クラスター内のトピックに対して生成されます。メトリクスを以下の Kafka クラスターにパブリッシュするかどうかを選択できます。

  1. 本稼働環境のトラフィッククラスターと同じクラスター。同じクラスターを使用すると、利便性が高くなり、使い始めるには適切な方法です。
  2. 本稼働環境のトラフィッククラスターと異なるクラスター。専用のメトリクスクラスターを使用すると、本稼働環境のトラフィッククラスターで問題が発生した場合でもシステム正常性モニタリングが提供され続けるため、回復性が向上します。

インストール

Confluent Metrics Reporter は、Kafka ブローカーで Confluent Platform が実行中であれば、ブローカーに自動的にインストールされます。

ブローカー上の次のパスに Confluent Metrics Reporter の JAR ファイルが存在することを確認します。

$CONFLUENT_HOME/share/java/confluent-telemetry/confluent-metrics-6.2.4-ce.jar

ちなみに

Confluent Platform を $CONFLUENT_HOME(./bin/kafka-server-start etc/kafka/server.properties )ディレクトリ内から起動した場合は、それらの起動コマンドに組み込まれているスクリプトによってパスが自動的に構成されるので、Metrics Reporter をインストールする必要はありません。Metrics Reporter を有効にするには、metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter のコメントを解除し、必要に応じて confluent.metrics.reporter.bootstrap.servers=localhost:9092 の値を以下で説明するように変更します。

構成

デフォルトでは Confluent Metrics Reporter は有効になっていません。有効にするには、各 Kafka ブローカーの server.properties を編集して、構成パラメーター metric.reportersconfluent.metrics.reporter.bootstrap.servers を設定する必要があります。変更内容が反映されるためには、ブローカーのローリング再起動を行う必要があります。

ユーザーにおける利便性を考慮して、Confluent Platform に付属している server.properties では、これらの構成パラメーターが既に指定されていますが、コメントアウトされています。Confluent Metrics Reporter セクションにある以下の行のコメントを解除してください。デフォルトでは、本稼働環境のトラフィッククラスターにメトリクスをパブリッシュするように設定されています。本稼働環境のトラフィッククラスターとは異なる Kafka クラスターにメトリクスをパブリッシュする場合は、専用のメトリクスクラスター内の Kafka ブローカーを指すように、confluent.metrics.reporter.bootstrap.servers の値を変更します。

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
# The bootstrap servers refer to your monitoring cluster
confluent.metrics.reporter.bootstrap.servers=broker1:9092,broker2:9092,broker3:9092

Kafka メトリクスクラスター内のブローカーが 2 つ以下であれば、confluent.metrics.reporter.topic.replicas の値を変更します。

構成の変更内容が反映されるためには、ブローカーの ローリング再起動 が必要です。再起動すると、Metrics Reporter では次のようなメッセージを標準出力と server.log に返します。

[2017-07-17 17:11:32,304] INFO KafkaConfig values:
...
metric.reporters = [io.confluent.metrics.reporter.ConfluentMetricsReporter]
...
[2017-07-17 17:11:32,611] INFO ConfluentMetricsReporterConfig values:
         confluent.metrics.reporter.bootstrap.servers = localhost:9092
         confluent.metrics.reporter.publish.ms = 15000
   ...
...
[2017-07-17 17:11:48,288] INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

トピックが作成されると、Metrics Reporter ではそのトピックを定期的(デフォルトでは 15 秒ごと)に生成します。

デフォルトでは、Confluent Metrics Reporter は各 Kafka ブローカー上の server.log にイベントを記録します。より詳細なログを記録するには、./etc/kafka/log4j.properties ファイルに次の行を追加します。

log4j.logger.io.confluent.metrics.reporter.ConfluentMetricsReporter=DEBUG

注釈

上記の構成の変更内容が反映されるためには、Kafka ブローカーのローリング再起動が必要です。

Confluent Metrics Reporter が適切に構成されていて、ブローカーが再起動すると、トピックが自動的に作成され、そのトピックにメトリクスデータが定期的(デフォルトでは 15 秒ごと)に生成されます。

メッセージサイズ

Kafka クラスター内のパーティションの総数が多い場合は、メトリクストピックに対してブローカーで許容されている最大値より大きいサイズのメッセージが生成される可能性があります。バージョン 3.3.0 の時点でのデフォルトでは、トピックは最大 10 MB のサイズのメッセージを受け入れるように構成されています。それより前のバージョンでは、ブローカーのデフォルト値(デフォルトでは 1 MB)が使用されていました。次のログは、サイズが大きすぎたためにメッセージが拒否された場合の例です。

[2017-07-19 00:34:50,664] WARN Failed to produce metrics message (io.confluent.metrics.reporter.ConfluentMetricsReporter)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

解決策は、そのメトリクストピックの max.message.bytes 構成オプションの値を増やすことです。次の例では、デフォルト値の 2 倍に変更されています。

bin/kafka-configs --alter --bootstrap-server localhost:9092 --add-config max.message.bytes=20000000 --entity-type topics --entity-name _confluent-metrics

構成オプション

必須の構成は下記の最初の項目のみですが、Kafka メトリクスクラスター内のブローカーが 2 つ以下であれば、confluent.metrics.reporter.topic.replicas を変更する必要があります。

その他の構成オプションを使用することで、パフォーマンスと信頼性が向上するようにパブリッシャーを調整できます。ほとんどの場合、"重要度: 低" に分類されている構成オプションを変更する必要はありません。

confluent.metrics.reporter.bootstrap.servers

メトリクスのパブリッシュ先である、Kafka クラスターのブートストラップサーバー。このメトリクスクラスターは、メトリクスの収集元であるクラスター(1 つまたは複数)と異なるクラスターにすることもできます。たとえば、複数の Kafka クラスターから単一のメトリクスクラスターにパブリッシュすることもできます。

  • 型: string
  • 重要度: 高
confluent.metrics.reporter.topic.max.message.bytes

メトリクストピックの最大メッセージサイズ。

  • 型: int
  • デフォルト: 10485760
  • 指定可能な値: [0,…]
  • 重要度: 中
confluent.metrics.reporter.publish.ms

Confluent Metrics Reporter は、この設定で定義されている時間間隔で、新しいメトリクスをメトリクストピックにパブリッシュします。したがって、Control Center のシステム正常性データではこの時間だけラグが発生します。また、リバランサーではこの時間分だけ古くなっているブローカーのデータに基づいて予定を算出することがあります。このデフォルト値は本稼働環境に対する適正な値となっており、通常は変更する必要はありません。

  • 型: long
  • デフォルト: 15000
  • 重要度: 低
confluent.metrics.reporter.topic

メトリクスデータの書き込み先のトピック。

  • 型: string
  • デフォルト: _confluent-metrics
  • 重要度: 低
confluent.metrics.reporter.topic.create

メトリクストピックが存在しない場合に作成されます。

  • 型: boolean
  • デフォルト: true
  • 重要度: 低
confluent.metrics.reporter.topic.partitions

メトリクストピック内のパーティションの数。

  • 型: int
  • デフォルト: 12
  • 重要度: 低
confluent.metrics.reporter.topic.replicas

メトリクストピック内のレプリカ数。この値は、Kafka クラスター内のブローカー数以下である必要があります。

  • 型: int
  • デフォルト: 3
  • 重要度: 低
confluent.metrics.reporter.topic.retention.bytes

メトリクストピックの保持バイト数。

  • 型: long
  • デフォルト: -1
  • 重要度: 低
confluent.metrics.reporter.topic.retention.ms

メトリクストピックの保持時間(ミリ秒)。

  • 型: long
  • デフォルト値: 259200000(3 日間)
  • 重要度: 低
confluent.metrics.reporter.topic.roll.ms

メトリクストピックのログのローリング時間(ミリ秒)。

  • 型: long
  • デフォルト: 14400000(4 時間)
  • 重要度: 低
confluent.metrics.reporter.volume.metrics.refresh.ms

新しいボリュームメトリクスがフェッチされる最小時間間隔(ミリ秒)。

  • 型: long
  • デフォルト: 15000
  • 重要度: 低
confluent.metrics.reporter.whitelist

メトリクストピックにパブリッシュされる、Yammer メトリクスの MBean 名または Kafka のメトリクス名に一致する正規表現。

デフォルトでは、Confluent Control Center および Confluent Auto Data Balancer で必要とされるすべてのメトリクスが含まれます。Confluent から要求された場合を除き、通常はこのオプションを変更しないでください。

  • 型: string
  • デフォルト : Confluent Control Center および Confluent Auto Data Balancer で必要とされるすべてのメトリクスが含まれます
  • 重要度: 低

セキュリティ

セキュアな Kafka ブローカーで Confluent Metrics Reporter を構成する場合は、Confluent Metrics Reporter の 組み込みプロデューサー_confluent-metrics トピックにメトリクスデータを送信する)に、名前が confluent.metrics.reporter で始まる、正しい クライアントセキュリティ構成 がある必要があります。「ブローカーの構成」および「クライアントの構成」も参照してください。

認証

SSL 関連の構成については、 Kafka クライアントの SSL を参照してください。

confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234

SASL 関連の構成については、 Kafka クライアントの SASL を参照してください。

confluent.metrics.reporter.sasl.mechanism=PLAIN
confluent.metrics.reporter.security.protocol=SASL_PLAINTEXT

JAAS の構成ファイルの場所を JVM パラメーターとして渡す(サポートされていますが非推奨です)か、または server.properties 内で構成(推奨)します。詳細については「JAAS」を参照してください。

confluent.metrics.reporter.sasl.jaas.config= \
 org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username=“<mds-username>” \
     password=“<password>” \
  metadataServerUrls="https://<host-name>:<local-port>" (or "http" if non-production);

ロールベースアクセス制御

RBAC が有効になっている場合にメトリクスを送信するためには、プロデューサーおよびコンシューマーに対する 認証 と同じ方法を使用して Confluent Metrics Reporter を構成します。

以下の構成を使用して、MDS を利用していながら MDS を "ホストしていない" クラスターに対してメトリクスを送信します。

############################# Metrics Reporter Settings #############################

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=<broker-name>:<broker-port>
confluent.metrics.reporter.security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
confluent.metrics.reporter.sasl.mechanism=OAUTHBEARER
confluent.metrics.reporter.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
confluent.metrics.reporter.sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    username="<mds-username>" \
    password="<password>" \
metadataServerUrls="https://<host-name>:<local-port>" (or "http" if non-production);

MDS を実行およびホストしているクラスターと同じクラスターで Confluent Metrics Reporter を実行している場合、Metrics Reporter では MDS サーバーからのトークンを使用できないので、MDS 認証を必要としないリスナーと通信する必要があることに注意してください。その場合、その MDS のインスタンスに対して指定されている RBAC リスナーは使用しないでください。代わりに、MDS 構成の advertised.listeners で指定されているブローカー間リスナーを使用します。MDS 構成で指定されているリスナーの詳細については「Metadata Service (MDS) の構成」を参照してください。

注釈

Confluent Metrics Reporter を有効にしている場合に、クライアント ID の競合が発生する可能性があります。その場合は、エラーログにあるメッセージの数でわかります。このエラーによって、Confluent Metrics Reporter の想定される動作が影響を受けたり、それらの動作が変更されたりすることはありません。以下に例を示します。

WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=confluent-metrics-reporter

認可

  1. ブローカーのプリンシパルには、構成されている Kafka クラスターでメトリクストピックを作成するためのアクセス許可が付与されている必要があります。
  2. ブローカーのプリンシパルには、メトリクストピックに対してメッセージを生成するためのアクセス許可が付与されている必要があります。
  3. ツールのプリンシパルには、メトリクストピックから消費するためのアクセス許可が付与されている必要があります。これは通常、Confluent Control Center と Auto Data Balancer のいずれかまたは両方に該当しますが、トピックを検査するためにコンソールコンシューマーを使用している場合は、コンソールコンシューマーにも適用されます。

Kafka に対して ACL をセットアップしている場合は、bin/kafka-acls コマンドラインツールを使用して、トピックの ACL を追加または削除します。以下に例を示します。

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add \
    --allow-principal User:Alice --allow-host 198.51.100.0 \
    --operation Read --operation Write --topic _confluent-metrics

Kerberos を有効化している RBAC

Kerberos を有効化 して RBAC を実行している場合にメトリクスを送信するには、以下の構成を使用します。

metrics.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.topic.replicas=3
confluent.metrics.reporter.bootstrap.servers=<broker-name>:<broker-port>
confluent.metrics.reporter.security.protocol=SASL_SSL (or SASL_PLAINTEXT)
confluent.metrics.reporter.sasl.mechanism=GSSAPI
confluent.metrics.reporter.sasl.jaas.config=  \
   com.sun.security.auth.module.Krb5LoginModule required \
   debug=true \
   useKeyTab=true \
   storeKey=true \
   keyTab=<path-to-your-keytab> \
   principal=<org-kerberos-principal>;

MDS と同じクラスターで mTLS を実行している RBAC

MDS と同じクラスターで mTLS(相互 TLS)を使用して RBAC を実行している場合にメトリクスを送信するためには、以下の構成(server.properties)を使用します。

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.topic.replicas=3
confluent.metrics.reporter.bootstrap.servers=<broker hostname>:<internal-listener-port-number>
confluent.metrics.reporter.security.protocol=ssl
confluent.metrics.reporter.ssl.keystore.location=<path-to-your-keystore>
confluent.metrics.reporter.ssl.keystore.password=<password>
confluent.metrics.reporter.ssl.truststore.location=<path-to-your-truststore>
confluent.metrics.reporter.ssl.endpoint.identification.algorithm=
confluent.metrics.reporter.ssl.client.auth=required
confluent.metrics.reporter.ssl.truststore.password=<truststore-password>
confluent.metrics.reporter.ssl.key.password=<ssl-key-password>

検証

Kafka ブローカーが、メトリクスデータを正しいトピック(デフォルトでは _confluent-metrics )に対して正常に送信していることを検証します。

bin/kafka-console-consumer.sh --topic _confluent-metrics --bootstrap-server <bootstrap-server> --formatter io.confluent.metrics.reporter.ConfluentMetricsFormatter

Confluent Metrics Reporter がイベントを server.log にログ記録していることを検証します。次のようなメッセージが出力されます。

[2017-07-17 17:11:32,304] INFO KafkaConfig values:
...
metric.reporters = [io.confluent.metrics.reporter.ConfluentMetricsReporter]
...
[2017-07-17 17:11:32,611] INFO ConfluentMetricsReporterConfig values:
         confluent.metrics.reporter.bootstrap.servers = localhost:9092
         confluent.metrics.reporter.publish.ms = 15000
   ...
...
[2017-07-17 17:11:48,288] INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

より詳細なログ記録(DEBUG など)を有効にしている場合は、デバッグ時に役立つ追加情報付きでメッセージが出力されます。次のようなメッセージが出力されることが考えられます。

[2017-07-19 00:54:02,619] DEBUG Metrics reporter topic _confluent-metrics already exists (io.confluent.metrics.reporter.ConfluentMetricsReporter)
[2017-07-19 00:54:02,622] DEBUG Begin publishing metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)
...
[2017-07-19 00:54:02,772] DEBUG Produced metrics message of size 52104 with offset 316 to topic partition _confluent-metrics-6 (io.confluent.metrics.reporter.ConfluentMetricsReporter)