Reset Kafka Streams Applications in Confluent Platform

You can reset an application and force it to reprocess its data from scratch by using the application reset tool. This can be useful for development and testing, or when fixing bugs.

The application reset tool handles the Kafka Streams user topics (input, output, and intermediate topics) and internal topics differently when resetting the application.

Here’s what the application reset tool does for each topic type:

  • Input topics: Reset offsets to the specified position. By default, they are reset to the beginning of the topic.
  • Intermediate topics: Skip to the end of the topic, i.e., set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
  • Internal topics: Delete the internal topic (this automatically deletes any committed offsets).

The application reset tool does not:

  • Reset output topics of an application. If any output (or intermediate) topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.
  • Reset the local environment of your application instances. It is your responsibility to delete the local state on any machine on which an application instance was run. See the instructions in section Step 2: Reset the local environments of your application instances on how to do this.
  • Delete schemas in Schema Registry for internal topics. You must delete schemas for internal topics manually if you reset an app that uses Schema Registry. The reset tool has a “dry run” option you can use to see the internal topics that the tool will delete.
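
For example, a dry run like the following previews the internal topics that would be deleted and the offset actions that would be taken, without changing anything (the application ID, broker address, and topic name are placeholder values):

$CONFLUENT_HOME/bin/kafka-streams-application-reset --application-id my-streams-app \
                                                    --bootstrap-server localhost:9092 \
                                                    --input-topics my-input-topic \
                                                    --dry-run
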
Prerequisites
  • All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups (see the example command after this list).
  • Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in application.id) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or your Apache Kafka® topics.
  • You should manually delete and re-create any intermediate topics before running the application reset tool, because this frees up disk space in the Kafka brokers. You can skip this step if either of the following applies:
    • You have external downstream consumers for the application’s intermediate topics.
    • You are in a development environment where manually deleting and re-creating intermediate topics is unnecessary.
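
For example, the following command reports whether the group still has active members (the application ID and broker address are placeholder values):

bin/kafka-consumer-groups --bootstrap-server localhost:9092 \
                          --describe \
                          --group my-streams-app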

Compatible topology changes

In general, altering a Kafka Streams topology by adding or removing operations requires an application reset. This is because Kafka Streams must manage persistent resources like state stores and internal topics by name. You can see these names by printing the topology description with the topology.describe() method.

To check whether a topology change is compatible or requires a reset, compare the TopologyDescription of the old and the new topology. The names of all stateful internal resources, such as state store names, their changelog topic names, and repartition topic names, must not change; otherwise, the application won’t be able to start.
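
For example, you can print the description of a topology and diff the output of the old and the new version (the topic names and the topology below are illustrative):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class DescribeTopology {

  public static void main(String[] args) {
    // Build the topology exactly as your application does.
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("my-input-topic")
           .groupByKey()
           .count()
           .toStream()
           .to("my-output-topic");

    Topology topology = builder.build();
    TopologyDescription description = topology.describe();

    // Compare this output between the old and the new version to verify
    // that state store, changelog topic, and repartition topic names
    // are unchanged.
    System.out.println(description);
  }
}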

Potentially compatible changes:

  • changing a filter condition
  • inserting new filters (record-by-record operation)
  • inserting a new map (record-by-record operation), if the data types are compatible; be aware that a new map may create a downstream repartition topic that didn’t exist before, which may break compatibility
  • calling mapValues(), if the value type doesn’t change
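
For example, inserting a stateless filter in front of an existing, explicitly named aggregation keeps the store and changelog topic names intact. In the sketch below, the topic and store names are examples and builder is an existing StreamsBuilder:

// Newly inserted record-by-record operation; because the count store is
// explicitly named, the store and changelog topic names do not change,
// so this structural change is potentially compatible.
builder.stream("my-input-topic")
       .filter((key, value) -> value != null)
       .groupByKey()
       .count(Materialized.as("global-count"));

With auto-generated store names, the same insertion could shift the generated names, so check the TopologyDescription as described above.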

Potentially incompatible changes:

  • changing the structure of the DAG topology
  • changing input or output data types of stateful operations, like aggregations or joins

Kafka Streams performs some compatibility checks on startup. The following sections give examples of changes that these checks can detect as incompatible. Kafka Streams can’t detect all incompatible changes, so changes other than these can also make a topology incompatible.

Changing the number of partitions

If you change the number of partitions of a topic used in the topology, the existing state stores may become inconsistent or incomplete. This can result in data loss or incorrect results. To avoid this, you should always use the same number of partitions for a topic throughout the life of the application.

One example of a compatible partition change is calling builder.stream("topic1").map().repartition(/*set partition count*/).join(...). Changing the number of partitions of topic1 is compatible, because the partition count of the repartition topic is fixed by the repartition() call.
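
A sketch of that pattern follows; the topic names, types, and join logic are illustrative, and builder is an existing StreamsBuilder:

// Re-key the stream and repartition it with an explicit, fixed partition count.
// Because repartition() pins the partition count of the repartition topic,
// later changes to the partition count of "topic1" don't affect the
// partitioning that the join depends on.
KStream<String, String> rekeyed = builder
    .<String, String>stream("topic1")
    .map((key, value) -> KeyValue.pair(value, value))
    .repartition(Repartitioned.<String, String>as("rekeyed")
                              .withNumberOfPartitions(4));

KTable<String, String> table = builder.table("table1");
rekeyed.join(table, (leftValue, rightValue) -> leftValue + rightValue)
       .to("joined-output");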

Changing the key or value type

If you change the key or value type of a topic used in the topology, the existing state stores may become incompatible with the new type. This can result in data loss or incorrect results. To avoid this, you should always use the same key and value types for a topic throughout the life of the application.

Also, if schema evolution occurs and the schema ID in Schema Registry changes for a key, the change may be incompatible.

Changing the state store configuration

If you change the configuration of a state store used in the topology, like its name, retention policy, or changelog topic configuration, the existing state stores may become incompatible with the new configuration. This can result in data loss or incorrect results. One example of an incompatible modification to the changelog configuration is changing the values in the map provided to withLoggingEnabled(Map<String, String> config). To avoid this, you should always use the same state store configuration throughout the life of the application.
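
For example, changelog configuration is typically attached to a store through Materialized, as sketched below (the store name, topic, and configuration values are examples, and builder is an existing StreamsBuilder). Changing the values passed to withLoggingEnabled() later counts as an incompatible modification:

// Custom changelog topic configuration for the "global-count" store.
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("retention.ms", "172800000");   // changing this value later is incompatible

builder.stream("my-input-topic")
       .groupByKey()
       .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("global-count")
                          .withLoggingEnabled(changelogConfig));
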
Changing the topology structure

If you change the structure of the topology by adding or removing nodes, the existing state stores may become inconsistent or incomplete. This can result in data loss or incorrect results. To avoid this, you should always plan the topology structure carefully before deploying the application, and avoid making changes that are incompatible with the existing topology.

Step 1: Run the application reset tool

Invoke the application reset tool from the command line:

$CONFLUENT_HOME/bin/kafka-streams-application-reset

The tool accepts the following parameters:

  • --application-id <String: ID>: (REQUIRED) The Kafka Streams application ID (application.id).
  • --bootstrap-server <String: server to connect to>: REQUIRED unless --bootstrap-servers (deprecated) is specified. The server(s) to connect to. The broker list string has the format HOST1:PORT1,HOST2:PORT2.
  • --bootstrap-servers <String: urls>: (DEPRECATED) Comma-separated list of broker URLs with the format HOST1:PORT1,HOST2:PORT2 (default: localhost:9092).
  • --by-duration <String>: Reset offsets to the offset by duration from the current timestamp. Format: PnDTnHnMnS.
  • --config-file <String: file name>: Property file containing configs to be passed to admin clients and the embedded consumer.
  • --dry-run: Display the actions that would be performed, without executing the reset commands.
  • --force: Force removing members of the consumer group. Intended to remove left-over members if a long session timeout is configured.
  • --from-file <String: file name>: Reset offsets to the values defined in a CSV file.
  • --input-topics <String: list>: Comma-separated list of user input topics. For these topics, the tool resets the offset to the earliest available offset.
  • --intermediate-topics <String: list>: Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool skips to the end.
  • --internal-topics <String: list>: Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behavior. Tip: Do a dry run without this option to view these topics.
  • --shift-by <Long: number-of-offsets>: Reset offsets, shifting the current offset by n. The value of n can be positive or negative.
  • --to-datetime <String>: Reset offsets to offsets from a datetime. Format: YYYY-MM-DDTHH:mm:SS.sss.
  • --to-earliest: Reset offsets to the earliest offset.
  • --to-latest: Reset offsets to the latest offset.
  • --to-offset <Long>: Reset offsets to the specified offset.

The following options are the reset-offset scenarios for input topics:

  • by-duration
  • from-file
  • shift-by
  • to-datetime
  • to-earliest
  • to-latest
  • to-offset

Only one of these scenarios can be specified per invocation. If none is specified, to-earliest is used by default.

You can combine all of the other parameters as needed. For example, if you want to restart an application from an empty internal state but not reprocess previous data, omit the parameters --input-topics and --intermediate-topics.
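
For example, to reprocess input starting from a specific point in time rather than from the beginning of the topics (the application ID, broker address, topic name, and timestamp are placeholder values):

$CONFLUENT_HOME/bin/kafka-streams-application-reset --application-id my-streams-app \
                                                    --bootstrap-server localhost:9092 \
                                                    --input-topics my-input-topic \
                                                    --to-datetime 2024-01-01T00:00:00.000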

Step 2: Reset the local environments of your application instances

For a complete application reset, you must delete the application’s local state directory on any machines where the application instance was run. You must do this before restarting an application instance on the same machine. You can use either of these methods:

  • The API method KafkaStreams#cleanUp() in your application code.
  • Manually delete the corresponding local state directory (default location: /var/lib/kafka-streams/<application.id>). For more information, see the state.dir setting in the StreamsConfig class.
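
For example, on each machine that ran an application instance, you could remove the default state directory for the application (the application ID is a placeholder, and the path differs if you changed state.dir):

rm -rf /var/lib/kafka-streams/my-streams-app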

Example

In this example you are developing and testing an application locally and you want to iteratively improve your application via run-reset-modify cycles.

package io.confluent.examples.streams;

import ...;

public class ResetDemo {

  public static void main(String[] args) throws Exception {
    // Kafka Streams configuration
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // ...and so on...

    // Define the processing topology
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .groupByKey()
        .count("global-count")
        .to("my-output-topic");

    KafkaStreams app = new KafkaStreams(builder.build(), props);

    // Delete the application's local state.
    // Note: In a real application you'd call `cleanUp()` only under
    // certain conditions. See the tip on `cleanUp()` below.
    app.cleanUp();

    app.start();

    // Note: In real applications you would register a shutdown hook
    // that would trigger the call to `app.close()` rather than
    // using the sleep-then-close example we show here.
    Thread.sleep(30 * 1000L);
    app.close();
  }

}

Tip

To avoid unnecessary recovery overhead, don’t call cleanUp() unconditionally every time an application instance is restarted or resumed. For example, in a production application you could use command line arguments to enable or disable the cleanUp() call on an as-needed basis. You can then perform run-reset-modify cycles as shown below:

# Run your application
  bin/kafka-run-class io.confluent.examples.streams.ResetDemo

# After stopping all application instances, reset the application
  bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic

# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.
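
A minimal sketch of gating cleanUp() behind a command line argument, as suggested above (the --reset flag is an assumed convention, not part of any tool):

    // Inside main(), after constructing the KafkaStreams instance `app`:
    // wipe local state only when the operator explicitly asks for it, e.g.
    // bin/kafka-run-class io.confluent.examples.streams.ResetDemo --reset
    boolean doReset = java.util.Arrays.asList(args).contains("--reset");
    if (doReset) {
      app.cleanUp();
    }
    app.start();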
