Important Warning

Update July 2016: This Tech Preview documentation from March 2016 is outdated and deprecated. Please use the latest Confluent Platform documentation instead.

Architecture

Kafka Streams simplifies application development by building on the Kafka producer and consumer libraries and leveraging the native capabilities in Kafka to offer data parallelism, distributed coordination, fault tolerance, and operational simplicity. In this section, we describe how Kafka Streams works under the covers.

Figure: Logical view of a Kafka Streams application, containing multiple stream threads, each hosting multiple stream tasks.

The picture above shows the anatomy of an application that uses the Kafka Streams library. Let’s walk through some details.

Processor Topology

A processor topology or simply topology defines the stream processing computational logic for your application, i.e., how input data is transformed into output data. A topology is a graph of stream processors (nodes) that are connected by streams (edges). There are two special processors in the topology:

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

A stream processing application – i.e., your application – may define one or more such topologies, though typically it defines only one. Developers can define topologies either via the low-level Processor API or via the Kafka Streams DSL, which builds on top of the former.
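To make the graph structure concrete, the following is a minimal, self-contained sketch that models a topology as nodes (processors) connected by edges (streams). It has a source node with no upstream processors, one intermediate processor, and a sink node with no downstream processors. It deliberately does not use the Kafka Streams API: `MiniTopology` and its members are hypothetical, and in-memory lists stand in for the input and output Kafka topics.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical, self-contained model of a processor topology (NOT the Kafka Streams API).
class MiniTopology {
    // A node in the graph: applies a function, then forwards the result along its edges.
    static class Node {
        final String name;
        final Function<String, String> fn;
        final List<Node> children = new ArrayList<>();   // outgoing streams (edges)

        Node(String name, Function<String, String> fn) { this.name = name; this.fn = fn; }

        void process(String record) {
            String out = fn.apply(record);
            for (Node child : children) child.process(out);
        }
    }

    final List<String> sinkTopic = new ArrayList<>();    // stands in for the output Kafka topic
    final Node source;                                    // has no upstream processors

    MiniTopology() {
        source = new Node("source", r -> r);                                  // forwards records unchanged
        Node upper = new Node("uppercase", String::toUpperCase);              // an ordinary stream processor
        Node sink = new Node("sink", r -> { sinkTopic.add(r); return r; });   // has no downstream processors
        source.children.add(upper);
        upper.children.add(sink);
    }

    // Feed records "consumed" from an input topic into the source processor.
    List<String> run(List<String> inputTopic) {
        for (String record : inputTopic) source.process(record);
        return sinkTopic;
    }
}
```

For example, `new MiniTopology().run(List.of("a", "b"))` returns the sink's contents, `[A, B]`.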

A processor topology is merely a logical abstraction for your stream processing code. At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see Parallelism Model).

Parallelism Model

Stream Partitions and Tasks

Kafka Streams uses the concepts of partitions and tasks as logical units of its parallelism model. There are close links between Kafka Streams and Kafka in the context of parallelism:

  • Each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition.
  • A data record in the stream maps to a Kafka message from that topic.
  • The keys of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.

An application’s processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes, so each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of their assigned partitions and process messages one at a time from these record buffers. As a result, stream tasks can be processed independently and in parallel without manual intervention.
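The sketch below illustrates one way a fixed set of tasks can be derived from input partitions. It assumes co-partitioned input topics and a grouping in which task i is assigned partition i of every input topic, so the number of tasks equals the largest partition count among the inputs. `TaskGrouping` and `groupPartitions` are hypothetical names, and partitions are numbered from 1 to match this document's `topicA-p1` notation.

```java
import java.util.*;

// Hypothetical sketch: derive a fixed set of tasks from the input topic partitions.
// Assumption: task i gets partition i of every input topic that has such a partition.
class TaskGrouping {
    static SortedMap<Integer, List<String>> groupPartitions(Map<String, Integer> partitionsPerTopic) {
        int numTasks = Collections.max(partitionsPerTopic.values());
        List<String> topics = new ArrayList<>(new TreeSet<>(partitionsPerTopic.keySet())); // stable order
        SortedMap<Integer, List<String>> tasks = new TreeMap<>();
        for (int p = 1; p <= numTasks; p++) {          // numbered from 1, as in this document
            List<String> assigned = new ArrayList<>();
            for (String topic : topics) {
                if (p <= partitionsPerTopic.get(topic)) assigned.add(topic + "-p" + p);
            }
            tasks.put(p, assigned);                     // fixed: this mapping never changes afterwards
        }
        return tasks;
    }
}
```

With two topics of three partitions each, this yields three tasks, the first of which reads `topicA-p1` and `topicB-p1`.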

Note

Sub-topologies aka topology sub-graphs: If there are multiple processor topologies specified in a Kafka Streams application, each task will only instantiate one of the topologies for processing. In addition, a single processor topology may be decomposed into independent sub-topologies (sub-graphs) as long as sub-topologies are not connected by any streams in the topology; here, each task may instantiate only one such sub-topology for processing. This further scales out the computational workload to multiple tasks.
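Finding independent sub-topologies amounts to finding the connected components of the processor graph, treating each stream (edge) as undirected. The following sketch does this with a breadth-first search. The processor names are hypothetical, and the code illustrates the idea only; it is not Kafka Streams' internal implementation.

```java
import java.util.*;

// Hypothetical sketch: split a processor graph into sub-topologies (connected components).
class SubTopologies {
    static List<Set<String>> split(Set<String> processors, List<String[]> edges) {
        Map<String, Set<String>> adj = new HashMap<>();
        for (String p : processors) adj.put(p, new HashSet<>());
        for (String[] e : edges) {                       // e[0] -> e[1]; direction ignored here
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }
        List<Set<String>> components = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String start : new TreeSet<>(processors)) { // deterministic iteration order
            if (!seen.add(start)) continue;              // already part of an earlier component
            Set<String> component = new TreeSet<>();
            Deque<String> queue = new ArrayDeque<>(List.of(start));
            while (!queue.isEmpty()) {
                String p = queue.poll();
                component.add(p);
                for (String q : adj.get(p)) if (seen.add(q)) queue.add(q);
            }
            components.add(component);                   // one independent sub-topology
        }
        return components;
    }
}
```

Two chains with no stream between them, such as `srcA -> mapA -> sinkA` and `srcB -> sinkB`, come out as two sub-topologies that separate tasks could process.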

Figure: Two tasks, each assigned one partition of the input streams.

It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application can be executed on the same machine or spread across multiple machines, and the library automatically distributes tasks among the running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be restarted on other instances and continue to consume from the same stream partitions.
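The sketch below shows why a failure only moves tasks and never re-partitions them: the task-to-partition map is fixed, and only the task-to-instance map is recomputed over the surviving instances. The round-robin placement, the `Failover` class, and the instance names are hypothetical simplifications, not the library's actual assignment logic.

```java
import java.util.*;

// Hypothetical sketch: when an instance fails, only the task -> instance mapping changes.
class Failover {
    // Fixed for the application's lifetime: task id -> input partitions (numbered as in this document).
    static final Map<Integer, List<String>> TASK_PARTITIONS = Map.of(
        1, List.of("topicA-p1", "topicB-p1"),
        2, List.of("topicA-p2", "topicB-p2"),
        3, List.of("topicA-p3", "topicB-p3"));

    // Move the failed instance's tasks, round-robin, onto the surviving instances.
    static Map<Integer, String> handleFailure(Map<Integer, String> taskToInstance,
                                              List<String> instances, String failed) {
        List<String> survivors = new ArrayList<>(instances);
        survivors.remove(failed);
        if (survivors.isEmpty()) throw new IllegalStateException("no running instances left");
        Map<Integer, String> result = new TreeMap<>(taskToInstance);   // iterate tasks in id order
        int next = 0;
        for (Map.Entry<Integer, String> e : result.entrySet()) {
            if (e.getValue().equals(failed)) {
                e.setValue(survivors.get(next++ % survivors.size()));
            }
        }
        return result;   // TASK_PARTITIONS is untouched: restarted tasks read the same partitions
    }
}
```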

Threading Model

Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance. Each thread can execute one or more tasks with their processor topologies independently.

Figure: One stream thread running two stream tasks.

Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing. It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads. The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging Kafka’s server-side coordination functionality.

As we described above, scaling your stream processing application with Kafka Streams is easy: you merely need to start additional instances of your application, and Kafka Streams takes care of distributing partitions amongst tasks that run in the application instances. You can run up to as many threads, summed over all running instances of the application, as there are tasks, whose number is determined by the input topic partitions. This way, every thread (or rather, the tasks it runs) has at least one input partition to process.
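As a back-of-the-envelope illustration, the sketch below spreads a fixed number of tasks over all stream threads of all instances in contiguous chunks and counts the threads that end up with nothing to do. The chunking strategy and the `ThreadSpread` name are hypothetical, and Kafka Streams' actual assignor may place tasks differently. For instance, three tasks over two threads yields the task lists `[1, 2]` and `[3]`, while three tasks over four threads leaves one thread idle.

```java
import java.util.*;

// Hypothetical sketch: spread tasks over all threads (across all instances) as evenly as possible.
class ThreadSpread {
    static List<List<Integer>> spread(int numTasks, int totalThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < totalThreads; t++) perThread.add(new ArrayList<>());
        int base = numTasks / totalThreads, extra = numTasks % totalThreads;
        int task = 1;
        for (int t = 0; t < totalThreads; t++) {
            int count = base + (t < extra ? 1 : 0);   // the first `extra` threads get one more task
            for (int i = 0; i < count; i++) perThread.get(t).add(task++);
        }
        return perThread;
    }

    // Threads without any task have no input partition to process.
    static long idleThreads(int numTasks, int totalThreads) {
        return spread(numTasks, totalThreads).stream().filter(List::isEmpty).count();
    }
}
```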

Example

To understand the parallelism model that Kafka Streams offers, let’s walk through an example.

Let’s say we have a Kafka Streams application that consumes from two topics, A and B, each having 3 partitions. We start running the application on a single machine with the number of threads configured to 2, resulting in two stream threads, instance1-thread1 and instance1-thread2. Kafka Streams creates three tasks, each taking one partition from each of the input topics (for example, one task processes topicA-p1 and topicB-p1). Kafka’s server-side coordination then distributes these three tasks, and hence the partitions of topic A and topic B, as evenly as possible across the two threads, i.e., one thread runs two tasks and the other runs one:

### On first machine
# `topicA-p1`, for example, denotes partition 1 of topic A
instance1-thread1 => {topicA-p1, topicB-p1, topicA-p2, topicB-p2}
instance1-thread2 => {topicA-p3, topicB-p3}

Now imagine we want to scale out this application later on, perhaps because the data volume has increased significantly. We decide to start running the same application but with only a single thread on another, different machine. A new thread instance2-thread1 will be created to join in the Kafka coordination, and partitions will be re-assigned similar to:

### On first, initial machine
instance1-thread1 => {topicA-p1, topicB-p1}
instance1-thread2 => {topicA-p3, topicB-p3}

### On new, second machine
instance2-thread1 => {topicA-p2, topicB-p2}

When the re-assignment occurs, some partitions (and hence their corresponding tasks) will be “migrated” from the existing threads to the newly added thread. As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.
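One way to reproduce the re-assignment above is a simple "sticky" strategy: each existing thread keeps as many of its current tasks as its fair share allows, and only the surplus moves to the new thread. This is a hypothetical simplification for illustration and is not claimed to be Kafka Streams' actual assignment algorithm. Tasks are numbered as task 1 = {topicA-p1, topicB-p1}, task 2 = {topicA-p2, topicB-p2}, task 3 = {topicA-p3, topicB-p3}.

```java
import java.util.*;

// Hypothetical "sticky" reassignment: keep tasks where they are, move only the surplus.
class StickyRebalance {
    static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> previous,
                                                List<String> threads, int numTasks) {
        int capacity = (numTasks + threads.size() - 1) / threads.size(); // ceil(tasks / threads)
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        Set<Integer> kept = new HashSet<>();
        for (String thread : threads) {
            List<Integer> keep = new ArrayList<>();
            for (int task : previous.getOrDefault(thread, List.of())) {
                if (keep.size() < capacity) { keep.add(task); kept.add(task); }  // stay in place
            }
            next.put(thread, keep);
        }
        // Tasks released by overloaded threads (or never assigned) must move.
        Deque<Integer> unassigned = new ArrayDeque<>();
        for (int task = 1; task <= numTasks; task++) if (!kept.contains(task)) unassigned.add(task);
        for (String thread : threads) {
            List<Integer> tasks = next.get(thread);
            while (tasks.size() < capacity && !unassigned.isEmpty()) tasks.add(unassigned.poll());
        }
        return next;
    }
}
```

Starting from a layout in which instance1-thread1 runs tasks 1 and 2 and instance1-thread2 runs task 3, adding instance2-thread1 leaves tasks 1 and 3 in place and moves only task 2, i.e., topicA-p2 and topicB-p2 move to the new machine.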

What if we wanted to add even more instances of the same application? We can do so up to a certain point, namely when the total number of stream threads across all instances equals the number of tasks, which is determined by the number of available input partitions to read from. In our example this point has already been reached: three threads now run the three tasks. Before it would make sense to start further application instances, we would first need to increase the number of partitions for topics A and B (otherwise, we would overprovision the application, ending up with idle threads).

State

Kafka Streams provides so-called state stores, which stream processing applications can use to store and query data; this is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can be a RocksDB database, an in-memory hash map, or another convenient data structure. Kafka Streams offers fault tolerance and automatic recovery for local state stores.
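A state store can be pictured as a small key-value map owned by a single task. The sketch below uses an in-memory `HashMap`, one of the options mentioned above, to keep a per-key count. `KeyValueStoreSketch`, `InMemoryStoreSketch`, and `CountingTask` are hypothetical names, and the interface is much simpler than the real state store APIs.

```java
import java.util.*;

// Hypothetical, minimal key-value state store owned by one task (not the real Kafka Streams API).
interface KeyValueStoreSketch {
    Long get(String key);
    void put(String key, Long value);
}

class InMemoryStoreSketch implements KeyValueStoreSketch {
    private final Map<String, Long> data = new HashMap<>();
    public Long get(String key) { return data.get(key); }
    public void put(String key, Long value) { data.put(key, value); }
}

// A stateful "count by key" step: the task reads and updates only its own local store.
class CountingTask {
    final KeyValueStoreSketch store = new InMemoryStoreSketch();

    long process(String key) {
        Long current = store.get(key);
        long updated = (current == null ? 0L : current) + 1;
        store.put(key, updated);
        return updated;
    }
}
```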

Figure: Two stream tasks with their dedicated local state stores.

Fault Tolerance

Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available even if the application fails and needs to reprocess it. Tasks in Kafka Streams leverage the fault-tolerance capability offered by the Kafka consumer client to handle failures. If a task runs on a machine that fails, Kafka Streams automatically restarts the task in one of the remaining running instances of the application.

In addition, Kafka Streams makes sure that the local state stores are robust to failures, too. It follows an approach similar to Apache Samza's and, for each state store, maintains a replicated changelog Kafka topic in which it tracks any state updates. These changelog topics are partitioned as well, so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition. Log compaction is enabled on the changelog topics so that old data can be purged safely to prevent the topics from growing indefinitely. If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to their content before the failure by replaying the corresponding changelog topics prior to resuming processing on the newly started tasks. As a result, failure handling is completely transparent to the end user.
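The changelog mechanism can be sketched in a few lines. Every update to the local store is also appended to a log, compaction keeps only the latest value per key, and a replacement task restores its store by replaying the log before it processes new records. The in-memory list standing in for the changelog topic partition, and the `ChangelogStore` class with its methods, are hypothetical.

```java
import java.util.*;

// Hypothetical sketch of a changelog-backed local store (the List stands in for a Kafka topic partition).
class ChangelogStore {
    final Map<String, Long> local = new HashMap<>();
    final List<Map.Entry<String, Long>> changelog;

    ChangelogStore(List<Map.Entry<String, Long>> changelog) { this.changelog = changelog; }

    void put(String key, long value) {
        local.put(key, value);
        changelog.add(Map.entry(key, value));          // every state update is also logged
    }

    // Log compaction: keep only the latest record per key.
    static List<Map.Entry<String, Long>> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : log) {
            latest.remove(e.getKey());                  // re-insert so order reflects the last write
            latest.put(e.getKey(), e.getValue());
        }
        return new ArrayList<>(latest.entrySet());
    }

    // A restarted task replays the changelog into a fresh store before resuming processing.
    static ChangelogStore restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogStore store = new ChangelogStore(changelog);
        for (Map.Entry<String, Long> e : changelog) store.local.put(e.getKey(), e.getValue());
        return store;
    }
}
```

Replaying the compacted log yields the same store content as replaying the full log, which is why compaction does not compromise recovery.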

Tip

Optimization: In order to minimize the time for state restoration, and hence the cost of task (re)initialization, users can configure their applications to have shadow copies of local states, so-called standby replicas. When a task migration happens, Kafka Streams then attempts to assign the task to an instance where a standby replica already exists, in order to minimize the task initialization cost. See the num.standby.replicas setting under Optional configuration parameters in the Developer Guide.
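The placement preference behind standby replicas can be sketched as follows. When a task must move, prefer a surviving instance that already hosts a standby copy of that task's state, because such a copy typically needs to apply only the most recent changelog records instead of replaying the whole changelog. The `num.standby.replicas` setting mentioned above controls how many such copies are kept; the selection logic, the load tie-breaker, and all names below are a hypothetical illustration, not Kafka Streams' actual assignor.

```java
import java.util.*;

// Hypothetical sketch: prefer an instance that already hosts a standby replica of the task's state.
class StandbyAwarePlacement {
    static String chooseInstance(int taskId, List<String> survivors,
                                 Map<String, Set<Integer>> standbysByInstance,
                                 Map<String, Integer> activeLoad) {
        if (survivors.isEmpty()) throw new IllegalStateException("no running instances left");
        Comparator<String> byLoad = Comparator.comparingInt(i -> activeLoad.getOrDefault(i, 0));
        Optional<String> withStandby = survivors.stream()
            .filter(i -> standbysByInstance.getOrDefault(i, Set.of()).contains(taskId))
            .min(byLoad);
        // Fallback: least-loaded survivor, which must restore the state from the changelog.
        return withStandby.orElseGet(() -> Collections.min(survivors, byLoad));
    }
}
```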

Processing Guarantees

Kafka Streams currently supports at-least-once processing guarantees in the presence of failure. This means that if your stream processing application fails, no data records are lost or left unprocessed, but some data records may be re-read and therefore reprocessed.

It depends on the specific use case whether at-least-once processing guarantees are acceptable or whether you may need exactly-once processing.

For many processing use cases, at-least-once processing turns out to be perfectly acceptable: Generally, as long as the effect of processing a data record is idempotent, it is safe for the same data record to be processed more than once. Also, some use cases can tolerate processing data records more than once even if the processing is not idempotent. For example, imagine you are counting hits by IP address to auto-generate blacklists that help with mitigating DDoS attacks against your infrastructure; here, some overcounting is tolerable because hits from malicious IP addresses involved in an attack will vastly outnumber hits from benign IP addresses anyway.

In general, however, for non-idempotent operations such as counting, at-least-once processing guarantees may yield incorrect results. If a Kafka Streams application fails and restarts, it may double-count some data records that were processed shortly before the failure. We are planning to address this limitation and will support stronger guarantees and exactly-once processing semantics in a future release of Kafka Streams.
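The double counting described above can be reproduced with a small simulation. A task processes records and commits its input position only every few records. It then crashes after processing a record but before the next commit, and on restart it resumes from the last committed position. The simulation assumes the state survives the crash (for example, via its changelog) while the input position rewinds. An idempotent update (remembering the last value per key) stays correct, but a non-idempotent count is inflated. The commit interval, records, and the `AtLeastOnceDemo` class are hypothetical.

```java
import java.util.*;

// Hypothetical simulation of at-least-once processing: state survives a crash,
// but the input position rewinds to the last committed offset.
class AtLeastOnceDemo {
    final Map<String, Integer> counts = new HashMap<>();     // non-idempotent: count++
    final Map<String, String> lastValue = new HashMap<>();   // idempotent: overwrite with latest
    int committedOffset = 0;

    void process(String key, String value) {
        counts.merge(key, 1, Integer::sum);
        lastValue.put(key, value);
    }

    // Resume at the committed offset and commit every `commitInterval` records.
    // Simulate a crash after `crashAfter` records in this run (pass -1 for no crash).
    void run(List<String[]> topic, int commitInterval, int crashAfter) {
        int processed = 0;
        for (int offset = committedOffset; offset < topic.size(); offset++) {
            process(topic.get(offset)[0], topic.get(offset)[1]);
            processed++;
            if ((offset + 1) % commitInterval == 0) committedOffset = offset + 1;
            if (processed == crashAfter) return;             // crash before the next commit
        }
        committedOffset = topic.size();
    }
}
```

Crashing right after the third record (offset 2, not yet committed) and restarting causes that record to be processed twice: its key's count becomes 2 instead of 1, while its last value is unaffected.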

Flow Control with Timestamps

Kafka Streams tries to regulate the progress of streams by the timestamps of data records so that all source streams are roughly synchronized in terms of time. This is especially important when an application is processing multiple streams (i.e., Kafka topics) with a large amount of historical data. For example, a user may want to reprocess past data in case the business logic of an application was changed significantly, e.g., to fix a bug in an analytics algorithm. It is easy to retrieve a large amount of past data from Kafka; however, without proper flow control, the processing of the data across topic partitions may become out of sync and produce incorrect results.

As mentioned in the Concepts section, each data record in Kafka Streams is associated with a timestamp. A stream task determines which of its assigned partitions to process next based on the timestamps of the records in its stream record buffers. However, Kafka Streams does not reorder records within a single stream for processing, since reordering would break the delivery semantics of Kafka and make it difficult to recover in the face of failure. This flow control is of course best-effort, because it is not always possible to strictly enforce execution order across streams by record timestamp; in fact, in order to enforce strict execution ordering, one would have to either wait until the system has received all the records from all streams (which may be quite infeasible in practice) or inject additional information about timestamp boundaries or heuristic estimates, such as MillWheel’s watermarks.
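The partition selection described above can be sketched with a simplified rule. A task keeps one FIFO buffer per input partition, always takes the next record from the buffer whose head record has the smallest timestamp, and never reorders records inside a buffer. This rule, the `Rec` type, and the other names are hypothetical. Kafka Streams' exact definition of a partition's timestamp may differ, and the sketch ignores waiting for empty buffers to refill.

```java
import java.util.*;

// Hypothetical sketch of timestamp-based choice among a task's partition buffers.
class TimestampFlowControl {
    static class Rec {
        final String partition;
        final long timestamp;
        Rec(String partition, long timestamp) { this.partition = partition; this.timestamp = timestamp; }
        @Override public String toString() { return partition + "@" + timestamp; }
    }

    // Build a FIFO buffer for one partition, in arrival (offset) order.
    static Deque<Rec> buffer(String partition, long... timestamps) {
        Deque<Rec> d = new ArrayDeque<>();
        for (long ts : timestamps) d.add(new Rec(partition, ts));
        return d;
    }

    // Drain all buffers, each time taking the head record with the lowest timestamp.
    static List<String> processOrder(Map<String, Deque<Rec>> buffers) {
        List<String> order = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> buf : buffers.values()) {
                if (!buf.isEmpty() && (next == null || buf.peek().timestamp < next.peek().timestamp)) next = buf;
            }
            if (next == null) return order;     // all buffers are empty
            order.add(next.poll().toString());  // FIFO within a partition: no reordering
        }
    }
}
```

With buffers `topicA-p1 = [1, 5]` and `topicB-p1 = [3, 2]`, the order is `topicA-p1@1, topicB-p1@3, topicB-p1@2, topicA-p1@5`. The out-of-order record with timestamp 2 is not moved ahead of its predecessor in the same partition.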

Attention

In the upcoming Kafka 0.10.0 release, Kafka Streams’ default timestamp extractor will read the timestamp field embedded in the new Kafka message protocol, and hence will be able to enforce processing order based on event time out of the box.

Backpressure

Kafka Streams does not use a backpressure mechanism because it does not need one. Using a depth-first processing strategy, each record consumed from Kafka goes through the whole processor (sub-)topology for processing and for (possibly) being written back to Kafka before the next record is processed. As a result, no records are buffered in memory between two connected stream processors. Also, Kafka Streams leverages Kafka’s consumer client behind the scenes, which works with a pull-based messaging model that allows downstream processors to control the pace at which incoming data records are read.
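The depth-first strategy can be illustrated by logging which processor handles which record. Each record travels through every processor in the chain, down to the sink, before the next record is pulled from the input, so nothing queues up between processors. The processor names are hypothetical, and the code is a toy model rather than Kafka Streams' internals.

```java
import java.util.*;

// Hypothetical toy model of depth-first processing through a linear chain of processors.
class DepthFirstDemo {
    static List<String> run(List<String> input, List<String> processorChain) {
        List<String> log = new ArrayList<>();
        for (String record : input) {                  // pull the next record only when the previous one is done
            for (String processor : processorChain) {  // the record passes through the whole chain
                log.add(processor + ":" + record);
            }
        }
        return log;   // no per-edge queues exist, so no records accumulate between processors
    }
}
```

For the inputs `r1, r2` and the chain `source, map, sink`, the log reads `source:r1, map:r1, sink:r1, source:r2, map:r2, sink:r2`.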

The same applies to the case of a processor topology that contains multiple independent sub-topologies, which will be processed independently from each other (cf. Parallelism Model). For example, the following code defines a topology with two independent sub-topologies:

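// Sub-topology 1 ends in a sink processor that writes stream1 to "my-topic".
stream1.to("my-topic");
// Sub-topology 2 starts with a source processor that reads "my-topic" (String keys/values assumed).
KStream<String, String> stream2 = builder.stream("my-topic");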

Any data exchange between sub-topologies will happen through Kafka, i.e. there is no direct data exchange (in the example above, data would be exchanged through the topic “my-topic”). For this reason there is no need for a backpressure mechanism in this scenario, too.