.. title:: Write a Kafka Streams Application for Confluent Platform

.. meta::
   :description: Learn about APIs for creating a Kafka Streams Application for Confluent Platform.

.. _streams_write_app:

Write a |kstreams| Application for |cp|
---------------------------------------

Any Java application that makes use of the |kstreams| library is considered a |kstreams| application. The computational logic of a |kstreams| application is defined as a :ref:`processor topology `, which is a graph of stream processors (nodes) and streams (edges).

You can define the processor topology with the |kstreams| APIs:

:ref:`Kafka Streams DSL `
   A high-level API that provides the most common data transformation operations, such as ``map``, ``filter``, ``join``, and ``aggregations``, out of the box. The DSL is the recommended starting point for developers new to |kstreams| and should cover many use cases and stream processing needs.

:ref:`Processor API `
   A low-level API that lets you add and connect processors, as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL, but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).

.. _streams_developer-guide_maven:

Libraries and Maven artifacts
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This section lists the |kstreams| related libraries that are available for writing your |kstreams| applications.

The corresponding Maven artifacts of these libraries are available in Confluent's Maven repository:

.. sourcecode:: xml

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

You can define dependencies on the following libraries for your |kstreams| applications.

.. rst-class:: non-scrolling-table

+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+
| Group ID             | Artifact ID                     | Version          | Description                                                                        |
+======================+=================================+==================+====================================================================================+
| ``org.apache.kafka`` | ``kafka-streams``               | |kafka_release|  | (Required) Base library for |kstreams|.                                            |
+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+
| ``org.apache.kafka`` | ``kafka-streams-scala_2.11`` or | |kafka_release|  | Scala API for |kstreams|. Optional.                                                |
|                      | ``kafka-streams-scala_2.12``    |                  |                                                                                    |
+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+
| ``org.apache.kafka`` | ``kafka-clients``               | |kafka_release|  | (Required) |ak-tm| client library. Contains built-in serializers/deserializers.    |
+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+
| ``org.apache.avro``  | ``avro``                        | 1.8.2            | Apache Avro library. Optional (only needed when using Avro).                       |
+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+
| ``io.confluent``     | ``kafka-streams-avro-serde``    | |release|        | Confluent's Avro Serializer/Deserializer. Optional (only needed when using Avro).  |
+----------------------+---------------------------------+------------------+------------------------------------------------------------------------------------+

.. tip:: See the section :ref:`streams_developer-guide_serdes` for more information about Serializers/Deserializers.
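As a quick illustration of the DSL operations mentioned above (``filter``, ``map``, and so on), the following minimal sketch builds a topology that drops empty values and upper-cases the rest. The topic names ``input-topic`` and ``output-topic`` are placeholders, not part of any real setup:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class DslSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic names are placeholders for illustration only.
        KStream<String, String> source = builder.stream("input-topic");
        source
            .filter((key, value) -> value != null && !value.isEmpty())  // drop empty records
            .mapValues(value -> value.toUpperCase())                    // transform each value
            .to("output-topic");

        // describe() shows the processor graph without connecting to a cluster.
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```

Calling ``Topology#describe()`` is a convenient way to inspect the resulting processor graph during development, because it does not require a running |ak| cluster.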
Example ``pom.xml`` snippet when using Maven:

.. codewithvars:: xml

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>|kafka_release|</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>|kafka_release|</version>
        </dependency>
        <!-- Optional: Scala API for Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-scala_2.11</artifactId>
            <version>|kafka_release|</version>
        </dependency>
        <!-- Optional: only needed when using Avro -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>|release|</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Optional: only needed when using Avro -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
            </plugin>
        </plugins>
    </build>

See the :cp-examples:`Kafka Streams examples|` in the Confluent examples repository for a full Maven Project Object Model (POM) setup.

Using |kstreams| within your application code
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can call |kstreams| from anywhere in your application code, but usually these calls are made within the ``main()`` method of your application, or some variant thereof. The basic elements of defining a processing topology within your application are described below.

First, you must create an instance of ``KafkaStreams``.

* The first argument of the ``KafkaStreams`` constructor takes a ``Topology`` (built with ``StreamsBuilder#build()`` for the :ref:`DSL ` or assembled directly with the :ref:`Processor API `) that defines the processing logic.
* The second argument is an instance of ``java.util.Properties``, which defines the configuration for this specific topology.

Code example:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;

    // Use the builders to define the actual processing topology, e.g. to specify
    // from which input topics to read, which stream operations (filter, map, etc.)
    // should be called, and so on. We will cover this in detail in the subsequent
    // sections of this Developer Guide.
    StreamsBuilder builder = ...;  // when using the DSL
    Topology topology = builder.build();
    //
    // OR
    //
    Topology topology = ...;       // when using the Processor API

    // Use the configuration properties to tell your application where the Kafka cluster is,
    // which Serializers/Deserializers to use by default, to specify security settings,
    // and so on.
    Properties props = ...;

    KafkaStreams streams = new KafkaStreams(topology, props);

At this point, internal structures are initialized, but the processing is not started yet. You have to explicitly start the |kstreams| thread by calling the ``KafkaStreams#start()`` method:

.. sourcecode:: java

    // Start the Kafka Streams threads
    streams.start();

If there are other instances of this stream processing application running elsewhere (e.g., on another machine), |ak| Streams transparently re-assigns tasks from the existing instances to the new instance that you just started. For more information, see :ref:`streams_architecture_tasks` and :ref:`streams_architecture_threads`.

To catch any unexpected exceptions, you can set a ``java.lang.Thread.UncaughtExceptionHandler`` before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

.. sourcecode:: java

    // Java 8+, using lambda expressions
    streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
      // here you should examine the throwable/exception and perform an appropriate action!
    });

    // Java 7
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread thread, Throwable throwable) {
        // here you should examine the throwable/exception and perform an appropriate action!
      }
    });

To stop the application instance, call the ``KafkaStreams#close()`` method:

.. sourcecode:: java

    // Stop the Kafka Streams threads
    streams.close();

To allow your application to shut down gracefully in response to SIGTERM, it is recommended that you add a shutdown hook and call ``KafkaStreams#close``.

- Here is a shutdown hook example in Java 8+:

  .. sourcecode:: java

      // Add shutdown hook to stop the Kafka Streams threads.
      // You can optionally provide a timeout to `close`.
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

- Here is a shutdown hook example in Java 7:

  .. sourcecode:: java

      // Add shutdown hook to stop the Kafka Streams threads.
      // You can optionally provide a timeout to `close`.
      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
          streams.close();
        }
      }));

After an application is stopped, |kstreams| will migrate any tasks that had been running in this instance to available remaining instances.

.. include:: ../../.hidden/docs-common/home/includes/ak-share.rst
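The snippets above leave the actual configuration unspecified (``Properties props = ...;``). As a point of reference, the following sketch shows a minimal set of configuration properties; the application ID and bootstrap server address are placeholder values, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {
    public static Properties minimalConfig() {
        Properties props = new Properties();
        // Required: a unique ID for this application across the Kafka cluster.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // placeholder
        // Required: host/port pair(s) for establishing the initial connection.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        // Optional: default serdes, used when none are given per operation.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
```

``application.id`` and ``bootstrap.servers`` are the only required settings; the default serdes are optional here because serdes can also be specified per operation.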