.. _controlcenter_clients:

Confluent Monitoring Interceptors
=================================

To monitor production and consumption in |c3-short|, install the Confluent Monitoring Interceptors
with your |ak-tm| applications and configure the applications to use them. The interceptors collect
metrics on the |ak| messages produced and consumed and send them to |c3-short|. This guide explains
how to do this.

Installing Interceptors
-----------------------

Since |ak| 0.10.0.0, |ak| clients support pluggable *interceptors* to examine (and potentially modify)
messages. |c3-short| requires clients to use the Confluent Monitoring Interceptors, which collect
statistics on incoming and outgoing messages to provide production and consumption monitoring.

To use the production and consumption monitoring features of |c3-short|, you must first install the
Confluent Monitoring Interceptors on the client machines.

Java Clients
~~~~~~~~~~~~

You can use a dependency manager such as Maven, Ivy, or Gradle when developing Java applications.
Here is sample content for a POM file for Maven. First, specify the :ref:`Confluent Maven repository `:

.. sourcecode:: xml

    <repositories>
      ...
      <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
      </repository>
      ...
    </repositories>

Next, add dependencies for the |ak| Java clients and the Confluent Monitoring Interceptors:

.. codewithvars:: xml

    <dependencies>
      ...
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>|kafka_release|</version>
      </dependency>
      <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>monitoring-interceptors</artifactId>
        <version>|release|</version>
      </dependency>
      ...
    </dependencies>

librdkafka-based Clients
~~~~~~~~~~~~~~~~~~~~~~~~

For librdkafka-based clients such as confluent-kafka-python, confluent-kafka-go, or confluent-kafka-dotnet,
a separate monitoring interceptor plugin is used. It is distributed differently depending on the platform:

* Linux (Debian- and RedHat-based distributions): install the ``confluent-librdkafka-plugins`` package from
  the :ref:`Confluent repositories `.
* macOS: download the `monitoring interceptor zip file `_ and unzip the ``monitoring-interceptor.dylib``
  file to the same directory as your application, or to a directory in the system library search path,
  such as ``/usr/local/lib``.
* Windows: download the `monitoring interceptor zip file `_ and unzip the appropriate
  ``monitoring-interceptor.dll`` for your architecture to the location where your application is installed
  or the directory it is run from.

.. note:: The monitoring interceptor plugin for librdkafka-based clients requires librdkafka version
   0.11.0 or later.

.. note:: The monitoring interceptor plugin is a runtime dependency and is not required to build the client
   or application; the plugin is referenced directly through the ``plugin.library.paths`` configuration
   property and must be installed on the deployment host rather than the build host.

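For the Java dependency shown above, a quick optional sanity check is to confirm that the interceptor
classes resolve from your application's classpath before you wire up any interceptor configuration. The
following is a minimal sketch (nothing the interceptors themselves require); the two class names are the
producer and consumer interceptor classes used throughout this guide.

.. sourcecode:: java

    import java.util.Arrays;
    import java.util.List;

    public class InterceptorClasspathCheck {

        public static void main(String[] args) {
            // The two Confluent Monitoring Interceptor classes referenced in this guide.
            List<String> interceptorClasses = Arrays.asList(
                "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
                "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

            for (String className : interceptorClasses) {
                try {
                    // Throws ClassNotFoundException if the dependency is missing.
                    Class.forName(className);
                    System.out.println("Found on classpath: " + className);
                } catch (ClassNotFoundException e) {
                    System.err.println("Missing from classpath: " + className);
                }
            }
        }
    }
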
.. _controlcenter_client_interceptor_config:

Enabling Interceptors
---------------------

After you have installed the Confluent Monitoring Interceptor package for your |ak| applications,
configure your clients to actually use the interceptors. How you configure a client depends on the
kind of client it is.

.. warning:: The producer and consumer interceptor classes are different; make sure you choose the
   correct class for each producer and consumer.

Java Producers and Consumers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can specify the Confluent Monitoring Interceptors to be used for Java producers and consumers.

For producers, set ``interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor``:

.. sourcecode:: java

    producerProps.put(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

For consumers, set ``interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor``:

.. sourcecode:: java

    consumerProps.put(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

.. note:: For librdkafka-based clients, see :ref:`librdkafka-based clients <add_ic_librdkafka>`.

|kstreams|
~~~~~~~~~~~~~~~

|kstreams| uses |ak| producers and consumers internally. You can specify the Confluent Monitoring
Interceptors to be used for those internal producers and consumers.

For producers, set ``producer.interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor``. For consumers, set
``consumer.interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor`` (the example below sets it
through ``StreamsConfig.MAIN_CONSUMER_PREFIX``, which applies it to the main consumer).

.. sourcecode:: java

    streamsConfig.put(
        StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
    streamsConfig.put(
        StreamsConfig.MAIN_CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

KSQL
~~~~

KSQL, like |kstreams|, uses |ak| producers and consumers internally. You can specify the Confluent
Monitoring Interceptors to be used for those internal producers and consumers.

For producers, set ``producer.interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor``. For consumers, set
``consumer.interceptor.classes`` to
``io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor``.

.. codewithvars:: bash

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

.. _add_ic_librdkafka:

librdkafka-based clients
~~~~~~~~~~~~~~~~~~~~~~~~

For librdkafka-based clients, set the ``plugin.library.paths`` configuration property to the name of the
interceptor library, ``monitoring-interceptor``.

C example:

.. sourcecode:: C

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_res_t r;
    char errstr[512];

    r = rd_kafka_conf_set(conf, "bootstrap.servers", "mybroker", errstr, sizeof(errstr));
    if (r != RD_KAFKA_CONF_OK)
        fatal("%s", errstr);

    r = rd_kafka_conf_set(conf, "plugin.library.paths", "monitoring-interceptor",
                          errstr, sizeof(errstr));
    if (r != RD_KAFKA_CONF_OK)
        fatal("%s", errstr);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk)
        fatal("%s", errstr);

    rd_kafka_destroy(rk);

Python example:

.. sourcecode:: Python

    p = confluent_kafka.Producer({'bootstrap.servers': 'mybroker',
                                  'plugin.library.paths': 'monitoring-interceptor'})

.. note:: If the monitoring-interceptor library is installed in a non-standard location that is not covered
   by the system's dynamic linker path (see the ``dlopen(3)`` or ``LoadLibrary`` documentation), configure a
   full or relative path instead.

.. note:: The platform-specific library filename extension (for example, ``.so`` or ``.dll``) may be omitted.

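The remaining client types in this section (|kconnect-long|, |crep-full|, and |crest-long|) embed their
|ak| producers and consumers and are configured through configuration files rather than code. Before moving
on to those, the following sketch puts the Java producer and consumer settings shown above into a complete,
minimal program. The broker address (``localhost:9092``), topic name, and group ID are placeholders; the
point is that no application code beyond the configuration changes is required.

.. sourcecode:: java

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MonitoredClientExample {

        public static void main(String[] args) {
            String topic = "monitored-topic";   // placeholder topic name

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Enable the producer-side monitoring interceptor.
            producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>(topic, "key", "value"));
            }

            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "monitored-example-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Enable the consumer-side monitoring interceptor.
            consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(topic));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed %s=%s%n", record.key(), record.value());
                }
            }
        }
    }
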
|kconnect-long|
~~~~~~~~~~~~~~~

|kconnect-long| connectors use |ak| producers and consumers internally. You can specify the Confluent
Monitoring Interceptors to be used for those internal producers and consumers.

* For a source connector, configure the Confluent Monitoring Interceptors with the ``producer`` prefix:

  .. codewithvars:: bash

      producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

* For a sink connector, configure the Confluent Monitoring Interceptors with the ``consumer`` prefix:

  .. codewithvars:: bash

      consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

.. tip:: To override the monitoring interceptors' bootstrap servers for the metrics producers and consumers
   (for example, to use a different listener), specify the ``producer.<*>`` or ``consumer.<*>`` prefixes and
   any required security configurations, as shown in :ref:`kafka_ssl_authentication` and
   :ref:`kafka_ssl_encryption`. For example, for the metrics producer:
   ``producer.confluent.monitoring.interceptor.bootstrap.servers`` and
   ``producer.confluent.monitoring.interceptor.security.protocol``.

|crep-full|
~~~~~~~~~~~~~~~~~~~~

Replicator uses a |ak| consumer internally. You can specify the Confluent Monitoring Interceptor to be used
for that internal consumer by modifying the Replicator JSON configuration file. Here is an example subset of
the configuration to add:

.. codewithvars:: bash

    {
      "name": "replicator",
      "config": {
        ....
        "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
        ....
      }
    }

.. tip:: To override the monitoring interceptor bootstrap servers for |crep|, use the |crep| consumer prefix
   (``src.consumer.``) in the configuration. For example:
   ``"src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9091"``

To learn more, see :ref:`authentication-ssl-kafka-connect-monitoring` and
:ref:`authentication-ssl-replicator-monitoring` in :ref:`authentication-ssl-interceptors`,
:ref:`encryption-ssl-interceptors` in :ref:`kafka_ssl_encryption`, :ref:`replicator_monitoring`, and
:ref:`rep-quickstart-monitoring` in the :ref:`Replicator quick start tutorial `.

|crest-long|
~~~~~~~~~~~~~~~~~~~~

|crest| uses |ak| producers and consumers internally. You can instrument these internal clients with the
Confluent Monitoring Interceptors by modifying the startup configuration properties file
(``/etc/kafka-rest/kafka-rest.properties``). You also need to include the interceptor JAR (located under
``share/java/monitoring-interceptors``) on the CLASSPATH. Here is an example subset of the configuration to add:

.. sourcecode:: bash

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

|crest| Interceptor class configurations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: /kafka-rest/includes/rest-proxy-interceptors-configs.rst

.. _interceptor_configuration_options:

Configuring Interceptors
------------------------

Defaults
~~~~~~~~

By default, the Confluent Monitoring Interceptors send and receive messages using the same |ak| cluster that
you are monitoring and use a set of default topics to share information. The interceptors also report data
at a regular interval of 15 seconds by default.

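If you need different settings for a Java client, you can override these defaults by passing properties with
the ``confluent.monitoring.interceptor.`` prefix alongside the interceptor setting; the available options are
described under General Options below. The following is a minimal sketch, assuming a hypothetical dedicated
monitoring cluster at ``monitoring-kafka:9092``:

.. sourcecode:: java

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class InterceptorOverrides {

        public static Properties producerPropsWithOverrides() {
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

            // Send interceptor monitoring data to a separate cluster instead of the
            // cluster being produced to (hypothetical address).
            producerProps.put("confluent.monitoring.interceptor.bootstrap.servers",
                "monitoring-kafka:9092");

            // Publish monitoring data every 30 seconds instead of the 15-second default.
            producerProps.put("confluent.monitoring.interceptor.publishMs", "30000");

            return producerProps;
        }
    }
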
General Options
~~~~~~~~~~~~~~~

Here are some recommended configuration parameters for the Confluent Monitoring Interceptors, although the
defaults are suitable for many applications.

``confluent.monitoring.interceptor.bootstrap.servers``
    List of |ak| brokers in the cluster to which monitoring data is written. (Default is ``localhost:9092``.)

.. note:: You can change *any* |ak| producer configuration option for the interceptor by prefixing it with
   ``confluent.monitoring.interceptor.`` (including the ``.`` on the end). For example, you can change the
   value of ``timeout.ms`` for the interceptor using the property ``confluent.monitoring.interceptor.timeout.ms``.
   For more information on |ak| producer options, see :ref:`cp-config-producer`.

There are also some configuration parameters that are used only by the Confluent Monitoring Interceptors:

``confluent.monitoring.interceptor.topic``
    Topic to which monitoring data is written. (Default is ``_confluent-monitoring``.)

``confluent.monitoring.interceptor.publishMs``
    Period at which the interceptor publishes monitoring messages. (Default is 15 seconds.)

``confluent.monitoring.interceptor.client.id``
    A logical client name to be included in |c3| monitoring data. If not specified, the client ID of the
    intercepted client, prefixed with ``confluent.monitoring.interceptor``, is used.

Security
~~~~~~~~

When configuring the Monitoring Interceptors for a secure cluster, the embedded producer in the interceptor
(which sends monitoring data to the broker) must have the correct :ref:`security configurations ` prefixed
with ``confluent.monitoring.interceptor.``

For more information on how to enable security for the Confluent Monitoring Interceptors, see:

* :ref:`SSL Encryption `
* :ref:`SSL Authentication `
* :ref:`SASL/PLAIN Authentication `
* :ref:`SASL/SCRAM Authentication `
* :ref:`SASL/GSSAPI Authentication `

For librdkafka-based clients, the ``confluent.monitoring.interceptor.`` prefix is the same, but the underlying
client configuration properties may differ from those of the Java client.

.. _interceptor_rbac:

Set up interceptors for |rbac|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To use interceptors when |rbac| is enabled, the producers and consumers require principal write access to the
monitoring topic (default is ``_confluent-monitoring``) on the cluster where the monitoring data is sent. By
default, the monitoring cluster is the same as the cluster being produced to and consumed from. Follow these
steps:

#. Create the ``_confluent-monitoring`` topic by either starting |c3-short| or manually :ref:`creating the topic `.

#. Grant the client principal the ``DeveloperWrite`` role on the ``_confluent-monitoring`` topic using the :ref:`cli`:

   .. code:: bash

      confluent iam rolebinding create \
         --principal User:<principal> \
         --role DeveloperWrite \
         --resource Topic:_confluent-monitoring --kafka-cluster-id <kafka-cluster-id>

#. Start up the client.

For more information on |rbac| in |cp|, see :ref:`rbac-overview` and :ref:`controlcenter_security_rbac`.

Logging
~~~~~~~

Both the |ak| client and the Confluent Monitoring Interceptor use ``slf4j`` for logging errors and other
information. To enable logging, you must configure an slf4j binding, or you will see an error message such as
"Failed to load class org.slf4j.impl.StaticLoggerBinder." The simplest way to resolve this is to add
``slf4j-simple.jar`` to your classpath. For more details, see http://www.slf4j.org/codes.html#StaticLoggerBinder.

The librdkafka-based monitoring interceptor logs critical errors to stderr. To enable interceptor-specific
debugging (also written to stderr), set the ``confluent.monitoring.interceptor.icdebug`` configuration
property to ``true``.

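To close out this configuration section: the Security subsection above requires the interceptor's embedded
producer to carry its own security settings under the ``confluent.monitoring.interceptor.`` prefix. The
following is a minimal Java sketch of what that looks like for a TLS-secured cluster; the broker address,
truststore path, and password are hypothetical placeholders, and the same prefixed pattern applies to the
SASL settings referenced above.

.. sourcecode:: java

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SslConfigs;

    public class SecureInterceptorConfig {

        public static Properties producerProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

            // Security settings for the producer itself (hypothetical truststore).
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/secrets/client.truststore.jks");
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");

            // The same settings again, prefixed with confluent.monitoring.interceptor.,
            // so the interceptor's embedded producer can also reach the secure cluster.
            props.put("confluent.monitoring.interceptor.security.protocol", "SSL");
            props.put("confluent.monitoring.interceptor.ssl.truststore.location",
                "/etc/kafka/secrets/client.truststore.jks");
            props.put("confluent.monitoring.interceptor.ssl.truststore.password",
                "truststore-password");

            return props;
        }
    }
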
Example Configuration
~~~~~~~~~~~~~~~~~~~~~

The following example shows how to set up stream monitoring for the built-in performance testing tools that
come with |ak|. The instructions assume you have a cluster set up similar to that of the :ref:`quick start guide`.

#. With |c3-short| already running, open a terminal and run the following commands to start the Producer
   Performance Test tool with the ``MonitoringProducerInterceptor``:

   .. codewithvars:: bash

       export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-|release|.jar

       ./bin/kafka-producer-perf-test --topic c3test --num-records 10000000 --record-size 1000 \
           --throughput 10000 --producer-props bootstrap.servers=localhost:9092 \
           interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor acks=all

   For librdkafka-based clients, for example kafkacat:

   .. codewithvars:: bash

       (for i in $(seq 1 100000); do echo $i; sleep 0.01; done) | kafkacat -b localhost:9092 -P -t c3test \
           -X acks=all -X plugin.library.paths=monitoring-interceptor

#. In a separate terminal, start the console consumer with the ``MonitoringConsumerInterceptor``:

   .. codewithvars:: bash

       export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-|release|.jar

       echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" > /tmp/consumer.config
       echo "group.id=c3_example_consumer_group" >> /tmp/consumer.config

       bin/kafka-console-consumer --topic c3test --bootstrap-server localhost:9092 --consumer.config /tmp/consumer.config

   For librdkafka-based clients, for example kafkacat:

   .. codewithvars:: bash

       kafkacat -b localhost:9092 -G c3_example_consumer_group -X auto.offset.reset=earliest \
           -X plugin.library.paths=monitoring-interceptor \
           c3test

#. Open the |c3-short| UI at `http://localhost:9021/ <http://localhost:9021/>`__ and click **Consumers** to view
   the ``c3_example_consumer_group``. Click the consumer group ID to view its details.

Verification
------------

To verify that the interceptors are properly sending data to the ``_confluent-monitoring`` topic, start the
console consumer:

.. codewithvars:: bash

    bin/control-center-console-consumer --topic _confluent-monitoring --from-beginning

You should see monitoring messages with the relevant ``clientId`` being produced onto that topic.

.. note:: Make sure the interceptors are working for both producers and consumers; |c3-short| currently does
   not display messages that have not been consumed.

Caveats
-------

- The Confluent Monitoring Interceptors currently **do not** support consumers with
  ``isolation.level=read_committed`` or producers with ``transactional.id`` set.
- The Confluent Monitoring Interceptors skip messages with missing or invalid timestamps. Make sure all
  brokers and topics are configured with ``log.message.format.version`` and ``message.format.version``
  >= ``0.10.0``.