.. _streams_upgrade-guide:

Streams Upgrade Guide
=====================

Upgrading from |cp| 5.4.x to |cp| |release|
-------------------------------------------

.. _streams_upgrade-guide_5.4.x-compatibility:

Compatibility
^^^^^^^^^^^^^

|kstreams| applications built with |cp| |release| are forward and backward compatible with certain Kafka clusters.

Forward-compatible to newer clusters up to |cp| |release|:
Existing |kstreams| applications built with |cp| 3.0.x, |cp| 3.1.x, |cp| 3.2.x, |cp| 3.3.x, |cp| 4.0.x, |cp| 4.1.x, |cp| 5.0.x, |cp| 5.1.x, |cp| 5.2.x, |cp| 5.3.x, or |cp| 5.4.x will work with upgraded |ak| clusters running |cp| |release|.

Backward-compatible to older clusters down to |cp| 3.3.x:
New |kstreams| applications built with |cp| |release| will work with older |ak| clusters running |cp| 3.3.x, |cp| 4.0.x, |cp| 4.1.x, |cp| 5.0.x, |cp| 5.1.x, |cp| 5.2.x, or |cp| 5.3.x. |ak| clusters running |cp| 3.0.x, |cp| 3.1.x, or |cp| 3.2.x are *not* compatible with new |cp| |release| |kstreams| applications.

.. note::

    As of |cp| 5.2.2 and |cp| 5.3.0, |kstreams| requires message format 0.11 or higher. Thus, if you kept an older message format when upgrading your brokers to |cp| 3.3 or a later version, |kstreams| |cp| 5.2.2 or |cp| 5.3.0 won't work. You will need to upgrade the message format to 0.11 before you upgrade your |kstreams| application to |cp| 5.2.2, |cp| 5.3.0, or newer.

.. note::

    As of |cp| 4.0.0, |kstreams| requires message format 0.10 or higher. Thus, if you kept an older message format when upgrading your brokers to |cp| 3.1 or a later version, |kstreams| |cp| |release| won't work. You will need to upgrade the message format to 0.10 before you upgrade your |kstreams| application to |cp| |release| or newer.

Compatibility Matrix:

.. include:: includes/compatibilityMatrix.rst

Upgrading your |kstreams| applications to |cp| |release|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To make use of |cp| |release|, you need to update the |kstreams| dependency of your application to use the version number :litwithvars:`|kafka_release|`, you may need to make minor code changes (details below), and then you need to recompile your application. For example, in your ``pom.xml`` file:

.. codewithvars:: xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>|kafka_release|</version>
    </dependency>

As of the |release| release, |kstreams| depends on a RocksDB version that requires MacOS 10.13 or higher.

.. note::

    As of |cp| 5.4.0, |kstreams| uses the new Incremental Cooperative Rebalancing protocol by default. To safely do a rolling upgrade from any version lower than |cp| 5.4.0, you must follow a specific upgrade path that includes two rolling bounces:

    1. Set the ``UPGRADE_FROM`` config to whichever version you are upgrading from, then do the first rolling bounce and upgrade to the new bytecode.
    2. Once all members are on the newer version, remove the ``UPGRADE_FROM`` config and do a second rolling bounce to turn on cooperative rebalancing.

    If you wish to continue following the old eager rebalancing protocol, you can simply leave the ``UPGRADE_FROM`` config in place. Streams will not use cooperative rebalancing as long as this config is set to ``2.3`` or lower, but you should make sure it is at least ``2.0``, because earlier values trigger other side effects that are only required when upgrading from much older versions.
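As a minimal sketch of the first rolling bounce, you might set ``upgrade.from`` via ``StreamsConfig`` as shown below. The version string (``"2.3"`` here, for an application coming from |cp| 5.3.x) and the surrounding properties are assumptions for illustration:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // first rolling bounce: tell Streams which version you are upgrading from
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");

    // second rolling bounce (after all members run the new bytecode):
    // remove this config again to turn on cooperative rebalancing
    // props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);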
.. note::

    As of |cp| 4.0.0, a topology regression was introduced when source ``KTable`` instances were changed to have changelog topics instead of re-using the source topic. As of |cp| 5.0.0, re-using the source topic as the changelog topic for source ``KTable`` instances has been reinstated, but it is optional and must be configured by setting ``StreamsConfig.TOPOLOGY_OPTIMIZATION`` to ``StreamsConfig.OPTIMIZE``. This brings up some different scenarios, depending on what you are upgrading from and what you are upgrading to:

    * If you are upgrading from |ak| 2.0.x-cp1 to |ak| |kafka_release|, it's recommended to keep the existing optimization configuration. Changing the optimization level might make your application vulnerable to data loss for a small window of time.

    * If you are upgrading from using ``StreamsBuilder`` version |ak| 1.0.x-cp1/1.1.x-cp1 to |ak| |kafka_release|, you can enable the optimization, but your topology will change, so you'll need to restart your application with a new application ID. Additionally, if you want to perform a rolling upgrade, it is recommended not to enable the optimization. If you elect not to enable the optimization, then no further changes are required.

    * If you are upgrading from using ``KStreamBuilder`` version |ak| 1.0.x-cp1/1.1.x-cp1 to ``StreamsBuilder`` |ak| |kafka_release|, it's recommended to enable the optimization, because there are no changes to your topology. Note that if you elect not to enable the optimization, there is a small window of time for possible data loss until the changelog topic contains one record per key. Additionally, when starting with a new application ID, you can possibly end up reprocessing data, since the application ID has been changed. If you don't want to reprocess records, you'll need to create new output topics, so downstream users can cut over in a controlled fashion.

.. _streams_upgrade-guide_5.5.x-api-changes:

Streams API changes
^^^^^^^^^^^^^^^^^^^

|cp| 5.5 adds several new APIs: ``KStream.toTable()`` transforms a KStream into a KTable; a new ``cogroup()`` operator allows you to aggregate multiple streams in a single operation; a new Serde type ``Void`` represents ``null`` keys or ``null`` values from an input topic; a new ``queryMetadataForKey()`` method exposes metadata on standby replicas; and a new ``KafkaStreams.store(StoreQueryParameters)`` API allows performing :ref:`streams_developer-guide_interactive-queries` for a specific partition and a specific task type. Additionally, ``UsePreviousTimeOnInvalidTimestamp`` was deprecated and replaced with ``UsePartitionTimeOnInvalidTimeStamp``.

.. _kstreams-co-group:

Co-group
"""""""""

|cp| 5.5 adds a new ``cogroup()`` operator (via `KIP-150 `_) that enables aggregating multiple streams in a single operation. Cogrouped streams can also be windowed before they are aggregated. Check out the :ref:`Developer Guide ` for more details.

.. _kstreams-stale-data-standby:

Querying Stale Data From Standby
"""""""""""""""""""""""""""""""""

|cp| 5.5 adds new APIs that improve the performance and high availability of :ref:`streams_developer-guide_interactive-queries`. The ``KafkaStreams.queryMetadataForKey(store, key, serializer)`` API returns a ``KeyQueryMetadata`` object containing both the active and standby hosts for the specific key, while ``KafkaStreams.allLocalStorePartitionLags()`` provides a map of ``LagInfo`` objects containing lag/staleness information for every store partition on a given Streams instance. Together, these APIs provide granular control over the ``KafkaStreams.store(StoreQueryParameters)`` API, to filter stores for a specific key's partition or to control whether stale stores should be queried.
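As a sketch of how these APIs fit together (the store name ``"word-counts"``, the key, and the value types are hypothetical, and ``streams`` is assumed to be a running ``KafkaStreams`` instance):

.. sourcecode:: java

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // find out which hosts (active and standby) hold the key "alice"
    KeyQueryMetadata metadata =
        streams.queryMetadataForKey("word-counts", "alice", Serdes.String().serializer());

    // query the local store, allowing (possibly stale) standby replicas to serve reads
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters
            .fromNameAndType("word-counts", QueryableStoreTypes.<String, Long>keyValueStore())
            .enableStaleStores());
    Long count = store.get("alice");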
New toTable() API
""""""""""""""""""

In the |cp| 5.5 release, you can transform a KStream directly into a KTable via `KIP-523 `_. This API simplifies the scenario where a user needs to turn an event stream into a changelog stream. Each record from the input KStream is applied to the result KTable as an insert, update, or delete (if the value is ``null``) for the corresponding key. Records with ``null`` keys are dropped.

Deprecated APIs
""""""""""""""""

In |cp| 5.5, ``UsePreviousTimeOnInvalidTimestamp`` was deprecated and replaced with ``UsePartitionTimeOnInvalidTimeStamp`` via `KIP-530 `_. This is because stream-time tracking has been improved and the definition of partition time has been fixed.

``KafkaStreams.store(String, QueryableStoreType)`` was deprecated and replaced with ``KafkaStreams.store(StoreQueryParameters)`` via `KIP-562 `_. The new API gives users more options to route their queries to the intended replica and partition.

The ``KafkaStreams.metadataForKey(...)`` API was deprecated and replaced with the ``KafkaStreams.queryMetadataForKey(...)`` equivalents, via `KIP-535 `_. The new APIs also provide standby replica information to help route interactive queries for high availability.

Upgrading older |kstreams| applications to |cp| 5.5.x
-----------------------------------------------------

Streams API changes
^^^^^^^^^^^^^^^^^^^

|cp| 5.4.x adds new APIs for specifying topologies and new metrics for monitoring your applications. The API of the unit test utilities has been improved, and some APIs have been deprecated. Additionally, |cp| 5.4.x introduces incremental cooperative rebalancing. See details below.

Foreign key KTable-KTable joins
"""""""""""""""""""""""""""""""

Previously, you could only join tables on the record keys, which means that both tables had to have the same key space. Starting in |cp| 5.4.0, you can perform many-to-one KTable-KTable joins on a foreign-key reference. To use the feature, you specify one of the ``KTable#join`` overloads that takes a ``Function foreignKeyExtractor`` argument, as shown in the sketch below. |kstreams| automatically sets up the necessary repartition operations internally to compute the correct join result. For more information on the design, see the design document: `KIP-213 `_.
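As a sketch of the new overload (the ``Order``, ``Customer``, and ``EnrichedOrder`` types, the topic names, and the serde configuration are all hypothetical):

.. sourcecode:: java

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Order> orders = builder.table("orders");
    KTable<String, Customer> customers = builder.table("customers");

    // many-to-one join: extract the foreign key (the customer ID) from each order
    KTable<String, EnrichedOrder> enrichedOrders = orders.join(
        customers,
        order -> order.getCustomerId(),                       // foreignKeyExtractor
        (order, customer) -> new EnrichedOrder(order, customer));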
Operator Naming
"""""""""""""""

In the |cp| 5.4.x release, you can name all operators in a |kstreams| DSL topology via `KIP-307 `_. Giving your operators meaningful names makes it easier to understand the topology description (``Topology#describe()#toString()``) and to understand the full context of what your |kstreams| application is doing. There are new overloads on most ``KStream`` and ``KTable`` methods that accept a ``Named`` object. Typically, you'll provide a name for the DSL operation by using ``Named.as("my operator name")``. Naming of repartition topics for aggregation operations will still use ``Grouped``, and join operations will use either ``Joined`` or the new ``StreamJoined`` object.

Naming State Stores in a KStream-KStream Join
""""""""""""""""""""""""""""""""""""""""""""""

Before the |cp| 5.4.x version of |kstreams|, users of the DSL could not name the state stores involved in a stream-stream join. If users changed their topology and added an operator before the join, the internal names of the state stores would shift, requiring an application reset when redeploying. In the |cp| 5.4.x release, |kstreams| adds the ``StreamJoined`` class, which gives users the ability to name the join processor, the repartition topic(s) (if a repartition is required), and the state stores involved in the join. Also, by naming the state stores, the changelog topics backing the state stores are named as well. It's important to note that naming the stores **will not** make them queryable via :ref:`streams_developer-guide_interactive-queries`.

Another feature delivered by ``StreamJoined`` is that you can now configure the type of state store used in the join. You can elect to use in-memory stores or custom state stores, or continue to use the built-in RocksDB stores for a stream-stream join. Note that the provided stores will not be available for querying via :ref:`streams_developer-guide_interactive-queries`. With the addition of ``StreamJoined``, stream-stream join operations using ``Joined`` have been deprecated. We recommend switching over to the stream-stream join methods that use the new overloads.

.. literalinclude:: upgrade-guide-5_4/upgrade-guide_stream_joined.java
    :language: java

For more information, see `KIP-479 `_.

Metric Improvements
"""""""""""""""""""

Metrics to monitor RocksDB state stores have been added. The newly added RocksDB metrics collect selected statistics provided by the RocksDB instances. The RocksDB metrics are documented in the :ref:`monitoring section `.

Client metrics have been added to monitor |kstreams| applications on the client level. The new client metrics are documented in the :ref:`monitoring section `.

Testing Improvements
""""""""""""""""""""

The |kstreams| unit test utilities were improved via `KIP-470 `_. Instead of using the low-level ``ConsumerRecord`` and ``ProducerRecord`` classes, which can be generated with ``ConsumerRecordFactory`` and verified with the helper class ``OutputVerifier``, the new classes ``TestInputTopic``, ``TestOutputTopic``, and ``TestRecord`` were added. The new classes offer an improved API that allows you to read/write single values, lists of values, key-value pairs, or instances of ``TestRecord``. The new ``TestRecord`` class allows you to set or verify record metadata like headers and timestamps. Furthermore, the new API allows you to use any assertion library of your choice and thus to write idiomatic test code.

.. literalinclude:: upgrade-guide-5_4/upgrade-guide_topology-test-driver.java
    :language: java

Rebalancing State
"""""""""""""""""

With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks to be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance, only those tasks that are to be migrated to another consumer for overall load balance need to be revoked and closed. This changes the semantics of the ``StateListener`` a bit, as it will not necessarily transition to ``REBALANCING`` at the beginning of a rebalance anymore. Note that this means Interactive Queries will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. Cooperative rebalancing is turned on by default, and it requires a specific upgrade path to safely do a rolling upgrade, as described in `Upgrading your Kafka Streams applications to Confluent Platform 5.4.0 `_. For details on the new cooperative protocol, see `KIP-429 `_.
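If your application reacts to state transitions, a minimal ``StateListener`` like the following sketch can help you observe the new behavior; the logging and the assumed ``topology`` and ``props`` are illustrative:

.. sourcecode:: java

    import org.apache.kafka.streams.KafkaStreams;

    KafkaStreams streams = new KafkaStreams(topology, props);

    // with cooperative rebalancing, a transition to REBALANCING is no longer
    // guaranteed at the start of every rebalance
    streams.setStateListener((newState, oldState) ->
        System.out.println("State changed from " + oldState + " to " + newState));
    streams.start();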
Deprecated APIs
"""""""""""""""

In |cp| 5.4.x, we deprecated ``WindowStore#put(K key, V value)``, which should never be used. Use the existing ``WindowStore#put(K key, V value, long windowStartTimestamp)`` instead. For more information, see `KIP-474 `_.

.. literalinclude:: upgrade-guide-5_4/upgrade-guide_window-store-put.java
    :language: java

Furthermore, the ``PartitionGrouper`` interface and its corresponding configuration parameter ``partition.grouper`` have been deprecated (`KIP-528 `_) and will be removed in the next major Apache Kafka release (`KAFKA-7785 `_). Hence, this feature won't be supported in the future any longer, and you need to update your code accordingly before upgrading to the next major release. If you use a custom ``PartitionGrouper`` and stop using it, the created tasks might change; hence, you must reset your application to upgrade it.

Streams API changes in |cp| 5.3.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Several new APIs have been added in |cp| 5.3.x, and a few default configurations have changed. Additionally, the RocksDB dependency was updated. See details below.

Scala Suppress Operator
"""""""""""""""""""""""

The ``Suppress`` operator has been added to the ``kafka-streams-scala`` KTable API.

In-memory Window/Session Stores
"""""""""""""""""""""""""""""""

Streams now offers an in-memory version of the window store and the session store, in addition to the persistent ones based on RocksDB. The new public interfaces ``inMemoryWindowStore()`` and ``inMemorySessionStore()`` are added to ``Stores`` and provide the built-in in-memory window or session store.

Serde close() and configure()
"""""""""""""""""""""""""""""

We have added default implementations of ``close()`` and ``configure()`` to ``Serializer``, ``Deserializer``, and ``Serde`` so that they can be implemented by lambda expressions. For more details, please read `KIP-331 `__.

Store timestamps in RocksDB
"""""""""""""""""""""""""""

To improve operator semantics, new store types have been added that store an additional timestamp per key-value pair or window. Some DSL operators (for example, KTables) use these new stores. Hence, you can now retrieve the last update timestamp via Interactive Queries if you specify ``TimestampedKeyValueStoreType`` or ``TimestampedWindowStoreType`` as your ``QueryableStoreType``. While this change is mainly transparent, there are some corner cases that may require code changes.

Caution: If you receive an untyped store and use a cast, you might need to update your code to cast to the correct type. Otherwise, you might get an exception similar to ``java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE`` upon getting a value from the store. Additionally, ``TopologyTestDriver#getStateStore()`` only returns non-built-in stores and throws an exception if a built-in store is accessed. For more details, please read `KIP-258 `__.

New flatTransformValues() Operator
""""""""""""""""""""""""""""""""""

To improve type safety, a new operator ``KStream#flatTransformValues`` has been added. For more details, please read `KIP-313 `__.
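As a sketch of the operator (the topic name and the word-splitting logic are hypothetical), the following transforms each input value into zero or more output values:

.. sourcecode:: java

    import java.util.Arrays;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueTransformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> lines = builder.stream("text-lines");

    // each input value yields an Iterable of output values, all strongly typed
    KStream<String, String> words = lines.flatTransformValues(
        () -> new ValueTransformer<String, Iterable<String>>() {
            @Override
            public void init(final ProcessorContext context) { }

            @Override
            public Iterable<String> transform(final String value) {
                return Arrays.asList(value.split("\\s+"));
            }

            @Override
            public void close() { }
        });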
max.poll.interval default
"""""""""""""""""""""""""

|kstreams| used to set the configuration parameter ``max.poll.interval.ms`` to ``Integer.MAX_VALUE``. This default value has been removed, and |kstreams| now uses the consumer's default value. For more details, please read `KIP-442 `__.

Repartition topic defaults
""""""""""""""""""""""""""

The default configuration for repartition topics was changed: the segment size for index files (``segment.index.bytes``) is no longer 50MB, but uses the cluster default. Similarly, the configuration ``segment.ms`` is no longer 10 minutes, but uses the cluster default. Lastly, the retention period (``retention.ms``) is changed from ``Long.MAX_VALUE`` to ``-1`` (infinite). For more details, please read `KIP-443 `__.

RocksDBConfigSetter close()
"""""""""""""""""""""""""""

To avoid memory leaks, ``RocksDBConfigSetter`` has a new ``close()`` method that is called on shutdown. Users should implement this method to release any memory used by RocksDB config objects, by closing those objects. More details can be found `here `__.

Upgrade RocksDB to v5.18.3
""""""""""""""""""""""""""

The RocksDB dependency was updated to version 5.18.3. The new version allows you to specify more RocksDB configurations, including a ``WriteBufferManager``, which helps to limit RocksDB off-heap memory usage. For more details, please read `Memory Management `__.

Streams API changes in |cp| 5.2.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

|cp| 5.2.x adds some new APIs and improves some existing functionality.

Simplified KafkaStreams Transitions
"""""""""""""""""""""""""""""""""""

The ``KafkaStreams#state`` transition diagram is simplified in |cp| 5.2.0: in older versions, the state would transition from ``CREATED`` to ``RUNNING``, then to ``REBALANCING`` to get the first stream task assignment, and then back to ``RUNNING``; starting in 5.2.0, it transitions from ``CREATED`` directly to ``REBALANCING`` and then to ``RUNNING``. If you have registered a ``StateListener`` that captures state transition events, you may need to adjust your listener implementation accordingly for this simplification (in practice, your listener logic should be very unlikely to be affected at all).

TimeWindowSerde
"""""""""""""""

In ``WindowedSerdes``, there is a new static constructor to return a ``TimeWindowSerde`` with a configurable window size. This helps users construct time-windowed serdes to read directly from a time-windowed store's changelog. More details can be found in `KIP-393 `__.

AutoCloseable
"""""""""""""

In |cp| 5.2.0, a few public interfaces, including ``KafkaStreams``, have been extended to implement ``AutoCloseable`` so that they can be used in a try-with-resources statement. For a full list of public interfaces that are affected, see `KIP-376 `__.
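For example, a short-lived Streams application (such as one in a test) might rely on try-with-resources to close the client; this sketch assumes an existing ``topology`` and ``props``:

.. sourcecode:: java

    import org.apache.kafka.streams.KafkaStreams;

    // KafkaStreams implements AutoCloseable, so close() is called automatically
    try (KafkaStreams streams = new KafkaStreams(topology, props)) {
        streams.start();
        // ... wait for the application to do its work ...
    }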
Streams API changes in |cp| 5.1.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

New config class Grouped
""""""""""""""""""""""""

We've added a new class ``Grouped`` and deprecated ``Serialized``. The intent of adding ``Grouped`` is the ability to name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the ``Grouped#as()`` method, which takes a ``String`` that is used as part of the repartition topic name. The resulting repartition topic name will still follow the pattern of ``${application-id}->name<-repartition``. The ``Grouped`` class is now favored over ``Serialized`` in ``KStream#groupByKey()``, ``KStream#groupBy()``, and ``KTable#groupBy()``. Note that |kstreams| does not automatically create repartition topics for aggregation operations. Additionally, we've updated the ``Joined`` class with a new method, ``Joined#withName``, enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table joins.

.. literalinclude:: upgrade-guide-5_1/upgrade-guide_grouped-config.java
    :language: java

New UUID Serde
""""""""""""""

We added a new serde for UUIDs (``Serdes.UUIDSerde``) that you can use via ``Serdes.UUID()``.

Improved API Semantics
""""""""""""""""""""""

We updated a list of methods that take ``long`` arguments as either a timestamp (fixed point) or a duration (time period) and replaced them with ``Instant`` and ``Duration`` parameters for improved semantics. Some old methods based on ``long`` are deprecated, and users are encouraged to update their code.

In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take ``Duration`` arguments to specify window size, hop, and gap parameters. Also, window sizes and retention times are now specified as ``Duration`` type in the ``Stores`` class. The ``Window`` class has new methods ``#startTime()`` and ``#endTime()`` that return the window start/end timestamp as an ``Instant``. For Interactive Queries, there are new ``#fetch(...)`` overloads taking ``Instant`` arguments. Additionally, punctuations are now registered via ``ProcessorContext#schedule(Duration interval, ...)``.

We deprecated ``KafkaStreams#close(...)`` and replaced it with ``KafkaStreams#close(Duration)``, which accepts a single timeout argument.

.. note::

    The new ``KafkaStreams#close(Duration)`` method has improved (but slightly different) semantics than the old one. It does not block forever if the provided timeout is zero, and it does not accept any negative timeout values. If you want to block, you should pass in ``Duration.ofMillis(Long.MAX_VALUE)``.

.. literalinclude:: upgrade-guide-5_1/upgrade-guide_time-api-semantics.java
    :language: java

Simplified Store Segments
"""""""""""""""""""""""""

We deprecated the notion of *number of segments* in window stores and replaced it with *segment interval*. In the old API, the segment interval was computed as ``min( retention-period / (number-of-segments - 1) , 60_000L)`` (with 60 seconds being a lower segment-size bound). In the new API, there is no lower bound on the segment interval (i.e., the minimum segment interval is ``1`` millisecond), and the number of segments is ``1 + (retention-period / segment-interval)``. For example, if you used 3 segments in your store with a retention time of 24 hours, you should now use a segment interval of 12 hours (``12 = 24 / (3 - 1)``) in the new API to get the same behavior.

The method ``Windows#segments()`` and the variable ``Windows#segments`` were deprecated. Similarly, ``WindowBytesStoreSupplier#segments()`` was deprecated and replaced with ``WindowBytesStoreSupplier#segmentInterval()``. If you implement custom windows or custom window stores, be aware that you will need to update your code when those methods are removed. Finally, ``Stores#persistentWindowStore(...)`` was deprecated and replaced with a new overload that no longer allows specifying the number of segments, because the number of segments is computed automatically based on retention time and segment interval. If you create persistent window stores explicitly, you should update your code accordingly.

.. literalinclude:: upgrade-guide-5_1/upgrade-guide_segments.java
    :language: java
Out-of-Order Data Handling
""""""""""""""""""""""""""

We added a new config (:ref:`max.task.idle.ms `) to allow users to specify how to handle out-of-order data within a task that may be processing multiple Kafka topic partitions (see the :ref:`Out-of-Order Handling ` section for more details). The default value is set to ``0``, to favor minimized processing latency. If users would like to wait on processing when only some of the topic partitions of a given task have data available, in order to reduce the risk of handling out-of-order data, they can override this config to a larger value.

AdminClient Metrics Exposed
"""""""""""""""""""""""""""

The newly exposed ``AdminClient`` metrics are now included with other available metrics when calling the ``KafkaStreams#metrics()`` method. For more details on monitoring streams applications, check out :ref:`Monitoring Streams Applications `.

Topology Description Improved
"""""""""""""""""""""""""""""

We updated the ``TopologyDescription`` API to allow for better runtime checking. Users are encouraged to use ``#topicSet()`` and ``#topicPattern()`` accordingly on ``TopologyDescription.Source`` nodes, instead of using ``#topics()``, which has since been deprecated. Similarly, use ``#topic()`` and ``#topicNameExtractor()`` to get descriptions of ``TopologyDescription.Sink`` nodes.

.. literalinclude:: upgrade-guide-5_1/upgrade-guide_topology-description.java
    :language: java

StreamsBuilder#build Method Overload
""""""""""""""""""""""""""""""""""""

We've added an overloaded ``StreamsBuilder#build`` method that accepts an instance of ``java.util.Properties``, with the intent of using the ``StreamsConfig#TOPOLOGY_OPTIMIZATION`` config added in |kstreams| 2.0. Before 2.1, when building a topology with the DSL, |kstreams| writes the physical plan as the user makes calls on the DSL. Now, by providing a ``java.util.Properties`` instance when executing a ``StreamsBuilder#build`` call, |kstreams| can optimize the physical plan of the topology, provided the ``StreamsConfig#TOPOLOGY_OPTIMIZATION`` config is set to ``StreamsConfig#OPTIMIZE``. By setting ``StreamsConfig#OPTIMIZE``, in addition to the ``KTable`` optimization of reusing the source topic as the changelog topic, the topology may be optimized to merge redundant repartition topics into one repartition topic. The original no-parameter version of ``StreamsBuilder#build`` is still available if you don't want to optimize your topology. Note that enabling optimization of the topology may require you to do an application reset when redeploying the application. For more details, see :ref:`Optimizing Kafka Streams `.
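A minimal sketch of enabling the optimization; the properties shown are assumed to already contain your application's required configs:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    Properties props = new Properties();
    // enable topology optimization ...
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

    StreamsBuilder builder = new StreamsBuilder();
    // ... define your topology on the builder ...

    // ... and pass the same properties to build() so the physical plan is optimized
    Topology topology = builder.build(props);
    KafkaStreams streams = new KafkaStreams(topology, props);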
Streams API changes in |cp| 5.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A few new Streams configurations and public interfaces were added in the |cp| 5.0.x release. Additionally, some deprecated APIs were removed in the |cp| 5.0.x release.

Skipped Records Metrics Refactored
""""""""""""""""""""""""""""""""""

Starting with |cp| 5.0.0, |kstreams| does not report the ``skippedDueToDeserializationError-rate`` and ``skippedDueToDeserializationError-total`` metrics. Deserialization errors, and all other causes of record skipping, are now accounted for in the pre-existing metrics ``skipped-records-rate`` and ``skipped-records-total``. When a record is skipped, the event is now logged at WARN level. Note that these metrics are mainly for monitoring unexpected events; if there are systematic issues that cause too many unprocessable records to be skipped, such that the resulting warning logs become burdensome, you should consider filtering out these unprocessable records instead of depending on record-skipping semantics. For more details, see `KIP-274 `__.

As of right now, the potential causes of skipped records are:

* ``null`` keys in table sources.
* ``null`` keys in table-table inner/left/outer/right joins.
* ``null`` keys or values in stream-table joins.
* ``null`` keys or values in stream-stream joins.
* ``null`` keys or values in aggregations / reductions / counts on grouped streams.
* ``null`` keys in aggregations / reductions / counts on windowed streams.
* ``null`` keys in aggregations / reductions / counts on session-windowed streams.
* Errors producing results, when the configured ``default.production.exception.handler`` decides to ``CONTINUE`` (the default is to ``FAIL`` and throw an exception).
* Errors deserializing records, when the configured ``default.deserialization.exception.handler`` decides to ``CONTINUE`` (the default is to ``FAIL`` and throw an exception). This is the case previously captured in the ``skippedDueToDeserializationError`` metrics.
* Fetched records having a negative timestamp.

New Functions in Window Store Interface
"""""""""""""""""""""""""""""""""""""""

|cp| now supports methods in ``ReadOnlyWindowStore`` that allow you to query the key-value pair of a single window. If you have customized window store implementations of the above interface, you must update your code to implement the newly added method. For more details, see `KIP-261 `__.

Simplified KafkaStreams Constructor
"""""""""""""""""""""""""""""""""""

The ``KafkaStreams`` constructor was simplified. Instead of requiring the user to create a boilerplate ``StreamsConfig`` object, the constructor now directly accepts the ``Properties`` object that specifies the actual user configuration.

.. literalinclude:: upgrade-guide-5_0/upgrade-guide_kafka-streams.java
    :language: java

Support Dynamic Routing at Sink
"""""""""""""""""""""""""""""""

In this release, you can dynamically route records to Kafka topics. More specifically, in both the lower-level ``Topology#addSink`` and higher-level ``KStream#to`` APIs, we have added variants that take a ``TopicNameExtractor`` instance instead of a specific ``String`` topic name. For each record received from the upstream processor, the ``TopicNameExtractor`` dynamically determines which Kafka topic to write to, based on the record's key and value as well as the record context. Note that all output Kafka topics are still considered user topics and hence must be pre-created. Also, we have modified the ``StreamPartitioner`` interface to add the topic name parameter, since the topic name may not be known beforehand; users who have customized implementations of this interface will need to update their code while upgrading their application.
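As a sketch (the topic names and the routing condition are hypothetical), a stream might route each record to one of two pre-created topics:

.. sourcecode:: java

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> orders = builder.stream("orders");

    // choose the output topic per record, based on key, value, and record context;
    // both output topics must already exist
    orders.to((key, value, recordContext) ->
        value.contains("ERROR") ? "orders-errors" : "orders-processed");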
Support Message Headers
"""""""""""""""""""""""

In this release, there is message header support in the Processor API. In particular, we have added a new API, ``ProcessorContext#headers()``, which returns a ``Headers`` object that keeps track of the headers of the source topic's message that is being processed. Through this object, users can also manipulate the headers map that is being propagated throughout the processor topology, for example with ``Headers#add(String key, byte[] value)`` and ``Headers#remove(String key)``. When the Streams DSL is used, users can call ``process`` or ``transform``, in which they can also access the ``ProcessorContext`` to access and manipulate the message headers; if the user does not manipulate the headers, they will still be preserved and forwarded while the record traverses the processor topology. When the resulting record is sent to the sink topics, the preserved message headers are also encoded in the sent record.

KTable Now Supports Transform Values
""""""""""""""""""""""""""""""""""""

In this release, another new API, ``KTable#transformValues``, was added. For more information, see `KIP-292 `__.

Improved Windowed Serde Support
"""""""""""""""""""""""""""""""

We added the helper class ``WindowedSerdes``, which allows you to create time- and session-windowed serdes without needing to know the details of how windows are de/serialized. The created window serdes wrap a user-provided serde for the inner key- or value-data type. Furthermore, two new configs, ``default.windowed.key.serde.inner`` and ``default.windowed.value.serde.inner``, were added that allow you to specify the default inner key- and value-serde for windowed types. Note that these new configs are only effective if ``default.key.serde`` or ``default.value.serde`` specifies a windowed serde (either ``WindowedSerdes.TimeWindowedSerde`` or ``WindowedSerdes.SessionWindowedSerde``).

.. _streams_upgrade-guide_5.1.x-api-changes-timestamp-manipulation:

Allow Timestamp Manipulation
""""""""""""""""""""""""""""

Using the Processor API, it is now possible to set the timestamp for output messages explicitly. This change implies updates to the ``ProcessorContext#forward()`` method. Some existing methods were deprecated and replaced by new ones. In particular, it is no longer possible to send records to a downstream processor based on its index.

.. literalinclude:: upgrade-guide-5_0/upgrade-guide_processor-context.java
    :language: java

Public Test-Utils Artifact
""""""""""""""""""""""""""

|cp| now ships with a ``kafka-streams-test-utils`` artifact that contains utility classes to unit test your |kstreams| application. Check out the :ref:`Testing Streams Code ` section for more details.

Scala API
"""""""""

|cp| now ships with the Apache Kafka Scala API for |kstreams|. You can add the dependency for Scala 2.11 or 2.12 artifacts:

.. sourcecode:: xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-scala_2.11</artifactId>
        <version>2.0.0-cp1</version>
    </dependency>

Deprecated APIs are Removed
"""""""""""""""""""""""""""

The following deprecated APIs are removed in |cp| 5.0.0:

#. **KafkaStreams#toString** no longer returns the topology and runtime metadata. To get topology metadata, you can call ``Topology#describe()``, and to get thread runtime metadata, you can call ``KafkaStreams#localThreadsMetadata`` (deprecated since |cp| 4.0.0). For detailed guidance on how to update your code, please read :ref:`here`.
#. **TopologyBuilder** and **KStreamBuilder** are removed and replaced by ``Topology`` and ``StreamsBuilder``, respectively (deprecated since |cp| 4.0.0).
#. **StateStoreSupplier** is removed and replaced with ``StoreBuilder`` (deprecated since |cp| 4.0.0); the corresponding **Stores#create** and the **KStream, KTable, KGroupedStream** overloaded functions that use it have also been removed.
#. **KStream, KTable, KGroupedStream** overloaded functions that require serdes and other specifications explicitly are removed and replaced with simpler overloaded functions that use ``Consumed, Produced, Serialized, Materialized, Joined`` (deprecated since |cp| 4.0.0).
#. **Processor#punctuate**, **ValueTransformer#punctuate**, and **RecordContext#schedule(long)** are removed and replaced by ``RecordContext#schedule(long, PunctuationType, Punctuator)`` (deprecated since |cp| 4.0.0).
#. The second ``boolean`` typed parameter **loggingEnabled** in **ProcessorContext#register** has been removed; you can now use ``StoreBuilder#withLoggingEnabled`` or ``#withLoggingDisabled`` to specify the behavior when creating the state store (deprecated since |cp| 3.3.0).
#. **KTable#writeAs, #print, #foreach, #to, #through** are removed, as their semantics are more confusing than useful; you can call ``KTable#toStream()#writeAs`` etc. instead for the same purpose (deprecated since |cp| 3.3.0).
#. **StreamsConfig#KEY_SERDE_CLASS_CONFIG, #VALUE_SERDE_CLASS_CONFIG, #TIMESTAMP_EXTRACTOR_CLASS_CONFIG** are removed and replaced with ``StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, #DEFAULT_VALUE_SERDE_CLASS_CONFIG, #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG``, respectively (deprecated since |cp| 3.3.0).
#. **StreamsConfig#ZOOKEEPER_CONNECT_CONFIG** is removed, as Streams no longer needs a |zk| dependency (deprecated since |cp| 3.2.0).

Streams API changes in |cp| 4.1
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A few new Streams configurations and public interfaces were added in the |cp| 4.1.x release.

Changes in bin/kafka-streams-application-reset
""""""""""""""""""""""""""""""""""""""""""""""

Added options to specify input topic offsets to reset, according to `KIP-171 `__.

Embedded Admin Client Configuration
"""""""""""""""""""""""""""""""""""

You can now customize the embedded admin client inside your Streams application, which is used to send all the administrative requests to Kafka brokers, such as internal topic creation. This is done via the additional ``KafkaClientSupplier#getAdminClient(Map)`` interface; for example, users can provide their own ``AdminClient`` implementations to override the default ones in their integration testing. In addition, users can also override the configs that are passed into ``KafkaClientSupplier#getAdminClient(Map)`` to configure the returned ``AdminClient``. Such overridden configs can be specified via the ``StreamsConfig`` by adding the admin configs with the prefix as defined by ``StreamsConfig#adminClientPrefix(String)``. Any configs that aren't admin client configs will be ignored.

For example:

.. sourcecode:: java

    Properties streamsProps = ...;

    // use retries=10 for the embedded admin client
    streamsProps.put(StreamsConfig.adminClientPrefix("retries"), 10);

Streams API changes in |cp| 4.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. _streams_upgrade-guide_4.0.x-api-changes:

|kstreams| and its API were improved and modified in the |cp| 4.0.x release. All of these changes are backward compatible, so you don't need to update the code of your |kstreams| applications immediately. However, some methods were deprecated, and it is recommended that you update your code eventually to allow for future upgrades. In this section, we focus on deprecated APIs.

Building and running a topology
"""""""""""""""""""""""""""""""

The two main classes to specify a topology, ``KStreamBuilder`` and ``TopologyBuilder``, were deprecated and replaced by ``StreamsBuilder`` and ``Topology``.
Note that both new classes are in the package ``org.apache.kafka.streams`` and that ``StreamsBuilder`` does not extend ``Topology``, i.e., the class hierarchy is different now. This change also affects the ``KafkaStreams`` constructors, which now only accept a ``Topology``. If you use ``StreamsBuilder``, you can obtain the constructed topology via ``StreamsBuilder#build()``. The new classes have basically the same methods as the old ones to build a topology via the DSL or Processor API. However, some internal methods that were public in ``KStreamBuilder`` and ``TopologyBuilder``, but not part of the actual API, are no longer included in the new classes.

.. literalinclude:: upgrade-guide-4_0/upgrade-guide_builder.java
    :language: java

Describing topology and stream task metadata
""""""""""""""""""""""""""""""""""""""""""""

``KafkaStreams#toString()`` and ``KafkaStreams#toString(final String indent)``, which were previously used to retrieve the user-specified processor topology information as well as runtime stream task metadata, are deprecated in 4.0.0. Instead, a new method of ``KafkaStreams``, namely ``localThreadsMetadata()``, is added, which returns an ``org.apache.kafka.streams.processor.ThreadMetadata`` object for each of the local stream threads that describes the runtime state of the thread as well as the metadata of its currently assigned tasks. Such information is very helpful for debugging and monitoring your streams applications. For retrieving the specified processor topology information, users can now call ``Topology#describe()``, which returns an ``org.apache.kafka.streams.TopologyDescription`` object that contains the detailed description of the topology (DSL users need to call ``StreamsBuilder#build()`` to get the ``Topology`` object first).
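As a sketch (assuming an existing ``builder`` and ``props``, and a running application), you might print both kinds of metadata like this:

.. sourcecode:: java

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.ThreadMetadata;

    // static description of the topology
    Topology topology = builder.build();
    System.out.println(topology.describe());

    // runtime metadata of the local stream threads
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    for (final ThreadMetadata threadMetadata : streams.localThreadsMetadata()) {
        System.out.println(threadMetadata);
    }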
Merging KStreams
""""""""""""""""

As mentioned above, ``KStreamBuilder`` was deprecated in favor of ``StreamsBuilder``. Additionally, ``KStreamBuilder#merge(KStream...)`` was replaced by ``KStream#merge(KStream)``, and thus ``StreamsBuilder`` does not have a ``merge()`` method. Note that instead of merging an arbitrary number of ``KStream`` instances into a single ``KStream``, as in the old API, the new ``#merge()`` method only accepts a single ``KStream`` and thus merges two ``KStream`` instances into one. If you want to merge more than two ``KStream`` instances, you can call ``KStream#merge()`` multiple times.

.. literalinclude:: upgrade-guide-4_0/upgrade-guide_merge.java
    :language: java

Punctuation functions
"""""""""""""""""""""

The Processor API was extended to allow users to schedule ``punctuate`` functions either based on :ref:`event-time ` (i.e. ``PunctuationType.STREAM_TIME``) or *wall-clock-time* (i.e. ``PunctuationType.WALL_CLOCK_TIME``). Before this, users could only schedule based on *event-time*, and hence the ``punctuate`` function was data-driven only. As a result, the original ``ProcessorContext#schedule`` is deprecated in favor of a new overloaded function. In addition, the ``punctuate`` function inside ``Processor`` is also deprecated and is replaced by the newly added ``Punctuator#punctuate`` interface.

.. literalinclude:: upgrade-guide-4_0/upgrade-guide_schedule-punctuator.java
    :language: java

Streams Configuration
"""""""""""""""""""""

You can now override the configs that are used to create internal repartition and changelog topics. You provide these configs via the ``StreamsConfig`` by adding the topic configs with the prefix as defined by ``StreamsConfig#topicPrefix(String)``. Any properties in the ``StreamsConfig`` with that prefix will be applied when creating internal topics. Any configs that aren't topic configs will be ignored. If you are already using ``StateStoreSupplier`` or ``Materialized`` to provide configs for changelogs, then they will take precedence over those supplied in the config. For example:

.. sourcecode:: java

    Properties streamsProps = ...;

    // use cleanup.policy=delete for internal topics
    streamsProps.put(StreamsConfig.topicPrefix("cleanup.policy"), "delete");

New classes for optional DSL parameters
"""""""""""""""""""""""""""""""""""""""

Several new classes were introduced, i.e., ``Serialized``, ``Consumed``, ``Produced``, etc., to reduce the overloads in the DSL. These classes mostly have a static method ``with`` to create an instance, i.e., ``Serialized.with(Serdes.Long(), Serdes.String())``. Scala users should be aware that they will need to surround ``with`` with backticks. For example:

.. sourcecode:: scala

    // When using Scala: enclose "with" with backticks
    Serialized.`with`(Serdes.Long(), Serdes.String())

Streams API changes in |cp| 3.3
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

|kstreams| and its API were improved and modified since the release of |cp| 3.2.x. All of these changes are backward compatible, so you don't need to update the code of your |kstreams| applications immediately. However, some methods and configuration parameters were deprecated, and it is recommended that you update your code eventually to allow for future upgrades. In this section, we focus on deprecated APIs.

Streams Configuration
"""""""""""""""""""""

The following configuration parameters were renamed, and their old names were deprecated:

* ``key.serde`` renamed to ``default.key.serde``
* ``value.serde`` renamed to ``default.value.serde``
* ``timestamp.extractor`` renamed to ``default.timestamp.extractor``

Thus, ``StreamsConfig#KEY_SERDE_CONFIG``, ``StreamsConfig#VALUE_SERDE_CONFIG``, and ``StreamsConfig#TIMESTAMP_EXTRACTOR_CONFIG`` were deprecated, too.

Additionally, the following method changes apply:

* method ``keySerde()`` was deprecated and replaced by ``defaultKeySerde()``
* method ``valueSerde()`` was deprecated and replaced by ``defaultValueSerde()``
* new method ``defaultTimestampExtractor()`` was added

Local timestamp extractors
""""""""""""""""""""""""""

The Streams API was extended to allow users to specify a per-stream/table timestamp extractor. This simplifies the usage of different timestamp extractor logic for different streams/tables. Before, users needed to apply an ``if-then-else`` pattern within the default timestamp extractor to apply different logic to different input topics. The old behavior introduced unnecessary dependencies and thus limited code modularity and code reuse. To enable the new feature, the methods ``KStreamBuilder#stream()``, ``KStreamBuilder#table()``, ``KStream#globalTable()``, ``TopologyBuilder#addSource()``, and ``TopologyBuilder#addGlobalStore()`` have new overloads that allow you to specify a "local" timestamp extractor that is solely applied to the corresponding input topics.

.. literalinclude:: upgrade-guide-3_3/upgrade-guide_timestamp-extractor.java
    :language: java
KTable Changes
""""""""""""""

The following methods have been deprecated on the ``KTable`` interface:

* ``void foreach(final ForeachAction action)``
* ``void print()``
* ``void print(final String streamName)``
* ``void print(final Serde keySerde, final Serde valSerde)``
* ``void print(final Serde keySerde, final Serde valSerde, final String streamName)``
* ``void writeAsText(final String filePath)``
* ``void writeAsText(final String filePath, final Serde keySerde, final Serde valSerde)``
* ``void writeAsText(final String filePath, final String streamName)``
* ``void writeAsText(final String filePath, final String streamName, final Serde keySerde, final Serde valSerde)``

These methods have been deprecated in favor of using the :ref:`Interactive Queries API `.

If you want to query the current content of the state store backing the ``KTable``, use the following approach:

* Make a call to ``KafkaStreams.store(String storeName, QueryableStoreType queryableStoreType)``, followed by a call to ``ReadOnlyKeyValueStore.all()``, to iterate over the keys of the ``KTable``.

If you want to view the changelog stream of the ``KTable``, then you could do something along the lines of the following:

* Call ``KTable.toStream()`` and then call ``KStream#print()``.
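As a sketch of the first approach (the store name and value types are hypothetical, and ``streams`` is assumed to be a running ``KafkaStreams`` instance):

.. sourcecode:: java

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("my-ktable-store", QueryableStoreTypes.keyValueStore());

    // iterate over all key-value pairs currently in the KTable's state store
    try (KeyValueIterator<String, Long> iterator = store.all()) {
        while (iterator.hasNext()) {
            KeyValue<String, Long> entry = iterator.next();
            System.out.println(entry.key + " -> " + entry.value);
        }
    }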
Streams API changes in |cp| 3.2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

|kstreams| and its API were improved and modified since the release of |cp| 3.1.x. Some of these changes are breaking changes that require you to update the code of your |kstreams| applications. In this section, we focus on only these breaking changes.

Handling Negative Timestamps and Timestamp Extractor Interface
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

|kstreams| behavior with regard to invalid (i.e., negative) timestamps was improved. By default, you will still get an exception on an invalid timestamp. However, you can reconfigure your application to react more gracefully to invalid timestamps, which was not possible before. Even if you do not use a custom timestamp extractor, you need to recompile your application code, because the ``TimestampExtractor`` interface was changed in an incompatible way.

The internal behavior of |kstreams| with regard to negative timestamps was changed. Instead of raising an exception when the timestamp extractor returns a negative timestamp, the corresponding record is dropped silently and not processed. This allows processing topics for which only a few records cannot provide a valid timestamp. Furthermore, the ``TimestampExtractor`` interface was changed and now has one additional parameter. This parameter provides a timestamp that can be used, for example, to return an estimated timestamp if no valid timestamp can be extracted from the current record.

The old default timestamp extractor ``ConsumerRecordTimestampExtractor`` was replaced with ``FailOnInvalidTimestamp``, and two new extractors, which both extract a record's built-in timestamp, were added (``LogAndSkipOnInvalidTimestamp`` and ``UsePreviousTimeOnInvalidTimestamp``). The new default extractor (``FailOnInvalidTimestamp``) raises an exception in case of a negative built-in record timestamp, such that the default behavior of |kstreams| is kept (i.e., fail fast on negative timestamps). The two newly added extractors allow you to handle negative timestamps more gracefully by implementing a log-and-skip or timestamp-estimation strategy.

.. literalinclude:: upgrade-guide-3_2/upgrade-guide_timestamp-extractor.java
    :language: java

Metrics
"""""""

If you provide custom metrics by implementing the ``StreamsMetrics`` interface, you need to update your code, as the interface has many new methods that allow you to register finer-grained metrics than before. More details are available in `KIP-114 `__.

.. literalinclude:: upgrade-guide-3_2/upgrade-guide_metrics.java
    :language: java

Scala
"""""

Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for the code to compile. The :cp-examples:`StreamToTableJoinScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala` has an example where the types of return variables are explicitly declared.

Streams API changes in |cp| 3.1
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Stream grouping and aggregation
"""""""""""""""""""""""""""""""

Grouping (i.e., repartitioning) and aggregation of the ``KStream`` API was significantly changed to be aligned with the ``KTable`` API. Instead of using a single method with many parameters, grouping and aggregation is now split into two steps. First, a ``KStream`` is transformed into a ``KGroupedStream``, which is a repartitioned copy of the original ``KStream``. Afterwards, an aggregation can be performed on the ``KGroupedStream``, resulting in a new ``KTable`` that contains the result of the aggregation.

Thus, the methods ``KStream#aggregateByKey(...)``, ``KStream#reduceByKey(...)``, and ``KStream#countByKey(...)`` were replaced by ``KStream#groupBy(...)`` and ``KStream#groupByKey(...)``, which return a ``KGroupedStream``. While ``KStream#groupByKey(...)`` groups on the current key, ``KStream#groupBy(...)`` sets a new key and re-partitions the data to build groups on the new key. The new class ``KGroupedStream`` provides the corresponding methods ``aggregate(...)``, ``reduce(...)``, and ``count(...)``.

.. literalinclude:: upgrade-guide-3_1/upgrade-guide_grouping.java
    :language: java

Auto Repartitioning
"""""""""""""""""""

Previously, when performing ``KStream#join(...)``, ``KStream#outerJoin(...)``, or ``KStream#leftJoin(...)`` operations after a key-changing operation, i.e., ``KStream#map(...)``, ``KStream#flatMap(...)``, or ``KStream#selectKey(...)``, the developer was required to call ``KStream#through(...)`` to repartition the mapped ``KStream``. This is no longer required. Repartitioning now happens automatically for all join operations.

.. literalinclude:: upgrade-guide-3_1/upgrade-guide_repartitioning.java
    :language: java

TopologyBuilder
"""""""""""""""

Two public method signatures have been changed on ``TopologyBuilder``: ``TopologyBuilder#sourceTopics(String applicationId)`` and ``TopologyBuilder#topicGroups(String applicationId)``. These methods no longer take ``applicationId`` as a parameter; instead, you should call ``TopologyBuilder#setApplicationId(String applicationId)`` before calling one of these methods.

.. literalinclude:: upgrade-guide-3_1/upgrade-guide_topology-builder.java
    :language: java

.. _streams_upgrade-guide_dsl-store-names:

DSL: New parameters to specify state store names
""""""""""""""""""""""""""""""""""""""""""""""""

Apache Kafka ``0.10.1`` introduces :ref:`streams_developer-guide_interactive-queries`, which allow you to directly query state stores of a |kstreams| application. This new feature required a few changes to the operators in the DSL.
Starting with |ak| ``0.10.1``, state stores must always be "named", which includes both explicitly used state stores (e.g., defined by the user) and internally used state stores (e.g., created behind the scenes by operations such as ``count()``). This naming is a prerequisite to making state stores queryable. As a result of this, the previous "operator name" is now the state store name. This change affects ``KStreamBuilder#table(...)`` and *windowed* aggregates ``KGroupedStream#count(...)``, ``#reduce(...)``, and ``#aggregate(...)``.

.. literalinclude:: upgrade-guide-3_1/upgrade-guide_operator-names.java
    :language: java

Windowing
"""""""""

The API for ``JoinWindows`` was improved. It is no longer possible to define a window with a default size (of zero). Furthermore, windows are no longer named. Rather, any such naming is now done for state stores. See the section :ref:`DSL: New parameters to specify state store names ` above.

.. literalinclude:: upgrade-guide-3_1/upgrade-guide_windows.java
    :language: java