Configure Cluster for Confluent Control Center Client Monitoring

Set up your Apache Kafka® cluster for Control Center client monitoring.

Prerequisites

  • Control Center running in normal mode

  • Confluent Platform 8.0 or later

  • Minimum client versions: Java clients 3.7.2 or later and librdkafka-based clients 2.6 or later (see the producer sketch after this list)

  • Telemetry Reporter enabled on Confluent Platform
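
In Java clients 3.7.2 and later, client metrics push is controlled by the enable.metrics.push property, which defaults to true; librdkafka 2.6 and later exposes a property of the same name. The following is a minimal sketch of a Java producer that sets the property explicitly; the bootstrap server, topic, key, and value are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MetricsPushExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // enable.metrics.push defaults to true in Java clients 3.7 and later;
            // set explicitly here for clarity (KIP-714).
            props.put("enable.metrics.push", "true");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            }
        }
    }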

Limitations

  • To enable client metrics for self-managed connectors, you must add the following configuration to each connector's configuration (see the full example after this list):

    producer.override.enable.metrics.push=true
    
    • Alternatively, you can use Control Center to add the configuration.

      Add it in the Additional Properties section of the connector for which you want to receive client metrics. For more information, see Manage Kafka Connect.

    • Without the producer.override.enable.metrics.push=true configuration, Control Center does not display client metrics from self-managed connectors.

  • Some librdkafka clients do not expose all standard KIP-714 metrics. If you are working with such a client, the corresponding columns in Control Center are blank.
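
The following is a minimal sketch of a self-managed source connector configuration with metrics push enabled, as it might be submitted to the Kafka Connect REST API; the connector name, class, file, and topic are placeholders. If your Connect worker restricts client overrides, the worker may also need connector.client.config.override.policy=All for the producer.override.* prefix to take effect.

    {
      "name": "example-source",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "/tmp/example-source.txt",
        "topic": "example-topic",
        "producer.override.enable.metrics.push": "true"
      }
    }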

Configure cluster for client monitoring

Add and update the following configurations in the properties file of every Kafka broker in the cluster.

Considerations:
  • KRaft properties file: server.properties

  • File system location: <kafka_path>/kafka_2.13-3.8.0/config/kraft/server.properties

  1. Add the following configurations to the properties file of every Kafka broker.

    Telemetry Reporter configurations to add:

    confluent.telemetry.external.client.metrics.push.enabled=true
    confluent.telemetry.external.client.metrics.delta.temporality=false
    confluent.telemetry.external.client.metrics.subscription.interval.ms.list=60000
    confluent.telemetry.external.client.metrics.subscription.metrics.list=org.apache.kafka.consumer.fetch.manager.fetch.latency.avg,org.apache.kafka.consumer.connection.creation.total,org.apache.kafka.consumer.fetch.manager.fetch.total,org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate,org.apache.kafka.producer.bufferpool.wait.ratio,org.apache.kafka.producer.record.queue.time.avg,org.apache.kafka.producer.request.latency.avg,org.apache.kafka.producer.produce.throttle.time.avg,org.apache.kafka.producer.connection.creation.total,org.apache.kafka.producer.request.total,org.apache.kafka.producer.topic.byte.rate
    
  2. Update the following configuration in the properties file of every Kafka broker.

    Telemetry Reporter configurations to update:

    confluent.telemetry.exporter._c3.metrics.include=io.confluent.kafka.server.request.(?!.*delta).*|io.confluent.kafka.server.socket_server.connections|io.confluent.kafka.server.server.broker.state|io.confluent.kafka.server.replica.manager.leader.count|io.confluent.kafka.server.request.queue.size|io.confluent.kafka.server.broker.topic.failed.produce.requests.rate.1.min|io.confluent.kafka.server.tier.archiver.total.lag|io.confluent.kafka.server.request.total.time.ms.p99|io.confluent.kafka.server.broker.topic.failed.fetch.requests.rate.1.min|io.confluent.kafka.server.broker.topic.total.fetch.requests.rate.1.min|io.confluent.kafka.server.partition.caught.up.replicas.count|io.confluent.kafka.server.partition.observer.replicas.count|io.confluent.kafka.server.tier.tasks.num.partitions.in.error|io.confluent.kafka.server.broker.topic.bytes.out.rate.1.min|io.confluent.kafka.server.request.total.time.ms.p95|io.confluent.kafka.server.controller.active.controller.count|io.confluent.kafka.server.session.expire.listener.zookeeper.disconnects.total|io.confluent.kafka.server.request.total.time.ms.p999|io.confluent.kafka.server.controller.active.broker.count|io.confluent.kafka.server.request.handler.pool.request.handler.avg.idle.percent.rate.1.min|io.confluent.kafka.server.session.expire.listener.zookeeper.disconnects.rate.1.min|io.confluent.kafka.server.controller.unclean.leader.elections.rate.1.min|io.confluent.kafka.server.replica.manager.partition.count|io.confluent.kafka.server.controller.unclean.leader.elections.total|io.confluent.kafka.server.partition.replicas.count|io.confluent.kafka.server.broker.topic.total.produce.requests.rate.1.min|io.confluent.kafka.server.controller.offline.partitions.count|io.confluent.kafka.server.socket.server.network.processor.avg.idle.percent|io.confluent.kafka.server.partition.under.replicated|io.confluent.kafka.server.log.log.start.offset|io.confluent.kafka.server.log.tier.size|io.confluent.kafka.server.log.size|io.confluent.kafka.server.tier.fetcher.bytes.fetched.total|io.confluent.kafka.server.request.total.time.ms.p50|io.confluent.kafka.server.tenant.consumer.lag.offsets|io.confluent.kafka.server.session.expire.listener.zookeeper.expires.rate.1.min|io.confluent.kafka.server.log.log.end.offset|io.confluent.kafka.server.broker.topic.bytes.in.rate.1.min|io.confluent.kafka.server.partition.under.min.isr|io.confluent.kafka.server.partition.in.sync.replicas.count|io.confluent.telemetry.http.exporter.batches.dropped|io.confluent.telemetry.http.exporter.items.total|io.confluent.telemetry.http.exporter.items.succeeded|io.confluent.telemetry.http.exporter.send.time.total.millis|io.confluent.kafka.server.controller.leader.election.rate.(?!.*delta).*|io.confluent.telemetry.http.exporter.batches.failed|org.apache.kafka.consumer.(fetch.manager.fetch.latency.avg|connection.creation.total|fetch.manager.fetch.total|fetch.manager.bytes.consumed.rate)|org.apache.kafka.producer.(bufferpool.wait.ratio|record.queue.time.avg|request.latency.avg|produce.throttle.time.avg|connection.creation.total|request.total|topic.byte.rate)
    
  3. Restart every Kafka broker for the changes to take effect.
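
    The following is a minimal sketch of restarting a single broker, assuming a tarball installation laid out like the path above; if your brokers run under a service manager such as systemd, use its restart mechanism instead. Restart brokers one at a time and confirm each broker rejoins the cluster before moving to the next.

    <kafka_path>/kafka_2.13-3.8.0/bin/kafka-server-stop.sh
    <kafka_path>/kafka_2.13-3.8.0/bin/kafka-server-start.sh -daemon <kafka_path>/kafka_2.13-3.8.0/config/kraft/server.properties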