FAQ

Attention

We are looking for feedback on APIs, operators, documentation, and really anything that will make the end user experience better. Feel free to provide your feedback via email to users@kafka.apache.org.

General

Is Kafka Streams a project separate from Apache Kafka?

No, it is not. Kafka’s Streams API – aka Kafka Streams – is a component of the Apache Kafka open source project, and thus included in Apache Kafka 0.10+ releases. The source code is available at https://github.com/apache/kafka/tree/trunk/streams.

Is Kafka Streams a proprietary library of Confluent?

No, it is not. Kafka’s Streams API – aka Kafka Streams – is a component of the Apache Kafka open source project, and thus included in Apache Kafka 0.10+ releases. The source code is available at https://github.com/apache/kafka/tree/trunk/streams.

Do Kafka Streams applications run inside the Kafka brokers?

No, they don’t run inside the Kafka brokers. “Kafka Streams applications” are normal Java applications that happen to use the Kafka Streams library. You would run these applications on client machines at the perimeter of a Kafka cluster. In other words, Kafka Streams applications do not run inside the Kafka brokers (servers) or the Kafka cluster – they are client-side applications.

What are the system dependencies of Kafka Streams?

Kafka Streams has no external dependencies on systems other than Apache Kafka.

Which versions of Kafka clusters are supported by Kafka Streams?

The following versions are supported:

                      Kafka Broker (columns)
Streams API (rows)    3.0.x / 0.10.0.x    3.1.x / 0.10.1.x    3.2.0 / 0.10.2.0
3.0.x / 0.10.0.x      compatible          compatible          compatible
3.1.x / 0.10.1.x                          compatible          compatible
3.2.0 / 0.10.2.0                          compatible          compatible

The Streams API is not compatible with clusters running older Kafka brokers (0.7, 0.8, 0.9).

What programming languages are supported?

The Kafka Streams API is implemented in Java. The Developer Guide provides several example applications written in Java 7 and Java 8+. Applications can also be written in other JVM-based languages such as Scala. However, users may need to declare types explicitly in order for the code to compile: StreamToTableJoinScalaIntegrationTest shows an example where the types of return variables are explicitly declared.

Scalability

Maximum parallelism of my application? Maximum number of app instances I can run?

Slightly simplified, the maximum parallelism at which your application may run is determined by the maximum number of partitions of the input topic(s) the application is reading from in its processing topology. The number of input partitions determines how many stream tasks Kafka Streams will create for the application, and the number of stream tasks is the upper bound on the application’s parallelism.

Let’s take an example. Imagine your application is reading from an input topic that has 5 partitions. How many app instances can we run here?

The short answer is that we can run up to 5 instances of this application, because the application’s maximum parallelism is 5. If we run more than 5 app instances, then the “excess” app instances will successfully launch but remain idle. If one of the busy instances goes down, one of the idle instances will resume the former’s work.

The longer, more detailed answer for this example would be:

  • 5 stream tasks will be created, each of which will process one of the input partitions. And it’s actually the number of stream tasks that determines the maximum parallelism of an application – the number of input partitions is simply the main parameter from which the number of stream tasks is computed.

Now that we know the application’s theoretical maximum parallelism, the next question is how to actually run the application at its maximum parallelism. To do so, we must ensure that all 5 stream tasks are running in parallel, i.e. there should be 5 processing threads, with each thread executing one task. There are several options at your disposal:

  • Option 1 is to scale horizontally (scaling out): You run five single-threaded instances of your application, each of which executes one thread/task (i.e., num.stream.threads would be set to 1; see optional configuration parameters). This option is used, for example, when you have many low-powered machines for running the app instances.
  • Option 2 is to scale vertically (scaling up): You run a single multi-threaded instance of your application that will execute all threads/tasks (i.e., num.stream.threads would be set to 5). This option is useful, for example, when you have a very powerful machine to run the app instance.
  • Option 3 combines options 1 and 2: You run multiple instances of your application, each of which is running multiple threads. This option is used, for example, when your application runs at large scale. Here, you may even choose to run multiple app instances per machine.
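The arithmetic behind these options can be sketched in plain Java. This is only an illustration of the simplified rule described above, not code from the Kafka Streams library: the class and method names are hypothetical, and the application id and broker address are placeholder values. The configuration keys `application.id`, `bootstrap.servers`, and `num.stream.threads` are the setting names an instance would be configured with.

```java
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper class; it does not use or depend on the Kafka libraries.
final class ParallelismSketch {

    // Simplified rule from the text: the number of stream tasks equals the
    // largest partition count among the input topics.
    static int streamTasks(int... partitionsPerInputTopic) {
        return Arrays.stream(partitionsPerInputTopic).max().orElse(0);
    }

    // Number of threads that would have no task to run (and thus stay idle)
    // for a deployment of `instances` app instances with `threadsPerInstance` each.
    static int idleThreads(int tasks, int instances, int threadsPerInstance) {
        return Math.max(0, instances * threadsPerInstance - tasks);
    }

    // Settings for one app instance. The id and address are placeholders.
    static Properties instanceConfig(int threadsPerInstance) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");            // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("num.stream.threads", Integer.toString(threadsPerInstance));
        return props;
    }
}
```

For the 5-partition example, `streamTasks(5)` yields 5 tasks. Option 1 corresponds to five instances built from `instanceConfig(1)`, option 2 to one instance built from `instanceConfig(5)`, and `idleThreads(5, 7, 1)` shows that two of seven single-threaded instances would sit idle.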

Failure and exception handling

Handling corrupted records and deserialization errors (“poison pill messages”)?

Common patterns for handling such records are described in options 1 through 3 below.

We provide a full demo application at HandlingCorruptedInputRecordsIntegrationTest.

Option 1: Skip corrupted records with flatMap

This is arguably what most users would like to do. The idea is that we attempt to manually deserialize a record and catch any serialization exception that might be thrown.

  • We use flatMap because it allows you to output zero, one, or more output records per input record. In the case of a corrupted record we output nothing (zero records), thereby effectively ignoring/skipping the corrupted record. As an optional step we may choose to log the fact that we skipped a corrupted record for monitoring and alerting purposes.
  • Benefit of this approach compared to option 2 below: We need to manually deserialize a record only once!
  • Drawback of this approach: flatMap “marks” the input stream for potential data re-partitioning, i.e. if you perform a key-based operation such as groupings (groupBy or groupByKey) or joins afterwards, then your data will be re-partitioned. Since such re-partitioning might be a costly step we don’t want that to happen unnecessarily. Thus if you either (1) know that the record keys are always valid or (2) know that you don’t need to operate on the keys in your topology anyway (thus allowing you to keep them as “raw” keys in byte[] format), you can change from flatMap to flatMapValues, which will not result in data re-partitioning even if you group or join the stream later.

Code example:

Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// Input topic, which might contain corrupted records/messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, "input-topic");

// Note how the returned stream is of type `KStream<String, Long>`.
KStream<String, Long> doubled = input.flatMap(
    (k, v) -> {
      try {
        // Attempt deserialization
        String key = stringSerde.deserializer().deserialize("input-topic", k);
        long value = longSerde.deserializer().deserialize("input-topic", v);

        // Ok, the record is valid (not corrupted).  Let's take the
        // opportunity to also process the record in some way so that
        // we haven't paid the deserialization cost just for "poison pill"
        // checking.
        return Collections.singletonList(KeyValue.pair(key, 2 * value));
      }
      catch (SerializationException e) {
        // Ignore/skip the corrupted record by catching the exception.
        // Optionally, we can log the fact that we did so:
        System.err.println("Could not deserialize record: " + e.getMessage());
      }
      return Collections.emptyList();
    }
);

Note

Adding a quarantine topic (dead letter queue) to this approach: If you wanted to, you could also forward corrupt records into a quarantine topic in the catch clause above. In order to do so, you would use the Java Producer API to write a corrupted record directly from within the flatMap to the quarantine topic. The drawback of this approach is that such “manual” writes are side effects of your application that are invisible to the Kafka Streams API, and as such they are not benefitting from the end-to-end processing guarantees of the Streams API.

Option 2: Quarantine corrupted records (dead letter queue) with branch

Like in option 1, we attempt to manually deserialize a record. However, in contrast to option 1, which ignores corrupted records, option 2 retains corrupted records by filtering them out of the “main” input stream and writing them to a quarantine topic (think: dead letter queue). The drawback is that, for valid records, we must pay the manual deserialization cost twice.

Code example:

// Input topic, which might contain corrupted records/messages
KStream<byte[], byte[]> input = ...;

// Note how the key and value types of `partitioned` are still `byte[]`.
KStream<byte[], byte[]>[] partitioned = input.branch(
    (k, v) -> {
      boolean isValidRecord = false;
      try {
        // Attempt deserialization
        stringSerde.deserializer().deserialize("input-topic", k);
        longSerde.deserializer().deserialize("input-topic", v);
        // Ok, the record is valid (not corrupted).
        isValidRecord = true;
      }
      catch (SerializationException ignored) {}
      return isValidRecord;
    },
    (k, v) -> true
);

// `partitioned[0]` is the `KStream<byte[], byte[]>` that contains only valid records.
// Note how the returned stream is of type `KStream<String, Long>`.
KStream<String, Long> doubled = partitioned[0].map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize("input-topic", key),
        2 * longSerde.deserializer().deserialize("input-topic", value)));

// `partitioned[1]` contains only corrupted records and thus acts as a "dead letter queue".
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");

Option 3: Skip corrupted records with a custom serde

You can also implement a custom serde to handle corrupt records. For example, you can implement an org.apache.kafka.common.serialization.Deserializer that returns a default value, some sentinel value of your choice, or null whenever deserialization fails instead of throwing an exception.

Important

Be careful when returning null: null record values have special semantics for tables, where they are interpreted as tombstones that will cause the deletion of record keys from the table.

Once your custom serde is implemented, you can either configure your application to use it as the default key and/or value serde, or use it explicitly when calling API methods such as KStream#to(), or both. Don’t forget to adapt your application, if needed, to properly handle the “fallback-if-corrupted” record keys/values.
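The fallback logic of such a deserializer could look roughly like the sketch below. To keep it runnable without the Kafka client library on the classpath, it is written as a plain static method rather than as an implementation of the Deserializer interface; in a real serde, the same logic would sit inside the deserializer's deserialize method. The class name and the sentinel value of -1 are hypothetical choices, and the sketch assumes values are encoded as 8-byte big-endian longs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper class illustrating "fallback-if-corrupted" decoding of long values.
final class LenientLongDecoder {

    // Sentinel returned for corrupted input. Pick a value that cannot occur in valid data.
    static final Long SENTINEL = -1L;

    // Decodes an 8-byte big-endian long. Returns the sentinel instead of throwing
    // when the payload has the wrong length.
    static Long deserializeOrFallback(byte[] data) {
        if (data == null) {
            // A null payload is passed through as null. For tables this is a
            // tombstone, so it is deliberately NOT replaced by the sentinel.
            return null;
        }
        if (data.length != 8) {
            // Corrupted record: fall back to the sentinel rather than failing.
            return SENTINEL;
        }
        return ByteBuffer.wrap(data).getLong();
    }
}
```

Downstream operators then need to recognize the sentinel, for example by filtering out records whose value equals `LenientLongDecoder.SENTINEL`.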

Security

Application fails when running against a secured Kafka cluster?

When your application processes data from a secured Kafka cluster, you may run into error messages such as:

> Could not create internal topics: Could not create topic: <NAME OF TOPIC> due to Cluster authorization failed

Make sure that the principal running your application has the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.

Troubleshooting and debugging

Easier to interpret Java stacktraces?

When you write Kafka Streams applications, you often create chains of method calls such as the following:

// Code all on one line.  Unfortunately, this is bad practice when it comes to stacktraces.
myStream.map(...).filter(...).groupByKey(...).count(...);

Now if your code happens to trigger a runtime error, the Java stacktrace may not be very helpful because the JVM provides only line-number information about where an error occurred (“NullPointerException at line 123”). So, to pick the example above, you may deduce that some operation in the map/filter/groupByKey/count chain failed, but the stacktrace will not tell you where exactly in the affected line.

A simple trick is to split your method chains across multiple lines, so that the line number reported in the stacktrace will more easily identify the actual culprit:

// Split the same code across multiple lines to benefit from more actionable stacktraces.
myStream
  .map(...)
  .filter(...)
  .groupByKey(...)
  .count(...);

Visualizing topologies?

You can visualize the internal topology of a Kafka Streams application by calling the KafkaStreams#toString() method:

KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams threads
streams.start();
// Print the internal topology to stdout
System.out.println(streams.toString());

An example topology that is output from this call is shown below. All sources, sinks, and their backing topics, as well as all intermediate nodes and their state stores are collected and printed:

KafkaStreams processID:a35956ee-91ee-4e6a-9005-8d93bac82210
StreamsThread appId:join-integration-test
StreamsThread clientId:join-integration-test-1
Active tasks:
    StreamsTask taskId:0_0
        ProcessorTopology:
        KSTREAM-SOURCE-0000000000: topics: [user-clicks] children [KSTREAM-LEFTJOIN-0000000003]
        KSTREAM-LEFTJOIN-0000000003: children [KSTREAM-MAP-0000000004]
        KSTREAM-MAP-0000000004: children [KSTREAM-FILTER-0000000007]
        KSTREAM-FILTER-0000000007: children [KSTREAM-SINK-0000000006]
        KSTREAM-SINK-0000000006: topic:join-integration-test-ClicksPerRegionUnwindowed-repartition
        KSTREAM-SOURCE-0000000001: topics: [user-regions] children [KTABLE-SOURCE-0000000002]
        KTABLE-SOURCE-0000000002: stateStores [user-regions-store-name]
        Partitions [user-clicks-0,user-regions-0]
    StreamsTask taskId:1_0
        ProcessorTopology:
        KSTREAM-SOURCE-0000000008: topics: [ClicksPerRegionUnwindowed-repartition] children [KSTREAM-REDUCE-0000000005]
        KSTREAM-REDUCE-0000000005: stateStores [ClicksPerRegionUnwindowed] children [KTABLE-TOSTREAM-0000000009]
        KTABLE-TOSTREAM-0000000009: children [KSTREAM-SINK-0000000010]
        KSTREAM-SINK-0000000010: topic:output-topic
        Partitions [join-integration-test-ClicksPerRegionUnwindowed-repartition-0]
Standby tasks:

Inspecting streams and tables?

To inspect the records of a stream or a table, you can call the KStream#print() or the KStream#writeAsText() methods (same for KTable). They both print the elements of the stream, but the former prints to STDOUT, while the latter prints the elements to a file.

Here is an example that uses KStream#print():

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Long> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left
    .join(right,
      (leftValue, rightValue) -> leftValue + " --> " + rightValue, /* ValueJoiner */
      JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Long());  /* right value */

// `print()` returns void, so it is called as a separate statement on the joined stream.
joined.print(Serdes.String(), Serdes.String());

The output would be the records after the join; e.g., if we were joining two records with the same key K and values V1 and V2, then what is printed on the console would be K, V1 --> V2 as shown for some sample data below:

alice, 5 --> 7
bob, 234 --> 19
charlie, 9 --> 10

Invalid Timestamp Exception

If you get an exception similar to the one shown below, there are multiple possible causes:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record {...} has invalid (negative) timestamp. \
        Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, \
        or because the input topic was created before upgrading the Kafka cluster to 0.10+. \
        Use a different TimestampExtractor to process this data.
  at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)

This error means that the timestamp extractor of your Kafka Streams application failed to extract a valid timestamp from a record. Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all), but it could also indicate a problem or bug in the timestamp extractor used by the application.

When does a record not contain a valid timestamp:

  • If you are using the default FailOnInvalidTimestamp timestamp extractor, it is most likely that your records do not carry an embedded timestamp (embedded record timestamps were introduced in Kafka’s message format in Kafka 0.10). This might happen, for example, if you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. Another situation where this may happen is after upgrading your Kafka cluster from 0.8 or 0.9 to 0.10, where all the data that was generated with 0.8 or 0.9 does not include the 0.10 message timestamps. You can consider using an alternative timestamp extractor like UsePreviousTimeOnInvalidTimestamp or LogAndSkipOnInvalidTimestamp, both of which handle negative timestamps more gracefully. See Kafka Streams Developer Guide: Timestamp Extractor for details.
  • If you are using a custom timestamp extractor, make sure that your extractor is properly handling invalid (negative) timestamps, where “properly” depends on the semantics of your application. For example, you can return a default timestamp or an estimated timestamp if you cannot extract a valid timestamp directly from the record.
  • You can also switch from event-time processing to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.

However, as a first step you should identify and fix the root cause for why such problematic records were written to Kafka in the first place. In a second step you may consider applying workarounds as described above when dealing with such records. Another option is to regenerate the records with correct timestamps and write them to a new Kafka topic.
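As an illustration of the custom-extractor workaround, the sketch below shows one possible fallback policy for invalid (negative) timestamps. It is written as a plain static method so that it runs without the Kafka libraries; in an application, this logic would live inside your own TimestampExtractor implementation. The class name, the parameter names, and the order of the fallbacks are assumptions made for this example, and whether such a policy is appropriate depends on the semantics of your application.

```java
// Hypothetical helper class; the fallback order is one possible policy, not a recommendation.
final class TimestampFallback {

    // Returns the timestamp to use for a record:
    //  1. the embedded record timestamp, if it is valid (non-negative);
    //  2. otherwise the previously seen valid timestamp, as an estimate;
    //  3. otherwise the current wall-clock time, as a default.
    static long extract(long embeddedTimestampMs, long previousTimestampMs, long wallClockMs) {
        if (embeddedTimestampMs >= 0) {
            return embeddedTimestampMs;
        }
        if (previousTimestampMs >= 0) {
            return previousTimestampMs;
        }
        return wallClockMs;
    }
}
```

For example, `TimestampFallback.extract(-1L, 1500L, System.currentTimeMillis())` falls back to the previous timestamp 1500, because the embedded timestamp is negative.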

When the timestamp extractor causes the problem:

In this situation you should debug and fix the erroneous extractor. If the extractor is built into Kafka, please report the bug to the Kafka developer mailing list at dev@kafka.apache.org (see instructions at http://kafka.apache.org/contact); in the meantime, you may write a custom timestamp extractor that fixes the problem and configure your application to use that extractor for the time being.

Why is punctuate() not called?

Currently, punctuate() is not driven by wall-clock time but by data (i.e., by internally tracked stream-time). The stream-time is derived from the extracted record timestamps provided by the configured TimestampExtractor.

For example, let’s assume you registered a punctuate() schedule of 10 seconds. If you were to process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), then punctuate() would be called 6 times – regardless of the time required to actually process those records; i.e., punctuate() would be called 6 times no matter whether processing these 60 records would take a second, a minute, or an hour.

Attention

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().
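The behavior described in this section can be reproduced with a small simulation in plain Java. This is a deliberately simplified model and not the library's actual scheduling code: it assumes that the first punctuation is due once stream-time reaches one interval, that each further punctuation is due one interval later, and that stream-time across partitions is the smallest per-partition timestamp. All class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical, simplified model of data-driven punctuation.
final class PunctuateSimulation {

    // Builds timestamps 1s, 2s, ..., count seconds (in milliseconds).
    static long[] consecutiveSeconds(int count) {
        long[] timestampsMs = new long[count];
        for (int i = 0; i < count; i++) {
            timestampsMs[i] = (i + 1) * 1000L;
        }
        return timestampsMs;
    }

    // Counts how often punctuate() would fire for a single partition.
    // Note that wall-clock time never appears in this computation.
    static int countPunctuations(long[] recordTimestampsMs, long intervalMs) {
        long streamTimeMs = -1L;
        long nextPunctuationMs = intervalMs;
        int calls = 0;
        for (long timestampMs : recordTimestampsMs) {
            // Stream-time only moves forward, driven by record timestamps.
            streamTimeMs = Math.max(streamTimeMs, timestampMs);
            while (streamTimeMs >= nextPunctuationMs) {
                calls++;
                nextPunctuationMs += intervalMs;
            }
        }
        return calls;
    }

    // Stream-time across partitions, modeled as the smallest per-partition time.
    // A partition without data (represented here as -1) holds stream-time back.
    static long streamTime(long... latestTimestampPerPartitionMs) {
        return Arrays.stream(latestTimestampPerPartitionMs).min().orElse(-1L);
    }
}
```

With this model, `countPunctuations(consecutiveSeconds(60), 10_000L)` returns 6, matching the example above, and `streamTime(60_000L, -1L)` stays at -1 because the second partition has not delivered any data.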

How can I convert a KStream to a KTable without an aggregation step?

If you want to convert a derived KStream (i.e., a KStream that is not read from a Kafka topic) into a KTable, you have two options.

Option 1: Write KStream to Kafka, read back as KTable

You can write the KStream into a Kafka topic and read it back as a KTable. As per our general recommendation, you should manually pre-create this topic to ensure it has the correct number of partitions, topic settings, and so on.

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

// You should manually create the dummy topic before starting your application.
//
// Also, because you want to read the topic back as a KTable, you might want to enable
// log compaction for this topic to align the topic's cleanup policy with KTable semantics.
stream.to(Serdes.String(), Serdes.Long(), "dummy-topic");
KTable<String, Long> table = builder.table(Serdes.String(), Serdes.Long(), "dummy-topic", "dummy-store");

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

Option 2: Perform a dummy aggregation

As an alternative to option 1, you can choose to create a dummy aggregation step:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

// Java 8+ example, using lambda expressions
KTable<String, Long> table = stream.groupByKey().reduce(
    (aggValue, newValue) -> newValue,
    "dummy-aggregation-store");

// Java 7 example
KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return newValue;
      }
    },
    "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.