.. title:: Kafka Streams Basics for Confluent Platform
.. meta::
:description: Learn about the key concepts underlying Kafka Streams
.. _streams_concepts:
|kstreams| Basics for |cp|
==========================
In this section, we summarize the key concepts of |kstreams|. For more detailed information, refer to
:ref:`streams_architecture` and the :ref:`streams_developer-guide`. You may also be interested in the `Kafka Streams 101 `__ course.
|ak| 101
---------
|kstreams| is, by deliberate design, tightly integrated with |ak-tm|: many capabilities of |kstreams| such
as its :ref:`stateful processing features `, its
:ref:`fault tolerance `, and its
:ref:`processing guarantees ` are built on top of functionality provided by
|ak-tm|'s storage and messaging layer. It is therefore important to familiarize yourself with the key concepts of |ak|,
notably the sections `Getting Started `__ and
:kafka-common:`Design|design/index.html`.
In particular you should understand:
* **The who's who:** |ak| distinguishes **producers**, **consumers**, and **brokers**. In short, producers publish
data to |ak| brokers, and consumers read published data from |ak| brokers. Producers and consumers are totally
decoupled, and both run outside the |ak| brokers in the perimeter of a |ak| cluster. A |ak| **cluster** consists
of one or more brokers. An application that uses the |kstreams| API acts as both a producer and a consumer.
* **The data:** Data is stored in **topics**. The topic is the most important abstraction provided by |ak|: it is a
category or feed name to which data is published by producers. Every topic in |ak| is split into one or more
**partitions**. |ak| partitions data for storing, transporting, and replicating it. |kstreams| partitions data
for processing it. In both cases, this partitioning enables elasticity, scalability, high performance, and
fault tolerance.
* **Parallelism:** Partitions of |ak| topics, and especially their number for a given topic, are also the main factor
that determines the parallelism of |ak| with regard to reading and writing data. Because of the tight integration
with |ak|, the parallelism of an application that uses the |kstreams| API depends primarily on |ak|'s
parallelism.
.. _streams_concepts_stream:
Stream
------
A **stream** is the most important abstraction provided by |kstreams|: it represents an unbounded, continuously
updating data set, where unbounded means "of unknown or of unlimited size". Just like a topic in |ak|, a stream in the
|kstreams| API consists of one or more stream partitions.
A **stream partition** is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a
**data record** is defined as a key-value pair.
Stream Processing Application
-----------------------------
A **stream processing application** is any program that makes use of the |kstreams| library. In practice, this means
it is probably "your" application. It may define its computational logic through one or more
:ref:`processor topologies `.
Your stream processing application doesn't run inside a broker. Instead,
it runs in a separate JVM instance, or in a separate cluster entirely.
.. figure:: images/streams-apps-not-running-in-brokers.png
:width: 400pt
:height: 225pt
:align: center
An **application instance** is any running instance or "copy" of your application.
Application instances are the primary means to
:ref:`elastically scale and parallelize ` your application, and they also
contribute to making it :ref:`fault-tolerant `.
For example, you may need the power of ten machines to handle the incoming data load of your application; here, you
could opt to run ten instances of your application, one on each machine, and these instances would automatically
collaborate on the data processing -- even as
:ref:`new instances/machines are added or existing ones removed ` during
live operation.
.. figure:: images/scale-out-streams-app.png
:width: 400pt
:height: 225pt
:align: center
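To make this concrete, the following is a minimal sketch of such an application; the application ID, broker address,
and topic names are placeholders for illustration, not part of any particular deployment. Every copy of this program
that you start is one application instance, and all instances that share the same ``application.id`` collaborate on
the processing work.

.. code:: java

   import java.util.Properties;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;

   public class MyStreamsApp {
       public static void main(final String[] args) {
           Properties props = new Properties();
           // All instances that share this application.id collaborate on the workload.
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

           StreamsBuilder builder = new StreamsBuilder();
           // Hypothetical pass-through topology; replace with your own logic.
           builder.stream("input-topic").to("output-topic");

           KafkaStreams streams = new KafkaStreams(builder.build(), props);
           Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
           streams.start();
       }
   }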
.. _streams_concepts_processor-topology:
Processor Topology
------------------
A **processor topology** or simply **topology** defines the computational logic of the data processing that needs to be performed by a stream processing application. A topology is a graph of stream processors (nodes) that are connected by streams (edges). Developers can define topologies either via the :ref:`low-level Processor API ` or via the :ref:`Kafka Streams DSL `, which builds on top of the former.
.. figure:: images/streams-concepts-topology.jpg
:width: 200pt
:height: 280pt
:align: center
The :ref:`Architecture ` documentation describes topologies in more detail.
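For illustration, the following sketch (topic names are placeholders) uses the DSL to define a small topology with a
source node, two stream processor nodes, and a sink node, and then prints a description of the resulting graph of
nodes and edges:

.. code:: java

   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;

   StreamsBuilder builder = new StreamsBuilder();
   builder.<String, String>stream("pageview-events")   // source node
          .filter((user, page) -> page != null)        // stream processor node
          .mapValues(page -> page.toLowerCase())       // another stream processor node
          .to("normalized-pageviews");                 // sink node

   Topology topology = builder.build();
   // Print the processor nodes and the streams that connect them.
   System.out.println(topology.describe());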
.. _streams_concepts_processor:
Stream Processor
----------------
A **stream processor** is a node in the processor topology as shown in the diagram of section
:ref:`streams_concepts_processor-topology`. It represents a processing step in a topology, i.e. it is used to transform
data. Standard operations such as :ref:`map or filter `,
:ref:`joins `, and :ref:`aggregations ` are
examples of stream processors that are available in |kstreams| out of the box. A stream processor receives one input
record at a time from its upstream processors in the topology, applies its operation to it, and may subsequently
produce one or more output records to its downstream processors.
|kstreams| provides two APIs to define stream processors:
1. The :ref:`declarative, functional DSL ` is the recommended API for most users -- and
notably for starters -- because most data processing use cases can be expressed in just a few lines of DSL code.
Here, you typically use built-in operations such as ``map`` and ``filter``.
2. The :ref:`imperative, lower-level Processor API ` provides you with even more
flexibility than the DSL but at the expense of requiring more manual coding work. Here, you can define and connect
custom processors as well as directly interact with :ref:`state stores `.
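For a taste of the second approach, the following hedged sketch wires a custom processor between a source and a sink
by using the raw ``Topology``; ``MyProcessor`` stands in for a ``Processor`` implementation such as the example shown
later on this page, and the topic names are illustrative:

.. code:: java

   import org.apache.kafka.streams.Topology;

   Topology topology = new Topology();
   topology.addSource("Source", "input-topic")
           // MyProcessor is a placeholder Processor implementation.
           .addProcessor("Process", MyProcessor::new, "Source")
           .addSink("Sink", "output-topic", "Process");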
.. _streams_concepts-stateful_processing:
Stateful Stream Processing
--------------------------
Some stream processing applications don't require state -- they are **stateless** -- which means the processing of a
message is independent from the processing of other messages. Examples are when you only need to transform one message
at a time, or filter out messages based on some condition.
In practice, however, most applications require state -- they are **stateful** -- in order to work correctly, and this
state must be managed in a :ref:`fault-tolerant manner `. Your application is
stateful whenever, for example, it needs to :ref:`join `,
:ref:`aggregate `, or :ref:`window `
its input data. |kstreams| provides your application with powerful, elastic, highly scalable, and fault-tolerant
stateful processing capabilities.
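For example, the following hedged sketch (topic name assumed) counts pageviews per user over five-minute windows; the
running counts are state that |kstreams| maintains for you in a fault-tolerant state store:

.. code:: java

   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.TimeWindows;

   StreamsBuilder builder = new StreamsBuilder();
   // The windowed count is stateful: the running count per user and window
   // lives in a local, fault-tolerant state store.
   builder.<String, String>stream("pageview-events")
          .groupByKey()
          .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
          .count();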
.. _streams_concepts_duality:
Duality of Streams and Tables
-----------------------------
When implementing stream processing use cases in practice, you typically need both **streams** and **databases**.
An example use case that is very common in practice is an e-commerce application that enriches an incoming *stream* of
customer transactions with the latest customer information from a *database table*. In other words, streams are
everywhere, but databases are everywhere, too.
Any stream processing technology must therefore provide **first-class support for streams and tables**. |ak|'s
Streams API provides such functionality through its core abstractions for :ref:`streams ` and
:ref:`tables `, which we will talk about in a minute. Now, an interesting observation is that
there is actually a **close relationship between streams and tables**, the so-called
`stream-table duality `__.
And |ak| exploits this duality in many ways: for example, to make your applications
:ref:`elastic `, to support
:ref:`fault-tolerant stateful processing `, or to run
:ref:`streams_developer-guide_interactive-queries` against your application's latest processing
results. And, beyond its internal usage, the |kstreams| API also allows developers to exploit this duality in their
own applications.
Before we discuss concepts such as :ref:`aggregations ` in |kstreams| we must first
introduce **tables** in more detail, and talk about the aforementioned stream-table duality. Essentially, this duality
means that a stream can be viewed as a table, and a table can be viewed as a stream.
The following explanations are kept simple intentionally and skip the
discussion of compound keys, multisets, and so on.
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may
look as follows:
.. figure:: images/streams-table-duality-01.jpg
:width: 150pt
:height: 96pt
The **stream-table duality** describes the close relationship between streams and tables.
* **Stream as Table:**
A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the
table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the
changelog from beginning to end to reconstruct the table. Similarly, *aggregating* data records in a stream will
return a table. For example, we could compute the total number of pageviews by user from an input stream of
pageview events, and the result would be a table, with the table key being the user and the value being the
corresponding pageview count.
* **Table as Stream:**
A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's
data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real"
stream by iterating over each key-value entry in the table.
Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column
of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated
accordingly. Here, the state changes between different points in time -- and different revisions of the table -- can be
represented as a changelog stream (second column).
.. figure:: images/streams-table-duality-02.jpg
:width: 320pt
:height: 320pt
Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
.. figure:: images/streams-table-duality-03.jpg
:width: 460pt
:height: 399pt
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within |kstreams|,
to replicate its so-called :ref:`state stores ` across machines for
:ref:`fault tolerance `. The stream-table duality is such an important concept
for stream processing applications in practice that |kstreams| models it explicitly via the
:ref:`KStream ` and :ref:`KTable ` abstractions, which we describe in
the next sections.
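As a preview, the following minimal sketch (topic name assumed) shows both directions of the duality in code:
aggregating a stream yields a table, and a table's changelog can be consumed as a stream.

.. code:: java

   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;

   StreamsBuilder builder = new StreamsBuilder();

   // Stream as table: aggregate pageview events into a per-user count table.
   KTable<String, Long> pageviewsPerUser = builder.<String, String>stream("pageview-events")
       .groupByKey()
       .count();

   // Table as stream: every update to the table becomes a record in a changelog stream.
   KStream<String, Long> changelog = pageviewsPerUser.toStream();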
.. _streams_concepts_kstream:
KStream
-------
A **KStream** is an abstraction of a **record stream**, where each data record represents a self-contained
datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as
an "INSERT" -- think: adding more entries to an append-only ledger -- because no record replaces an existing row with
the same key. Examples are a credit card transaction, a page view event, or a server log entry.
Only the :ref:`Kafka Streams DSL ` has the notion
of a ``KStream``.
To illustrate, imagine the following two data records are being sent to the stream:
.. codewithvars:: bash
("alice", 1) --> ("alice", 3)
If your stream processing application were to sum the values per user, it would return ``4`` for ``alice``. Why? Because
the second data record would not be considered an update of the previous record. Compare this behavior of KStream to
:ref:`KTable ` below, which would return ``3`` for ``alice``.
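A hedged sketch of such a summing application (topic name and serdes assumed): because each record in a KStream is an
independent INSERT, both records for ``alice`` contribute to the sum.

.. code:: java

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.KTable;

   StreamsBuilder builder = new StreamsBuilder();
   // Record-stream semantics: ("alice", 1) and ("alice", 3) are both INSERTs,
   // so the per-user sum for alice is 4.
   KTable<String, Integer> sums = builder
       .stream("user-values", Consumed.with(Serdes.String(), Serdes.Integer()))
       .groupByKey()
       .reduce(Integer::sum);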
.. _streams_concepts_ktable:
KTable
------
A **KTable** is an abstraction of a **changelog stream**, where each data record represents an update. More precisely,
the value in a data record is interpreted as an "UPDATE" of the last value for the same record key, if any (if a
corresponding key doesn't exist yet, the update will be considered an INSERT). Using the table analogy, a data record
in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is
overwritten. Also, ``null`` values are interpreted in a special way: a record with a ``null`` value represents a
"DELETE" or tombstone for the record's key.
Only the :ref:`Kafka Streams DSL ` has the notion of a ``KTable``.
To illustrate, let's imagine the following two data records are being sent to the stream:
.. codewithvars:: bash
("alice", 1) --> ("alice", 3)
If your stream processing application were to sum the values per user, it would return ``3`` for ``alice``. Why?
Because the second data record would be considered an update of the previous record. Compare this behavior of KTable
with the illustration for :ref:`KStream ` above, which would return ``4`` for ``alice``.
You have already seen an example of a changelog stream in the section :ref:`streams_concepts_duality`. Another example
are change data capture (CDC) records in the changelog of a relational database, representing which row in a database
table was inserted, updated, or deleted.
KTable also provides the ability to look up *current* values of data records by keys. This table-lookup functionality
is available through :ref:`join operations ` (see also
:ref:`Joining ` in the Developer Guide) as well as through
:ref:`streams_concepts_interactive-queries`.
For more information, see
`Kafka Streams 101 - KTable `__.
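For comparison, a hedged sketch that reads the same kind of topic as a table (names assumed): because each record is
interpreted as an UPSERT, the table retains only the latest value per key, which is ``3`` for ``alice``.

.. code:: java

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.KTable;

   StreamsBuilder builder = new StreamsBuilder();
   // Changelog-stream semantics: ("alice", 3) overwrites ("alice", 1),
   // so the table holds 3 for alice.
   KTable<String, Integer> latest = builder
       .table("user-values", Consumed.with(Serdes.String(), Serdes.Integer()));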
Effect of |ak| log compaction
-----------------------------
Another way of thinking about KStream and KTable is as follows: If you were to
store a KTable into a |ak| topic, you'd probably want to enable |ak|'s
:kafka-common:`log compaction|design/log_compaction.html` feature to save
storage space.
But it wouldn't be safe to enable log compaction in the case of a KStream,
because as soon as log compaction begins purging older data records of the
same key, it would break the semantics of the data. To pick up the
illustration example again, you'd suddenly get a ``3`` for ``alice``, instead
of a ``4``, because log compaction would have removed the ``("alice", 1)`` data
record. This means that log compaction is safe for a KTable (changelog stream),
but a mistake for a KStream (record stream).
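For illustration, you could create a compacted topic to back a KTable like this (topic name, partition count, and
broker address are placeholders); compaction retains at least the latest record per key, which matches the KTable's
update semantics:

.. codewithvars:: bash

   kafka-topics --create \
     --bootstrap-server localhost:9092 \
     --topic user-values \
     --partitions 5 \
     --replication-factor 3 \
     --config cleanup.policy=compact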
.. _streams_concepts_globalktable:
GlobalKTable
------------
Like a :ref:`KTable `, a **GlobalKTable** is an abstraction of a **changelog stream**, where
each data record represents an update.
Only the :ref:`Kafka Streams DSL ` has the notion
of a ``GlobalKTable``.
A GlobalKTable differs from a KTable in the data with which it is populated, that is, in which data from the underlying
|ak| topic is read into the respective table.
Slightly simplified, imagine you have an input topic with 5 partitions. In your application, you want to read this
topic into a table. Also, you want to run your application across 5 application instances for
:ref:`maximum parallelism `.
* If you read the input topic into a **KTable**, then the "local" KTable instance of each application instance will be
populated with data **from only 1 partition** of the topic's 5 partitions.
* If you read the input topic into a **GlobalKTable**, then the local GlobalKTable instance of each application
instance will be populated with data **from all partitions of the topic**.
GlobalKTable provides the ability to look up *current* values of data records by keys.
This table-lookup functionality is available through :ref:`join operations `
(as described in :ref:`Joining ` in the Developer Guide) and
:ref:`streams_developer-guide_interactive-queries`.
Benefits of global tables:
* You can use global tables to "broadcast" information to all running
instances of your application.
* Global tables enable more convenient and efficient
:ref:`joins `.
* Global tables enable star joins.
* Global tables are more efficient when chaining multiple joins.
* When joining against a global table, the input data doesn't need to be
:ref:`co-partitioned `.
* Global tables support "foreign-key" lookups, which means that you can look up
data in the table not just by the stream record's key, but also by data in the
stream record's value. In this case, the join always uses the table's primary key,
and the "foreign key" is derived from the stream record. Unlike a stream-table join,
which always joins on the stream record's key, a stream-globalKTable join enables
you to extract the join key directly from the stream record's value, as the sketch
after the drawbacks below illustrates.
Drawbacks of global tables include:
* Increased local storage consumption compared to the (partitioned) KTable,
because the entire topic is tracked.
* Increased network and |ak| broker load compared to the (partitioned) KTable,
because the entire topic is read.
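The following hedged sketch shows a stream-globalKTable join; the ``Order``, ``Customer``, and ``EnrichedOrder``
types as well as the topic names are hypothetical placeholders. Note how the join key is extracted from the order's
value rather than from its key, and how no co-partitioning of the two topics is required:

.. code:: java

   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.GlobalKTable;
   import org.apache.kafka.streams.kstream.KStream;

   StreamsBuilder builder = new StreamsBuilder();

   // Order, Customer, and EnrichedOrder are hypothetical domain types.
   // Every application instance is populated with all partitions of the topic.
   GlobalKTable<String, Customer> customers = builder.globalTable("customers");
   KStream<String, Order> orders = builder.stream("orders");

   KStream<String, EnrichedOrder> enriched = orders.join(
       customers,
       (orderId, order) -> order.getCustomerId(),            // "foreign key" from the value
       (order, customer) -> new EnrichedOrder(order, customer));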
.. _streams_concepts_time:
Time
----
A critical aspect in stream processing is the notion of **time**, and how it is modeled and integrated. For
example, some operations such as :ref:`streams_concepts_windowing` are defined based on time boundaries.
|kstreams| supports the following notions of time:
event-time
^^^^^^^^^^
The point in time when an event or data record occurred (that is, was originally
created by the source). Achieving event-time semantics typically requires
embedding timestamps in the data records at the time a data record is being
produced.
* **Example:** If the event is a geo-location change reported by a GPS sensor in a car, then the associated event-time would be the time when the GPS sensor captured the location change.
processing-time
^^^^^^^^^^^^^^^
The point in time when the event or data record happens to be processed by the
stream processing application (that is, when the record is being consumed). The
processing-time may be milliseconds, hours, days, etc. later than the original
event-time.
* **Example:** Imagine an analytics application that reads and processes the geo-location data reported from car sensors to present it to a fleet management dashboard. Here, processing-time in the analytics application might be milliseconds or seconds (such as for real-time pipelines based on |ak| and |kstreams|) or hours (such as for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.
ingestion-time
^^^^^^^^^^^^^^
The point in time when an event or data record is stored in a topic partition by
a |ak| broker. Ingestion-time is similar to *event-time*, as a timestamp gets
embedded in the data record itself. The difference is that the timestamp is
generated when the record is appended to the target topic by the |ak| broker,
not when the record is created at the source. Ingestion-time may approximate
event-time reasonably well if we assume that the time difference between
creation of the record and its ingestion into |ak| is sufficiently small, where
"sufficiently" depends on the specific use case. Thus, ingestion-time may be a
reasonable alternative for use cases where event-time semantics are not
possible, perhaps because the data producers don't embed timestamps (such as with older
versions of |ak|'s Java producer client) or the producer cannot assign
timestamps directly (for example, does not have access to a local clock).
stream-time
^^^^^^^^^^^
The maximum timestamp seen over all processed records so far. |kstreams| tracks
stream-time on a per-task basis.
Timestamps
^^^^^^^^^^
|kstreams| assigns a **timestamp** to every data record via so-called
:ref:`timestamp extractors `. These per-record timestamps describe the
progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by
time-dependent operations such as joins. We call this the **event-time** of the application, to distinguish it from the *wall-clock time* at which the application is actually executing.
*Event-time* is also used to
:ref:`synchronize multiple input streams ` within the same application.
Concrete implementations of timestamp extractors may retrieve or compute timestamps based on the actual contents of data
records such as an embedded timestamp field to provide event-time or ingestion-time semantics, or use any other approach
such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to
stream processing applications. Developers can thus enforce different notions/semantics of time depending on their
business needs.
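For example, a hedged configuration sketch: you can switch an application to processing-time semantics by registering
the built-in ``WallclockTimestampExtractor`` as the default timestamp extractor.

.. code:: java

   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

   Properties props = new Properties();
   // Return the current wall-clock time for every record, which yields
   // processing-time semantics.
   props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
             WallclockTimestampExtractor.class);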
Finally, whenever a |kstreams| application writes records to |ak|, it also assigns timestamps to these new
records. The way the timestamps are assigned depends on the context:
* When new output records are generated via directly processing some input record, output record timestamps are
inherited from input record timestamps directly.
* When new output records are generated via periodic functions, the output record timestamp is defined as the current
internal time of the :ref:`stream task `.
* For :ref:`aggregations `, the timestamp of the resulting update record will
be that of the latest input record that triggered the update.
For aggregations and joins, timestamps are computed using the following
rules.
* For joins (stream-stream, table-table) that have left and right input
records, the timestamp of the output record is assigned ``max(left.ts, right.ts)``.
* For stream-table joins, the output record is assigned the timestamp from the
stream record.
* For aggregations, |kstreams| also computes the ``max`` timestamp across all
records, per key, either globally (for non-windowed) or per-window.
* Stateless operations are assigned the timestamp of the input record.
For ``flatMap`` and siblings that emit multiple records, all output records
inherit the timestamp from the corresponding input record.
Assign timestamps to output records with the Processor API
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
You can change the default behavior in the Processor API by assigning
timestamps to output records explicitly when calling ``forward()``.
In the current Processor API, ``forward()`` takes a ``Record`` object that
bundles the key, the value, and the timestamp of the output record, so you
set the output timestamp explicitly when constructing the ``Record``.
The following example shows the explicit assignment of timestamps
to output records using the ``forward()`` method.
.. code:: java

   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;

   public class MyProcessor implements Processor<String, String, String, String> {

       private ProcessorContext<String, String> context;

       @Override
       public void init(final ProcessorContext<String, String> context) {
           this.context = context;
       }

       @Override
       public void process(final Record<String, String> record) {
           // Extract the timestamp from the input record.
           long inputTimestamp = record.timestamp();

           // Process the input record.
           // You implement the processRecord method for your use case.
           String outputValue = processRecord(record.value());

           // Assign the timestamp to the output record explicitly.
           // You implement the computeOutputTimestamp method for your use case.
           long outputTimestamp = computeOutputTimestamp(inputTimestamp);
           context.forward(new Record<>(record.key(), outputValue, outputTimestamp));
       }

       @Override
       public void close() {}
   }
In this example, the timestamp is extracted from the input record by using the
``record.timestamp()`` method. The ``computeOutputTimestamp()`` custom method,
which you implement, computes the timestamp for the output record. Finally, a
new ``Record`` is created with the output key, the output value, and the computed
timestamp, and is emitted downstream by calling ``context.forward()``.
Assign timestamps to output records with the |kstreams| API
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
You can control the timestamps that |kstreams| assigns to records by using
the ``TimestampExtractor`` interface. Implement this interface to extract a
timestamp from each input record; because output record timestamps are inherited
from input records, this also determines the timestamps of output records and
enables event-time or processing-time semantics.
The following example shows the explicit assignment of timestamps
to output records using the ``TimestampExtractor`` interface.
.. code:: java

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.streams.processor.TimestampExtractor;

   public class CustomTimestampExtractor implements TimestampExtractor {

       @Override
       public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
           // A minimal sketch: fall back to the timestamp carried in the
           // record's metadata. A real implementation might instead read an
           // event-time field that is embedded in the record's value.
           return record.timestamp();
       }
   }
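To put the custom extractor into effect, register it via the ``default.timestamp.extractor`` configuration property,
as shown in the configuration sketch in the Timestamps section above.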