Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Configuring Security for KSQL¶
KSQL supports authentication on its HTTP endpoints and also supports many of the security features of the other services it communicates with, like Apache Kafka® and Schema Registry.
- KSQL supports Basic HTTP authentication on its RESTful and WebSocket endpoints, which means that the endpoints can be protected by a username and password.
- KSQL supports Apache Kafka security features such as SSL for encryption, SASL for authentication, and authorization with ACLs.
- KSQL supports Schema Registry security features such SSL for encryption and mutual authentication for authorization.
To configure security for KSQL, add your configuration settings to the <path-to-confluent>/etc/ksql/ksql-server.properties
file and then start the KSQL server with your configuration file specified.
$ <path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksql/ksql-server.properties
Tip
These instructions assume you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.
Configuring KSQL for Basic HTTP Authentication¶
KSQL can be configured to require users to authenticate using a username and password via the Basic HTTP authentication mechanism.
Note
If you’re using Basic authentication, we recommended that you configure KSQL Server to use SSL for secure communication, because the Basic protocol passes credentials in plain text. For more information, see Configuring Kafka Encrypted Communication.
Use the following settings to configure the KSQL server to require authentication:
authentication.method=BASIC
authentication.roles=<user-role1>,<user-role2>,...
authentication.realm=<KsqlServer-Props-in-jaas_config.file>
The authentication.roles
config defines a comma-separated list of user roles. To be authorized
to use the KSQL server, an authenticated user must belong to at least one of these roles.
For example, if you define admin
, developer
, user
, and ksq-user
roles, the following configuration assigns them for authentication.
authentication.roles=admin,developer,user,ksq-user
The authentication.realm
config must match a section within jaas_config.file
, which
defines how the server authenticates users and should be passed as a JVM option during server start:
$ export KSQL_OPTS=-Djava.security.auth.login.config=/path/to/the/jaas_config.file
$ <path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksql/ksql-server.properties
An example jaas_config.file
is:
KsqlServer-Props {
org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
file="/path/to/password-file"
debug="false";
};
The example jaas_config.file
above uses the Jetty PropertyFileLoginModule
, which itself
authenticates users by checking for their credentials in a password file.
Assign the KsqlServer-Props
section to the authentication.realm
config setting:
authentication.realm=KsqlServer-Props
You can also use other implementations of the standard Java LoginModule
interface, such as
JDBCLoginModule
for reading credentials from a database or the LdapLoginModule
.
The file parameter is the location of the password file, The format is:
<username>: <password-hash>[,<rolename> ...]
Here’s an example:
fred: OBF:1w8t1tvf1w261w8v1w1c1tvn1w8x,user,admin
harry: changeme,user,developer
tom: MD5:164c88b302622e17050af52c89945d44,user
dick: CRYPT:adpexzg3FUZAk,admin,ksq-user
The password hash for a user can be obtained by using the org.eclipse.jetty.util.security.Password
utility, for example running:
> bin/ksql-run-class org.eclipse.jetty.util.security.Password fred letmein
Which results in an output similar to:
letmein
OBF:1w8t1tvf1w261w8v1w1c1tvn1w8x
MD5:0d107d09f5bbe40cade3de5c71e9e9b7
CRYPT:frd5btY/mvXo6
Where each line of the output is the password encrypted using different mechanisms, starting with plain text.
Configuring the CLI¶
If the KSQL server is configured to use Basic authentication, CLI instances will need to be
configured with suitable valid credentials. Credentials can be passed when starting the CLI using
the --user
and --password
command-line arguments, for example:
<ksql-install>bin/ksql --user fred --password letmein http://localhost:8088
Configuring KSQL for Confluent Cloud¶
You can use KSQL with a Kafka cluster in Confluent Cloud. For more information, see Connecting ksqlDB to Confluent Cloud.
Configuring KSQL for Confluent Control Center¶
You can use KSQL with a Kafka cluster in Confluent Control Center. For more information, see Integrate KSQL with Confluent Control Center.
Configuring KSQL for Secured Confluent Schema Registry¶
You can configure KSQL to connect to Schema Registry over HTTP by setting the
ksql.schema.registry.url
to the HTTPS endpoint of Schema Registry.
Depending on your security setup, you might also need to supply additional SSL configuration.
For example, a trustStore is required if the Schema Registry SSL certificates are not trusted by
the JVM by default; a keyStore is required if Schema Registry requires mutual authentication.
You can configure SSL for communication with Schema Registry by using non-prefixed names,
like ssl.truststore.location
, or prefixed names like ksql.schema.registry.ssl.truststore.location
.
Non-prefixed names are used for settings that are shared with other communication
channels, i.e. where the same settings are required to configure SSL communication
with both Kafka and Schema Registry. Prefixed names only affect communication with Schema Registry
and override any non-prefixed setting of the same name.
Use the following to configure KSQL to communicate with Schema Registry over HTTPS, where mutual authentication is not required and Schema Registry SSL certificates are trusted by the JVM:
ksql.schema.registry.url=https://<host-name-of-schema-registry>:<ssl-port>
Use the following to configure KSQL to communicate with Schema Registry over HTTPS, with mutual authentication, with an explicit trustStore, and where the SSL configuration is shared between Kafka and Schema Registry:
ksql.schema.registry.url=https://<host-name-of-schema-registry>:<ssl-port>
ksql.schema.registry.ssl.truststore.location=/etc/kafka/secrets/ksql.truststore.jks
ksql.schema.registry.ssl.truststore.password=<your-secure-password>
ksql.schema.registry.ssl.keystore.location=/etc/kafka/secrets/ksql.keystore.jks
ksql.schema.registry.ssl.keystore.password=<your-secure-password>
ksql.schema.registry.ssl.key.password=<your-secure-password>
Use the following to configure KSQL to communicate with Schema Registry over HTTP, without mutual authentication and with an explicit trustStore. These settings explicitly configure only KSQL to Schema Registry SSL communication.
ksql.schema.registry.url=https://<host-name-of-schema-registry>:<ssl-port>
ksql.schema.registry.ssl.truststore.location=/etc/kafka/secrets/sr.truststore.jks
ksql.schema.registry.ssl.truststore.password=<your-secure-password>
The exact settings will vary depending on the encryption and authentication mechanisms Schema Registry is using, and how your SSL certificates are signed.
You can pass authentication settings to the Schema Registry client used by KSQL by adding the following to your KSQL server config.
ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=username:password
For more information, see Schema Registry Security Overview.
Configuring KSQL for Secured Apache Kafka clusters¶
The following are common configuration examples.
Configuring Kafka Encrypted Communication¶
This configuration enables KSQL to connect to a Kafka cluster over SSL, with a user supplied trust store:
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=confluent
The exact settings will vary depending on the security settings of the Kafka brokers, and how your SSL certificates are signed. For full details, and instructions on how to create suitable trust stores, please refer to the Security Guide.
Configuring Kafka Authentication¶
This configuration enables KSQL to connect to a secure Kafka cluster using PLAIN SASL, where the SSL certificates have been signed by a CA trusted by the default JVM trust store.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=\
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<ksql-user>" \
password="<password>";
The exact settings will vary depending on what SASL mechanism your Kafka cluster is using and how your SSL certificates are signed. For more information, see the Security Guide.
Configuring Authorization of KSQL with Kafka ACLs¶
Kafka clusters can use ACLs to control access to resources. Such clusters require each client to authenticate as a particular user. To work with such clusters, KSQL must be configured to authenticate with the Kafka cluster, and certain ACLs must be defined in the Kafka cluster to allow the user KSQL is authenticating as access to resources. The list of ACLs that must be defined depends on the version of the Kafka cluster.
Confluent Platform v5.0 (Apache Kafka v2.0) and above¶
Confluent Platform 5.0 simplifies the ACLs required to run KSQL against a Kafka cluster secured with ACLs, (see KIP-277 and KIP-290 for details). It is highly recommended to use Confluent Platform 5.0 or above for deploying secure installations of Kafka and KSQL.
ACL definition¶
Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”.
- Principal
- An authenticated user or group. For example,
"user: Fred"
or"group: fraud"
. - Permission
- Defines if the ACL allows (
ALLOW
) or denies (DENY
) access to the resource. - Operation
- The operation that is performed on the resource, for example
READ
. - Resource
A resource is comprised of a resource type and resource name:
RESOURCE_TYPE
, for exampleTOPIC
or consumerGROUP
.- Resource name, for example the name of a topic or a consumer-group.
- ResourcePattern
A resource pattern matches zero or more Resources and is comprised of a resource type, a resource name and a pattern type.
RESOURCE_TYPE
, for exampleTOPIC
or consumerGROUP
. The pattern will only match resources of the same resource type.- Resource name. How the pattern uses the name to match Resources is dependant on the pattern type.
PATTERN_TYPE
, controls how the pattern matches a Resource’s name to the patterns. Valid values are:LITERAL
pattern types match the name of a resource exactly, or, in the case of the special wildcard resource name *, resources of any name.PREFIXED
pattern types match when the resource’s name is prefixed with the pattern’s name.
The
CLUSTER
resource type is implicitly a literal pattern with a constant name because it refers to the entire Kafka cluster.
The ACLs described below list a RESOURCE_TYPE
, resource name, PATTERN_TYPE
, and OPERATION
.
All ACLs described are ALLOW
ACLs, where the principal is the user the KSQL server has authenticated as,
with the Apache Kafka cluster, or an appropriate group that includes the authenticated KSQL user.
Tip
For more information about ACLs, see Authorization and ACLs.
ACLs on Literal Resource Pattern¶
A literal resource pattern matches resources exactly. They are case-sensitive. For example
ALLOW
user Fred
to READ
the TOPIC
with the LITERAL
name users
.
Here, user Fred would be allowed to read from the topic users only. Fred would not be allowed to read from similarly named topics such as user, users-europe, Users etc.
ACLs on Prefixed Resource Pattern¶
A prefixed resource pattern matches resources where the resource name starts with the pattern’s name.
They are case-sensitive. For example
ALLOW
user Bob
to WRITE
to any TOPIC
whose name is PREFIXED
with fraud-
.
Here, user Bob would be allowed to write to any topic whose name starts with fraud-, for example fraud-us, fraud-testing and fraud-. Bob would not be allowed to write to topics such as production-fraud-europe, Fraud-us, etc.
Required ACLs¶
The ACLs required are the same for both Interactive and non-interactive (headless) KSQL clusters.
KSQL always requires the following ACLs for its internal operations and data management:
- The
DESCRIBE_CONFIGS
operation on theCLUSTER
resource type. - The
ALL
operation on all internalTOPICS
that arePREFIXED
with_confluent-ksql-<ksql.service.id>
. - The
ALL
operation on all internalGROUPS
that arePREFIXED
with_confluent-ksql-<ksql.service.id>
.
Where ksql.service.id
can be configured in the KSQL configuration and defaults to default_
.
In addition to the general permissions above, KSQL also needs permissions to perform the actual processing of your data. Here, KSQL needs permissions to read data from your desired input topics and/or permissions to write data to your desired output topics:
- The
READ
operation on any input topics. - The
WRITE
operation on any output topics. - The
CREATE
operation on any output topics that do not already exist.
Often output topics from one query form the inputs to others. KSQL will require READ
and WRITE
permissions for such topics.
The set of input and output topics that a KSQL cluster requires access to will depend on your use case and whether the KSQL cluster is configured in interactive or non-interactive mode.
Non-Interactive (headless) KSQL clusters¶
Non-interactive KSQL clusters run a known set of SQL statements, meaning the set of input and output topics is well defined. Add the ACLs required to allow KSQL access to these topics.
For example, given the following setup:
- A 3-node KSQL cluster with KSQL servers running on IPs 198.51.100.0, 198.51.100.1, 198.51.100.2
- Authenticating with the Kafka cluster as a
KSQL1
user. - With
ksql.service.id
set toproduction_
. - Running queries that read from input topics
input-topic1
andinput-topic2
. - Writing to output topics
output-topic1
andoutput-topic2
. - Where
output-topic1
is also used as an input for another query.
Then the following commands would create the necessary ACLs in the Kafka cluster to allow KSQL to operate:
# Allow KSQL to discover the cluster:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation DescribeConfigs --cluster
# Allow KSQL to read the input topics (including output-topic1):
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation Read --topic input-topic1 --topic input-topic2 --topic output-topic1
# Allow KSQL to write to the output topics:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation Write --topic output-topic1 --topic output-topic2
# Or, if the output topics do not already exist, the 'create' operation is also required:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation Create --operation Write --topic output-topic1 --topic output-topic2
# Allow KSQL to manage its own internal topics and consumer groups:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation All --resource-pattern-type prefixed --topic _confluent-ksql-production_ --group _confluent-ksql-production_
Interactive KSQL clusters¶
Interactive KSQL clusters accept SQL statements from users and hence may require access to a wide variety of input and output topics. Add ACLs to appropriate literal and prefixed resource patterns to allow KSQL access to the input and output topics, as required.
Tip
To simplify ACL management, you should configure a default custom topic name prefix such as ksql-interactive-
for your
KSQL cluster via the ksql.output.topic.name.prefix
server configuration setting.
Unless a user defines an explicit topic name in a KSQL statement, KSQL will then always prefix the name of any automatically
created output topics. Then add an ACL to allow ALL
operations on TOPICs
that are PREFIXED
with the configured
custom name prefix (in the example above: ksql-interactive-
).
For example, given the following setup:
- A 3-node KSQL cluster with KSQL servers running on IPs 198.51.100.0, 198.51.100.1, 198.51.100.2
- Authenticating with the Kafka cluster as a
KSQL1
user. - With
ksql.service.id
set tofraud_
. - Where users should be able to run queries against any input topics prefixed with
accounts-
,orders-
andpayments-
. - Where
ksql.output.topic.name.prefix
is set toksql-fraud-
- And users won’t use explicit topic names, i.e. users will rely on KSQL auto-creating any required topics with auto-generated names. (Note: If users want to use explicit topic names, then you must provide the necessary ACLs for these in addition to what’s shown in the example below.)
Then the following commands would create the necessary ACLs in the Kafka cluster to allow KSQL to operate:
# Allow KSQL to discover the cluster:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation DescribeConfigs --cluster
# Allow KSQL to read the input topics:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation Read --resource-pattern-type prefixed --topic accounts- --topic orders- --topic payments-
# Allow KSQL to manage output topics:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation All --resource-pattern-type prefixed --topic ksql-fraud-
# Allow KSQL to manage its own internal topics and consumer groups:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation All --resource-pattern-type prefixed --topic _confluent-ksql-fraud_ --group _confluent-ksql-fraud_
Confluent Platform versions below v5.0 (Apache Kafka < v2.0)¶
Versions of the Confluent Platform below v5.0, (which use Apache Kafka versions below v2.0), do not benefit from the enhancements found in later versions of Kafka, which simplify the ACLs required to run KSQL against a Kafka cluster secured with ACLs. This means a much larger, or wider range, set of ACLs must be defined. The set of ACLs that must be defined depends on whether the KSQL cluster is configured for interactive or non-interactive (headless).
ACL definition¶
Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H on Resource R”.
- Principal
- An authenticated user or group. For example,
"user: Fred"
or"group: fraud"
. - Permission
- Defines if the ACL allows (
ALLOW
) or denies (DENY
) access to the resource. - Operation
- The operation that is performed on the resource, for example
READ
. - Resource
A resource is comprised of a resource type and resource name:
RESOURCE_TYPE
, for exampleTOPIC
or consumerGROUP
.- Resource name, where the name is either specific, for example
users
, or the wildcard*
, meaning all resources of this type. The name is case-sensitive.
The
CLUSTER
resource type does not require a resource name because it refers to the entire Kafka cluster.
An example ACL might ALLOW
user Jane
to READ
the TOPIC
named users
.
Here, user Jane would be allowed to read from the topic users only. Jane would not be allowed to read from similarly named topics such as user, users-europe, Users etc.
The ACLs described below list a RESOURCE_TYPE
, resource name, and OPERATION
. All ACLs described are ALLOW
ACLs, where
the principal is the user the KSQL server has authenticated as, with the Apache Kafka cluster, or an appropriate group
that includes the authenticated KSQL user.
Tip
For more information about ACLs, see Authorization and ACLs.
Interactive KSQL clusters¶
Interactive KSQL clusters, (which is the default configuration), require that the authenticated KSQL user has open access to create, read, write, delete topics, and use any consumer group:
Interactive KSQL clusters require these ACLs:
- The
DESCRIBE_CONFIGS
operation on theCLUSTER
resource type. - The
CREATE
operation on theCLUSTER
resource type. - The
DESCRIBE
,READ
,WRITE
andDELETE
operations on allTOPIC
resource types. - The
DESCRIBE
andREAD
operations on allGROUP
resource types.
It is still possible to restrict the authenticated KSQL user from accessing specific resources using DENY
ACLs. For
example, you can add a DENY
ACL to stop KSQL queries from accessing a topic that contains sensitive data.
For example, given the following setup:
- A 3-node KSQL cluster with KSQL servers running on IPs 198.51.100.0, 198.51.100.1, 198.51.100.2
- Authenticating with the Kafka cluster as a ‘KSQL1’ user.
Then the following commands would create the necessary ACLs in the Kafka cluster to allow KSQL to operate:
# Allow KSQL to discover the cluster and create topics:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation DescribeConfigs --operation Create --cluster
# Allow KSQL access to topics and consumer groups:
bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:KSQL1 --allow-host 198.51.100.0 --allow-host 198.51.100.1 --allow-host 198.51.100.2 --operation All --topic '*' --group '*'
Non-Interactive (headless) KSQL clusters¶
Because the list of queries are known ahead of time, you can run Non-interactive KSQL clusters with more restrictive ACLs. Determining the list of ACLs currently requires a bit of effort. This will be improved in future KSQL releases.
- Standard ACLs
The authenticated KSQL user always requires:
DESCRIBE_CONFIGS
permission on theCLUSTER
resource type.
- Input topics
An input topic is one that has been imported into KSQL using a
CREATE STREAM
orCREATE TABLE
statement. The topic should already exist when KSQL is started.The authenticated KSQL user requires
DESCRIBE
andREAD
permissions for each input topic.- Output topics
KSQL creates output topics when you run persistent
CREATE STREAM AS SELECT
orCREATE TABLE AS SELECT
queries.The authenticated KSQL user requires
DESCRIBE
andWRITE
permissions on each output topic.By default, KSQL will attempt to create any output topics that do not exist. To allow this, the authenticated KSQL user requires
CREATE
permissions on theCLUSTER
resource type. Alternatively, topics can be created manually before running KSQL. To determine the list of output topics and their required configuration, (partition count, replication factor, retention policy, etc), you can run initially run KSQL on a Kafka cluster with none or open ACLs first.- Change-log and repartition topics
Internally, KSQL uses repartition and changelog topics for selected operations. KSQL requires repartition topics when using either
PARTITION BY
, or usingGROUP BY
on non-key values, and requires changelog topics for anyCREATE TABLE x AS
statements.The authenticated KSQL user requires
DESCRIBE
,READ
, andWRITE
permissions for each changelog and repartitionTOPIC
.By default, KSQL will attempt to create any repartition or changelog topics that do not exist. To allow this, the authenticated KSQL user requires
CREATE
permissions on theCLUSTER
resource type. Alternatively, you can create topics manually before running KSQL. To determine the list of output topics and their required configuration, (partition count, replication factor, retention policy, etc), you can run initially run KSQL on a Kafka cluster with none or open ACLs first.All changelog and repartition topics are prefixed with
_confluent-ksql-<ksql.service.id>
whereksql.service.id
defaults todefault_
, (for more information, see ksql.service.id), and postfixed with either-changelog
or-repartition
, respectively.- Consumer groups
KSQL uses Kafka consumer groups when consuming input, change-log and repartition topics. The set of consumer groups that KSQL requires depends on the queries that are being executed.
The authenticated KSQL user requires
DESCRIBE
andREAD
permissions for each consumerGROUP
.The easiest way to determine the list of consumer groups is to initially run the queries on a Kafka cluster with none or open ACLS and then list the groups created. For more information about how to list groups, see Managing Consumer Groups.
Consumer group names are formatted like
_confluent-ksql-<value of ksql.service.id property>_query_<query id>
, where the default ofksql.service.id
isdefault_
.
Tip
For more information about interactive and non-interactive queries, see Non-interactive (Headless) KSQL Usage.
Configuring Control Center Monitoring Interceptors¶
This configuration enables SASL and SSL for the monitoring interceptors that integrate KSQL with Control Center.
# Confluent Monitoring Interceptors for Control Center streams monitoring
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# Confluent Monitoring interceptors SASL / SSL config
confluent.monitoring.interceptor.security.protocol=SASL_SSL
confluent.monitoring.interceptor.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=confluent
confluent.monitoring.interceptor.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
confluent.monitoring.interceptor.ssl.keystore.password=confluent
confluent.monitoring.interceptor.ssl.key.password=confluent
confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="ksql-user" password="ksql-user-secret";
confluent.monitoring.interceptor.sasl.mechanism=PLAIN
- Learn More
- See the blog post Secure Stream Processing with Apache Kafka, Confluent Platform and KSQL and try out the Monitoring Kafka streaming ETL deployments tutorial.