Configure ksqlDB for Confluent Platform

ksqlDB configuration parameters can be set for ksqlDB Server and for queries, as well as for the underlying Kafka Streams and Kafka Clients (producer and consumer).

These instructions assume you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.

Setting ksqlDB Server Parameters

You can specify ksqlDB Server configuration parameters by using the server configuration file (ksql-server.properties) or the KSQL_OPTS environment variable. Properties set with KSQL_OPTS take precedence over those specified in the ksqlDB configuration file. A recommended approach is to configure a common set of properties using the ksqlDB configuration file and override specific properties as needed, using the KSQL_OPTS environment variable.

Tip

If you deploy Confluent Platform by using Docker containers, you can specify configuration parameters as environment variables to the ksqlDB Server image. For more information, see Install ksqlDB with Docker.

ksqlDB Server Configuration File

By default, the ksqlDB server configuration file is located at <path-to-confluent>/etc/ksqldb/ksql-server.properties. The file follows the syntax conventions of Java properties files.

<property-name>=<property-value>

For example:

bootstrap.servers=localhost:9092
listeners=http://localhost:8088

After you have updated the server configuration file, you can start the ksqlDB Server with the configuration file specified.

<path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksqldb/ksql-server.properties

For more information, see ksqlDB Configuration Parameter Reference.

KSQL_OPTS Environment Variable

You can override ksqlDB Server configuration parameters by using the KSQL_OPTS environment variable. The properties are standard Java system properties. For example, to set ksql.streams.num.streams.threads to 1:

KSQL_OPTS="-Dksql.streams.num.streams.threads=1" <path-to-confluent>/bin/ksql-server-start \
<path-to-confluent>/etc/ksqldb/ksql-server.properties

You can specify multiple parameters at the same time. For example, to configure ksql.streams.auto.offset.reset and ksql.streams.num.stream.threads:

KSQL_OPTS="-Dksql.streams.auto.offset.reset=earliest -Dksql.streams.num.stream.threads=1" <path-to-confluent>/bin/ksql-server-start \
<path-to-confluent>/etc/ksqldb/ksql-server.properties

ksqlDB Server Runtime Environment Variables

When ksqlDB Server starts, it checks for shell environment variables that control the host Java Virtual Machine (JVM). Set the following environment variables to control options like heap size and Log4j configuration. These settings are applied by the ksql-run-class shell script when ksqlDB Server starts.

KSQL_CLASSPATH

Path to the Java deployment of ksqlDB Server and related Java classes. The following command shows an example KSQL_CLASSPATH setting.

