FAQ

Attention

Kafka Streams is a new library. We recommend that you thoroughly test your Kafka Streams applications before putting them into production. We are looking for feedback on APIs, operators, documentation, and really anything that will make the end user experience better. Feel free to provide your feedback via email to users@kafka.apache.org.

General

Is Kafka Streams a project separate from Apache Kafka?

No, it is not. Kafka Streams is a new component of the Apache Kafka open source project, and thus included in Apache Kafka 0.10+ releases.

Is Kafka Streams a proprietary library of Confluent?

No, it is not. Kafka Streams is a new component of the Apache Kafka open source project.

Do Kafka Streams applications run inside the Kafka brokers?

First, “Kafka Streams applications” are normal Java applications that happen to use the Kafka Streams library. You would run these applications on client machines at the perimeter of a Kafka cluster. In other words, Kafka Streams application do not run inside the Kafka brokers (servers) or the Kafka cluster – they are client-side applications.

What are the system dependencies of Kafka Streams?

Kafka Streams has no external dependencies on systems other than Apache Kafka.

Note

ZooKeeper dependency: The Kafka Streams implementation included in Apache Kafka 0.10.1.0 has an additional dependency on Apache ZooKeeper. This explains why you are currently required to configure the zookeeper.connect setting in your application (see Kafka Streams Upgrade Guide). This dependency on ZooKeeper is only temporary though and will be removed once KIP-4 is included in Apache Kafka, and the decision was made to not duplicate parts of the KIP-4 efforts just for the sake of Kafka Streams. In the meantime, this additional system dependency should not matter much in practice because ZooKeeper is required for running Apache Kafka anyways.

How do I migrate my Kafka Streams applications CP 3.0.x applications to CP 3.1.1

We provide instructions in our Upgrade Guide.

Can I use Kafka Streams with Kafka clusters running 0.9, 0.8, or 0.7?

Unfortunately, no. You can run Kafka Streams applications based on CP 3.0.x (Kafka 0.10.0.x-cp1) against Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1 and) and CP 3.1.1 (Kafka 0.10.1.0-cp2). Kafka Streams applications based on CP 3.1.1 (Kafka 0.10.1.0-cp2) can only run against Kafka clusters running CP 3.1.1 (Kafka 0.10.1.0-cp2).

Troubleshooting and debugging

Java stacktraces

When you write Kafka Streams applications, you often create chains of method calls such as the following:

// Code all on one line.  Unfortunately, this is bad practice when it comes to stacktraces.
myStream.map(...).filter(...).groupByKey(...).count(...);

Now if your code happens to trigger a runtime error, the Java stacktrace may not be very helpful because the JVM provides only line-number information about where an error occurred (“NullPointerException at line 123”). So, to pick the example above, you may deduce that some operation in the map/filter/countByKey chain failed, but the stacktrace will not tell you where exactly in the affected line.

A simple trick is to split your method chains across multiple lines, so that the row number returned by the stacktrace will more easily identify the actual culprit:

// Split the same code across multiple lines to benefit from more actionable stacktraces.
myStream
  .map(...)
  .filter(...)
  .groupByKey(...)
  .count(...);

Inspecting topologies

You can visualise the internal topology of a Kafka Streams application by calling the KafkaStreams#toString() method:

...
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams threads
streams.start();
// Print the internal topology to stdout
System.out.println(streams.toString());

An example topology that is output from this call is shown below. All sources, sinks, and their backing topics, as well as all intermediate nodes and their state stores are collected and printed:

KafkaStreams processID:a35956ee-91ee-4e6a-9005-8d93bac82210
StreamsThread appId:join-integration-test
StreamsThread clientId:join-integration-test-1
Active tasks:
    StreamsTask taskId:0_0
        ProcessorTopology:
        KSTREAM-SOURCE-0000000000: topics: [user-clicks] children [KSTREAM-LEFTJOIN-0000000003]
        KSTREAM-LEFTJOIN-0000000003: children [KSTREAM-MAP-0000000004]
        KSTREAM-MAP-0000000004: children [KSTREAM-FILTER-0000000007]
        KSTREAM-FILTER-0000000007: children [KSTREAM-SINK-0000000006]
        KSTREAM-SINK-0000000006: topic:join-integration-test-ClicksPerRegionUnwindowed-repartition
        KSTREAM-SOURCE-0000000001: topics: [user-regions] children [KTABLE-SOURCE-0000000002]
        KTABLE-SOURCE-0000000002: stateStores [user-regions-store-name]
        Partitions [user-clicks-0,user-regions-0]
    StreamsTask taskId:1_0
        ProcessorTopology:
        KSTREAM-SOURCE-0000000008: topics: [ClicksPerRegionUnwindowed-repartition] children [KSTREAM-REDUCE-0000000005]
        KSTREAM-REDUCE-0000000005: stateStores [ClicksPerRegionUnwindowed] children [KTABLE-TOSTREAM-0000000009]
        KTABLE-TOSTREAM-0000000009: children [KSTREAM-SINK-0000000010]
        KSTREAM-SINK-0000000010: topic:output-topic
        Partitions [join-integration-test-ClicksPerRegionUnwindowed-repartition-0]
