KSQL Configuration Parameter Reference
Here are some common configuration properties that you can customize.
Kafka Streams and Kafka Client Settings
These configurations control how Kafka Streams executes queries. You can specify them in the ksql-server.properties file or with SET in the KSQL CLI. They can be provided with the optional ksql.streams. prefix.
Although you can use either prefixed (ksql.streams.) or un-prefixed settings, prefixed settings are recommended.
Determines what to do when there is no initial offset in Apache Kafka® or if the current offset does not exist on the server. The default value in KSQL is latest, which means all Kafka topics are read from the latest available offset. You can change it to earliest, for example by using SET in the KSQL command line.
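As a minimal sketch of the server-side equivalent, the entry below sets the same value in ksql-server.properties. The property name auto.offset.reset is an assumption here (the page text above does not show it); it is the standard Kafka consumer setting that matches this description, written with the optional ksql.streams. prefix.

```properties
# Assumed property name: auto.offset.reset (standard Kafka consumer setting).
# Read topics from the earliest available offset instead of the latest.
ksql.streams.auto.offset.reset=earliest
```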
A list of host and port pairs that is used for establishing the initial connection to the Kafka cluster. This list should be in the form host1:port1,host2:port2,... The default value in KSQL is localhost:9092. You can change it by using the KSQL command line.
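A sketch of how such a list might look in ksql-server.properties follows. The property name bootstrap.servers appears later in this reference; the broker host names are hypothetical placeholders, not values from this document.

```properties
# Hypothetical broker addresses, in the host1:port1,host2:port2 form.
ksql.streams.bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092
```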
The frequency, in milliseconds, with which to save the position of the processor. The default value in KSQL is 2000. For example, you can change the value to 5000 by using the KSQL command line.
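For a server-wide change, the same value could be set in ksql-server.properties. This sketch assumes the setting is the Kafka Streams property commit.interval.ms, which is not named in the text above.

```properties
# Assumed property name: commit.interval.ms (Kafka Streams setting).
# Save the processor position every 5000 ms instead of the 2000 ms default.
ksql.streams.commit.interval.ms=5000
```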
The maximum number of memory bytes to be used for buffering across all threads. The default value in KSQL is 10000000 (~ 10 MB). For example, you can change the value to 20000000 by using the KSQL command line.
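The buffer size can likewise be raised in ksql-server.properties. The sketch assumes the Kafka Streams property name cache.max.bytes.buffering, which the text above does not show.

```properties
# Assumed property name: cache.max.bytes.buffering (Kafka Streams setting).
# Double the buffering budget from 10000000 to 20000000 bytes.
ksql.streams.cache.max.bytes.buffering=20000000
```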
The number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. For more information about the Kafka Streams threading model, see Threading Model.
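A possible ksql-server.properties entry for the thread count is sketched below. Both the property name num.stream.threads and the value 4 are assumptions chosen for illustration; pick a value that suits the cores available on the host.

```properties
# Assumed property name: num.stream.threads (Kafka Streams setting).
# Illustrative value only: run four stream threads in this instance.
ksql.streams.num.stream.threads=4
```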
The default prefix for automatically created topic names. Unless a user
defines an explicit topic name in a KSQL statement, KSQL prepends the value of
ksql.output.topic.name.prefix to the names of automatically created output
topics. For example, you might use "ksql-interactive-" to name output topics
in a KSQL Server cluster that's deployed in interactive mode. For more information, see
Configuring Security for KSQL.
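The interactive-mode example above would look roughly like this in ksql-server.properties:

```properties
# Prepend "ksql-interactive-" to the names of automatically created output topics.
ksql.output.topic.name.prefix=ksql-interactive-
```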
KSQL Query Settings
These configurations control how KSQL executes queries. You can specify them in the ksql-server.properties file or with SET in the KSQL CLI.
Indicates whether to fail if corrupt messages are read. KSQL decodes messages at runtime when reading from a Kafka topic. The decoding that KSQL uses depends on the data format that the STREAM or TABLE definition declares for the topic. If a message in the topic can't be decoded according to that data format, KSQL considers the message to be corrupt. For example, a message is corrupt if KSQL expects message values to be in JSON format, but they are in DELIMITED format. The default value in KSQL is true. To ignore corrupt messages, set the value to false in your properties file.
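A sketch of the properties-file entry for ignoring corrupt messages follows. The property name ksql.fail.on.deserialization.error is an assumption, since the text above does not show it.

```properties
# Assumed property name: ksql.fail.on.deserialization.error.
# Do not fail when a message cannot be decoded; corrupt messages are ignored.
ksql.fail.on.deserialization.error=false
```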
The URL of the Schema Registry instance that KSQL connects to. To communicate with Schema Registry over a secure connection, see Configuring KSQL for Secured Confluent Schema Registry.
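For an unsecured local setup, the entry might look like the sketch below. The property name ksql.schema.registry.url and the address http://localhost:8081 are both assumptions for illustration; substitute the address of your own Schema Registry.

```properties
# Assumed property name and a hypothetical local Schema Registry address.
ksql.schema.registry.url=http://localhost:8081
```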
The service ID of the KSQL server. This is used to define the KSQL cluster membership of a KSQL server instance. If multiple KSQL servers connect to the same Kafka cluster (i.e. the same bootstrap.servers) and have the same ksql.service.id, they form a KSQL cluster and share the workload.
By default, the service ID of KSQL servers is default_. The service ID is also used as the prefix for the internal topics created by KSQL, so with the default value of ksql.service.id the internal topic names include default_ in their prefix.
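To place a group of servers in their own KSQL cluster, each server would carry the same service ID in its ksql-server.properties. The ID used here is a hypothetical example.

```properties
# Hypothetical service ID. Servers that share this ID and the same
# bootstrap.servers form one KSQL cluster and share the workload.
ksql.service.id=fraud_detection_
```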
The default number of partitions for the topics created by KSQL. The default is four.
The default number of replicas for the topics created by KSQL. The default is one.
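The two defaults above could be overridden together, as in this sketch. ksql.sink.replicas appears in the production settings later on this page; ksql.sink.partitions is an assumed name for the partition setting, and the values are illustrative.

```properties
# Assumed property name: ksql.sink.partitions. Illustrative values.
ksql.sink.partitions=8
# A replica count of 3 requires at least 3 brokers in the Kafka cluster.
ksql.sink.replicas=3
```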
Controls the semantics of the SUBSTRING UDF. Refer to the SUBSTRING documentation in the function guide for details.
When upgrading headless mode KSQL applications from versions 5.0.x or earlier without updating your queries that use SUBSTRING to match
the new 5.1 behavior, you must set this config to
true to enforce the previous SUBSTRING behavior. If possible, however, we recommend
that you update your queries accordingly instead of enabling this configuration setting.
KSQL Server Settings
These configurations control the general behavior of the KSQL server. They can only be specified in the ksql-server.properties file.
KSQL server configuration settings take precedence over those set in the KSQL CLI. For example, if a value for ksql.streams.replication.factor is set in both the KSQL server and KSQL CLI, the KSQL server value is used.
A file that specifies a predefined set of queries for the KSQL server to run. For an example, see Non-interactive (Headless) KSQL Usage.
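As a rough sketch, pointing the server at such a file might look like this. The property name ksql.queries.file is an assumption (the text above does not show it), and the path is a hypothetical placeholder.

```properties
# Assumed property name and hypothetical path to a file of KSQL statements.
ksql.queries.file=/path/to/queries.sql
```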
The listeners setting controls the REST API endpoint for the KSQL server. For more info, see KSQL REST API Reference.
Specify hostname as 0.0.0.0 to bind to all interfaces or leave it empty to bind to the default interface. For example:
# Bind to all interfaces.
listeners=http://0.0.0.0:8088

# Bind only to localhost.
listeners=http://localhost:8088
Confluent Control Center Settings
You can access KSQL Server by using Confluent Control Center. For more information, see KSQL Settings.
Confluent Cloud Settings
You can connect KSQL Server to Confluent Cloud. For more information, see Connecting KSQL to Confluent Cloud.
Recommended KSQL Production Settings
When deploying KSQL to production, the following settings are recommended in your ksql-server.properties file:
# Set the retries to Integer.MAX_VALUE to ensure that transient failures
# will not result in data loss.
ksql.streams.producer.retries=2147483647

# Set the batch expiry to Long.MAX_VALUE to ensure that queries will not
# terminate if the underlying Kafka cluster is unavailable for a period of
# time.
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807

# Allows more frequent retries of requests when there are failures,
# enabling quicker recovery.
ksql.streams.producer.request.timeout.ms=300000

# Set the maximum allowable time for the producer to block to
# Long.MAX_VALUE. This allows KSQL to pause processing if the underlying
# Kafka cluster is unavailable.
ksql.streams.producer.max.block.ms=9223372036854775807

# Set the replication factor for internal topics, the command topic, and
# output topics to be 3 for better fault tolerance and durability. Note:
# the value 3 requires at least 3 brokers in your Kafka cluster.
ksql.streams.replication.factor=3
ksql.sink.replicas=3

# Set the storage directory for stateful operations like aggregations and
# joins to be at a durable location. By default, they are stored in /tmp.
ksql.streams.state.dir=/some/non-temporary-storage-path/

# Bump the number of replicas for state storage for stateful operations
# like aggregations and joins. By having two replicas (one main and one
# standby) recovery from node failures is quicker since the state doesn't
# have to be rebuilt from scratch.
ksql.streams.num.standby.replicas=1