.. _controlcenter_clients: Installing Control Center Interceptors ====================================== To use :ref:`Stream Monitoring `, you need to install the Confluent Monitoring Interceptors with your applications, and configure your applications to use the interceptor to send metrics to |c3-short|. This guide explains how to do this. Interceptor Installation ------------------------ With Kafka 0.10.0.0+, Kafka Clients support pluggable *interceptors* to examine (and potentially modify) messages. |c3| uses an interceptor that collects statistics on incoming and outgoing messages to provide Stream Monitoring. To use |c3|'s :ref:`Stream Monitoring ` feature, first you need to install the Confluent Metric Interceptor on client machines. Detailed instructions for getting |CP| clients are provided at :ref:`controlcenter_install`. We recommend using a build tool like Maven, Ivy, or Gradle when developing Java applications. Here is some sample content for a POM file for maven. First, you need to specify the :ref:`Confluent maven repository `: .. sourcecode:: xml ... confluent http://packages.confluent.io/maven/ ... Next, you can add dependencies. To monitor a producer or consumer with |c3-short|, you need to include the dependencies for the Apache Kafka Java clients and the Confluent Monitoring Interceptor: .. sourcecode:: xml ... org.apache.kafka kafka-clients 0.11.0.1-cp1 io.confluent monitoring-interceptors 3.3.1 ... Compatibility ^^^^^^^^^^^^^ 1. Interceptors currently **do not** support consumers with ```isolation.level=read_committed``` and producers with ```transactional.id``` set. 2. Monitoring Interceptor will skip messages with missing or invalid timestamps, make sure all brokers and topics are all configured with ``log.message.format.version`` and ``message.format.version`` >= ``0.10.0`` Interceptor installation for librdkafka-based clients ----------------------------------------------------- For librdkafka-based clients, such as confluent-kafka-python, confluent-kafka-go or confluent-kafka-dotnet, a separate monitoring interceptor plugin is used which is distributed differently depending on platform: * Linux (Debian and RedHat based distributions): install the ``confluent-librdkafka-plugins`` package from the `Confluent repositories `_. * Mac OS X: download the `monitoring interceptor zip file `_ and unzip the ``monitoring-interceptor.dylib`` file to the same directory as your application or a directory in the system library search path, such as ``/usr/local/lib``. * Windows: download the `monitoring interceptor zip file `_ and unzip the appropriate ``monitoring-interceptor.dll`` for your architecture to the same location as your application is installed or the directory it is run from. .. note:: The monitoring interceptor plugin for librdkafka-based clients requires librdkafka version 0.11.0 or later. .. note:: The monitoring interceptor plugin is a runtime dependency and is not required to build the client or application, the plugin is directly referenced through configuration properties (``plugin.library.paths``) and must be installed on the deployment host rather than the build host. Client Configuration -------------------- After you have configured your applications to include the Confluent Monitoring Interceptor, you must configure your clients to use the interceptor. Adding the interceptor to your Kafka Producer ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ If you are not using any interceptors currently, you will need to add a new item to the Java ``Properties`` object that you use to create a new Producer. Specifically, you need to set ``interceptor.classes`` to ``io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor``. ' If you are already using an interceptor, then you need to add an additional item to ``interceptor.classes``. .. note:: For librdkafka-based clients, see :ref:`add_ic_librdkafka`. Adding the interceptor to your Kafka Consumer ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ If you are not using any interceptors currently, you will need to add a new item to the Java ``Properties`` object that you use to create a new Consumer. Specifically, you need to set ``interceptor.classes`` to ``io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor``. ' If you are already using an interceptor, then you need to add an additional item to ``interceptor.classes``. .. warning:: The producer and consumer interceptor classes are different; make sure you choose the correct class for each producer and consumer. .. note:: For librdkafka-based clients, see :ref:`add_ic_librdkafka`. Adding interceptors to your Kafka Streams application ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ For Kafka producers and consumers used internally, you can specify the Confluent Monitoring Interceptor to be used. For producers, you need to set ``producer.interceptor.classes`` to ``io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor``. For consumers, you will need to set ``consumer.interceptor.classes`` to ``io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor``. .. note:: Instead of hardcoding the config names in code, you can use prefixes defined in the corresponding classes like so: .. sourcecode:: java streamsConfig.put( StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"); .. _add_ic_librdkafka: Adding the interceptor to librdkafka-based client applications ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ For librdkafka-based clients, set the ``plugin.library.paths`` configuration property to the name of the interceptor library, ``monitoring-interceptor``. C example: .. sourcecode:: C rd_kafka_conf_t *conf = rd_kafka_conf_new(); rd_kafka_conf_res_t r; char errstr[512]; r = rd_kafka_conf_set(conf, "bootstrap.servers", "mybroker", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); r = rd_kafka_conf_set(conf, "plugin.library.paths", "monitoring-interceptor", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) fatal("%s", errstr); rd_kafka_destroy(rk); Python example: .. sourcecode:: Python p = confluent_kafka.Producer({'bootstrap.servers': 'mybroker', 'plugin.library.paths': 'monitoring-interceptor'}) .. note:: If the monitoring-interceptor library is installed to a non-standard location which is not covered by the systems dynamic linker path (see ``dlopen(3)`` or ``LoadLibrary`` documentation) a full or relative path needs to be configured. .. note:: The platform-specific library filename extension (e.g., ``.so`` or ``.dll``) may be omitted. .. _controlcenter_client_interceptor_config: Interceptor Configuration ------------------------- We provide some configuration parameters that allow you to configure the |c3| interceptors. By default, interceptors will send and receive messages using the same Kafka cluster that you are monitoring, and will use a set of default topics to share information. The interceptors will also report data at a regular interval of, by default, 15 seconds. Configuration options ^^^^^^^^^^^^^^^^^^^^^ We allow you to change the behavior of the interceptors. (In most applications, we recommend keeping the default.) .. important:: You can change *any* Kafka producer configuration option for the interceptor by prefixing it with ``confluent.monitoring.interceptor.`` (including the ``.`` on the end). For example, you can change the value of ``timeout.ms`` for the interceptor using the property ``confluent.monitoring.interceptor.timeout.ms``. For more information on Kafka producer options, see the `Apache Kafka Producer documentation `_. There are also some configuration parameters that are only used by the interceptor: ``confluent.monitoring.interceptor.topic`` Topic on which monitoring data will be written. (Default is ``_confluent-monitoring``.) ``confluent.monitoring.interceptor.publishMs`` Period the interceptor should use to publish messages to. (Default is 15 seconds.) ``confluent.monitoring.interceptor.client.id`` A logical client name to be included in |c3| monitoring data. If not specified, client id of an intercepted client with ``confluent.monitoring.interceptor`` is used. Security ^^^^^^^^ When configuring Monitoring Interceptor for a secure cluster, the embedded producer (that sends monitoring data to the broker) in Monitoring Interceptor needs to have the correct :ref:`security configurations ` prefixed with ``confluent.monitoring.interceptor.`` **Example Configs:** .. sourcecode:: bash confluent.monitoring.interceptor.security.protocol=SSL confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks confluent.monitoring.interceptor.ssl.truststore.password=test1234 confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks confluent.monitoring.interceptor.ssl.keystore.password=test1234 confluent.monitoring.interceptor.ssl.key.password=test1234 **librdkafka-based client configuration example:** .. sourcecode:: bash confluent.monitoring.interceptor.security.protocol=SSL confluent.monitoring.interceptor.ssl.ca.location=/var/private/ssl/local-ca.cert confluent.monitoring.interceptor.ssl.certificate.location=/var/private/ssl/kafka.client.pem confluent.monitoring.interceptor.ssl.key.location=/var/private/ssl/kafka.client.key confluent.monitoring.interceptor.ssl.key.password=test1234 While the ``confluent.monitoring.interceptor.`` prefix is the same, the actual client configuration properties depend on the underlying client and may differ between the Java client and librdkafka-based clients. .. _interceptor_configuration_options: Logging ^^^^^^^ Both the Apache Kafka client and the Confluent interceptor use ``slf4j`` for logging errors and other information. To enable logging, you need to configure an slf4j binding, or you will see an error message like "Failed to load class org.slf4j.impl.StaticLoggerBinder." The simplest way to resolve this issue is to add ``slf4j-simple.jar`` to your classpath. For more details, see http://www.slf4j.org/codes.html#StaticLoggerBinder. The librdkafka-based monitoring interceptor will log critical errors to stderr. To enable interceptor-specific debugging (to stderr) set the ``confluent.monitoring.interceptor.icdebug`` configuration property to ``true``. Example Configuration ^^^^^^^^^^^^^^^^^^^^^ Below shows how to setup stream monitoring for the built-in performance testing tools that come with Kafka. The instructions assume you have a cluster setup similar to that of the :ref:`Quickstart guide`. 1. With |c3-short| already running, open a terminal and run the following commands to start the Producer Performance Test tool with the ``MonitoringProducerInterceptor`` .. sourcecode:: bash $ export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-3.3.1.jar $ ./bin/kafka-producer-perf-test --topic c3test --num-records 10000000 --record-size 1000 \ --throughput 10000 --producer-props bootstrap.servers=localhost:9092 \ interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor acks=all For librdkafka-based clients, e.g. kafkacat: .. sourcecode:: bash $ (for i in $(seq 1 100000) ; echo $i ; sleep 0.01 ; done) | kafkacat -b localhost:9092 -P -t c3test -X acks=all \ -X plugin.library.paths=monitoring-interceptor 2. In a separate terminal, start up the console consumer with the ``MonitoringConsumerInterceptor`` .. sourcecode:: bash $ export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-3.3.1.jar $ echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" > /tmp/consumer.config $ echo "group.id=c3_example_consumer_group" >> /tmp/consumer.config $ bin/kafka-console-consumer --topic c3test --new-consumer --bootstrap-server localhost:9092 --consumer.config /tmp/consumer.config For librdkafka-based clients, e.g. kafkacat: .. sourcecode:: bash $ kafkacat -b localhost:9092 -G c3_example_consumer_group -X auto.offset.reset=earliest \ -X plugin.library.paths=monitoring-interceptor \ c3test 3. Open up the |c3-short| UI at `http://localhost:9021/ `__ and click on ``Stream Monitoring`` to view the stream monitoring UI for the ``c3_example_consumer_group``. Verification ------------ To verify the interceptors are properly sending data to the ``_confluent-monitoring`` topic, start the console consumer: .. sourcecode:: bash bin/control-center-console-consumer --topic --from-beginning _confluent-monitoring You should see monitoring messages with the relevant ``clientId`` being produced onto that topic. .. note:: Make sure interceptor is working for both producers and consumers, |c3-short| currently will not display messages that have not been consumed.