Kafka Streams Upgrade Guide for Confluent Platform
To upgrade from Confluent Platform versions earlier than 7.1.x, see Legacy Streams Upgrade Guide.
Upgrade from older versions
Upgrading from any earlier Kafka Streams version to Confluent Platform 8.1.0 is supported.
If you’re upgrading from Confluent Platform 7.4.x (Kafka Streams 3.4) or earlier, you must do two rolling bounces.
During the first rolling bounce, set the upgrade.from="<older_version>" configuration. Possible values are 0.10.0 through 3.4.
During the second bounce, remove the upgrade.from config.
This is required to handle three changes safely:
The introduction of the cooperative rebalancing protocol of the embedded consumer. For more information, see KIP-429.
A change in foreign-key join serialization format.
A change in the serialization format for an internal repartition topic. For more information, see KIP-904.
If you skip or delay the second rolling bounce, your deployment keeps using the previous eager rebalancing protocol. You can safely switch to cooperative rebalancing at any time after the entire group is on Confluent Platform 8.1.0 (Kafka Streams 4.1.0) or later. To switch, remove the configuration value and bounce the instances.
Prepare your application instances for a rolling bounce, and ensure that the upgrade.from config is set to the version it is being upgraded from.
Bounce each instance of your application once.
Prepare your newly deployed Confluent Platform 8.1.0 (Kafka Streams 4.1.0) application instances for a second round of rolling bounces. Be sure to remove the value for the upgrade.from config.
Bounce each instance of your application once more to complete the upgrade.
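The two bounces differ only in whether upgrade.from is present. The sketch below makes that concrete with plain java.util.Properties:

- The keys "upgrade.from", "application.id", and "bootstrap.servers" are the real Kafka Streams config names.
- The application ID, the bootstrap address, and the helper class are hypothetical.
- In real code you would likely use the StreamsConfig constants, such as StreamsConfig.UPGRADE_FROM_CONFIG.

```java
import java.util.Properties;

// Sketch of the two rolling bounces for an upgrade from Kafka Streams 3.4 or
// earlier. Config keys are real Kafka Streams names; the application ID,
// bootstrap servers, and helper names are hypothetical.
final class UpgradeBounceConfig {

    // Settings shared by both bounces (hypothetical values).
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    // First bounce: deploy the 4.1.0 JARs with upgrade.from set to the old
    // version, for example "3.4" (valid values are 0.10.0 through 3.4).
    static Properties firstBounce(String olderVersion) {
        Properties props = baseConfig();
        props.put("upgrade.from", olderVersion);
        return props;
    }

    // Second bounce: same settings, with upgrade.from removed.
    // Works on a copy so the first-bounce config is left untouched.
    static Properties secondBounce(Properties firstBounceConfig) {
        Properties props = new Properties();
        props.putAll(firstBounceConfig);
        props.remove("upgrade.from");
        return props;
    }
}
```

Keeping the second-bounce config as a copy of the first, minus one key, makes it hard to change anything else between the two bounces by accident.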
Alternatively, you can upgrade offline. Upgrading in offline mode from any version as old as Confluent Platform 3.0.x (Kafka Streams 0.10.x) to Confluent Platform 8.1.0 (Kafka Streams 4.1.0) requires the following steps:
Stop all old application instances (for example, Confluent Platform 3.0.x instances).
Update your code and swap old code and JAR files with new code and new JAR files.
Restart all new Confluent Platform 8.1.0 (Kafka Streams 4.1.0) application instances.
Upgrade to Confluent Platform 8.1.0 from Confluent Platform 7.1.x or later
Compatibility
Kafka Streams applications built with Confluent Platform 8.1.0 are forward and backward compatible with certain Kafka clusters.
- Forward-compatible to newer clusters up to Confluent Platform 8.1.0:
Existing Kafka Streams applications built with Confluent Platform 5.1.x (Kafka Streams 2.1) and later work with upgraded Kafka clusters running Confluent Platform 8.1.0.
- Backward-compatible to older clusters down to Confluent Platform 5.1.x:
New Kafka Streams applications built with Confluent Platform 8.1.0 work with older Kafka clusters running Confluent Platform 5.1.x (Kafka 2.1) or later.
Kafka clusters running versions earlier than Confluent Platform 5.1.x, including Confluent Platform 3.0.x, 3.1.x, and 3.2.x, are not compatible with new Confluent Platform 8.1.0 Kafka Streams applications.
Compatibility matrix
The following table shows the versions of the Kafka Streams API that are compatible with various Kafka broker versions. For versions earlier than Confluent Platform 5.4.x (Kafka Streams 2.4.x), see Legacy Streams Upgrade Guide.
Versions are shown as Confluent Platform version / Kafka version.
Streams API (rows) / Kafka broker (columns) | 5.1.x / 2.1.x through 8.0.x / 4.0.x | 8.1.x / 4.1.x |
|---|---|---|
5.4.x / 2.4.x and 5.5.x / 2.5.x | compatible | compatible |
6.0.x / 2.6.x through 8.1.x / 4.1.x | compatible; enabling exactly-once v2 requires broker 5.5.x / 2.5.x or later | compatible |
The Streams API is not compatible with Kafka clusters running older Kafka versions (0.7, 0.8, 0.9).
RocksDB compatibility matrix
The following table shows which versions of RocksDB are included with corresponding versions of Kafka Streams. This is important when you use state stores backed by RocksDB, as mismatched versions may lead to runtime errors or data incompatibility.
Kafka Streams version | Confluent Platform version | RocksDB version | Notes |
|---|---|---|---|
2.0.x | 5.0.x | 5.7.3 | |
2.1.x | 5.1.x | 5.14.2 | |
2.2.x | 5.2.x | 5.15.10 | |
2.3.x – 2.5.x | 5.3.x – 5.5.x | 5.18.3 | Can specify more RocksDB configurations, which helps to limit RocksDB off-heap memory usage. |
2.6.x – 2.8.x | 6.0.x – 6.2.x | 5.18.4 | |
3.0.x – 3.4.x | 7.0.x – 7.4.x | 7.1.2 | Downgrading from 3.0.x or newer to 2.8.x or older version requires special attention due to on-disk format change. |
3.5.x – 3.9.x | 7.5.x – 7.9.x | 7.9.2 | Requires newer GCC; not compatible with RHEL 7. |
4.0.x | 8.0.x | 9.7.3 | Significant API changes, see Upgraded RocksDB dependency. |
4.1.x | 8.1.x | 10.1.3 | |
The exact RocksDB version is a transitive dependency of the Kafka Streams artifact and is managed automatically if you use Maven or Gradle, so updating Kafka Streams typically updates RocksDB.
If you customize or override the RocksDB version (for example, for advanced tuning or bug fixes), take care to match major versions and API compatibility, especially after the significant upgrade in Kafka Streams 4.0.0.
For more information, see:
Upgrade your Kafka Streams applications to Confluent Platform 8.1.0 (Kafka Streams version 4.1.0)
To use Confluent Platform 8.1.0, update the Kafka Streams version your application depends on to use the version number 4.1.0. You may need to make minor code changes, detailed below, and recompile your application.
For example, in your pom.xml file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<!-- update version to 4.1.0 -->
<version>4.1.0</version>
</dependency>
Streams API changes in Confluent Platform 8.1.x (Kafka Streams version 4.1.0)
Early Access of the Streams Rebalance Protocol
The Streams Rebalance Protocol is a broker-driven rebalancing system designed specifically for Kafka Streams applications. KIP-848 moved rebalance coordination of plain consumers from clients to brokers, and KIP-1071 follows that pattern to extend the model to Kafka Streams workloads. Instead of the clients computing new assignments during rebalance events that involve all members of the group, the broker computes assignments continuously. Instead of using a consumer group, the Kafka Streams application registers as a Kafka Streams group with the broker. The broker manages and exposes all metadata required to coordinate the Kafka Streams application instances.
This Early Access release covers a subset of the functionality detailed in KIP-1071. Do not use the new protocol in production. The API is subject to change in future releases.
Included in Early Access
Core Streams Group Rebalance Protocol: The group.protocol=streams configuration enables the dedicated streams rebalance protocol. This separates streams groups from consumer groups and provides a streams-specific group membership lifecycle and metadata management on the broker.
Sticky Task Assignor: A basic task assignment strategy that minimizes task movement during rebalances is included.
Interactive Query Support: IQ operations are compatible with the new streams protocol.
New Admin RPC: The StreamsGroupDescribe RPC provides streams-specific metadata separate from consumer group information, with corresponding access via the Admin client.
CLI Integration: You can list, describe, and delete streams groups via the kafka-streams-groups.sh script.
Not included in Early Access
Static membership: Setting a client instance.id is rejected.
Topology updates: If a topology is changed significantly, for example, by adding new source topics or changing the number of sub-topologies, a new streams group must be created.
High availability assignor: Only the sticky assignor is supported.
Regular expressions: Pattern-based topic subscription is not supported.
Reset operations: CLI offset reset operations are not supported.
Protocol migration: Group migration is not available between the classic and new streams protocols.
Why Use the Streams Rebalance Protocol?
Broker-driven coordination: Centralizes task assignment logic on brokers instead of the client. This provides consistent, authoritative task assignment decisions from a single coordination point and reduces the potential for split-brain scenarios.
Faster, more stable rebalances: Reduces rebalance duration and impact by removing the global synchronization point. This minimizes application downtime during membership changes or failures.
Better observability: Provides dedicated metrics and admin interfaces that separate streams from consumer groups, leading to clearer troubleshooting with broker-side observability.
Enable the protocol
Set unstable.feature.versions.enable=true for controllers and brokers.
Set unstable.api.versions.enable=true on the brokers.
In your Kafka Streams application configuration, set group.protocol=streams.
Enabling the protocol requires that brokers and clients are running Confluent Platform 8.1 (Kafka 4.1). It should be enabled only on new clusters for testing purposes.
Migration between the classic consumer group protocol and the Streams Rebalance Protocol is not supported in either direction. An application using this protocol must use a new application.id that no application on the classic protocol has used. This ID also must not be in use as a group.id by any consumer group (“classic” or “consumer”) or share group.
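The client-side part of enabling the protocol fits in a few properties. The sketch below builds them and also flags settings that the streams protocol ignores on the client:

- "group.protocol", "application.id", and "bootstrap.servers" are real config names.
- The three group-level keys are taken from the text below.
- The helper class and its method names are hypothetical.
- This is a sketch for a test cluster only, matching the Early Access guidance.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Sketch: client-side settings for trying the Early Access Streams Rebalance
// Protocol on a test cluster. The broker and controller flags
// (unstable.feature.versions.enable, unstable.api.versions.enable) are set on
// the cluster, not here. Helper names are hypothetical.
final class StreamsProtocolConfig {

    // Configs that are group-level under the streams protocol; values set on
    // the client are ignored.
    static final Set<String> GROUP_LEVEL_KEYS =
        Set.of("session.timeout.ms", "heartbeat.interval.ms", "num.standby.replicas");

    static Properties earlyAccessConfig(String freshApplicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Must never have been used by a classic-protocol application, and must
        // not be in use as a group.id by any consumer group or share group.
        props.put("application.id", freshApplicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.protocol", "streams");
        return props;
    }

    // Returns the keys in props that are ignored on the client under the
    // streams protocol; set them as group configs with kafka-configs.sh instead.
    static Set<String> ignoredClientSideKeys(Properties props) {
        Set<String> ignored = new TreeSet<>();
        for (String key : GROUP_LEVEL_KEYS) {
            if (props.containsKey(key)) {
                ignored.add(key);
            }
        }
        return ignored;
    }
}
```

Surfacing the ignored keys is useful because a stale num.standby.replicas left in the client config silently has no effect under this protocol.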
You can delete a previous consumer group by using kafka-consumer-groups.sh in ${CONFLUENT_HOME}/bin before starting the application with the new protocol, but this also deletes all offsets for that group.
To operate the new streams groups, you can explore the options of kafka-streams-groups.sh to list, describe, and delete streams groups. In the new protocol, session.timeout.ms, heartbeat.interval.ms and num.standby.replicas are group-level configurations, which are ignored when they are set on the client side.
You can use the kafka-configs.sh tool to set these configurations, for example:
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups \
  --entity-name wordcount --add-config streams.num.standby.replicas=1
KIP-1111: Enforce explicit naming for internal resources
The introduction of KIP-1111 enables you to enforce explicit naming for all internal resources of the topology, including internal topics, like changelog and repartition topics, and their associated state stores. This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology. You can enable this feature via StreamsConfig using the StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG parameter. When set to true, the application won’t start if any internal resource has an auto-generated name.
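The sketch below turns the KIP-1111 check on in a Properties-based configuration. Two points are assumptions to verify:

- The string "ensure.explicit.internal.resource.naming" is taken to be the value of StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, as we understand it from KIP-1111. Prefer the constant itself in real code.
- The helper name is hypothetical.

```java
import java.util.Properties;

// Sketch: opting in to KIP-1111 explicit internal resource naming.
final class ExplicitNamingConfig {

    // Assumed string value of
    // StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG (KIP-1111).
    static final String ENSURE_EXPLICIT_NAMING = "ensure.explicit.internal.resource.naming";

    // Returns a copy of base with the check enabled; startup then fails if any
    // internal topic or state store would get an auto-generated name.
    static Properties withExplicitNamingEnforced(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(ENSURE_EXPLICIT_NAMING, "true");
        return props;
    }
}
```

With this enabled, DSL operators that create internal resources need explicit names, for example through Materialized.as(...) for stores and their changelogs, or Grouped.as(...) and Repartitioned.as(...) for repartition topics.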
KIP-1020: Move window.size.ms and windowed.inner.class.serde
The window.size.ms and windowed.inner.class.serde configurations are now defined in TimeWindowed and SessionWindowed SerDes. For more information, see KIP-1020.
Streams API changes in Confluent Platform 8.0.x (Kafka Streams version 4.0.x)
For a full list of API changes, see KAFKA-12822.
All public APIs deprecated in Confluent Platform 7.6.x (Kafka 3.6) or an earlier release have been removed, with the exception of JoinWindows.of() and JoinWindows#grace(). For more information, see KAFKA-17531.
Old protocol API versions have been removed. Ensure that brokers are at Confluent Platform version 5.1.x (Kafka 2.1) or later before upgrading the Java clients to 4.0, including Connect and Kafka Streams, which use the clients internally.
Similarly, ensure that your Java clients, including Connect and Kafka Streams, are at Confluent Platform version 5.1.x (Kafka 2.1) or later before upgrading the brokers to Confluent Platform 8.x (Kafka 4.0).
Care needs to be taken with Kafka clients that are not part of Apache Kafka®. For more information, see KIP-896.
The minimum Java version required by clients and Kafka Streams applications has been increased from Java 8 to Java 11. For more information, see KIP-750.
Brokers, Connect, and tools now require Java 17. For more information, see KIP-1013.
In this release, eos-v1 (Exactly Once Semantics version 1) is no longer supported. To use eos-v2, brokers must be running Confluent Platform version 5.5.x (Kafka 2.5) or later.
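An application that still selects eos-v1 must switch its processing.guarantee before it runs on 4.x. The sketch below rewrites the legacy values to "exactly_once_v2", which is the real config value for eos-v2.

It assumes that both "exactly_once" (eos-v1) and "exactly_once_beta" (deprecated in Kafka 3.0 by KIP-732) are no longer accepted. The helper name is hypothetical.

```java
import java.util.Properties;

// Sketch: moving a config off the legacy exactly-once values to eos-v2.
final class ExactlyOnceConfig {

    static Properties toExactlyOnceV2(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // "at_least_once" is the default when the key is absent.
        String guarantee = props.getProperty("processing.guarantee", "at_least_once");
        // Assumed-removed legacy values are rewritten; anything else is kept.
        if (guarantee.equals("exactly_once") || guarantee.equals("exactly_once_beta")) {
            props.put("processing.guarantee", "exactly_once_v2");
        }
        return props;
    }
}
```

Remember the broker requirement above: exactly_once_v2 needs brokers on Confluent Platform 5.5.x (Kafka 2.5) or later.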
KAFKA-12822: Remove deprecated APIs of Kafka Streams in 4.0
All deprecated methods, classes, APIs, and config parameters up to and including Confluent Platform 7.6.x (Kafka Streams 3.6) have been removed.
The following list shows some of the important deprecated APIs. For the full list, see KAFKA-12822.
KIP-1056: Remove default. prefix for exception handler StreamsConfig
The configs default.deserialization.exception.handler and default.production.exception.handler are deprecated, because they don’t have any overwrites. Instead, use the new configs: deserialization.exception.handler and production.exception.handler. For more information, see KIP-1056.
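Moving to the new handler config names is a key rename. The sketch below performs that rename on a Properties object:

- If both the old and new keys are present, the explicitly set new key wins. That precedence is our own choice for the sketch.
- The helper name is hypothetical.
- LogAndContinueExceptionHandler in the usage test is an existing Kafka Streams class, used only as a string value.

```java
import java.util.Map;
import java.util.Properties;

// Sketch: renaming the deprecated default.* exception handler keys (KIP-1056).
final class HandlerConfigMigration {

    // Deprecated key -> replacement key.
    static final Map<String, String> RENAMED = Map.of(
        "default.deserialization.exception.handler", "deserialization.exception.handler",
        "default.production.exception.handler", "production.exception.handler");

    // Returns a copy of props with deprecated keys moved to the new names.
    // A value already set under the new name is not overwritten.
    static Properties migrate(Properties props) {
        Properties out = new Properties();
        out.putAll(props);
        for (Map.Entry<String, String> e : RENAMED.entrySet()) {
            Object value = out.remove(e.getKey());
            if (value != null) {
                out.putIfAbsent(e.getValue(), value);
            }
        }
        return out;
    }
}
```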
KIP-1070: Deprecate MockProcessorContext
In the previous release, a new version of the Processor API was introduced, and the old Processor API was incrementally replaced and deprecated.
These APIs follow this path and are deprecated:
MockProcessorContext
Transformer
TransformerSupplier
ValueTransformer
ValueTransformerSupplier
For more information, see KIP-1070.
KIP-1077: Deprecate ForeachProcessor and move to internal package
The ForeachProcessor class is deprecated. This change is aimed at improving the organization and clarity of the Kafka Streams API by ensuring that internal classes are not exposed in public packages. For more information, see KIP-1077.
KIP-1078: Remove leaking getter methods in Joined helper class
The leaking getter methods in the Joined helper class are deprecated for future removal, without a replacement, because they don’t add any value for Kafka Streams users. For more information, see KIP-1078.
KIP-1085: Fix leaking *_DOC variables in StreamsConfig
To ensure better encapsulation and organization of configuration documentation within Kafka Streams, certain public doc description variables that are only used within the StreamsConfig or TopologyConfig classes are deprecated. Also, the unused variable DUMMY_THREAD_INDEX is deprecated. For more information, see KIP-1085.
KIP-1087: Removing intermediateTopicsOption from StreamsResetter
Because the already deprecated #through method has been removed from Kafka Streams, the intermediateTopicsOption of the StreamsResetter tool is no longer needed and is deprecated. For more information, see KIP-1087.
KIP-1091: Improved Kafka Streams operator metrics
Because string metrics can’t be collected on the broker side (KIP-714), this version introduces numeric counterparts to enable proper broker-side metric collection for Kafka Streams applications. These metrics are available at the INFO recording level, and a thread-level metric with a String value is available for users leveraging Java Management Extensions (JMX). For more information, see KIP-1091.
KIP-1104: Allow foreign key extraction from both key and value in KTable joins
Previously, foreign key joins in KTables allowed extracting the foreign key only from the value, which led to data duplication and potential inconsistencies.
To reduce storage overhead and improve API usability, this release adds a new method in the Java and Scala APIs that accepts a BiFunction for foreign key extraction. This enables extracting the foreign key from both the key and the value in KTable joins.
The existing methods are deprecated but not removed, ensuring backward compatibility.
For more information, see KIP-1104.
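KIP-1104 matters most when part of the foreign key lives in the record key. The sketch below shows a stdlib-only extractor of the BiFunction shape that the new KTable join overload accepts. It is hypothetical in three ways:

- the "tenant|orderId" key layout,
- a value that holds only a customer ID,
- the class name.

Check the Kafka Streams 4.x Javadoc for the exact join overload that takes it.

```java
import java.util.function.BiFunction;

// Sketch: a foreign-key extractor that needs both the record key and value.
// Hypothetical layout: key = "tenant|orderId", value = customer ID.
final class ForeignKeyExtractors {

    // Foreign key = tenant (from the key) + "|" + customer ID (from the value).
    // A value-only extractor could not build this without duplicating the
    // tenant inside every value.
    static final BiFunction<String, String, String> TENANT_CUSTOMER_FK =
        (orderKey, customerId) -> {
            int sep = orderKey.indexOf('|');
            String tenant = sep >= 0 ? orderKey.substring(0, sep) : orderKey;
            return tenant + "|" + customerId;
        };
}
```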
KIP-1106: Add duration based offset reset option for consumer clients
The Topology.AutoOffsetReset enum is deprecated and replaced with a new class, org.apache.kafka.streams.AutoOffsetReset to capture the reset strategies. New methods are added to the org.apache.kafka.streams.Topology and org.apache.kafka.streams.kstream.Consumed classes to support the new reset strategy. These changes aim to provide more flexibility and efficiency in managing offsets, especially in scenarios involving long-term storage and infinite retention. For more information, see KIP-1106.
KIP-1112: Allow custom processor wrapping
You can now configure your topology with a ProcessorWrapper, which enables accessing and optionally wrapping and replacing any processor in the topology by injecting an alternative ProcessorSupplier in its place. You can use this to peek records and access the processor context even for DSL operators, for example, to implement a logging or tracing framework, or to aid in testing or debugging scenarios.
You must implement the ProcessorWrapper interface and pass the class or class name into the configs via the new StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG config.
This config is applied during the topology building phase, and therefore doesn’t take effect unless the config is passed in when creating the StreamsBuilder (DSL) or Topology (PAPI) objects. You must use the StreamsBuilder or Topology constructor overload that accepts a TopologyConfig parameter for the StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG to be picked up.
For more information, see KIP-1112.
Upgraded RocksDB dependency
This version upgrades the RocksDB dependency from version 7.9.2 to 9.7.3. This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The org.rocksdb.AccessHint class, along with its associated methods, has been removed.
Several methods related to compressed block cache configuration in the BlockBasedTableConfig class have been removed, including blockCacheCompressedNumShardBits, blockCacheCompressedSize, and their corresponding setters. These functionalities are now consolidated under the cache option, and you should configure your compressed block cache by using the setCache method instead.
The NO_FILE_CLOSES field has been removed from the org.rocksdb.TickerType enum. As a result, the number-open-files metric does not work as expected: it returns a constant -1 until it is officially removed.
The org.rocksdb.Options.setLogger() method now accepts a LoggerInterface as a parameter, instead of the previous Logger.
Some data types used in RocksDB’s Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, if you’re using advanced RocksDB customizations within your Streams applications, particularly through the rocksdb.config.setter, you are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt your configurations as needed. Specifically, if you’re leveraging the removed AccessHint class, the removed methods from the BlockBasedTableConfig class, the NO_FILE_CLOSES field from TickerType, or you’re relying on the previous signature of setLogger(), you must update your implementations.
Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.
KIP-714: Client metrics and observability
In this release, the ClientInstanceIds instance stores the global consumer’s KIP-714 Uuid under a key formed from the global stream-thread name with -global-consumer appended. Previously, the key was only the global stream-thread name.
KIP-1065: Add “retry” return-option to ProductionExceptionHandler
Previously, the ProductionExceptionHandler was not invoked on a (retryable) TimeoutException. With Kafka Streams 4.0, the handler is called, and the default handler returns RETRY, to not change existing behavior. Now, a custom handler can decide to break the infinite retry loop by returning either CONTINUE or FAIL. For more information, see KIP-1065.
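With RETRY as the default response, a custom handler is where a bound on retries belongs. The sketch below models only that decision logic:

- It does not implement the real ProductionExceptionHandler interface.
- The HandlerDecision enum is a hypothetical stand-in that mirrors the CONTINUE, FAIL, and RETRY options described above.
- The per-record identifier is also hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the handler's response options.
enum HandlerDecision { CONTINUE, FAIL, RETRY }

// Sketch: retry a timed-out record a bounded number of times, then either
// drop it (CONTINUE) or stop processing (FAIL).
final class BoundedRetryPolicy {
    private final int maxRetries;
    private final HandlerDecision onExhausted;
    private final Map<String, Integer> timeoutsPerRecord = new HashMap<>();

    BoundedRetryPolicy(int maxRetries, HandlerDecision onExhausted) {
        if (onExhausted == HandlerDecision.RETRY) {
            throw new IllegalArgumentException("onExhausted must be CONTINUE or FAIL");
        }
        this.maxRetries = maxRetries;
        this.onExhausted = onExhausted;
    }

    // Called each time producing the given record hits a retryable timeout.
    HandlerDecision onTimeout(String recordId) {
        int seen = timeoutsPerRecord.merge(recordId, 1, Integer::sum);
        if (seen <= maxRetries) {
            return HandlerDecision.RETRY;
        }
        timeoutsPerRecord.remove(recordId); // reset bookkeeping for this record
        return onExhausted;
    }
}
```

Returning FAIL surfaces persistent broker problems. Returning CONTINUE trades completeness for liveness by dropping the record.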
KIP-1076: Metrics for client applications KIP-714 extension
Kafka Streams metrics can be collected broker-side by using the KIP-714 broker plugin. This enables collecting the metrics of the internally used clients of a Kafka Streams application by using a broker-side plugin. Also, it enables collecting the metrics of the Kafka Streams runtime itself. For more information, see KIP-1076.
Streams API changes in Confluent Platform 7.9.0
KIP-1033: Improve exception handling
You can provide a processing exception handler to manage exceptions that occur while processing a record, rather than throwing the exception all the way out of your Kafka Streams application. Set the handler in StreamsConfig by using StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG. The specified handler must implement the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface. For more information, see KIP-1033.
KIP-1049: Customize logging interval
Kafka Streams now enables you to customize the logging interval of stream-thread runtime summary, by using the newly added configuration, log.summary.interval.ms. By default, the summary is logged every 2 minutes. For more information, see KIP-1049.
Streams API changes in Confluent Platform 7.8.0
KIP-924: Customizable task assignment for Streams
Kafka Streams now supports customizable task assignment strategies via the task.assignor.class configuration. You can set the configuration to the fully qualified class name of a custom task assignor implementation that extends the new org.apache.kafka.streams.processor.assignment.TaskAssignor interface.
The new configuration also enables bringing back the behavior of the old task assignor, StickyTaskAssignor, that was used before the introduction of the HighAvailabilityTaskAssignor. If no custom task assignor is configured, the default task assignor, HighAvailabilityTaskAssignor, is used.
If you were using the internal.task.assignor.class config, switch to the new task.assignor.class config instead, because the internal config will be removed in a future release. If you were previously plugging in the StickyTaskAssignor via the legacy internal.task.assignor.class config, ensure that you import the new org.apache.kafka.streams.processor.assignment.StickyTaskAssignor when you switch to the new task.assignor.class config. This is a version of the StickyTaskAssignor that implements the new public TaskAssignor interface. For more information, see the public interface section of KIP-924.
KIP-989: Improved StateStore Iterator metrics for detecting leaks
To improve detection of leaked state store iterators, KIP-989 adds new store-level metrics to track the number and age of open iterators. The new metrics are num-open-iterators, iterator-duration-avg, iterator-duration-max, and oldest-iterator-open-since-ms. These metrics are available for all state stores, including RocksDB, in-memory, and custom state stores.
Streams API changes in Confluent Platform 7.7.0
KIP-925: Rack aware task assignment in Kafka Streams (Part 2)
Part one of KIP-925 added the min_traffic assignment strategy for Kafka Streams. Part two completes the KIP by introducing the second rack-aware assignment strategy: balanced_subtopology.
KIP-954: Expand default DSL store configuration to custom types
KIP-954 builds on KIP-591 and enables you to provide a default state store provider for your custom stores. As part of this change, a new interface has been provided along with default support for RocksDB and in-memory state stores.
KIP-962: Relax non-null key requirement in Kafka Streams
Previously, Kafka Streams treated records with null keys as invalid input for joins and dropped them. KIP-962 relaxes this behavior for several left and outer joins, allowing null-key records to be processed successfully.
The behavior of the following operators has changed:
left join KStream-KStream: no longer drops left records with a null key, and calls ValueJoiner with null for the right value.
outer join KStream-KStream: no longer drops left/right records with a null key, and calls ValueJoiner with null for the right/left value.
left-foreign-key join KTable-KTable: no longer drops left records with a null foreign key returned by the ForeignKeyExtractor, and calls ValueJoiner with null for the right value.
left join KStream-KTable: no longer drops left records with a null key, and calls ValueJoiner with null for the right value.
left join KStream-GlobalTable: no longer drops records when the KeyValueMapper returns null, and calls ValueJoiner with null for the right value.
DSL users who want to keep the previous behavior can prepend a .filter() operator to each of the operators listed above and filter accordingly. The following snippets illustrate how to keep the pre-7.7.0 behavior.
//left join KStream-KStream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
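//outer join KStream-KStream: drop null keys on both sides, and join against the filtered right stream
KStream<String, String> nonNullRightStream = rightStream
.filter((key, value) -> key != null);
leftStream
.filter((key, value) -> key != null)
.outerJoin(nonNullRightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);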
//left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
//left join KStream-KTable
leftStream
.filter((key, value) -> key != null)
.leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));
//left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (key, value) -> ...;
leftStream
.filter((key, value) -> keyValueMapper.apply(key,value) != null)
.leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
KIP-960 / KIP-968: IQ support for Versioned State Stores
Versioned state stores were added in the Kafka 3.5 release (KIP-889), but it was not possible to query the new stores. KIP-960 and KIP-968 close this gap by adding new query types for IQv2, namely VersionedKeyQuery and MultiVersionedKeyQuery, respectively. Both queries enable you to do lookups for a single key, to ask for the most recent value, a historic value, or a range of historic values for the provided key.
KIP-985: Add reverseRange and reverseAll query over kv-store in IQv2
IQv2 supports RangeQuery and enables you to query for a range of keys and specify unbounded, bounded, or half-open key-ranges. It returns data in ascending (byte[]-lexicographical) order (per partition). KIP-985 extends this functionality by adding the .withDescendingKeys() method to enable receiving data in descending order, so you can request the result to be ordered (per partition) in either ascending or descending order, or to leave the order unspecified.
KIP-988: Streams Standby Update Listener
KIP-988 adds a new interface for handling cases where standby tasks have their state stores registered, load a batch of records, and stop updates.
KIP-992: Introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery
KIP-992 adds new timestamped-key and timestamped-range interactive queries for timestamped key-value state stores. This change improves the type safety of the IQv2 API. The existing RangeQuery now always returns only the value if issued against a timestamped key-value store.
default.dsl.store config deprecated
Use dsl.store.suppliers.class instead. If you currently specify default.dsl.store=ROCKS_DB or default.dsl.store=IN_MEMORY, replace these configurations with dsl.store.suppliers.class=BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class and dsl.store.suppliers.class=BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class, respectively.
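For string-based configs, such as a properties file loaded into Properties, the replacement value is a class name. The sketch below rewrites default.dsl.store into dsl.store.suppliers.class. Several parts are assumptions:

- The nested supplier classes are assumed to live in org.apache.kafka.streams.state. Their binary names therefore use "$". In Java code you would normally pass BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class directly.
- The legacy value is matched loosely (case-insensitive, underscores ignored), so that both the constant names and their string values map correctly.
- The helper is hypothetical.

```java
import java.util.Locale;
import java.util.Properties;

// Sketch: replacing the deprecated default.dsl.store config.
final class DslStoreConfigMigration {

    // Assumed binary names of the built-in DSL store supplier classes.
    static final String ROCKS_DB_SUPPLIERS =
        "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers";
    static final String IN_MEMORY_SUPPLIERS =
        "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$InMemoryDslStoreSuppliers";

    static Properties migrate(Properties props) {
        Properties out = new Properties();
        out.putAll(props);
        Object legacy = out.remove("default.dsl.store");
        if (legacy == null) {
            return out; // nothing to migrate
        }
        // "ROCKS_DB", "rocksDB" -> "rocksdb"; "IN_MEMORY", "in_memory" -> "inmemory"
        String normalized = legacy.toString().toLowerCase(Locale.ROOT).replace("_", "");
        String supplier;
        switch (normalized) {
            case "rocksdb": supplier = ROCKS_DB_SUPPLIERS; break;
            case "inmemory": supplier = IN_MEMORY_SUPPLIERS; break;
            default: throw new IllegalArgumentException("Unknown default.dsl.store value: " + legacy);
        }
        // An explicitly configured dsl.store.suppliers.class takes precedence.
        out.putIfAbsent("dsl.store.suppliers.class", supplier);
        return out;
    }
}
```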
Streams API changes in Confluent Platform 7.6.0
KIP-923: Add A Grace Period to Stream Table Join
KIP-923 adds a grace period to stream-table joins to improve handling of out-of-order data on the table side. The Joined object has a new method named withGracePeriod that causes the table-side lookup to happen only after the grace period has passed.
KIP-925: Rack aware task assignment in Kafka Streams
Rack aware task assignment was introduced in KIP-925. Rack aware task assignment can be enabled for StickyTaskAssignor or HighAvailabilityTaskAssignor to compute task assignments, which can minimize cross-rack traffic under certain conditions. For more information, including how it can be enabled and further configured, see rack.aware.assignment.strategy.
KIP-941: Range queries to accept null lower and upper bounds
Previously, RangeQuery did not support null to specify “no upper/lower bound”. KIP-941 allows users to pass null into withRange(...) for lower/upper bounds to specify a full or half-open range:
withRange(null, null) == withNoBounds()
withRange(lower, null) == withLowerBound(lower)
withRange(null, upper) == withUpperBound(upper)
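The null-bound equivalences above can be illustrated with a stdlib analogy. The sketch below applies the same "null means unbounded" rule to a java.util.NavigableMap:

- It is not the IQv2 RangeQuery API.
- Treating both bounds as inclusive is a choice made for this sketch.
- The class name is hypothetical.

```java
import java.util.NavigableMap;

// Stdlib analogy of KIP-941: a null bound means "unbounded" on that side.
final class NullBoundRange {

    // Returns a view of store restricted to [lower, upper], inclusive.
    static <K, V> NavigableMap<K, V> range(NavigableMap<K, V> store, K lower, K upper) {
        if (lower == null && upper == null) {
            return store;                                  // like withNoBounds()
        }
        if (upper == null) {
            return store.tailMap(lower, true);            // like withLowerBound(lower)
        }
        if (lower == null) {
            return store.headMap(upper, true);            // like withUpperBound(upper)
        }
        return store.subMap(lower, true, upper, true);    // fully bounded range
    }
}
```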
Streams API changes in Confluent Platform 7.5.0
Downgrading from Confluent Platform 7.5.x (Kafka Streams 3.5.x) or later to Confluent Platform 7.4.x or earlier requires special attention: starting in the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics. This means that older versions of Kafka Streams don’t recognize the bytes written by newer versions, so it’s harder to downgrade Kafka Streams with version 3.5.0 or later to older versions in-flight. For more information, see KIP-904.
For a downgrade, first set the upgrade.from config to the version you’re downgrading to. This disables writing the new serialization format in your application. Wait in this state long enough to ensure that the application has finished processing any “in-flight” messages written into the repartition topics in the new serialization format. Afterward, you can downgrade your application to a pre-3.5.x version.
KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions
KIP-399 adds a method, handleSerializationException(), to the ProductionExceptionHandler interface to handle any serialization errors encountered while producing records.
KIP-884: Add config to configure KafkaClientSupplier
KIP-884 adds a new config, default.client.supplier, which enables using a custom KafkaClientSupplier without any code changes.
KIP-889: Versioned state stores
KIP-889 introduces versioned state stores to improve the accuracy of joins when out-of-order records are processed. For more information, see Timestamp-based semantics for table processors.
In addition to KIP-889, KIP-914 updates DSL processing semantics if a user opts in to use the new versioned key-value stores. Using the new versioned key-value stores, DSL processing can better handle out-of-order data. For example, a late record may be dropped, and stream-table joins can do a timestamp-based lookup into the table. Table aggregations and primary/foreign-key table-table joins are also improved. Versioned key-value stores are not supported for global KTables, and they don’t work with suppress().
KIP-904: Guarantee subtractor is called before adder if key has not changed
KIP-904 improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows. Prior to KIP-904, if both refinements happened to the same result row, two independent updates to that row were applied, resulting in spurious intermediate results. KIP-904 detects this case and applies only a single update, avoiding spurious intermediate results.
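The effect is easiest to see with a grouped count whose group key does not change. The sketch below is a simplified in-memory model, not Kafka Streams code:

- The scenario is an input row that moves from group "EU" to group "EU", for example because a non-key field changed.
- The class, its methods, and the "key=count" output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of KTable.groupBy(...).count() receiving one input update
// whose old and new values map to group keys oldKey and newKey.
final class GroupedCountModel {
    private final Map<String, Long> counts = new HashMap<>();
    final List<String> emitted = new ArrayList<>(); // one "key=count" per result update

    private void emit(String key, long count) {
        emitted.add(key + "=" + count);
    }

    // Pre-KIP-904 shape: subtract and add are independent updates, each emits.
    void applyIndependently(String oldKey, String newKey) {
        if (oldKey != null) emit(oldKey, counts.merge(oldKey, -1L, Long::sum));
        if (newKey != null) emit(newKey, counts.merge(newKey, 1L, Long::sum));
    }

    // KIP-904 shape: if the group key is unchanged, run the subtractor, then the
    // adder, as one update and emit a single result.
    void applyCombined(String oldKey, String newKey) {
        if (oldKey != null && oldKey.equals(newKey)) {
            counts.merge(oldKey, -1L, Long::sum);             // subtractor first
            long count = counts.merge(oldKey, 1L, Long::sum); // then adder
            emit(oldKey, count);
            return;
        }
        applyIndependently(oldKey, newKey);                   // key changed: two rows
    }
}
```

In the independent case, downstream consumers briefly see "EU=0" for a group that never actually became empty. The combined case emits only the final "EU=1".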
KIP-907: Add Boolean serde to public interface
Kafka Streams includes built-in Serdes for most primitive types. KIP-907 adds a new one for booleans.
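As a quick illustration, this sketch round-trips values through Serdes.Boolean(), assuming Kafka 3.5 or later client libraries on the classpath. The topic name passed to the serializer is arbitrary.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class BooleanSerdeExample {

    // Serializes a value with the built-in Boolean serde and reads it back.
    static Boolean roundTrip(Boolean value) {
        Serde<Boolean> serde = Serdes.Boolean();
        byte[] bytes = serde.serializer().serialize("feature-flags", value);
        return serde.deserializer().deserialize("feature-flags", bytes);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(true) + " " + roundTrip(false));
    }
}
```

In a topology, the same serde plugs in wherever a Serde<Boolean> is expected, for example Consumed.with(Serdes.String(), Serdes.Boolean()).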
Streams API changes in Confluent Platform 7.4.0
KIP-770: Replace cache.max.bytes.buffering with statestore.cache.max.bytes
KIP-770 deprecates the existing cache.max.bytes.buffering config and introduces a new statestore.cache.max.bytes config to replace it. The semantics and default value of the cache size config are unchanged. This KIP also adds a new cache.size metric at the DEBUG level so that you can monitor the actual size of the Kafka Streams cache.
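Migrating is a rename of the config key. This JDK-only sketch sets the new key and raises the metrics recording level so that the DEBUG-level cache.size metric is recorded. The 64 MB cache size is an arbitrary example value.

```java
import java.util.Properties;

public class CacheConfigExample {

    // 10 MB, the default for both the old and the new config name.
    static final long DEFAULT_CACHE_BYTES = 10L * 1024 * 1024;

    static Properties cacheConfig(long cacheBytes) {
        Properties props = new Properties();
        // New name for the total state store cache size; replaces the
        // deprecated cache.max.bytes.buffering with the same meaning.
        props.put("statestore.cache.max.bytes", cacheBytes);
        // cache.size is recorded at DEBUG level, so raise the recording level
        // to monitor it.
        props.put("metrics.recording.level", "DEBUG");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(cacheConfig(64L * 1024 * 1024));
    }
}
```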
KIP-837: Allow MultiCasting a Result Record
KIP-837 enables you to send a result record to multiple partitions of a downstream sink topic, up to and including all of its partitions. It also adds the option to drop a result record without sending it.
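Multicasting is expressed through the StreamPartitioner.partitions() method, which returns a set of target partitions. The following is a minimal sketch, assuming Kafka Streams 3.4 or later. The routing rules keyed on the "BROADCAST:" and "DROP:" value prefixes are hypothetical and serve only to show the three possible return shapes.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical routing rules, for illustration only:
//   values starting with "BROADCAST:" go to every partition,
//   values starting with "DROP:" are not sent at all,
//   everything else uses the default partitioning.
public class AlertPartitioner implements StreamPartitioner<String, String> {

    // Single-partition variant. Kept only so that the class also compiles on
    // releases where partition() is still declared abstract; null means
    // "use the default partitioner".
    public Integer partition(String topic, String key, String value, int numPartitions) {
        return null;
    }

    @Override
    public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
        if (value != null && value.startsWith("BROADCAST:")) {
            // Multicast to all partitions of the sink topic.
            Set<Integer> all = IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet());
            return Optional.of(all);
        }
        if (value != null && value.startsWith("DROP:")) {
            // An empty set means the record is dropped without being sent.
            return Optional.of(Collections.emptySet());
        }
        // An empty Optional falls back to the default partitioning.
        return Optional.empty();
    }
}
```

You attach the partitioner to a sink through Produced, for example stream.to("alerts", Produced.with(Serdes.String(), Serdes.String()).withStreamPartitioner(new AlertPartitioner())).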
KIP-865: Support “--bootstrap-server” in kafka-streams-application-reset
KIP-865 updates the Kafka Streams application reset tool’s server parameter name to conform to the other Kafka tooling by deprecating the --bootstrap-servers parameter and introducing a new --bootstrap-server parameter in its place.
Streams API changes in Confluent Platform 7.3.0
Source/sink node metrics for consumed/produced throughput in Kafka Streams
Source and sink node metrics for consumed and produced throughput are available in Kafka Streams.
Previously, with the metrics available in the plain consumer you could derive the consumed throughput of your applications at the subtopology level, but the same was not true for the produced throughput.
KIP-846 fills this gap and gives you a way to compute the production rate of each subtopology by introducing two new metrics for the throughput at sink nodes. Although you can already derive the consumed throughput from existing client-level metrics, KIP-846 also adds two new metrics for the throughput at source nodes. This gives source nodes the same fine-grained metrics scope as the new sink-node metrics and simplifies the user experience.
Pause/resume KafkaStreams topologies
KIP-834 adds the ability to pause and resume topologies. You can use this feature to reduce resource usage or to modify data pipelines. A paused topology skips processing, punctuation, and standby tasks. For distributed Kafka Streams applications, you must pause and resume each instance separately.
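The pause()/resume() pair on KafkaStreams lends itself to a try/finally wrapper, so the topology is always resumed. The following is a minimal sketch, assuming Kafka Streams 3.3 or later. The helper name whilePaused is hypothetical, and the work passed to it stands in for whatever maintenance you run while the instance is paused.

```java
import org.apache.kafka.streams.KafkaStreams;

public class PauseResumeExample {

    // Pauses this instance's topology while `work` runs, then resumes it even
    // if `work` throws. pause() and resume() affect only the local instance,
    // so a distributed application must call this on every instance.
    static void whilePaused(KafkaStreams streams, Runnable work) {
        streams.pause();
        try {
            work.run();
        } finally {
            streams.resume();
        }
    }
}
```

You can check the current state with streams.isPaused().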
Consolidate KStream transform() and process() methods
KIP-820 generalizes the Kafka Streams API to consolidate Transformers, which could forward results, and Processors, which could not. The change makes use of the new type-safe Processor API, which simplifies Kafka Streams, making it easier to use and learn.
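With the type-safe Processor API, a processor can forward results, which previously required a Transformer. The following is a minimal sketch, assuming Kafka Streams 3.3 or later. The topic names words and words-upper and the node name uppercase are hypothetical.

```java
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class ProcessExample {

    // Type-safe processor: input and output key/value types are both String.
    static class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                return; // skip null values (an illustrative choice)
            }
            // Forward a copy of the record with a transformed value.
            context.forward(record.withValue(record.value().toUpperCase(Locale.ROOT)));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, String, String> supplier = UppercaseProcessor::new;
        // process() returns a KStream, so results can flow to downstream operators.
        KStream<String, String> upper = builder
                .stream("words", Consumed.with(Serdes.String(), Serdes.String()))
                .process(supplier, Named.as("uppercase"));
        upper.to("words-upper", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```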
New KafkaStreams.close() API
KIP-812 introduces another form of the KafkaStreams.close() API, which takes a KafkaStreams.CloseOptions argument and can force the member to leave the consumer group. Use it when you close an instance permanently, so that the group can rebalance right away instead of waiting for the member’s session to time out.
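The following is a minimal sketch of a permanent shutdown, assuming Kafka Streams 3.3 or later, where KafkaStreams.CloseOptions provides leaveGroup(boolean) and timeout(Duration). The helper name shutDownPermanently is hypothetical.

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class PermanentShutdownExample {

    // Closes an instance that is being removed for good (for example, when
    // scaling in) and makes its member leave the consumer group, so that the
    // remaining instances can take over its tasks sooner.
    static boolean shutDownPermanently(KafkaStreams streams, Duration timeout) {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
                .leaveGroup(true)
                .timeout(timeout);
        // Returns true if all threads stopped within the timeout.
        return streams.close(options);
    }
}
```

For an ordinary restart, such as a rolling bounce, the plain close() is usually the better fit, because the member is expected to rejoin.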
Streams API changes in Confluent Platform 7.2.0
Rack awareness for Kafka Streams
Starting with Confluent Platform 7.2.0, Kafka Streams can distribute its standby replicas over distinct “racks” with KIP-708. To form a “rack”, Kafka Streams uses tags in the application configuration, which you set with the client.tag. config prefix. For example, Kafka Streams clients might be tagged with the cluster or the cloud region they are running in. You can specify the tags that should be used for the rack-aware distribution of the standby replicas by setting the rack.aware.assignment.tags configuration. During task assignment, Kafka Streams tries to distribute the standby replicas over different tag dimensions. Rack-aware standby assignment improves fault tolerance if an entire “rack” fails. For example, you can use it to ensure that replicas are distributed over different availability zones in a cloud hosting provider.
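Concretely, each instance declares its own tag values, and all instances list the tag keys to spread standbys across. The following is a JDK-only sketch. The tag keys cluster and zone, and their values, are hypothetical. The sketch also sets num.standby.replicas, because rack awareness only matters when standby replicas are enabled.

```java
import java.util.Properties;

public class RackAwareConfigExample {

    // Config for one instance; each instance passes its own cluster and zone.
    static Properties rackAwareConfig(String cluster, String zone) {
        Properties props = new Properties();
        // Standby replicas must be enabled for rack-aware placement to apply.
        props.put("num.standby.replicas", "1");
        // Tags describing where this instance runs: client.tag.<key>=<value>.
        props.put("client.tag.cluster", cluster);
        props.put("client.tag.zone", zone);
        // Tag keys the assignor should spread standby replicas across.
        props.put("rack.aware.assignment.tags", "cluster,zone");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(rackAwareConfig("k8s-a", "eu-central-1a"));
    }
}
```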
Add record metadata to state store context
KIP-791 adds the recordMetadata() method to the StateStoreContext, providing access to the topic, partition, and offset of the record currently being processed. Exposing the current context in this way enables state stores to track their current offset in each input partition, allowing them to implement the consistency mechanisms introduced in KIP-796.
Interactive Query v2 preview
Confluent Platform 7.2.0 introduces Interactive Queries v2 in Kafka Streams (IQv2). IQv2 is a preview feature, and the interfaces of IQv2 are marked as @Evolving, which means that they may break compatibility in minor releases without a deprecation period if preview users find significant flaws in the current API.
KIP-796 specifies an improved interface for Interactive Queries in Kafka Streams (IQv2). The new interface makes querying the state store simpler and faster and reduces the maintenance cost when modifying existing state stores and adding new state stores. KIP-796 describes the generic interface for querying state stores with Interactive Queries. Specific query types can be added to Interactive Query v2 by implementing the Query interface. KIP-796 also defines the KeyQuery class to enable users to evaluate a key/value lookup by using IQv2.
KIP-805 adds the RangeQuery class to IQv2. The RangeQuery class is an implementation of the Query interface that enables querying state stores over a range specified by upper or lower key bounds, or by scanning all records of a state store when no bounds are provided.
KIP-806 adds two implementations of the Query interface:
The WindowKeyQuery class enables scanning over windows with a given key within a specified time range.
The WindowRangeQuery class enables scanning over windows within a given time range independently of the windows’ keys.
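The following sketch shows how KeyQuery and RangeQuery requests are built and executed, assuming Kafka Streams 3.2 or later. Because the IQv2 interfaces are marked @Evolving, details may shift between releases. The store name counts is hypothetical, and the sketch assumes it is a key-value store of String keys and Long values.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;

public class Iqv2Example {

    // Point lookup (KIP-796): KeyQuery<K, V> is a Query<V>, so the request
    // carries the value type directly.
    static StateQueryRequest<Long> keyLookup(String storeName, String key) {
        KeyQuery<String, Long> query = KeyQuery.withKey(key);
        return StateQueryRequest.inStore(storeName).withQuery(query);
    }

    // Bounded scan (KIP-805): the result is an iterator that the caller must close.
    static StateQueryRequest<KeyValueIterator<String, Long>> rangeScan(String storeName, String from, String to) {
        RangeQuery<String, Long> query = RangeQuery.withRange(from, to);
        return StateQueryRequest.inStore(storeName).withQuery(query);
    }

    // Executes the lookup on a running instance across its local partitions.
    // getOnlyPartitionResult() is intended for queries, like a key lookup, that
    // can match at most one partition; the null check covers the case where no
    // local partition holds the key.
    static Long lookup(KafkaStreams streams, String storeName, String key) {
        StateQueryResult<Long> result = streams.query(keyLookup(storeName, key));
        QueryResult<Long> onlyResult = result.getOnlyPartitionResult();
        return onlyResult == null ? null : onlyResult.getResult();
    }
}
```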
Streams API changes in Confluent Platform 7.1.0
Java 17 support
In Confluent Platform 7.1.0, Kafka Streams supports Java 17.
Improved left/outer stream-stream join semantics
The semantics of left/outer stream-stream join were improved by KIP-633.
Previously, a left/outer stream-stream join might have emitted so-called spurious left/outer results, due to an eager-emit strategy. The implementation was changed to emit left/outer join result records only after the join window is closed. The old API to specify the join window, JoinWindows.of(), which keeps the eager-emit strategy, was deprecated in favor of the JoinWindows.ofTimeDifferenceAndGrace() and JoinWindows.ofTimeDifferenceWithNoGrace() methods. The new semantics are enabled only if you use the new join window builders.
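Additionally, KIP-633 requires the grace period to be specified explicitly for windowed aggregations, that is, TimeWindows (hopping/tumbling), SessionWindows, and SlidingWindows. The existing builder methods, such as TimeWindows.of() and SessionWindows.with(), were deprecated in favor of new methods that state the grace period explicitly: TimeWindows.ofSizeAndGrace() and TimeWindows.ofSizeWithNoGrace(), SessionWindows.ofInactivityGapAndGrace() and SessionWindows.ofInactivityGapWithNoGrace(), and SlidingWindows.ofTimeDifferenceAndGrace() and SlidingWindows.ofTimeDifferenceWithNoGrace().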
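With the new builders, the grace period is always part of the window definition. The following is a minimal sketch, assuming Kafka Streams 3.0 or later. All durations are illustrative.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowBuilderExample {

    // Stream-stream join: records match when their timestamps are at most
    // 5 minutes apart. Out-of-order records are still accepted for 1 more
    // minute; left/outer results are emitted only after that grace period.
    static JoinWindows joinWindows() {
        return JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
    }

    // Tumbling 1-hour windows; "no grace" is now an explicit choice.
    static TimeWindows hourlyWindows() {
        return TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));
    }

    // Sessions close after 30 minutes of inactivity, plus 5 minutes of grace.
    static SessionWindows sessionWindows() {
        return SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5));
    }

    // Sliding windows covering records up to 10 minutes apart, without grace.
    static SlidingWindows slidingWindows() {
        return SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10));
    }
}
```

These objects plug into the DSL as before, for example left.leftJoin(right, joiner, joinWindows()) or stream.groupByKey().windowedBy(hourlyWindows()).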
New metrics to track blocking times
KIP-761 adds new metrics that enable tracking blocking times on the underlying consumer and producer clients. For more information, see Kafka Streams metrics.
Interactive Query improvements
Interactive Queries were improved by KIP-763 and KIP-766. Range queries now accept null as a lower/upper key-range bound to indicate an open-ended lower/upper bound.
Custom partitioners for foreign-key table-table joins
Foreign-key table-table joins now support custom partitioners via KIP-775. Previously, if an input table was partitioned by a non-default partitioner, joining records might fail. With KIP-775, you can now pass a custom StreamPartitioner into the join by using the newly added TableJoined object.
Upgrade guide for versions earlier than Confluent Platform 7.1.x (Kafka Streams 3.1)
For upgrade guidance on Confluent Platform versions earlier than 7.1.0, see Legacy Streams Upgrade Guide.
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.