Confluent Metrics Reporter

The Confluent Metrics Reporter collects various metrics from an Apache Kafka® cluster. It is required for Confluent Control Center system health monitoring and for Confluent Auto Data Balancer to operate.

The metrics are produced to a topic in a Kafka cluster. You may choose whether to publish metrics to a Kafka cluster that is:

  1. The same as your production traffic cluster. Using the same cluster is more convenient and is a reasonable way to get started.
  2. Different from your production traffic cluster. Using a dedicated metrics cluster is more resilient because it continues to provide system health monitoring even if the production traffic cluster experiences issues.

Installation

The Confluent Metrics Reporter is automatically installed onto Kafka brokers if they are running Confluent Platform.

Confirm the Confluent Metrics Reporter JAR file is on the broker:

<install path>/share/java/kafka/confluent-metrics-5.3.1.jar
<install path>/share/java/confluent-rebalancer/confluent-metrics-5.3.1.jar

Configuration

By default, the Confluent Metrics Reporter is not enabled. To enable it, you must edit each Kafka broker’s server.properties and set the metric.reporters and confluent.metrics.reporter.bootstrap.servers configuration parameters. For the changes to take effect, you must perform a rolling restart of the brokers.

For your convenience, the server.properties shipped with Confluent Platform already provides these configuration parameters, but they are commented out. Uncomment the following lines in the Confluent Metrics Reporter section, which by default publishes metrics to your production traffic cluster. If you prefer to publish metrics to a Kafka cluster that is different from your production traffic cluster, modify confluent.metrics.reporter.bootstrap.servers to point to Kafka brokers in the dedicated metrics cluster.

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
# The bootstrap servers refer to your monitoring cluster
confluent.metrics.reporter.bootstrap.servers=broker1:9092,broker2:9092,broker3:9092

Update confluent.metrics.reporter.topic.replicas if there are fewer than 3 brokers in the Kafka metrics cluster.
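Before restarting the brokers, it can help to confirm that both required parameters are actually uncommented. The following is a minimal sketch, not part of Confluent Platform: the function name reporter_enabled is hypothetical, and it assumes each property sits on a single line of the file you pass in.

```shell
# Sketch: succeed only if the Confluent Metrics Reporter is enabled in the
# given broker properties file. The function name is hypothetical.
reporter_enabled() {
  props_file="$1"
  # metric.reporters must be uncommented and include the reporter class.
  grep -Eq '^[[:space:]]*metric\.reporters[[:space:]]*=.*io\.confluent\.metrics\.reporter\.ConfluentMetricsReporter' "$props_file" || return 1
  # The reporter also needs an uncommented, non-empty bootstrap server list.
  grep -Eq '^[[:space:]]*confluent\.metrics\.reporter\.bootstrap\.servers[[:space:]]*=[[:space:]]*[^[:space:]]' "$props_file"
}
```

For example, `reporter_enabled ./etc/kafka/server.properties && echo enabled` prints "enabled" only when both lines are active. The path shown is an assumption about your installation layout.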

A rolling restart of the brokers is required for the configuration changes to be picked up. After the restart, each broker should write messages similar to the following to standard output and server.log:

[2017-07-17 17:11:32,304] INFO KafkaConfig values:
...
metric.reporters = [io.confluent.metrics.reporter.ConfluentMetricsReporter]
...
[2017-07-17 17:11:32,611] INFO ConfluentMetricsReporterConfig values:
         confluent.metrics.reporter.bootstrap.servers = localhost:9092
         confluent.metrics.reporter.publish.ms = 15000
   ...
...
[2017-07-17 17:11:48,288] INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

After the topic is created, the metrics reporter will produce to the topic periodically (every 15 seconds by default).

By default, the Confluent Metrics Reporter logs events to server.log on each Kafka broker. For more verbose logging, add the following line to ./etc/kafka/log4j.properties:

log4j.logger.io.confluent.metrics.reporter.ConfluentMetricsReporter=DEBUG

Note

A rolling restart of the Kafka brokers is required for any of the above configuration changes to take effect.

After the Confluent Metrics Reporter is properly configured and the brokers have been restarted, the topic is automatically created and metrics data is produced to the topic periodically (every 15 seconds by default).

Message size

If the total number of partitions in the Kafka cluster is large, it is possible that the produced message is larger than the maximum allowed by the broker for the metrics topic. As of 3.3.0, the topic is configured to accept messages up to 10 MB in size by default. In previous versions, the broker default was used (1 MB by default). The following log is an example of when a message is rejected due to its size:

[2017-07-19 00:34:50,664] WARN Failed to produce metrics message (io.confluent.metrics.reporter.ConfluentMetricsReporter)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

The solution is to increase the max.message.bytes configuration for the metrics topic. The following example raises it to 20000000 bytes, roughly twice the default:

./bin/kafka-topics --alter --zookeeper localhost:2181 --config max.message.bytes=20000000 --topic _confluent-metrics
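The value 20000000 is a round number close to twice the 10485760-byte default. If you prefer an exact multiple, shell arithmetic can compute it. This is a small sketch; the variable names are illustrative only.

```shell
# Compute exactly twice the default max.message.bytes of the metrics topic.
default_max_bytes=10485760
new_max_bytes=$((default_max_bytes * 2))
echo "$new_max_bytes"   # prints 20971520
```

The resulting number can then be substituted for 20000000 in the kafka-topics command above.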

Configuration Options

Only the first configuration below is required, but you should update confluent.metrics.reporter.topic.replicas if there are fewer than 3 brokers in the Kafka metrics cluster.

The other configurations enable you to tune the publisher for performance and reliability. In most cases, configurations categorized with “Importance: low” do not require modification.

confluent.metrics.reporter.bootstrap.servers

Bootstrap servers for the Kafka cluster to which metrics will be published. The metrics cluster may be different from the cluster(s) whose metrics are being collected. For example, several Kafka clusters can publish to a single metrics cluster.

  • Type: string
  • Importance: high

confluent.metrics.reporter.topic.max.message.bytes

Maximum message size for the metrics topic.

  • Type: int
  • Default: 10485760
  • Valid Values: [0,…]
  • Importance: medium

confluent.metrics.reporter.publish.ms

The Confluent Metrics Reporter publishes new metrics to the metrics topic at intervals defined by this setting. This means that Control Center system health data lags by this duration, and that Auto Data Balancer may compute a plan based on broker data that is stale by this duration. The default is a reasonable value for production environments and typically does not need to be changed.

  • Type: long
  • Default: 15000
  • Importance: low

confluent.metrics.reporter.topic

Topic on which metrics data will be written.

  • Type: string
  • Default: _confluent-metrics
  • Importance: low

confluent.metrics.reporter.topic.create

Create the metrics topic if it does not exist.

  • Type: boolean
  • Default: true
  • Importance: low

confluent.metrics.reporter.topic.partitions

Number of partitions in the metrics topic.

  • Type: int
  • Default: 12
  • Importance: low

confluent.metrics.reporter.topic.replicas

Number of replicas in the metrics topic. It must not be higher than the number of brokers in the Kafka cluster.

  • Type: int
  • Default: 3
  • Importance: low
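
The replica constraint above can be checked before you edit server.properties. The following is a minimal sketch with a hypothetical function name; it assumes you already know the broker count of the metrics cluster and pass both numbers in yourself.

```shell
# Sketch: check that the requested replica count does not exceed the number
# of brokers in the metrics cluster. The function name is hypothetical.
check_replicas() {
  replicas="$1"
  brokers="$2"
  if [ "$replicas" -gt "$brokers" ]; then
    echo "invalid: $replicas replicas requested but only $brokers brokers available"
    return 1
  fi
  echo "ok"
}
```

For example, `check_replicas 3 1` reports a problem for a single-broker cluster that keeps the default of 3, while `check_replicas 1 1` prints "ok".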

confluent.metrics.reporter.topic.retention.bytes

Retention bytes for the metrics topic.

  • Type: long
  • Default: -1
  • Importance: low

confluent.metrics.reporter.topic.retention.ms

Retention time for the metrics topic.

  • Type: long
  • Default: 259200000 (3 days)
  • Importance: low

confluent.metrics.reporter.topic.roll.ms

Log rolling time for the metrics topic.

  • Type: long
  • Default: 14400000 (4 hours)
  • Importance: low
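
The retention and roll settings are expressed in milliseconds, which are easy to misread. As a quick sanity check, a small helper (hypothetical, not shipped with Confluent Platform) can convert a value to whole hours using integer division:

```shell
# Sketch: convert a millisecond setting to whole hours (integer division).
ms_to_hours() {
  echo $(( $1 / 3600000 ))
}

ms_to_hours 259200000   # retention.ms default: prints 72 (3 days)
ms_to_hours 14400000    # roll.ms default: prints 4
```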

confluent.metrics.reporter.volume.metrics.refresh.ms

The minimum interval at which to fetch new volume metrics.

  • Type: long
  • Default: 15000
  • Importance: low

confluent.metrics.reporter.whitelist

Regex matching the yammer metric mbean name or Kafka metric name to be published to the metrics topic.

By default, this includes all the metrics required by Confluent Control Center and Confluent Auto Data Balancer. It typically should not be modified unless requested by Confluent.

  • Type: string
  • Default: includes all the metrics necessary for Confluent Control Center and Confluent Auto Data Balancer
  • Importance: low

Security

When configuring Confluent Metrics Reporter on a secure Kafka broker, the embedded producer in Confluent Metrics Reporter (which sends metrics data to the _confluent-metrics topic) must have the correct client security configurations, each prefixed with confluent.metrics.reporter. Also see Configure Brokers and Configure Clients.
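If you already maintain a client security properties file, the prefixing described above can be sketched with sed. This is an illustration only: the function name is hypothetical, and it assumes one property per line, so multi-line values such as a sasl.jaas.config with backslash continuations need to be handled by hand.

```shell
# Sketch: print a client properties file with every property prefixed by
# "confluent.metrics.reporter." so it can be pasted into server.properties.
# Blank lines and comment lines are left untouched.
prefix_client_props() {
  sed -E '/^[[:space:]]*(#|$)/!s/^[[:space:]]*/confluent.metrics.reporter./' "$1"
}
```

For example, a line `security.protocol=SSL` in the input is printed as `confluent.metrics.reporter.security.protocol=SSL`.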

Authentication

For SSL-related configurations, refer to SSL for Kafka Clients.

confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234

For SASL-related configurations, refer to SASL for Kafka Clients.

confluent.metrics.reporter.sasl.mechanism=PLAIN
confluent.metrics.reporter.security.protocol=SASL_PLAINTEXT

Pass the JAAS configuration file location as a JVM parameter (supported, but not recommended) or configure JAAS in server.properties (recommended). For more details, see JAAS.

# Use "http" instead of "https" in metadataServerUrls only for non-production environments.
confluent.metrics.reporter.sasl.jaas.config= \
 org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="<mds-username>" \
     password="<password>" \
     metadataServerUrls="https://<host-name>:<local-port>";

Role-Based Access Control

To send metrics when RBAC is enabled, configure Confluent Metrics Reporter using the same method as Authentication for producers and consumers.

Use the following configuration to send metrics for a cluster that is utilizing but not hosting MDS:

############################# Metrics Reporter Settings #############################

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=<broker-name>:<broker-port>
# Use SASL_PLAINTEXT instead of SASL_SSL only for non-production environments.
confluent.metrics.reporter.security.protocol=SASL_SSL
confluent.metrics.reporter.sasl.mechanism=OAUTHBEARER
confluent.metrics.reporter.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
# Use "http" instead of "https" in metadataServerUrls only for non-production environments.
confluent.metrics.reporter.sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    username="<mds-username>" \
    password="<password>" \
    metadataServerUrls="https://<host-name>:<local-port>";

If you are running Confluent Metrics Reporter on the same cluster that is running and hosting MDS, be aware that it cannot use tokens from the MDS server, so it must communicate with listeners that do not require MDS authentication. In this case, do not use the RBAC listener specified for the MDS instance. Instead, use the inter-broker listener specified for advertised.listeners in the MDS configuration. For details about listeners specified in an MDS configuration, see Configure Metadata Service (MDS).

Note

Conflicting client identifiers can occur when you enable Confluent Metrics Reporter. When this happens, a number of warning messages appear in the log. These warnings do not affect the expected behavior of Confluent Metrics Reporter. For example:

WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=confluent-metrics-reporter

Authorization

  1. The broker’s principal must have permission to create the metrics topic in the configured Kafka cluster.
  2. The broker’s principal must have permission to produce to the metrics topic.
  3. The tool’s principal must have permission to consume from the metrics topic. This would typically be the Confluent Control Center and/or the Auto Data Balancer, but it also applies to the Console Consumer if it’s used to inspect the topic.

If you have ACLs set up for Kafka, use the bin/kafka-acls command line tool to add/remove ACLs on topics, for example:

bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
    --allow-principal User:Alice --allow-host 198.51.100.0 \
    --operation Read --operation Write --topic _confluent-metrics
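
The three permissions listed above can also be granted separately to the broker and to the consuming tool. The following dry-run sketch only prints candidate kafka-acls commands for review and does not execute them. The function name and both principals are placeholders, and granting topic creation through a cluster-level Create ACL is an assumption about how your authorizer is set up.

```shell
# Dry-run sketch: print (not execute) kafka-acls commands that grant the three
# permissions listed above. Principal names passed in are placeholders.
print_metrics_acls() {
  broker_principal="$1"   # for example User:kafka-broker (hypothetical)
  reader_principal="$2"   # for example User:control-center (hypothetical)
  topic="${3:-_confluent-metrics}"
  base="bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add"
  # 1. The broker may create topics in the metrics cluster.
  echo "$base --allow-principal $broker_principal --operation Create --cluster"
  # 2. The broker may produce to the metrics topic.
  echo "$base --allow-principal $broker_principal --operation Write --topic $topic"
  # 3. The consuming tool may read the metrics topic.
  echo "$base --allow-principal $reader_principal --operation Read --topic $topic"
}
```

Calling `print_metrics_acls User:kafka-broker User:control-center` prints three commands that you can inspect and then run manually. A consumer typically also needs Read permission on its consumer group, which this sketch leaves out.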

RBAC with Kerberos enabled

To send metrics when RBAC is running with Kerberos enabled, use the following configuration:

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.topic.replicas=3
confluent.metrics.reporter.bootstrap.servers=<broker-name>:<broker-port>
# Use SASL_PLAINTEXT instead of SASL_SSL only for non-production environments.
confluent.metrics.reporter.security.protocol=SASL_SSL
confluent.metrics.reporter.sasl.mechanism=GSSAPI
confluent.metrics.reporter.sasl.jaas.config=  \
   com.sun.security.auth.module.Krb5LoginModule required \
   debug=true \
   useKeyTab=true \
   storeKey=true \
   keyTab=<path-to-your-keytab> \
   principal=<org-kerberos-principal>;

Verification

Verify the Kafka brokers are properly sending metrics data to the correct topic, which by default is _confluent-metrics.

bin/kafka-console-consumer.sh --topic _confluent-metrics --bootstrap-server <bootstrap-server> --formatter io.confluent.metrics.reporter.ConfluentMetricsFormatter

Verify the Confluent Metrics Reporter is logging events to server.log. You should see messages similar to the following:

[2017-07-17 17:11:32,304] INFO KafkaConfig values:
...
metric.reporters = [io.confluent.metrics.reporter.ConfluentMetricsReporter]
...
[2017-07-17 17:11:32,611] INFO ConfluentMetricsReporterConfig values:
         confluent.metrics.reporter.bootstrap.servers = localhost:9092
         confluent.metrics.reporter.publish.ms = 15000
   ...
...
[2017-07-17 17:11:48,288] INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

If you enabled more verbose logging, such as DEBUG, you should see messages with additional information, which may be helpful during debugging. They may be similar to the following:

[2017-07-19 00:54:02,619] DEBUG Metrics reporter topic _confluent-metrics already exists (io.confluent.metrics.reporter.ConfluentMetricsReporter)
[2017-07-19 00:54:02,622] DEBUG Begin publishing metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)
...
[2017-07-19 00:54:02,772] DEBUG Produced metrics message of size 52104 with offset 316 to topic partition _confluent-metrics-6 (io.confluent.metrics.reporter.ConfluentMetricsReporter)
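
The message sizes in these DEBUG lines can be compared against max.message.bytes for the metrics topic. As a minimal sketch, assuming the log line format shown above and a hypothetical function name, the largest produced message size can be extracted like this:

```shell
# Sketch: print the largest "Produced metrics message of size N" value found
# in a broker log file. Prints nothing if no such DEBUG lines exist.
max_metrics_message_size() {
  sed -n 's/.*Produced metrics message of size \([0-9][0-9]*\) .*/\1/p' "$1" \
    | sort -n | tail -n 1
}
```

Run it against server.log on a broker with DEBUG logging enabled. A result approaching the topic's max.message.bytes suggests raising that limit as described in the Message size section.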