.. _streams_upgrade-guide:
Upgrade Guide
=============
**Table of Contents**
.. contents::
:local:
.. FIXME: The version numbers are just place-holders while we add docs for the coming release
Upgrading from CP 4.0.x (Kafka 1.0.x-cp1) to CP 4.1.0 (Kafka 1.1.0-cp1)
-----------------------------------------------------------------------
.. _streams_upgrade-guide_4.1.x-compatibility:
Compatibility
^^^^^^^^^^^^^
Kafka Streams applications built with CP 4.1.0 (Kafka 1.1.0-cp1) are forward and backward compatible with certain Kafka clusters.
* **Forward-compatible to CP 4.1.0 clusters (Kafka 1.1.0-cp1):**
Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp1), CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1), CP 3.3.x (Kafka 0.11.0.x-cp1) or CP 4.0.x (Kafka 1.0.x-cp1)
will work with upgraded Kafka clusters running CP 4.1.0 (Kafka 1.1.0-cp1).
* **Backward-compatible to older clusters down to CP 3.1.x (Kafka 0.10.1.x-cp2):**
New Kafka Streams applications built with CP 4.1.0 (Kafka 1.1.0-cp1) will work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1), CP 3.3.x (Kafka 0.11.0.x-cp1) or CP 4.0.x (Kafka 1.0.x-cp1).
However, if you require the exactly-once processing guarantee, your Kafka cluster must be upgraded to at least CP 3.3.x (Kafka 0.11.0.x-cp1).
Note that the exactly-once feature is disabled by default, so a rolling bounce upgrade of your Streams application is possible as long as you don't enable this new feature explicitly (see below for how it would be enabled).
Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are *not* compatible with new CP 4.1.0 Kafka Streams applications.
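If you do want to enable exactly-once processing (and your cluster runs at least CP 3.3.x), a minimal sketch of the relevant setting looks as follows:

.. sourcecode:: java

    Properties streamsProps = new Properties();
    // exactly-once processing requires brokers on CP 3.3.x (Kafka 0.11.0.x) or newer
    streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);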
.. note::
As of CP 4.0.0 (Kafka 1.0.0-cp1), Kafka Streams requires message format 0.10 or higher.
Thus, if you kept an older message format when upgrading your brokers to CP 3.1 (Kafka 0.10.1-cp1) or a later version,
Kafka Streams CP 4.1.0 (Kafka 1.1.0-cp1) won't work.
You will need to upgrade the message format to 0.10 before you upgrade your Kafka Streams application to CP 4.1.0 (Kafka 1.1.0-cp1).
Compatibility Matrix:
.. include:: compatibilityMatrix.rst
.. _streams_upgrade-guide_4.1.x-upgrade-apps:
Upgrading your Kafka Streams applications to CP 4.1.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To make use of Confluent Platform 4.1.0 (Kafka 1.1.x-cp1), you just need to update the Kafka Streams dependency of your application
to use the version number ``1.1.0-cp1``, and then recompile your application.
For example, in your ``pom.xml`` file:
.. sourcecode:: xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.1.0-cp1</version>
    </dependency>
Note that there are some :ref:`Streams API changes in CP 4.1.0 <streams_upgrade-guide_4.1.x-api-changes>`.
It is therefore recommended, though not required, to update your code when using the new version.
.. _streams_upgrade-guide_4.1.x-api-changes:
API changes
^^^^^^^^^^^
.. FIXME: This summary is a place holder
A few new Streams configurations and public interfaces have been added in the CP 4.1.x release.
.. _streams_upgrade-guide_4.1.x-admin-config:
Embedded Admin Client Configuration
"""""""""""""""""""""""""""""""""""
You can now customize the embedded admin client inside your Streams application, which is used to send administrative requests to Kafka brokers (for example, for internal topic creation).
This is done via the newly added ``KafkaClientSupplier#getAdminClient(Map)`` method; for example, you can provide your own ``AdminClient`` implementation to override the default one in your integration tests.
In addition, you can override the configs that are passed into ``KafkaClientSupplier#getAdminClient(Map)`` to configure the returned ``AdminClient``.
Such overridden configs can be specified via ``StreamsConfig`` by adding the admin configs with the prefix defined by ``StreamsConfig#adminClientPrefix(String)``.
Any configs that aren't admin client configs will be ignored.
For example:
.. sourcecode:: java

    Properties streamsProps = ...;
    // use retries=10 for the embedded admin client
    streamsProps.put(StreamsConfig.adminClientPrefix("retries"), 10);
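If you want to plug in your own ``AdminClient`` (for example, in integration tests), a minimal sketch of a custom ``KafkaClientSupplier`` could look as follows; ``CustomClientSupplier`` is a hypothetical name, and the producer/consumer methods simply create the default byte-array clients:

.. sourcecode:: java

    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaClientSupplier;

    // hypothetical supplier that only customizes the admin client
    public class CustomClientSupplier implements KafkaClientSupplier {

        @Override
        public AdminClient getAdminClient(final Map<String, Object> config) {
            // return your own (e.g. mocked) AdminClient here
            return AdminClient.create(config);
        }

        @Override
        public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }

You can then pass the supplier to the ``KafkaStreams`` constructor that accepts a ``KafkaClientSupplier``.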
New Functions in Window Store Interface
"""""""""""""""""""""""""""""""""""""""
Confluent Platform now supports methods in ``ReadOnlyWindowStore`` that allow you to query all windows in a given time range (``#fetchAll(long fromTime, long toTime)``),
as well as to iterate over all windows in the store via ``#all()``.
If you have customized window store implementations based on this interface, you must update your code to implement the newly
added methods. For more details, see KIP-205.
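As a rough sketch of how these methods can be used via Interactive Queries (the store name, the key and value types, and the ``streams``, ``fromTime``, and ``toTime`` variables are placeholders):

.. sourcecode:: java

    ReadOnlyWindowStore<String, Long> windowStore =
        streams.store("my-window-store", QueryableStoreTypes.<String, Long>windowStore());

    // all windows that fall into the given time range
    try (KeyValueIterator<Windowed<String>, Long> range = windowStore.fetchAll(fromTime, toTime)) {
        while (range.hasNext()) {
            KeyValue<Windowed<String>, Long> next = range.next();
            System.out.println(next.key + " -> " + next.value);
        }
    }

    // every window currently in the store
    try (KeyValueIterator<Windowed<String>, Long> all = windowStore.all()) {
        while (all.hasNext()) {
            System.out.println(all.next());
        }
    }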
Full upgrade workflow
^^^^^^^^^^^^^^^^^^^^^
A typical workflow for upgrading Kafka Streams applications from CP 4.0.x to CP 4.1.0 has the following steps:
#. **Upgrade your application:** See :ref:`upgrade instructions <streams_upgrade-guide_4.1.x-upgrade-apps>` above.
#. **Stop the old application:** Stop the old version of your application, i.e. stop all the application instances
that are still running the old version of the application.
#. **Optional, upgrade your Kafka cluster:** See :ref:`kafka upgrade instructions `.
`Note that if you want to use exactly-once processing semantics, upgrading your cluster to at least CP 3.3.x is mandatory.`
#. **Start the upgraded application:** Start the upgraded version of your application, with as many instances as needed.
By default, the upgraded application will resume processing its input data from the point when the old version was stopped (see previous step).
Upgrading older Kafka Streams applications to CP 4.1.0
------------------------------------------------------
It is also possible to upgrade CP 3.0.x, CP 3.1.x, CP 3.2.x, CP 3.3.x, and CP 4.0.x applications to CP 4.1.0.
Some of the API improvements introduced in CP 3.1, CP 3.2, CP 3.3, and CP 4.0 may require you to update the code of your Kafka Streams applications.
The following sections focus on these changes.
API changes (from CP 3.3 to CP 4.0)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified in the CP 4.0.x release.
All of these changes are backward compatible, so you are not required to update the code of your Kafka Streams applications immediately.
However, some methods were deprecated, so it is recommended to update your code eventually to allow for future upgrades.
In this section we focus on deprecated APIs.
Building and running a topology
"""""""""""""""""""""""""""""""
The two main classes to specify a topology, ``KStreamBuilder`` and ``TopologyBuilder``, were deprecated and replaced by
``StreamsBuilder`` and ``Topology``.
Note that both new classes are in the package ``org.apache.kafka.streams`` and that ``StreamsBuilder`` does not extend ``Topology``,
i.e., the class hierarchy is now different.
This change also affects ``KafkaStreams`` constructors that now only accept a ``Topology``.
If you use ``StreamsBuilder`` you can obtain the constructed topology via ``StreamsBuilder#build()``.
The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
However, some internal methods that were public in ``KStreamBuilder`` and ``TopologyBuilder``, but not part of the actual API,
are no longer included in the new classes.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_builder.java
:language: java
Describing topology and stream task metadata
""""""""""""""""""""""""""""""""""""""""""""
``KafkaStreams#toString()`` and ``KafkaStreams#toString(final String indent)``, which were previously used to retrieve
the user-specified processor topology information as well as runtime stream task metadata, are deprecated in 4.0.0.
Instead, a new ``KafkaStreams`` method, ``localThreadsMetadata()``, was added; it returns an ``org.apache.kafka.streams.processor.ThreadMetadata`` object
for each local stream thread that describes the thread's runtime state as well as the metadata of its currently assigned tasks. This information is helpful
for debugging and monitoring your Streams applications.
To retrieve the specified processor topology information, you can now call ``Topology#describe()``, which returns an ``org.apache.kafka.streams.TopologyDescription`` object
containing a detailed description of the topology (DSL users need to call ``StreamsBuilder#build()`` first to get the ``Topology`` object).
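For example, a minimal sketch (assuming a ``StreamsBuilder`` named ``builder`` and a running ``KafkaStreams`` instance named ``streams``):

.. sourcecode:: java

    // print a description of the specified processor topology
    Topology topology = builder.build();
    System.out.println(topology.describe());

    // print runtime metadata for each local stream thread and its assigned tasks
    for (ThreadMetadata threadMetadata : streams.localThreadsMetadata()) {
        System.out.println(threadMetadata);
    }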
Merging KStreams:
"""""""""""""""""
As mentioned above, ``KStreamBuilder`` was deprecated in favor of ``StreamsBuilder``.
Additionally, ``KStreamBuilder#merge(KStream...)`` was replaced by ``KStream#merge(KStream)``
and thus ``StreamsBuilder`` does not have a ``merge()`` method.
Note: instead of merging an arbitrary number of ``KStream`` instances into a single ``KStream``
as in the old API, the new ``#merge()`` method only accepts
a single ``KStream`` and thus merges two ``KStream`` instances into one.
If you want to merge more than two ``KStream`` instances, you can call ``KStream#merge()`` multiple times.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_merge.java
:language: java
Punctuation functions
"""""""""""""""""""""
The Processor API was extended to allow users to schedule ``punctuate`` functions either based on :ref:`event-time ` (i.e. ``PunctuationType.STREAM_TIME``) or *wall-clock-time* (i.e. ``PunctuationType.WALL_CLOCK_TIME``).
Before this, users could only schedule based on *event-time* and hence the ``punctuate`` function was data-driven only.
As a result, the original ``ProcessorContext#schedule`` method is deprecated in favor of a new overload. In addition, the ``punctuate`` method of ``Processor``
is also deprecated and replaced by the newly added ``Punctuator#punctuate`` interface method.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_schedule-punctuator.java
:language: java
Streams Configuration
"""""""""""""""""""""
You can now override the configs that are used to create internal repartition and changelog topics.
You provide these configs via the ``StreamsConfig`` by adding the topic configs with the prefix as defined by ``StreamsConfig#topicPrefix(String)``.
Any properties in the ``StreamsConfig`` with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you are already using ``StateStoreSupplier`` or ``Materialized`` to provide configs for changelogs, then they will take precedence over those supplied in the config.
For example:
.. sourcecode:: java

    Properties streamsProps = ...;
    // use cleanup.policy=delete for internal topics
    streamsProps.put(StreamsConfig.topicPrefix("cleanup.policy"), "delete");
New classes for optional DSL parameters
"""""""""""""""""""""""""""""""""""""""
Several new classes were introduced, e.g., ``Serialized``, ``Consumed``, and ``Produced``, to reduce the number of overloads in the DSL.
These classes typically provide a static ``with`` method to create an instance, e.g., ``Serialized.with(Serdes.Long(), Serdes.String())``.
Scala users should be aware that they need to surround ``with`` with backticks.
For example:
.. sourcecode:: scala
// When using Scala: enclose "with" with backticks
Serialized.`with`(Serdes.Long(), Serdes.String())
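For Java users, a sketch of how these classes replace the old overloads (topic names and serdes are placeholders):

.. sourcecode:: java

    StreamsBuilder builder = new StreamsBuilder();

    // read a topic with explicit serdes instead of picking an overloaded stream() variant
    KStream<Long, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.Long(), Serdes.String()));

    // group with explicit serdes
    KGroupedStream<Long, String> grouped =
        stream.groupByKey(Serialized.with(Serdes.Long(), Serdes.String()));

    // write the stream with explicit serdes
    stream.to("output-topic", Produced.with(Serdes.Long(), Serdes.String()));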
API changes (from CP 3.2 to CP 3.3)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified since the release of CP 3.2.x.
All of these changes are backward compatible, so you are not required to update the code of your Kafka Streams applications immediately.
However, some methods and configuration parameters were deprecated, so it is recommended to update your code eventually to allow for future upgrades.
In this section we focus on deprecated APIs.
Streams Configuration
"""""""""""""""""""""
The following configuration parameters were renamed and their old names were deprecated.
* ``key.serde`` renamed to ``default.key.serde``
* ``value.serde`` renamed to ``default.value.serde``
* ``timestamp.extractor`` renamed to ``default.timestamp.extractor``
Thus, ``StreamsConfig#KEY_SERDE_CONFIG``, ``StreamsConfig#VALUE_SERDE_CONFIG``, and ``StreamsConfig#TIMESTAMP_EXTRACTOR_CONFIG`` were deprecated, too.
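For example, a minimal sketch using the new configuration names (the serde and extractor choices are placeholders):

.. sourcecode:: java

    Properties streamsProps = new Properties();
    // new config names replacing key.serde, value.serde, and timestamp.extractor
    streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);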
Additionally, the following method changes apply:
* method ``keySerde()`` was deprecated and replaced by ``defaultKeySerde()``
* method ``valueSerde()`` was deprecated and replaced by ``defaultValueSerde()``
* new method ``defaultTimestampExtractor()`` was added
Local timestamp extractors
""""""""""""""""""""""""""
The Streams API was extended to allow users to specify a per stream/table timestamp extractor.
This simplifies the usage of different timestamp extractor logic for different streams/tables.
Before, users needed to apply an ``if-then-else`` pattern within the default timestamp extractor to apply different logic to different input topics.
The old behavior introduced unnecessary dependencies and thus limited code modularity and code reuse.
To enable the new feature, the methods ``KStreamBuilder#stream()``, ``KStreamBuilder#table()``, ``KStreamBuilder#globalTable()``,
``TopologyBuilder#addSource()``, and ``TopologyBuilder#addGlobalStore()`` have new overloads that allow you to specify a "local" timestamp
extractor that is applied solely to the corresponding input topics.
.. literalinclude:: upgrade-guide-3_3/upgrade-guide_timestamp-extractor.java
:language: java
KTable Changes
""""""""""""""
The following methods have been deprecated on the ``KTable`` interface:

* ``void foreach(final ForeachAction<? super K, ? super V> action)``
* ``void print()``
* ``void print(final String streamName)``
* ``void print(final Serde<K> keySerde, final Serde<V> valSerde)``
* ``void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)``
* ``void writeAsText(final String filePath)``
* ``void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)``
* ``void writeAsText(final String filePath, final String streamName)``
* ``void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)``
These methods have been deprecated in favor of using the :ref:`Interactive Queries API `.
If you want to query the current content of the state store backing the ``KTable``, use the following approach:
* Make a call to ``KafkaStreams.store(String storeName, QueryableStoreType<T> queryableStoreType)`` followed by a call to ``ReadOnlyKeyValueStore.all()`` to iterate over the key-value pairs of a ``KTable``.
If you want to view the changelog stream of the ``KTable`` then you could do something along the lines of the following:
* Call ``KTable.toStream()`` then call ``KStream#print()``.
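A rough sketch of both approaches (the store name, the key and value types, the ``table`` variable, and a running ``KafkaStreams`` instance named ``streams`` are placeholders):

.. sourcecode:: java

    // query the current content of the state store backing the KTable
    ReadOnlyKeyValueStore<String, Long> keyValueStore =
        streams.store("my-ktable-store", QueryableStoreTypes.<String, Long>keyValueStore());
    try (KeyValueIterator<String, Long> iterator = keyValueStore.all()) {
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }

    // view the changelog stream of the KTable
    table.toStream().print();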
API changes (from CP 3.1 to CP 3.2)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified since the release of CP 3.1.x.
Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications.
In this section we focus on only these breaking changes.
Handling Negative Timestamps and Timestamp Extractor Interface
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
Kafka Streams behavior with regard to invalid (i.e., negative) timestamps was improved.
By default you will still get an exception on an invalid timestamp.
However, you can reconfigure your application to react more gracefully to invalid timestamps which was not possible before.
Even if you do not use a custom timestamp extractor, you need to recompile your application code,
because the ``TimestampExtractor`` interface was changed in an incompatible way.
The internal behavior of Kafka Streams with regard to negative timestamps was changed.
Instead of raising an exception if the timestamp extractor returns a negative timestamp,
the corresponding record will be dropped silently and not be processed.
This makes it possible to process topics for which only a few records cannot provide a valid timestamp.
Furthermore, the ``TimestampExtractor`` interface was changed and now has one additional parameter.
This parameter provides a timestamp that can be used, for example, to return an estimated timestamp,
if no valid timestamp can be extracted from the current record.
The old default timestamp extractor ``ConsumerRecordTimestampExtractor`` was replaced with
``FailOnInvalidTimestamp``, and two new extractors which both extract a record's built-in timestamp
were added (``LogAndSkipOnInvalidTimestamp`` and ``UsePreviousTimeOnInvalidTimestamp``).
The new default extractor (``FailOnInvalidTimestamp``) raises an exception in case of a negative built-in
record timestamp such that Kafka Streams' default behavior is kept (i.e., fail-fast on negative timestamp).
The two newly added extractors allow you to handle negative timestamps more gracefully by implementing
a log-and-skip or a timestamp-estimation strategy, respectively.
.. literalinclude:: upgrade-guide-3_2/upgrade-guide_timestamp-extractor.java
:language: java
Metrics
"""""""
If you provide custom metrics by implementing the ``StreamsMetrics`` interface, you need to update your code, as the
interface has many new methods that allow you to register finer-grained metrics than before.
More details are available in KIP-114.
.. literalinclude:: upgrade-guide-3_2/upgrade-guide_metrics.java
:language: java
Scala
"""""
Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for
the code to compile. The
:cp-examples:`StreamToTableJoinScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala`
has an example where the types of return variables are explicitly declared.
API changes (from CP 3.0 to CP 3.1)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Stream grouping and aggregation
"""""""""""""""""""""""""""""""
Grouping (i.e., repartitioning) and aggregation of the ``KStream`` API was significantly changed to be aligned with the ``KTable`` API.
Instead of using a single method with many parameters, grouping and aggregation is now split into two steps.
First, a ``KStream`` is transformed into a ``KGroupedStream`` that is a repartitioned copy of the original ``KStream``.
Afterwards, an aggregation can be performed on the ``KGroupedStream``, resulting in a new ``KTable`` that contains the result of the aggregation.
Thus, the methods ``KStream#aggregateByKey(...)``, ``KStream#reduceByKey(...)``, and ``KStream#countByKey(...)`` were replaced by ``KStream#groupBy(...)`` and ``KStream#groupByKey(...)`` which return a ``KGroupedStream``.
While ``KStream#groupByKey(...)`` groups on the current key, ``KStream#groupBy(...)`` sets a new key and re-partitions the data to build groups on the new key.
The new class ``KGroupedStream`` provides the corresponding methods ``aggregate(...)``, ``reduce(...)``, and ``count(...)``.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_grouping.java
:language: java
Auto Repartitioning
"""""""""""""""""""
Previously, when performing ``KStream#join(...)``, ``KStream#outerJoin(...)``, or ``KStream#leftJoin(...)`` operations after a key-changing operation, i.e.,
``KStream#map(...)``, ``KStream#flatMap(...)``, or ``KStream#selectKey(...)``, the developer was required to call ``KStream#through(...)`` to repartition the mapped ``KStream``.
This is no longer required: repartitioning now happens automatically for all join operations.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_repartitioning.java
:language: java
TopologyBuilder
"""""""""""""""
Two public method signatures have been changed on ``TopologyBuilder``: ``TopologyBuilder#sourceTopics(String applicationId)`` and ``TopologyBuilder#topicGroups(String applicationId)``.
These methods no longer take ``applicationId`` as a parameter; instead, you should call ``TopologyBuilder#setApplicationId(String applicationId)`` before calling either of these methods.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_topology-builder.java
:language: java
.. _streams_upgrade-guide_dsl-store-names:
DSL: New parameters to specify state store names
""""""""""""""""""""""""""""""""""""""""""""""""
Apache Kafka ``0.10.1`` introduces :ref:`Interactive Queries `,
which allow you to directly query state stores of a Kafka Streams application.
This new feature required a few changes to the operators in the DSL.
Starting with Kafka ``0.10.1``, state stores must always be "named",
which includes both explicitly used state stores (e.g., defined by the user)
and internally used state stores (e.g., created behind the scenes by operations such as ``count()``).
This naming is a prerequisite to make state stores queryable.
As a result of this, the previous "operator name" is now the state store name.
This change affects ``KStreamBuilder#table(...)`` and *windowed* aggregates ``KGroupedStream#count(...)``, ``#reduce(...)``, and ``#aggregate(...)``.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_operator-names.java
:language: java
Windowing
"""""""""
The API for ``JoinWindows`` was improved.
It is no longer possible to define a window with a default size (of zero).
Furthermore, windows are no longer named.
Rather, any such naming is now done for state stores
(see section :ref:`DSL: New parameters to specify state store names <streams_upgrade-guide_dsl-store-names>` above).
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_windows.java
:language: java