Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Confluent Monitoring Interceptors¶
To use Stream Monitoring in Control Center, install the Confluent Monitoring Interceptors with your Apache Kafka® applications and configure your applications to use the interceptors on the Kafka messages produced and consumed, that are then sent to Control Center. This guide explains how to do this.
Installing Interceptors¶
Since Kafka 0.10.0.0, Kafka Clients support pluggable interceptors to examine (and potentially modify) messages. Confluent Control Center requires clients to use Confluent Monitoring Interceptors to collect statistics on incoming and outgoing messages to provide Stream Monitoring capabilities.
To use Confluent Control Center’s Stream Monitoring feature, first you must install the Confluent Monitoring Interceptor on client machines.
Java Clients¶
Maven, Ivy, or Gradle when developing Java applications. Here is sample content for a POM file for Maven. First, specify the Confluent Maven repository:
<repositories>
...
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
...
</repositories>
Next, add dependencies for the Kafka Java clients and the Confluent Monitoring Interceptors:
<dependencies>
...
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>monitoring-interceptors</artifactId>
<version>5.1.4</version>
</dependency>
...
<dependencies>
librdkafka-based Clients¶
For librdkafka-based clients such as confluent-kafka-python, confluent-kafka-go, or confluent-kafka-dotnet, a separate monitoring interceptor plugin is used which is distributed differently depending on the platform:
- Linux (Debian and RedHat based distributions): install the
confluent-librdkafka-plugins
package from the Confluent repositories. - macOS: download the monitoring interceptor zip file
and unzip the
monitoring-interceptor.dylib
file to the same directory as your application or a directory in the system library search path, such as/usr/local/lib
. - Windows: download the monitoring interceptor zip file
and unzip the appropriate
monitoring-interceptor.dll
for your architecture to the same location as your application is installed or the directory it is run from.
Note
The monitoring interceptor plugin for librdkafka-based clients requires librdkafka version 0.11.0 or later.
Note
The monitoring interceptor plugin is a runtime dependency and is not required to build the client or application,
the plugin is directly referenced through configuration properties (plugin.library.paths
) and must be installed
on the deployment host rather than the build host.
Enabling Interceptors¶
After you have installed the Confluent Monitoring Interceptor package to your Kafka applications, configure your clients to actually use the interceptors. How you configure your clients depends on what kind of client it is.
Warning
The producer and consumer interceptor classes are different; make sure you choose the correct class for each producer and consumer.
Java Producers and Consumers¶
You can specify the Confluent Monitoring Interceptor to be used for Java producers and consumers.
For producers, set interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
.
producerProps.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
For consumers, set interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
.
consumerProps.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
Note
For librdkafka-based clients, see section on librdkafka.
Kafka Streams¶
Kafka Streams uses Kafka producers and consumers internally. You can specify the Confluent Monitoring Interceptor to be used for those internal producers and consumers.
For producers, set producer.interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
.
For consumers, set consumer.interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
.
streamsConfig.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
streamsConfig.put(
StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
KSQL¶
KSQL, like Kafka Streams, uses Kafka producers and consumers internally. You can specify the Confluent Monitoring Interceptor to be used for those internal producers and consumers.
For producers, set producer.interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
.
For consumers, set consumer.interceptor.classes
to io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
.
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
librdkafka-based clients¶
For librdkafka-based clients, set the plugin.library.paths
configuration property to the name of the interceptor library, monitoring-interceptor
.
- C example:
rd_kafka_conf_t *conf = rd_kafka_conf_new(); rd_kafka_conf_res_t r; char errstr[512]; r = rd_kafka_conf_set(conf, "bootstrap.servers", "mybroker", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); r = rd_kafka_conf_set(conf, "plugin.library.paths", "monitoring-interceptor", errstr, sizeof(errstr)); if (r != RD_KAFKA_CONF_OK) fatal("%s", errstr); rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) fatal("%s", errstr); rd_kafka_destroy(rk);
- Python example:
p = confluent_kafka.Producer({'bootstrap.servers': 'mybroker', 'plugin.library.paths': 'monitoring-interceptor'})
Note
If the monitoring-interceptor library is installed to a non-standard location which is not covered
by the systems dynamic linker path (see dlopen(3)
or LoadLibrary
documentation) a full or relative
path needs to be configured.
Note
The platform-specific library filename extension (e.g., .so
or .dll
) may be omitted.
Kafka Connect¶
Kafka Connect connectors use Kafka producers and consumers internally. You can specify the Confluent Monitoring Interceptor to be used for those internal producers and consumers.
Source connector: configure the Confluent Monitoring Interceptors with the
producer
prefix.producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
Sink connector: configure the Confluent Monitoring Interceptors with the
consumer
prefix.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
Confluent Replicator¶
Replicator uses a Kafka consumer internally. You can specify the Confluent Monitoring Interceptor to be used for that internal consumer. Modify the Replicator JSON configuration file. Here is an example subset of configuration to add.
{
"name":"replicator",
"config":{
....
"src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
....
}
}
}
Configuring Interceptors¶
Defaults¶
By default, Confluent Monitoring Interceptors will send and receive messages using the same Kafka cluster that you are monitoring, and will use a set of default topics to share information. The interceptors will also report data at a regular interval of, by default, 15 seconds.
General Options¶
Here are some recommended configuration parameters to use for the Confluent Monitoring Interceptors, although the defaults can be used for many applications.
confluent.monitoring.interceptor.bootstrap.servers
- List of Kafka brokers in a cluster to which monitoring data will be written. (Default is
localhost:9092
.)
Note
You can change any Kafka producer configuration option for the interceptor by prefixing it with confluent.monitoring.interceptor.
(including the .
on the end).
For example, you can change the value of timeout.ms
for the interceptor using the property confluent.monitoring.interceptor.timeout.ms
. For more information on Kafka producer options, see the Apache Kafka Producer documentation.
There are also some configuration parameters that are only used by the Confluent Monitoring Interceptor:
confluent.monitoring.interceptor.topic
- Topic on which monitoring data will be written. (Default is
_confluent-monitoring
.) confluent.monitoring.interceptor.publishMs
- Period the interceptor should use to publish messages to. (Default is 15 seconds.)
confluent.monitoring.interceptor.client.id
- A logical client name to be included in Confluent Control Center monitoring data. If not specified, client id of an
intercepted client with
confluent.monitoring.interceptor
is used.
Security¶
When configuring Monitoring Interceptor for a secure cluster, the embedded producer (that sends monitoring data to the broker) in
Monitoring Interceptor must have the correct security configurations prefixed with confluent.monitoring.interceptor.
Please see more information on how to enable security for Confluent Monitoring Interceptors:
- SSL Encryption
- SSL Authentication
- SASL/PLAIN Authentication
- SASL/SCRAM Authentication
- SASL/GSSAPI Authentication
For librdkafka-based clients, while the confluent.monitoring.interceptor.
prefix is the same, the actual client configuration properties
may vary depending on the underlying client. They may differ between the Java client and librdkafka-based clients.
Logging¶
Both the Kafka client and the Confluent interceptor use slf4j
for logging errors and other information.
To enable logging, you need to configure an slf4j binding, or you will see an error message like “Failed to load class
org.slf4j.impl.StaticLoggerBinder.” The simplest way to resolve this issue is to add slf4j-simple.jar
to your
classpath. For more details, see http://www.slf4j.org/codes.html#StaticLoggerBinder.
The librdkafka-based monitoring interceptor will log critical errors to stderr.
To enable interceptor-specific debugging (to stderr) set the confluent.monitoring.interceptor.icdebug
configuration property to true
.
Example Configuration¶
Below shows how to setup stream monitoring for the built-in performance testing tools that come with Kafka. The instructions assume you have a cluster setup similar to that of the quick start guide.
With Control Center already running, open a terminal and run the following commands to start the Producer Performance Test tool with the
MonitoringProducerInterceptor
export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-5.1.4.jar ./bin/kafka-producer-perf-test --topic c3test --num-records 10000000 --record-size 1000 \ --throughput 10000 --producer-props bootstrap.servers=localhost:9092 \ interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor acks=all
For librdkafka-based clients, e.g. kafkacat:
(for i in $(seq 1 100000) ; echo $i ; sleep 0.01 ; done) | kafkacat -b localhost:9092 -P -t c3test -X acks=all \ -X plugin.library.paths=monitoring-interceptor
In a separate terminal, start up the console consumer with the
MonitoringConsumerInterceptor
export CLASSPATH=./share/java/monitoring-interceptors/monitoring-interceptors-5.1.4.jar echo "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" > /tmp/consumer.config echo "group.id=c3_example_consumer_group" >> /tmp/consumer.config bin/kafka-console-consumer --topic c3test --bootstrap-server localhost:9092 --consumer.config /tmp/consumer.config
For librdkafka-based clients, e.g. kafkacat:
kafkacat -b localhost:9092 -G c3_example_consumer_group -X auto.offset.reset=earliest \ -X plugin.library.paths=monitoring-interceptor \ c3test
Open up the Control Center UI at http://localhost:9021/ and click on
Stream Monitoring
to view the stream monitoring UI for thec3_example_consumer_group
.
Verification¶
To verify the interceptors are properly sending data to the _confluent-monitoring
topic, start the console consumer:
bin/control-center-console-consumer <control-center.properties> --topic --from-beginning _confluent-monitoring
You should see monitoring messages with the relevant clientId
being produced onto that topic.
Note
Make sure interceptor is working for both producers and consumers, Control Center currently will not display messages that have not been consumed.
Caveats¶
- Confluent Monitoring Interceptors currently do not support consumers with
`isolation.level=read_committed`
and producers with`transactional.id`
set. - Confluent Monitoring Interceptors will skip messages with missing or invalid timestamps, make sure all brokers and
topics are all configured with
log.message.format.version
andmessage.format.version
>=0.10.0