Kafka Event Streaming Application¶
This demo shows how to deploy a Kafka event streaming application using KSQL and Kafka Streams for stream processing. All components in Confluent Platform have security enabled end-to-end.
Overview¶
The use case is a Kafka event streaming application for real-time edits to real Wikipedia pages. The Wikimedia Foundation has IRC channels that publish edits happening to real wiki pages (e.g. #en.wikipedia, #en.wiktionary) in real time. Using Kafka Connect, a Kafka source connector kafka-connect-irc streams raw messages from these IRC channels, and a custom Kafka Connect transform kafka-connect-transform-wikiedit transforms these messages, which are then written to a Kafka cluster. This demo uses KSQL and Kafka Streams for data processing. Then a Kafka sink connector kafka-connect-elasticsearch streams the data out of Kafka, applying another custom Kafka Connect transform called NullFilter. The data is materialized into Elasticsearch for analysis by Kibana.
Use Confluent Control Center for management and monitoring.

Note
This is a Docker environment and has all services running on one host. Do not use this demo in production. It is meant exclusively to easily demo the Confluent Platform. In production, Confluent Control Center should be deployed with a valid license and with its own dedicated metrics cluster, separate from the cluster with production traffic. Using a dedicated metrics cluster is more resilient because it continues to provide system health monitoring even if the production traffic cluster experiences issues.
Run demo¶
Demo validated with:
- Docker version 17.06.1-ce
- Docker Compose version 1.14.0 with Docker Compose file format 2.2
- Java version 1.8.0_92
- MacOS 10.14.3 (note for Ubuntu environments)
- git
- jq
Note
If you prefer a non-Docker version and have Elasticsearch and Kibana running on your local machine, please follow these instructions.
Clone the cp-demo GitHub repository:
git clone https://github.com/confluentinc/cp-demo
In Docker’s advanced settings, increase the memory dedicated to Docker to at least 8GB (default is 2GB).
From the cp-demo directory, start the entire demo by running a single command that generates the keys and certificates, brings up the Docker containers, and configures and validates the environment. This will take less than 5 minutes to complete.
./scripts/start.sh
Use Google Chrome to view the Confluent Control Center GUI at http://localhost:9021. Click on the top right button that shows the current date, and change Last 4 hours to Last 30 minutes.
View the data in the Kibana dashboard at http://localhost:5601/app/kibana#/dashboard/Wikipedia
Playbook¶
Brokers¶
Select the cluster named “Kafka Raleigh”.
Click on “Brokers”.
View the status of the Brokers in the cluster, including:
- Production and Consumption metrics
- Broker uptime
- Partitions: online, under replicated, total replicas, out of sync replicas
- Disk utilization
- System: network pool usage, request pool usage
Topics¶
Confluent Control Center has a useful interface to manage topics in a Kafka cluster. Click on “Topics”.
Scroll down and click on the topic wikipedia.parsed.
View an overview of this topic:
- Throughput
- Partition replication status
View which brokers are leaders for which partitions and where all partitions reside.
Inspect messages for this topic, in real-time.
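If you also want to inspect partition leadership and replica placement from the command line, one option is the kafka-topics tool. This is a sketch, not part of the demo's scripts; it is run from the kafka1 container because, as noted later in this guide, commands that talk to ZooKeeper rely on the ZooKeeper authentication settings carried by kafka1 and kafka2.
# Describe wikipedia.parsed: shows leader, replicas, and in-sync replicas per partition
docker-compose exec kafka1 kafka-topics --describe --topic wikipedia.parsed \
  --zookeeper zookeeper:2181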
Return to “All Topics”, click on wikipedia.parsed.count-by-channel to view the Kafka Streams application output topic.
View the schema for this topic. For wikipedia.parsed, the topic value is using a Schema registered with Schema Registry (the topic key is just a string).
View configuration settings for this topic.
Return to the All topics view and click the + Add a topic button on the top right to create a new topic in your Kafka cluster. You can also view and edit settings of Kafka topics in the cluster. Read more on Confluent Control Center topic management.
Dataflow: you can derive which producers are writing to which topics and which consumers are reading from which topics. When Confluent Monitoring Interceptors are configured on Kafka clients, they write metadata to a topic named _confluent-monitoring. Kafka clients include any application that uses the Apache Kafka client API to connect to Kafka brokers, such as custom client code or any service that has embedded producers or consumers, such as Kafka Connect, KSQL, or a Kafka Streams application. Confluent Control Center uses that topic to ensure that all messages are delivered and to provide statistics on throughput and latency performance. An example script that derives the producer-to-topic and consumer-to-topic mapping from that same topic is provided with the repo (note: this is for demo purposes only, not suitable for production). The command is:
./scripts/app/map_topics_clients.py
Your output should resemble:
Reading topic _confluent-monitoring for 60 seconds...please wait

EN_WIKIPEDIA_GT_1
  producers
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-84e85189-4f37-460c-991f-bb7bbb4b5a58-StreamThread-12-producer
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-84e85189-4f37-460c-991f-bb7bbb4b5a58-StreamThread-9-producer
  consumers
    _confluent-ksql-default_query_CSAS_EN_WIKIPEDIA_GT_1_COUNTS_3

EN_WIKIPEDIA_GT_1_COUNTS
  producers
    _confluent-ksql-default_query_CSAS_EN_WIKIPEDIA_GT_1_COUNTS_3-df19ff7e-4d42-4b40-8133-a3632c86e42d-StreamThread-13-producer
    _confluent-ksql-default_query_CSAS_EN_WIKIPEDIA_GT_1_COUNTS_3-df19ff7e-4d42-4b40-8133-a3632c86e42d-StreamThread-14-producer
  consumers
    EN_WIKIPEDIA_GT_1_COUNTS-consumer

WIKIPEDIABOT
  producers
    _confluent-ksql-default_query_CSAS_WIKIPEDIABOT_1-7d47ae21-e734-43da-9782-bae3191fc85a-StreamThread-7-producer
    _confluent-ksql-default_query_CSAS_WIKIPEDIABOT_1-7d47ae21-e734-43da-9782-bae3191fc85a-StreamThread-8-producer
  consumers
    connect-elasticsearch-ksql

WIKIPEDIANOBOT
  producers
    _confluent-ksql-default_query_CSAS_WIKIPEDIANOBOT_0-6f29b3fb-abf8-4c3e-bb8d-266cb5aa65c6-StreamThread-2-producer
    _confluent-ksql-default_query_CSAS_WIKIPEDIANOBOT_0-6f29b3fb-abf8-4c3e-bb8d-266cb5aa65c6-StreamThread-3-producer
  consumers
    WIKIPEDIANOBOT-consumer

_confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog
  producers
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-84e85189-4f37-460c-991f-bb7bbb4b5a58-StreamThread-12-producer
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-84e85189-4f37-460c-991f-bb7bbb4b5a58-StreamThread-9-producer

_confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition
  producers
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2-84e85189-4f37-460c-991f-bb7bbb4b5a58-StreamThread-11-producer
  consumers
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2

wikipedia.parsed
  producers
    connect-worker-producer
  consumers
    _confluent-ksql-default_query_CSAS_WIKIPEDIABOT_1
    _confluent-ksql-default_query_CSAS_WIKIPEDIANOBOT_0
    _confluent-ksql-default_query_CTAS_EN_WIKIPEDIA_GT_1_2
    connect-replicator

wikipedia.parsed.replica
  producers
    connect-worker-producer
Connect¶
Confluent Control Center uses the Kafka Connect API to manage multiple connect clusters. Click on “Connect”.
Select connect-default, the name of the cluster of Connect workers.
Verify the connectors running in this demo:
- source connector wikipedia-irc
- source connector replicate-topic
- sink connector elasticsearch-ksql consuming from the Kafka topic WIKIPEDIABOT
Click any connector name to view or modify any details of the connector configuration and custom transforms.
KSQL¶
In this demo, KSQL is authenticated and authorized to connect to the secured Kafka cluster, and it is already running queries as defined in the KSQL command file.
In the navigation bar, click KSQL.
From the list of KSQL applications, select KSQL.
Alternatively, run the KSQL CLI to get to the KSQL CLI prompt:
docker-compose exec ksql-cli ksql http://ksql-server:8088
View the existing KSQL streams. (If you are using the KSQL CLI, at the ksql> prompt type SHOW STREAMS;).
Describe the schema (fields or columns) and the source and sink of an existing KSQL stream. Click on WIKIPEDIA.
View the existing KSQL tables. (If you are using the KSQL CLI, at the ksql> prompt type SHOW TABLES;).
View the existing KSQL queries, which are continuously running. (If you are using the KSQL CLI, at the ksql> prompt type SHOW QUERIES;).
View messages from different KSQL streams and tables. Click on your stream of choice and select Query to open the Query Editor. The editor shows a pre-populated query, like select * from WIKIPEDIA;, and it shows results for newly arriving data.
Click KSQL Editor and run the SHOW PROPERTIES; statement. You can see the configured KSQL server properties and check these values against the docker-compose.yml file.
This demo creates two streams, EN_WIKIPEDIA_GT_1 and EN_WIKIPEDIA_GT_1_COUNTS, to demonstrate how KSQL windows work. EN_WIKIPEDIA_GT_1 counts occurrences with a tumbling window, and for a given key it writes a null into the table on the first seen message. The underlying Kafka topic for EN_WIKIPEDIA_GT_1 does not filter out those nulls, but since we want to send downstream just the counts greater than one, there is a separate Kafka topic for EN_WIKIPEDIA_GT_1_COUNTS which does filter out those nulls (e.g., the query has a clause where ROWTIME is not null). A sketch of what these statements might look like appears at the end of this section. From the bash prompt, view those underlying Kafka topics.
View messages in EN_WIKIPEDIA_GT_1
:
docker exec connect kafka-avro-console-consumer --bootstrap-server kafka1:9091 --topic EN_WIKIPEDIA_GT_1 \
--property schema.registry.url=https://schemaregistry:8085 \
--consumer.config /etc/kafka/secrets/client_without_interceptors.config --max-messages 10
Your output should resemble:
null
{"USERNAME":"Atsme","WIKIPAGE":"Wikipedia:Articles for deletion/Metallurg Bratsk","COUNT":2}
null
null
null
{"USERNAME":"7.61.29.178","WIKIPAGE":"Tandem language learning","COUNT":2}
{"USERNAME":"Attar-Aram syria","WIKIPAGE":"Antiochus X Eusebes","COUNT":2}
...
View messages in EN_WIKIPEDIA_GT_1_COUNTS
:
docker exec connect kafka-avro-console-consumer --bootstrap-server kafka1:9091 --topic EN_WIKIPEDIA_GT_1_COUNTS \
--property schema.registry.url=https://schemaregistry:8085 \
--consumer.config /etc/kafka/secrets/client_without_interceptors.config --max-messages 10
Your output should resemble:
{"USERNAME":"Atsme","COUNT":2,"WIKIPAGE":"Wikipedia:Articles for deletion/Metallurg Bratsk"}
{"USERNAME":"7.61.29.178","COUNT":2,"WIKIPAGE":"Tandem language learning"}
{"USERNAME":"Attar-Aram syria","COUNT":2,"WIKIPAGE":"Antiochus X Eusebes"}
{"USERNAME":"RonaldB","COUNT":2,"WIKIPAGE":"Wikipedia:Open proxy detection"}
{"USERNAME":"Dormskirk","COUNT":2,"WIKIPAGE":"Swindon Designer Outlet"}
{"USERNAME":"B.Bhargava Teja","COUNT":3,"WIKIPAGE":"Niluvu Dopidi"}
...
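For reference, the windowed aggregation and the null-filtering stream described above might be defined with statements along the following lines. This is only a sketch of their shape: the demo's actual statements live in its KSQL command file, and the 5-minute window size, the intermediate stream EN_WIKIPEDIA_GT_1_STREAM, and the omission of any channel filtering are assumptions made here for illustration.
-- Sketch only: count edits per user/page in a tumbling window, keeping counts greater than 1
CREATE TABLE EN_WIKIPEDIA_GT_1 AS
  SELECT USERNAME, WIKIPAGE, COUNT(*) AS COUNT
  FROM WIKIPEDIA WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY USERNAME, WIKIPAGE
  HAVING COUNT(*) > 1;

-- Re-register the table's underlying topic as a stream (Avro schema is read from Schema Registry)
CREATE STREAM EN_WIKIPEDIA_GT_1_STREAM WITH (KAFKA_TOPIC='EN_WIKIPEDIA_GT_1', VALUE_FORMAT='AVRO');

-- Filter out the null records so only the actual counts flow downstream
CREATE STREAM EN_WIKIPEDIA_GT_1_COUNTS AS
  SELECT * FROM EN_WIKIPEDIA_GT_1_STREAM WHERE ROWTIME IS NOT NULL;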
Consumers¶
Confluent Control Center enables you to monitor consumer lag and throughput performance. Consumer lag is the topic’s high water mark (latest offset for the topic that has been written) minus the current consumer offset (latest offset read for that topic by that consumer group). Keep in mind the topic’s write rate and the consumer group’s read rate when you consider the significance of the consumer lag’s size. Click on “Consumers”.
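You can also check the same numbers from the command line with the kafka-consumer-groups tool. This is a sketch rather than a step in the demo's playbook; it uses the demo's PLAINTEXT port and the Elasticsearch sink connector's consumer group as an example.
# Per-partition offsets for the connector's consumer group; LAG = log-end offset minus current offset
docker-compose exec kafka1 kafka-consumer-groups --describe --group connect-elasticsearch-ksql \
  --bootstrap-server kafka1:10091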
Consumer lag is available on a per-consumer basis, including embedded consumers in sink connectors (e.g., connect-replicator and connect-elasticsearch-ksql), KSQL queries (e.g., consumer groups whose names start with _confluent-ksql-default_query_), console consumers (e.g., WIKIPEDIANOBOT-consumer), etc. Consumer lag is also available on a per-topic basis.
View consumer lag for the persistent KSQL “Create Stream As Select” query CSAS_WIKIPEDIABOT, which is displayed as _confluent-ksql-default_query_CSAS_WIKIPEDIABOT_0 in the consumer group list.
View consumer lag for the Kafka Streams application under the consumer group id wikipedia-activity-monitor.
With Confluent Monitoring Interceptors, you may also view additional metrics related to production and consumption of messages, including:
- Throughput
- Failed consume requests
- Percent messages consumed
- End to end latency
View consumption metrics for the persistent KSQL “Create Stream As Select” query CSAS_WIKIPEDIABOT, which is displayed as _confluent-ksql-default_query_CSAS_WIKIPEDIABOT_0 in the consumer group list.
Confluent Control Center shows which consumers in a consumer group are consuming from which partitions and on which brokers those partitions reside. Confluent Control Center updates as consumer rebalances occur in a consumer group. Start consuming from topic wikipedia.parsed with a new consumer group app with one consumer consumer_app_1. It runs in the background.
./scripts/app/start_consumer_app.sh 1
Let this consumer group run for 2 minutes until Confluent Control Center shows the consumer group app with steady consumption. This consumer group app has a single consumer consumer_app_1 consuming all of the partitions in the topic wikipedia.parsed.
Add a second consumer consumer_app_2 to the existing consumer group app.
./scripts/app/start_consumer_app.sh 2
Let this consumer group run for 2 minutes until Confluent Control Center shows the consumer group app with steady consumption. Notice that the consumers consumer_app_1 and consumer_app_2 now share consumption of the partitions in the topic wikipedia.parsed.
.Click “System health” and then a line in “Request latency”.
This shows a breakdown of produce latencies (fetch latencies also available) through the entire request lifecycle.
Data Streams: Over Consumption¶
Streams monitoring in Control Center can highlight consumers that are over consuming some messages, which is an indication that consumers are processing a set of messages more than once. This may happen intentionally: for example, an application with a software bug consumed and processed Kafka messages incorrectly, received a fix, and then reprocessed the earlier messages correctly. This may also happen unintentionally if an application crashes before committing processed messages. To simulate over consumption, we will use Kafka’s consumer offset reset tool to set the offset of the consumer group app to an earlier offset, thereby forcing the consumer group to reconsume messages it has previously read.
Note
Data Streams view is enabled by setting confluent.controlcenter.deprecated.views.enable=true
Click on Data streams, and View Details for the consumer group app.
Scroll down to verify there are two consumers, consumer_app_1 and consumer_app_2, that were created in an earlier section. If these two consumers are not running and were never started, start them as described in the section Consumers. Let this consumer group run for two minutes, until Confluent Control Center stream monitoring shows the consumer group app with steady consumption.
Stop the consumer group app to stop consuming from topic wikipedia.parsed. Note that the command below stops the consumers gracefully with kill -15, so the consumers follow the shutdown sequence.
./scripts/app/stop_consumer_app_group_graceful.sh
Wait for 2 minutes to let messages continue to be written to the topics for a while, without being consumed by the consumer group app. Notice the red bar which highlights that during the time window when the consumer group was stopped, there were some messages produced but not consumed. These messages are not missing, they are just not consumed because the consumer group stopped.
Reset the offset of the consumer group app by shifting 200 offsets backwards. The offset reset tool must be run when the consumer is completely stopped. Offset values in the output shown below will vary.
docker-compose exec kafka1 kafka-consumer-groups \
  --reset-offsets --group app --shift-by -200 --bootstrap-server kafka1:10091 \
  --all-topics --execute
Your output should resemble:
TOPIC              PARTITION  NEW-OFFSET
wikipedia.parsed   1          4071
wikipedia.parsed   0          7944
Restart consuming from topic wikipedia.parsed with the consumer group app with two consumers.
./scripts/app/start_consumer_app.sh 1
./scripts/app/start_consumer_app.sh 2
Let this consumer group run for 2 minutes until Control Center stream monitoring shows the consumer group app with steady consumption. Notice several things:
- Even though the consumer group app was not running for some of this time, all messages are shown as delivered. This is because all bars are time windows relative to the produce timestamp.
- For some time intervals, the bars are red and the consumption line is above expected consumption because some messages were consumed twice due to rewinding offsets.
- The latency peaks and then gradually decreases, because this is also relative to the produce timestamp.
Data Streams: Under Consumption¶
Streams monitoring in Control Center can highlight consumers that are under consuming some messages. This may happen intentionally when consumers stop and restart and operators change the consumer offsets to the latest offset. This avoids the delay of processing messages that were produced while the consumers were stopped, which matters especially for applications that care about real-time processing. This may also happen unintentionally if a consumer is offline for longer than the log retention period, or if a producer is configured for acks=0 and a broker suddenly fails before having a chance to replicate data to other brokers. To simulate under consumption, we will use Kafka’s consumer offset reset tool to set the offset of the consumer group app to the latest offset, thereby skipping messages that will never be read.
Note
Data Streams view is enabled by setting confluent.controlcenter.deprecated.views.enable=true
Click on Data streams, and View Details for the consumer group app.
Scroll down to verify there are two consumers, consumer_app_1 and consumer_app_2, that were created in an earlier section. If these two consumers are not running and were never started, start them as described in the section Consumers. Let this consumer group run for two minutes, until Confluent Control Center stream monitoring shows the consumer group app with steady consumption.
Stop the consumer group app to stop consuming from topic wikipedia.parsed. Note that the command below stops the consumers ungracefully with kill -9, so the consumers do not follow the shutdown sequence.
./scripts/app/stop_consumer_app_group_ungraceful.sh
Wait for 2 minutes to let messages continue to be written to the topics for a while, without being consumed by the consumer group app. Notice the red bar which highlights that during the time window when the consumer group was stopped, there were some messages produced but not consumed. These messages are not missing, they are just not consumed because the consumer group stopped.
Reset the offset of the consumer group app by setting it to the latest offset. The offset reset tool must be run when the consumer is completely stopped. Offset values in the output shown below will vary.
docker-compose exec kafka1 kafka-consumer-groups \
  --reset-offsets --group app --to-latest --bootstrap-server kafka1:10091 \
  --all-topics --execute
Your output should resemble:
TOPIC              PARTITION  NEW-OFFSET
wikipedia.parsed   1          8601
wikipedia.parsed   0          15135
Restart consuming from topic wikipedia.parsed with the consumer group app with two consumers.
./scripts/app/start_consumer_app.sh 1
./scripts/app/start_consumer_app.sh 2
Let this consumer group run for two minutes, until Confluent Control Center stream monitoring shows the consumer group app with steady consumption. Notice that during the time period that the consumer group app was not running, no produced messages are shown as delivered. The light blue indicates that perhaps the consumer group stopped ungracefully.
Return to the Data Streams view, find the wikipedia-activity-monitor consumer group, click on View Details and then Topic partitions. From this view you can see the consumption status of the various topics and partitions for the Kafka Streams application.
Failed broker¶
To simulate a failed broker, stop the Docker container running one of the two Kafka brokers.
Stop the Docker container running Kafka broker 2.
docker-compose stop kafka2
After a few minutes, observe that the Broker summary shows the number of brokers has decreased from 2 to 1 and that there are many under replicated partitions.
View Topic information details to see that there are out of sync replicas on broker 2.
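You can also check for under-replicated partitions from the command line. This is a sketch, not one of the demo's scripts; it runs from the kafka1 container because, as noted in the Security section, commands that talk to ZooKeeper need the ZooKeeper authentication settings that kafka1 and kafka2 carry.
# List partitions whose in-sync replica set is smaller than the full replica set
docker-compose exec kafka1 kafka-topics --describe --under-replicated-partitions \
  --zookeeper zookeeper:2181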
Restart the Docker container running Kafka broker 2.
docker-compose start kafka2
After about a minute, observe the Broker summary in Confluent Control Center. The broker count has recovered to 2, and the topic partitions are back to reporting no under replicated partitions.
Click on the broker count 2 inside the “Broker uptime” box to view when broker counts changed.
Alerting¶
There are many types of Control Center alerts and many ways to configure them. Use the Alerts management page to define triggers and actions, or click on individual resources to set up alerts from there.

This demo already has pre-configured triggers and actions. View the Alerts Triggers screen, and click Edit against each trigger to see configuration details.
- The trigger Under Replicated Partitions happens when a broker reports non-zero under replicated partitions, and it causes an action Email Administrator.
- The trigger Consumption Difference happens when the consumption difference for the Elasticsearch connector consumer group is greater than 0, and it causes an action Email Administrator.
If you followed the steps in the failed broker section, view the Alert history to see that the trigger Under Replicated Partitions happened and caused an alert when you stopped broker 2.
You can also trigger the Consumption Difference trigger. In the Kafka Connect -> Sinks screen, edit the running Elasticsearch sink connector.
In the Connect view, pause the Elasticsearch sink connector in Settings by pressing the pause icon in the top right. This will stop consumption for the related consumer group.
View the Alert history to see that this trigger happened and caused an alert.
Replicator¶
Confluent Replicator copies data from a source Kafka cluster to a destination Kafka cluster. The source and destination clusters are typically different clusters, but in this demo, Replicator is doing intra-cluster replication, i.e., the source and destination Kafka clusters are the same. As with the rest of the components in the solution, Confluent Replicator is also configured with security.
Consumers: monitor throughput and latency of Confluent Replicator. Replicator is a Kafka Connect source connector and has a corresponding consumer group connect-replicator.
View Replicator Consumer Lag.
View Replicator Consumption metrics.
Topics: scroll down to view the topics called wikipedia.parsed (Replicator is consuming data from this topic) and wikipedia.parsed.replica (Replicator automatically created this topic and is copying data to it). Click on Consumer Groups for the topic wikipedia.parsed and observe that one of the consumer groups is called connect-replicator.
MANAGEMENT –> Kafka Connect: pause the Replicator connector in Settings by pressing the pause icon in the top right. This will stop consumption for the related consumer group.
Observe that the connect-replicator consumer group has stopped consumption.
Restart the Replicator connector.
Observe that the connect-replicator consumer group has resumed consumption. Notice several things:
- Even though the consumer group connect-replicator was not running for some of this time, all messages are shown as delivered. This is because all bars are time windows relative to the produce timestamp.
- The latency peaks and then gradually decreases, because this is also relative to the produce timestamp.
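To spot-check that data is actually landing in the replica topic, you can reuse the console-consumer pattern shown earlier in the KSQL section. This is a suggestion rather than a documented step of the demo; the replica topic should carry the same Avro records as wikipedia.parsed.
# Read a few Avro records from the topic that Replicator populates
docker exec connect kafka-avro-console-consumer --bootstrap-server kafka1:9091 --topic wikipedia.parsed.replica \
  --property schema.registry.url=https://schemaregistry:8085 \
  --consumer.config /etc/kafka/secrets/client_without_interceptors.config --max-messages 5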
Security¶
Follow along with the Security video.
All the components in this demo are enabled with many security features:
- SSL for encryption, except for ZooKeeper which does not support SSL
- SASL/PLAIN for authentication, except for ZooKeeper which is configured for SASL/DIGEST-MD5
- Authorization. If a resource has no associated ACLs, then users are not allowed to access the resource, except super users
- HTTPS for Control Center
- HTTPS for Schema Registry
- HTTPS for Connect
Note
This demo showcases a secure Confluent Platform for educational purposes and is not meant to be complete best practices. There are certain differences between what is shown in the demo and what you should do in production:
- Each component should have its own username, instead of authenticating all users as client
- Authorize users only for operations that they need, instead of making all of them super users
- If the PLAINTEXT security protocol is used, these ANONYMOUS usernames should not be configured as super users
- Consider not even opening the PLAINTEXT port if SSL or SASL_SSL are configured
Encryption & Authentication¶
Each broker has four listener ports:
- PLAINTEXT port called PLAINTEXT for users with no security enabled
- SSL port called SSL for users with just SSL without SASL
- SASL_SSL port called SASL_SSL for communication between services inside Docker containers
- SASL_SSL port called SASL_SSL_HOST for communication between any potential services outside of Docker that communicate with the Docker containers
| port | kafka1 | kafka2 |
| --- | --- | --- |
| PLAINTEXT | 10091 | 10092 |
| SSL | 11091 | 11092 |
| SASL_SSL | 9091 | 9092 |
| SASL_SSL_HOST | 29091 | 29092 |
Authorization¶
All the brokers in this demo authenticate as broker, and all other services authenticate as their respective names. Per the broker configuration parameter super.users, as it is set in this demo, the only users that can communicate with the cluster are those that authenticate as broker, schemaregistry, client, restproxy, connect, or users that connect via the PLAINTEXT port (their username is ANONYMOUS). All other users are not authorized to communicate with the cluster.
Verify the ports on which the Kafka brokers are listening with the following command; they should match the table shown above:
docker-compose logs kafka1 | grep "Registered broker 1"
docker-compose logs kafka2 | grep "Registered broker 2"
This demo automatically generates simple SSL certificates, creates keystores and truststores, and secures them with a password. To communicate with the brokers, Kafka clients may use any of the ports on which the brokers are listening. To use a security-enabled port, clients must specify security parameters for keystores, truststores, passwords, and authentication; the Kafka command line tools in this demo do this by passing a security configuration file (with or without interceptors) that contains these parameters. As an example, to communicate with the Kafka cluster to view all the active consumer groups:
- Communicate with brokers via the PLAINTEXT port
# PLAINTEXT port
docker-compose exec kafka1 kafka-consumer-groups --list --bootstrap-server kafka1:10091
- Communicate with brokers via the SASL_SSL port, with SASL_SSL parameters configured via the --command-config argument for command line tools or --consumer.config for kafka-console-consumer.
# SASL_SSL port with SASL_SSL parameters
docker-compose exec kafka1 kafka-consumer-groups --list --bootstrap-server kafka1:9091 \
  --command-config /etc/kafka/secrets/client_without_interceptors.config
- If you try to communicate with brokers via the SASL_SSL port but don’t specify the SASL_SSL parameters, it will fail.
# SASL_SSL port without SASL_SSL parameters
docker-compose exec kafka1 kafka-consumer-groups --list --bootstrap-server kafka1:9091
Your output should resemble:
Error: Executing consumer group command failed due to Request METADATA failed on brokers List(kafka1:9091 (id: -1 rack: null))
Verify which authenticated users are configured to be super users.
docker-compose logs kafka1 | grep SUPER_USERS
Your output should resemble the following. Notice this authorizes each service name which authenticates as itself, as well as the unauthenticated PLAINTEXT which authenticates as ANONYMOUS (for demo purposes only):
KAFKA_SUPER_USERS=User:client;User:schemaregistry;User:restproxy;User:broker;User:connect;User:ANONYMOUS
Verify that a user client which authenticates via SASL can consume messages from topic wikipedia.parsed:
./scripts/consumers/listen_wikipedia.parsed.sh SASL
Verify that a user which authenticates via SSL cannot consume messages from topic wikipedia.parsed. It should fail with an exception.
./scripts/consumers/listen_wikipedia.parsed.sh SSL
Your output should resemble:
[2018-01-12 21:13:18,481] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [wikipedia.parsed]
Verify that the broker’s Authorizer logger logs the denial event. As shown in the log message, the user which authenticates via SSL has a username CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, not just client.
# Authorizer logger logs the denied operation
docker-compose logs kafka1 | grep kafka.authorizer.logger
Your output should resemble:
[2018-01-12 21:13:18,454] INFO Principal = User:CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US is Denied Operation = Describe from host = 172.23.0.7 on resource = Topic:wikipedia.parsed (kafka.authorizer.logger)
[2018-01-12 21:13:18,464] INFO Principal = User:CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US is Denied Operation = Describe from host = 172.23.0.7 on resource = Group:test (kafka.authorizer.logger)
Add an ACL that authorizes user CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US, and then view the updated ACL configuration.
docker-compose exec kafka1 /usr/bin/kafka-acls \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --add --topic wikipedia.parsed \
  --allow-principal User:CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US \
  --operation Read --group test
docker-compose exec kafka1 /usr/bin/kafka-acls \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --list --topic wikipedia.parsed --group test
Your output should resemble:
Current ACLs for resource `Topic:wikipedia.parsed`:
  User:CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Group:test`:
  User:CN=client,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US has Allow permission for operations: Read from hosts: *
Verify that the user which authenticates via SSL is now authorized and can successfully consume some messages from topic wikipedia.parsed.
./scripts/consumers/listen_wikipedia.parsed.sh SSL
Because ZooKeeper is configured for SASL/DIGEST-MD5, any commands that communicate with ZooKeeper need properties set for ZooKeeper authentication. This authentication configuration is provided by the KAFKA_OPTS setting on the brokers. For example, notice that the throttle script runs on the Docker container kafka1, which has the appropriate KAFKA_OPTS setting. The command would otherwise fail if run on any container other than kafka1 or kafka2.
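As a hypothetical illustration (not a step in the demo), any ZooKeeper-backed tool follows the same rule: run it from kafka1 or kafka2 so that the ZooKeeper authentication carried in KAFKA_OPTS is picked up. For example, to view topic-level configuration overrides:
# Works from kafka1/kafka2 because their KAFKA_OPTS carries the ZooKeeper JAAS settings
docker-compose exec kafka1 kafka-configs --zookeeper zookeeper:2181 \
  --describe --entity-type topics --entity-name wikipedia.parsed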
Schema Registry¶
The connectors used in this demo are configured to automatically read and write Avro-formatted data, leveraging the Confluent Schema Registry.
View the Schema Registry subjects for topics that have registered schemas for their keys and/or values. Notice the security arguments passed into the curl command, which are required to interact with Schema Registry, which is listening for HTTPS on port 8085.
docker-compose exec schemaregistry curl -X GET --cert /etc/kafka/secrets/schemaregistry.certificate.pem --key /etc/kafka/secrets/schemaregistry.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8085/subjects | jq .
Your output should resemble:
[ "ksql_query_CTAS_EN_WIKIPEDIA_GT_1-KSQL_Agg_Query_1526914100640-changelog-value", "ksql_query_CTAS_EN_WIKIPEDIA_GT_1-KSQL_Agg_Query_1526914100640-repartition-value", "EN_WIKIPEDIA_GT_1_COUNTS-value", "WIKIPEDIABOT-value", "EN_WIKIPEDIA_GT_1-value", "WIKIPEDIANOBOT-value", "wikipedia.parsed-value" ]
Register a new Avro schema (a record with two fields username and userid) into Schema Registry for the value of a new topic users. Note the schema id that it returns; e.g., below the schema id is 6.
docker-compose exec schemaregistry curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --cert /etc/kafka/secrets/schemaregistry.certificate.pem --key /etc/kafka/secrets/schemaregistry.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"user\", \"fields\": [ {\"name\":\"userid\",\"type\":\"long\"}, {\"name\":\"username\",\"type\":\"string\"} ]} ]" }' https://schemaregistry:8085/subjects/users-value/versions | jq .
Your output should resemble:
{ "id": 6 }
View the new schema for the subject users-value. From Confluent Control Center, click MANAGEMENT -> Topics. Scroll down to and click on the topic users and select “SCHEMA”.
You may alternatively request the schema via the command line:
docker-compose exec schemaregistry curl -X GET --cert /etc/kafka/secrets/schemaregistry.certificate.pem --key /etc/kafka/secrets/schemaregistry.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8085/subjects/users-value/versions/1 | jq .
Your output should resemble:
{ "subject": "users-value", "version": 1, "id": 6, "schema": "{\"type\":\"record\",\"name\":\"user\",\"fields\":[{\"name\":\"username\",\"type\":\"string\"},{\"name\":\"userid\",\"type\":\"long\"}]}" }
Confluent REST Proxy¶
The Confluent REST Proxy is running for optional client access.
Use the REST Proxy, which is listening for HTTPS on port 8086, to produce a message to the topic users, referencing schema id 6.
docker-compose exec restproxy curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" -H "Accept: application/vnd.kafka.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt --data '{"value_schema_id": 6, "records": [{"value": {"user":{"userid": 1, "username": "Bunny Smith"}}}]}' https://restproxy:8086/topics/users
Your output should resemble:
{"offsets":[{"partition":1,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":6}
Create the consumer instance my_avro_consumer.
docker-compose exec restproxy curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' https://restproxy:8086/consumers/my_avro_consumer
Subscribe my_avro_consumer to the topic users.
docker-compose exec restproxy curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt --data '{"topics":["users"]}' https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
Get messages for the my_avro_consumer subscription.
# Note: Issue this command twice due to https://github.com/confluentinc/kafka-rest/issues/432
docker-compose exec restproxy curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records
docker-compose exec restproxy curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records
Delete the consumer instance my_avro_consumer.
docker-compose exec restproxy curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" --cert /etc/kafka/secrets/restproxy.certificate.pem --key /etc/kafka/secrets/restproxy.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance
Troubleshooting the demo¶
Verify that the status of the Docker containers shows the Up state, except for the kafka-client container, which is expected to have the Exit 0 state. If any containers are not up, verify in the advanced Docker preferences settings that the memory available to Docker is at least 8 GB (default is 2 GB).
docker-compose ps
Your output should resemble:
Name                         Command                          State    Ports
--------------------------------------------------------------------------------------------------------------------------
connect                      /etc/confluent/docker/run        Up       0.0.0.0:8083->8083/tcp, 9092/tcp
control-center               /etc/confluent/docker/run        Up       0.0.0.0:9021->9021/tcp, 0.0.0.0:9022->9022/tcp
elasticsearch                /bin/bash bin/es-docker          Up       0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kafka-client                 bash -c -a echo Waiting fo ...   Exit 0
kafka1                       /etc/confluent/docker/run        Up       0.0.0.0:29091->29091/tcp, 0.0.0.0:9091->9091/tcp, 9092/tcp
kafka2                       /etc/confluent/docker/run        Up       0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
kibana                       /bin/sh -c /usr/local/bin/ ...   Up       0.0.0.0:5601->5601/tcp
ksql-cli                     /bin/sh                          Up
ksql-server                  /etc/confluent/docker/run        Up       0.0.0.0:8088->8088/tcp
replicator-for-jar-transfer  sleep infinity                   Up       8083/tcp, 9092/tcp
restproxy                    /etc/confluent/docker/run        Up       8082/tcp, 0.0.0.0:8086->8086/tcp
schemaregistry               /etc/confluent/docker/run        Up       8081/tcp, 0.0.0.0:8085->8085/tcp
streams-demo                 /bin/sh -c /app/start.sh         Up       9092/tcp
zookeeper                    /etc/confluent/docker/run        Up       0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
To view sample messages for each topic, including wikipedia.parsed:
./scripts/consumers/listen.sh
If the data streams monitoring appears to stop for the Kafka source connector, restart the connect container.
docker-compose restart connect
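Before restarting, it can also help to look for errors in the Connect worker's logs; this is a generic suggestion rather than a documented demo step.
# Scan the Connect worker logs for recent errors
docker-compose logs connect | grep -i error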
If a command that communicates with ZooKeeper appears to be failing with the error org.apache.zookeeper.KeeperException$NoAuthException, change the container you are running the command from to be either kafka1 or kafka2. This is because ZooKeeper is configured for SASL/DIGEST-MD5, and any commands that communicate with ZooKeeper need properties set for ZooKeeper authentication.
Teardown¶
Stop the consumer group app to stop consuming from topic wikipedia.parsed. Note that the command below stops the consumers gracefully with kill -15, so the consumers follow the shutdown sequence.
./scripts/app/stop_consumer_app_group_graceful.sh
Stop the Docker demo, destroy all components and clear all Docker volumes.
./scripts/stop.sh