.. title:: Frequently Asked Questions for Kafka Streams in Confluent Platform
.. meta::
:description: Frequently asked questions about Kafka Streams and Confluent Platform
.. _streams_faq:
Frequently Asked Questions for |kstreams| in |cp|
=================================================
|confluent| is looking for feedback on APIs, operators, documentation, and
anything else that will improve the end-user experience. Feel free to
provide your feedback at the `Confluent Community Forum `__.
General
-------
Is |kstreams| a project separate from |ak|?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
No. The |kstreams| API, aka |kstreams|, is a component of the |ak-tm| open
source project and is included in |ak| 0.10+ releases. The source code is
available at https://github.com/apache/kafka/tree/trunk/streams.
Is |kstreams| a proprietary library of |confluent|?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
No. The |kstreams| API is a component of the |ak| open source project and is
included in |ak| 0.10+ releases. The source code is available at
https://github.com/apache/kafka/tree/trunk/streams.
Do |kstreams| applications run inside the |ak| brokers?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
No, they don't run inside the |ak| brokers. "|kstreams| applications" are
normal Java applications that use the |kstreams| library.
You run these applications on client machines at the periphery of a |ak|
:term:`cluster `. In other words, |kstreams| applications don't
run inside the :term:`Kafka brokers ` (servers) or the |ak| cluster.
They're client-side applications.
Why does my |kstreams| application use so much memory?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If your |kstreams| application is using a permanent state store, you must take
care to close each ``Iterator`` after you're done using it, to reclaim
resources. Not closing an ``Iterator`` can lead to out-of-memory (OOM) issues.
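One way to make sure an iterator is always closed is Java's try-with-resources statement. The following is a minimal sketch of that pattern. To stay self-contained, it uses a hypothetical stand-in interface, ``CloseableIterator``, and a toy ``TrackingIterator`` instead of the real ``KeyValueIterator`` (which also extends ``Closeable``); in a real application the iterator would come from a state store call such as ``store.all()``.

.. sourcecode:: java

    import java.util.Iterator;
    import java.util.List;

    /** Hypothetical stand-in for KeyValueIterator: an Iterator that holds resources until closed. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so that close() throws no checked exception
    }

    final class IteratorCloseExample {

        /** Toy in-memory iterator that records whether close() was called. */
        static final class TrackingIterator implements CloseableIterator<String> {
            private final Iterator<String> delegate;
            boolean closed = false;

            TrackingIterator(List<String> values) {
                this.delegate = values.iterator();
            }

            @Override public boolean hasNext() { return delegate.hasNext(); }
            @Override public String next() { return delegate.next(); }
            @Override public void close() { closed = true; }
        }

        /** Counts entries; try-with-resources closes the iterator even if the loop throws. */
        static int countAndClose(CloseableIterator<String> iterator) {
            int count = 0;
            try (CloseableIterator<String> it = iterator) {
                while (it.hasNext()) {
                    it.next();
                    count++;
                }
            }
            return count;
        }
    }

The same ``try (...) { ... }`` shape applies to any iterator type that implements ``AutoCloseable`` or ``Closeable``.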
What are the system dependencies of |kstreams|?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|kstreams| has no external dependencies on systems other than |ak|.
How do I migrate my older |kstreams| applications to the latest |cp| version?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To upgrade to the latest |cp| version, see the |kstreams|
:ref:`Upgrade Guide `.
Which versions of |ak| clusters are supported by |kstreams|?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The following versions are supported:
.. include:: includes/compatibilityMatrix.rst
What programming languages are supported?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The |kstreams| API is implemented in Java.
The :ref:`Developer Guide ` provides several example
applications written in Java 7 and Java 8+. Additionally, |kstreams| ships with
a :ref:`Scala wrapper ` on top of Java.
Also, you can write applications in other JVM-based languages such as Kotlin or
Clojure, but there's no native support for these languages.
Why is my application re-processing data from the beginning?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|ak| remembers your application by storing consumer :term:`offsets `
in a special topic.
Offsets are numbers assigned to messages by the |ak| broker(s) indicating the
order in which they arrived at the broker(s).
Because |ak| remembers your application's last committed offset, your
application processes only newly arrived messages.
The configuration setting ``offsets.retention.minutes`` controls how long |ak|
remembers offsets in the special topic. The default value is 10,080 minutes (7 days).
The default value in older |ak| versions is 1,440 minutes (24 hours).
If your application is stopped, which means that it hasn't connected to the
:term:`Kafka cluster` for a while, you could end up in a situation where you
start reprocessing data on application restart because the broker(s) have
deleted the offsets in the meantime.
The actual startup behavior depends on your ``auto.offset.reset`` configuration,
which can be set to "earliest", "latest", or "none".
To avoid this problem, it's recommended to increase ``offsets.retention.minutes``
to an appropriately large value.
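As a rough illustration of the arithmetic, the following sketch compares an application's downtime with the offset retention period. It is a simplified model, not the broker's actual expiry logic, and the class and method names are hypothetical; only the default of 10,080 minutes comes from the text above.

.. sourcecode:: java

    import java.time.Duration;

    final class OffsetRetentionCheck {

        /** Default of offsets.retention.minutes named in the text above (7 days). */
        static final Duration DEFAULT_RETENTION = Duration.ofMinutes(10_080);

        /** Simplified model: committed offsets may be gone once the downtime exceeds the retention. */
        static boolean offsetsMayHaveExpired(Duration downtime, Duration retention) {
            return downtime.compareTo(retention) > 0;
        }
    }

Under this model, an application that was stopped for 8 days with the default retention would be at risk of reprocessing, while one stopped for 3 days would not.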
.. _streams_faq-scalability:
Scalability
-----------
.. _streams_faq-scalability-maximum_parallelism:
Maximum parallelism of my application? Maximum number of app instances I can run?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Slightly simplified, the maximum parallelism at which your application may run
is determined by the maximum number of :term:`partitions ` of the
input topic(s) the application is reading from in its
:ref:`processing topology `. The number of
input partitions determines how many
:ref:`stream tasks ` |kstreams| creates for the
application, and the number of stream tasks is the upper bound on the
application's parallelism.
For example, imagine your application is reading from an input topic that has
5 partitions. How many app instances can you run?
The short answer is that you can run up to 5 instances of this application,
because the application's maximum parallelism is 5. If you run more than 5 app
instances, the "excess" app instances launch successfully but remain idle. If
one of the busy instances goes down, one of the idle instances will resume the
former's work.
Here's the more detailed answer for this example.
- **5 stream tasks** are created, and each of these processes one of the input
partitions. It's actually the number of stream tasks that determines the
maximum parallelism of an application. The number of input partitions is
simply the main parameter from which the number of stream tasks is computed.
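The relationship between partitions, stream tasks, and idle instances can be sketched in plain Java. This is a slightly simplified model that mirrors the description above, not the library's assignment logic; the class, method, and topic names are hypothetical.

.. sourcecode:: java

    import java.util.Map;

    final class ParallelismSketch {

        /** Slightly simplified: the task count equals the largest partition count among the input topics. */
        static int maxStreamTasks(Map<String, Integer> partitionsByInputTopic) {
            return partitionsByInputTopic.values().stream()
                    .mapToInt(Integer::intValue)
                    .max()
                    .orElse(0);
        }

        /** Number of single-threaded instances that would sit idle for a given task count. */
        static int idleInstances(int streamTasks, int singleThreadedInstances) {
            return Math.max(0, singleThreadedInstances - streamTasks);
        }
    }

For the example above, an input topic with 5 partitions yields 5 stream tasks, so running 7 single-threaded instances would leave 2 of them idle.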
Now that you know the application's theoretical maximum parallelism, the next
question is how to actually run the application at its maximum parallelism. To
do so, you must ensure that all 5 stream tasks are running in parallel, which
means that there should be 5 :ref:`processing threads `,
with each thread executing one task.
There are several options at your disposal:
- **Option 1 is to scale horizontally (scaling out):** You run five
single-threaded instances of your application, each of which executes one
thread/task (i.e., ``num.stream.threads`` is set to ``1``; see
:ref:`optional configuration parameters `).
This option is used, for example, when you have many low-powered machines for
running the app instances.
- **Option 2 is to scale vertically (scaling up):** You run a single multi-threaded
instance of your application that executes all threads/tasks (i.e., ``num.stream.threads``
is set to ``5``). This option is useful, for example, when you have a very
powerful machine to run the app instance.
- **Option 3 combines options 1 and 2:** You run multiple instances of your
application, each of which is running multiple threads. This option is used,
for example, when your application runs at large scale. Here, you may even
choose to run multiple app instances per machine.
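The options above differ only in how many instances you start and how ``num.stream.threads`` is set for each of them. The following sketch shows one way to derive that setting; the helper names are hypothetical, and the configuration key is written as a plain string to keep the example self-contained.

.. sourcecode:: java

    import java.util.Properties;

    final class ThreadConfigSketch {

        /** Threads each instance needs so that the given number of instances covers all tasks. */
        static int threadsPerInstance(int streamTasks, int instances) {
            return (streamTasks + instances - 1) / instances; // ceiling division
        }

        /** Builds the settings for one instance that runs the given number of stream threads. */
        static Properties settingsFor(int threadsPerInstance) {
            Properties settings = new Properties();
            // "num.stream.threads" is the parameter named in the options above.
            settings.put("num.stream.threads", Integer.toString(threadsPerInstance));
            return settings;
        }
    }

With 5 stream tasks, one instance needs 5 threads (option 2), five instances need 1 thread each (option 1), and two instances need up to 3 threads each (option 3).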
.. _streams_faq-processing:
Processing
----------
How should I retain my |kstreams| application's processing results from being cleaned up?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As with any |ak| topics, you can set the ``log.retention.ms``,
``log.retention.minutes`` and ``log.retention.hours`` configs on the :term:`broker `
side for the sink topics to indicate how long processing results written to
those topics are retained. Brokers determine whether to clean up old data by
comparing each record's associated timestamp with the current system time.
For window or session state stores, you can also set the retention policy in
your code by using the ``Materialized#withRetention()`` method. The |kstreams|
library honors this setting by comparing stored records' timestamps
with the current system time. Key-value state stores don't have a retention
policy, because their updates are retained forever.
By default, |kstreams| applications don't modify the timestamps of result
records: a result record inherits the timestamp of the source record that
produced it. This means that if you process an event record with a timestamp in
the past, for example, during a bootstrapping phase that processes accumulated
old data, the resulting records and state updates carry that same past
timestamp. If that timestamp is already older than the retention threshold
relative to the current system time, the records are cleaned up soon after
they've been written to |ak| topics or state stores.
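The comparison described above can be sketched as follows. This is a simplified model with hypothetical names; it only illustrates the timestamp check and ignores how brokers and state stores actually organize and delete data.

.. sourcecode:: java

    import java.time.Duration;
    import java.time.Instant;

    final class RetentionSketch {

        /** True if a record with this timestamp is already older than the retention period. */
        static boolean eligibleForCleanup(Instant recordTimestamp, Instant now, Duration retention) {
            return recordTimestamp.isBefore(now.minus(retention));
        }
    }

For example, with a 7-day retention, a result record that inherits a timestamp from 9 days ago is eligible for cleanup as soon as it is written, even though it was produced just now.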
You can optionally let the |kstreams| application code modify the resulting
record's timestamp in version 5.1.x and beyond (see
`5.1 Upgrade Guide `__
for details), but pay attention to the semantic implications: processing an
event as of one time then produces a result as of a different time.
.. _streams_faq-processing-record_metadata:
Accessing record metadata such as topic, partition, and offset information?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Record metadata is accessible through the
:ref:`Processor API `. It's also
accessible indirectly through the :ref:`DSL `,
because of its
:ref:`Processor API integration `.
With the Processor API, you can access record metadata through a
``ProcessorContext``. You can store a reference to the context in an instance
field of your processor during ``Processor#init()``, and then query the
processor context within ``Processor#process()``, for example. This is also
true for ``Transformer``. The context is updated automatically to match the
record that's currently being processed, which means that methods such as
``ProcessorContext#partition()`` always return the current record's metadata.
Some caveats apply when calling the processor context within a scheduled
``punctuate()`` function. See the Javadocs for details.
If you use the DSL combined with a custom ``Transformer``, for example, you
could transform an input record's value to also include partition and offset
metadata, and subsequent DSL operations such as ``map`` or ``filter`` could then
leverage this information.
Difference between ``map``, ``peek``, ``foreach`` in the DSL?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The three methods ``map``, ``peek``, and ``foreach`` are quite similar to each other. In some sense, ``peek`` and
``foreach`` are but variants of ``map``. An important reason for explicit support of all three is that it allows a
developer to clearly communicate the *intent* of an operation:
* ``map``: "When I use ``map``, my intent is to **modify** the input stream and **continue processing** the (modified)
output, i.e. I want to create an output stream that contains the modified data so that I can perform additional
operations on the modified data."
* ``foreach``: "When I use ``foreach``, my intent is to **cause some side effects** based on the (unmodified) input
stream and then I want to **terminate the processing**." That's why ``foreach`` returns ``void`` and thus is labeled
as a terminal operation.
* ``peek``: "When I use ``peek``, my intent is to **cause some side effects** based on the (unmodified) input data and
then I want to **continue the processing** of the (unmodified) input data."
See also the
`Java 8 documentation on java.util.stream.Stream `__,
which supports ``map``, ``peek``, and ``forEach`` operations, too.
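The difference in intent can be illustrated with plain ``java.util.stream``. This is an analogy only, since the |kstreams| DSL operates on key-value records rather than on Java collections; the class name and the ``sideEffects`` list are hypothetical.

.. sourcecode:: java

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    final class MapPeekForeachSketch {

        /** Collects a description of every side effect, in the order it happened. */
        static final List<String> sideEffects = new ArrayList<>();

        static List<Integer> run(List<Integer> input) {
            sideEffects.clear();
            List<Integer> doubled = input.stream()
                    .map(x -> x * 2)                         // modify the data and continue processing
                    .peek(x -> sideEffects.add("peek " + x)) // side effect, data continues unmodified
                    .collect(Collectors.toList());
            // forEach causes a side effect and ends the pipeline: it returns void.
            doubled.stream().forEach(x -> sideEffects.add("foreach " + x));
            return doubled;
        }
    }

Here ``map`` and ``peek`` can be chained further, whereas nothing can follow ``forEach``.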
How to avoid data repartitioning if you know it's not required?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|kstreams| inserts a repartitioning step if a key-based operation like aggregation or join is preceded
by a key-changing operation like ``selectKey()``, ``map()``, or ``flatMap()``.
It is generally preferable to use ``mapValues()`` and ``flatMapValues()``, because they guarantee that
the key is not modified, so the repartitioning step can be omitted.
Often, users want to get read-only access to the key while modifying the value.
For this case, you can call ``mapValues()`` with a ``ValueMapperWithKey`` instead of using the ``map()`` operator.
The ``XxxWithKey`` extension is available for multiple operators.
There are special cases in which you want to modify the key in a way that you know preserves the partitioning,
so that repartitioning is actually not required.
For these cases, it's sometimes possible to apply the key-changing operation *after* the aggregation
to avoid the repartitioning step.
You can apply this strategy if the original and modified key result in the same "groups of data".
Finally, you can always fall back to the Processor API and do a custom aggregation via ``process()``,
``transform()`` or ``transformValues()``.
Those operations do not automatically trigger a repartitioning step even if the key might have been
modified in a previous operation.
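One way to reason about whether the original and the modified key result in the same "groups of data" is to check that no two distinct original keys map to the same new key. The following sketch performs that check on a sample of keys. It is an illustration under that assumption only: the names are hypothetical, and passing the check on a sample doesn't prove the property for all keys.

.. sourcecode:: java

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    final class GroupingSketch {

        /**
         * True if no two distinct sample keys map to the same new key, that is,
         * the key change yields the same groups of data on the given sample.
         */
        static <K, K2> boolean preservesGroups(Collection<K> sampleKeys, Function<K, K2> keyMapper) {
            Map<K2, K> seen = new HashMap<>();
            for (K key : sampleKeys) {
                K previous = seen.putIfAbsent(keyMapper.apply(key), key);
                if (previous != null && !previous.equals(key)) {
                    return false; // two different groups would be merged into one
                }
            }
            return true;
        }
    }

For example, upper-casing user names keeps ``alice`` and ``bob`` in separate groups, whereas mapping each name to its first letter would merge ``alice`` and ``adam``.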
Serdes ``config`` method
^^^^^^^^^^^^^^^^^^^^^^^^
If you implement a custom Serde and specify this ``Serde`` in the config properties, and if this class implements
``org.apache.kafka.common.Configurable``, ``Serde#configure(...)`` will be called automatically and your code should
forward this call to the corresponding serializer and deserializer. The serdes that are provided by the library implement
this pattern already.
If you manually call ``new`` to create a Serde and pass in this Serde via a method call, you must manually call ``configure(...)``
for both library-provided and custom Serdes.
.. tip:: Manually call ``configure()`` immediately after you have created the Serde object so that this step is not forgotten.
.. _streams_faq-processing-replace-rocksdb:
How can I replace RocksDB with a different store?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There is no global setting to replace RocksDB.
However, you can set a custom store for each operator individually.
For example, ``aggregate()`` (and :ref:`other operators with an internal store `)
has an overload that accepts a ``Materialized`` that enables specifying a ``KeyValueBytesStoreSupplier``.
You can use those overloads to provide a supplier that returns a :ref:`different store `
that replaces RocksDB for the corresponding operator.
Note: if you want to replace RocksDB in every operator, you need to provide a different ``XxxBytesStoreSupplier`` for each operator.
In addition to the RocksDB-based state store, the |kstreams| API also ships with an in-memory store.
This in-memory store is backed by a changelog topic and is fully :ref:`fault-tolerant `.
You can get an in-memory supplier via:
.. sourcecode:: java
KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore("myStoreName-mustBeUniqueForEachOperator");
Materialized materialized = Materialized.as(inMemoryStoreSupplier);
You can also :ref:`disable fault-tolerance `
for the in-memory store by calling ``Materialized#withLoggingDisabled()`` before passing it to ``aggregate()`` (or any other operator).
If you want to :ref:`plug-in any other third party store `,
you must implement the ``StoreSupplier`` and ``StateStore`` interfaces.
The ``StateStore`` interface is for the actual integration code of the third party store into |kstreams|.
The ``StoreSupplier`` interface returns a *new* instance of your implemented store on each call of ``get()``.
Note: if you integrate a third party store, it is your sole responsibility to take care of fault-tolerance.
There is no store backup via a changelog topic out of the box; if you need one, you must implement it yourself.
Can I use NFS or network based storage to store my RocksDB?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Because disk performance has a direct impact on RocksDB's performance,
|confluent| recommends locating the state directory (``state.dir``) on a local disk and discourages
configuring it to a remote network-based location like NFS or HDFS. This is
because writing to remote disks is usually slower. Also, high availability
isn't a requirement for in-flight state. If high disk throughput is required,
local SSD disks are preferred.
.. _streams_faq-failure_handling:
Failure and exception handling
------------------------------
.. _streams_faq-failure_handling-deserialization_errors:
Handling corrupted records and deserialization errors ("poison pill records")?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You might find that some of the incoming records of your |kstreams| application are corrupted, or that the serializer/deserializer is incorrect, buggy,
or cannot handle all record types. Such records are referred to as "poison pills".
You can use the ``org.apache.kafka.streams.errors.DeserializationExceptionHandler`` interface to customize how to handle such poison pill records.
The library also includes multiple implementations that represent the common handling patterns:
* :ref:`streams_faq-failure_handling-deserialization_errors-fail`
* :ref:`streams_faq-failure_handling-deserialization_errors-skipping`
* :ref:`streams_faq-failure_handling-deserialization_errors-quarantine`
* :ref:`streams_faq-failure_handling-deserialization_errors-serde`
.. _streams_faq-failure_handling-deserialization_errors-fail:
Option 1: Log the error and shut down the application
"""""""""""""""""""""""""""""""""""""""""""""""""""""
``LogAndFailExceptionHandler`` implements ``DeserializationExceptionHandler`` and is the default setting in |kstreams|.
It handles any encountered deserialization exceptions by logging the error and throwing a fatal error to stop your Streams application.
If your application is configured to use ``LogAndFailExceptionHandler``, then an instance of your application will fail-fast when it encounters a corrupted record by terminating itself.
.. sourcecode:: java
Properties streamsSettings = new Properties();
streamsSettings.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class.getName()
);
.. _streams_faq-failure_handling-deserialization_errors-skipping:
Option 2: Skip corrupted records and log the error
""""""""""""""""""""""""""""""""""""""""""""""""""
``LogAndContinueExceptionHandler`` is an alternative implementation of ``DeserializationExceptionHandler``.
Your application will log an ERROR message whenever it encounters a corrupted record, skip the processing of that record, and continue processing the next record.
The rate of skipped records is recorded in a processor-node level metric named ``skippedDueToDeserializationError``.
For a full list of metrics that you can use for monitoring and alerting, see the :ref:`documentation `.
.. sourcecode:: java
Properties streamsSettings = new Properties();
streamsSettings.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName()
);
.. _streams_faq-failure_handling-deserialization_errors-quarantine:
Option 3: Quarantine corrupted records (dead letter queue)
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
You can provide your own ``DeserializationExceptionHandler`` implementation.
For example, you can choose to forward corrupt records into a quarantine topic (think: a "dead letter queue") for further processing.
To do this, use the :ref:`Producer API ` to write a corrupted record directly to the quarantine topic.
The drawback of this approach is that "manual" writes are side effects that are invisible to the |kstreams| runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API.
Code example:
.. sourcecode:: java

    public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
        KafkaProducer<byte[], byte[]> dlqProducer;
        String dlqTopic;

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {

            // `log` is assumed to be a logger field of this class.
            log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
                      "taskId: {}, topic: {}, partition: {}, offset: {}",
                      context.taskId(), record.topic(), record.partition(), record.offset(),
                      exception);

            dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), record.key(), record.value()));

            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            dlqProducer = .. // get a producer from the configs map
            dlqTopic = .. // get the topic name from the configs map
        }
    }

    Properties streamsSettings = new Properties();
    streamsSettings.put(
      StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
      SendToDeadLetterQueueExceptionHandler.class.getName()
    );

.. _streams_faq-failure_handling-deserialization_errors-serde:
Option 4: Interpret corrupted records as a sentinel value
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""
You can also :ref:`implement a custom serde ` to handle corrupt records internally rather than relying on ``DeserializationExceptionHandler`` settings.
For example, you can implement a ``org.apache.kafka.common.serialization.Deserializer`` that, instead of throwing an exception, returns a "special-purpose" record that acts as a sentinel record of your choice (e.g. ``null``).
This allows downstream processors of your application to recognize and handle such sentinel records accordingly whenever deserialization fails.
Code example:
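.. sourcecode:: java

    public class ExceptionHandlingDeserializer implements Deserializer<MyObject> {

        // Hypothetical deserializer that does the actual work for MyObject.
        private final Deserializer<MyObject> inner = new MyObjectDeserializer();

        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        public MyObject deserialize(String topic, byte[] data) {
            try {
                // Attempt deserialization by delegating to the wrapped deserializer.
                // (Calling this class's own deserialize() here would recurse forever.)
                MyObject deserializedValue = inner.deserialize(topic, data);
                // Ok, the record is valid (not corrupted).
                return deserializedValue;
            } catch (SerializationException e) {
                // `log` is assumed to be a logger field of this class.
                log.warn("Exception caught during Deserialization: {}", e.getMessage());
                // return the sentinel record upon corrupted data
                return null;
            }
        }

        public void close() {
            inner.close();
        }
    }
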
.. important::
**Be careful when returning null**:
``null`` record values have special semantics for tables, where they are interpreted as tombstones that will cause the
deletion of record keys from the table.
Once your custom serde is implemented, you can either
:ref:`configure your application ` to use it as the default key and/or value
serde, or use it explicitly when calling API methods such as ``KStream#to()``, or both.
Sending corrupt records to a quarantine topic or dead letter queue?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
See :ref:`streams_faq-failure_handling-deserialization_errors-quarantine` as described in
:ref:`streams_faq-failure_handling-deserialization_errors`.
.. _streams_faq-interactive_queries:
Interactive Queries
-------------------
Handling InvalidStateStoreException: "the state store may have migrated to another instance"?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When attempting to access a local state store, you may run into an error such as the following:
.. codewithvars:: bash
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
Typically this happens for two reasons:
* The local ``KafkaStreams`` instance is not yet ready (i.e., not yet in runtime state ``RUNNING``, see
:ref:`Run-time Status Information `) and thus its local state stores cannot be
queried yet.
* The local ``KafkaStreams`` instance is ready (e.g. in runtime state ``RUNNING``), but the particular state store was
just migrated to another instance behind the scenes. This may notably happen during the startup phase of a distributed
application or when you are adding/removing application instances.
There are a couple of ways to prevent running into ``InvalidStateStoreException``, but keep in mind that, in any case, you
are working with information that is valid only for a particular point in time -- state stores may be migrated at any time
behind the scenes, so your application should accept the fact that access to local state stores will come and go.
The simplest approach is to guard against ``InvalidStateStoreException`` when calling ``KafkaStreams#store()``:
.. sourcecode:: java

    // Example: Wait until the store of type T is queryable. When it is, return a reference to the store.
    public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                                  final QueryableStoreType<T> queryableStoreType,
                                                  final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
                // store not yet ready for querying
                Thread.sleep(100);
            }
        }
    }

An end-to-end example is available at
:cp-examples:`ValidateStateWithInteractiveQueriesLambdaIntegrationTest|src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java`.
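If waiting forever is not acceptable, the same guard can be bounded. The following is a minimal, library-independent sketch of a retry helper with a maximum number of attempts. The class and method names are hypothetical; in a |kstreams| application, the supplier would wrap the ``KafkaStreams#store()`` call and the retryable type would be ``InvalidStateStoreException``.

.. sourcecode:: java

    import java.util.function.Supplier;

    final class RetrySketch {

        /**
         * Calls the supplier until it succeeds or maxAttempts is reached, sleeping
         * between attempts. Only exceptions of the retryable type are retried.
         */
        static <T> T retry(Supplier<T> call,
                           Class<? extends RuntimeException> retryable,
                           int maxAttempts,
                           long sleepMillis) {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be at least 1");
            }
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return call.get();
                } catch (RuntimeException e) {
                    if (!retryable.isInstance(e)) {
                        throw e; // unrelated error: fail immediately
                    }
                    last = e;
                    if (attempt < maxAttempts) {
                        sleep(sleepMillis);
                    }
                }
            }
            throw last; // still failing after maxAttempts
        }

        private static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("Interrupted while waiting to retry", ie);
            }
        }
    }

After the last attempt the helper rethrows the most recent exception, so the caller can decide whether to report the store as unavailable or to try again later.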
.. _streams_faq-security:
Security
--------
Application fails when running against a secured |ak| cluster?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When your application processes data from a secured |ak| cluster, you may run into error messages such as:
.. codewithvars:: bash
> Could not create internal topics: Could not create topic: due to Cluster authorization failed
Make sure that the principal running your application has the ACL ``--cluster --operation Create`` set so that the
application has the permissions to create :ref:`internal topics `.
Troubleshooting and debugging
-----------------------------
Easier to interpret Java stacktraces?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When you write |kstreams| applications, you often create chains of method calls such as the following:
.. sourcecode:: java
// Code all on one line. Unfortunately, this is bad practice when it comes to stacktraces.
myStream.map(...).filter(...).groupByKey(...).count(...);
Now if your code happens to trigger a runtime error, the Java stacktrace may not be very helpful
because the JVM provides only line-number information about where an error occurred ("NullPointerException at line
123"). So, to pick the example above, you may deduce that *some* operation in the map/filter/groupByKey/count chain failed,
but the stacktrace will not tell you where exactly in the affected line.
A simple trick is to split your method chains across multiple lines, so that the line number returned by the stacktrace
will more easily identify the actual culprit:
.. sourcecode:: java
// Split the same code across multiple lines to benefit from more actionable stacktraces.
myStream
.map(...)
.filter(...)
.groupByKey(...)
.count(...);
Visualizing topologies?
^^^^^^^^^^^^^^^^^^^^^^^
You can visualize a topology of a |kstreams| application by accessing its ``TopologyDescription``.
A ``TopologyDescription`` contains *static* information (that is, not runtime information) about the processing graph:
all nodes, (global) stores, and how nodes or stores are connected to each other.
For each node, detailed information is available, such as the node name and the input/output topics (for sources and sinks).
Additionally, nodes are grouped into sub-topologies (i.e., groups of nodes that are only connected via topics).
.. sourcecode:: java

    // for DSL
    StreamsBuilder builder = new StreamsBuilder();
    Topology topology = builder.build();
    // for Processor API (an alternative to the DSL variant above; use one or the other)
    Topology topology = new Topology();

    TopologyDescription description = topology.describe();

    // Get sub-topologies
    Set<Subtopology> subtopologies = description.subtopologies();
    // You can also get all nodes of a sub-topology (can be a `Source`, `Processor`, or `Sink`)
    // All nodes have a name and a set of predecessors as well as successors
    // Source and Sink also have input/output topic information
    // Processor can have stores attached
    // Each subtopology has a unique ID
    Subtopology subtopology = ...
    Set<Node> nodes = subtopology.nodes();
    // access node information...

    // You can also get information about global stores
    // a GlobalStore has a source topic and a `Processor`
    Set<GlobalStore> globalStores = description.globalStores();
    // access global store information...

    // Or simply print all information at once
    System.out.println(description);

Printing the ``TopologyDescription`` for :cp-examples:`WordCountLambdaExample|src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java`
would look as follows:
.. codewithvars:: bash
Sub-topologies:
Sub-topology: 0
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Source: KSTREAM-SOURCE-0000000000(topics: inputTopic) --> KSTREAM-FLATMAPVALUES-0000000001
Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: outputTopic) <-- KTABLE-TOSTREAM-0000000007
Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Global Stores:
none
Older releases
""""""""""""""
For releases prior to Confluent Platform 4.0.x, you can visualize the internal topology of a |kstreams| application by calling the ``KafkaStreams#toString()`` method:
.. sourcecode:: java
KafkaStreams streams = new KafkaStreams(topology, config);
// Start the Kafka Streams threads
streams.start();
// Print the internal topology to stdout
System.out.println(streams.toString());
An example topology for :cp-examples:`WordCountLambdaExample|src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java`
that is output from this call is shown below.
All sources, sinks, and their backing topics, as well as all intermediate nodes and their state stores are collected and printed:
.. codewithvars:: bash
KafkaStreams processID: 5fe8281e-d756-42ec-8a92-111e4af3d4ca
StreamsThread appId: wordcount-lambda-integration-test
StreamsThread clientId: wordcount-lambda-integration-test-5fe8281e-d756-42ec-8a92-111e4af3d4ca
StreamsThread threadId: wordcount-lambda-integration-test-5fe8281e-d756-42ec-8a92-111e4af3d4ca-StreamThread-1
Active tasks:
Running:
StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [inputTopic]
children: [KSTREAM-FLATMAPVALUES-0000000001]
KSTREAM-FLATMAPVALUES-0000000001:
children: [KSTREAM-KEY-SELECT-0000000002]
KSTREAM-KEY-SELECT-0000000002:
children: [KSTREAM-FILTER-0000000005]
KSTREAM-FILTER-0000000005:
children: [KSTREAM-SINK-0000000004]
KSTREAM-SINK-0000000004:
topic: wordcount-lambda-integration-test-Counts-repartition
Partitions [inputTopic-0]
StreamsTask taskId: 1_0
ProcessorTopology:
KSTREAM-SOURCE-0000000006:
topics: [wordcount-lambda-integration-test-Counts-repartition]
children: [KSTREAM-AGGREGATE-0000000003]
KSTREAM-AGGREGATE-0000000003:
states: [Counts]
children: [KTABLE-TOSTREAM-0000000007]
KTABLE-TOSTREAM-0000000007:
children: [KSTREAM-SINK-0000000008]
KSTREAM-SINK-0000000008:
topic: outputTopic
Partitions [wordcount-lambda-integration-test-Counts-repartition-0]
Suspended:
Restoring:
New:
Standby tasks:
Running:
Suspended:
Restoring:
New:
Note that ``KafkaStreams#toString()`` returns more information than ``TopologyDescription``, for example,
the tasks currently assigned to each of the running stream threads. Starting in 4.0.x such runtime information is retrievable via
the ``KafkaStreams#localThreadsMetadata()`` API, and as a result ``KafkaStreams#toString()`` is deprecated.
Inspecting streams and tables?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To inspect the records of a stream, call the ``KStream#print()`` method.
To inspect the changes to a ``KTable``, first get its changelog stream via ``KTable#toStream()`` and then call ``print()`` on that stream.
You can print the records to ``STDOUT`` with ``Printed.toSysOut()``, as shown below,
or write them to a file with ``Printed.toFile("fileName")``.
Here is an example that uses ``KStream#print(Printed.toSysOut())``:
.. sourcecode:: java
    import java.time.Duration;

    KStream<String, Long> left = ...;
    KStream<String, Long> right = ...;

    // Java 8+ example, using lambda expressions
    KStream<String, String> joined = left.join(right,
        (leftValue, rightValue) -> leftValue + " --> " + rightValue, /* ValueJoiner */
        JoinWindows.of(Duration.ofMinutes(5)),
        Joined.with(
          Serdes.String(), /* key */
          Serdes.Long(),   /* left value */
          Serdes.Long())); /* right value */

    // print() returns void, so call it on the joined stream in a separate statement.
    joined.print(Printed.toSysOut());
The output would be the records after the join; e.g., if we were joining two records with the same key ``K`` and values
``V1`` and ``V2``, then what is printed on the console would be ``K, V1 --> V2`` as shown for some sample data below:
.. codewithvars:: bash
alice, 5 --> 7
bob, 234 --> 19
charlie, 9 --> 10
If you want to inspect the content of a ``KTable``'s internal store, you can use :ref:`streams_developer-guide_interactive-queries`.
Invalid Timestamp Exception
^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you get an exception similar to the one shown below, there are multiple possible causes:
.. codewithvars:: bash
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record {...} has invalid (negative) timestamp. \
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, \
or because the input topic was created before upgrading the Kafka cluster to 0.10+. \
Use a different TimestampExtractor to process this data.
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)
This error means that the timestamp extractor of your |kstreams| application failed to extract a valid timestamp from a record.
Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all),
but it could also indicate a problem or bug in the timestamp extractor used by the application.
When does a record not contain a valid timestamp?
"""""""""""""""""""""""""""""""""""""""""""""""""
- If you are using the default ``FailOnInvalidTimestamp`` timestamp extractor, it
is most likely that your records do not carry an embedded timestamp (embedded
record timestamps were introduced in |ak|'s message format in |ak| ``0.10``).
This can happen if, for example, you consume a topic that was written by old
|ak| producer clients (i.e., version ``0.9`` or earlier) or by third-party
producer clients. It can also happen after you upgrade
your |ak| cluster from ``0.8`` or ``0.9`` to ``0.10``, because data that
was written with ``0.8`` or ``0.9`` doesn't include the ``0.10`` message
timestamps. Consider using an alternative timestamp extractor like
``UsePartitionTimeOnInvalidTimestamp`` or ``LogAndSkipOnInvalidTimestamp``, both
of which handle negative timestamps more gracefully. See :ref:`Kafka Streams
Developer Guide: Timestamp Extractor
` for details.
- If you are using a custom timestamp extractor, make sure that your extractor is
properly handling invalid (negative) timestamps, where "properly" depends on the
semantics of your application. For example, you can return a default timestamp
or an estimated timestamp if you cannot extract a valid timestamp directly from
the record.
- You can also switch from event-time processing to processing-time semantics via ``WallclockTimestampExtractor``;
whether such a fallback is an appropriate response to this situation depends on your use case.
However, as a first step you should identify and fix the root cause for why such problematic records were written to
|ak| in the first place. In a second step you may consider applying workarounds as described above when dealing with
such records. Another option is to regenerate the records with correct timestamps and write them to a new |ak| topic.
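The fallback order that a custom extractor might apply can be sketched in plain Java. The sketch is illustrative only: ``TimestampFallback`` and ``resolve`` are hypothetical names that are not part of |kstreams|, and the order shown (embedded timestamp, then partition time, then wall-clock time) is an assumption that you should adapt to the semantics of your application. In a real application, this logic would live in your implementation of ``TimestampExtractor#extract(ConsumerRecord<Object, Object> record, long partitionTime)``.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams: it only models the decision
// a custom TimestampExtractor has to make for records with invalid timestamps.
final class TimestampFallback {

    /**
     * Returns the embedded timestamp if it is valid (non-negative), otherwise the
     * partition time if that is valid, otherwise the current wall-clock time.
     */
    static long resolve(long embeddedTimestamp, long partitionTime, LongSupplier wallClock) {
        if (embeddedTimestamp >= 0) {
            return embeddedTimestamp;
        }
        if (partitionTime >= 0) {
            return partitionTime;
        }
        return wallClock.getAsLong();
    }

    public static void main(String[] args) {
        LongSupplier clock = System::currentTimeMillis;
        System.out.println(resolve(1_500L, 1_000L, clock)); // valid embedded timestamp wins
        System.out.println(resolve(-1L, 1_000L, clock));    // falls back to partition time
        System.out.println(resolve(-1L, -1L, clock));       // falls back to wall-clock time
    }
}
```

Passing the clock as a ``LongSupplier`` keeps the fallback deterministic in tests. Whether wall-clock time is an acceptable last resort depends on your use case.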
What if the timestamp extractor causes the problem?
"""""""""""""""""""""""""""""""""""""""""""""""""""
In this situation, debug and fix the erroneous extractor.
If the extractor is built into |ak|, report the bug to the |ak| developer mailing list at dev@kafka.apache.org (see instructions at https://kafka.apache.org/contact).
In the meantime, you can write a custom timestamp extractor that works around the problem and configure your application to use it.
Why do I get an ``IllegalStateException`` when accessing record metadata?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you attach a new ``Processor/Transformer/ValueTransformer`` to your topology using a corresponding supplier,
you need to make sure that the supplier returns a *new* instance each time ``get()`` is called.
If you return the same object, a single ``Processor/Transformer/ValueTransformer`` is shared across multiple tasks,
which results in an ``IllegalStateException`` with the error message
``"This should not happen as topic() should only be called while a record is processed"``
(depending on the method you call, the message might name ``partition()``, ``offset()``, or ``timestamp()`` instead of ``topic()``).
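The difference between a supplier that shares one instance and a supplier that creates a new instance per call can be sketched in plain Java. This is a minimal illustration that uses ``java.util.function.Supplier`` as a stand-in for the |kstreams| supplier interfaces; ``SupplierInstances`` and ``StatefulWorker`` are hypothetical names, not part of the |kstreams| API.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins: StatefulWorker plays the role of a
// Processor/Transformer/ValueTransformer, and java.util.function.Supplier
// plays the role of the corresponding Kafka Streams supplier.
final class SupplierInstances {

    static final class StatefulWorker { }

    // Problematic: every call to get() hands out the same object,
    // so several tasks would end up sharing one instance.
    static Supplier<StatefulWorker> sharedInstanceSupplier() {
        final StatefulWorker shared = new StatefulWorker();
        return () -> shared;
    }

    // Correct: every call to get() creates a new instance.
    static Supplier<StatefulWorker> freshInstanceSupplier() {
        return StatefulWorker::new;
    }

    // Returns true if two consecutive get() calls yield different objects.
    static boolean returnsDistinctInstances(Supplier<?> supplier) {
        return supplier.get() != supplier.get();
    }

    public static void main(String[] args) {
        System.out.println(returnsDistinctInstances(sharedInstanceSupplier())); // prints false
        System.out.println(returnsDistinctInstances(freshInstanceSupplier()));  // prints true
    }
}
```

A check like ``returnsDistinctInstances`` can serve as a quick unit test for your own suppliers.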
Why is ``punctuate()`` not called?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you scheduled the ``punctuate()`` function based on *event-time* (i.e., ``PunctuationType.STREAM_TIME``), then the function is not triggered by *wall-clock time*; it is purely data-driven (i.e., driven by the internally tracked event-time).
The event-time is derived from the record timestamps that the configured ``TimestampExtractor`` extracts.
For example, let's assume you scheduled a ``punctuate()`` function every 10 seconds based on ``PunctuationType.STREAM_TIME``.
If you were to process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
then ``punctuate()`` would be called 6 times -- regardless of the time required to actually process those records;
i.e., ``punctuate()`` would be called 6 times no matter whether processing these 60 records would take a second, a minute, or an hour.
.. attention::
Event-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
If at least one partition does not have any new data available, event-time will not be advanced and thus ``punctuate()`` will not be triggered.
This behavior is independent of the configured timestamp extractor, i.e., using ``WallclockTimestampExtractor`` does not enable wall-clock triggering of ``punctuate()``.
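The data-driven nature of event-time punctuation can be illustrated with a small simulation. This is a simplified model, not the scheduler that |kstreams| uses: it assumes a single input partition, assumes that the first punctuation fires when event-time reaches the first multiple of the interval, and assumes that missed intervals are skipped rather than replayed. ``StreamTimePunctuationModel`` and its methods are hypothetical names.

```java
// Simplified, hypothetical model of event-time (STREAM_TIME) punctuation.
// It never consults a wall clock: only record timestamps advance time.
final class StreamTimePunctuationModel {

    // Builds timestamps 1 s, 2 s, ..., count s (in milliseconds).
    static long[] consecutiveSeconds(int count) {
        long[] timestamps = new long[count];
        for (int i = 0; i < count; i++) {
            timestamps[i] = (i + 1) * 1000L;
        }
        return timestamps;
    }

    // Counts how often punctuation would fire for the given record timestamps.
    static int countPunctuations(long[] recordTimestampsMs, long intervalMs) {
        long streamTime = -1L;               // no data seen yet
        long nextPunctuation = intervalMs;   // model assumption: first boundary
        int punctuations = 0;
        for (long timestamp : recordTimestampsMs) {
            streamTime = Math.max(streamTime, timestamp); // event-time never goes backward
            if (streamTime >= nextPunctuation) {
                punctuations++;
                // Model assumption: skip any missed boundaries instead of replaying them.
                long missed = (streamTime - nextPunctuation) / intervalMs;
                nextPunctuation += (missed + 1) * intervalMs;
            }
        }
        return punctuations;
    }

    public static void main(String[] args) {
        // 60 records with timestamps 1 s to 60 s, punctuation every 10 s.
        System.out.println(countPunctuations(consecutiveSeconds(60), 10_000L)); // prints 6
        // No records: event-time never advances, so nothing fires.
        System.out.println(countPunctuations(new long[0], 10_000L)); // prints 0
    }
}
```

Because the model has no notion of processing duration, the result is the same whether the 60 records take a second or an hour to process. With no new records, no punctuation occurs.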
Scala: compile error "no type parameter", "Java-defined trait is invariant in type T"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When using Scala without the :ref:`Scala wrapper `,
you may occasionally run into a Scala compiler error similar to the following: ::
Error:(156, 8) no type parameters for method leftJoin:
(x$1: org.apache.kafka.streams.kstream.KTable[String,VT],
x$2: org.apache.kafka.streams.kstream.ValueJoiner[_ >: Long, _ >: VT, _ <: VR])
org.apache.kafka.streams.kstream.KStream[String,VR]
exist so that it can be applied to arguments
(org.apache.kafka.streams.kstream.KTable[String,String],
org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)] with Serializable)
--- because ---
argument expression's type is not compatible with formal parameter type;
found : org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)] with Serializable
required: org.apache.kafka.streams.kstream.ValueJoiner[_ >: Long, _ >: ?VT, _ <: ?VR]
Note: Long <: Any (and org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]
with Serializable <: org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]),
but Java-defined trait ValueJoiner is invariant in type V1.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: String <: Any (and org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]
with Serializable <: org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]),
but Java-defined trait ValueJoiner is invariant in type V2.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
The root cause of this problem is Scala-Java interoperability -- the |kstreams| API is implemented in Java, but your
application is written in Scala. Notably, this problem is caused by how the type systems of Java and Scala interact.
Generic wildcards in Java, for example, often cause such issues in Scala.
It is recommended to use the :ref:`Scala wrapper ` to avoid this issue.
If this is not possible, you must declare types explicitly in your Scala application in order for the code to
compile. For example, you may need to break a single statement that chains multiple DSL operations into multiple
statements, where each statement explicitly declares the respective return types.
The :cp-examples:`StreamToTableJoinScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala`
demonstrates how the types of return variables are explicitly declared.
How can I convert a KStream to a KTable without an aggregation step?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you want to convert a derived ``KStream`` (i.e., a ``KStream`` that is not read from a |ak| topic) into a ``KTable``,
you have three options.
Option 1 (recommended if on 5.5.x or newer): Use ``KStream.toTable()``
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
The ``toTable()`` API was introduced in |cp| 5.5 to simplify this conversion. It transforms an event stream
into a changelog stream: every record is applied to the resulting ``KTable`` as an update, and a record with a
``null`` value is interpreted as a delete for its key. When topology optimization is enabled, the resulting ``KTable``
is materialized only if you use the overloaded ``toTable(Materialized)`` method.
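The changelog semantics described above can be modeled with a plain ``Map``. This is a conceptual sketch only: ``ChangelogSemantics`` and its methods are hypothetical names and don't use the |kstreams| API. Each record overwrites the previous value for its key, and a record with a ``null`` value removes the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model (not the Kafka Streams API) of applying a stream of
// records to a table with changelog semantics.
final class ChangelogSemantics {

    // Creates a key-value record; SimpleEntry permits null values.
    static Map.Entry<String, Long> record(String key, Long value) {
        return new SimpleEntry<>(key, value);
    }

    // Applies each record in order: non-null values upsert, null values delete.
    static Map<String, Long> applyAsChangelog(List<Map.Entry<String, Long>> records) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            if (r.getValue() == null) {
                table.remove(r.getKey());
            } else {
                table.put(r.getKey(), r.getValue());
            }
        }
        return table;
    }

    static Map<String, Long> example() {
        return applyAsChangelog(Arrays.asList(
            record("alice", 1L),
            record("bob", 2L),
            record("alice", 3L),    // update: replaces alice's previous value
            record("bob", null)));  // delete: removes bob from the table
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints {alice=3}
    }
}
```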
Option 2: Write KStream to |ak|, read back as KTable
"""""""""""""""""""""""""""""""""""""""""""""""""""""
You can write the ``KStream`` into a |ak| topic and read it back as a ``KTable``. As per our general recommendation,
you should manually pre-create this topic to ensure it has the correct number of partitions, topic settings, and so on.
.. sourcecode:: java
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Long> stream = ...; // some computation that creates the derived KStream

    // You should manually create the dummy topic before starting your application.
    //
    // Also, because you want to read the topic back as a KTable, you might want to enable
    // log compaction for this topic to align the topic's cleanup policy with KTable semantics.
    stream.to("dummy-topic", Produced.with(Serdes.String(), Serdes.Long()));
    KTable<String, Long> table = builder.table("dummy-topic", Consumed.with(Serdes.String(), Serdes.Long()));
This is the simplest approach with regard to the code. However, it has the disadvantages that
(a) you need to manage an additional topic and that
(b) it results in additional network traffic because data is written to and re-read from |ak|.
Option 3: Perform a dummy aggregation
"""""""""""""""""""""""""""""""""""""
As an alternative to option 2, you can choose to create a dummy aggregation step:
.. sourcecode:: java
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Long> stream = ...; // some computation that creates the derived KStream

    // Java 8+ example, using lambda expressions
    KTable<String, Long> table = stream.groupByKey().reduce(
        (aggValue, newValue) -> newValue);

    // Java 7 example
    KTable<String, Long> table = stream.groupByKey().reduce(
        new Reducer<Long>() {
          @Override
          public Long apply(Long aggValue, Long newValue) {
            return newValue;
          }
        });
This approach is somewhat more complex with regard to the code compared to option 2 but has the advantage that
(a) no manual topic management is required and
(b) re-reading the data from |ak| is not necessary.
In option 3, |kstreams| will create an internal changelog topic to back up the KTable for fault tolerance.
Thus, both approaches require some additional storage in |ak| and result in additional network traffic.
Overall, it's a trade-off between slightly more complex code in option 3 versus manual topic management in
option 2.
RocksDB behavior in 1-core environments
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There is a known issue with RocksDB when running in environments with just one
CPU core. In some scenarios, the application might become very slow or
unresponsive.
The workaround is to set a particular RocksDB configuration using
the :ref:`RocksDB config setter ` as follows:
.. sourcecode:: java
    public static class CustomRocksDBConfig implements RocksDBConfigSetter {

      @Override
      public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Workaround: We must ensure that the parallelism is set to >= 2.
        int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
        // Set number of compaction threads (but not flush threads).
        options.setIncreaseParallelism(compactionParallelism);
      }
    }

    Properties streamsSettings = new Properties();
    streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
.. include:: ../.hidden/docs-common/home/includes/ak-share.rst