.. _streams_developer-guide_processor-api:

Processor API
^^^^^^^^^^^^^

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
processors with their associated state stores to compose the processor topology that represents a customized processing
logic.

.. contents:: Table of Contents
    :local:

Overview
""""""""

The Processor API can be used to implement both **stateless** as well as **stateful** operations, where the latter is
achieved through the use of :ref:`state stores <streams_developer-guide_state-store>`.

.. tip::

    **Combining the DSL and the Processor API:**
    You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
    section :ref:`streams_developer-guide_dsl_process`.

For a complete list of available API functionality, see the :ref:`Kafka Streams API docs `.

.. _streams_developer-guide_stream-processor:

Defining a Stream Processor
"""""""""""""""""""""""""""

A :ref:`stream processor ` is a node in the processor topology that represents a single processing step.
With the Processor API, you can define arbitrary stream processors that process one received record at a time, and
connect these processors with their associated state stores to compose the processor topology.

You can define a customized stream processor by implementing the ``Processor`` interface, which provides two main API
methods: ``process()`` and ``punctuate()``.

* ``process()`` is called on each received record.
* ``punctuate()`` is called periodically based on elapsed :ref:`stream-time ` (by default, *stream-time* is configured
  to represent *event-time*). Thus, ``punctuate()`` is purely data-driven and not related to wall-clock time (even if
  you use ``WallclockTimestampExtractor``). For example, let's assume you registered a ``punctuate()`` schedule of 10
  seconds. If you were to process a stream of 60 records with consecutive timestamps from 1 (first record) to 60
  seconds (last record), then ``punctuate()`` would be called 6 times -- regardless of the time required to actually
  process those records; i.e., ``punctuate()`` would be called 6 times no matter whether processing these 60 records
  would take a second, a minute, or an hour.

.. attention::

    Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps)
    available. If at least one partition does not have any new data available, stream-time will not be advanced and
    thus ``punctuate()`` will not be triggered. This behavior is independent of the configured timestamp extractor,
    i.e., using ``WallclockTimestampExtractor`` does not enable wall-clock triggering of ``punctuate()``.

The ``Processor`` interface also has an ``init()`` method, which is called by the Kafka Streams library during the task
construction phase. Processor instances should perform any required initialization in this method. The ``init()``
method passes in a ``ProcessorContext`` instance, which provides access to the metadata of the currently processed
record, including its source Kafka topic and partition, and its corresponding message offset.
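
For example, a processor along the following lines could log the topic, partition, and offset of every record it
receives. This is only an illustrative sketch (``RecordMetadataProcessor`` is a hypothetical class, not part of the
Kafka Streams API or of the word-count example below):

.. sourcecode:: java

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class RecordMetadataProcessor implements Processor<String, String> {

      private ProcessorContext context;

      @Override
      public void init(ProcessorContext context) {
          // keep the context so that process() can query the metadata of the current record
          this.context = context;
      }

      @Override
      public void process(String key, String value) {
          // topic(), partition(), and offset() describe the record currently being processed
          System.out.println("record from " + context.topic() + "-" + context.partition()
              + " at offset " + context.offset());
      }

      @Override
      public void punctuate(long timestamp) {
          // no punctuation schedule is registered, so this method is never called
      }

      @Override
      public void close() {
          // nothing to do
      }

    }
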
You can also use this context instance to schedule the punctuation period (via ``ProcessorContext#schedule()``) for
``punctuate()``, to forward a new record as a key-value pair to the downstream processors (via
``ProcessorContext#forward()``), and to commit the current processing progress (via ``ProcessorContext#commit()``).

The following example ``Processor`` implements a simple word-count algorithm and performs the following actions:

- In the ``init()`` method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds,
  which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name
  "Counts".
- In the ``process()`` method, upon each received record, split the value string into words, and update their counts
  into the state store (we will talk about this later in this section).
- In the ``punctuate()`` method, iterate the local state store and send the aggregated counts to the downstream
  processor (we will talk about downstream processors later in this section), and commit the current stream state.

.. attention::

    The code shows a simplified example that only works for single partition input topics. A generic Processor API
    word-count would require two processors: the first is stateless, splits each line into words, and sets the words as
    record keys. The result of the first processor is written back to an additional topic that is consumed by the
    second (stateful) processor that does the actual counting. Writing the words back to a topic is required to group
    the same words together so they go to the same instance of the second processor. This is similar in function to the
    shuffle phase of a Map-Reduce computation.

.. sourcecode:: java

    public class WordCountProcessor implements Processor<String, String> {

      private ProcessorContext context;
      private KeyValueStore<String, Long> kvStore;

      @Override
      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
          // keep the processor context locally because we need it in punctuate() and commit()
          this.context = context;

          // call this processor's punctuate() method every 1000 time units
          this.context.schedule(1000);

          // retrieve the key-value store named "Counts"
          kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
      }

      @Override
      public void process(String dummy, String line) {
          String[] words = line.toLowerCase().split(" ");

          for (String word : words) {
              Long oldValue = kvStore.get(word);

              if (oldValue == null) {
                  kvStore.put(word, 1L);
              } else {
                  kvStore.put(word, oldValue + 1L);
              }
          }
      }

      @Override
      public void punctuate(long timestamp) {
          KeyValueIterator<String, Long> iter = this.kvStore.all();

          while (iter.hasNext()) {
              KeyValue<String, Long> entry = iter.next();
              context.forward(entry.key, entry.value.toString());
          }

          iter.close();

          // commit the current processing progress
          context.commit();
      }

      @Override
      public void close() {
          // nothing to do
      }

    }

.. note::

    **Stateful processing with state stores:**
    The ``WordCountProcessor`` defined above can access the currently received record in its ``process()`` method, and
    it can leverage :ref:`state stores <streams_developer-guide_state-store>` to maintain processing states to, for
    example, remember recently arrived records for stateful processing needs like aggregations and joins. For more
    information, see the :ref:`state stores <streams_developer-guide_state-store>` documentation.

.. _streams_developer-guide_state-store:

State Stores
""""""""""""

To implement a **stateful** ``Processor`` or ``Transformer``, you must provide one or more state stores to the
processor or transformer (*stateless* processors or transformers do not need state stores).

State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate
input records, and more. Another feature of state stores is that they can be :ref:`interactively queried ` from other
applications, such as a NodeJS-based dashboard or a microservice implemented in Scala or Go.

The :ref:`available state store types <streams_developer-guide_state-store_defining>` in Kafka Streams have
:ref:`fault tolerance <streams_developer-guide_state-store_fault-tolerance>` enabled by default.

.. _streams_developer-guide_state-store_defining:

Defining and creating a State Store
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can either use one of the available store types or
:ref:`implement your own custom store type <streams_developer-guide_state-store_custom>`.
It's common practice to leverage an existing store type via the ``Stores`` factory.

Note that, when using Kafka Streams, you normally don't create or instantiate state stores directly in your code.
Rather, you define state stores indirectly by creating a so-called ``StateStoreSupplier``. This supplier is used by
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where
needed.

The following store types are available out of the box.

.. rst-class:: non-scrolling-table width-100-percent

+----------------------------+----------------+--------------------------+--------------------------------------------------------------------------+
| Store Type                 | Storage Engine | Fault-tolerant?          | Description                                                              |
+============================+================+==========================+==========================================================================+
| Persistent                 | RocksDB        | Yes (enabled by default) |                                                                          |
| ``KeyValueStore``          |                |                          | - **The recommended store type for most use cases.**                     |
|                            |                |                          | - Stores its data on local disk.                                         |
|                            |                |                          | - Storage capacity:                                                      |
|                            |                |                          |   managed local state can be larger than the memory (heap space) of an  |
|                            |                |                          |   application instance, but must fit into the available local disk      |
|                            |                |                          |   space.                                                                 |
|                            |                |                          | - RocksDB settings can be fine-tuned, see                                |
|                            |                |                          |   :ref:`RocksDB configuration `.                                         |
|                            |                |                          | - Available :streams-apidocs-store-persistent:`store variants|`:         |
|                            |                |                          |   time window key-value store, session window key-value store.          |
|                            |                |                          |                                                                          |
|                            |                |                          | .. literalinclude:: api-papi-store-persistent.java                       |
|                            |                |                          |    :language: java                                                       |
|                            |                |                          |                                                                          |
|                            |                |                          | See                                                                      |
|                            |                |                          | :streams-apidocs-store-persistent:`PersistentKeyValueFactory|` for       |
|                            |                |                          | detailed factory options.                                                |
+----------------------------+----------------+--------------------------+--------------------------------------------------------------------------+
| In-memory                  | \-             | Yes (enabled by default) |                                                                          |
| ``KeyValueStore``          |                |                          | - Stores its data in memory.                                             |
|                            |                |                          | - Storage capacity:                                                      |
|                            |                |                          |   managed local state must fit into memory (heap space) of an           |
|                            |                |                          |   application instance.                                                  |
|                            |                |                          | - Useful when application instances run in an environment where local   |
|                            |                |                          |   disk space is either not available or local disk space is wiped       |
|                            |                |                          |   in-between app instance restarts.                                      |
|                            |                |                          |                                                                          |
|                            |                |                          | .. literalinclude:: api-papi-store-inmemory.java                         |
|                            |                |                          |    :language: java                                                       |
|                            |                |                          |                                                                          |
|                            |                |                          | See                                                                      |
|                            |                |                          | :streams-apidocs-store-inmem:`InMemoryKeyValueFactory|` for              |
|                            |                |                          | detailed factory options.                                                |
+----------------------------+----------------+--------------------------+--------------------------------------------------------------------------+

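
For example, ``StateStoreSupplier`` instances for a persistent store and an in-memory store could be defined along the
following lines. This is only a sketch: the store names and serdes are illustrative, and the factory documentation
linked in the table above describes all available options.

.. sourcecode:: java

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    // supplier for a persistent (RocksDB-backed) key-value store named "Counts",
    // with String keys and Long values
    StateStoreSupplier countStoreSupplier = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();

    // supplier for an in-memory key-value store with the same key and value types
    StateStoreSupplier inMemoryCountStoreSupplier = Stores.create("CountsInMemory")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .inMemory()
        .build();
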
.. _streams_developer-guide_state-store_fault-tolerance:

Fault-tolerant State Stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~

To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
continuously backed up to a Kafka topic behind the scenes. This allows, for example, migrating a stateful stream task
from one machine to another when :ref:`elastically adding or removing capacity from your application `. This topic is
sometimes referred to as the state store's associated *changelog topic*, or its *changelog*. For example, if you
experience machine failure, the state store and the application's state can be fully restored from its changelog. You
can :ref:`enable or disable this backup feature <streams_developer-guide_state-store_enable-disable-fault-tolerance>`
for a state store.

By default, persistent key-value stores are fault-tolerant. They are backed by a `compacted `__ changelog topic. The
purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in
the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog
topic.

Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and
deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of
deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are
composite keys that include the "normal" key and window timestamps. For these types of composite keys it would not be
sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled,
old windows that have expired will be cleaned up by Kafka's log cleaner as the log segments expire. The default
retention setting is ``Windows#maintainMs()`` + 1 day. You can override this setting by specifying
``StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG`` in the ``StreamsConfig``.

.. _streams_developer-guide_state-store_enable-disable-fault-tolerance:

Enable or Disable Fault Tolerance of State Stores (Store Changelogs)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store
through ``enableLogging()`` and ``disableLogging()``. You can also fine-tune the associated topic's configuration if
needed.

Example for disabling fault-tolerance:

.. sourcecode:: java

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    StateStoreSupplier countStoreSupplier = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .disableLogging() // disable backing up the store to a changelog topic
        .build();

.. attention::

    If the changelog is disabled then the attached state store is no longer fault tolerant and it can't have any
    :ref:`standby replicas `.

Here is an example for enabling fault tolerance, with additional changelog-topic configuration: you can add any log
config from :kafka-file:`kafka.log.LogConfig|core/src/main/scala/kafka/log/LogConfig.scala#L61`; unrecognized configs
will be ignored.

.. sourcecode:: java

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    Map<String, String> changelogConfig = new HashMap<>();
    // override min.insync.replicas
    changelogConfig.put("min.insync.replicas", "1");

    StateStoreSupplier countStoreSupplier = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .enableLogging(changelogConfig) // enable changelogging, with custom changelog settings
        .build();

.. _streams_developer-guide_state-store_custom:

Implementing Custom State Stores
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can use the :ref:`built-in state store types <streams_developer-guide_state-store_defining>` or implement your own.
The primary interface to implement for the store is ``org.apache.kafka.streams.processor.StateStore``. Kafka Streams
also has a few extended interfaces such as ``KeyValueStore``.

You also need to provide a "factory" for the store by implementing the
``org.apache.kafka.streams.processor.StateStoreSupplier`` interface, which Kafka Streams uses to create instances of
your store.

There is an example state store implementation in Scala, which can serve as a starting point for your own stores:

* :cp-examples:`CMSStore|src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala` (Scala) -- an in-memory,
  fault-tolerant store that leverages a Count-Min Sketch data structure for probabilistic counting of items in an input
  stream. The state store supplier is implemented in
  :cp-examples:`CMSStoreSupplier|src/main/scala/io/confluent/examples/streams/algebird/CMSStoreSupplier.scala`.
  The backup and restore functionality of the state store and its fault tolerance can be enabled and disabled through
  configuration. The changelogging of the store is performed through
  :cp-examples:`CMSStoreChangeLogger|src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala`.

Connecting Processors and State Stores
""""""""""""""""""""""""""""""""""""""

Now that a :ref:`processor <streams_developer-guide_stream-processor>` (``WordCountProcessor``) and the state stores
have been defined, you can construct the processor topology by connecting these processors and state stores together
using a ``TopologyBuilder`` instance. In addition, you can add source processors with the specified Kafka topics to
generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output
data streams out of the topology.

Here is an example implementation:

.. sourcecode:: java

    TopologyBuilder builder = new TopologyBuilder();

    // add the source processor node that takes Kafka topic "source-topic" as input
    builder.addSource("Source", "source-topic")

        // add the WordCountProcessor node which takes the source processor as its upstream processor
        .addProcessor("Process", () -> new WordCountProcessor(), "Source")

        // add the count store associated with the WordCountProcessor processor
        .addStateStore(countStoreSupplier, "Process")

        // add the sink processor node that takes Kafka topic "sink-topic" as output
        // and the WordCountProcessor node as its upstream processor
        .addSink("Sink", "sink-topic", "Process");

Here is a quick explanation of this example:

- A source processor node named ``"Source"`` is added to the topology using the ``addSource`` method, with one Kafka
  topic ``"source-topic"`` fed to it. You can specify optional key and value deserializers to read the source, like
  :ref:`Confluent GenericAvroSerde and SpecificAvroSerde`.
- A processor node named ``"Process"`` with the pre-defined ``WordCountProcessor`` logic is then added as the
  downstream processor of the ``"Source"`` node using the ``addProcessor`` method.
- A predefined persistent key-value state store is created and associated with the ``"Process"`` node, using
  ``countStoreSupplier``.
- A sink processor node is then added to complete the topology using the ``addSink`` method, taking the ``"Process"``
  node as its upstream processor and writing to a separate ``"sink-topic"`` Kafka topic.

In this topology, the ``"Process"`` stream processor node is considered a downstream processor of the ``"Source"``
node, and an upstream processor of the ``"Sink"`` node. As a result, whenever the ``"Source"`` node forwards a newly
fetched record from Kafka to its downstream ``"Process"`` node, the ``WordCountProcessor#process()`` method is
triggered to process the record and update the associated state store. Whenever ``context#forward()`` is called in the
``WordCountProcessor#punctuate()`` method, the aggregate key-value pair will be sent via the ``"Sink"`` processor node
to the Kafka topic ``"sink-topic"``.

Note that in the ``WordCountProcessor`` implementation, you must refer to the same store name ``"Counts"`` when
accessing the key-value store, otherwise an exception will be thrown at runtime, indicating that the state store cannot
be found. If the state store is not associated with the processor in the ``TopologyBuilder`` code, accessing it in the
processor's ``init()`` method will also throw an exception at runtime, indicating the state store is not accessible
from this processor.

Now that you have fully defined your processor topology in your application, you can proceed to
:ref:`running the Kafka Streams application `.
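
For illustration only, a minimal sketch of launching the topology defined above might look as follows. The application
id and bootstrap servers shown here are placeholder values, and ``builder`` refers to the ``TopologyBuilder``
constructed in the preceding example; see the linked section for the full configuration and execution details.

.. sourcecode:: java

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;

    // placeholder configuration values -- adjust them for your environment
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-processor-app");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // `builder` is the TopologyBuilder constructed in the example above
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();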