You are viewing documentation for an older version of Confluent Platform.
Kafka Streams is a new library. We recommend that you thoroughly test your Kafka Streams applications before putting them into production. We are looking for feedback on APIs, operators, documentation, and really anything that will make the end-user experience better. Feel free to provide your feedback via email.
Is Kafka Streams a project separate from Apache Kafka?¶
No, it is not. Kafka Streams is a new component of the Apache Kafka open source project, and is thus included in Apache Kafka 0.10+ releases.
Is Kafka Streams a proprietary library of Confluent?¶
No, it is not. Kafka Streams is a new component of the Apache Kafka open source project.
Do Kafka Streams applications run inside the Kafka brokers?¶
No, they do not. “Kafka Streams applications” are normal Java applications that happen to use the Kafka Streams library. You would run these applications on client machines at the perimeter of a Kafka cluster. In other words, Kafka Streams applications do not run inside the Kafka brokers (servers) or the Kafka cluster; they are client-side applications.
What are the system dependencies of Kafka Streams?¶
Kafka Streams has no external dependencies on systems other than Apache Kafka.
However, the Kafka Streams implementation included in Apache Kafka 0.10.0.1 has an additional dependency on Apache ZooKeeper.
This explains why you are currently required to configure the zookeeper.connect setting in your application (see the Kafka Streams Upgrade Guide).
This dependency on ZooKeeper is only temporary and will be removed once KIP-4 is included in Apache Kafka; the decision was made not to duplicate parts of the KIP-4 effort just for the sake of Kafka Streams.
In the meantime, this additional system dependency should not matter much in practice because ZooKeeper is required for running Apache Kafka anyway.
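As a rough illustration of where the zookeeper.connect setting sits next to the other client settings, the following sketch collects them as plain key/value pairs. The host names, ports, and application id are placeholders invented for this example, and the class name StreamsZkConfigSketch is hypothetical; only the three configuration keys are taken from Kafka Streams 0.10.0.x.

```java
import java.util.Properties;

// Minimal sketch: the class name and all values are illustrative placeholders.
final class StreamsZkConfigSketch {

    // Collects the client-side settings as plain key/value pairs.
    static Properties buildConfig() {
        Properties props = new Properties();
        // Identifies this application within the Kafka cluster (placeholder value).
        props.put("application.id", "my-streams-app");
        // Kafka brokers the application connects to (placeholder host and port).
        props.put("bootstrap.servers", "kafka-broker1:9092");
        // The additional, temporary ZooKeeper dependency discussed above (placeholder host and port).
        props.put("zookeeper.connect", "zookeeper1:2181");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting so the resulting configuration can be inspected.
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application these properties would be handed to the Kafka Streams client when it is constructed; that step is omitted here so the sketch runs without the Kafka libraries on the classpath.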
How do I migrate my Kafka Streams applications from CP 3.0.0 to CP 3.0.1?¶
We provide instructions in our Upgrade Guide.
Can I use Kafka Streams with Kafka clusters running 0.9, 0.8, or 0.7?¶
Unfortunately, no. Kafka Streams applications based on Kafka version 0.10.0.x-cp1 can only run against Kafka clusters running 0.10.0.x-cp1. For example, you can run 0.10.0.1-cp1 applications (CP 3.0.1) against 0.10.0.0-cp1 Kafka clusters (CP 3.0.0).
Troubleshooting and debugging¶
How can I get more meaningful Java stacktraces?¶
When you write Kafka Streams applications, you often create chains of method calls such as the following:
// Code all on one line. Unfortunately, this is bad practice when it comes to stacktraces.
myStream.map(...).filter(...).countByKey(...);
Now if your code happens to trigger a runtime error, the Java stacktrace may not be very helpful because the JVM provides only line-number information about where an error occurred (“NullPointerException at line 123”). So, to pick the example above, you may deduce that some operation in the map/filter/countByKey chain failed, but the stacktrace will not tell you where exactly in the affected line.
A simple trick is to split your method chains across multiple lines, so that the line number reported in the stacktrace more easily identifies the actual culprit:
// Split the same code across multiple lines to benefit from more actionable stacktraces.
myStream
    .map(...)
    .filter(...)
    .countByKey(...);
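The effect can be reproduced without Kafka. The sketch below uses java.util.stream as a stand-in for a Kafka Streams chain (an assumption made only so that the example runs on a plain JDK); the class and method names are hypothetical. It triggers one failure in the map step and another in the filter step, then reads the line number from the top stack frame of each exception.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: java.util.stream stands in for a Kafka Streams method chain.
final class StacktraceLineSketch {

    // Whole chain on one source line: every failure in it reports the same line number.
    static int oneLine(List<String> in) {
        return in.stream().map(s -> s.length()).filter(n -> 10 / n > 1).collect(Collectors.toList()).size();
    }

    // Same chain, one operation per source line: each failure reports its own line number.
    static int multiLine(List<String> in) {
        return in.stream()
            .map(s -> s.length())
            .filter(n -> 10 / n > 1)
            .collect(Collectors.toList())
            .size();
    }

    // Runs the task and returns the line number of the top stack frame of the
    // exception it throws, or -1 if it completes normally.
    static int failingLine(Runnable task) {
        try {
            task.run();
        } catch (RuntimeException e) {
            return e.getStackTrace()[0].getLineNumber();
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> breaksMap = Arrays.asList("a", null); // null element: NullPointerException in map
        List<String> breaksFilter = Arrays.asList("");     // length 0: ArithmeticException in filter

        System.out.println("one line, map fails:      line " + failingLine(() -> oneLine(breaksMap)));
        System.out.println("one line, filter fails:   line " + failingLine(() -> oneLine(breaksFilter)));
        System.out.println("multi line, map fails:    line " + failingLine(() -> multiLine(breaksMap)));
        System.out.println("multi line, filter fails: line " + failingLine(() -> multiLine(breaksFilter)));
    }
}
```

With the one-line chain, both failures are attributed to the same source line; with the split chain, the two reported line numbers differ, which is what makes the stacktrace actionable.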
Invalid Timestamp Exception¶
If you get an exception similar to the one shown below, there are multiple possible causes:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.
    at org.apache.kafka.streams.processor.internal.RecordQueue.addRawRecords(RecordQueue.java:111)
This error means that the timestamp extractor of your Kafka Streams application failed to extract a valid timestamp from a record. Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all), but it could also indicate a problem or bug in the timestamp extractor used by the application.
When does a record not contain a valid timestamp:
- If you are using the default ConsumerRecordTimestampExtractor, it is most likely that your records do not carry an embedded timestamp (embedded record timestamps were introduced in Kafka’s message format in Kafka 0.10). This might happen if, for example, you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. Another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the embedded timestamps.
- If you are using a custom timestamp extractor, make sure that your extractor is properly handling invalid (negative) timestamps, where “properly” depends on the semantics of your application. For example, you can return a default or an estimated timestamp if you cannot extract a valid timestamp (maybe the timestamp field in your data is just missing).
- You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.
However, as a first step you should identify and fix the root cause for why such problematic records were written to Kafka in the first place. In a second step you may consider applying workarounds (as described above) when dealing with such records (for example, if you need to process those records after all). Another option is to regenerate the records with correct timestamps and write them to a new Kafka topic.
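As one possible shape for the custom-extractor workaround described above, the sketch below shows only the fallback decision, kept free of Kafka classes so that it runs on a plain JDK. The class FallbackTimestampSketch and its resolve method are hypothetical names. The policy it applies (reuse the last valid timestamp as an estimate, otherwise fall back to wall-clock time) is just one assumption about what “properly” might mean for an application.

```java
// Sketch only: a hypothetical helper, not part of the Kafka Streams API.
final class FallbackTimestampSketch {

    // Last valid (non-negative) timestamp seen so far; -1 means none yet.
    private long lastValid = -1L;

    // Returns the extracted timestamp if it is valid. Otherwise returns an
    // estimate (the last valid timestamp), or the supplied wall-clock time
    // if no valid timestamp has been seen yet.
    long resolve(long extracted, long wallClockMs) {
        if (extracted >= 0) {
            lastValid = extracted;
            return extracted;
        }
        return lastValid >= 0 ? lastValid : wallClockMs;
    }

    public static void main(String[] args) {
        FallbackTimestampSketch sketch = new FallbackTimestampSketch();
        long now = System.currentTimeMillis();
        // No valid timestamp seen yet: falls back to wall-clock time.
        System.out.println(sketch.resolve(-1L, now));
        // Valid timestamp: returned unchanged and remembered.
        System.out.println(sketch.resolve(1469000000000L, now));
        // Invalid again: estimated from the last valid timestamp.
        System.out.println(sketch.resolve(-1L, now));
    }
}
```

In an actual application, a custom timestamp extractor would call logic like this with the timestamp it read from the record, and the application would be configured to use that extractor. Note that the helper keeps state and is not thread-safe as written.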
When the timestamp extractor causes the problem:
In this situation you should debug and fix the erroneous extractor. If the extractor is built into Kafka, please report the bug to the Kafka developer mailing list at email@example.com (see the instructions at http://kafka.apache.org/contact); in the meantime, you may write a custom timestamp extractor that fixes the problem and configure your application to use that extractor for the time being.