Kafka Streams Upgrade Guide for Confluent Platform¶
To upgrade from Confluent Platform versions earlier than 6.0.0, see Legacy Streams Upgrade Guide.
Upgrade to Confluent Platform 7.8.0 from Confluent Platform 6.0.x¶
Compatibility¶
Kafka Streams applications built with Confluent Platform 7.8.0 are forward and backward compatible with certain Kafka clusters.
- Forward-compatible to newer clusters up to Confluent Platform 7.8.0:
- Existing Kafka Streams applications built with Confluent Platform 3.x and later work with upgraded Kafka clusters running Confluent Platform 7.8.0.
- Backward-compatible to older clusters down to Confluent Platform 3.3.x:
- New Kafka Streams applications built with Confluent Platform 7.8.0 work with older Kafka clusters running Confluent Platform 3.3.x or later.
- Kafka clusters running Confluent Platform 3.0.x, Confluent Platform 3.1.x, or Confluent Platform 3.2.x are not compatible with new Confluent Platform 7.8.0 Kafka Streams applications.
Compatibility Matrix:
| Streams API (rows) / Kafka Broker (columns) | 3.0.x / 0.10.0.x | 3.1.x / 0.10.1.x, 3.2.x / 0.10.2.x | 3.3.x / 0.11.0.x, 4.0.x / 1.0.x, 4.1.x / 1.1.x, 5.0.x / 2.0.x, 5.1.x / 2.1.x, 5.2.x / 2.2.x, 5.3.x / 2.3.x, 5.4.x / 2.4.x, 5.5.x / 2.5.x, 6.0.x / 2.6.x, 6.1.x / 2.7.x, 6.2.x / 2.8.x, 7.0.x / 3.0.x, 7.1.x / 3.1.x, 7.2.x / 3.2.x, 7.3.x / 3.3.x, 7.4.x / 3.4.x, 7.5.x / 3.5.x, 7.6.x / 3.6.x, 7.7.x / 3.7.x, 7.8.x / 3.8.x |
|---|---|---|---|
| 3.0.x / 0.10.0.x | compatible | compatible | compatible |
| 3.1.x / 0.10.1.x, 3.2.x / 0.10.2.x | | compatible | compatible |
| 3.3.x / 0.11.0.x | | compatible with exactly-once turned off (requires broker version Confluent Platform 3.3.x or higher) | compatible |
| 4.0.x / 1.0.x, 4.1.x / 1.1.x, 5.0.x / 2.0.x, 5.1.x / 2.1.x, 5.2.0 / 2.2.0, 5.2.1 / 2.2.0 | | compatible with exactly-once turned off (requires broker version Confluent Platform 3.3.x or higher); requires message format 0.10 or higher; message headers are not supported (requires broker version Confluent Platform 3.3.x or higher with message format 0.11 or higher) | compatible; requires message format 0.10 or higher; if message headers are used, message format 0.11 or higher required |
| 5.2.2 / 2.2.1, 5.3.x / 2.3.x, 5.4.x / 2.4.x, 5.5.x / 2.5.x, 6.0.x / 2.6.x, 6.1.x / 2.7.x, 6.2.x / 2.8.x, 7.0.x / 3.0.x, 7.1.x / 3.1.x, 7.2.x / 3.2.x, 7.3.x / 3.3.x, 7.4.x / 3.4.x, 7.5.x / 3.5.x, 7.6.x / 3.6.x, 7.7.x / 3.7.x, 7.8.x / 3.8.x | | | compatible; requires message format 0.11 or higher; enabling exactly-once v2 requires Confluent Platform 5.5.x or higher |
The Streams API is not compatible with Kafka clusters running older Kafka versions (0.7, 0.8, 0.9).
Upgrade your Kafka Streams applications to Confluent Platform 7.8.0¶
To use Confluent Platform 7.8.0, update your application’s Kafka Streams dependency to
use the version number 7.8.0-ccs. You may need to make
minor code changes, detailed below, and recompile your application.
For example, in your pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <!-- update version to 7.8.0-ccs -->
    <version>7.8.0-ccs</version>
</dependency>
As of the Confluent Platform 6.0.0 release (Kafka Streams 2.6.0), Kafka Streams depends on a RocksDB version that requires macOS 10.14 or higher.
Streams API changes in Confluent Platform 7.8.0¶
KIP-924: Customizable task assignment for Streams¶
Kafka Streams now supports customizable task assignment strategies via the
task.assignor.class configuration. You can set the configuration to the
fully qualified class name of a custom task assignor that implements the new
org.apache.kafka.streams.processor.assignment.TaskAssignor interface.
The new configuration also enables bringing back the behavior of the old task
assignor, StickyTaskAssignor, that was used before the introduction of the
HighAvailabilityTaskAssignor. If no custom task assignor is configured, the
default task assignor, HighAvailabilityTaskAssignor, is used.
If you were using the internal.task.assignor.class config, you should
switch to using the new task.assignor.class config instead, because the
internal config will be removed in a future release. If you were previously
plugging in the StickyTaskAssignor via the legacy internal.task.assignor.class
config, you must ensure that you are importing the new
org.apache.kafka.streams.processor.assignment.StickyTaskAssignor when you
switch to the new task.assignor.class config. This class is a version of the
StickyTaskAssignor that implements the new public TaskAssignor interface.
For more information, see the public interface section of KIP-924.
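As a rough sketch of the configuration change described above, the following builds Streams properties with plain string keys, so it runs without the Kafka libraries on the classpath. The class name com.example.MyTaskAssignor, the application id, and the broker address are hypothetical placeholders, not values from this guide.

```java
import java.util.Properties;

final class TaskAssignorConfigSketch {
    // Hypothetical class name; substitute your own TaskAssignor implementation.
    static final String CUSTOM_ASSIGNOR = "com.example.MyTaskAssignor";

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");     // assumed value
        props.setProperty("bootstrap.servers", "localhost:9092");  // assumed value
        // Public config named in this section; use it instead of internal.task.assignor.class.
        props.setProperty("task.assignor.class", CUSTOM_ASSIGNOR);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("task.assignor.class"));
    }
}
```

The resulting Properties object is what you would typically hand to the KafkaStreams constructor alongside your topology.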
KIP-989: Improved StateStore Iterator metrics for detecting leaks¶
To improve detection of leaked state store iterators, KIP-989
adds new store-level metrics to track the number and age of open iterators.
The new metrics are num-open-iterators, iterator-duration-avg,
iterator-duration-max, and oldest-iterator-open-since-ms. These metrics
are available for all state stores, including RocksDB, in-memory, and custom
stores.
Streams API changes in Confluent Platform 7.7.0¶
KIP-925: Rack aware task assignment in Kafka Streams (Part 2)¶
In part one of KIP-925, the min_traffic assignment strategy for Kafka Streams
was added. Part two finishes the KIP by introducing the second rack-aware
assignment strategy: balanced_subtopology.
KIP-954: Expand default DSL store configuration to custom types¶
KIP-954 builds on KIP-591 and enables you to provide a default state store provider for your custom stores. As part of this change, a new interface has been provided along with default support for RocksDB and in-memory state stores.
KIP-962: Relax non-null key requirement in Kafka Streams¶
Kafka Streams treated records with null-keys as invalid input for joins and dropped them. KIP-962 relaxes this behavior for various left-joins, allowing null-key records to be processed successfully.
The behavior of the following operators changed.
- left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with null for the right value.
- outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with null for the right/left value.
- left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with null for the right value.
- left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with null for the right value.
- left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns null and call ValueJoiner with null for the right value.
Stream-DSL users who want to keep the current behavior can prepend a
.filter() operator to the previously listed operators and filter
accordingly. The following snippets illustrate how to keep the pre-7.7.0
behavior.
// left join KStream-KStream
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
// outer join KStream-KStream: filter both inputs, and join against the filtered right stream
leftStream
    .filter((key, value) -> key != null)
    .outerJoin(
        rightStream.filter((key, value) -> key != null),
        (leftValue, rightValue) -> join(leftValue, rightValue),
        windows);
// left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...;
leftTable
    .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
    .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
// left join KStream-KTable
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));
// left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (key, value) -> ...;
leftStream
    .filter((key, value) -> keyValueMapper.apply(key, value) != null)
    .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
KIP-960 / KIP-968: IQ support for Versioned State Stores¶
Versioned state stores were added in the Kafka 3.5 release (KIP-889),
but it was not possible to query the new stores. KIP-960 and KIP-968
close this gap by adding new query types for IQv2, namely VersionedKeyQuery
and MultiVersionedKeyQuery, respectively. Both queries enable you to do
lookups for a single key, to ask for the most recent value, a historic value,
or a range of historic values for the provided key.
KIP-985: Add reverseRange and reverseAll query over kv-store in IQv2¶
IQv2 supports RangeQuery and enables you to query for a range of keys
and specify unbounded, bounded, or half-open key-ranges. It returns data in
ascending (byte[]-lexicographical) order (per partition). KIP-985
extends this functionality by adding the .withDescendingKeys() method to
enable receiving data in descending order, so you can request the result to be
ordered (per partition) in either ascending or descending order, or to leave
the order unspecified.
KIP-988: Streams Standby Update Listener¶
KIP-988 adds a new interface for handling cases where standby tasks have their state stores registered, load a batch of records, and stop updates.
KIP-992: Introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery¶
KIP-992 adds new timestamped-key and timestamped-range interactive queries for
timestamped key-value state stores. This change improves the type safety of the
IQv2 API. The existing RangeQuery now always returns only the value if
issued against a timestamped key-value store.
default.dsl.store config deprecated¶
The default.dsl.store config is deprecated. Instead, use the
dsl.store.suppliers.class config. If you currently specify
default.dsl.store=ROCKS_DB or default.dsl.store=IN_MEMORY, replace
these configurations with
dsl.store.suppliers.class=BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class
and
dsl.store.suppliers.class=BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class,
respectively.
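The replacement described above can be sketched as a small properties migration. This is an illustration only: the helper name migrate is hypothetical, and the fully qualified class names (including the org.apache.kafka.streams.state package and the `$` separator for nested classes) are assumptions that you should check against your Kafka Streams version.

```java
import java.util.Properties;

final class DslStoreConfigMigration {
    // Assumed fully qualified names of the built-in suppliers; verify before use.
    static final String ROCKS_DB =
        "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers";
    static final String IN_MEMORY =
        "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$InMemoryDslStoreSuppliers";

    // Returns a copy in which the deprecated key is replaced by the new one.
    static Properties migrate(Properties original) {
        Properties migrated = new Properties();
        migrated.putAll(original);
        String legacy = migrated.getProperty("default.dsl.store");
        if (legacy == null) {
            return migrated; // nothing to migrate
        }
        migrated.remove("default.dsl.store");
        switch (legacy) {
            case "ROCKS_DB":
                migrated.setProperty("dsl.store.suppliers.class", ROCKS_DB);
                break;
            case "IN_MEMORY":
                migrated.setProperty("dsl.store.suppliers.class", IN_MEMORY);
                break;
            default:
                throw new IllegalArgumentException("Unknown default.dsl.store value: " + legacy);
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("default.dsl.store", "IN_MEMORY");
        System.out.println(migrate(old));
    }
}
```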
Streams API changes in Confluent Platform 7.6.0¶
KIP-923: Add A Grace Period to Stream Table Join¶
KIP-923 adds a grace period to stream-table joins to improve table-side
out-of-order data handling. The Joined object has a new method named
withGracePeriod that causes the table-side lookup to happen only after the
grace period has passed.
KIP-925: Rack aware task assignment in Kafka Streams¶
Rack aware task assignment was introduced in KIP-925.
Rack aware task assignment can be enabled for StickyTaskAssignor or
HighAvailabilityTaskAssignor to compute task assignments, which can minimize
cross-rack traffic under certain conditions. For more information, including
how it can be enabled and further configured, see
rack.aware.assignment.strategy.
KIP-941: Range queries to accept null lower and upper bounds¶
Previously, RangeQuery did not support null to specify “no upper/lower
bound”. KIP-941 allows users to pass null into withRange(...) for lower/upper
bounds to specify a full or half-open range:
withRange(null, null) == withNoBounds()
withRange(lower, null) == withLowerBound(lower)
withRange(null, upper) == withUpperBound(upper)
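To make the three equivalences concrete, here is a stdlib-only model of the bound semantics over a sorted map. It is not the Kafka Streams API: the helper name range is hypothetical, and treating both bounds as inclusive is an assumption of this sketch.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class NullBoundRangeModel {
    // Model only: a null bound means "unbounded" on that side.
    static <V> NavigableMap<String, V> range(NavigableMap<String, V> store, String lower, String upper) {
        if (lower == null && upper == null) {
            return store;                                // like withNoBounds()
        }
        if (lower == null) {
            return store.headMap(upper, true);           // like withUpperBound(upper)
        }
        if (upper == null) {
            return store.tailMap(lower, true);           // like withLowerBound(lower)
        }
        return store.subMap(lower, true, upper, true);   // fully bounded range
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        store.put("d", 4);
        System.out.println(range(store, "b", null).keySet());
        System.out.println(range(store, null, "b").keySet());
    }
}
```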
Streams API changes in Confluent Platform 7.5.0¶
Downgrading from Confluent Platform 7.5.x (Kafka Streams 3.5.x) or later to Confluent Platform 7.4.x or earlier requires special attention: starting in the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics. This means that older versions of Kafka Streams don’t recognize the bytes written by newer versions, so it’s harder to downgrade Kafka Streams with version 3.5.0 or later to older versions in-flight. For more information, see KIP-904.
For a downgrade, first set the upgrade.from config to the version
you’re downgrading to. This disables writing the new serialization format in
your application. It’s important to wait in this state long enough to ensure
that the application has finished processing any “in-flight” messages written
into the repartition topics in the new serialization format. Afterward, you can
downgrade your application to a pre-3.5.x version.
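The first step of the downgrade can be sketched as a config change applied before any binaries are swapped. The helper name prepareDowngrade is hypothetical, and "3.4" is only an assumed example of a target version.

```java
import java.util.Properties;

final class DowngradePreparation {
    // Returns a copy of the running config with upgrade.from set to the downgrade target.
    static Properties prepareDowngrade(Properties current, String targetVersion) {
        Properties next = new Properties();
        next.putAll(current);
        next.setProperty("upgrade.from", targetVersion);
        return next;
    }

    public static void main(String[] args) {
        Properties running = new Properties();
        running.setProperty("application.id", "my-streams-app"); // assumed value
        // Step 1: roll out this config and keep running the current version
        // until in-flight repartition records in the new format are processed.
        // Step 2 (not shown): deploy the older application version.
        System.out.println(prepareDowngrade(running, "3.4"));
    }
}
```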
KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions¶
KIP-399 adds a method, handleSerializationException(), to the
ProductionExceptionHandler interface to handle any serialization errors
encountered while producing records.
KIP-884: Add config to configure KafkaClientSupplier¶
KIP-884 adds a new config, default.client.supplier, that enables using a custom
KafkaClientSupplier without any code changes.
KIP-889: Versioned state stores¶
KIP-889 introduces versioned state stores to improve the accuracy of joins when out-of-order records are processed. For more information, see Timestamp-based semantics for table processors.
In addition to KIP-889, KIP-914
updates DSL processing semantics if a user opts in to use the new versioned
key-value stores. Using the new versioned key-value stores, DSL processing can
better handle out-of-order data. For example, a late record may be dropped and
stream-table joins can do a timestamp-based lookup into the table. Table
aggregations and primary/foreign-key table-table joins are also improved.
Versioned key-value stores are not supported for global-KTable, and they don’t
work with suppress().
KIP-904: Guarantee subtractor is called before adder if key has not changed¶
KIP-904 improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows, but prior to KIP-904, if both refinements happened to the same result row, two independent updates to the same row were applied, resulting in spurious intermediate results. KIP-904 detects this case and applies only a single update, avoiding spurious intermediate results.
Streams API changes in Confluent Platform 7.4.0¶
KIP-770: Replace cache.max.bytes.buffering with cache.max.bytes¶
KIP-770 deprecates the existing cache.max.bytes.buffering config and
introduces a new config to replace it. The KIP title refers to the new config
as cache.max.bytes; in released Kafka Streams versions it is named
statestore.cache.max.bytes. The semantics and default value of the cache size
config are unchanged. This KIP also adds a new cache.size metric at the DEBUG
level for users to monitor the actual size of the Kafka Streams cache.
KIP-837: Allow MultiCasting a Result Record¶
KIP-837 enables you to multicast result records to every partition of downstream sink topics and adds functionality for choosing to drop result records without sending.
KIP-865: Support “--bootstrap-server” in kafka-streams-application-reset¶
KIP-865 updates the Kafka Streams
application reset tool’s server parameter name to conform to the other Kafka
tooling by deprecating the --bootstrap-servers parameter and introducing a
new --bootstrap-server parameter in its place.
Streams API changes in Confluent Platform 7.3.0¶
Source/sink node metrics for consumed/produced throughput in Kafka Streams¶
Starting with Confluent Platform 7.3.0, source and sink node metrics for consumed and produced throughput are available in Kafka Streams.
Previously, with the metrics available in the plain consumer you could derive the consumed throughput of your applications at the subtopology level, but the same was not true for the produced throughput.
KIP-846 fills this gap and gives you a way to compute the production rate of each subtopology by introducing two new metrics for the throughput at sink nodes. Even though it’s possible to derive the consumed throughput with existing client-level metrics, KIP-846 also adds two new metrics for the throughput at source nodes, to provide an equally fine-grained metrics scope as for the newly added metrics at the sink nodes, and to simplify the user experience.
Pause/resume KafkaStreams topologies¶
KIP-834 adds the ability to pause and resume topologies. You can use this feature to reduce resources used or modify data pipelines. Paused topologies skip processing, punctuation, and standby tasks. For distributed Kafka Streams applications, each instance must be paused and resumed separately.
Consolidate KStream transform() and process() methods¶
KIP-820 generalizes the Kafka Streams API to consolidate Transformers, which could forward results, and Processors, which could not. The change makes use of the new type-safe Processor API, which simplifies Kafka Streams, making it easier to use and learn.
Streams API changes in Confluent Platform 7.2.0¶
Rack awareness for Kafka Streams¶
Starting with Confluent Platform 7.2.0, Kafka Streams can distribute its standby replicas over
distinct “racks” with KIP-708.
To form a “rack”, Kafka Streams uses tags in the application configuration. For
example, Kafka Streams clients might be tagged with the cluster or the cloud
region they are running in. Users can specify the tags that should be used for
the rack-aware distribution of the standby replicas by setting the
rack.aware.assignment.tags configuration. During task assignment,
Kafka Streams tries to distribute the standby replicas over different task
dimensions. Rack-aware standby assignment improves fault tolerance in case of
the failure of an entire “rack”. This can be used, for example, to ensure that
replicas are distributed over different availability zones in a cloud hosting
provider.
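A minimal sketch of such a tagged configuration follows. The rack.aware.assignment.tags key comes from this section; the client.tag. prefix for declaring a client's own tag values is taken from the Kafka Streams configuration reference rather than from this guide, and the tag names and values are hypothetical.

```java
import java.util.Properties;

final class RackAwareStandbyConfigSketch {
    static Properties clientConfig(String zone, String cluster) {
        Properties props = new Properties();
        // Tags describing where this client instance runs (hypothetical names and values).
        props.setProperty("client.tag.zone", zone);
        props.setProperty("client.tag.cluster", cluster);
        // Tag dimensions used to spread standby replicas.
        props.setProperty("rack.aware.assignment.tags", "zone,cluster");
        // Standby replicas must be enabled for the tags to have any effect.
        props.setProperty("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(clientConfig("eu-central-1a", "k8s-cluster1"));
    }
}
```

Each application instance would set its own tag values, while the list of tag dimensions stays the same across the application.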
Add record metadata to state store context¶
KIP-791 adds the recordMetadata() method to the StateStoreContext, providing
access to the topic, partition, and offset of the record currently being
processed. Exposing the current context in this way enables state stores to
track their current offset in each input partition, allowing them to implement
the consistency mechanisms introduced in KIP-796.
Interactive Query v2 preview¶
Confluent Platform 7.2.0 introduces Interactive Queries v2 in Kafka Streams (IQv2). IQv2 is a
preview feature, and the interfaces of IQv2 are marked as @Evolving, which
means that they may break compatibility in minor releases without a deprecation
period if preview users find significant flaws in the current API.
- KIP-796 specifies an improved interface for Interactive Queries in Kafka Streams (IQv2). The new interface makes querying the state store simpler and faster and reduces the maintenance cost when modifying existing state stores and adding new state stores. KIP-796 describes the generic interface for querying state stores with Interactive Queries. Specific query types can be added to Interactive Query v2 by implementing the Query interface. KIP-796 also defines the KeyQuery class to enable users to evaluate a key/value lookup by using IQv2.
- KIP-805 adds the RangeQuery class to IQv2. The RangeQuery class is an implementation of the Query interface that enables querying state stores over a range specified by upper or lower key bounds or by scanning all records of a state store when no bounds are provided.
- KIP-806 adds two implementations of the Query interface.
  - The WindowKeyQuery class enables scanning over windows with a given key within a specified time range.
  - The WindowRangeQuery class enables scanning over windows within a given time range independently of the windows’ keys.
Streams API changes in Confluent Platform 7.1.0¶
Java 17 support¶
In Confluent Platform 7.1.0, Kafka Streams supports Java 17.
Improved left/outer stream-stream join semantics¶
The semantics of left/outer stream-stream join were improved by KIP-633.
Previously, a left/outer stream-stream join might have emitted so-called
spurious left/outer results, due to an eager-emit strategy. The implementation
was changed to emit left/outer join result records only after the join window
is closed. The old API to specify the join window, JoinWindows.of(), that
enables the eager-emit strategy, was deprecated in favor of the
JoinWindows.ofTimeDifferenceAndGrace() and
JoinWindows.ofTimeDifferenceWithNoGrace() methods. The new semantics are
enabled only if you use the new join window builders.
Additionally, KIP-633 makes setting a grace period mandatory for windowed
aggregations, i.e., TimeWindows (hopping/tumbling), SessionWindows, and
SlidingWindows. The corresponding builder methods .of(...) were
deprecated in favor of the new .ofTimeDifferenceAndGrace() and
.ofTimeDifferenceWithNoGrace() methods.
New metrics to track blocking times¶
KIP-761 adds new metrics that enable tracking blocking times on the underlying consumer and producer clients. For more information, see Kafka Streams metrics.
Interactive Query improvements¶
Interactive Queries were improved by KIP-763 and KIP-766.
Range queries now accept null as a lower/upper key-range bound to indicate
an open-ended lower/upper bound.
Custom partitioners for foreign-key table-table joins¶
Foreign-key table-table joins now support custom partitioners via KIP-775.
Previously, if an input table was partitioned by a non-default partitioner,
joining records might fail. With KIP-775, you now can pass a custom
StreamPartitioner into the join using the newly added TableJoined object.
Cooperative rebalancing protocol¶
Upgrading from any earlier version to Confluent Platform 7.1.0 is supported.
If you’re upgrading from Confluent Platform 5.3.x (Kafka Streams 2.3) or earlier, you must do two rolling bounces.
- During the first rolling bounce, set the configuration upgrade.from="<older_version>" (possible values are “0.10.0” through “2.3”).
- During the second bounce, remove the upgrade.from config.
This is required to upgrade safely to the new cooperative rebalancing protocol of the embedded consumer. If you skip or delay the second rolling bounce, your deployment continues using the previous eager rebalancing protocol, but you can switch safely to cooperative rebalancing at any time, once the entire group is on Confluent Platform 5.4.x (Kafka Streams 2.4) or later, by removing the configuration value and bouncing. For more information, see KIP-429.
- Prepare your application instances for a rolling bounce, and ensure that the upgrade.from config is set to the version from which it is being upgraded.
- Bounce each instance of your application once.
- Prepare your newly deployed Confluent Platform 7.1.0 (Kafka Streams 3.1.0) application instances for a second round of rolling bounces. Be sure to remove the value for the upgrade.from config.
- Bounce each instance of your application once more to complete the upgrade.
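The two rolling bounces above can be sketched as two configurations derived from the same base properties. The helper name configForBounce is hypothetical, and "2.3" is just an example of an older version to upgrade from.

```java
import java.util.Properties;

final class TwoBounceUpgradeSketch {
    // bounce == 1: set upgrade.from to the old version; bounce == 2: remove it again.
    static Properties configForBounce(Properties base, int bounce, String olderVersion) {
        Properties props = new Properties();
        props.putAll(base);
        if (bounce == 1) {
            props.setProperty("upgrade.from", olderVersion);
        } else if (bounce == 2) {
            props.remove("upgrade.from");
        } else {
            throw new IllegalArgumentException("bounce must be 1 or 2, got " + bounce);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-streams-app"); // assumed value
        Properties first = configForBounce(base, 1, "2.3");
        Properties second = configForBounce(first, 2, "2.3");
        System.out.println("first bounce:  " + first);
        System.out.println("second bounce: " + second);
    }
}
```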
As an alternative, an offline upgrade is also possible. Upgrading from any versions as old as Confluent Platform 3.0.x (Kafka Streams 0.10.x) to Confluent Platform 7.1.0 (Kafka Streams 3.1.0) in offline mode requires the following steps:
- Stop all old (for example, Confluent Platform 3.0.x) application instances.
- Update your code and swap old code and JAR files with new code and new JAR files.
- Restart all new Confluent Platform 7.1.0 (Kafka Streams 3.1.0) application instances.
Streams API changes in Confluent Platform 7.0.0¶
Improved task idling semantics¶
In Confluent Platform 7.0.0, Kafka Streams provides stronger in-order join and merge-processing semantics. The new default value for max.task.idle.ms pauses processing on tasks with multiple input partitions when one of the partitions has no data buffered locally but has a non-zero lag. This means Kafka Streams waits to fetch records that are already available on the broker. This results in improved join semantics, because Kafka Streams can interleave the two input partitions in timestamp order, instead of processing whichever partition happens to be buffered. You can disable this new behavior, or you can configure Kafka Streams to wait longer for new records to be produced to the input partitions, which gives you stronger time semantics when you know that some of your producers may be slow. For more information, see KIP-695.
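The options mentioned above map to different values of max.task.idle.ms. The sketch below assumes, following KIP-695, that -1 disables idling entirely and that a positive value makes Kafka Streams wait that many milliseconds for slow producers; the helper name and the 5000 ms figure are hypothetical.

```java
import java.util.Properties;

final class TaskIdlingConfigSketch {
    static final long DISABLED = -1L; // assumption per KIP-695: never wait for lagging partitions

    static Properties withMaxTaskIdle(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withMaxTaskIdle(DISABLED)); // opt out of the new behavior
        System.out.println(withMaxTaskIdle(5000L));    // wait longer for slow producers
    }
}
```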
New exceptions for Interactive Queries¶
Interactive Queries may throw new exceptions for different errors:
- UnknownStateStoreException: If the specified store name doesn’t exist in the topology, an UnknownStateStoreException is thrown, instead of the InvalidStateStoreException thrown in previous versions.
- StreamsNotStartedException: If Streams state is CREATED, a StreamsNotStartedException is thrown.
- InvalidStateStorePartitionException: If the specified partition doesn’t exist, an InvalidStateStorePartitionException is thrown.
For more information, see KIP-216.
exactly_once processing guarantee deprecated¶
The StreamsConfig processing.guarantee configuration value exactly_once
(for EOS version 1) is deprecated in favor of the improved EOS version 2, which
was previously configured by using exactly_once_beta. To avoid confusion
about the term “beta” in the config name and highlight the production readiness
of EOS version 2, “eos-beta” is renamed to “eos-v2”, and the configuration
value exactly_once_beta is deprecated and replaced with a new configuration
value, exactly_once_v2. If you use exactly-once semantics, plan to migrate
to the “eos-v2” config and prepare for the removal of the deprecated configs in
Kafka Streams 4.0 or after at least a year from the release of 3.0, whichever
comes last. Note that “eos-v2” requires broker version 2.5 or higher, like
“eos-beta”, so you should begin to upgrade your Kafka cluster if necessary.
For more information, see KIP-732.
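The migration described above amounts to rewriting one configuration value. The following sketch shows that mapping with a hypothetical helper, migrateGuarantee; it leaves any other value, such as at_least_once, untouched.

```java
import java.util.Properties;

final class ProcessingGuaranteeMigration {
    // Maps the deprecated exactly-once values to exactly_once_v2.
    static String migrateGuarantee(String configured) {
        if ("exactly_once".equals(configured) || "exactly_once_beta".equals(configured)) {
            return "exactly_once_v2";
        }
        return configured;
    }

    static Properties migrate(Properties original) {
        Properties migrated = new Properties();
        migrated.putAll(original);
        String current = migrated.getProperty("processing.guarantee");
        if (current != null) {
            migrated.setProperty("processing.guarantee", migrateGuarantee(current));
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once_beta");
        System.out.println(migrate(props).getProperty("processing.guarantee"));
    }
}
```

Because eos-v2 needs brokers on version 2.5 or higher, a change like this belongs after the cluster upgrade, not before it.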
Default RocksDBConfigSetter#close() implementation removed¶
The default implementation of RocksDBConfigSetter#close() is removed.
Default grace period reduced¶
The default 24-hour grace period for windowed operations such as Window or Session aggregates, or stream-stream joins is reduced. This period determines how long any out-of-order records will still be processed after a window ends. Records arriving after the grace period has elapsed are considered late and are dropped. But in operators such as suppression, a large grace period has the drawback of incurring an equally large output latency. The previous API made it too easy to miss the grace period config completely, leading you to wonder why your application seems to produce no output – it actually is, but not for 24 hours.
To prevent accidentally or unknowingly falling back to the default 24-hour
grace period, all of the existing static constructors for the Windows classes,
like TimeWindows#of, are deprecated. These constructors are replaced by new
static constructors that have two flavors: #ofSizeAndGrace and
#ofSizeWithNoGrace (these are for the TimeWindows class; analogous APIs exist
for the JoinWindows, SessionWindows, and SlidingWindows classes). With these
new APIs, you must set the grace period explicitly or consciously choose to opt
out by selecting the WithNoGrace flavor, which sets it to 0 for situations
where you don’t care about the grace period, such as during testing or when
experimenting with Kafka Streams for the first time. Note that using the new
APIs for the JoinWindows class also enables a fix for spurious left/outer join
results, as described in Improved left/outer stream-stream join semantics. For
more information, see KIP-633.
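To illustrate what the grace period controls, here is a simplified, stdlib-only model of the late-record check. It is not Kafka Streams code: the class and method names are hypothetical, and the exact boundary handling (a record is accepted while window end plus grace is still greater than the observed stream time) is an assumption of this sketch.

```java
import java.time.Duration;

final class GracePeriodModel {
    // A record for a window is still accepted while the window has not yet
    // been closed by stream time advancing past window end + grace.
    static boolean isAccepted(long windowEndMs, Duration grace, long streamTimeMs) {
        return windowEndMs + grace.toMillis() > streamTimeMs;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;
        long streamTime = 62_000L; // stream time has moved 2 s past the window end
        System.out.println(isAccepted(windowEnd, Duration.ZERO, streamTime));         // no grace
        System.out.println(isAccepted(windowEnd, Duration.ofSeconds(5), streamTime)); // 5 s grace
    }
}
```

With no grace, the out-of-order record in this example is dropped; with a five-second grace period it is still processed, at the cost of keeping the window open longer.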
Public fields on TaskId deprecated¶
The public topicGroupId and partition fields on TaskId are deprecated and
replaced with getters. Instead, use the new TaskId.subtopology() method, which
replaces topicGroupId, and the TaskId.partition() method. Also, the
TaskId#readFrom and TaskId#writeTo methods have been deprecated and will be
removed, as they were never intended for public use.
TaskMetadata class deprecated¶
The org.apache.kafka.streams.processor.TaskMetadata class is deprecated and
replaced with the new org.apache.kafka.streams.TaskMetadata interface. This
change better reflects the fact that TaskMetadata was not intended to be
instantiated outside of the Kafka codebase. Note that TaskMetadata offers APIs
that better represent the task id as an actual TaskId object, instead of a
String. Migrate to the new org.apache.kafka.streams.TaskMetadata interface,
which offers these improved methods, for example, by using the new
ThreadMetadata#activeTasks and ThreadMetadata#standbyTasks.
ThreadMetadata class deprecated¶
The org.apache.kafka.streams.processor.ThreadMetadata class is deprecated and
replaced with the new org.apache.kafka.streams.ThreadMetadata interface. In the new
ThreadMetadata interface, any reference to the deprecated TaskMetadata is
replaced by the new interface.
StreamsMetadata class deprecated¶
The org.apache.kafka.streams.state.StreamsMetadata class is deprecated and replaced
with org.apache.kafka.streams.StreamsMetadata.
Methods under org.apache.kafka.streams.KafkaStreams deprecated¶
The following methods that returned the previously deprecated classes are deprecated:
- For KafkaStreams#allMetadata, migrate to the new KafkaStreams#metadataForAllStreamsClients.
- For KafkaStreams#allMetadataForStore(String), migrate to the new KafkaStreams#streamsMetadataForStore(String).
- For KafkaStreams#localThreadsMetadata, migrate to the new KafkaStreams#metadataForLocalThreads.
Deprecated APIs removed¶
The following deprecated APIs are removed.
- --zookeeper flag of the application reset tool: deprecated in Kafka 1.0.0 (KIP-198).
- --execute flag of the application reset tool: deprecated in Kafka 1.1.0 (KIP-171).
- StreamsBuilder#addGlobalStore (one overload): deprecated in Kafka 1.1.0 (KIP-233).
- ProcessorContext#forward (some overloads): deprecated in Kafka 2.0.0 (KIP-251).
- WindowBytesStoreSupplier#segments: deprecated in Kafka 2.1.0 (KIP-319).
- segments, until, maintainMs on TimeWindows, JoinWindows, and SessionWindows: deprecated in Kafka 2.1.0 (KIP-328).
- Overloaded JoinWindows#of, before, after, SessionWindows#with, TimeWindows#of, advanceBy, UnlimitedWindows#startOn, and KafkaStreams#close with long typed parameters: deprecated in Kafka 2.1.0 (KIP-358).
- Overloaded KStream#groupBy, groupByKey, and KTable#groupBy with Serialized parameter: deprecated in Kafka 2.1.0 (KIP-372).
- Joined#named, name: deprecated in Kafka 2.3.0 (KIP-307).
- TopologyTestDriver#pipeInput, readOutput, OutputVerifier, and ConsumerRecordFactory classes (KIP-470).
- KafkaClientSupplier#getAdminClient: deprecated in Kafka 2.4.0 (KIP-476).
- Overloaded KStream#join, leftJoin, outerJoin with KStream and Joined parameters: deprecated in Kafka 2.4.0 (KIP-479).
- WindowStore#put(K key, V value): deprecated in Kafka 2.4.0 (KIP-474).
- UsePreviousTimeOnInvalidTimestamp: deprecated in Kafka 2.5.0 as renamed to UsePartitionTimeOnInvalidTimestamp (KIP-530).
- Overloaded KafkaStreams#metadataForKey: deprecated in Kafka 2.5.0 (KIP-535).
- Overloaded KafkaStreams#store: deprecated in Kafka 2.5.0 (KIP-562).
Dependencies removed from Kafka Streams¶
The following dependencies are removed from Kafka Streams:
- Connect-json: Kafka Streams no longer has a compile time dependency on the “connect:json” module (KAFKA-5146). Projects that relied on this transitive dependency must declare it explicitly.
Default replication.factor changed¶
The default value for the replication.factor configuration parameter is
changed to -1, which means to use the broker default replication factor.
The replication.factor value of -1 requires broker version 2.4 or newer.
ListSerde introduced¶
The new ListSerde serde type is introduced:
- Added class ListSerde to (de)serialize List-based objects
- Introduced ListSerializer and ListDeserializer to power the new functionality
Streams API changes in Confluent Platform 6.2.0¶
The type-safe split() method replaces the branch() method for branching or splitting a KStream
based on the supplied predicates into one or more KStream instances.
The following example code shows how you used the branch method before Confluent Platform
6.2.0:
final KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT
);
branches[0].to(suspiciousTransactionsTopicName);
branches[1].to(validatedTransactionsTopicName);
The following example code shows how to rewrite the previous code to use the
split method:
// Canonical rewrite from the branch() method.
final Map<String, KStream<String, Event>> branches = events.split(Named.as("branch-"))
    .branch((id, event) -> event.getTransactionValue() >= FRAUD_LIMIT)
    .branch((id, event) -> event.getTransactionValue() < FRAUD_LIMIT)
    .defaultBranch();
// Rewrite to exploit the new API.
events.split()
    .branch(
        (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
        Branched.withConsumer(ks -> ks.to(suspiciousTransactionsTopicName)))
    .branch(
        (id, event) -> event.getTransactionValue() < FRAUD_LIMIT,
        Branched.withConsumer(ks -> ks.to(validatedTransactionsTopicName)));
Streams API changes in Confluent Platform 6.1.0¶
Sliding Windows¶
Confluent Platform 6.1 adds SlidingWindows as an option for windowedBy() windowed aggregations, as described in KIP-450.
Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
Refer to the Developer Guide for more details.
TRACE level end-to-end latency metrics¶
The end-to-end latency metrics introduced in 6.0 have been expanded to include store-level metrics. The new store-level metrics are recorded at the TRACE level, a new metrics recording level. Enabling TRACE level metrics automatically turns on all higher levels, i.e., INFO and DEBUG. For more information, see KIP-613.
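The relationship between the recording levels can be illustrated with a small Java sketch. It assumes the standard Kafka client key metrics.recording.level, which this guide does not name, and it models the level ordering with a hypothetical enum. It is not how Kafka Streams implements recording levels internally.

```java
import java.util.Properties;

// Illustrative model only; class, enum, and method names are hypothetical.
class MetricsLevelSketch {

    // Ordered from least to most verbose, mirroring INFO, DEBUG, TRACE in the text.
    enum Level { INFO, DEBUG, TRACE }

    // A metric is recorded when its own level is no more verbose than the configured level.
    static boolean isRecorded(Level configured, Level metricLevel) {
        return metricLevel.ordinal() <= configured.ordinal();
    }

    // Assumption: "metrics.recording.level" is the configuration key that selects the level.
    static Properties traceMetricsProperties() {
        Properties props = new Properties();
        props.setProperty("metrics.recording.level", Level.TRACE.name());
        return props;
    }

    public static void main(String[] args) {
        for (Level metric : Level.values()) {
            System.out.println(metric + " recorded at TRACE: " + isRecorded(Level.TRACE, metric));
        }
        System.out.println("TRACE recorded at DEBUG: " + isRecorded(Level.DEBUG, Level.TRACE));
    }
}
```

Under this model, store-level latency metrics (TRACE) are skipped when the level is left at INFO or DEBUG, while setting TRACE also records the INFO and DEBUG metrics.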
Upgrading older Kafka Streams applications to Confluent Platform 6.1.x¶
Streams API changes in Confluent Platform 6.0.0¶
- New processing mode that improves application scalability using exactly-once guarantees (KIP-447)
- Highly available state stores (KIP-441)
- New KStream.repartition() operator replaces KStream.through() (KIP-221)
- New end-to-end latency metrics added (KIP-613)
- New --force option in StreamsResetter to force remove left-over members (KIP-571)
Improved exactly-once processing¶
Starting in Confluent Platform 6.0, a new processing mode is available, named exactly_once_v2, which is configurable by using the processing.guarantee parameter.
To use this new feature, your brokers must be on version Confluent Platform 5.5.x / Kafka 2.5.x or newer.
This implementation is more efficient, because it reduces client and broker resource utilization, like client threads and used network connections, and it enables higher throughput and improved scalability. For more information on how this is done inside the brokers and Kafka Streams, see KIP-447.
Also, as part of the KIP-447 implementation, the transaction timeout has been reduced from 60 seconds to 10 seconds.
If you want to upgrade your EOS application from an older version and enable this feature in version 6.0+, upgrade your application to version 6.0.x, staying on exactly_once, and then do a second round of rolling bounces to switch to exactly_once_v2.
If you’re upgrading an EOS application from an older version (before Kafka 2.6) to a version between 2.6 and 2.8, follow the same steps but with the config exactly_once_beta instead. No special steps are required to upgrade an application using exactly_once_beta from version 2.6+ to 3.0 or higher: you can just change the config from exactly_once_beta to exactly_once_v2 during the rolling upgrade.
For a downgrade, do the reverse: first switch the config from exactly_once_v2 to exactly_once to disable the feature in your 2.6.x application. Afterward, you can downgrade your application to a pre-2.6.x version.
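The two rounds of rolling bounces for an upgrade can be sketched as a pair of configurations. This is a rough illustration that uses only java.util.Properties and the processing.guarantee values named above. The class and method names are hypothetical, and the round numbering is just a convention for this example.

```java
import java.util.Properties;

// Hypothetical helper that returns the processing.guarantee to deploy in each rolling-bounce round.
class EosUpgradeSketch {

    // Round 1: deploy the upgraded application but stay on exactly_once.
    // Round 2: once every instance runs the new version, switch to exactly_once_v2.
    static Properties propertiesForRound(int round) {
        if (round != 1 && round != 2) {
            throw new IllegalArgumentException("round must be 1 or 2, got " + round);
        }
        Properties props = new Properties();
        props.setProperty("processing.guarantee", round == 1 ? "exactly_once" : "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("round 1: " + propertiesForRound(1).getProperty("processing.guarantee"));
        System.out.println("round 2: " + propertiesForRound(2).getProperty("processing.guarantee"));
    }
}
```

For a downgrade, the same two configurations would be applied in the opposite order before rolling back the application version.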
Highly available state stores¶
For more highly available stateful applications, we’ve modified the task assignment algorithm to delay the movement of stateful active tasks to instances that aren’t yet caught up with that task’s state. Instead, to migrate a task from one instance to another (for example, when scaling out), Kafka Streams assigns a warmup replica to the target instance so that it can begin restoring the state while the active task stays available on an instance that already had the task. The instances warming up tasks communicate their progress to the group so that, once ready, Kafka Streams can move active tasks to their new owners in the background. Check out KIP-441 for full details, including several new configs for control over this new feature.
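As a hedged sketch of the tuning knobs involved, the snippet below sets three configuration names that, to the best of my understanding, come from KIP-441 (acceptable.recovery.lag, max.warmup.replicas, probing.rebalance.interval.ms). They are not listed in this guide, so verify the names and values against the KIP before relying on them. The isCaughtUp method is a simplified model of the "caught up" check and not the actual assignor logic.

```java
import java.util.Properties;

// Hypothetical helper; config names are assumed from KIP-441, values are illustrative.
class WarmupConfigSketch {

    static Properties warmupProperties() {
        Properties props = new Properties();
        // Maximum offset lag at which an instance is still treated as caught up.
        props.setProperty("acceptable.recovery.lag", "10000");
        // Upper bound on extra warmup replicas assigned at one time.
        props.setProperty("max.warmup.replicas", "2");
        // How often to trigger a rebalance to check on warmup progress.
        props.setProperty("probing.rebalance.interval.ms", "600000");
        return props;
    }

    // Simplified model: an instance counts as caught up when its lag does not exceed the threshold.
    static boolean isCaughtUp(long lagInOffsets, Properties props) {
        long acceptableLag = Long.parseLong(props.getProperty("acceptable.recovery.lag"));
        return lagInOffsets <= acceptableLag;
    }

    public static void main(String[] args) {
        Properties props = warmupProperties();
        System.out.println("lag 500 caught up: " + isCaughtUp(500L, props));
        System.out.println("lag 250000 caught up: " + isCaughtUp(250_000L, props));
    }
}
```

In this model, an instance with a large lag would first receive a warmup replica, and the active task would move only after the lag falls under the threshold.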
End-to-end latency metrics¶
New end-to-end latency metrics have been added. These task-level metrics are logged at the INFO level and report the minimum and maximum end-to-end latency of a record at the beginning/source node(s) and end/terminal node(s) of a task. For more information, see KIP-613.
Replace through() with repartition() operator¶
The operator KStream.through() is deprecated in favor of the new KStream.repartition() operator (via KIP-221).
KStream.repartition() is similar to KStream.through(), but Kafka Streams manages the topic for you.
If you need to write into and read back from a topic that you manage, you can fall back to using KStream.to() in combination with StreamsBuilder#stream().
If you only want to write into a topic as “side output” and continue processing, you can also fan out your dataflow via myStream.xxx(); myStream.to() to avoid the need to read back the data from the cluster.
Upgrade from earlier versions¶
To upgrade from Confluent Platform versions earlier than 6.0.0, see Legacy Streams Upgrade Guide.
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.