Standby tasks:

Inspecting streams

To inspect the records of a stream, you can call the KStream#print() or the KStream#writeAsText() methods. They both print the elements of the stream, but the former prints to standard out, while the latter prints the elements to a file:

...
KStream<Long, String> input1 = builder.stream(INPUT_TOPIC_1);
    input1.join(input2, new ValueJoiner<String, String, String>() {
        @Override
        public String apply(final String value1, final String value2) {
            return value1 + " --> " + value2;
        }
    }, JoinWindows.of(10000)).print();
streams.start();

The output would be the records after the join, e.g., if we were joining two records with the same key K and values V1 and V2 then what is printed on the console would be K, V1 --> V2 as shown for some sample data below:

alice, 5 --> 7
bob, 234 --> 19
charlie, 9 --> 10

Invalid Timestamp Exception

If you get an exception similar to the one shown below, there are multiple possible causes:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.
  at org.apache.kafka.streams.processor.internal.RecordQueue.addRawRecords(RecordQueue.java:111)

This error means that the timestamp extractor of your Kafka Streams application failed to extract a valid timestamp from a record. Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all), but it could also indicate a problem or bug in the timestamp extractor used by the application.

When does a record not contain a valid timestamp:

  • If you are using the default ConsumerRecordTimestampExtractor, it is most likely that your records do not carry an embedded timestamp (embedded record timestamps got introduced in Kafka’s message format in Kafka 0.10). This might happen, if for example, you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. Another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the 0.10 message timestamps.
  • If you are using a custom timestamp extractor, make sure that your extractor is properly handling invalid (negative) timestamps, where “properly” depends on the semantics of your application. For example, you can return a default or an estimated timestamp if you cannot extract a valid timestamp (maybe the timestamp field in your data is just missing).
  • You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.

However, as a first step you should identify and fix the root cause for why such problematic records were written to Kafka in the first place. In a second step you may consider applying workarounds (as described above) when dealing with such records (for example, if you need to process those records after all). Another option is to regenerate the records with correct timestamps and write them to a new Kafka topic.

When the timestamp extractor causes the problem:

In this situation you should debug and fix the erroneous extractor. If the extractor is built into Kafka, please report the bug to the Kafka developer mailing list at dev@kafka.apache.org (see instructions at http://kafka.apache.org/contact); in the meantime, you may write a custom timestamp extractor that fixes the problem and configure your application to use that extractor for the time being.

Why is punctuate() not called?

Currently, punctuate() is not wall-clock time but data driven (i.e., driven by internally tracked stream-time). The stream-time is derived from the extracted record timestamps provided by the used TimestampExtractor.

For example, let’s assume you registered a punctuate() schedule of 10 seconds. If you were to process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), then punctuate() would be called 6 times – regardless of the time required to actually process those records; i.e., punctuate() would be called 6 times no matter whether processing these 60 records would take a second, a minute, or an hour.

Attention

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().

How can I convert a KStream to a KTable without an aggregation step?

If you want to convert a derived KStream (i.e., that is not read from a Kafka topic) into a KTable you have two options:

  1. You can write the KStream into a Kafka topic and read it back as a KTable:
KStreamBuilder builder = new KStreamBuilder();
KStream stream = ... // some complex computation

// You should manually create this topic before starting your application.
// Also, because you want to read the topic back as a KTable, you might want to enable log compaction policy for this topic
// to align log cleanup policy with KTable semantics.
stream.to("dummy-topic");
KTable table = builder.table("dummy-topic");

This is (with regard to the code) the simplest approach but has the disadvantage that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written and re-read from Kafka.

  1. As an alternative, you can create a dummy aggregation step:
KStreamBuilder builder = new KStreamBuilder();
KStream stream = ... // some computation

KTable table = stream.groupByKey().reduce(new Reducer() {
    @Override
    public Object apply(Object oldValue, Object newValue) {
        return newValue;
    }
}, "someStoreName");

This approach (with regard to the code) is somewhat more complex compared to (1) but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.

Note that in approach (2) Kafka Streams will create an internal changelog topic to back up the KTable for fault-tolerance. Thus, both approaches require some storage in Kafka and result in additional write network traffic. Overall, it’s a trade-off between slightly more complex (and maybe unintuitive code) and manual topic management.