Important Warning

Update July 2016: This Tech Preview documentation from March 2016 is outdated and deprecated. Please use the latest Confluent Platform documentation instead.

Developer Guide

Code examples

Before we begin the deep-dive into Kafka Streams in the subsequent sections, you might want to take a look at a few examples first.

The Apache Kafka project includes a few Kafka Streams code examples, which demonstrate the use of the Kafka Streams DSL and the low-level Processor API, as well as a juxtaposition of typed vs. untyped examples.

Additionally, the Confluent examples repository contains several Kafka Streams examples, which demonstrate the use of Java 8 lambda expressions (which simplify the code significantly), how to read/write Avro data, and how to implement end-to-end integration tests using embedded Kafka clusters.

Configuring a Kafka Streams application

Overview

The configuration of Kafka Streams is done by specifying parameters in an instance of StreamsConfig.

Typically, you create a java.util.Properties instance, set the necessary parameters, and construct a StreamsConfig instance from the Properties instance. How this StreamsConfig instance is used further (and passed along to other Streams classes) will be explained in the subsequent sections.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.JOB_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
// Any further settings
settings.put(... , ...);

// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);

Required configuration parameters

The table below is a list of required configuration parameters.

Parameter Name | Description | Default Value
bootstrap.servers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | <none>
job.id | An identifier for the stream processing application. Must be unique within the Kafka cluster. | <none>
key.serializer | Serializer class for keys that implements the Serializer interface | <none>
key.deserializer | Deserializer class for keys that implements the Deserializer interface | <none>
value.serializer | Serializer class for values that implements the Serializer interface | <none>
value.deserializer | Deserializer class for values that implements the Deserializer interface | <none>
zookeeper.connect | Zookeeper connect string for Kafka topic management | the empty string

Kafka Bootstrap Servers (bootstrap.servers): This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092".

Note

One Kafka cluster only: Currently Kafka Streams applications can only talk to a single Kafka cluster specified by this config value. In the future Kafka Streams will be able to support connecting to different Kafka clusters for reading input streams and/or writing output streams.

Application Id (job.id): Each stream processing application must have a unique id. The same id must be given to all instances of the application. It is recommended to use only alphanumeric characters, . (dot), - (hyphen), and _ (underscore). Examples: "hello_world", "hello_world-v1.0.0".

This id is used in the following places to isolate resources used by the application from others:

  • As the default Kafka consumer and producer client.id prefix
  • As the Kafka consumer group.id for coordination
  • As the name of the sub-directory in the state directory (cf. state.dir)
  • As the prefix of internal Kafka topic names

Attention

When an application is updated, it is recommended to change job.id unless it is safe to let the updated application re-use the existing data in internal topics and state stores. One pattern could be to embed version information within job.id, e.g., my-app-v1.0.0 vs. my-app-v1.0.2.
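
For example, a versioned job.id could be set as follows (a minimal sketch; the application name and version string are placeholders):

Properties settings = new Properties();
// Embed the application version in the job.id so that an upgraded application
// does not re-use the internal topics and state stores of the previous version.
settings.put(StreamsConfig.JOB_ID_CONFIG, "my-app-v1.0.2");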

Ser-/Deserialization (key.serializer, key.deserializer, value.serializer, value.deserializer): Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, i.e.:

  • Whenever data is read from or written to a Kafka topic (e.g., via the KStreamBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

We will discuss this in more detail later in Data types and serialization.

ZooKeeper connection (zookeeper.connect): Currently Kafka Streams needs to access ZooKeeper directly for creating its internal topics. Internal topics are created when a state store is used, or when a stream is repartitioned for aggregation. This setting must point to the same ZooKeeper ensemble that your Kafka cluster uses (cf. bootstrap.servers). Example: "zookeeper1:2181".

Note

ZooKeeper dependency of Kafka Streams and zookeeper.connect: This configuration option is temporary and will be removed once KIP-4 is incorporated, which is planned for after the 0.10.0 release.

Optional configuration parameters

The table below is a list of optional configuration parameters. However, users should consider setting the following parameters consciously.

  • num.standby.replicas
  • num.stream.threads
  • replication.factor
  • timestamp.extractor

Parameter Name | Description | Default Value
auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server | latest
buffered.records.per.partition | The maximum number of records to buffer per partition | 1000
client.id | An id string to pass to the server when making requests. (This setting is passed to the consumer/producer clients used internally by Kafka Streams.) | the empty string
commit.interval.ms | The frequency with which to save the position (offsets in source topics) of tasks | 30000 (millisecs)
metric.reporters | A list of classes to use as metrics reporters | the empty list
num.standby.replicas | The number of standby replicas for each task | 0
num.stream.threads | The number of threads to execute stream processing | 1
partition.grouper | Partition grouper class that implements the PartitionGrouper interface | see Partition Grouper
poll.ms | The amount of time in milliseconds to block waiting for input | 100 (millisecs)
replication.factor | The replication factor for changelog topics and repartition topics created by the application | 1
state.cleanup.delay.ms | The amount of time in milliseconds to wait before deleting state when a partition has migrated | 60000 (millisecs)
state.dir | Directory location for state stores | /tmp/kafka-streams
timestamp.extractor | Timestamp extractor class that implements the TimestampExtractor interface | see Timestamp Extractor

The Number of Standby Replicas (num.standby.replicas): This specifies the number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover: a task that was previously running on a failed instance is preferably restarted on an instance that already hosts its standby replicas, so that the work of restoring the local state store from its changelog is minimized. Details about how Kafka Streams makes use of standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

The Number of Stream Threads (num.stream.threads): This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. Details about the Kafka Streams threading model can be found in the Threading Model section.

The Replication Factor of Internal Topics (replication.factor): This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance: without replication, even a single broker failure may prevent progress of the stream processing application. It is recommended to use a replication factor similar to that of the source topics.

State Directory (state.dir): Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine, whose name is the application id, directly under the state directory. The state stores associated with the application are created under this subdirectory.

Note

The default value of state.dir in the Kafka Streams Tech Preview is /tmp/kafka-streams. You may want to change the default setting if, for example, your operating system purges the contents of /tmp upon machine restart.
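
For illustration, the sketch below sets several of these optional parameters via StreamsConfig. The constant names used here (NUM_STREAM_THREADS_CONFIG, REPLICATION_FACTOR_CONFIG, NUM_STANDBY_REPLICAS_CONFIG, STATE_DIR_CONFIG) are assumed to follow the naming pattern of the constants shown earlier; please verify them against the StreamsConfig class of your release.

Properties settings = new Properties();
// ... required parameters as shown above ...

// Run two stream threads in this application instance.
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
// Replicate the internal changelog and repartition topics for fault tolerance.
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Keep one standby replica per task to speed up failover.
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Keep local state outside of /tmp.
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

StreamsConfig config = new StreamsConfig(settings);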

Timestamp Extractor (timestamp.extractor): A timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock, which effectively means Streams will operate on the basis of the so-called processing time of events.

If you want your data to be processed based on event time, then you must provide your own implementation of the TimestampExtractor interface to extract the time(stamp) information embedded in your data records.

Here is an example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    }
    else {
      // Kafka allows `null` as message value.  How to handle such message values
      // depends on your use case.  In this example, we decide to fallback to
      // wall-clock time (= processing time).
      return System.currentTimeMillis();
    }
  }

}
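
To make Kafka Streams use such an extractor, register it via the timestamp.extractor setting. The sketch below assumes the corresponding constant is named StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG; please verify the exact name against your StreamsConfig class.

Properties settings = new Properties();
// ... other parameters ...

// Switch from the default wall-clock extractor to event-time semantics.
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

StreamsConfig config = new StreamsConfig(settings);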

Partition Grouper (partition.grouper): A partition grouper creates a list of stream tasks from the partitions of the source topics, where each task is assigned a group of source topic partitions. The default implementation provided by Kafka Streams is DefaultPartitionGrouper, which assigns each task at most one partition from each of the source topics; therefore, the number of generated tasks equals the largest number of partitions among the input topics. Usually an application does not need to customize the partition grouper.

Non-Streams configuration parameters

Apart from Kafka Streams’ own configuration parameters (see the previous sections), you can also specify parameters for the Kafka consumers and producers that are used internally, depending on the needs of your application. Similar to the Streams settings, you define any such consumer and/or producer settings via StreamsConfig:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
StreamsConfig config = new StreamsConfig(streamsSettings);

Note

A future version of Kafka Streams will allow developers to set their own app-specific configuration settings through StreamsConfig as well, which can then be accessed through ProcessorContext.

Writing a Kafka Streams application

Overview

Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

Currently Kafka Streams provides two sets of APIs to define the processor topology:

  1. A high-level Kafka Streams DSL that provides common data transformation operations such as map and filter in a functional programming style. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.
  2. A low-level Processor API that lets you add and connect processors as well as interact directly with state stores.

We describe both of these APIs in the subsequent sections.

Libraries and maven artifacts

This section lists the Kafka Streams related libraries that are available for writing your Kafka Streams applications.

The corresponding maven artifacts of these libraries are available in Confluent’s maven repository:

<!-- Example pom.xml snippet when using maven to build your Java applications. -->
<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Note

As of March 2016 the Apache Kafka artifacts below (group id org.apache.kafka) are only available in Confluent’s maven repository because the Apache Kafka project does not yet provide an official release that includes the upcoming Kafka Streams library.

Depending on your use case you will need to define dependencies on the following libraries for your Kafka Streams applications.

Group Id | Artifact Id | Version | Description / why needed
org.apache.kafka | kafka-streams | 0.9.1.0-cp1 | Base library for Kafka Streams. Required.
org.apache.kafka | kafka-clients | 0.9.1.0-cp1 | Kafka client library. Contains built-in serializers/deserializers. Required.
org.apache.avro | avro | 1.7.7 | Apache Avro library. Optional (needed only when using Avro).
io.confluent | kafka-avro-serializer | 2.1.0-alpha1 | Confluent’s Avro serializer/deserializer. Optional (needed only when using Avro).

Tip

See the section Data types and serialization for more information about serializers/deserializers.

Example pom.xml snippet when using maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.9.1.0-cp1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.1.0-cp1</version>
</dependency>

<!-- Dependencies below are required/recommended only when using Apache Avro. -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>2.1.0-alpha1</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.7.7</version>
</dependency>

See the Kafka Streams examples in the Confluent examples repository for a full maven/pom setup.

Processor API

The Developer Guide of this Tech Preview release does not currently cover the low-level Processor API of Kafka Streams. Please stay tuned! In the meantime, you may want to take a look at the code examples listed at the beginning of this guide, which demonstrate the use of the low-level Processor API.

Kafka Streams DSL

Overview

As mentioned in the Concepts section, a stream is an unbounded, continuously updating data set. However, different streams may have different semantics in their data records. In some streams, each record represents a new immutable datum in their unbounded data set; we call them record streams. In some others, each record represents a revision of their unbounded data set in chronological order; we call them changelog streams.

Both of these types of streams can be stored as Kafka streams. However, their computational semantics can be quite different. Take as an example the simple aggregation operation that counts the number of records for a given key. For record streams, each record is a simple keyed message from a Kafka topic (e.g., a page view stream keyed by user ids):

1 => {"time":1440557383335, "user_id":1, "url":"/home?user=1"}
5 => {"time":1440557383345, "user_id":5, "url":"/home?user=5"}
2 => {"time":1440557383456, "user_id":2, "url":"/profile?user=2"}
1 => {"time":1440557385365, "user_id":1, "url":"/profile?user=1"}

Its counting operation is trivial to implement: you can keep a local state store holding the latest count for each key, and upon receiving a new record, increment the count of the corresponding key by one.

For changelog streams, on the other hand, each record is an update of the unbounded data set (e.g., a user profile table changelog stream keyed by the primary key of the changed row, the user id), and in practice people usually store such streams in Kafka topics with log compaction turned on.

1 => {"last_modified_time":1440557383335, "user_id":1, "email":"user1@aol.com"}
5 => {"last_modified_time":1440557383345, "user_id":5, "email":"user5@gmail.com"}
2 => {"last_modified_time":1440557383456, "user_id":2, "email":"user2@yahoo.com"}
1 => {"last_modified_time":1440557385365, "user_id":1, "email":"user1-new-email-addr@comcast.com"}
2 => {"last_modified_time":1440557385395, "user_id":2, "email":null}  <-- user has been deleted

As a result, its counting operation is no longer monotonically incrementing: you must also decrement the count when a delete record is received for a given key. In addition, even for counting aggregations on a record stream, the resulting aggregate is no longer a record stream but a relation / table, which can then be represented as a changelog stream of updates on the table.

Note

The counting operation of a user profile changelog stream is tedious in the sense that it will only generate a count of either 0 or 1. Usually developers would prefer counting a non-primary-key field in such cases. We use the example above just for the sake of illustration.

One of the key design principles of the Kafka Streams DSL is to distinguish between record streams and changelog streams and to provide operators with proper semantics for these two different types of streams – without this distinction, users would have to be aware of the semantics of their stream data and handle the different cases appropriately themselves. More concretely, in the DSL we use the KStream class to represent record streams, with a list of methods for manipulating the records in the stream, and we use a separate KTable class to represent changelog streams due to their duality with the original unbounded data set. Therefore, in most cases the Kafka Streams DSL is the recommended API for implementing a Kafka Streams application. Compared to the lower-level Processor API, its benefits are:

  • Easier for implementing stateful transformations such as joins and aggregations.
  • Awareness of the semantic differences between record streams and changelog streams for the same transformation operation.
  • More concise and expressive code, particularly when using Java 8+ with lambda expressions.

In the subsequent sections we provide step-by-step guidance on how to write a stream processing application using the Kafka Streams DSL.

Creating source streams from Kafka

Both KStream and KTable objects can be created as source streams from one or more Kafka topics via KStreamBuilder, a subclass of the TopologyBuilder used in the lower-level Processor API (a KTable can only be created from a single topic).

Interface | How to define
KStream | KStreamBuilder#stream(...)
KTable | KStreamBuilder#table(...)

When creating the object, you can override the serializer/deserializer classes used for reading the data out of the Kafka topics (see Data types and serialization for more details); otherwise, the default serializers/deserializers specified through StreamsConfig will be used. As a result, both KStream and KTable are strongly typed: the key and value each have their own type.

import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// assuming string and generic avro serializers are configured to be the default serdes

// create a stream of page view events from the PageView topic
KStream<String, GenericRecord> pageviews = builder.stream("PageViews");

// create a changelog stream for user profiles from the UserProfile topic
KTable<String, GenericRecord> userprofiles = builder.table("UserProfile");

Transform a stream

A list of transformation operations is provided for KStream and KTable, respectively. Each of these operations is translated into one or more connected processors in the underlying processor topology. Since KStream and KTable are strongly typed, all of these transformation operations are defined as generic functions where users can specify the input and output data types.

Some KStream transformations generate one or more KStream objects (e.g., filter and map on a KStream generate another KStream, while branch can generate multiple KStreams), while others generate a KTable object (e.g., aggregations), interpreted as the changelog stream of the resulting relation/table. This allows Kafka Streams to continuously update the computed value upon the arrival of late records, even after the value has already been produced to downstream transformation operators. As for KTable, all of its transformation operations generate another KTable (though the Kafka Streams DSL provides a special function to convert a KTable representation into a KStream, which we discuss later). Nevertheless, all of these transformation methods can be chained together to compose a complex processor topology.

We describe these transformation operations in the following subsections, grouping them into two categories: stateless and stateful transformations.

Stateless transformations

Stateless transformations include filter, filterOut, map, mapValues, flatMap, flatMapValues, and branch. Most of them can be applied to both KStream and KTable (branch can only be applied to KStream), and users usually pass a customized function as a parameter, such as a Predicate for filter or a KeyValueMapper for map. Stateless transformations, by definition, do not depend on any state for processing; hence, implementation-wise, they do not require a state store associated with the stream processor.

Example of mapValues in Java 7:

KStream<PageId, String> categoryViewByUser =
    pageViewByUser.mapValues(
        new ValueMapper<PageView, String>() {
            @Override
            public String apply(PageView pageView) {
                return pageView.pageCategory;
            }
        }
    );

Example of mapValues in Java 8+, using lambda expressions.

KStream<PageId, String> categoryViewByUser =
    pageViewByUser.mapValues(pageView -> pageView.pageCategory);

The function is applied to each record, and its result triggers the creation of a new record.
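
As another stateless example, here is a minimal sketch of filter, which keeps only the records that satisfy a Predicate (Java 8+ lambda syntax; the stream and field names are illustrative):

// Keep only page views of the "news" category.
KStream<PageId, PageView> newsViews =
    pageViewByUser.filter((pageId, pageView) -> "news".equals(pageView.pageCategory));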

Stateful transformations

Available stateful transformations include:

  • joins: join, outerJoin, leftJoin
  • aggregations: reduceByKey, aggregateByKey, aggregate, reduce
  • general transformation: transform, and transformValues

Stateful transformations are transformations where the processing logic requires accessing an associated state in order to process records and produce outputs. For example, in join and aggregation operations, a windowing state is usually used to store all of the records received so far within the defined window boundaries. The operators can then access these accumulated records in the store and compute results based on them (see Windowing a Stream for details).

Counting example in Java 7:

KStream<String, MyPageViewEvent> pageViews = ...;

KTable<String, Long> viewCountsByUser =
    pageViews.aggregateByKey(
        new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        },
        new Aggregator<String, MyPageViewEvent, Long>() {
            @Override
            public Long apply(String aggKey, MyPageViewEvent value, Long aggregate) {
                return aggregate + 1L;
            }
        },
        null, longSerializer,      // override default serializer for values (null = use default)
        null, longDeserializer,    // override default deserializer for values (null = use default)
        "ViewCountsByUser"
    );

Counting example in Java 8+, using lambda expressions.

KStream<String, MyPageViewEvent> pageViews = ...;

KTable<String, Long> viewCountsByUser =
    pageViews.aggregateByKey(
        () -> 0L,
        (aggKey, value, aggregate) -> aggregate + 1L,
        null, longSerializer,    // override default serializer for values (null = use default)
        null, longDeserializer,  // override default deserializer for values (null = use default)
        "ViewCountsByUser"
    );

In the above example, we must override the serializer/deserializer for the record value (the input is a KStream<String, MyPageViewEvent> whereas the output is a KTable<String, Long>) because, behind the scenes, the aggregate (here: of type Long) must be materialized into the state store. Additionally, we must provide a name for the returned KTable (here: ViewCountsByUser).

Tip

A KTable object can be converted back into a KStream via the KTable#toStream() function. Implementation-wise, this operator simply changes the underlying stream type upon which future transformations are based.
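
For example, the (non-windowed) counting result from above could be converted back into a record stream and then written to a Kafka topic (a minimal sketch; the topic name is illustrative and the default serdes are assumed to match the String keys and Long values):

// Convert the changelog stream of counts into a record stream and write it to Kafka.
viewCountsByUser.toStream().to("ViewCountsByUserTopic");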

Windowing a Stream

Windowing is a common prerequisite for stateful transformations that group records in a stream by their timestamps. A local state store is usually needed for a windowing operation to store recently received records based on the window interval, while old records in the store are purged after the specified window retention period.

Kafka Streams currently defines three types of windows:

  1. Hopping windows
  2. Tumbling windows
  3. Sliding windows

Hopping windows are defined as a set of (possibly) overlapping fixed-size time intervals, where each interval is treated as a single window. Since time intervals (i.e., windows) can overlap, a record may belong to more than one such interval. Two properties define a hopping window: the hop, i.e., the time period between the creation of consecutive windows, and the window size.

Tumbling windows are defined as a set of disjoint fixed-size time intervals, where each interval is treated as a single window. When one time interval ends, the next one starts; hence a record belongs to exactly one window. Tumbling windows can be viewed as a special case of hopping windows: a tumbling window is exactly a hopping window whose hop period equals its window size.

In Kafka Streams, hopping windows and tumbling windows can be used for windowed aggregations. They are specified via the HoppingWindows and TumblingWindows classes, respectively.

Windowed counting example in Java 7:

KStream<String, MyPageViewEvent> pageViews = ...;

KTable<Windowed<String>, Long> viewCountsByUser =
    pageViews.aggregateByKey(
        new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        },
        new Aggregator<String, MyPageViewEvent, Long>() {
            @Override
            public Long apply(String aggKey, MyPageViewEvent value, Long aggregate) {
                return aggregate + 1L;
            }
        },
        HoppingWindows.of("ViewCountsByUser").with(5000L).every(1000L), // intervals in milliseconds
        null, longSerializer,   // override default serializer for values (null = use default)
        null, longDeserializer  // override default deserializer for values (null = use default)
    );

Windowed counting example in Java 8+, using lambda expressions.

KStream<String, MyPageViewEvent> pageViews = ...;

KTable<Windowed<String>, Long> viewCountsByUser =
    pageViews.aggregateByKey(
        () -> 0L,
        (aggKey, value, aggregate) -> aggregate + 1L,
        HoppingWindows.of("ViewCountsByUser").with(5000L).every(1000L), // intervals in milliseconds
        null, longSerializer,   // override default serializer for values (null = use default)
        null, longDeserializer  // override default deserializer for values (null = use default)
    );

Unlike the non-windowed aggregates that we have seen previously, windowed aggregates return a windowed KTable whose key type is Windowed<K>. This is to differentiate aggregate values of the same key from different windows. The corresponding window instance and the embedded key can be retrieved via Windowed#window() and Windowed#value(), respectively.
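
Since a tumbling window is simply a hopping window whose hop period equals its window size, a tumbling variant of the counting example can be expressed with the API already shown (a sketch; the TumblingWindows class provides a dedicated way to express the same thing, but its factory methods are not covered in this guide):

KTable<Windowed<String>, Long> viewCountsByUser =
    pageViews.aggregateByKey(
        () -> 0L,
        (aggKey, value, aggregate) -> aggregate + 1L,
        // A 5-second tumbling window, expressed as a hopping window whose
        // hop period (every) equals its window size (with).
        HoppingWindows.of("ViewCountsByUser").with(5000L).every(5000L),
        null, longSerializer,   // override default serializer for values (null = use default)
        null, longDeserializer  // override default deserializer for values (null = use default)
    );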

Sliding windows are actually quite different from hopping and tumbling windows: a sliding window is defined as a fixed-size window that slides continuously over the time axis. Two records are said to be in the same window if the difference between their timestamps is within the window size. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the JoinWindows class.

Joining Streams

Many stream processing applications can be coded as stream join operations. For example, applications that need to access multiple (and possibly updating) data tables while processing each record can be programmed against the tables' changelog streams directly, without materializing the tables and querying them: you can keep the latest state of each table in a local key-value store, and whenever a new update for a table arrives, join it with the latest data for the same key from the other table and output the joined result.

In Kafka Streams, users can join a KStream object with either another KStream or a KTable, and a KTable object with another KTable object. We discuss each of these cases below:

  • KStream-KStream Joins are always windowed joins, since otherwise the size of the join result would grow infinitely. In this operator, a newly received record from one of the joining streams is joined with the other stream’s records within the specified window interval, producing one result for each matching pair based on the user-provided ValueJoiner implementation. A new KStream object representing the join result stream is returned from this operator.

  • KTable-KTable Joins are join operations designed to be consistent with their counterparts in relational databases. In this operator, both changelog streams are materialized into local state stores that represent the latest snapshots of their dual data tables. When a new record is received from one of the joining streams, it is joined with the other stream’s materialized state store, producing one result for each matching pair based on the user-provided ValueJoiner implementation. A new KTable object representing the join result stream, which is also a changelog stream of the represented table, is returned from this operator.

  • KStream-KTable Joins allow users to perform table lookups against a changelog stream upon receiving a new record from another record stream. Hence only records received from the record stream trigger the join and produce results via ValueJoiner, not vice versa (i.e., records received from the changelog stream are only used to update the materialized state store). A new KStream object representing the join result stream is returned from this operator.

Three types of joins (inner join, outer join, and left join) are provided for KStream-KStream joins and KTable-KTable joins. Their semantics are similar to the corresponding operators in relational databases:

  • Inner join produces new joined records when the join operator finds some records with the same key in the other stream / materialized store.
  • Outer join works like inner join if some records are found in the windowed stream / materialized store. The difference is that outer join still produces a record even when no records are found. It uses null as the value of the missing record.
  • Left join is like outer join except that, for KStream-KStream joins, it is always driven by records arriving from the primary (left) stream, while for KTable-KTable joins it is driven by both streams in order to make the result consistent with the left join of relational databases, yet only permits missing records in the secondary (right) stream.

As for KStream-KTable joins, only left join is provided, by definition.

Note

Since stream joins are performed over the keys of records, the joining streams must be co-partitioned by key, i.e., their corresponding Kafka topics must have the same number of partitions and be partitioned on the same key, so that records with the same keys are delivered to the same processing thread. This is validated by the Kafka Streams library at runtime (we discuss the threading model and data parallelism in more detail in the Architecture section).

Join example in Java 7:

KStream<String, PageViewByRegion> regionCount = pageviews.leftJoin(userprofiles,
    new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
        @Override
        public PageViewByRegion apply(PageView view, UserProfile profile) {
            PageViewByRegion viewByRegion = new PageViewByRegion();
            viewByRegion.user = view.user;
            viewByRegion.page = view.page;

            if (profile != null) {
                viewByRegion.region = profile.region;
            } else {
                viewByRegion.region = "UNKNOWN";
            }
            return viewByRegion;
        }
    });

Join example in Java 8+, using lambda expressions.

KStream<String, PageViewByRegion> regionCount = pageviews.leftJoin(userprofiles,
    (view, profile) -> {
        PageViewByRegion viewByRegion = new PageViewByRegion();
        viewByRegion.user = view.user;
        viewByRegion.page = view.page;

        if (profile != null) {
            viewByRegion.region = profile.region;
        } else {
            viewByRegion.region = "UNKNOWN";
        }
        return viewByRegion;
    });

Applying a custom processor

Tip

See also the documentation of the low-level Processor API.

Beyond the provided transformation operators, users can also apply customized processing logic to their stream data via the KStream.process function, which takes an implementation of the org.apache.kafka.streams.processor.ProcessorSupplier interface as its parameter. This is essentially equivalent to the addProcessor function in the lower-level Processor API.

The following example shows how to leverage, via the process method, a custom processor that sends an email notification whenever a page view count reaches a predefined threshold.

In Java 7:

// Send an email notification when a view count reaches one thousand.
pageViews.countByKey()
         .filter(
            new Predicate<PageId, Long>() {
              public boolean test(PageId pageId, Long viewCount) {
                return viewCount == 1000;
              }
            })
         .process(
           new ProcessorSupplier<PageId, Long>() {
             public Processor<PageId, Long> get() {
               // PopularPageAlert is your custom processor that implements
               // the `Processor` interface, see below.
               return new PopularPageAlert("alerts@yourcompany.com");
             }
           });

In Java 8+, using lambda expressions:

// Send an email notification when a view count reaches one thousand.
pageViews.countByKey()
         .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
         // PopularPageAlert is your custom processor that implements the
         // `Processor` interface, see below.
         .process(() -> new PopularPageAlert("alerts@yourcompany.com"));

In the above examples, PopularPageAlert is a custom processor that implements the Processor interface. There are four methods to implement: init, process, punctuate, and close.

// Sends an alert message about a popular page to a configurable email address
public class PopularPageAlert implements Processor<PageId, Long> {

    private final String emailAddress;
    private ProcessorContext context;

    public PopularPageAlert(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;

        // Here you would perform additional initializations
        // such as setting up an email client.
    }

    @Override
    public void process(PageId pageId, Long count) {
        // Here you would format and send the alert email.
        //
        // In this specific example, you would be able to include information
        // about the page's ID and its view count (because the class implements
        // `Processor<PageId, Long>`).
    }

    @Override
    public void punctuate(long timestamp) {
        // Stays empty.  In this use case there would be no need for a periodical
        // action of this processor.
    }

    @Override
    public void close() {
        // The code for clean up goes here.
        // This processor instance will not be used again after this call.
    }
}

During task initialization the framework calls each processor's init method. Processor instances should perform their initialization in this method; for example, they may need to use the processor context to schedule punctuation calls or to obtain state store objects.

A punctuation call is a periodic call to a processor’s punctuate method. A processor can schedule it by calling the ProcessorContext.schedule method:

public void init(ProcessorContext context) {
    context.schedule(60000); // schedule periodic punctuation calls; interval in milliseconds
}
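
Within punctuate a processor typically performs periodic work, for example forwarding an accumulated result to downstream processors. The sketch below assumes a context field saved in init and downstream processors wired up to receive the forwarded records; the forward and commit methods are assumed to be available on ProcessorContext.

@Override
public void punctuate(long timestamp) {
    // Periodically emit a record to downstream processors and commit progress.
    context.forward("some-key", "some-value");
    context.commit();
}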

A processor can gain access to state store objects by calling ProcessorContext.getStateStore:

private RocksDBStore myStateStore;

public void init(ProcessorContext context) {
    // Get the state store registered under the name "myStateStore".
    myStateStore = (RocksDBStore) context.getStateStore("myStateStore");
}

Note that a processor can only access state stores whose names are given in the associated KStream.process method call.
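
For example, assuming that process accepts the names of the state stores to attach (as the note above implies), a store would be made accessible to the custom processor like this (a sketch; the store name is illustrative):

// Attach the state store named "myStateStore" to the custom processor so that
// it can be retrieved via `context.getStateStore("myStateStore")` in `init`.
pageViews.process(() -> new PopularPageAlert("alerts@yourcompany.com"), "myStateStore");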

Writing streams back to Kafka

At the end of the processing, users can choose to (continuously) write the resulting streams back to Kafka topics through KStream#to and KTable#to.

// Write the stream userCountByRegion to the output topic 'UserCountByRegion'
userCountByRegion.to("UserCountByRegion");

If your application needs to continue reading and processing the records after they have been written to a topic via to above, one option is to construct a new stream that reads from the output topic:

// Write to a Kafka topic.
userCountByRegion.to("UserCountByRegion");

// Read from the same Kafka topic by constructing a new stream from the
// topic UserCountByRegion, and then begin processing it (here: via `map`)
builder.stream("UserCountByRegion").map(...)...;

Kafka Streams provides a convenience method called through that is equivalent to the code above:

// `through` combines write-to-Kafka-topic and read-from-same-Kafka-topic operations
userCountByRegion.through("UserCountByRegion").map(...)...;

Whenever data is read from or written to a Kafka topic, Streams must know the serializers/deserializers to be used for the respective data records. By default the to and through methods above use the default serializers/deserializers defined in the Streams configuration. You can override these default serdes by passing explicit serializers/deserializers to the to and through methods.

Tip

Besides writing the data back to Kafka, users can also apply a custom processor (as described above) as a stream sink at the end of the processing to write to external stores, for example to materialize the data in an external data store.

Note

Please refer to the current Kafka Streams Javadocs for a complete list of provided operators. Keep in mind that there may be minor changes to them. We plan to provide an organized table of all operators provided by KStream once the API stabilizes after the tech preview release.

Running a Kafka Streams application

Good news: a Java application that uses the Kafka Streams library can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.

But what do you need to do inside your Java application – say, your main() method – with regards to Kafka Streams?

First, you must create an instance of KafkaStreams.

  • The first argument of the KafkaStreams constructor takes a topology builder (either KStreamBuilder for the Kafka Streams DSL, or TopologyBuilder for the Processor API) that is used to define a topology.
  • The second argument is an instance of StreamsConfig, which defines the configuration for this specific topology.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = ...;
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

At this point, internal structures are initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the start method:

streams.start();

If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently reassigns tasks from the existing instances to the new instance that you just started. See Stream Partitions and Tasks and Threading Model for details.

To catch any unexpected exceptions, you may set an UncaughtExceptionHandler. This handler is called whenever a stream thread is terminated by an unexpected exception:

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
});

To stop the instance call the close method:

streams.close();

After a particular instance of the application is stopped, Kafka Streams migrates any tasks that had been running on this instance to the other running instances (assuming there are any).
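
Putting these pieces together, a minimal main() method might look like the following sketch (the topology definition and the required serializer/deserializer settings are omitted for brevity; the shutdown hook is simply one common way to call close() when the JVM terminates):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class MyStreamsApp {

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put(StreamsConfig.JOB_ID_CONFIG, "my-first-streams-application");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
        // ... plus the required serializer/deserializer settings (see Required configuration parameters) ...
        StreamsConfig config = new StreamsConfig(settings);

        KStreamBuilder builder = new KStreamBuilder();
        // ... define your processing topology on `builder` as shown in the previous sections ...

        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();

        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
            }
        }));
    }
}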

Data types and serialization

Overview

Every Streams application must provide serializers and deserializers to materialize its data when necessary. There are two ways to provide these:

  1. By setting default serializers and deserializers via a StreamsConfig instance.
  2. By specifying explicit serializers/deserializers when calling the appropriate API methods.

Note

In a future version of Kafka Streams we are planning to consolidate the Serializer and Deserializer interfaces into a single interface. One benefit of such a change would be the reduction of API parameters that need to be passed around.

Configuring default serializers/deserializers

Serializers and deserializers specified in the Streams configuration via StreamsConfig are used as the default serializers and deserializers in your Streams application.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties settings = new Properties();
// Default serdes for keys of data records
settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Default serdes for values of data records
settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

StreamsConfig config = new StreamsConfig(settings);

The following table lists API methods that make use of these default serializers/deserializers:

Interface | Methods
KStreamBuilder | stream, table
KStream | to, through
KTable | to, through

Overriding default serializers/deserializers

You can also specify serializers/deserializers explicitly by passing them to the appropriate API methods, which overrides the default serde settings:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

final Serializer<String> regionSerializer = new StringSerializer();
final Serializer<Long> userCountSerializer = new LongSerializer();
// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
userCountByRegion.to("UserCountByRegion", regionSerializer, userCountSerializer);

If you want to override serializers/deserializers selectively, i.e., keep the defaults for some of them, then pass null whenever you want to use the default serde settings:

Properties settings = new Properties();
// Default serializers
settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
...

// Use the default serializer for record keys (here: region as String) by passing `null`,
// but override the default serializer for record values (here: userCount as Long).
final Serializer<Long> userCountSerializer = new LongSerializer();
userCountByRegion.to("UserCountByRegion", null, userCountSerializer);

The following table lists API methods that accept explicit serializers/deserializers:

Interface | Methods
KStreamBuilder | stream, table
KStream | to, through, join, outerJoin, leftJoin, reduceByKey, aggregateByKey, countByKey
KTable | to, through, reduce, aggregate, count

Available serializers/deserializers

Apache Kafka includes several built-in serde implementations in its kafka-clients maven artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.1.0-cp1</version>
</dependency>

This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when, for example, defining default serializers in your Streams configuration.

Data type | Serializer | Deserializer
byte[] | ByteArraySerializer | ByteArrayDeserializer
Integer | IntegerSerializer | IntegerDeserializer
Long | LongSerializer | LongDeserializer
String | StringSerializer | StringDeserializer

Also, the code examples of Kafka Streams include a basic serde implementation for JSON.

Lastly, the Confluent examples repository includes basic serde implementations for Apache Avro, as well as templated serde implementations.

Implementing custom serializers/deserializers

Here is an example of a custom Serializer implementation:

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.examples.pageview.PageView;

public class MySerializer implements Serializer<PageView> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    @Override
    public byte[] serialize(String topic, PageView record) {
        // Write the PageView fields into a single byte array.
        ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
        buffer.put(record.user.getBytes());
        buffer.put(record.pageId.getBytes());
        buffer.putLong(record.timestamp);

        return buffer.array();
    }

    @Override
    public void close() {
        // do nothing
    }
}
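
For completeness, here is a minimal sketch of the corresponding Deserializer side. The PageView.fromByteBuffer(...) factory method is a hypothetical helper (it is not part of the example above); in practice the wire format must carry enough information, such as field lengths, to reconstruct the object.

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.examples.pageview.PageView;

public class MyDeserializer implements Deserializer<PageView> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    @Override
    public PageView deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            // Kafka allows null message values; pass them through unchanged.
            return null;
        }
        // Hypothetical helper that rebuilds a PageView from its binary form.
        return PageView.fromByteBuffer(ByteBuffer.wrap(bytes));
    }

    @Override
    public void close() {
        // do nothing
    }
}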