Configure Cluster for Confluent Control Center Client Monitoring

Set up your Apache Kafka® cluster to report client metrics to Control Center and to monitor share partition lag.

Configure cluster for client metrics

Prerequisites

  • Control Center running in normal mode

  • Confluent Platform 8.0 or later

  • Java client 3.7.2 or later and librdkafka client 2.6 or later

  • Telemetry Reporter enabled on Confluent Platform (see the sketch after this list)
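
The Telemetry Reporter is enabled through broker properties. The following is a minimal sketch, assuming the reporter class io.confluent.telemetry.reporter.TelemetryReporter; the _c3 exporter settings shown are illustrative placeholders and must match your Control Center deployment (the same exporter is updated in step 2 of Configure cluster):

    # Register the Confluent Telemetry Reporter on the broker
    metric.reporters=io.confluent.telemetry.reporter.TelemetryReporter
    # HTTP exporter that ships metrics to Control Center; the endpoint
    # below is a placeholder for your Control Center host and port
    confluent.telemetry.exporter._c3.type=http
    confluent.telemetry.exporter._c3.enabled=true
    confluent.telemetry.exporter._c3.client.base.url=http://<control-center-host>:<port>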

Limitations

  • To enable client metrics for self-managed connectors, you must add a configuration to each connector. Add the following line to the connector configuration file (see the example after this list):

    producer.override.enable.metric.push=true
    
    • Alternatively, you can use Control Center to add the configuration.

      Add the configuration in Additional Properties of the connector for which you want to receive client metrics. For more information, see Manage Kafka Connect.

    • Without the producer.override.enable.metric.push=true configuration, Control Center does not display client metrics from self-managed connectors.

  • Some librdkafka clients do not expose all standard KIP-714 metrics. If you are working with such a client, the corresponding columns in Control Center are blank.
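
For example, a standalone self-managed connector's properties file with the override added might look like the following sketch; the connector name, class, file, and topic are illustrative:

    # Hypothetical self-managed connector configuration
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/input.txt
    topic=connect-test
    # Required for Control Center to receive this connector's client metrics
    producer.override.enable.metric.push=true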

Configure cluster

Use the following steps to add and update properties in the properties file of every Kafka broker in the cluster.

Considerations:
  • KRaft properties file: server.properties

  • File system location: <kafka_path>/kafka_2.13-3.8.0/config/kraft/server.properties

  1. Add the following configurations to the properties file of every Kafka broker.

    Telemetry Reporter configurations to add:

    confluent.telemetry.external.client.metrics.push.enabled=true
    confluent.telemetry.external.client.metrics.delta.temporality=false
    confluent.telemetry.external.client.metrics.subscription.interval.ms.list=60000
    confluent.telemetry.external.client.metrics.subscription.metrics.list=org.apache.kafka.consumer.fetch.manager.fetch.latency.avg,org.apache.kafka.consumer.connection.creation.total,org.apache.kafka.consumer.fetch.manager.fetch.total,org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate,org.apache.kafka.producer.bufferpool.wait.ratio,org.apache.kafka.producer.record.queue.time.avg,org.apache.kafka.producer.request.latency.avg,org.apache.kafka.producer.produce.throttle.time.avg,org.apache.kafka.producer.connection.creation.total,org.apache.kafka.producer.request.total,org.apache.kafka.producer.topic.byte.rate
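
    With this subscription, matching clients push the listed metrics every 60 seconds (60000 ms). Setting delta.temporality=false requests cumulative metric values rather than per-interval deltas.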
    
  2. Update the following configuration in the properties file of every Kafka broker.

    Telemetry Reporter configurations to update:

    confluent.telemetry.exporter._c3.metrics.include=io.confluent.kafka.server.request.(?!.*delta).*|io.confluent.kafka.server.socket_server.connections|io.confluent.kafka.server.server.broker.state|io.confluent.kafka.server.replica.manager.leader.count|io.confluent.kafka.server.request.queue.size|io.confluent.kafka.server.broker.topic.failed.produce.requests.rate.1.min|io.confluent.kafka.server.tier.archiver.total.lag|io.confluent.kafka.server.request.total.time.ms.p99|io.confluent.kafka.server.broker.topic.failed.fetch.requests.rate.1.min|io.confluent.kafka.server.broker.topic.total.fetch.requests.rate.1.min|io.confluent.kafka.server.partition.caught.up.replicas.count|io.confluent.kafka.server.partition.observer.replicas.count|io.confluent.kafka.server.tier.tasks.num.partitions.in.error|io.confluent.kafka.server.broker.topic.bytes.out.rate.1.min|io.confluent.kafka.server.request.total.time.ms.p95|io.confluent.kafka.server.controller.active.controller.count|io.confluent.kafka.server.session.expire.listener.zookeeper.disconnects.total|io.confluent.kafka.server.request.total.time.ms.p999|io.confluent.kafka.server.controller.active.broker.count|io.confluent.kafka.server.request.handler.pool.request.handler.avg.idle.percent.rate.1.min|io.confluent.kafka.server.session.expire.listener.zookeeper.disconnects.rate.1.min|io.confluent.kafka.server.controller.unclean.leader.elections.rate.1.min|io.confluent.kafka.server.replica.manager.partition.count|io.confluent.kafka.server.controller.unclean.leader.elections.total|io.confluent.kafka.server.partition.replicas.count|io.confluent.kafka.server.broker.topic.total.produce.requests.rate.1.min|io.confluent.kafka.server.controller.offline.partitions.count|io.confluent.kafka.server.socket.server.network.processor.avg.idle.percent|io.confluent.kafka.server.partition.under.replicated|io.confluent.kafka.server.log.log.start.offset|io.confluent.kafka.server.log.tier.size|io.confluent.kafka.server.log.size|io.confluent.kafka.server.tier.fetcher.bytes.fetched.total|io.confluent.kafka.server.request.total.time.ms.p50|io.confluent.kafka.server.tenant.consumer.lag.offsets|io.confluent.kafka.server.session.expire.listener.zookeeper.expires.rate.1.min|io.confluent.kafka.server.log.log.end.offset|io.confluent.kafka.server.broker.topic.bytes.in.rate.1.min|io.confluent.kafka.server.partition.under.min.isr|io.confluent.kafka.server.partition.in.sync.replicas.count|io.confluent.telemetry.http.exporter.batches.dropped|io.confluent.telemetry.http.exporter.items.total|io.confluent.telemetry.http.exporter.items.succeeded|io.confluent.telemetry.http.exporter.send.time.total.millis|io.confluent.kafka.server.controller.leader.election.rate.(?!.*delta).*|io.confluent.telemetry.http.exporter.batches.failed|org.apache.kafka.consumer.(fetch.manager.fetch.latency.avg|connection.creation.total|fetch.manager.fetch.total|fetch.manager.bytes.consumed.rate)|org.apache.kafka.producer.(bufferpool.wait.ratio|record.queue.time.avg|request.latency.avg|produce.throttle.time.avg|connection.creation.total|request.total|topic.byte.rate)
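
    The trailing org.apache.kafka.consumer and org.apache.kafka.producer patterns extend the exporter's include list so that the client metrics subscribed to in step 1 are forwarded to Control Center.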
    
  3. Restart every Kafka broker.
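
    Restart brokers one at a time so the cluster remains available. For example, on each broker host, assuming the Apache Kafka scripts and the KRaft layout shown in the Considerations above:

    # Stop the broker running on this host
    ./bin/kafka-server-stop.sh
    # Start it again with the updated properties file
    ./bin/kafka-server-start.sh -daemon config/kraft/server.properties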

Configure cluster to monitor share partition lag

Share partition lag represents the number of records that have not yet been fully processed by consumers in a share group. It is the primary metric used to automate the scaling of share consumer instances. Unlike standard consumer lag, share partition lag accounts for the non-contiguous processing of records, where each record is in one of these states:

  • AVAILABLE: Not yet acquired by any consumer

  • ACQUIRED: Currently assigned to a consumer with an active acquisition lock

  • ACKNOWLEDGED: Processing complete

  • ARCHIVED: Removed by compaction

Share partition lag is calculated as:

(highest offset) - (share-partition start offset) + 1 - (processed in-flight records)
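
For example, if the highest offset is 100, the share-partition start offset is 40, and 10 in-flight records have been processed, the lag is 100 - 40 + 1 - 10 = 51 records.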

To monitor lag effectively, you must distinguish between record states. Records in a Terminal state have been acknowledged or archived. These records no longer contribute to the lag calculation.

To monitor share partition lag:

  1. Set the following property in the broker configuration to view share partition lag details:

    confluent.share.lag.calculator.enabled=true
    
  2. Add the following configurations to the etc/kafka/broker.properties file to override their default value of 3:

    share.coordinator.state.topic.replication.factor=1
    share.coordinator.state.topic.min.isr=1
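
    These overrides are typically needed on a single-broker development cluster, where the default replication factor of 3 cannot be satisfied and the share coordinator's internal state topic could not otherwise be created.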