.. title:: Configure a Kafka Streams Application for Confluent Platform .. meta:: :description: Learn how to set properties for configuring your Kafka Streams applications. .. _streams_developer-guide_configuration: Configure a |kstreams| Application for |cp| ------------------------------------------- |ak-tm| and |kstreams| configuration options must be configured before using Streams. Configure |kstreams| by specifying parameters in a ``java.util.Properties`` instance. #. Create a ``java.util.Properties`` instance. #. Set the :ref:`parameters `. For example: .. sourcecode:: java import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; Properties props = new Properties(); // Set a few key parameters props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); // Any further settings props.put(... , ...); .. _streams_developer-guide_required-configs: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Configuration parameter reference ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ This section contains the most common Streams configuration parameters. For a full reference, see the :ref:`Streams ` and :ref:`Client ` Javadocs. Required configuration parameters ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Here are the required Streams configuration parameters. .. rst-class:: non-scrolling-table +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+ | Parameter Name | Importance | Description | Default Value | +==================================================+============+===================================================================================================================+==============================================================================+ | application.id | Required | An identifier for the stream processing application. Must be unique within the |ak| cluster. | None | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+ | bootstrap.servers | Required | A list of host/port pairs to use for establishing the initial connection to the |ak| cluster. | None | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+ -------------- application.id -------------- (Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to all instances of the application. It is recommended to use only alphanumeric characters, ``.`` (dot), ``-`` (hyphen), and ``_`` (underscore). Examples: ``"hello_world"``, ``"hello_world-v1.0.0"`` This ID is used in the following places to isolate resources used by the application from others: - As the default |ak| consumer and producer ``client.id`` prefix - As the |ak| consumer ``group.id`` for coordination - As the name of the subdirectory in the state directory (cf. 
``state.dir``) - As the prefix of internal |ak| topic names Tip: When an application is updated, the ``application.id`` should be changed unless you want to reuse the existing data in internal topics and state stores. For example, you could embed the version information within ``application.id``, as ``my-app-v1.0.0`` and ``my-app-v1.0.2``. ----------------- bootstrap.servers ----------------- (Required) The |ak| bootstrap servers. This is the same :ref:`setting ` that is used by the underlying producer and consumer clients to connect to the |ak| cluster. Example: ``"kafka-broker1:9092,kafka-broker2:9092"``. Tip: |kstreams| applications can only communicate with a single |ak| cluster specified by this config value. Recommended configuration parameters for resiliency ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ There are several |ak| and |kstreams| configuration options that need to be configured explicitly for resiliency in face of broker failures: .. rst-class:: non-scrolling-table +--------------------------------+----------------------------+--------------------------------------+-------------------------+ | Parameter Name | Corresponding Client | Default value | Consider setting to | +================================+============================+======================================+=========================+ | min.insync.replicas | Broker | ``1`` | ``2`` | +--------------------------------+----------------------------+--------------------------------------+-------------------------+ | num.standby.replicas | Streams | ``0`` | ``1`` | +--------------------------------+----------------------------+--------------------------------------+-------------------------+ | state.dir | Streams | ``/${java.io.tmpdir}/kafka-streams`` | a persistent volume | +--------------------------------+----------------------------+--------------------------------------+-------------------------+ | replication.factor (for | Streams | ``-1`` | ``3`` | | broker version 2.3 and older) | | | | +--------------------------------+----------------------------+--------------------------------------+-------------------------+ | acks | Producer | ``acks=all`` | ``acks=all`` [1]_ | +--------------------------------+----------------------------+--------------------------------------+-------------------------+ Increasing the replication factor to 3 ensures that the internal |kstreams| topic can tolerate up to 2 broker failures. The tradeoff from moving to the default values to the recommended ones is that some performance and more storage space (3x with the replication factor of 3) are sacrificed for more resiliency. .. [1] The default value for ``acks`` was changed from ``1`` to ``all`` in |kstreams| version 3.0 (|cp| 7.0). Changing the ``acks`` setting to ``all`` guarantees that a record won't be lost as long as one replica is alive. For more information, see `KIP-679: Producer will enable the strongest delivery guarantee by default `__ Define these settings by using the ``StreamsConfig`` class: .. 
sourcecode:: java Properties streamsSettings = new Properties(); // for broker version 2.3 or older //streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2); streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); ---- acks ---- The number of acknowledgments that the leader must have received before considering a request complete. This controls the durability of records that are sent. The possible values are: - ``acks=0``: The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the ``retries`` configuration will not take effect (as the client won't generally know of any failures). The offset returned for each record will always be set to ``-1``. - ``acks=1``: The leader writes the record to its local log and responds without waiting for full acknowledgment from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost. - ``acks=all``: The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee. For more information, see the :ref:`Kafka Producer documentation `. .. _configuration-streams-replication-factor: ------------------ replication.factor ------------------ See the :ref:`description here `. ------------------- min.insync.replicas ------------------- In addition to setting the ``min.insync.replicas`` parameter in the broker configuration, ``min.insync.replicas`` must be set for the internal topics of your |kstreams| application, as shown in the previous code example. See the :ref:`description here `. -------------------- num.standby.replicas -------------------- See the :ref:`description here `. --------- state.dir --------- Although state can be re-created from |ak|, recreating state can require significant resources and time. The ``/tmp/`` directory could be configured as ``tmpfs``, which is essentially memory. Also, many organizations mount tmp directories with the ``noexec`` option. For these reasons, |confluent| recommends relying on a persistent volume for a production environment. .. _streams_developer-guide_optional-configs: Optional configuration parameters ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Here are the optional :ref:`Streams configuration parameters `, sorted by level of importance: - High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters. - Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters. - Low: These parameters have a less general or less significant impact on performance. .. 
rst-class:: non-scrolling-table +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | Parameter Name | Importance | Description | Default Value | +==================================================+============+===================================================================================================================+========================================================================================+ | acceptable.recovery.lag | Medium | The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready | 10,000 | | | | for the active task. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | application.server | Low | A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of | the empty string | | | | state stores within a single |kstreams| application. The value of this must be different for each instance | | | | | of the application. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | buffered.records.per.partition | Low | Deprecated in |cp| 7.4. The maximum number of records to buffer per partition. | 1000 | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | cache.max.bytes.buffering | Medium | Deprecated in |cp| 7.4. Use ``statestore.cache.max.bytes`` instead. | 10485760 bytes | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | client.id | Medium | An ID string to pass to the server when making requests. | the empty string | | | | (This setting is passed to the consumer/producer clients used internally by |kstreams|.) | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | commit.interval.ms | Low | The frequency with which to save the position (offsets in source topics) of tasks. | 30000 ms (``at_least_once``) / 100 ms (``exactly_once_v2``) | | | | | | | | | - For at-least-once processing, committing means saving the position (offsets) of the processor. | | | | | - For exactly-once processing, it means to commit the transaction, which includes saving the position. 
| | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | connections.max.idle.ms | Low | The number of milliseconds to wait before closing idle connections. | 540000 ms (9 minutes) | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.client.supplier | Low | Client supplier class that implements the ``org.apache.kafka.streams.KafkaClientSupplier`` interface. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.dsl.store | Low | The default state store type used by DSL operators. | "rocksDB" | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.deserialization.exception.handler | Medium | Exception handling class that implements the ``DeserializationExceptionHandler`` interface. | See :ref:`streams_developer-guide_deh` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.production.exception.handler | Medium | Exception handling class that implements the ``ProductionExceptionHandler`` interface. | See :ref:`streams_def-prod-exc-hand` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.key.serde | Medium | Default serializer/deserializer class for record keys, implements the ``Serde`` interface (see also value.serde). | ``null`` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.value.serde | Medium | Default serializer/deserializer class for record values, implements the ``Serde`` interface (see also key.serde). | ``null`` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.windowed.key.serde.inner | Medium | Default inner serializer/deserializer class for record keys, implements the ``Serde`` interface. | ``Serdes.ByteArray().getClass().getName()`` | | | | Only affective if ``default.key.serde`` is a windowed serde. 
| | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.windowed.value.serde.inner | Medium | Default inner serializer/deserializer class for record values, implements the ``Serde`` interface | ``Serdes.ByteArray().getClass().getName()`` | | | | Only affective if ``default.value.serde`` is a windowed serde. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | default.timestamp.extractor | Medium | Default timestamp extractor class that implements the ``TimestampExtractor`` interface. | See :ref:`Timestamp Extractor ` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | max.task.idle.ms | Medium | Maximum amount of time Streams waits to fetch data to ensure in-order processing semantics. | 0 milliseconds | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | max.warmup.replicas | Medium | The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned | 2 | | | | at once. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | metadata.max.age.ms | Low | The period of time in milliseconds after which a refresh of metadata is forced. | 300000 ms (5 minutes) | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | metric.reporters | Low | A list of classes to use as metrics reporters. | the empty list | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | metrics.num.samples | Low | The number of samples maintained to compute metrics. | 2 | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | metrics.recording.level | Low | The highest recording level for metrics. 
| ``INFO`` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | metrics.sample.window.ms | Low | The window of time a metrics sample is computed over. | 30000 milliseconds | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | num.standby.replicas | High | The number of standby replicas for each task. | 0 | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | num.stream.threads | Medium | The number of threads to execute stream processing. | 1 | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | poll.ms | Low | The amount of time in milliseconds to block waiting for input. | 100 milliseconds | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | probing.rebalance.interval.ms | Low | The maximum time to wait before triggering a rebalance to probe for warmup replicas that have sufficiently | 600000 milliseconds (10 minutes) | | | | caught up. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | processing.guarantee | Medium | The processing mode. Can be either ``at_least_once`` (default), or ``exactly_once_v2`` (for EOS version 2, | See :ref:`Processing Guarantee ` | | | | requires |cp| version 5.5.x / |ak| version 2.5.x or higher). Deprecated config options are ``exactly_once`` | | | | | (for EOS version 1) and ``exactly_once_beta`` (for EOS version 2). | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | replication.factor | High | The replication factor for changelog topics and repartition topics created by the application. | 1 | | | | If your broker cluster is on version |cp| 5.4.x (|ak| 2.4.x) or newer, | | | | | you can set -1 to use the broker default replication factor. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | retries | Medium | The number of retries for broker requests that return a retryable error. 
| 0 | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | retry.backoff.ms | Medium | The amount of time in milliseconds, before a request is retried. | 100 | | | | This applies if the ``retries`` parameter is configured to be greater than 0. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | rocksdb.config.setter | Medium | The RocksDB configuration. | | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | state.cleanup.delay.ms | Low | The amount of time in milliseconds to wait before deleting state when a partition has migrated. | 600000 milliseconds | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | state.dir | High | Directory location for state stores. | ``/var/lib/kafka-streams`` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | statestore.cache.max.bytes | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760 bytes | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | topology.optimization | Low | Enables/Disables topology optimization. Accepts strings ``none`` or ``all``. | ``none`` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | upgrade.from | Medium | The version you are upgrading from during a rolling upgrade. | See :ref:`Upgrade From ` | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ | windowstore.changelog.additional.retention.ms | Low | Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. | 86400000 milliseconds = 1 day | +--------------------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+ .. 
_streams_developer-guide_acceptable-recovery-lag:

-----------------------
acceptable.recovery.lag
-----------------------

The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams only assigns stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assigns warmup replicas to restore state in the background for instances that are not yet caught up. The value should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

------------------
application.server
------------------

A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on the current ``KafkaStreams`` instance.

------------------------
built.in.metrics.version
------------------------

Version of the built-in metrics to use. The default is "latest".

------------------
commit.interval.ms
------------------

The frequency in milliseconds with which to commit processing progress. For at-least-once processing, committing means to save the position (the offsets) of the processor. For exactly-once processing, it means to commit the transaction, which includes saving the position and making the committed data in the output topic visible to consumers with isolation level ``read_committed``. If ``processing.guarantee`` is set to ``exactly_once_v2`` or ``exactly_once``, the default value is 100, otherwise the default value is 30000.

-----------------------
connections.max.idle.ms
-----------------------

The number of milliseconds to wait before closing idle connections.

-----------------------
default.client.supplier
-----------------------

Client supplier class that implements the ``org.apache.kafka.streams.KafkaClientSupplier`` interface. The default is ``org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier``.

-----------------
default.dsl.store
-----------------

The default state store type used by DSL operators. The default is "rocksDB".

.. _streams_developer-guide_deh:

-----------------------------------------
default.deserialization.exception.handler
-----------------------------------------

The default deserialization exception handler allows you to manage records that fail to deserialize. Deserialization failures can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler must return ``FAIL`` or ``CONTINUE``, depending on the record and the exception thrown. Returning ``FAIL`` signals that Streams should shut down, and ``CONTINUE`` signals that Streams should ignore the issue and continue processing. The default implementation class is :platform:`LogAndFailExceptionHandler|streams/javadocs/javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html`. These exception handlers are available:

* :platform:`LogAndContinueExceptionHandler|streams/javadocs/javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html`: This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows |kstreams| to make progress instead of failing if there are records that fail to deserialize.

* :platform:`LogAndFailExceptionHandler|streams/javadocs/javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html`: This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.

You can also provide your own customized exception handler besides the library-provided ones to meet your needs. For an example customized exception handler implementation, read the :ref:`Failure and exception handling FAQ `.

.. _streams_def-prod-exc-hand:

------------------------------------
default.production.exception.handler
------------------------------------

The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker, such as attempting to produce a record that is too large. By default, |ak| provides and uses the :platform:`DefaultProductionExceptionHandler|streams/javadocs/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html`, which always fails when these exceptions occur. Each exception handler can return ``FAIL`` or ``CONTINUE``, depending on the record and the exception thrown. Returning ``FAIL`` signals that Streams should shut down, and ``CONTINUE`` signals that Streams should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:

.. code:: java

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

    class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
        public void configure(final Map<String, ?> config) {}

        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            if (exception instanceof RecordTooLargeException) {
                return ProductionExceptionHandlerResponse.CONTINUE;
            } else {
                return ProductionExceptionHandlerResponse.FAIL;
            }
        }
    }

    Properties settings = new Properties();
    // other various kafka streams settings, e.g. bootstrap servers, application ID, etc
    settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 IgnoreRecordTooLargeHandler.class);

.. _streams_developer-guide_deserialization:

-----------------
default.key.serde
-----------------

The default Serializer/Deserializer class for record keys. Its value is ``null`` until you set it. Serialization and deserialization in |kstreams| happens whenever data needs to be materialized, for example:

- Whenever data is read from or written to a *Kafka topic* (e.g., via the ``StreamsBuilder#stream()`` and ``KStream#to()`` methods).
- Whenever data is read from or written to a *state store*.

This is discussed in more detail in :ref:`Data types and serialization `.

----------------------------
default.list.key.serde.inner
----------------------------

Default inner class of list serde for key that implements the ``org.apache.kafka.common.serialization.Serde`` interface. This configuration is read only if the ``default.key.serde`` configuration is set to ``org.apache.kafka.common.serialization.Serdes.ListSerde``.

---------------------------
default.list.key.serde.type
---------------------------

Default class for key that implements the ``java.util.List`` interface. This configuration is read only if the ``default.key.serde`` configuration is set to ``org.apache.kafka.common.serialization.Serdes.ListSerde``.
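For illustration, here is a minimal sketch that wires the list serde configs for record keys together. The ``Long`` inner type and ``ArrayList`` list type are assumptions chosen for this example, and the plain config-string names from these sections are used instead of ``StreamsConfig`` constants:

.. sourcecode:: java

    import java.util.ArrayList;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // Use the built-in list serde for record keys ...
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ListSerde.class);
    // ... and tell it which inner serde and which List implementation to use.
    props.put("default.list.key.serde.inner", Serdes.LongSerde.class);
    props.put("default.list.key.serde.type", ArrayList.class);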
When a list serde class is used, you must set the inner serde class that implements the ``org.apache.kafka.common.serialization.Serde`` Interface by using ``default.list.key.serde.inner``. ------------------------------ default.list.value.serde.inner ------------------------------ Default inner class of list serde for value that implements the ``org.apache.kafka.common.serialization.Serde`` interface. This configuration is read only if the ``default.value.serde`` configuration is set to ``org.apache.kafka.common.serialization.Serdes.ListSerde``. ----------------------------- default.list.value.serde.type ----------------------------- Default class for value that implements the ``java.util.List`` interface. This configuration is read only if the ``default.value.serde`` configuration is set to ``org.apache.kafka.common.serialization.Serdes.ListSerde``. When a list serde class is used, you must set the inner serde class that implements the ``org.apache.kafka.common.serialization.Serde`` Interface by using ``default.list.value.serde.inner``. ------------------- default.value.serde ------------------- The default Serializer/Deserializer class for record values. Its value is ``null`` until you set it. Serialization and deserialization in |kstreams| happens whenever data needs to be materialized, for example: - Whenever data is read from or written to a *Kafka topic* (e.g., via the ``KStreamBuilder#stream()`` and ``KStream#to()`` methods). - Whenever data is read from or written to a *state store*. This is discussed in more detail in :ref:`Data types and serialization `. .. _streams_developer-guide_timestamp-extractor: --------------------------- default.timestamp.extractor --------------------------- A timestamp extractor pulls a timestamp from an instance of :platform:`ConsumerRecord|clients/javadocs/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html`. Timestamps are used to control the progress of streams. The default extractor is :platform:`FailOnInvalidTimestamp|streams/javadocs/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html`. This extractor retrieves built-in timestamps that are automatically embedded into |ak| messages by the |ak| producer client since `Kafka version 0.10 `__. Depending on the setting of |ak|'s server-side ``log.message.timestamp.type`` broker and ``message.timestamp.type`` topic parameters, this extractor provides you with: * **event-time** processing semantics if ``log.message.timestamp.type`` is set to ``CreateTime`` aka "producer time" (which is the default). This represents the time when a |ak| producer sent the original message. If you use |ak|'s official producer client or one of Confluent's producer clients, the timestamp represents milliseconds since the epoch. * **ingestion-time** processing semantics if ``log.message.timestamp.type`` is set to ``LogAppendTime`` aka "broker time". This represents the time when the |ak| broker received the original message, in milliseconds since the epoch. The ``FailOnInvalidTimestamp`` extractor throws an exception if a record contains an invalid (i.e. negative) built-in timestamp, because |kstreams| would not process this record but silently drop it. 
Invalid built-in timestamps can occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 |ak| producer clients or by third-party producer clients that don't support the new |ak| 0.10 message format yet; another situation where this may happen is after upgrading your |ak| cluster from ``0.9`` to ``0.10``, where all the data that was generated with ``0.9`` does not include the ``0.10`` message timestamps. If you have data with invalid timestamps and want to process it, then there are two alternative extractors available. Both work on built-in timestamps, but handle invalid timestamps differently. * :platform:`LogAndSkipOnInvalidTimestamp|streams/javadocs/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html`: This extractor logs a warn message and returns the invalid timestamp to |kstreams|, which will not process but silently drop the record. This log-and-skip strategy allows |kstreams| to make progress instead of failing if there are records with an invalid built-in timestamp in your input data. * :platform:`UsePartitionTimeOnInvalidTimestamp|streams/javadocs/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html`. This extractor returns the record's built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamps, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimation. In case that no timestamp can be estimated, it throws an exception. Another built-in extractor is :platform:`WallclockTimestampExtractor|streams/javadocs/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html`. This extractor does not actually "extract" a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: ``System.currentTimeMillis()``), which effectively means Streams will operate on the basis of the so-called **processing-time** of events. You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or estimate a timestamp. Returning a negative timestamp will result in data loss -- the corresponding record will not be processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via ``previousTimestamp`` (i.e., a |kstreams| timestamp estimation). Here is an example of a custom ``TimestampExtractor`` implementation: .. sourcecode:: java import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.processor.TimestampExtractor; // Extracts the embedded timestamp of a record (giving you "event-time" semantics). public class MyEventTimeExtractor implements TimestampExtractor { @Override public long extract(final ConsumerRecord record, final long previousTimestamp) { // `Foo` is your own custom class, which we assume has a method that returns // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC). long timestamp = -1; final Foo myPojo = (Foo) record.value(); if (myPojo != null) { timestamp = myPojo.getTimestampInMillis(); } if (timestamp < 0) { // Invalid timestamp! Attempt to estimate a new timestamp, // otherwise fall back to wall-clock time (processing-time). 
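            // Note: `previousTimestamp` is the latest valid timestamp that Kafka Streams has
            // extracted for this record's partition, so reusing it keeps records roughly in
            // order, while the wall-clock fallback gives this record processing-time semantics.
            // Returning a negative value here instead would cause the record to be silently
            // dropped.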
if (previousTimestamp >= 0) { return previousTimestamp; } else { return System.currentTimeMillis(); } } return timestamp; } } You would then define the custom timestamp extractor in your Streams configuration as follows: .. sourcecode:: java import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); .. _streams_developer-guide_max-idle: ---------------- max.task.idle.ms ---------------- Controls how long |kstreams| waits to fetch data to ensure in-order processing semantics. The ``max.task.idle.ms`` setting controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time, in milliseconds, that a stream task stays idle when it's fully caught up on some, but not all, input partitions when waiting for producers to send additional records. This idle time avoids potential out-of-order record processing across multiple input streams. The default is 0. If set to the default, the stream doesn't wait for producers to send more records. Instead, it waits to fetch data that's already present on the brokers, which means that for records already present on the brokers, |kstreams| processes them in timestamp order. Set to -1 to disable idling and process any locally available data, even though doing so may produce out-of-order processing. When processing a task that has multiple input partitions, like in a join or merge, |kstreams| must choose which partition to process the next record from. When all input partitions have locally buffered data, |kstreams| chooses the partition with the next record that has the lowest timestamp. This decision collates the input partitions in timestamp order, which is desirable in a streaming join or merge. But when |kstreams| doesn't have any data buffered locally for one of the partitions, it can't determine whether the next record for that partition has a lower or higher timestamp than the remaining partitions' records. There are two cases to consider: either there is data in the partition on the broker that |kstreams| hasn't fetched yet, or |kstreams| is fully caught up with that partition, but the producers haven't produced any new records since |kstreams| polled the last batch. The default value of 0 causes |kstreams| to delay processing a task when it detects that it has no locally buffered data for a partition, but there is data available on the brokers, which means that there's an empty partition in the local buffer, but |kstreams| has a non-zero lag for that partition. But as soon as |kstreams| catches up to the broker, it continues processing, even if there's no data in one of the partitions, so it doesn't wait for new data to be produced. This default is designed to sacrifice some throughput in exchange for correct join semantics. Setting ``max.task.idle.ms`` to any value greater than zero specifies the number of additional milliseconds that |kstreams| waits if it has a caught-up but empty partition. A value greater than zero defines the time to wait for new data to be produced to the input partitions to ensure in-order processing of data, in the case of a slow producer. Setting ``max.task.idle.ms`` to -1 indicates that |kstreams| never waits to buffer empty partitions before choosing the next record by timestamp, which achieves maximum throughput at the expense of introducing out-of-order processing. .. 
_streams_developer-guide_default-windowed-key-serde-inner: -------------------------------- default.windowed.key.serde.inner -------------------------------- The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in |kstreams| happens whenever data needs to be materialized, for example: - Whenever data is read from or written to a |ak| topic, for example, via the ``StreamsBuilder#stream()`` and ``KStream#to()`` methods. - Whenever data is read from or written to a state store. For more information, see :ref:`streams_developer-guide_serdes`. .. _streams_developer-guide_default-windowed-value-serde-inner: ---------------------------------- default.windowed.value.serde.inner ---------------------------------- The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in |kstreams| happens whenever data needs to be materialized, for example: - Whenever data is read from or written to a |ak| topic, for example, via the ``StreamsBuilder#stream()`` and ``KStream#to()`` methods. - Whenever data is read from or written to a state store. For more information, see :ref:`streams_developer-guide_serdes`. .. _streams_developer-guide_max-warmup-replicas: ------------------- max.warmup.replicas ------------------- The maximum number of warmup replicas. Warmup replicas are extra standbys beyond the configured ``num.standbys``, that may be assigned to keep the task available on one instance while it's warming up on another instance that it has been reassigned to. Used to throttle extra broker traffic and cluster state that can be used for high availability. Increasing this enables |kstreams| to warm up more tasks at once, speeding up the time for the reassigned warmups to restore sufficient state to be transitioned to active tasks. Must be at least 1. ------------------- metadata.max.age.ms ------------------- The period of time in milliseconds after which a refresh of metadata is forced, even in the absence of partition leadership changes, to proactively discover any new brokers or partitions. The default is 300000 ms (5 minutes). ---------------- metric.reporters ---------------- A list of classes to use as metrics reporters. Implementing the ``org.apache.kafka.common.metrics.MetricsReporter`` interface enables plugging in classes that are notified of new metric creation. The JmxReporter is always included to register JMX statistics. ------------------- metrics.num.samples ------------------- The number of samples maintained to compute metrics. Valid values are integers starting with 1. The default is 2. ----------------------- metrics.recording.level ----------------------- The highest recording level for metrics. Valid values are "INFO", "DEBUG", and "TRACE". The default is "INFO". ------------------------ metrics.sample.window.ms ------------------------ The window of time a metrics sample is computed over. The default is 30000 ms (30 seconds). .. _streams_developer-guide_min-insync-replicas: ------------------- min.insync.replicas ------------------- When a producer sets ``acks`` to "all" (or "-1"), the ``min.insync.replicas`` configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. For more information, see `min.insync.replicas `__. .. _streams_developer-guide_standby-replicas: -------------------- num.standby.replicas -------------------- The number of standby replicas. 
Standby replicas are shadow copies of local state stores. |kstreams| attempts to create the specified number of replicas per store and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how |kstreams| makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the :ref:`State ` section. Recommendation: Increase the number of standbys to ``1`` to get instant fail-over (high-availability). Increasing the number of standbys requires more client-side storage space. For example, with 1 standby, 2x space is required. If you configure `n` standby replicas, you need to provision `n+1` ``KafkaStreams`` instances. ------------------ num.stream.threads ------------------ This specifies the number of stream threads in an instance of the |kstreams| application. The stream processing code runs in these threads. For more info about the |kstreams| threading model, see :ref:`streams_architecture_threads`. ------- poll.ms ------- The amount of time in milliseconds to block waiting for input. The default is 100. .. _streams_developer-guide_probing-rebalance-interval-ms: ----------------------------- probing.rebalance.interval.ms ----------------------------- The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. |kstreams| assigns stateful active tasks only to instances that are caught up and within the :ref:`acceptable.recovery.lag `, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute. .. _streams_developer-guide_processing-guarantee: -------------------- processing.guarantee -------------------- The processing guarantee that should be used. Possible values are ``at_least_once`` (default) and ``exactly_once_v2``. Using ``exactly_once_v2`` requires |cp| version 5.5.x / |ak| version 2.5.x or newer. Note that if exactly-once processing is enabled, the default for parameter ``commit.interval.ms`` changes to ``100ms``. Additionally, consumers are configured with ``isolation.level="read_committed"`` and producers are configured with ``enable.idempotence=true`` by default. Note that ``exactly_once_v2`` processing requires a cluster of at least three brokers by default, which is the recommended setting for production. For development, you can change this by adjusting the broker settings in both ``transaction.state.log.replication.factor`` and ``transaction.state.log.min.isr`` to the number of brokers you want to use. To learn more, see :ref:`streams_concepts_processing-guarantees`. .. _streams_developer-guide_rack-aware-assignment-non-overlap-cost: -------------------------------------- rack.aware.assignment.non_overlap_cost -------------------------------------- This configuration sets the cost of moving a task from the original assignment computed either by ``StickyTaskAssignor`` or ``HighAvailabilityTaskAssignor``. 
Together with :ref:`streams_developer-guide_rack-aware-assignment-traffic-cost`, they control whether the optimizer favors minimizing cross-rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than ``rack.aware.assignment.traffic_cost``, the optimizer tries to maintain the existing assignment computed by the task assignor. The optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting ``rack.aware.assignment.non_overlap_cost`` to *10* and ``rack.aware.assignment.traffic_cost`` to *1* is more likely to maintain existing assignment than setting ``rack.aware.assignment.non_overlap_cost`` to *100* and ``rack.aware.assignment.traffic_cost`` to *50*. The default value is null, which means the default ``non_overlap_cost`` in different assignors is used. - In ``StickyTaskAssignor``, it has a default value of *10*, and ``rack.aware.assignment.traffic_cost`` has a default value of *1*, which means that maintaining stickiness is preferred in ``StickyTaskAssignor``. - In ``HighAvailabilityTaskAssignor``, it has a default value of *1*, and ``rack.aware.assignment.traffic_cost`` has a default value of *10*, which means that minimizing cross-rack traffic is preferred in ``HighAvailabilityTaskAssignor``. .. _streams_developer-guide_rack-aware-assignment-strategy: ------------------------------ rack.aware.assignment.strategy ------------------------------ This configuration sets the strategy |kstreams| uses for rack-aware task assignment so that cross traffic from broker to client can be reduced. This config takes effect only when `broker.rack `__ is set on the brokers and ``client.rack`` is set on |kstreams| side. There are two settings for this config: - ``none``: This is the default value, which means rack-aware task assignment is disabled. - ``min_traffic``: This setting means the rack-aware task assigner computes an assignment that tries to minimize cross-rack traffic. This config can be used with :ref:`streams_developer-guide_rack-aware-assignment-non-overlap-cost` and :ref:`streams_developer-guide_rack-aware-assignment-traffic-cost` to balance reducing cross-rack traffic and maintaining the existing assignment. .. _streams_developer-guide_rack-aware-assignment-tags: -------------------------- rack.aware.assignment.tags -------------------------- This configuration sets a list of tag keys used to distribute standby replicas across |kstreams| clients. When configured, |kstreams| makes a best-effort to distribute the standby tasks over clients with different tag values. Tags for the |kstreams| clients can be set by using the ``client.tag.`` prefix, for example: :: Client-1 | Client-2 -------------------------------------------+----------------------------------------- client.tag.zone: eu-central-1a | client.tag.zone: eu-central-1b client.tag.cluster: k8s-cluster1 | client.tag.cluster: k8s-cluster1 rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cluster Client-3 | Client-4 -------------------------------------------+----------------------------------------- client.tag.zone: eu-central-1a | client.tag.zone: eu-central-1b client.tag.cluster: k8s-cluster2 | client.tag.cluster: k8s-cluster2 rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cluster This example shows four |kstreams| clients across two zones (eu-central-1a, eu-central-1b) and across two clusters (k8s-cluster1, k8s-cluster2). 
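As a sketch, the configuration for Client-1 in this layout could be set programmatically like the following. Only the ``client.tag.`` prefix and ``rack.aware.assignment.tags`` come from this section; the plain config-string names are used here, and the tag values are the example's:

.. sourcecode:: java

    import java.util.Properties;

    Properties props = new Properties();
    // Tags describing where this instance (Client-1) runs.
    props.put("client.tag.zone", "eu-central-1a");
    props.put("client.tag.cluster", "k8s-cluster1");
    // Spread standby tasks over clients whose values for these tags differ.
    props.put("rack.aware.assignment.tags", "zone,cluster");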
For an active task located on Client-1, |kstreams| allocates a standby task on Client-4, because Client-4 has a different zone and a different cluster than Client-1. .. _streams_developer-guide_rack-aware-assignment-traffic-cost: ---------------------------------- rack.aware.assignment.traffic_cost ---------------------------------- This configuration sets the cost of cross-rack traffic. Together with :ref:`streams_developer-guide_rack-aware-assignment-non-overlap-cost`, they control whether the optimizer favors minimizing cross-rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than ``rack.aware.assignment.non_overlap_cost``, the optimizer tries to compute an assignment that minimizes the cross-rack traffic. The optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting ``rack.aware.assignment.non_overlap_cost`` to *10* and ``rack.aware.assignment.traffic_cost`` to *1* is more likely to maintain existing assignment than setting ``rack.aware.assignment.non_overlap_cost`` to *100* and ``rack.aware.assignment.traffic_cost`` to *50*. The default value is null, which means default traffic cost in different assignors is used. - In ``StickyTaskAssignor``, it has a default value of *1* and ``rack.aware.assignment.non_overlap_cost`` has a default value of *10*. - In ``HighAvailabilityTaskAssignor``, it has a default value of *10* and ``rack.aware.assignment.non_overlap_cost`` has a default value of *1*. -------------------- receive.buffer.bytes -------------------- The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. The default is 32768 (32 kibibytes). -------------------- reconnect.backoff.ms -------------------- The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. The default is 50 ms. ------------------------ reconnect.backoff.max.ms ------------------------ The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host increases exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20 percent random jitter is added to avoid connection storms. The default is 1000 ms (1 second). ----------------------------- repartition.purge.interval.ms ----------------------------- The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging occurs after at least this value since the last purge but may be delayed until later. Unlike ``commit.interval.ms``, the default for this value remains unchanged when ``processing.guarantee`` is set to ``exactly_once_v2``. The default is 30000 ms (30 seconds) .. _streams_developer-guide_replication_factor: ------------------ replication.factor ------------------ This specifies the replication factor of internal topics that |kstreams| creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics. 
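For example, a minimal sketch that pins the internal topics to a replication factor of 3 (the value is an assumption for illustration; match it to your source topics):

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // Internal changelog and repartition topics are created with this replication factor.
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);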

------------------
request.timeout.ms
------------------

Controls the maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted. The default is 40000 ms (40 seconds).

-------
retries
-------

Setting a value greater than zero causes the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or ``MAX_VALUE`` and use the corresponding timeout parameters to control how long a client should retry a request. The default is 0.

----------------
retry.backoff.ms
----------------

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. The default is 100 ms.

.. _streams_developer-guide_rocksdb-config:

---------------------
rocksdb.config.setter
---------------------

The RocksDB configuration. |kstreams| uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, implement ``RocksDBConfigSetter`` and provide your custom class via :platform:`rocksdb.config.setter|streams/javadocs/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html`.

Here is an example that adjusts the memory size consumed by RocksDB.

.. sourcecode:: java

    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
        // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
        private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            // See #1 below.
            BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(cache);
            // See #2 below.
            tableConfig.setBlockSize(16 * 1024L);
            // See #3 below.
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);
            // See #4 below.
            options.setMaxWriteBufferNumber(2);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // See #5 below.
            cache.close();
        }
    }

    Properties streamsSettings = new Properties();
    streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);

Notes for example:

#. ``BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();`` Get a reference to the existing ``TableFormatConfig`` rather than creating a new one, so you don't accidentally overwrite defaults such as the ``BloomFilter``, an important optimization.
#. ``tableConfig.setBlockSize(16 * 1024L);`` Modify the `default `__ per these instructions from the `RocksDB GitHub (indexes and filter blocks) `__.
#. ``tableConfig.setCacheIndexAndFilterBlocks(true);`` Do not let the index and filter blocks grow unbounded. For more information, see the `RocksDB GitHub (caching index and filter blocks) `__.
#. ``options.setMaxWriteBufferNumber(2);`` See the advanced options in the `RocksDB GitHub `__.
#. ``cache.close();`` To avoid memory leaks, you must close any objects you constructed that extend ``org.rocksdb.RocksObject``. See the `RocksJava docs `__ for more details.

-----------------
security.protocol
-----------------

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

-----------------
send.buffer.bytes
-----------------

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. The default is 131072 bytes (128 kibibytes).

----------------------
state.cleanup.delay.ms
----------------------

The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least ``state.cleanup.delay.ms`` are removed. The default is 600000 ms (10 minutes).

.. _streams_developer-guide_state-dir:

---------
state.dir
---------

The state directory. |kstreams| persists local states under the state directory. Each application has a subdirectory on its hosting machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated with the application are created under this subdirectory.

--------------------------
statestore.cache.max.bytes
--------------------------

Maximum number of bytes in memory to be used for the statestore cache across all threads.

---------------
task.timeout.ms
---------------

The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task raises an error on the first internal error. For any timeout larger than 0 ms, a task retries at least once before an error is raised.

.. _streams_developer-guide_topology-optimization:

---------------------
topology.optimization
---------------------

Indicates that |kstreams| should apply topology optimizations. The optimizations are currently all-or-none and disabled by default. These optimizations include moving and reducing repartition topics, and reusing the source topic as the changelog for source KTables. We recommend enabling this option.

Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to ``StreamsConfig.OPTIMIZE``, you must also pass your configuration properties when building your topology by using the overloaded ``StreamsBuilder.build(Properties)`` method.

.. sourcecode:: java

    properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties);

.. _streams_developer-guide_upgrade-from:

------------
upgrade.from
------------

The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide. You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path when upgrading from a version below 2.0, or when upgrading to 2.4+ from any version lower than 2.4.

--------------
window.size.ms
--------------

Sets the window size for the deserializer, so that it can calculate window end times. Setting this config in a ``KafkaStreams`` application results in an error, because it is intended to be used only from a plain consumer client.

--------------------------
windowed.inner.class.serde
--------------------------

Default serializer/deserializer for the inner class of a windowed record.
Must implement the ``org.apache.kafka.common.serialization.Serde`` interface.

---------------------------------------------
windowstore.changelog.additional.retention.ms
---------------------------------------------

Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Accounts for clock drift. The default is 86400000 ms (1 day).

|ak| consumers, producer, and admin client configuration parameters
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can specify parameters for the |ak| :platform:`consumers|clients/javadocs/javadoc/org/apache/kafka/clients/consumer/package-summary.html`, :platform:`producers|clients/javadocs/javadoc/org/apache/kafka/clients/producer/package-summary.html`, and :platform:`admin client|clients/javadocs/javadoc/org/apache/kafka/clients/admin/package-summary.html` that are used internally. The consumer, producer, and admin client settings are defined by specifying parameters in a ``StreamsConfig`` instance.

In this example, the |ak| :platform:`consumer session timeout|clients/javadocs/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG` is configured to be 60000 milliseconds in the Streams settings:

.. sourcecode:: java

    Properties streamsSettings = new Properties();
    // Example of a "normal" setting for Kafka Streams
    streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
    // Customize the Kafka consumer settings of your Streams application
    streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

------
Naming
------

Some consumer, producer, and admin client configuration parameters use the same parameter name. For example, ``send.buffer.bytes`` and ``receive.buffer.bytes`` are used to configure TCP buffers, and ``request.timeout.ms`` and ``retry.backoff.ms`` control retries for client requests. You can avoid duplicate names by prefixing parameter names with ``consumer.``, ``producer.``, or ``admin.`` (e.g., ``consumer.send.buffer.bytes`` or ``producer.send.buffer.bytes``).

.. sourcecode:: java

    Properties streamsSettings = new Properties();
    // same value for consumer and producer
    streamsSettings.put("PARAMETER_NAME", "value");
    // different values for consumer, producer, and admin client
    streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
    streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
    streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
    // alternatively, you can use
    streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
    streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
    streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

Parameter names for the main consumer, restore consumer, and global consumer are prefixed as follows:

- ``main.consumer.`` -- for the main consumer, which is the default consumer of a stream source.
- ``restore.consumer.`` -- for the restore consumer, which manages state store recovery.
- ``global.consumer.`` -- for the global consumer, which is used in global KTable construction.

Setting values for parameters with these prefixes overrides the values set for ``consumer.`` parameters. For example, the following configuration overrides the ``consumer.max.poll.records`` value for the main and restore consumers.

.. code::

    consumer.max.poll.records = 5
    main.consumer.max.poll.records = 100
    restore.consumer.max.poll.records = 50
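
The same overrides can be expressed in Java. This sketch assumes the ``max.poll.records`` values from the example above:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties streamsSettings = new Properties();
    // Base value that applies to any consumer type without a more specific override
    streamsSettings.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 5);
    // Overrides that apply only to the main and restore consumers
    streamsSettings.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
    streamsSettings.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 50);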

During initialization, these settings have the following effect on consumers.

+--------------------+------------------------+----------------------------------------------------------------+
| Consumer Type      | max.poll.records value | Reason                                                         |
+====================+========================+================================================================+
| Consumer           | 5                      | Default value of 5 for all consumer types.                     |
+--------------------+------------------------+----------------------------------------------------------------+
| Main Consumer      | 100                    | The ``main.consumer.`` prefix overrides the default value.    |
+--------------------+------------------------+----------------------------------------------------------------+
| Restore Consumer   | 50                     | The ``restore.consumer.`` prefix overrides the default value. |
+--------------------+------------------------+----------------------------------------------------------------+
| Global Consumer    | 5                      | No ``global.consumer.`` prefix, so the default value is used. |
+--------------------+------------------------+----------------------------------------------------------------+

For example, if you want to configure only the restore consumer, without changing the settings of other consumers, you can use the ``restore.consumer.`` prefix to set the configuration.

.. code:: java

    Properties streamsSettings = new Properties();
    // same config value for all consumer types
    streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
    // set a different restore consumer config. This makes the restore consumer take restore-consumer-value,
    // while the main consumer and global consumer keep general-consumer-value
    streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
    // alternatively, you can use
    streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

.. _streams_developer-guide_internal-topic-params:

-------------------------
Internal topic parameters
-------------------------

To configure the internal repartition/changelog topics, you can use the ``topic.`` prefix, followed by any of the standard topic configuration properties.

.. sourcecode:: java

    Properties streamsSettings = new Properties();
    // Override default for both changelog and repartition topics
    streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
    // alternatively, you can use
    streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
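
For example, a concrete override could raise ``min.insync.replicas`` for the internal topics, in line with the resiliency recommendations earlier in this guide. This is a minimal sketch, assuming the internal topics have a replication factor of at least 2:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties streamsSettings = new Properties();
    // Require two in-sync replicas for the internal changelog and repartition topics
    streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);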

--------------
Default Values
--------------

|kstreams| uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions of these configs, see :ref:`cp-config-producer` and :ref:`cp-config-consumer`.

.. rst-class:: non-scrolling-table

+------------------------+----------------------+-----------------+
| Parameter Name         | Corresponding Client | Streams Default |
+========================+======================+=================+
| auto.offset.reset      | Consumer             | earliest        |
+------------------------+----------------------+-----------------+
| linger.ms              | Producer             | 100             |
+------------------------+----------------------+-----------------+
| max.poll.records       | Consumer             | 1000            |
+------------------------+----------------------+-----------------+
| transaction.timeout.ms | Producer             | 10000           |
+------------------------+----------------------+-----------------+

-----------------------------------
Parameters controlled by |kstreams|
-----------------------------------

|kstreams| assigns the following configuration parameters. If you try to change ``allow.auto.create.topics``, your value is ignored and setting it has no effect in a |kstreams| application. You can set the other parameters, but |kstreams| sets them to different default values than a plain ``KafkaConsumer``.

|kstreams| uses the ``client.id`` parameter to compute derived client IDs for internal clients. If you don't set ``client.id``, |kstreams| sets it to ``<application.id>-<random-UUID>``.

There are some special considerations when |kstreams| assigns values to configuration parameters.

- There is only one global consumer per |kstreams| instance.
- There is one restore consumer per thread.
- Producer ``client.id``: the value depends on the configured processing guarantee.

  - EOS disabled or EOS version 2 enabled: There is only one producer per thread.
  - EOS version 1 enabled: There is only one producer per task.

- ``partition.assignment.strategy``: the assignment strategy parameter affects only the main consumer. The global and restore consumers use "partition assignment" instead of "topic subscription", and they don't form a consumer group, so the ``StreamsPartitionAssignor`` is never used for them.

+--------------------------------+----------------------+------------------------------------------------------------------------------+
| Parameter Name                 | Corresponding Client | Value Assigned by |kstreams|                                                 |
+================================+======================+==============================================================================+
| allow.auto.create.topics       | Consumer             | ``false``                                                                    |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| auto.offset.reset              | Global Consumer      | none                                                                         |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| auto.offset.reset              | Restore Consumer     | none                                                                         |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| client.id                      | Admin                | ``<client.id>-admin``                                                        |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| client.id                      | Consumer             | ``<client.id>-StreamThread-<threadIdx>-consumer``                            |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| client.id                      | Global Consumer      | ``<client.id>-global-consumer``                                              |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| client.id                      | Restore Consumer     | ``<client.id>-StreamThread-<threadIdx>-restore-consumer``                    |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| client.id                      | Producer             | - EOS v1 case: ``<client.id>-StreamThread-<threadIdx>-<taskId>-producer``    |
|                                |                      | - Non-EOS and EOS v2 case: ``<client.id>-StreamThread-<threadIdx>-producer`` |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| enable.auto.commit             | Consumer             | ``false``                                                                    |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| group.id                       | Consumer             | Equal to ``application.id``.                                                 |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| group.id                       | Global Consumer      | ``null``                                                                     |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| group.id                       | Restore Consumer     | ``null``                                                                     |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| group.instance.id              | Consumer             | User-provided setting with the ``-<threadIdx>`` suffix appended.             |
+--------------------------------+----------------------+------------------------------------------------------------------------------+
| partition.assignment.strategy  | Consumer             | Always set to ``StreamsPartitionAssignor``.                                  |
+--------------------------------+----------------------+------------------------------------------------------------------------------+

.. _streams_developer-guide_consumer-auto-commit:

------------------
enable.auto.commit
------------------

The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, |kstreams| overrides this consumer config value to ``false``. Consumers will only commit explicitly via *commitSync* calls when the |kstreams| library or a user decides to commit the current processing state.
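
As a hypothetical illustration of a user-requested commit, a processor can ask |kstreams| to commit the current processing state by calling ``ProcessorContext#commit()``. The class name and record types below are illustrative:

.. sourcecode:: java

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // Illustrative processor that requests a commit after each record it processes.
    public class CommitRequestingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            context.forward(record);
            // Request a commit as soon as possible; the actual commit is still
            // performed by the Kafka Streams library, not by this call itself.
            context.commit();
        }
    }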

.. include:: ../../.hidden/docs-common/home/includes/ak-share.rst