export CLASSPATH=/usr/share/java/my-base/*:/usr/share/java/my-ksql-server/*:/opt/my-company/lib/ksql/*:$CLASSPATH
export KSQL_CLASSPATH="${CLASSPATH}"
KSQL_LOG4J_OPTS

Specifies ksqlDB Server logging options by using the Log4j configuration settings. The following example command sets the default Log4j configuration.

export KSQL_LOG4J_OPTS="-Dlog4j.configuration=file:$KSQL_CONFIG_DIR/log4j-rolling.properties"

For more information, see Log4j Configuration.

KSQL_JMX_OPTS

Specifies ksqlDB metrics options by using Java Management Extensions (JMX). The following example command sets the default JMX configuration.

export KSQL_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "

For more information, see Monitoring and Management Using JMX Technology.

KSQL_HEAP_OPTS

Specifies the initial size and maximum size of the JVM heap for the ksqlDB Server process. The following example command sets the initial size and maximum size to 15GB.

export KSQL_HEAP_OPTS="-Xms15G -Xmx15G"

For more information, see JRockit JVM Heap Size Options.

KSQL_JVM_PERFORMANCE_OPTS

Specifies performance tuning options for the JVM that runs ksqlDB Server. The following example command sets the default JVM configuration.

export KSQL_JVM_PERFORMANCE_OPTS="-server -XX:+UseConcMarkSweepGC -XX:+CMSClassUnload ingEnabled -XX:+CMSScavengeBeforeRemark -XX:+ExplicitGCInvokesConcurrent -XX:New Ratio=1 -Djava.awt.headless=true"

For more information, see D Command-Line Options.

JMX_PORT

Specifies the port that JMX uses to report metrics.

export JMX_PORT=1099
JAVA_HOME

Specifies the location of the java executable file.

export JAVA_HOME=<jdk-install-directory>

Non-interactive (Headless) ksqlDB Usage

ksqlDB supports locked-down, “headless” deployment scenarios where interactive use of the ksqlDB cluster is disabled. For example, the CLI enables a team of users to develop and verify their queries interactively on a shared testing ksqlDB cluster. But when you deploy these queries in your production environment, you want to lock down access to ksqlDB servers, version-control the exact queries, and store them in a .sql file. This prevents users from interacting directly with the production ksqlDB cluster. For more information, see Headless Deployment.

You can configure servers to exclusively run a predefined script (.sql file) via the --queries-file command line argument, or the ksql.queries.file setting in the ksqlDB configuration file. If a server is running a predefined script, it will automatically disable its REST endpoint and interactive use.

Note

In headless mode, you must start all ksqlDB servers with the same queries file. If the queries files differ across ksqlDB servers, the behavior is undefined.

Tip

When both the ksql.queries.file property and the --queries-file argument are present, the --queries-file argument takes precedence.

Schema resolution

When you run a ksqlDB application that uses Avro or Protobuf, ksqlDB infers schemas from Schema Registry automatically, but the behavior after restarting ksqlDB Server differs between interactive and non-interactive mode.

  • Interactive mode: after ksqlDB Server restarts, it doesn’t contact Schema Registry again to resolve schemas, because it has previously persisted the information to the command topic.
  • Non-interactive mode: after ksqlDB Server restarts, it does contact Schema Registry again to resolve schemas. If schemas have changed, unexpected behavior in your ksqlDB applications may occur.

Important

If your ksqlDB applications use Avro or Protobuf, and you run them in non-interactive mode, ensure that the schemas don’t change between ksqlDB Server restarts, or provide the schema explicitly. If the schema may evolve, it’s safer to provide the schema explicitly.

Start headless ksqlDB Server from the command line

To start the ksqlDB Server in headless, non-interactive configuration via the --queries-file command line argument:

Create a predefined script and save as an .sql file.

Start the ksqlDB Server with the predefined script specified by using the --queries-file argument.

<path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksqldb/ksql-server.properties \
--queries-file /path/to/queries.sql

Start headless ksqlDB Server by using the configuration file

To start the ksqlDB Server in headless, non-interactive configuration via the ksql.queries.file in the server configuration file:

Configure the ksql-server.properties file. The bootstrap.servers and ksql.queries.file are required. For more information about configuration, see ksqlDB configuration file.

# Inform the ksqlDB server where the Kafka cluster can be found:
bootstrap.servers=localhost:9092

# Define the location of the queries file to execute
ksql.queries.file=/path/to/queries.sql

Start the ksqlDB server with the configuration file specified.

<path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksqldb/ksql-server.properties

Configuring Listeners of a ksqlDB Cluster

Multiple hosts are required to scale ksqlDB processing power and to do that, they must form a cluster. ksqlDB requires all hosts of a cluster to use the same ksql.service.id.

bootstrap.servers=kafkaBroker1:9092,kafkaBroker2:9092
ksql.service.id=my_application_

Once formed, many operations can be run using the client APIs exposed on listeners.

In order to utilize pull queries and their high availability functionality, the nodes within the cluster must be able to communicate with each other. ksqlDB supports setups with either a single shared listener for both client and internal communication, or dual single-purpose listeners. The following section describes how to configure listeners depending on the nature of your environment and requirements.

Single listener

Single routable listener

If you want to configure your listener with an IP address or hostname that is resolvable and routable from within the cluster, you might do the following:

# Hostname that other nodes can resolve to an routable IP:
listeners=https://ksqlHost56:8088

# Or, routable IP address:
listeners=https://192.168.1.101:8088

In this setup, the node shares the first URL in the listeners config as its internal endpoint, which other nodes use for inter-node communication. Inter-node communication uses the same listener as client communication.

Single non-routable listener

It’s common to set up a service using special hostnames, like localhost, or wildcard addresses, like 0.0.0.0 or [::]. These special hostnames have special meanings and are not appropriate for inter-node communication, because they’re not routable from other machines. This is also the case if your network is set up such that the IP or hostname you bind isn’t resolvable or routable.

If you choose to use a non-routable listener, you must set ksql.advertised.listener and specify a URL that is externally accessible and which resolves to an endpoint defined in listeners.

# Non-routable wildcard ip:
listeners=http://0.0.0.0:8088

# Externally accessible name that resolves to the IP of the machine:
ksql.advertised.listener=http://host1.internal.example.com:8088

In this setup, the node shares the URL in the ksql.advertised.listener config as its internal endpoint, which other nodes use for inter-node communication. Inter-node communication uses the same listener as client communication.

Dual listeners

You may choose to configure internal communication to use a different listener to client communication, which enables port filtering rules to deny clients access to the internal listener or the use of a different network interface for internal communication, for security or QoS reasons.

This can be achieved by setting the ksql.internal.listener configuration to start a second listener that is used exclusively for inter-node communication.

If you’re running dual listeners to improve security, you may also wish to enable authentication and other security measures.

Dual routable listeners

If the internal IP address or hostname used in the ksql.internal.listener configuration is externally resolvable and routable, you only need to configure ksql.internal.listener to set the internal listener, for example:

# Client listener:
listeners=https://192.168.1.101:8088

# Inter-node listener on different NIC:
ksql.internal.listener=https://192.168.1.102:8088

Note

Only the ksql.internal.listener needs to be resolvable and routable from servers running other nodes in the cluster. The listener configuration can be non-resolvable and non-routable, because clients can connect using whatever URL you choose.

In this setup, the node shares the URL in the ksql.internal.listener config as its internal endpoint, which other nodes use for inter-node communication. Inter-node communication uses a different listener to client communication.

Dual non-routable listeners

If the internal IP address or hostname used in the ksql.internal.listener configuration is not externally resolvable and routable, for example where it uses localhost or wildcard IPs such as 0.0.0.0 or [::], you must configure both ksql.internal.listener and ksql.advertised.listener to set the internal listener:

# Client listener:
listeners=https://0.0.0.0:8088

# Inter-node listener on wildcard address and different port:
# Note: port 8099 could be locked down using port forward or other network tools.
ksql.internal.listener=https://0.0.0.0:8099

# URL that other nodes can resolve and use to route requests to this node:
ksql.advertised.listener=http://host1.internal.example.com:8099

Note

Only the ksql.advertised.listener needs to be resolvable and routable from servers running other nodes in the cluster. The listener configuration can be non-resolvable and non-routable, because clients can connect using whatever URL you choose, and ksql.internal.listener is only used to start the listener.

In this setup, the node shares the url in the ksql.advertised.listener config as its internal endpoint, which other nodes use for inter-node communication. Inter-node communication uses a different listener to client communication.