Kafka Streams Upgrade Guide for Confluent Cloud
To upgrade from Kafka Streams versions earlier than 3.1.x, see Legacy Streams Upgrade Guide.
Kafka Streams and Confluent Cloud version support
The following table lists Kafka Streams and Confluent Cloud version support.
| Kafka Streams | Release Date | Business Level end of support | Premier Level end of support |
|---|---|---|---|
| 4.2.x | February 17, 2026 | February 17, 2028 | February 17, 2029 |
| 4.1.x | October 15, 2025 | October 15, 2027 | October 15, 2028 |
| 4.0.x | June 11, 2025 | June 11, 2027 | June 11, 2028 |
| 3.9.x | February 19, 2025 | February 19, 2027 | February 19, 2028 |
| 3.8.x | December 2, 2024 | December 2, 2026 | December 2, 2027 |
| 3.7.x | July 26, 2024 | July 26, 2026 | July 26, 2027 |
| 3.6.x | February 9, 2024 | February 9, 2026 | February 9, 2027 |
| 3.5.x | August 25, 2023 | August 25, 2025 | August 25, 2026 |
| 3.4.x | May 3, 2023 | May 3, 2025 | May 3, 2026 |
| 3.3.x | November 4, 2022 | November 4, 2024 | November 4, 2025 |
| 3.2.x | July 25, 2022 | July 25, 2024 | July 25, 2025 |
Upgrade from older versions
Upgrading from any earlier Kafka Streams version to 4.2.0 is supported.
If you’re upgrading from Kafka Streams 3.4 or earlier, you must do two rolling bounces. During the first rolling bounce, set the `upgrade.from="<older_version>"` configuration. Possible values are `2.4` through `3.4`. During the second bounce, remove the `upgrade.from` config.

This is required to handle two changes safely:

- A change in the foreign-key join serialization format.
- A change in the serialization format for an internal repartition topic. For more information, see KIP-904.

If you skip or delay the second rolling bounce, your deployment continues using the previous eager rebalancing protocol. You can switch safely to cooperative rebalancing at any time after the entire group is on Kafka Streams 3.4 or later by removing the configuration value and bouncing.

1. Prepare your application instances for a rolling bounce, and ensure that the `upgrade.from` configuration is set to the version you’re upgrading from.
2. Bounce each application instance.
3. Prepare your newly deployed Kafka Streams 4.2.0 application instances for a second round of rolling bounces. Be sure to remove the value for the `upgrade.from` configuration.
4. Bounce each application instance a second time to complete the upgrade.
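The two bounces differ only in whether `upgrade.from` is present. The following is a minimal sketch of the client-side configuration for each bounce; the application ID and bootstrap servers are placeholders, and `3.4` stands in for whatever version you’re upgrading from.

```java
import java.util.Properties;

public class TwoBounceUpgrade {

    // First rolling bounce: tell the 4.2.0 instances which version the
    // group is upgrading from, so they stay compatible with old members.
    public static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // placeholder
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("upgrade.from", "3.4");                   // version you upgrade from
        return props;
    }

    // Second rolling bounce: identical config, but with upgrade.from removed,
    // which switches the group to the new serialization formats.
    public static Properties secondBounceConfig() {
        Properties props = firstBounceConfig();
        props.remove("upgrade.from");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(firstBounceConfig());
        System.out.println(secondBounceConfig());
    }
}
```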
As an alternative, an offline upgrade is also possible. Upgrading from any version as old as Kafka Streams 0.10.x to 4.2.0 in offline mode requires the following steps:

1. Stop all old application instances, for example, Kafka Streams 0.10.x.
2. Update your code and swap the old code and JAR files with the new code and JAR files.
3. Restart all new 4.2.0 application instances.
Upgrade to Kafka Streams 4.2.0 from Kafka Streams 3.1.x or later
RocksDB compatibility matrix
The following table shows which RocksDB version ships with each Kafka Streams version. This matters when you use state stores backed by RocksDB, because mismatched versions can lead to runtime errors or data incompatibility.
| Kafka Streams version | RocksDB version | Notes |
|---|---|---|
| 4.2.x | 10.1.3 | |
| 4.1.x | 10.1.3 | |
| 4.0.x | 9.7.3 | Significant API changes, see Upgraded RocksDB dependency. |
| 3.5.x – 3.9.x | 7.9.2 | Requires newer GCC; not compatible with RHEL 7. |
| 3.0.x – 3.4.x | 7.1.2 | Downgrading from 3.0.x or newer to a 2.8.x or older version requires special attention because of a data format change. |
| 2.6.x – 2.8.x | 5.18.4 | |
| 2.3.x – 2.5.x | 5.18.3 | Can specify more RocksDB configurations, which helps to limit RocksDB off-heap memory usage. |
| 2.2.x | 5.15.10 | |
| 2.1.x | 5.14.2 | |
| 2.0.x | 5.7.3 | |
The exact RocksDB version is a transitive dependency of the Kafka Streams artifact, and Maven or Gradle manages it automatically, so updating Kafka Streams typically updates RocksDB.
If you customize or override the RocksDB version for advanced tuning or bugfixes, take care to match major versions and API compatibility, especially after the significant upgrade in Kafka Streams 4.0.0.
Upgrade your Kafka Streams applications to version 4.2.0
To use version 4.2.0, update the Kafka Streams version your application depends on to use the version number 4.2.0. You might need to make minor code changes, detailed in the following sections, and recompile your application.
For example, in your `pom.xml` file:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <!-- update version to 4.2.0 -->
    <version>4.2.0</version>
</dependency>
```
Streams API changes in Kafka Streams version 4.2.0
Streams Rebalance Protocol
The Streams Rebalance Protocol is a broker-driven rebalancing system designed specifically for Kafka Streams applications. Following the pattern of KIP-848, which moved rebalance coordination of plain consumers from clients to brokers, KIP-1071 extends this model to Kafka Streams workloads. Instead of clients computing new assignments during rebalance events that involve all members of the group, the broker computes assignments continuously. Instead of using a consumer group, the Kafka Streams application registers as a Kafka Streams group with the broker, which manages and exposes all metadata required for coordination of the Kafka Streams application instances.
Important
Use the Streams Rebalance Protocol as the default option.
Use client-based rebalancing only if you need one of the features that are not included in this release.
The Streams Rebalance Protocol is enabled on Dedicated, Enterprise, Standard, and Basic clusters.
Streams Rebalance Protocol features
- Core Streams Group Rebalance Protocol: The `group.protocol=streams` configuration enables the dedicated streams rebalance protocol. This separates streams groups from consumer groups and provides a streams-specific group membership lifecycle and metadata management on the broker.
- Sticky Task Assignor: Provides a basic task assignment strategy that minimizes task movement during rebalances.
- Interactive Query Support: IQ operations are compatible with the new streams protocol.
- New Admin RPC: The `StreamsGroupDescribe` RPC provides streams-specific metadata separate from consumer group information, with corresponding access through the `Admin` client.
- CLI Integration: You can list, describe, and delete streams groups with the `kafka-streams-groups` script.
Not included in the current release
This release covers a subset of the functionality detailed in KIP-1071. The following features are not included in this release:
- Static membership: You can’t set a client `group.instance.id`.
- Topology updates: Updates to existing topologies are not supported. If you change a topology significantly, for example, by adding new source topics or changing the number of sub-topologies, you must create a new streams group.
- High availability assignor: Only the sticky assignor is supported. Warmup tasks and rack-aware assignment are not supported in this release.
- Regular expressions: Pattern-based topic subscription is not supported.
- Online migration: Group migration between the classic and new streams protocols is available only as an offline upgrade, which means that all instances must first be stopped, and the group must become empty before it can be restarted with the new protocol enabled. Rolling-bounce online upgrade is not supported.
Note
KAFKA-20254 doesn’t affect Confluent Cloud. Confluent Cloud supports offline group migrations from “classic” to “streams” protocol, subject to the limitations described in this section.
- Custom Kafka clients: The `KafkaClientSupplier` interface is not fully supported. A custom `KafkaClientSupplier` can provide only the restore/global consumer, producer, and admin client. It’s not possible to provide the “main” consumer when “streams” groups are enabled.
Why use the Streams Rebalance Protocol?
KIP-1071 delivers broker-driven rebalancing specifically optimized for Kafka Streams applications. This reduces coordination overhead and improves failure detection, leading to more stable and responsive stream processing workloads.
Broker-driven coordination: Centralizes task assignment logic on brokers instead of the client. This provides consistent, authoritative task assignment decisions from a single coordination point and reduces the potential for split-brain scenarios.
Faster, more stable rebalances: Reduces rebalance duration and impact by removing the global synchronization point. This helps minimize application downtime during membership changes or failures.
Better observability: Provides dedicated metrics and admin interfaces that separate streams from consumer groups, leading to clearer troubleshooting with broker-side observability.
Enable the protocol
Kafka Streams clients must be running Kafka 4.2 or later to use this protocol.
In your Kafka Streams application configuration, set `group.protocol=streams`.
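For example, a minimal configuration sketch that opts an application into the new protocol; the application ID and bootstrap servers are placeholders.

```java
import java.util.Properties;

public class StreamsProtocolConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "wordcount");          // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        // Opt into the broker-driven Streams Rebalance Protocol.
        // Omitting this setting keeps the classic, client-driven rebalancing.
        props.put("group.protocol", "streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("group.protocol"));
    }
}
```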
Online “rolling-bounce” migration between the classic consumer group protocol and the Streams Rebalance Protocol is not supported in either direction, but you can upgrade an empty group from “classic” to “streams” or downgrade from “streams” to “classic”. Also, the `application.id` can’t be in use as a `group.id` by any other consumer group, whether “classic” or “consumer”, or by any share-group application.
You can delete a previous consumer group by using kafka-consumer-groups in ${CONFLUENT_HOME}/bin before starting the application with the new protocol, but this also deletes all offsets for that group.
To operate the new streams groups, you can explore the options of kafka-streams-groups to list, describe, and delete streams groups. In the new protocol, session.timeout.ms, heartbeat.interval.ms, initial.rebalance.delay.ms, and num.standby.replicas are group-level configurations. Setting session.timeout.ms, heartbeat.interval.ms, or num.standby.replicas on the client has no effect; Kafka Streams ignores these client-side settings. Similarly, the existing cluster configuration, initial.rebalance.delay.ms, doesn’t apply to streams groups.
You can use the kafka-configs tool to set these configurations, for example:
```shell
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type groups \
  --entity-name wordcount --add-config streams.num.standby.replicas=1
```
Streams API changes in Kafka Streams version 4.1.x
KIP-1071: Early Access of the Streams Rebalance Protocol
The Streams Rebalance Protocol is available as an Early Access feature.
This release covers a subset of the functionality detailed in KIP-1071.
KIP-1111: Enforce explicit naming for internal resources
KIP-1111 enables you to enforce explicit naming for all internal resources of the topology, including internal topics, like changelog and repartition topics, and their associated state stores. This ensures that you name every internal resource before deploying the Kafka Streams application, which is essential for upgrading your topology. You can enable this feature with StreamsConfig by using the StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG parameter. When set to true, the application doesn’t start if any internal resource has an auto-generated name.
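A minimal configuration sketch follows; the literal key string behind `StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG` is assumed here to be `ensure.explicit.internal.resource.naming`, and the application ID and bootstrap servers are placeholders.

```java
import java.util.Properties;

public class ExplicitNamingConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "orders-enrichment");  // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        // Assumed literal key for ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG.
        // When true, the application fails at startup if any repartition topic,
        // changelog topic, or state store still has an auto-generated name.
        props.put("ensure.explicit.internal.resource.naming", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```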
KIP-1020: Move window.size.ms and windowed.inner.class.serde
The window.size.ms and windowed.inner.class.serde configurations are now defined in TimeWindowed and SessionWindowed SerDes. For more information, see KIP-1020.
Streams API changes in Kafka Streams version 4.0.x
For a full list of API changes, see KAFKA-12822.
- Kafka Streams 4.0 removes all public API that was deprecated in Kafka Streams 3.6 or an earlier release, except for `JoinWindows.of()` and `JoinWindows#grace()`. For more information, see KAFKA-17531.
- Ensure that your Java clients are at Kafka Streams version 2.1 or later.
- Take care with Kafka clients that aren’t part of Apache Kafka®. For more information, see KIP-896.
- Kafka Streams 4.0 raises the minimum Java version for clients and Kafka Streams applications from Java 8 to Java 11. For more information, see KIP-750.
- Kafka Streams 4.0 no longer supports `eos-v1` (Exactly-Once Semantics version 1).
KAFKA-12822: Remove deprecated APIs of Kafka Streams in 4.0
Kafka Streams 4.0 removes all methods, classes, APIs, and configuration parameters that were deprecated up to and including Kafka Streams 3.6.
The following list shows some of the important deprecated APIs. For the full list, see KAFKA-12822.
KIP-1056: Remove default. prefix for exception handler StreamsConfig
KIP-1056 deprecates the default.deserialization.exception.handler and default.production.exception.handler configurations because they don’t have any overwrites. Instead, use the new deserialization.exception.handler and production.exception.handler configurations. For more information, see KIP-1056.
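The rename can be sketched as a before/after configuration map; the handler class names are placeholders for your own implementations.

```java
import java.util.Properties;

public class ExceptionHandlerMigration {

    // Old configuration, deprecated by KIP-1056.
    public static Properties before() {
        Properties props = new Properties();
        props.put("default.deserialization.exception.handler", "com.example.MyDeserializationHandler"); // placeholder class
        props.put("default.production.exception.handler", "com.example.MyProductionHandler");           // placeholder class
        return props;
    }

    // New configuration: same handler classes, keys without the "default." prefix.
    public static Properties after() {
        Properties props = new Properties();
        props.put("deserialization.exception.handler", "com.example.MyDeserializationHandler"); // placeholder class
        props.put("production.exception.handler", "com.example.MyProductionHandler");           // placeholder class
        return props;
    }

    public static void main(String[] args) {
        System.out.println(after());
    }
}
```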
KIP-1070: Deprecate MockProcessorContext
The previous release introduced a new version of the Processor API and incrementally replaced and deprecated the old Processor API.
KIP-1070 deprecates the following APIs:
- `MockProcessorContext`
- `Transformer`
- `TransformerSupplier`
- `ValueTransformer`
- `ValueTransformerSupplier`
For more information, see KIP-1070.
KIP-1077: Deprecate ForeachProcessor and move to internal package
KIP-1077 deprecates the ForeachProcessor class. This change improves the organization and clarity of the Kafka Streams API by keeping internal classes out of public packages. For more information, see KIP-1077.
KIP-1078: Remove leaking getter methods in Joined helper class
KIP-1078 deprecates the leaking getter methods in the Joined helper class. These methods are scheduled for future removal without a replacement, because they don’t add any value to Kafka Streams users. For more information, see KIP-1078.
KIP-1085: Fix leaking *_DOC variables in StreamsConfig
To improve encapsulation and organization of configuration documentation within Kafka Streams, KIP-1085 deprecates certain public doc description variables that are used only within the StreamsConfig or TopologyConfig classes. KIP-1085 also deprecates the unused DUMMY_THREAD_INDEX variable. For more information, see KIP-1085.
KIP-1087: Removing intermediateTopicsOption from StreamsResetter
Because Kafka Streams 4.0 removes the already-deprecated #through method, the intermediateTopicsOption of the StreamsResetter tool in Kafka is no longer needed; KIP-1087 deprecates it. For more information, see KIP-1087.
KIP-1091: Improved Kafka Streams operator metrics
Because the broker can’t collect string metrics (KIP-714), this version introduces numeric counterparts that enable proper broker-side metric collection for Kafka Streams applications. KIP-1091 exposes these metrics at the INFO recording level and adds a thread-level metric with a String value for users leveraging Java Management Extensions (JMX). For more information, see KIP-1091.
KIP-1104: Allow foreign key extraction from both key and value in KTable joins
KIP-1104 introduces a new method in the Java and Scala APIs that accepts a BiFunction for foreign key extraction, enabling foreign key extraction from both the key and value in KTable joins. This change reduces storage overhead and improves API usability.
Previously, foreign key joins in KTables allowed only extraction from the value, which led to data duplication and potential inconsistencies.
KIP-1104 deprecates the existing methods but doesn’t remove them, ensuring backward compatibility.
For more information, see KIP-1104.
KIP-1106: Add duration based offset reset option for consumer clients
KIP-1106 deprecates the Topology.AutoOffsetReset enum and replaces it with a new class, org.apache.kafka.streams.AutoOffsetReset, that captures the reset strategies. KIP-1106 also adds new methods to the org.apache.kafka.streams.Topology and org.apache.kafka.streams.kstream.Consumed classes to support the new reset strategy. These changes provide more flexibility and efficiency for managing offsets, especially in scenarios that involve long-term storage and infinite retention. For more information, see KIP-1106.
KIP-1112: Allow custom processor wrapping
You can now configure your topology with a ProcessorWrapper, which enables accessing and optionally wrapping and replacing any processor in the topology by injecting an alternative ProcessorSupplier in its place. You can use this to peek records and access the processor context even for DSL operators, for example, to implement a logging or tracing framework, or to aid in testing or debugging scenarios.
You must implement the ProcessorWrapper interface and pass the class or class name into the configurations by using the new StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG configuration.
Kafka Streams applies this configuration during the topology building phase, so the configuration doesn’t take effect unless you pass it when you create the StreamsBuilder (DSL) or Topology (PAPI) objects. You must use the StreamsBuilder or Topology constructor overload that accepts a TopologyConfig parameter so that Kafka Streams picks up the StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG.
For more information, see KIP-1112.
Upgraded RocksDB dependency
This version upgrades the RocksDB dependency to version 9.7.3, from 7.9.2. This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
- RocksDB 9.7.3 removes the `org.rocksdb.AccessHint` class and its associated methods.
- RocksDB 9.7.3 removes several methods related to compressed block cache configuration in the `BlockBasedTableConfig` class, including `blockCacheCompressedNumShardBits`, `blockCacheCompressedSize`, and their corresponding setters. The `cache` option now consolidates this functionality; configure your compressed block cache by using the `setCache` method instead.
- RocksDB 9.7.3 removes the `NO_FILE_CLOSES` field from the `org.rocksdb.TickerType` enum. As a result, the `number-open-files` metric doesn’t work as expected. The `number-open-files` metric returns the constant `-1` until it is officially removed.
- The `org.rocksdb.Options.setLogger()` method now accepts a `LoggerInterface` parameter instead of the previous `Logger`.
RocksDB 9.7.3 also modifies some data types in its Java API. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are largely transparent to most Kafka Streams users. However, if you use advanced RocksDB customizations in your Streams applications, particularly through the rocksdb.config.setter, consult the RocksDB 9.7.3 changelog to ensure a smooth transition and adapt your configurations as needed. Specifically, if you’re leveraging the removed AccessHint class, the removed methods from the BlockBasedTableConfig class, the NO_FILE_CLOSES field from TickerType, or you’re relying on the previous signature of setLogger(), you must update your implementations.
Kafka Streams depends on a RocksDB version that requires macOS 10.14 or later.
KIP-714: Client metrics and observability
In this release, the `ClientInstanceIds` instance stores the global consumer `Uuid` for the KIP-714 ID under a key composed of the global stream-thread name with `-global-consumer` appended. Previously, the key was only the global stream-thread name.
KIP-1065: Add “retry” return-option to ProductionExceptionHandler
Previously, Kafka Streams didn’t invoke the ProductionExceptionHandler on a (retryable) TimeoutException. In Kafka Streams 4.0, Kafka Streams calls the handler, and the default handler returns RETRY to preserve existing behavior. A custom handler can break the infinite retry loop by returning either CONTINUE or FAIL. For more information, see KIP-1065.
KIP-1076: Metrics for client applications KIP-714 extension
You can collect Kafka Streams metrics broker-side by using the KIP-714 broker plugin. The plugin collects metrics for the internally used clients of a Kafka Streams application and for the Kafka Streams runtime itself. For more information, see KIP-1076.
Streams API changes in Kafka Streams 3.9.0
KIP-1033: Improve exception handling
You can provide a processing exception handler to manage exceptions during the processing of a record, rather than throwing the exception all the way out of your Kafka Streams application. Provide the configs by using the StreamsConfig as StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG. The specified handler must implement the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface. For more information, see KIP-1033
KIP-1049: Customize logging interval
Kafka Streams now enables you to customize the logging interval of the stream-thread runtime summary by using the newly added log.summary.interval.ms configuration. By default, Kafka Streams logs the summary every 2 minutes. For more information, see KIP-1049.
Streams API changes in Kafka Streams 3.8.0
KIP-924: Customizable task assignment for Streams
Kafka Streams now supports customizable task assignment strategies through the task.assignor.class configuration. You can set the configuration to the fully qualified class name of a custom task assignor implementation that extends the new org.apache.kafka.streams.processor.assignment.TaskAssignor interface.
The new configuration also enables bringing back the behavior of the old task assignor, StickyTaskAssignor, that was used before the introduction of the HighAvailabilityTaskAssignor. If no custom task assignor is configured, the default task assignor, HighAvailabilityTaskAssignor, is used.
If you were using the `internal.task.assignor.class` configuration, switch to the new `task.assignor.class` configuration instead. The internal configuration is deprecated and is scheduled for removal in a future release. If you were previously plugging in the `StickyTaskAssignor` through the legacy `internal.task.assignor.class` configuration, ensure that you import the new `org.apache.kafka.streams.processor.assignment.StickyTaskAssignor` when you switch to the new `task.assignor.class` config; this class is a version of the `StickyTaskAssignor` that implements the new public `TaskAssignor` interface. For more information, see the public interface section of KIP-924.
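A sketch of opting back into the sticky assignor through the public config; the application ID and bootstrap servers are placeholders, and the assignor class path follows the package named in the text above.

```java
import java.util.Properties;

public class TaskAssignorConfig {

    public static Properties withStickyAssignor() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        // Public replacement for the deprecated internal.task.assignor.class.
        // Omit this setting to keep the default HighAvailabilityTaskAssignor.
        props.put("task.assignor.class",
                "org.apache.kafka.streams.processor.assignment.StickyTaskAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withStickyAssignor().getProperty("task.assignor.class"));
    }
}
```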
KIP-989: Improved StateStore iterator metrics for detecting leaks
To improve detection of leaked state store iterators, KIP-989 adds new store-level metrics to track the number and age of open iterators. The new metrics are `num-open-iterators`, `iterator-duration-avg`, `iterator-duration-max`, and `oldest-iterator-open-since-ms`. These metrics are available for all state stores, including RocksDB, in-memory, and custom state stores.
Streams API changes in Kafka Streams 3.7.0
KIP-925: Rack-aware task assignment in Kafka Streams (part 2)
Part one of KIP-925 added the min_traffic assignment strategy for Kafka Streams. Part Two finishes the KIP by introducing the second rack-aware assignment strategy, balanced_subtopology.
KIP-954: Expand default DSL store configuration to custom types
KIP-954 builds on KIP-591 and enables you to provide a default state store provider for your custom stores. KIP-954 provides a new interface, along with default support for RocksDB and in-memory state stores.
KIP-962: Relax non-null key requirement in Kafka Streams
Previously, Kafka Streams treated records with null-keys as invalid input for joins and dropped them. KIP-962 relaxes this behavior for various left-joins so that the join can process null-key records successfully.
The behavior of the following operators changed.
- Left join KStream-KStream: no longer drops left records with a null key, and calls the `ValueJoiner` with `null` for the right value.
- Outer join KStream-KStream: no longer drops left/right records with a null key, and calls the `ValueJoiner` with `null` for the right/left value.
- Left-foreign-key join KTable-KTable: no longer drops left records with a null foreign key returned by the `ForeignKeyExtractor`, and calls the `ValueJoiner` with `null` for the right value.
- Left join KStream-KTable: no longer drops left records with a null key, and calls the `ValueJoiner` with `null` for the right value.
- Left join KStream-GlobalTable: no longer drops records when the `KeyValueMapper` returns `null`, and calls the `ValueJoiner` with `null` for the right value.
Streams DSL users who want to keep the previous behavior can prepend a `.filter()` operator to the previously listed operators and filter accordingly. The following snippets illustrate how to keep the pre-3.7.0 behavior.
```java
// left join KStream-KStream
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);

// outer join KStream-KStream
// Assign the filtered right stream so the join actually uses it.
KStream<String, String> filteredRightStream =
    rightStream.filter((key, value) -> key != null);
leftStream
    .filter((key, value) -> key != null)
    .outerJoin(filteredRightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);

// left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...
leftTable
    .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
    .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));

// left join KStream-KTable
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));

// left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (key, value) -> ...;
leftStream
    .filter((key, value) -> keyValueMapper.apply(key, value) != null)
    .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
```
KIP-960 / KIP-968: IQ support for versioned state stores
The Kafka 3.5 release added versioned state stores (KIP-889), but you couldn’t query the new stores. KIP-960 and KIP-968 close this gap by adding two new IQv2 query types, VersionedKeyQuery and MultiVersionedKeyQuery. Both queries enable you to look up a single key and ask for the most recent value, a historic value, or a range of historic values for the provided key.
KIP-985: Add reverseRange and reverseAll query over kv-store in IQv2
IQv2 supports RangeQuery and enables you to query for a range of keys and specify unbounded, bounded, or half-open key-ranges. It returns data in ascending (byte[]-lexicographical) order (per partition). KIP-985 extends this functionality by adding the .withDescendingKeys() method to enable receiving data in descending order, so you can request the result to be ordered (per partition) in either ascending or descending order, or to leave the order unspecified.
KIP-988: Streams standby update listener
KIP-988 adds a new `StandbyUpdateListener` interface for observing standby tasks: it notifies you when a standby task’s state store is registered, when the store loads a batch of records, and when the store stops updating.
KIP-992: Add TimestampedKeyQuery and TimestampedRangeQuery to IQv2
KIP-992 adds new timestamped-key and timestamped-range interactive queries for timestamped key-value state stores. This change improves the type safety of the IQv2 API. The existing RangeQuery now always returns only the value if issued against a timestamped key-value store.
default.dsl.store configuration deprecated
Instead, use the `dsl.store.suppliers.class` configuration. If you currently specify `default.dsl.store=ROCKS_DB` or `default.dsl.store=IN_MEMORY`, replace these configurations with `dsl.store.suppliers.class=BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class` and `dsl.store.suppliers.class=BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class`, respectively.
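The migration can be sketched as a before/after configuration map; when set through a properties map rather than Java code, the supplier is given as a class-name string, and the exact package of `BuiltInDslStoreSuppliers` is assumed here.

```java
import java.util.Properties;

public class DslStoreSuppliersMigration {

    // Deprecated style.
    public static Properties before() {
        Properties props = new Properties();
        props.put("default.dsl.store", "ROCKS_DB");
        return props;
    }

    // Replacement style: point dsl.store.suppliers.class at the matching
    // built-in supplier class (package path is an assumption).
    public static Properties after() {
        Properties props = new Properties();
        props.put("dsl.store.suppliers.class",
                "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(after());
    }
}
```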
Streams API changes in Kafka Streams 3.6.0
KIP-923: Add a grace period to stream-table join
KIP-923 adds a grace period to stream-table joins to improve table-side out-of-order data handling. The `Joined` object has a new method named `withGracePeriod` that causes the table-side lookup to happen only after the grace period has passed.
KIP-925: Rack-aware task assignment in Kafka Streams
KIP-925 introduces rack-aware task assignment. You can enable rack-aware task assignment for StickyTaskAssignor or HighAvailabilityTaskAssignor to compute task assignments that can minimize cross-rack traffic under certain conditions. For more information, including how to enable and configure this feature, see rack.aware.assignment.strategy.
KIP-941: Range queries to accept null lower and upper bounds
Previously, `RangeQuery` did not support `null` to specify “no upper/lower bound”. KIP-941 allows users to pass `null` into `withRange(...)` for lower/upper bounds to specify a full or half-open range:

- `withRange(null, null)` == `withNoBounds()`
- `withRange(lower, null)` == `withLowerBound(lower)`
- `withRange(null, upper)` == `withUpperBound(upper)`
Streams API changes in Kafka Streams 3.5.0
Downgrading from Kafka Streams 3.5.x or later to Kafka Streams 3.4.x or earlier requires special attention: starting in the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics. This means that older versions of Kafka Streams don’t recognize the bytes written by newer versions, so it’s harder to downgrade Kafka Streams with version 3.5.0 or later to older versions in-flight. For more information, see KIP-904.
For a downgrade, first set the `upgrade.from` config to the version you’re downgrading to. This disables writing the new serialization format in your application. It’s important to wait in this state long enough to ensure that the application has finished processing any “in-flight” messages written into the repartition topics in the new serialization format. Afterward, you can downgrade your application to a pre-3.5.x version.
KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions
KIP-399 adds a method, handleSerializationException(), to the ProductionExceptionHandler interface to handle any serialization errors encountered while producing records.
KIP-884: Add configuration to configure KafkaClientSupplier
KIP-884 adds a new configuration, `default.client.supplier`, that enables using a custom `KafkaClientSupplier` without any code changes.
KIP-889: Versioned state stores
KIP-889 introduces versioned state stores to improve the accuracy of joins when out-of-order records are processed. For more information, see dsl.timestamp.based.semantics.
In addition to KIP-889, KIP-914 updates DSL processing semantics if a user opts-in to use the new versioned key-value stores. Using the new versioned key-value stores, DSL processing can better handle out-of-order data. For example, a late record might be dropped and stream-table joins can do a timestamp-based lookup into the table. Table aggregations and primary/foreign-key table-table joins are also improved. Versioned key-value stores are not supported for global-KTable, and they don’t work with suppress().
KIP-904: Guarantee subtractor is called before adder if key has not changed
KIP-904 improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows, but prior to KIP-904, if both refinements happened to the same result row, Kafka Streams applied two independent updates to the same row, resulting in spurious intermediate results. KIP-904 detects this case and applies only a single update, which avoids spurious intermediate results.
KIP-907: Add Boolean serde to public interface
Kafka Streams includes built-in Serdes for most primitive types. KIP-907 adds a new one for booleans.
Streams API changes in Kafka Streams 3.4.0
KIP-770: Replace cache.max.bytes.buffering with statestore.cache.max.bytes
KIP-770 deprecates the existing cache.max.bytes.buffering configuration and introduces a new statestore.cache.max.bytes configuration to replace it. The semantics and default value of the cache size configuration are unchanged. This KIP also adds a new cache.size metric at the DEBUG level so users can monitor the actual size of the Kafka Streams cache.
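The rename can be sketched as a before/after configuration map; the 10 MiB value shown matches the unchanged default cache size and is only illustrative.

```java
import java.util.Properties;

public class CacheConfigMigration {

    // Deprecated key.
    public static Properties before() {
        Properties props = new Properties();
        props.put("cache.max.bytes.buffering", "10485760"); // 10 MiB, illustrative
        return props;
    }

    // Replacement key; identical semantics and default value.
    public static Properties after() {
        Properties props = new Properties();
        props.put("statestore.cache.max.bytes", "10485760"); // 10 MiB, illustrative
        return props;
    }

    public static void main(String[] args) {
        System.out.println(after());
    }
}
```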
KIP-837: Allow multicasting a result record
KIP-837 enables you to multicast result records to every partition of downstream sink topics and adds functionality for choosing to drop result records without sending.
KIP-865: Support “--bootstrap-server” in kafka-streams-application-reset
KIP-865 updates the Kafka Streams application reset tool’s server parameter name to conform to the other Kafka tooling by deprecating the --bootstrap-servers parameter and introducing a new --bootstrap-server parameter in its place.
Streams API changes in Kafka Streams 3.3.0
Source/sink node metrics for consumed/produced throughput in Kafka Streams
Starting with Kafka Streams 3.3.0, source and sink node metrics for consumed and produced throughput are available in Kafka Streams.
Previously, with the metrics available in the plain consumer you could derive the consumed throughput of your applications at the subtopology level, but the same was not true for the produced throughput.
KIP-846 fills this gap by introducing two new metrics for the throughput at sink nodes, giving you a way to compute the production rate of each subtopology. Although you can derive the consumed throughput from existing client-level metrics, KIP-846 also adds two new metrics for the throughput at source nodes, to provide the same fine-grained metrics scope as the new sink-node metrics and to simplify the user experience.
Pause and resume Kafka Streams topologies
KIP-834 adds the ability to pause and resume topologies. You can use this feature to reduce resources used or modify data pipelines. Paused topologies skip processing, punctuation, and standby tasks. For distributed Kafka Streams applications, each instance must be paused and resumed separately.
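A minimal sketch of the pause/resume calls, assuming Kafka Streams 3.3+ (topic names and the bootstrap address are placeholders; the instance is never started here, since pausing is also allowed before start()):

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PauseResumeExample {
    public static boolean demo() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pause-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.pause();   // skips processing, punctuation, and standby tasks
            boolean paused = streams.isPaused();
            streams.resume();  // processing would pick up again
            return paused;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that for a distributed application, pause() only affects the local instance, so each instance must be paused and resumed separately.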
Consolidate KStream transform() and process() methods
KIP-820 generalizes the Kafka Streams API to consolidate Transformers, which could forward results, and Processors, which could not. The change builds on the new type-safe Processor API, which simplifies Kafka Streams and makes it easier to use and learn.
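A sketch of the consolidated API, assuming Kafka Streams 3.3+ (topic names are hypothetical): since KIP-820, process() accepts a type-safe api.Processor that can forward records, and it returns a KStream you can continue to build on.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class ProcessExample {

    // Forwards the length of each value downstream, keeping the key.
    static class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withValue(record.value().length()));
        }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, String, Integer> supplier = LengthProcessor::new;
        builder.<String, String>stream("input-topic")
            .process(supplier)   // returns a KStream<String, Integer> since KIP-820
            .to("output-topic");
        return builder.build();
    }
}
```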
New KafkaStreams.close() API
KIP-812 introduces another form of the KafkaStreams.close() API that forces the member to leave the consumer group, enabling the remaining members to rebalance immediately instead of waiting for the member’s session to time out.
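A sketch of the new call, assuming Kafka Streams 3.3+ (the 30-second timeout is an arbitrary choice):

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class CloseOptionsExample {
    public static KafkaStreams.CloseOptions options() {
        // leaveGroup(true) sends a leave-group request on close, so the
        // group rebalances right away rather than waiting for the
        // session timeout to expire.
        return new KafkaStreams.CloseOptions()
            .leaveGroup(true)
            .timeout(Duration.ofSeconds(30));
    }
    // usage against a running instance:
    //   streams.close(options());
}
```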
Streams API changes in Kafka Streams 3.2.0
Rack awareness for Kafka Streams
Starting with Kafka Streams 3.2.0, Kafka Streams can distribute its standby replicas over distinct “racks” with KIP-708. To form a “rack”, Kafka Streams uses tags in the application configuration. For example, Kafka Streams clients might be tagged with the cluster or the cloud region they run in. Users can specify the tags for the rack-aware distribution of the standby replicas by setting the rack.aware.assignment.tags configuration. During task assignment, Kafka Streams tries to distribute the standby replicas across different tag dimensions. Rack-aware standby assignment improves fault tolerance in case of the failure of an entire “rack”. For example, you can use this feature to ensure that Kafka Streams distributes replicas across different availability zones in a cloud hosting provider.
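A configuration sketch, assuming Kafka Streams 3.2+ (the tag names and values, application ID, and bootstrap address are all hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RackAwareConfigExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Tag this client with the zone and cluster it runs in
        // (clientTagPrefix builds the "client.tag.<name>" key):
        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
        props.put(StreamsConfig.clientTagPrefix("cluster"), "k8s-cluster-1");
        // Distribute standby replicas across distinct zones and clusters:
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone,cluster");
        return props;
    }
}
```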
Add record metadata to state store context
KIP-791 adds the recordMetadata() method to the StateStoreContext and provides access to the topic, partition, and offset of the current record. Exposing the current context this way enables state stores to track their current offset in each input partition and to implement the consistency mechanisms that KIP-796 introduces.
Interactive Query v2 preview
Kafka Streams 3.2.0 introduces Interactive Queries v2 (IQv2). IQv2 is a preview feature, and the IQv2 interfaces are marked as @Evolving, which means they might break compatibility in minor releases without a deprecation period if preview users find significant flaws in the current API.
KIP-796 specifies an improved interface for Interactive Queries in Kafka Streams (IQv2). The new interface makes querying the state store simpler and faster and reduces the maintenance cost when modifying existing state stores and adding new state stores. KIP-796 describes the generic interface for querying state stores with Interactive Queries. You can add specific query types to Interactive Query v2 by implementing the Query interface. KIP-796 also defines the KeyQuery class so that users can evaluate a key/value lookup by using IQv2.
KIP-805 adds the RangeQuery class to IQv2. The RangeQuery class is an implementation of the Query interface that enables querying state stores over a range specified by upper or lower key bounds, or by scanning all records of a state store when no bounds are provided.
KIP-806 adds two implementations of the Query interface:
The WindowKeyQuery class enables scanning over windows with a given key within a specified time range.
The WindowRangeQuery class enables scanning over windows within a given time range independently of the windows’ keys.
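A minimal sketch of building an IQv2 request, assuming Kafka Streams 3.2+ (the store name "counts-store" is hypothetical; executing the request requires a running KafkaStreams instance):

```java
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.StateQueryRequest;

public class Iqv2Example {
    public static StateQueryRequest<?> request(String key) {
        // Build an IQv2 request: a point lookup for `key` in the
        // state store named "counts-store".
        return StateQueryRequest
            .inStore("counts-store")
            .withQuery(KeyQuery.withKey(key));
    }
    // usage against a running instance:
    //   StateQueryResult<?> result = kafkaStreams.query(request("some-key"));
}
```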
Streams API changes in Kafka Streams 3.1.0
Java 17 support
Starting with version 3.1.0, Kafka Streams supports Java 17.
Improved left/outer stream-stream join semantics
KIP-633 improves the semantics of left/outer stream-stream joins.
Previously, a left/outer stream-stream join might have emitted so-called spurious left/outer results because of an eager-emit strategy. KIP-633 changes the implementation to emit left/outer join result records only after the join window closes. KIP-633 deprecates the old API for specifying the join window, JoinWindows.of(), in favor of the JoinWindows.ofTimeDifferenceAndGrace() and JoinWindows.ofTimeDifferenceWithNoGrace() methods. The new semantics apply only if you use the new join window builders.
KIP-633 also makes setting a grace period an explicit choice for windowed aggregations: TimeWindows (hopping/tumbling), SessionWindows, and SlidingWindows. KIP-633 deprecates the corresponding .of(...) builder methods in favor of the new .ofTimeDifferenceAndGrace() and .ofTimeDifferenceWithNoGrace() methods.
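A sketch of the new builder, assuming Kafka Streams 3.1+ (the 5-minute window and 1-minute grace period are arbitrary example values):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowExample {
    public static JoinWindows windows() {
        // Records within 5 minutes of each other join; records arriving
        // up to 1 minute out of order are still processed, and left/outer
        // results are emitted only after window-end plus grace has passed.
        return JoinWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5), Duration.ofMinutes(1));
    }
    // use JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
    // to opt out of a grace period explicitly.
}
```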
New metrics to track blocking times
KIP-761 adds new metrics that enable tracking blocking times on the underlying consumer and producer clients. For more information, see Monitor Kafka Streams Applications.
Interactive Query improvements
KIP-763 and KIP-766 improve Interactive Queries. Range queries now accept null as a lower/upper key-range bound to indicate an open-ended lower/upper bound.
Custom partitioners for foreign-key table-table joins
Foreign-key table-table joins now support custom partitioners through KIP-775. Previously, if an input table was partitioned by a non-default partitioner, joining records might fail. With KIP-775, you can now pass a custom StreamPartitioner into the join by using the newly added TableJoined object.
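A sketch of wiring in custom partitioners, assuming Kafka Streams 3.1+ (the hash-based partitioner here is hypothetical; it should mirror however the input tables were actually partitioned):

```java
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class TableJoinedExample {
    // Hypothetical partitioner mirroring the input tables' partitioning:
    static final StreamPartitioner<String, Void> CUSTOM =
        (topic, key, value, numPartitions) ->
            Math.floorMod(key.hashCode(), numPartitions);

    public static TableJoined<String, String> tableJoined() {
        // First argument covers the left table's key space, the second
        // the other (foreign-key) table's key space.
        return TableJoined.with(CUSTOM, CUSTOM);
    }
    // usage in a foreign-key join:
    //   left.join(right, foreignKeyExtractor, joiner, tableJoined(), materialized);
}
```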
Upgrade guide for versions earlier than Kafka Streams 3.1.x
For upgrade guidance on Kafka Streams versions earlier than 3.1.0, see Legacy Streams Upgrade Guide.
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.