Configure a Kafka Streams Application for Confluent Platform

You must set the Apache Kafka® and Kafka Streams configuration options before you use Streams. Configure Kafka Streams by specifying parameters in a java.util.Properties instance.

  1. Create a java.util.Properties instance.

  2. Set the parameters. For example:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    
    Properties props = new Properties();
    // Set a few key parameters
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    // Any further settings
    props.put(... , ...);
    

Configuration parameter reference

This section contains the most common Streams configuration parameters. For a full reference, see the Streams and Client Javadocs.

Required configuration parameters

Here are the required Streams configuration parameters.

Parameter Name Importance Description Default Value
application.id Required An identifier for the stream processing application. Must be unique within the Kafka cluster. None
bootstrap.servers Required A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. None

application.id

(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to all instances of the application. It is recommended to use only alphanumeric characters, . (dot), - (hyphen), and _ (underscore). Examples: "hello_world", "hello_world-v1.0.0"

This ID is used in the following places to isolate resources used by the application from others:

  • As the default Kafka consumer and producer client.id prefix
  • As the Kafka consumer group.id for coordination
  • As the name of the subdirectory in the state directory (cf. state.dir)
  • As the prefix of internal Kafka topic names
Tip:
When an application is updated, the application.id should be changed unless you want to reuse the existing data in internal topics and state stores. For example, you could embed the version information within application.id, as my-app-v1.0.0 and my-app-v1.0.2.
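The versioning tip above can be sketched with a plain java.util.Properties object. This is a minimal sketch: the class and helper names (ApplicationIdSketch, versionedApplicationId) and the regular expression are illustrative assumptions and not part of the Kafka Streams API. The pattern only encodes the character recommendation given above.

```java
import java.util.Properties;
import java.util.regex.Pattern;

final class ApplicationIdSketch {
    // Recommended characters: alphanumerics, '.', '-' and '_'.
    private static final Pattern RECOMMENDED = Pattern.compile("[A-Za-z0-9._-]+");

    // Hypothetical helper: embeds a version string in the application ID.
    static String versionedApplicationId(String baseName, String version) {
        String id = baseName + "-v" + version;
        if (!RECOMMENDED.matcher(id).matches()) {
            throw new IllegalArgumentException("Unrecommended characters in application.id: " + id);
        }
        return id;
    }

    static Properties configFor(String baseName, String version) {
        Properties props = new Properties();
        // "application.id" is the string key behind StreamsConfig.APPLICATION_ID_CONFIG.
        props.setProperty("application.id", versionedApplicationId(baseName, version));
        return props;
    }
}
```

Calling ApplicationIdSketch.configFor("my-app", "1.0.2") yields an application.id of my-app-v1.0.2, so the new version starts with fresh internal topics and state stores.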

bootstrap.servers

(Required) The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092".

Tip:
Kafka Streams applications can only communicate with a single Kafka cluster specified by this config value.

Optional configuration parameters

Here are the optional Streams configuration parameters, sorted by level of importance:

  • High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters.
  • Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters.
  • Low: These parameters have a less general or less significant impact on performance.
Parameter Name Importance Description Default Value
acceptable.recovery.lag Medium The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task. 10,000
application.server Low A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single Kafka Streams application. The value of this must be different for each instance of the application. the empty string
buffered.records.per.partition Low Deprecated in Confluent Platform 7.4. The maximum number of records to buffer per partition. 1000
cache.max.bytes.buffering Medium Deprecated in Confluent Platform 7.4. Use statestore.cache.max.bytes instead. 10485760 bytes
client.id Medium An ID string to pass to the server when making requests. (This setting is passed to the consumer/producer clients used internally by Kafka Streams.) the empty string
commit.interval.ms Low The frequency with which to save the position (offsets in source topics) of tasks. For at-least-once processing, committing means saving the position (offsets) of the processor. For exactly-once processing, it means committing the transaction, which includes saving the position. 30000 ms (at_least_once) / 100 ms (exactly_once_v2)
default.client.supplier Low Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface.  
default.deserialization.exception.handler Medium Exception handling class that implements the DeserializationExceptionHandler interface. See default.deserialization.exception.handler
default.production.exception.handler Medium Exception handling class that implements the ProductionExceptionHandler interface. See default.production.exception.handler
default.key.serde Medium Default serializer/deserializer class for record keys, implements the Serde interface (see also default.value.serde). null
default.value.serde Medium Default serializer/deserializer class for record values, implements the Serde interface (see also default.key.serde). null
default.windowed.key.serde.inner Medium Default inner serializer/deserializer class for record keys, implements the Serde interface. Only effective if default.key.serde is a windowed serde. Serdes.ByteArray().getClass().getName()
default.windowed.value.serde.inner Medium Default inner serializer/deserializer class for record values, implements the Serde interface. Only effective if default.value.serde is a windowed serde. Serdes.ByteArray().getClass().getName()
default.timestamp.extractor Medium Default timestamp extractor class that implements the TimestampExtractor interface. See Timestamp Extractor
max.task.idle.ms Medium Maximum amount of time Streams waits to fetch data to ensure in-order processing semantics. 0 milliseconds
max.warmup.replicas Medium The maximum number of warmup replicas (extra standbys beyond the configured num.standby.replicas) that can be assigned at once. 2
metric.reporters Low A list of classes to use as metrics reporters. the empty list
metrics.num.samples Low The number of samples maintained to compute metrics. 2
metrics.recording.level Low The highest recording level for metrics. INFO
metrics.sample.window.ms Low The window of time a metrics sample is computed over. 30000 milliseconds
num.standby.replicas High The number of standby replicas for each task. 0
num.stream.threads Medium The number of threads to execute stream processing. 1
poll.ms Low The amount of time in milliseconds to block waiting for input. 100 milliseconds
probing.rebalance.interval.ms Low The maximum time to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. 600000 milliseconds (10 minutes)
processing.guarantee Medium The processing mode. Can be either at_least_once (default), or exactly_once_v2 (for EOS version 2, requires Confluent Platform version 5.5.x / Kafka version 2.5.x or higher). Deprecated config options are exactly_once (for EOS version 1) and exactly_once_beta (for EOS version 2). See Processing Guarantee
replication.factor High The replication factor for changelog topics and repartition topics created by the application. If your broker cluster is on version Confluent Platform 5.4.x (Kafka 2.4.x) or newer, you can set -1 to use the broker default replication factor. 1
retries Medium The number of retries for broker requests that return a retryable error. 0
retry.backoff.ms Medium The amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 100
rocksdb.config.setter Medium The RocksDB configuration.  
state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 600000 milliseconds
state.dir High Directory location for state stores. /var/lib/kafka-streams
statestore.cache.max.bytes Medium Maximum number of memory bytes to be used for record caches across all threads. 10485760 bytes
topology.optimization Low Enables/Disables topology optimization. Accepts strings none or all. none
upgrade.from Medium The version you are upgrading from during a rolling upgrade. See Upgrade From
windowstore.changelog.additional.retention.ms Low Added to a window’s maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 86400000 milliseconds = 1 day
window.size.ms Low Sets window size for the deserializer in order to calculate window end times. null

acceptable.recovery.lag

The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams only assigns stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assigns warmup replicas to restore state in the background for instances that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

default.deserialization.exception.handler

The default deserialization exception handler allows you to manage records that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler needs to return FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL signals that Streams should shut down, and returning CONTINUE signals that Streams should ignore the issue and continue processing. The default implementation class is LogAndFailExceptionHandler. These exception handlers are available:

  • LogAndContinueExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to deserialize.
  • LogAndFailExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.

You can also provide your own customized exception handler besides the library provided ones to meet your needs. For an example customized exception handler implementation, read the Failure and exception handling FAQ.
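As a minimal sketch, switching from the default handler to the log-and-skip handler could look like the following. It assumes that class-typed settings may be given as a fully qualified class name string, and it uses the plain string key rather than a StreamsConfig constant so that the snippet needs nothing beyond the JDK. The class name DeserializationHandlerConfigSketch is hypothetical.

```java
import java.util.Properties;

final class DeserializationHandlerConfigSketch {
    static Properties logAndContinue() {
        Properties props = new Properties();
        // Select the built-in log-and-skip handler by its fully qualified class name.
        props.setProperty("default.deserialization.exception.handler",
                          "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }
}
```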

default.production.exception.handler

The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker such as attempting to produce a record that is too large. By default, Kafka provides and uses the DefaultProductionExceptionHandler that always fails when these exceptions occur.

Each exception handler can return a FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down and CONTINUE will signal that Streams should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}

    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various kafka streams settings, e.g. bootstrap servers, application ID, etc

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);

default.key.serde

The default Serializer/Deserializer class for record keys, null until set by user. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.

default.value.serde

The default Serializer/Deserializer class for record values, null until set by user. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.
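Because both serde defaults are null until you set them, a configuration usually names them explicitly. The sketch below is one way to do that with string keys only. The class name DefaultSerdeConfigSketch is hypothetical, and the choice of String keys and Long values is only an example. With the Kafka library on the classpath, passing Serdes.String().getClass() is an equivalent form.

```java
import java.util.Properties;

final class DefaultSerdeConfigSketch {
    static Properties stringKeysAndLongValues() {
        Properties props = new Properties();
        // Serdes.StringSerde and Serdes.LongSerde are nested classes,
        // hence the '$' in their binary class names.
        props.setProperty("default.key.serde",
                          "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.setProperty("default.value.serde",
                          "org.apache.kafka.common.serialization.Serdes$LongSerde");
        return props;
    }
}
```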

default.timestamp.extractor

A timestamp extractor pulls a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is FailOnInvalidTimestamp. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since Kafka version 0.10. Depending on the setting of Kafka’s server-side log.message.timestamp.type broker and message.timestamp.type topic parameters, this extractor provides you with:

  • event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s official producer client or one of Confluent’s producer clients, the timestamp represents milliseconds since the epoch.
  • ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.

The FailOnInvalidTimestamp extractor throws an exception if a record contains an invalid (that is, negative) built-in timestamp, because Kafka Streams would otherwise silently drop the record instead of processing it. Invalid built-in timestamps can occur for various reasons. For example, you might consume a topic that is written to by pre-0.10 Kafka producer clients or by third-party producer clients that don’t support the Kafka 0.10 message format yet. Another case is after upgrading your Kafka cluster from 0.9 to 0.10, where the data that was generated with 0.9 does not include the 0.10 message timestamps.

If you have data with invalid timestamps and want to process it, then there are two alternative extractors available. Both work on built-in timestamps, but handle invalid timestamps differently.

  • LogAndSkipOnInvalidTimestamp: This extractor logs a warning message and returns the invalid timestamp to Kafka Streams, which will not process but silently drop the record. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an invalid built-in timestamp in your input data.
  • UsePartitionTimeOnInvalidTimestamp: This extractor returns the record’s built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamp, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimation. If no timestamp can be estimated, it throws an exception.

Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: System.currentTimeMillis()), which effectively means Streams will operate on the basis of the so-called processing-time of events.
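To make the choice between the built-in extractors concrete, the following sketch maps each one to a configuration value. The Policy enum and its constant names are hypothetical labels introduced only for illustration. The class names are the built-in extractors described above, assumed to be passed as fully qualified name strings.

```java
import java.util.Properties;

final class TimestampExtractorConfigSketch {
    // Hypothetical policy labels for the four built-in extractors.
    enum Policy {
        FAIL("FailOnInvalidTimestamp"),
        LOG_AND_SKIP("LogAndSkipOnInvalidTimestamp"),
        USE_PARTITION_TIME("UsePartitionTimeOnInvalidTimestamp"),
        WALL_CLOCK("WallclockTimestampExtractor");

        final String className;

        Policy(String simpleName) {
            this.className = "org.apache.kafka.streams.processor." + simpleName;
        }
    }

    static Properties configFor(Policy policy) {
        Properties props = new Properties();
        props.setProperty("default.timestamp.extractor", policy.className);
        return props;
    }
}
```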

You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or estimate a timestamp. Returning a negative timestamp will result in data loss – the corresponding record will not be processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via previousTimestamp (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp!  Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
    return timestamp;
  }

}

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

max.task.idle.ms

Controls how long Kafka Streams waits to fetch data to ensure in-order processing semantics.

The max.task.idle.ms setting controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time, in milliseconds, that a stream task stays idle when it’s fully caught up on some, but not all, input partitions when waiting for producers to send additional records. This idle time avoids potential out-of-order record processing across multiple input streams.

The default is 0. If set to the default, the stream doesn’t wait for producers to send more records. Instead, it waits to fetch data that’s already present on the brokers, which means that for records already present on the brokers, Kafka Streams processes them in timestamp order.

Set to -1 to disable idling and process any locally available data, even though doing so may produce out-of-order processing.

When processing a task that has multiple input partitions, like in a join or merge, Kafka Streams must choose which partition to process the next record from. When all input partitions have locally buffered data, Kafka Streams chooses the partition with the next record that has the lowest timestamp. This decision collates the input partitions in timestamp order, which is desirable in a streaming join or merge.

But when Kafka Streams doesn’t have any data buffered locally for one of the partitions, it can’t determine whether the next record for that partition has a lower or higher timestamp than the remaining partitions’ records.

There are two cases to consider: either there is data in the partition on the broker that Kafka Streams hasn’t fetched yet, or Kafka Streams is fully caught up with that partition, but the producers haven’t produced any new records since Kafka Streams polled the last batch.

The default value of 0 causes Kafka Streams to delay processing a task when it detects that it has no locally buffered data for a partition, but there is data available on the brokers, which means that there’s an empty partition in the local buffer, but Kafka Streams has a non-zero lag for that partition. But as soon as Kafka Streams catches up to the broker, it continues processing, even if there’s no data in one of the partitions, so it doesn’t wait for new data to be produced. This default is designed to sacrifice some throughput in exchange for correct join semantics.

Setting max.task.idle.ms to any value greater than zero specifies the number of additional milliseconds that Kafka Streams waits if it has a caught-up but empty partition. A value greater than zero defines the time to wait for new data to be produced to the input partitions to ensure in-order processing of data, in the case of a slow producer.

Setting max.task.idle.ms to -1 indicates that Kafka Streams never waits to buffer empty partitions before choosing the next record by timestamp, which achieves maximum throughput at the expense of introducing out-of-order processing.
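The three cases described in this section (-1, 0, and a positive value) can be summarized in a small decision model. This is a simplified sketch of the behavior as documented here, not the Kafka Streams implementation. The types PartitionState and Decision and the idledSoFarMs parameter are assumptions introduced for illustration.

```java
import java.util.List;

final class TaskIdleSketch {
    enum Decision { PROCESS, WAIT }

    // Hypothetical snapshot of one input partition of a task.
    static final class PartitionState {
        final boolean hasBufferedRecords; // records already fetched and buffered locally
        final long lag;                   // records on the broker that are not fetched yet

        PartitionState(boolean hasBufferedRecords, long lag) {
            this.hasBufferedRecords = hasBufferedRecords;
            this.lag = lag;
        }
    }

    static Decision decide(List<PartitionState> partitions, long maxTaskIdleMs, long idledSoFarMs) {
        if (maxTaskIdleMs == -1) {
            return Decision.PROCESS; // idling disabled: process whatever is available locally
        }
        for (PartitionState p : partitions) {
            if (p.hasBufferedRecords) {
                continue; // this partition can take part in timestamp ordering
            }
            if (p.lag > 0) {
                return Decision.WAIT; // data exists on the broker: wait to fetch it
            }
            if (idledSoFarMs < maxTaskIdleMs) {
                return Decision.WAIT; // caught up but empty: wait for producers, up to the limit
            }
        }
        return Decision.PROCESS;
    }
}
```

With the default of 0, a caught-up but empty partition never delays processing in this model, while an empty partition with non-zero lag does.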

max.warmup.replicas

The maximum number of warmup replicas. Warmup replicas are extra standbys beyond the configured num.standby.replicas that may be assigned to keep the task available on one instance while it’s warming up on another instance that it has been reassigned to. Used to throttle extra broker traffic and cluster state that can be used for high availability. Increasing this enables Kafka Streams to warm up more tasks at once, speeding up the time for the reassigned warmups to restore sufficient state to be transitioned to active tasks. Must be at least 1.

min.insync.replicas

When a producer sets acks to “all” (or “-1”), the min.insync.replicas configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. For more information, see min.insync.replicas.

num.standby.replicas

The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas per store and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

Recommendation:

Increase the number of standbys to 1 to get instant fail-over (high-availability). Increasing the number of standbys requires more client-side storage space. For example, with 1 standby, 2x space is required.

If you configure n standby replicas, you need to provision n+1 KafkaStreams instances.
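The sizing rules in this recommendation reduce to simple arithmetic, sketched below. The class and method names are hypothetical. The storage multiplier generalizes the "1 standby, 2x space" example to n standbys, which is an assumption rather than a documented formula.

```java
import java.util.Properties;

final class StandbySizingSketch {
    // n standby replicas require n + 1 KafkaStreams instances.
    static int requiredInstances(int numStandbyReplicas) {
        return numStandbyReplicas + 1;
    }

    // Assumption: each standby adds one more full copy of the local state.
    static int storageMultiplier(int numStandbyReplicas) {
        return numStandbyReplicas + 1;
    }

    static Properties configFor(int numStandbyReplicas) {
        if (numStandbyReplicas < 0) {
            throw new IllegalArgumentException("num.standby.replicas must not be negative");
        }
        Properties props = new Properties();
        props.setProperty("num.standby.replicas", Integer.toString(numStandbyReplicas));
        return props;
    }
}
```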

num.stream.threads

This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. For more info about the Kafka Streams threading model, see Threading model.

probing.rebalance.interval.ms

The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Kafka Streams assigns stateful active tasks only to instances that are caught up and within the acceptable.recovery.lag, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
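The lower bounds stated in this reference for acceptable.recovery.lag (at least 0), max.warmup.replicas (at least 1), and probing.rebalance.interval.ms (at least 1 minute) can be checked before the settings reach Kafka Streams. The following is a minimal sketch with a hypothetical helper class. Kafka Streams performs its own validation, so this only illustrates the documented constraints.

```java
import java.util.Properties;

final class WarmupConfigSketch {
    static Properties configFor(long acceptableRecoveryLag, int maxWarmupReplicas, long probingIntervalMs) {
        if (acceptableRecoveryLag < 0) {
            throw new IllegalArgumentException("acceptable.recovery.lag must be at least 0");
        }
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException("max.warmup.replicas must be at least 1");
        }
        if (probingIntervalMs < 60_000L) {
            throw new IllegalArgumentException("probing.rebalance.interval.ms must be at least 1 minute");
        }
        Properties props = new Properties();
        props.setProperty("acceptable.recovery.lag", Long.toString(acceptableRecoveryLag));
        props.setProperty("max.warmup.replicas", Integer.toString(maxWarmupReplicas));
        props.setProperty("probing.rebalance.interval.ms", Long.toString(probingIntervalMs));
        return props;
    }
}
```

Passing the documented defaults, configFor(10_000L, 2, 600_000L), produces a valid configuration, while a probing interval of 30000 ms is rejected.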

processing.guarantee

The processing guarantee that should be used. Possible values are at_least_once (default) and exactly_once_v2. Using exactly_once_v2 requires Confluent Platform version 5.5.x / Kafka version 2.5.x or newer. Note that if exactly-once processing is enabled, the default for parameter commit.interval.ms changes to 100ms. Additionally, consumers are configured with isolation.level="read_committed" and producers are configured with enable.idempotence=true by default. Note that exactly_once_v2 processing requires a cluster of at least three brokers by default, which is the recommended setting for production. For development, you can change this by adjusting the broker settings in both transaction.state.log.replication.factor and transaction.state.log.min.isr to the number of brokers you want to use. To learn more, see Processing Guarantees.
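As a sketch of how the guarantee interacts with commit.interval.ms, the helper below returns the default commit interval implied by a given processing.guarantee value, using the defaults listed in this reference (30000 ms and 100 ms). The class and method names are hypothetical. Treating every value that starts with exactly_once the same way is an assumption made for brevity.

```java
import java.util.Properties;

final class ProcessingGuaranteeSketch {
    // Default commit interval implied by the guarantee when commit.interval.ms is not set.
    static long defaultCommitIntervalMs(String processingGuarantee) {
        return processingGuarantee.startsWith("exactly_once") ? 100L : 30_000L;
    }

    static Properties exactlyOnceV2() {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once_v2");
        return props;
    }
}
```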

replication.factor

This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.

Recommendation:
Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures. Note that you will require more storage space as well (3 times more with the replication factor of 3).
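The recommendation can be expressed as a short sketch. The class and method names are hypothetical. The rule that a replication factor of rf tolerates rf - 1 broker failures generalizes the "3 tolerates 2" statement above and is labeled as an assumption.

```java
import java.util.Properties;

final class ReplicationFactorSketch {
    // Assumption: with replication factor rf, up to rf - 1 broker failures are tolerated.
    static int tolerableBrokerFailures(int replicationFactor) {
        return replicationFactor - 1;
    }

    static Properties configFor(int replicationFactor) {
        // -1 means "use the broker default"; any other value must be positive.
        if (replicationFactor != -1 && replicationFactor < 1) {
            throw new IllegalArgumentException("replication.factor must be -1 or at least 1");
        }
        Properties props = new Properties();
        props.setProperty("replication.factor", Integer.toString(replicationFactor));
        return props;
    }
}
```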

rocksdb.config.setter

The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, implement RocksDBConfigSetter and provide your custom class via rocksdb.config.setter.

Here is an example that adjusts the memory size consumed by RocksDB.

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

  // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
  private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // See #1 below.
    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
    tableConfig.setBlockCache(cache);
    // See #2 below.
    tableConfig.setBlockSize(16 * 1024L);
    // See #3 below.
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setTableFormatConfig(tableConfig);
    // See #4 below.
    options.setMaxWriteBufferNumber(2);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // See #5 below.
    cache.close();
  }
}

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
Notes for example:
  1. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); Get a reference to the existing TableFormatConfig rather than create a new one so you don’t accidentally overwrite defaults such as the BloomFilter, an important optimization.
  2. tableConfig.setBlockSize(16 * 1024L); Modify the default per these instructions from the RocksDB GitHub (indexes and filter blocks).
  3. tableConfig.setCacheIndexAndFilterBlocks(true); Do not let the index and filter blocks grow unbounded. For more information, see the RocksDB GitHub (caching index and filter blocks).
  4. options.setMaxWriteBufferNumber(2); See the advanced options in the RocksDB GitHub.
  5. cache.close(); To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See RocksJava docs for more details.

state.dir

The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated with the application are created under this subdirectory.
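The directory layout described above can be sketched as a path computation. The helper name applicationStateDir is hypothetical, and the sketch covers only the application-level subdirectory, not whatever Kafka Streams creates beneath it.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class StateDirSketch {
    // The application's subdirectory is named after its application ID.
    static Path applicationStateDir(String stateDir, String applicationId) {
        return Paths.get(stateDir).resolve(applicationId);
    }
}
```

For example, with state.dir set to /var/lib/kafka-streams and an application.id of my-app, the state stores live under /var/lib/kafka-streams/my-app.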

topology.optimization

Indicates that Kafka Streams should apply topology optimizations. The optimizations are currently all or none and disabled by default. These optimizations include moving and reducing repartition topics, and reusing the source topic as the changelog for source KTables. We recommend enabling this option.

Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to StreamsConfig.OPTIMIZE, you must pass your configuration properties when building your topology by using the overloaded StreamsBuilder.build(Properties) method.

KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties);

upgrade.from

The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide. You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
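The two-bounce procedure can be sketched as two configurations derived from the same base settings. The class and method names are hypothetical. The version string you pass (for example "2.3") is an example value only; check the upgrade guide for the value that applies to your deployment.

```java
import java.util.Properties;

final class UpgradeFromSketch {
    // First rolling bounce: new binaries, with upgrade.from set to the old version.
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("upgrade.from", oldVersion);
        return props;
    }

    // Second rolling bounce: same settings with upgrade.from removed.
    static Properties secondBounce(Properties firstBounceProps) {
        Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove("upgrade.from");
        return props;
    }
}
```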

Kafka consumer, producer, and admin client configuration parameters

You can specify parameters for the Kafka consumers, producers, and admin client that are used internally. The consumer, producer, and admin client settings are defined by specifying parameters in a StreamsConfig instance.

In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

Naming

Some consumer, producer, and admin client configuration parameters use the same parameter name. For example, send.buffer.bytes and receive.buffer.bytes are used to configure TCP buffers; request.timeout.ms and retry.backoff.ms control retries for client requests. You can avoid duplicate names by prefixing parameter names with consumer., producer., or admin. (e.g., consumer.send.buffer.bytes or producer.send.buffer.bytes).

Properties streamsSettings = new Properties();
// same value for consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer, producer, and admin client
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

Parameter names for the main consumer, restore consumer, and global consumer are prepended with the following prefixes.

  • main.consumer. – for the main consumer, which is the default consumer of a stream source.
  • restore.consumer. – for the restore consumer, which manages state store recovery.
  • global.consumer. – for the global consumer, which is used in global KTable construction.

Setting values for parameters with these prefixes overrides the values set for consumer parameters. For example, the following configuration overrides the consumer.max.poll.records value.

consumer.max.poll.records = 5
main.consumer.max.poll.records = 100
restore.consumer.max.poll.records = 50

During initialization, these settings have the following effect on consumers.

Consumer Type max.poll.records value Reason
Consumer 5 Default value of 5 for all consumer types.
Main Consumer 100 Target assignment with main.consumer. prefix.
Restore Consumer 50 The restore.consumer. prefix overrides the default value.
Global Consumer 5 No global.consumer. prefix, so the default value is used.
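The override order in the table can be modeled as a lookup from the most specific prefix to the least specific one. This is a simplified sketch of the precedence described here, not the Kafka Streams implementation. The class and method names are hypothetical.

```java
import java.util.Properties;

final class ConsumerPrefixSketch {
    // consumerType is "main", "restore", or "global".
    static String effectiveValue(Properties props, String consumerType, String name) {
        String specific = props.getProperty(consumerType + ".consumer." + name);
        if (specific != null) {
            return specific; // e.g. main.consumer.max.poll.records
        }
        String general = props.getProperty("consumer." + name);
        if (general != null) {
            return general; // e.g. consumer.max.poll.records
        }
        return props.getProperty(name); // unprefixed value, or null if absent
    }

    static Properties exampleSettings() {
        Properties props = new Properties();
        props.setProperty("consumer.max.poll.records", "5");
        props.setProperty("main.consumer.max.poll.records", "100");
        props.setProperty("restore.consumer.max.poll.records", "50");
        return props;
    }
}
```

With the example settings, the lookup yields 100 for the main consumer, 50 for the restore consumer, and 5 for the global consumer, matching the table.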

For example, if you want to configure only the restore consumer, without changing the settings of other consumers, you can use restore.consumer. to set the configuration.

Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config: the restore consumer takes restore-consumer-value,
// while the main consumer and global consumer keep general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

Internal topic parameters

To configure the internal repartition/changelog topics, you can use the topic. prefix, followed by any of the standard topic configuration properties.

Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
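As a concrete instance, the following sketch applies the standard topic-level setting min.insync.replicas to the internal topics by using the topic. prefix as a plain string key. The class name InternalTopicSettings and the value 2 are assumptions for illustration; choose a value that fits your cluster's replication factor.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka Streams API.
final class InternalTopicSettings {

    // Builds settings that apply min.insync.replicas to both the
    // changelog and repartition topics. The value 2 is an arbitrary example.
    static Properties build() {
        Properties streamsSettings = new Properties();
        streamsSettings.put("topic.min.insync.replicas", "2");
        return streamsSettings;
    }
}
```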

Default values

Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions of these configs, see Kafka Producer Configuration Reference for Confluent Platform and Kafka Consumer Configuration Reference for Confluent Platform.

Parameter Name Corresponding Client Streams Default
auto.offset.reset Consumer earliest
linger.ms Producer 100
max.poll.records Consumer 1000
transaction.timeout.ms Producer 10000

Parameters controlled by Kafka Streams

Kafka Streams assigns the following configuration parameters. If you try to change allow.auto.create.topics, your value is ignored and has no effect in a Kafka Streams application. You can set the other parameters. Kafka Streams sets them to different default values than a plain KafkaConsumer.

Kafka Streams uses the client.id parameter to compute derived client IDs for internal clients. If you don’t set client.id, Kafka Streams sets it to <application.id>-<random-UUID>.

There are some special considerations when Kafka Streams assigns values to configuration parameters.

  • There is only one global consumer per Kafka Streams instance.
  • There is one restore consumer per thread.
  • Producer client.id: the value depends on the configured processing guarantee.
    • EOS disabled or EOS version 2 enabled: There is only one producer per thread.
    • EOS version 1 enabled: There is only one producer per task.
  • partition.assignment.strategy: the assignment strategy parameter affects only the main consumer. The global and restore consumers use “partition assignment” instead of “topic subscription”, and they don’t form a consumer group, so the StreamsPartitionAssignor is never used for them.

Parameter Name Corresponding Client Value Assigned by Kafka Streams
allow.auto.create.topics Consumer false
auto.offset.reset Global Consumer none
auto.offset.reset Restore Consumer none
client.id Admin <client.id>-admin
client.id Consumer <client.id>-StreamThread-<threadIndex>-consumer
client.id Global Consumer <client.id>-global-consumer
client.id Restore Consumer <client.id>-StreamThread-<threadIndex>-restore-consumer
client.id Producer
  • EOS v1 case: <client.id>-StreamThread-<threadIndex>-<taskId>-producer
  • Non-EOS and EOS v2 case: <client.id>-StreamThread-<threadIndex>-producer
enable.auto.commit Consumer false
group.id Consumer Equal to application.id.
group.id Global Consumer null
group.id Restore Consumer null
group.instance.id Consumer User-provided setting with the -<threadIndex> suffix appended.
partition.assignment.strategy Consumer Always set to StreamsPartitionAssignor.
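
The client.id patterns in the table can be sketched as simple string templates. This is an illustration of the naming scheme only, not code from Kafka Streams: the DerivedClientIds class and its method names are hypothetical, and the sample values ("my-app-client", thread index 1, task ID "0_1") are assumptions.

```java
// Hypothetical illustration of the client.id patterns in the table;
// not part of the Kafka Streams API.
final class DerivedClientIds {

    static String admin(String clientId) {
        return clientId + "-admin";
    }

    static String mainConsumer(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex + "-consumer";
    }

    static String restoreConsumer(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex + "-restore-consumer";
    }

    static String globalConsumer(String clientId) {
        return clientId + "-global-consumer";
    }

    // Non-EOS and EOS v2 case: one producer per thread.
    static String threadProducer(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex + "-producer";
    }

    // EOS v1 case: one producer per task.
    static String taskProducer(String clientId, int threadIndex, String taskId) {
        return clientId + "-StreamThread-" + threadIndex + "-" + taskId + "-producer";
    }

    public static void main(String[] args) {
        String clientId = "my-app-client"; // assumed user-provided client.id
        System.out.println(admin(clientId));
        System.out.println(mainConsumer(clientId, 1));
        System.out.println(restoreConsumer(clientId, 1));
        System.out.println(globalConsumer(clientId));
        System.out.println(threadProducer(clientId, 1));
        System.out.println(taskProducer(clientId, 1, "0_1"));
    }
}
```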

enable.auto.commit

Controls the consumer's auto commit. To guarantee at-least-once processing semantics, Kafka Streams turns off auto commits by overriding this consumer config value to false. Consumers commit only explicitly, via commitSync calls, when the Kafka Streams library or a user decides to commit the current processing state.

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.