.. _streams_upgrade-guide:
Upgrade Guide
=============
**Table of Contents**
.. contents::
:local:
.. FIXME: The version numbers are just place-holders while we add docs for the coming release
Upgrading from CP 4.0.x (Kafka 1.0.x-cp1) to CP 4.1.0 (Kafka 1.1.0-cp1)
-----------------------------------------------------------------------
.. _streams_upgrade-guide_4.1.x-compatibility:
Compatibility
^^^^^^^^^^^^^
Kafka Streams applications built with CP 4.1.0 (Kafka 1.1.0-cp1) are forward and backward compatible with certain Kafka clusters.
* **Forward-compatible to CP 4.1.0 clusters (Kafka 1.1.0-cp1):**
Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp1), CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1), CP 3.3.x (Kafka 0.11.0.x-cp1) or CP 4.0.x (Kafka 1.0.x-cp1)
will work with upgraded Kafka clusters running CP 4.1.0 (Kafka 1.1.0-cp1).
* **Backward-compatible to older clusters down to CP 3.1.x (Kafka 0.10.1.x-cp2):**
New Kafka Streams applications built with CP 4.1.0 (Kafka 1.1.0-cp1) will work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1), CP 3.3.x (Kafka 0.11.0.x-cp1) or CP 4.0.x (Kafka 1.0.x-cp1).
However, if you require the exactly-once processing guarantee, your Kafka cluster must be upgraded to at least CP 3.3.x (Kafka 0.11.0.x-cp1).
Note that the exactly-once feature is disabled by default, so a rolling bounce upgrade of your Streams application is possible as long as you don't enable this new feature explicitly (see below for how it would be enabled).
Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are *not* compatible with new CP 4.1.0 Kafka Streams applications.
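If you do want to enable exactly-once processing (and your cluster runs at least CP 3.3.x), a minimal sketch of the relevant setting looks as follows:

.. sourcecode:: java

    Properties streamsProps = new Properties();
    // exactly-once processing requires brokers on CP 3.3.x (Kafka 0.11.0.x) or newer
    streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);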
.. note::
As of CP 4.0.0 (Kafka 1.0.0-cp1), Kafka Streams requires message format 0.10 or higher.
Thus, if you kept an older message format when upgrading your brokers to CP 3.1 (Kafka 0.10.1-cp1) or a later version,
Kafka Streams CP 4.1.0 (Kafka 1.1.0-cp1) won't work.
You will need to upgrade the message format to 0.10 before you upgrade your Kafka Streams application to CP 4.1.0 (Kafka 1.1.0-cp1).
Compatibility Matrix:
.. include:: compatibilityMatrix.rst
.. _streams_upgrade-guide_4.1.x-upgrade-apps:
Upgrading your Kafka Streams applications to CP 4.1.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To make use of Confluent Platform 4.1.0 (Kafka 1.1.x-cp1), you just need to update the Kafka Streams dependency of your application
to use the version number ``1.1.0-cp1``, and then recompile your application.
For example, in your ``pom.xml`` file:
.. sourcecode:: xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.1.0-cp1</version>
    </dependency>
Note that there are some :ref:`Streams API changes in CP 4.1.0 <streams_upgrade-guide_4.1.x-api-changes>`.
It is therefore recommended, though not required, to update your code when using the new version.
.. _streams_upgrade-guide_4.1.x-api-changes:
API changes
^^^^^^^^^^^
.. FIXME: This summary is a place holder
A few new Streams configurations and public interfaces have been added in the CP 4.1.x release.
.. _streams_upgrade-guide_4.1.x-admin-config:
Embedded Admin Client Configuration
"""""""""""""""""""""""""""""""""""
You can now customize the embedded admin client inside your Streams application, which is used to send administrative requests to Kafka brokers (for example, for internal topic creation).
This is done via the newly added ``KafkaClientSupplier#getAdminClient(Map)`` method; for example, you can provide your own ``AdminClient`` implementation to override the default one in your integration tests.
In addition, you can override the configs that are passed into ``KafkaClientSupplier#getAdminClient(Map)`` to configure the returned ``AdminClient``.
Such overridden configs can be specified via ``StreamsConfig`` by adding the admin configs with the prefix defined by ``StreamsConfig#adminClientPrefix(String)``.
Any configs that aren't admin client configs will be ignored.
For example:
.. sourcecode:: java

    Properties streamsProps = ...;
    // use retries=10 for the embedded admin client
    streamsProps.put(StreamsConfig.adminClientPrefix("retries"), 10);
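If you want to plug in your own ``AdminClient`` (for example, in integration tests), a minimal sketch of a custom ``KafkaClientSupplier`` could look as follows; ``CustomClientSupplier`` is a hypothetical name, and the producer/consumer methods simply create the default byte-array clients:

.. sourcecode:: java

    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaClientSupplier;

    // hypothetical supplier that only customizes the admin client
    public class CustomClientSupplier implements KafkaClientSupplier {

        @Override
        public AdminClient getAdminClient(final Map<String, Object> config) {
            // return your own (e.g. mocked) AdminClient here
            return AdminClient.create(config);
        }

        @Override
        public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }

You can then pass the supplier to the ``KafkaStreams`` constructor that accepts a ``KafkaClientSupplier``.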
New Functions in Window Store Interface
"""""""""""""""""""""""""""""""""""""""
Confluent Platform now supports methods in ``ReadOnlyWindowStore`` that allow you to query all windows in a given time range (``#fetchAll(long fromTime, long toTime)``),
as well as to iterate over all windows in the store via ``#all()``.
If you have customized window store implementations based on this interface, you must update your code to implement the newly
added methods. For more details, see KIP-205.
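As a rough sketch of how these methods can be used via Interactive Queries (the store name, the key and value types, and the ``streams``, ``fromTime``, and ``toTime`` variables are placeholders):

.. sourcecode:: java

    ReadOnlyWindowStore<String, Long> windowStore =
        streams.store("my-window-store", QueryableStoreTypes.<String, Long>windowStore());

    // all windows that fall into the given time range
    try (KeyValueIterator<Windowed<String>, Long> range = windowStore.fetchAll(fromTime, toTime)) {
        while (range.hasNext()) {
            KeyValue<Windowed<String>, Long> next = range.next();
            System.out.println(next.key + " -> " + next.value);
        }
    }

    // every window currently in the store
    try (KeyValueIterator<Windowed<String>, Long> all = windowStore.all()) {
        while (all.hasNext()) {
            System.out.println(all.next());
        }
    }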
Full upgrade workflow
^^^^^^^^^^^^^^^^^^^^^
A typical workflow for upgrading Kafka Streams applications from CP 4.0.x to CP 4.1.0 has the following steps:
#. **Upgrade your application:** See :ref:`upgrade instructions <streams_upgrade-guide_4.1.x-upgrade-apps>` above.
#. **Stop the old application:** Stop the old version of your application, i.e. stop all the application instances
that are still running the old version of the application.
#. **Optional, upgrade your Kafka cluster:** See :ref:`kafka upgrade instructions `.
`Note that if you want to use exactly-once processing semantics, upgrading your cluster to at least CP 3.3.x is mandatory.`
#. **Start the upgraded application:** Start the upgraded version of your application, with as many instances as needed.
By default, the upgraded application will resume processing its input data from the point when the old version was stopped (see previous step).
Upgrading older Kafka Streams applications to CP 4.1.0
------------------------------------------------------
It is also possible to upgrade CP 3.0.x, CP 3.1.x, CP 3.2.x, CP 3.3.x, and CP 4.0.x applications to CP 4.1.0.
Some of the API improvements introduced in CP 3.1, CP 3.2, CP 3.3, and CP 4.0 may require you to update the code of your Kafka Streams applications.
The following sections focus on these changes.
API changes (from CP 3.3 to CP 4.0)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified in the CP 4.0.x release.
All of these changes are backward compatible, so you are not required to update the code of your Kafka Streams applications immediately.
However, some methods were deprecated, so it is recommended to update your code eventually to allow for future upgrades.
In this section we focus on deprecated APIs.
Building and running a topology
"""""""""""""""""""""""""""""""
The two main classes to specify a topology, ``KStreamBuilder`` and ``TopologyBuilder``, were deprecated and replaced by
``StreamsBuilder`` and ``Topology``.
Note that both new classes are in the package ``org.apache.kafka.streams`` and that ``StreamsBuilder`` does not extend ``Topology``,
i.e., the class hierarchy is now different.
This change also affects ``KafkaStreams`` constructors that now only accept a ``Topology``.
If you use ``StreamsBuilder`` you can obtain the constructed topology via ``StreamsBuilder#build()``.
The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
However, some internal methods that were public in ``KStreamBuilder`` and ``TopologyBuilder``, but not part of the actual API,
are no longer included in the new classes.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_builder.java
:language: java
Describing topology and stream task metadata
""""""""""""""""""""""""""""""""""""""""""""
``KafkaStreams#toString()`` and ``KafkaStreams#toString(final String indent)``, which were previously used to retrieve
the user-specified processor topology information as well as runtime stream task metadata, are deprecated in 4.0.0.
Instead, a new ``KafkaStreams`` method, ``localThreadsMetadata()``, was added; it returns an ``org.apache.kafka.streams.processor.ThreadMetadata`` object
for each local stream thread that describes the thread's runtime state as well as the metadata of its currently assigned tasks. This information is helpful
for debugging and monitoring your Streams applications.
To retrieve the specified processor topology information, you can now call ``Topology#describe()``, which returns an ``org.apache.kafka.streams.TopologyDescription`` object
containing a detailed description of the topology (DSL users need to call ``StreamsBuilder#build()`` first to get the ``Topology`` object).
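For example, a minimal sketch (assuming a ``StreamsBuilder`` named ``builder`` and a running ``KafkaStreams`` instance named ``streams``):

.. sourcecode:: java

    // print a description of the specified processor topology
    Topology topology = builder.build();
    System.out.println(topology.describe());

    // print runtime metadata for each local stream thread and its assigned tasks
    for (ThreadMetadata threadMetadata : streams.localThreadsMetadata()) {
        System.out.println(threadMetadata);
    }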
Merging KStreams:
"""""""""""""""""
As mentioned above, ``KStreamBuilder`` was deprecated in favor of ``StreamsBuilder``.
Additionally, ``KStreamBuilder#merge(KStream...)`` was replaced by ``KStream#merge(KStream)``
and thus ``StreamsBuilder`` does not have a ``merge()`` method.
Note: instead of merging an arbitrary number of ``KStream`` instances into a single ``KStream``
as in the old API, the new ``#merge()`` method only accepts
a single ``KStream`` and thus merges two ``KStream`` instances into one.
If you want to merge more than two ``KStream`` instances, you can call ``KStream#merge()`` multiple times.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_merge.java
:language: java
Punctuation functions
"""""""""""""""""""""
The Processor API was extended to allow users to schedule ``punctuate`` functions either based on :ref:`event-time ` (i.e. ``PunctuationType.STREAM_TIME``) or *wall-clock-time* (i.e. ``PunctuationType.WALL_CLOCK_TIME``).
Before this, users could only schedule based on *event-time* and hence the ``punctuate`` function was data-driven only.
As a result, the original ``ProcessorContext#schedule`` method is deprecated in favor of a new overload. In addition, the ``punctuate`` method of ``Processor``
is also deprecated and replaced by the newly added ``Punctuator#punctuate`` interface method.
.. literalinclude:: upgrade-guide-4_0/upgrade-guide_schedule-punctuator.java
:language: java
Streams Configuration
"""""""""""""""""""""
You can now override the configs that are used to create internal repartition and changelog topics.
You provide these configs via the ``StreamsConfig`` by adding the topic configs with the prefix as defined by ``StreamsConfig#topicPrefix(String)``.
Any properties in the ``StreamsConfig`` with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you are already using ``StateStoreSupplier`` or ``Materialized`` to provide configs for changelogs, then they will take precedence over those supplied in the config.
For example:
.. sourcecode:: java

    Properties streamsProps = ...;
    // use cleanup.policy=delete for internal topics
    streamsProps.put(StreamsConfig.topicPrefix("cleanup.policy"), "delete");
New classes for optional DSL parameters
"""""""""""""""""""""""""""""""""""""""
Several new classes were introduced, e.g., ``Serialized``, ``Consumed``, and ``Produced``, to reduce the number of overloads in the DSL.
These classes typically provide a static ``with`` method to create an instance, e.g., ``Serialized.with(Serdes.Long(), Serdes.String())``.
Scala users should be aware that they need to surround ``with`` with backticks.
For example:
.. sourcecode:: scala
// When using Scala: enclose "with" with backticks
Serialized.`with`(Serdes.Long(), Serdes.String())
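For Java users, a sketch of how these classes replace the old overloads (topic names and serdes are placeholders):

.. sourcecode:: java

    StreamsBuilder builder = new StreamsBuilder();

    // read a topic with explicit serdes instead of picking an overloaded stream() variant
    KStream<Long, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.Long(), Serdes.String()));

    // group with explicit serdes
    KGroupedStream<Long, String> grouped =
        stream.groupByKey(Serialized.with(Serdes.Long(), Serdes.String()));

    // write the stream with explicit serdes
    stream.to("output-topic", Produced.with(Serdes.Long(), Serdes.String()));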
API changes (from CP 3.2 to CP 3.3)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified since the release of CP 3.2.x.
All of these changes are backward compatible, so you are not required to update the code of your Kafka Streams applications immediately.
However, some methods and configuration parameters were deprecated, so it is recommended to update your code eventually to allow for future upgrades.
In this section we focus on deprecated APIs.
Streams Configuration
"""""""""""""""""""""
The following configuration parameters were renamed and their old names were deprecated.
* ``key.serde`` renamed to ``default.key.serde``
* ``value.serde`` renamed to ``default.value.serde``
* ``timestamp.extractor`` renamed to ``default.timestamp.extractor``
Thus, ``StreamsConfig#KEY_SERDE_CONFIG``, ``StreamsConfig#VALUE_SERDE_CONFIG``, and ``StreamsConfig#TIMESTAMP_EXTRACTOR_CONFIG`` were deprecated, too.
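For example, a minimal sketch using the new configuration names (the serde and extractor choices are placeholders):

.. sourcecode:: java

    Properties streamsProps = new Properties();
    // new config names replacing key.serde, value.serde, and timestamp.extractor
    streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);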
Additionally, the following method changes apply:
* method ``keySerde()`` was deprecated and replaced by ``defaultKeySerde()``
* method ``valueSerde()`` was deprecated and replaced by ``defaultValueSerde()``
* new method ``defaultTimestampExtractor()`` was added
Local timestamp extractors
""""""""""""""""""""""""""
The Streams API was extended to allow users to specify a per stream/table timestamp extractor.
This simplifies the usage of different timestamp extractor logic for different streams/tables.
Before, users needed to apply an ``if-then-else`` pattern within the default timestamp extractor to apply different logic to different input topics.
The old behavior introduced unnecessary dependencies and thus limited code modularity and code reuse.
To enable the new feature, the methods ``KStreamBuilder#stream()``, ``KStreamBuilder#table()``, ``KStreamBuilder#globalTable()``,
``TopologyBuilder#addSource()``, and ``TopologyBuilder#addGlobalStore()`` have new overloads that allow you to specify a "local" timestamp
extractor that is applied solely to the corresponding input topics.
.. literalinclude:: upgrade-guide-3_3/upgrade-guide_timestamp-extractor.java
:language: java
KTable Changes
""""""""""""""
The following methods have been deprecated on the ``KTable`` interface:

* ``void foreach(final ForeachAction<? super K, ? super V> action)``
* ``void print()``
* ``void print(final String streamName)``
* ``void print(final Serde<K> keySerde, final Serde<V> valSerde)``
* ``void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)``
* ``void writeAsText(final String filePath)``
* ``void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)``
* ``void writeAsText(final String filePath, final String streamName)``
* ``void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)``
These methods have been deprecated in favor of using the :ref:`Interactive Queries API `.
If you want to query the current content of the state store backing the ``KTable``, use the following approach:
* Make a call to ``KafkaStreams.store(String storeName, QueryableStoreType<T> queryableStoreType)`` followed by a call to ``ReadOnlyKeyValueStore.all()`` to iterate over the key-value pairs of a ``KTable``.
If you want to view the changelog stream of the ``KTable`` then you could do something along the lines of the following:
* Call ``KTable.toStream()`` then call ``KStream#print()``.
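A rough sketch of both approaches (the store name, the key and value types, the ``table`` variable, and a running ``KafkaStreams`` instance named ``streams`` are placeholders):

.. sourcecode:: java

    // query the current content of the state store backing the KTable
    ReadOnlyKeyValueStore<String, Long> keyValueStore =
        streams.store("my-ktable-store", QueryableStoreTypes.<String, Long>keyValueStore());
    try (KeyValueIterator<String, Long> iterator = keyValueStore.all()) {
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }

    // view the changelog stream of the KTable
    table.toStream().print();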
API changes (from CP 3.1 to CP 3.2)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Kafka Streams and its API were improved and modified since the release of CP 3.1.x.
Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications.
In this section we focus on only these breaking changes.
Handling Negative Timestamps and Timestamp Extractor Interface
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
Kafka Streams behavior with regard to invalid (i.e., negative) timestamps was improved.
By default you will still get an exception on an invalid timestamp.
However, you can reconfigure your application to react more gracefully to invalid timestamps which was not possible before.
Even if you do not use a custom timestamp extractor, you need to recompile your application code,
because the ``TimestampExtractor`` interface was changed in an incompatible way.
The internal behavior of Kafka Streams with regard to negative timestamps was changed.
Instead of raising an exception if the timestamp extractor returns a negative timestamp,
the corresponding record will be dropped silently and not be processed.
This makes it possible to process topics for which only a few records cannot provide a valid timestamp.
Furthermore, the ``TimestampExtractor`` interface was changed and now has one additional parameter.
This parameter provides a timestamp that can be used, for example, to return an estimated timestamp,
if no valid timestamp can be extracted from the current record.
The old default timestamp extractor ``ConsumerRecordTimestampExtractor`` was replaced with
``FailOnInvalidTimestamp``, and two new extractors which both extract a record's built-in timestamp
were added (``LogAndSkipOnInvalidTimestamp`` and ``UsePreviousTimeOnInvalidTimestamp``).
The new default extractor (``FailOnInvalidTimestamp``) raises an exception in case of a negative built-in
record timestamp such that Kafka Streams' default behavior is kept (i.e., fail-fast on negative timestamp).
The two newly added extractors allow you to handle negative timestamps more gracefully by implementing
a log-and-skip or a timestamp-estimation strategy, respectively.
.. literalinclude:: upgrade-guide-3_2/upgrade-guide_timestamp-extractor.java
:language: java
Metrics
"""""""
If you provide custom metrics by implementing the ``StreamsMetrics`` interface, you need to update your code, as the
interface has many new methods that allow you to register finer-grained metrics than before.
More details are available in KIP-114.
.. literalinclude:: upgrade-guide-3_2/upgrade-guide_metrics.java
:language: java
Scala
"""""
Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for
the code to compile. The
:cp-examples:`StreamToTableJoinScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala`
has an example where the types of return variables are explicitly declared.
API changes (from CP 3.0 to CP 3.1)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Stream grouping and aggregation
"""""""""""""""""""""""""""""""
Grouping (i.e., repartitioning) and aggregation of the ``KStream`` API was significantly changed to be aligned with the ``KTable`` API.
Instead of using a single method with many parameters, grouping and aggregation is now split into two steps.
First, a ``KStream`` is transformed into a ``KGroupedStream`` that is a repartitioned copy of the original ``KStream``.
Afterwards, an aggregation can be performed on the ``KGroupedStream``, resulting in a new ``KTable`` that contains the result of the aggregation.
Thus, the methods ``KStream#aggregateByKey(...)``, ``KStream#reduceByKey(...)``, and ``KStream#countByKey(...)`` were replaced by ``KStream#groupBy(...)`` and ``KStream#groupByKey(...)`` which return a ``KGroupedStream``.
While ``KStream#groupByKey(...)`` groups on the current key, ``KStream#groupBy(...)`` sets a new key and re-partitions the data to build groups on the new key.
The new class ``KGroupedStream`` provides the corresponding methods ``aggregate(...)``, ``reduce(...)``, and ``count(...)``.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_grouping.java
:language: java
Auto Repartitioning
"""""""""""""""""""
Previously, when performing ``KStream#join(...)``, ``KStream#outerJoin(...)``, or ``KStream#leftJoin(...)`` operations after a key-changing operation, i.e.,
``KStream#map(...)``, ``KStream#flatMap(...)``, or ``KStream#selectKey(...)``, the developer was required to call ``KStream#through(...)`` to repartition the mapped ``KStream``.
This is no longer required: repartitioning now happens automatically for all join operations.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_repartitioning.java
:language: java
TopologyBuilder
"""""""""""""""
Two public method signatures have been changed on ``TopologyBuilder``: ``TopologyBuilder#sourceTopics(String applicationId)`` and ``TopologyBuilder#topicGroups(String applicationId)``.
These methods no longer take ``applicationId`` as a parameter; instead, you should call ``TopologyBuilder#setApplicationId(String applicationId)`` before calling either of these methods.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_topology-builder.java
:language: java
.. _streams_upgrade-guide_dsl-store-names:
DSL: New parameters to specify state store names
""""""""""""""""""""""""""""""""""""""""""""""""
Apache Kafka ``0.10.1`` introduces :ref:`Interactive Queries `,
which allow you to directly query state stores of a Kafka Streams application.
This new feature required a few changes to the operators in the DSL.
Starting with Kafka ``0.10.1``, state stores must always be "named",
which includes both explicitly used state stores (e.g., defined by the user)
and internally used state stores (e.g., created behind the scenes by operations such as ``count()``).
This naming is a prerequisite to make state stores queryable.
As a result of this, the previous "operator name" is now the state store name.
This change affects ``KStreamBuilder#table(...)`` and *windowed* aggregates ``KGroupedStream#count(...)``, ``#reduce(...)``, and ``#aggregate(...)``.
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_operator-names.java
:language: java
Windowing
"""""""""
The API for ``JoinWindows`` was improved.
It is no longer possible to define a window with a default size (of zero).
Furthermore, windows are no longer named.
Rather, any such naming is now done for state stores
(see section :ref:`DSL: New parameters to specify state store names <streams_upgrade-guide_dsl-store-names>` above).
.. literalinclude:: upgrade-guide-3_1/upgrade-guide_windows.java
:language: java