.. title:: Kafka Streams Processor API for Confluent Platform

.. meta::
   :description: Learn to define arbitrary stream processors in Kafka Streams applications.

.. _streams_developer-guide_processor-api:

|kstreams| Processor API for Confluent Platform
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The Processor API allows developers to define and connect custom processors and to interact with
state stores. With the Processor API, you can define arbitrary stream processors that process one
received record at a time, and connect these processors with their associated state stores to
compose the processor topology that represents customized processing logic.

Overview
""""""""

The Processor API can be used to implement both **stateless** and **stateful** operations, where
the latter is achieved through the use of :ref:`state stores `.

.. tip::

   **Combining the DSL and the Processor API:** You can combine the convenience of the DSL with
   the power and flexibility of the Processor API as described in the section
   :ref:`streams_developer-guide_dsl_process`.

For a complete list of available API functionality, see the :ref:`Kafka Streams API docs `.

.. _streams_developer-guide_stream-processor:

Defining a Stream Processor
"""""""""""""""""""""""""""

A :ref:`stream processor ` is a node in the processor topology that represents a single
processing step. With the Processor API, you can define arbitrary stream processors that process
one received record at a time, and connect these processors with their associated state stores
to compose the processor topology.

You can define a customized stream processor by implementing the ``Processor`` interface, which
provides the ``process()`` API method. The ``process()`` method is called on each of the
received records.

The ``Processor`` interface also has an ``init()`` method, which is called by the |kstreams|
library during the task construction phase. Processor instances should perform any required
initialization in this method. The ``init()`` method is passed a ``ProcessorContext`` instance,
which provides access to the metadata of the currently processed record, including its source
|ak-tm| topic and partition, its corresponding message offset, and other such information. You
can also use this context instance to schedule a punctuation function (via
``ProcessorContext#schedule()``), to forward a new record as a key-value pair to the downstream
processors (via ``ProcessorContext#forward()``), and to commit the current processing progress
(via ``ProcessorContext#commit()``). Any resources you set up in ``init()`` can be cleaned up in
the ``close()`` method. |kstreams| may re-use a single ``Processor`` object by calling
``init()`` on it again after ``close()``.

The ``Processor`` interface takes two sets of generic parameters: ``KIn``, ``VIn``, ``KOut``,
and ``VOut``. These define the input and output types that the processor implementation can
handle. ``KIn`` and ``VIn`` define the key and value types that are passed to ``process()``.
Likewise, ``KOut`` and ``VOut`` define the forwarded key and value types that
``ProcessorContext#forward()`` accepts. If your processor does not forward any records at all,
or if it forwards only null keys or values, a best practice is to set the output generic type
argument to ``Void``. If it needs to forward multiple types that don't share a common
superclass, you must set the output generic type argument to ``Object``.

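For illustration only, the following minimal sketch shows how the four generic parameters and
the lifecycle methods fit together. The ``PassThroughProcessor`` class is a hypothetical example
and not part of this guide's word-count application; it reads ``String`` keys and values and
forwards each record unchanged.

.. sourcecode:: java

   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;

   // Hypothetical pass-through processor: KIn = String, VIn = String, KOut = String, VOut = String.
   public class PassThroughProcessor implements Processor<String, String, String, String> {

       private ProcessorContext<String, String> context;

       @Override
       public void init(final ProcessorContext<String, String> context) {
           // Called once per task before any records are processed. Keep the context so that
           // punctuations can be scheduled and records can be forwarded or committed later.
           this.context = context;
       }

       @Override
       public void process(final Record<String, String> record) {
           // Called once for every input record; forward it unchanged to downstream processors.
           context.forward(record);
       }

       @Override
       public void close() {
           // Release any resources acquired in init(). Kafka Streams may call init() again later.
       }
   }
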
Both the ``Processor#process()`` and the ``ProcessorContext#forward()`` methods handle records
in the form of the ``Record`` data class. This class gives you access to the main components of
a |ak| record: the key, value, timestamp, and headers. When forwarding records, you can use the
constructor to create a new ``Record`` from scratch, or you can use the convenience builder
methods to replace one of the ``Record`` properties and copy over the rest. For example,
``inputRecord.withValue(newValue)`` copies the key, timestamp, and headers from ``inputRecord``
while setting the output record's value to ``newValue``. This call doesn't mutate
``inputRecord``, but instead creates a shallow copy. Because the copy is shallow, if you plan to
mutate the key, value, or headers elsewhere in the program, you must create a deep copy of those
fields manually.

In addition to handling incoming records by using ``Processor#process()``, you can schedule a
periodic invocation, called a "punctuation", in your processor's ``init()`` method by calling
``ProcessorContext#schedule()`` and passing it a ``Punctuator``. The ``PunctuationType``
determines what notion of time is used for the punctuation scheduling: either
:ref:`event-time ` or wall-clock-time (by default, stream-time is configured to represent
event-time via ``TimestampExtractor``). When event-time is used, ``punctuate()`` is triggered
purely by data, because event-time is determined (and advanced forward) by the timestamps
derived from the input data. When there is no new input data arriving, event-time is not
advanced and ``punctuate()`` is not called.

For example, if you schedule a ``Punctuator`` function every 10 seconds based on
``PunctuationType.STREAM_TIME`` and you process a stream of 60 records with consecutive
timestamps from 1 (first record) to 60 seconds (last record), then ``punctuate()`` would be
called 6 times. This happens regardless of the time required to actually process those records:
``punctuate()`` is called 6 times whether processing these 60 records takes a second, a minute,
or an hour.

.. important::

   When tasks are moved to a different client or a different thread, |kstreams| calls
   ``close()`` on the old owner and ``init()`` on the new one. |confluent| recommends that you
   also ``cancel()`` the punctuator inside the ``close()`` method.

When wall-clock-time (``PunctuationType.WALL_CLOCK_TIME``) is used, ``punctuate()`` is triggered
purely by the wall-clock time. Reusing the example above, if the ``Punctuator`` function is
scheduled based on ``PunctuationType.WALL_CLOCK_TIME``, and these 60 records are processed
within 20 seconds, ``punctuate()`` is called 2 times (once every 10 seconds). If these 60
records are processed within 5 seconds, then ``punctuate()`` is not called at all. Note that you
can schedule multiple ``Punctuator`` callbacks with different ``PunctuationType`` types within
the same processor by calling ``ProcessorContext#schedule()`` multiple times inside the
``init()`` method.

Stream-time is advanced only when |kstreams| processes records. If there are no records to
process, or if |kstreams| is waiting for new records due to the :ref:`Task Idling `
configuration, stream-time doesn't advance, and ``punctuate()`` isn't triggered if
``PunctuationType.STREAM_TIME`` was specified. This behavior is independent of the configured
timestamp extractor, which means that using ``WallclockTimestampExtractor`` doesn't enable
wall-clock triggering of ``punctuate()``.

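The following minimal sketch shows how a processor might schedule one punctuator of each type in
``init()`` and cancel both in ``close()``, as recommended above. The class name and the
10-second and one-minute intervals are illustrative assumptions, not values taken from the
word-count example that follows.

.. sourcecode:: java

   import java.time.Duration;

   import org.apache.kafka.streams.processor.Cancellable;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;

   // Hypothetical processor that schedules one stream-time and one wall-clock punctuator.
   public class PunctuatingProcessor implements Processor<String, String, Void, Void> {

       private Cancellable streamTimePunctuator;
       private Cancellable wallClockPunctuator;

       @Override
       public void init(final ProcessorContext<Void, Void> context) {
           // Triggered by advancing stream-time, that is, only while records are being processed.
           streamTimePunctuator = context.schedule(
               Duration.ofSeconds(10),
               PunctuationType.STREAM_TIME,
               timestamp -> System.out.println("stream-time punctuation at " + timestamp));

           // Triggered by elapsed wall-clock time, even if no new records arrive.
           wallClockPunctuator = context.schedule(
               Duration.ofMinutes(1),
               PunctuationType.WALL_CLOCK_TIME,
               timestamp -> context.commit());
       }

       @Override
       public void process(final Record<String, String> record) {
           // Record handling omitted in this sketch.
       }

       @Override
       public void close() {
           // Cancel the punctuators when the task is closed or migrated to another owner.
           streamTimePunctuator.cancel();
           wallClockPunctuator.cancel();
       }
   }
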
The following example ``Processor`` defines a simple word-count algorithm and performs the
following actions:

- In the ``init()`` method, schedule the punctuation every second (the minimum time unit
  supported is one millisecond) and retrieve the local state store by its name "Counts".
- In the ``process()`` method, upon each received record, split the value string into words,
  and update their counts in the state store (we will talk about this later in this section).

The ``Punctuator`` object scheduled in ``init()`` defines a ``punctuate()`` method that iterates
over the local state store, sends the aggregated counts to the downstream processor (we will
talk about downstream processors later in this section), and commits the current stream state.

.. attention::

   The code shows a simplified example that only works for single-partition input topics. A
   generic Processor API word count would require two processors: the first is stateless, splits
   each line into words, and sets the words as record keys. The result of the first processor is
   written back to an additional topic that is consumed by the second (stateful) processor,
   which does the actual counting. Writing the words back to a topic is required to group the
   same words together so they go to the same instance of the second processor. This is similar
   in function to the shuffle phase of a MapReduce computation.

.. sourcecode:: java

   import java.time.Duration;
   import java.util.Locale;

   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.KeyValueStore;

   public class WordCountProcessor implements Processor<String, String, String, String> {

       private KeyValueStore<String, Integer> kvStore;

       @Override
       public void init(final ProcessorContext<String, String> context) {
           context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
               try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                   while (iter.hasNext()) {
                       final KeyValue<String, Integer> entry = iter.next();
                       context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                   }
               }
           });
           kvStore = context.getStateStore("Counts");
       }

       @Override
       public void process(final Record<String, String> record) {
           final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

           for (final String word : words) {
               final Integer oldValue = kvStore.get(word);

               if (oldValue == null) {
                   kvStore.put(word, 1);
               } else {
                   kvStore.put(word, oldValue + 1);
               }
           }
       }

       @Override
       public void close() {
           // close any resources managed by this processor
           // Note: Do not close any StateStores as these are managed by the library
       }
   }

Stateful processing with state stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``WordCountProcessor`` defined above can access the currently received record in its
``process()`` method, and it can leverage :ref:`state stores ` to maintain processing state, for
example, to remember recently arrived records for stateful processing needs like aggregations
and joins. For more information, see :ref:`state stores `.

.. _streams_developer-guide_processor-context:

Accessing Processor Context
"""""""""""""""""""""""""""

As mentioned :ref:`above `, a ``ProcessorContext`` controls the processing workflow, such as
scheduling a punctuation function and committing the current processed state. You can also use
this object to access metadata related to the application, like ``applicationId``, ``taskId``,
and ``stateDir`` (the directory where the task's state is stored), as well as the metadata of
the currently processed record, like ``topic``, ``partition``, ``offset``, and ``timestamp``.

The following example ``process()`` function enriches the record differently based on the record
context:

.. sourcecode:: java

   public class EnrichProcessor implements Processor<String, String> {

       private ProcessorContext context;

       @Override
       public void init(final ProcessorContext context) {
           // keep the processor context locally because we need it in process()
           this.context = context;
       }

       @Override
       public void process(final String key, final String value) {
           switch (context.topic()) {
               case "alerts":
                   context.forward(key, decorateWithHighPriority(value));
                   break;
               case "notifications":
                   context.forward(key, decorateWithMediumPriority(value));
                   break;
               default:
                   context.forward(key, decorateWithLowPriority(value));
           }
       }

       @Override
       public void close() {
           // nothing to clean up
       }
   }

Record context metadata
~~~~~~~~~~~~~~~~~~~~~~~

The metadata of the record currently being processed may not always be available. For example,
if the current record is not piped from any source topic, but is generated from a punctuation
function, then its metadata field ``topic`` will be empty, and ``partition`` and ``offset`` will
be a sentinel value (``-1`` in this case), while its ``timestamp`` field will be the triggering
time of the punctuation function that generated this record.

Accessing Header Metadata
"""""""""""""""""""""""""

You can append metadata to records by using the
:platform:`headers()|streams/javadocs/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html#headers()`
method. Headers are useful for scenarios like propagating tracing context between different
components and adding operational information that you can use for filtering records.

You can access message metadata in the ``process()``, ``transform()``, and ``transformValues()``
methods. The following code example shows how to add a header to records:

.. code:: java

   public void process(String key, String value) {
       // Add a header to the elements.
       context().headers().add(key, value.getBytes());
   }

.. _streams_developer-guide_state-store:

State stores
""""""""""""

To implement a **stateful** ``Processor`` or ``Transformer``, you must provide one or more state
stores to the processor or transformer (*stateless* processors or transformers do not need state
stores). State stores can be used to remember recently received input records, to track rolling
aggregates, to de-duplicate input records, and more. Another feature of state stores is that
they can be :ref:`interactively queried ` from other applications, such as a NodeJS-based
dashboard or a microservice implemented in Scala or Go.

The :ref:`available state store types ` in |kstreams| have :ref:`fault tolerance ` enabled by
default.

.. _streams_developer-guide_state-store_defining:

Define and create a state store
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can either use one of the available store types or
:ref:`implement your own custom store type `. It's common practice to leverage an existing store
type via the ``Stores`` factory.

When using |kstreams|, you usually don't create or instantiate state stores directly in your
code. Instead, you define state stores indirectly by creating a ``StoreBuilder``. This builder
is used by |kstreams| as a factory to instantiate the actual state stores locally in application
instances when and where needed.

The following store types are available out of the box.

.. rst-class:: non-scrolling-table width-100-percent

.. list-table::
   :widths: 15 10 15 60
   :header-rows: 1

   * - Store Type
     - Storage Engine
     - Fault-tolerant?
     - Description

   * - Persistent ``KeyValueStore``
     - RocksDB
     - Yes (enabled by default)
     - - **The recommended store type for most use cases.**
       - Stores its data on local disk.
       - Storage capacity: managed local state can be larger than the memory (heap space) of an
         application instance, but must fit into the available local disk space.
       - RocksDB settings can be fine-tuned, see :ref:`RocksDB configuration `.
       - Available store variants:
         :platform:`time window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-`,
         :platform:`session window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore-java.lang.String-java.time.Duration-`,
         :platform:`timestamped key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore-java.lang.String-`,
         :platform:`timestamped window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-`

       .. literalinclude:: api-papi-store-persistent.java
          :language: java

       See
       :platform:`PersistentKeyValueStore|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-`
       for detailed factory options.

   * - In-memory ``KeyValueStore``
     - \-
     - Yes (enabled by default)
     - - Stores its data in memory.
       - Storage capacity: managed local state must fit into the memory (heap space) of an
         application instance.
       - Useful when application instances run in an environment where local disk space is
         either not available or local disk space is wiped in-between app instance restarts.
       - Available store variants:
         :platform:`time window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-`,
         :platform:`session window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#inMemorySessionStore-java.lang.String-long-`,
         :platform:`timestamped key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html`,
         :platform:`timestamped window key-value store|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html`

       .. literalinclude:: api-papi-store-inmemory.java
          :language: java

       See
       :platform:`InMemoryKeyValueStore|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-`
       for detailed factory options.

.. _streams_developer-guide_timestamped-state-store:

Timestamped state stores
~~~~~~~~~~~~~~~~~~~~~~~~

KTables always store timestamps by default. A timestamped state store improves stream processing
semantics: it enables handling of out-of-order data in source KTables, detection of out-of-order
data in joins and aggregations, and retrieval of the timestamp of the latest update in an
Interactive Query. You can query timestamped state stores with or without a timestamp.

Upgrade state stores
~~~~~~~~~~~~~~~~~~~~

You can upgrade to timestamped state stores with a single rolling bounce per instance.

- For Processor API users, nothing changes in existing applications, and you have the option of
  using the timestamped stores.
- For DSL operators, store data is upgraded lazily in the background.
- No upgrade happens if you provide a custom ``XxxBytesStoreSupplier``, but you can opt in by
  implementing the ``TimestampedBytesStore`` interface. In this case, the old format is
  retained, and |kstreams| uses a proxy store that removes/adds timestamps on read/write.

.. _streams_developer-guide_versioned-state-stores:

Versioned key-value state stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Versioned key-value state stores are available since |cp| 7.5 (|kstreams| 3.5). Rather than
storing a single record version (value and timestamp) per key, versioned state stores may store
multiple record versions per key. This enables versioned state stores to support timestamped
retrieval operations that return the latest record (per key) as of a specified timestamp.

You can create a persistent, versioned state store by passing a
:platform:`VersionedBytesStoreSupplier|streams/javadocs/javadoc/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.html`
to the ``versionedKeyValueStoreBuilder``, or by implementing your own ``VersionedKeyValueStore``.

Each versioned store has an associated, fixed-duration *history retention* parameter that
specifies how long old record versions should be kept. In particular, a versioned store
guarantees returning accurate results for timestamped retrieval operations where the timestamp
being queried is within the history retention period of the current observed stream time.

History retention also doubles as the store's *grace period*, which determines how far back in
time out-of-order writes to the store are accepted. A versioned store doesn't accept writes
(inserts, updates, or deletions) if the timestamp associated with the write is older than the
current observed stream time by more than the grace period. Stream time in this context is
tracked per partition, rather than per key, which means it's important that the grace period
(history retention) be set high enough to accommodate a record with one key arriving
out-of-order relative to a record for another key.

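For illustration, a versioned store can be defined through the ``Stores`` factory as in the
following sketch; the store name, history retention, and serdes are arbitrary placeholder
choices, not values from this guide.

.. sourcecode:: java

   import java.time.Duration;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;
   import org.apache.kafka.streams.state.VersionedKeyValueStore;

   // Define a persistent, versioned key-value store with 24 hours of history retention.
   // Stores.persistentVersionedKeyValueStore() returns a VersionedBytesStoreSupplier.
   StoreBuilder<VersionedKeyValueStore<String, String>> versionedStoreBuilder =
       Stores.versionedKeyValueStoreBuilder(
           Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofHours(24)),
           Serdes.String(),
           Serdes.String());

   // Connect it to a processor like any other store builder, for example:
   // topology.addStateStore(versionedStoreBuilder, "Process");

Timestamped lookups are then served by ``VersionedKeyValueStore#get(key, asOfTimestamp)``.
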
Because the memory footprint of versioned key-value stores is higher than that of non-versioned
key-value stores, you may need to adjust your :ref:`RocksDB memory settings ` accordingly.
Benchmarking your application with versioned stores is also advised, because performance is
expected to be lower than when using non-versioned stores.

Versioned stores don't support caching or interactive queries. Also, you can't version window
stores or global tables.

Upgrade to versioned state stores
---------------------------------

Versioned state stores are opt-in only, so no automatic upgrades from non-versioned to versioned
stores take place. Upgrades are supported from persistent, non-versioned key-value stores to
persistent, versioned key-value stores as long as the original store has the same changelog
topic format as the versioned store being upgraded to. Both
:platform:`persistent key-value stores|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String).html`
and
:platform:`timestamped key-value stores|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String).html`
share the same changelog topic format as
:platform:`persistent versioned key-value stores|streams/javadocs/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration).html`,
so both are eligible for upgrades.

Follow this procedure to upgrade an application that has persistent, non-versioned key-value
stores to use persistent, versioned key-value stores.

#. Stop all application instances, and :ref:`clear any local state directories ` for the
   store(s) being upgraded.
#. Update your application code to use versioned stores where desired.
#. Update your changelog topic configs for the relevant state stores, to set the value of
   ``min.compaction.lag.ms`` to at least your desired history retention. History retention plus
   one day is recommended as a buffer for the use of broker wall-clock time during compaction.
#. Restart your application instances and allow time for the versioned stores to rebuild state
   from the changelog.

.. _streams_developer-guide_state-store_fault-tolerance:

Fault-tolerant state stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~

To make state stores fault-tolerant and to allow for state store migration without data loss, a
state store can be continuously backed up to a |ak| topic behind the scenes. This makes it
possible, for example, to migrate a stateful stream task from one machine to another when
:ref:`elastically adding or removing capacity from your application `. This topic is sometimes
referred to as the state store's associated *changelog topic*, or its *changelog*. For example,
if you experience machine failure, the state store and the application's state can be fully
restored from its changelog. You can :ref:`enable or disable this backup feature ` for a state
store.

Fault-tolerant state stores are backed by a
:kafka-common:`compacted|design/log_compaction.html` changelog topic. The purpose of compacting
this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in
the associated |ak| cluster, and to minimize recovery time if a state store needs to be restored
from its changelog topic.

Fault-tolerant windowed state stores are backed by a topic that uses both compaction and
deletion. Because of the structure of the message keys that are sent to the changelog topics,
this combination of deletion and compaction is required for the changelog topics of window
stores. For window stores, the message keys are composite keys that include the "normal" key and
window timestamps. For these types of composite keys it would not be sufficient to only enable
compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old
windows that have expired are cleaned up by |ak|'s log cleaner as the log segments expire. The
default retention setting is ``Materialized#withRetention()`` + 1 day. You can override this
setting by specifying ``StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG``
in the ``StreamsConfig``.

When you open an ``Iterator`` from a state store, you must call ``close()`` on the iterator when
you are done working with it to reclaim resources, or you can use the iterator from within a
try-with-resources statement. If you do not close an iterator, you may encounter an
out-of-memory (OOM) error.

.. _streams_developer-guide_state-store_enable-disable-fault-tolerance:

Enable or disable fault tolerance of state stores (store changelogs)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can enable or disable fault tolerance for a state store by enabling or disabling the change
logging of the store through ``withLoggingEnabled()`` and ``withLoggingDisabled()``. You can
also fine-tune the associated topic's configuration if needed.

Example for disabling fault tolerance:

.. sourcecode:: java

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;

   StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
       Stores.persistentKeyValueStore("Counts"),
       Serdes.String(),
       Serdes.Long())
     .withLoggingDisabled(); // disable backing up the store to a changelog topic

.. important::

   If the changelog is disabled, the attached state store is no longer fault tolerant, and it
   can't have any :ref:`standby replicas `.

Here is an example of enabling fault tolerance, with additional changelog-topic configuration.
You can add any log config from
:kafka-file:`kafka.log.LogConfig|core/src/main/scala/kafka/log/LogConfig.scala#L61`.
Unrecognized configs will be ignored.

.. sourcecode:: java

   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;

   Map<String, String> changelogConfig = new HashMap<>();
   // override min.insync.replicas
   changelogConfig.put("min.insync.replicas", "1");

   StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
       Stores.persistentKeyValueStore("Counts"),
       Serdes.String(),
       Serdes.Long())
     .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings

.. _streams_developer-guide_state-store_custom:

Implement custom state stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can use the :ref:`built-in state store types ` or implement your own. The primary interface
to implement for the store is ``org.apache.kafka.streams.processor.StateStore``. |kstreams| also
has a few extended interfaces, such as ``KeyValueStore``.

You also need to provide a "factory" for the store by implementing the
``org.apache.kafka.streams.state.StoreBuilder`` interface, which |kstreams| uses to create
instances of your store.

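The following sketch outlines only the ``StoreBuilder`` side of such a custom store. The class
name is hypothetical, and, for brevity, its ``build()`` method delegates to a built-in in-memory
store where a real implementation would construct your own ``StateStore`` type instead.

.. sourcecode:: java

   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;

   // Hypothetical builder (factory) for a custom key-value store.
   public class MyCustomStoreBuilder implements StoreBuilder<KeyValueStore<String, Long>> {

       private final String name;
       private final Map<String, String> logConfig = new HashMap<>();
       private boolean loggingEnabled = true;
       private boolean cachingEnabled = false;

       public MyCustomStoreBuilder(final String name) {
           this.name = name;
       }

       @Override
       public StoreBuilder<KeyValueStore<String, Long>> withCachingEnabled() {
           cachingEnabled = true;
           return this;
       }

       @Override
       public StoreBuilder<KeyValueStore<String, Long>> withCachingDisabled() {
           cachingEnabled = false;
           return this;
       }

       @Override
       public StoreBuilder<KeyValueStore<String, Long>> withLoggingEnabled(final Map<String, String> config) {
           loggingEnabled = true;
           logConfig.putAll(config);
           return this;
       }

       @Override
       public StoreBuilder<KeyValueStore<String, Long>> withLoggingDisabled() {
           loggingEnabled = false;
           logConfig.clear();
           return this;
       }

       @Override
       public KeyValueStore<String, Long> build() {
           // Kafka Streams calls build() to create one store instance per stream task.
           // A real implementation would construct and return your custom store type here;
           // for brevity, this sketch delegates to a built-in in-memory store.
           StoreBuilder<KeyValueStore<String, Long>> delegate = Stores.keyValueStoreBuilder(
               Stores.inMemoryKeyValueStore(name), Serdes.String(), Serdes.Long());
           delegate = loggingEnabled ? delegate.withLoggingEnabled(logConfig) : delegate.withLoggingDisabled();
           delegate = cachingEnabled ? delegate.withCachingEnabled() : delegate.withCachingDisabled();
           return delegate.build();
       }

       @Override
       public Map<String, String> logConfig() {
           return Collections.unmodifiableMap(logConfig);
       }

       @Override
       public boolean loggingEnabled() {
           return loggingEnabled;
       }

       @Override
       public String name() {
           return name;
       }
   }
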
There is an example state store implementation in Scala, which can serve as a starting point for
your own stores:

* :cp-examples:`CMSStore|src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala`
  (Scala) -- an in-memory, fault-tolerant store that leverages a Count-Min Sketch data structure
  for probabilistic counting of items in an input stream. The corresponding store builder is
  implemented in
  :cp-examples:`CMSStoreBuilder|src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala`.
  The backup and restore functionality of the state store and its fault tolerance can be enabled
  and disabled through configuration. The changelogging of the store is performed through
  :cp-examples:`CMSStoreChangeLogger|src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala`.

Connect processors and state stores
"""""""""""""""""""""""""""""""""""

Now that a :ref:`processor ` (``WordCountProcessor``) and the state stores have been defined,
you can construct the processor topology by connecting these processors and state stores
together by using the ``Topology`` instance. In addition, you can add source processors with the
specified |ak| topics to generate input data streams into the topology, and sink processors with
the specified |ak| topics to generate output data streams out of the topology.

Here is an example implementation:

.. sourcecode:: java

   Topology builder = new Topology();

   // add the source processor node that takes Kafka topic "source-topic" as input
   builder.addSource("Source", "source-topic")

       // add the WordCountProcessor node which takes the source processor as its upstream processor
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")

       // add the count store associated with the WordCountProcessor processor
       .addStateStore(countStoreBuilder, "Process")

       // add the sink processor node that takes Kafka topic "sink-topic" as output
       // and the WordCountProcessor node as its upstream processor
       .addSink("Sink", "sink-topic", "Process");

Here is a quick explanation of this example:

- A source processor node named ``"Source"`` is added to the topology using the ``addSource``
  method, with one |ak| topic ``"source-topic"`` fed to it. You can specify optional key and
  value deserializers to read the source, like
  :ref:`Confluent GenericAvroSerde and SpecificAvroSerde`.
- A processor node named ``"Process"`` with the pre-defined ``WordCountProcessor`` logic is then
  added as the downstream processor of the ``"Source"`` node using the ``addProcessor`` method.
- A predefined persistent key-value state store is created and associated with the ``"Process"``
  node, using ``countStoreBuilder``.
- A sink processor node is then added to complete the topology using the ``addSink`` method,
  taking the ``"Process"`` node as its upstream processor and writing to a separate
  ``"sink-topic"`` |ak| topic.

Note that you can also use another overloaded variant of ``addSink`` to dynamically determine
the |ak| topic to write to for each received record from the upstream processor, as sketched
below.

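For illustration, the dynamic variant accepts a ``TopicNameExtractor`` that chooses the output
topic per record. The routing rule and topic names in this sketch are placeholder assumptions,
and ``builder`` and ``"Process"`` refer to the example above.

.. sourcecode:: java

   import org.apache.kafka.streams.processor.TopicNameExtractor;

   // Choose the output topic per record; the rule and topic names are placeholders.
   final TopicNameExtractor<String, String> topicChooser =
       (key, value, recordContext) -> key.startsWith("a") ? "words-a-topic" : "words-other-topic";

   // Instead of the fixed-topic addSink() call shown above, route records dynamically:
   builder.addSink("Sink", topicChooser, "Process");
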
In some cases, it may be more convenient to add and connect a state store when you add the
processor to the topology. This can be done by implementing ``ConnectedStoreProvider#stores()``
on the ``ProcessorSupplier`` instead of calling ``Topology#addStateStore()``, like this:

.. sourcecode:: java

   Topology builder = new Topology();

   // add the source processor node that takes Kafka "source-topic" as input
   builder.addSource("Source", "source-topic")

       // add the WordCountProcessor node which takes the source processor as its upstream processor.
       // the ProcessorSupplier provides the count store associated with the WordCountProcessor
       .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
           public Processor<String, String, String, String> get() {
               return new WordCountProcessor();
           }

           public Set<StoreBuilder<?>> stores() {
               final StoreBuilder<KeyValueStore<String, Integer>> countsStoreBuilder =
                   Stores.keyValueStoreBuilder(
                       Stores.persistentKeyValueStore("Counts"),
                       Serdes.String(),
                       Serdes.Integer()
                   );
               return Collections.singleton(countsStoreBuilder);
           }
       }, "Source")

       // add the sink processor node that takes Kafka topic "sink-topic" as output
       // and the WordCountProcessor node as its upstream processor
       .addSink("Sink", "sink-topic", "Process");

This enables a processor to "own" state stores, effectively encapsulating their usage from the
user wiring the topology. Multiple processors that share a state store may provide the same
store with this technique, as long as the ``StoreBuilder`` is the same instance.

In these topologies, the ``"Process"`` stream processor node is considered a downstream
processor of the ``"Source"`` node, and an upstream processor of the ``"Sink"`` node. As a
result, whenever the ``"Source"`` node forwards a newly fetched record from |ak| to its
downstream ``"Process"`` node, the ``WordCountProcessor#process()`` method is triggered to
process the record and update the associated state store. Whenever ``context#forward()`` is
called in the ``Punctuator`` scheduled in ``WordCountProcessor#init()``, the aggregated
key-value pairs are sent via the ``"Sink"`` processor node to the |ak| topic ``"sink-topic"``.
Note that in the ``WordCountProcessor`` implementation, you must refer to the same store name
``"Counts"`` when accessing the key-value store, otherwise an exception will be thrown at
runtime, indicating that the state store cannot be found. If the state store is not associated
with the processor in the ``Topology`` code, accessing it in the processor's ``init()`` method
will also throw an exception at runtime, indicating the state store is not accessible from this
processor.

The ``Topology#addProcessor`` function takes a ``ProcessorSupplier`` argument, and the supplier
pattern requires that a new ``Processor`` instance is returned each time
``ProcessorSupplier#get()`` is called. Creating a single ``Processor`` object and returning the
same object reference in ``ProcessorSupplier#get()`` is a violation of the supplier pattern and
leads to runtime exceptions, so don't provide a singleton ``Processor`` instance to
``Topology``.

Now that you have fully defined your processor topology in your application, you can proceed to
:ref:`running the Kafka Streams application `.

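For example, a minimal launch could look like the following sketch, where the application ID and
bootstrap servers are placeholder values and ``builder`` is the ``Topology`` constructed above.

.. sourcecode:: java

   import java.util.Properties;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsConfig;

   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-processor-app"); // placeholder
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

   // "builder" is the Topology constructed above
   KafkaStreams streams = new KafkaStreams(builder, props);
   streams.start();

   // stop the application gracefully, for example from a shutdown hook
   Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
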
.. _streams_developer-guide_describing_a_topology:

Describe a topology
"""""""""""""""""""

After a ``Topology`` is specified, it is possible to retrieve a description of the corresponding
DAG via ``#describe()``, which returns a ``TopologyDescription``. A ``TopologyDescription``
contains all added source, processor, and sink nodes as well as all attached stores. You can
access the specified input and output topic names and patterns for source and sink nodes. For
processor nodes, the attached stores are added to the description. Additionally, all nodes have
a list of all their connected successor and predecessor nodes. Thus, ``TopologyDescription``
allows you to retrieve the DAG structure of the specified topology. Note that global stores are
listed explicitly because they are accessible by all nodes without the need to explicitly
connect them.

Furthermore, nodes are grouped by ``SubTopology``, where each ``SubTopology`` is a group of
processor nodes that are directly connected to each other (that is, either by a direct
connection, but not a topic, or by sharing a store). During execution, each ``SubTopology`` is
processed by one or multiple tasks. Thus, each ``SubTopology`` describes an independent unit of
work that can be executed by different threads in parallel.

Describing a ``Topology`` before starting your streams application with the specified topology
is helpful to reason about tasks, and thus the maximum parallelism (we discuss how to execute
your written application later in this section). It is also helpful to get insight into a
``Topology`` if it is not specified directly as described above but via the |kstreams| DSL.

.. include:: ../../.hidden/docs-common/home/includes/ak-share.rst