.. _streams_quickstart:

|kstreams| Quick Start
======================

This quick start provides you with a first hands-on look at the |kstreams| API.
It will demonstrate how to run your first Java application that uses the |kstreams| library by showcasing a simple
end-to-end data pipeline powered by |ak-tm|.

This quick start only provides a high-level overview of the Streams API. More details are provided in the rest of the
:ref:`Kafka Streams documentation `.

Purpose
-------

This quick start shows how to run the :kafka-file:`WordCount demo application|streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java` that is included in |ak|.

Here's the gist of the code, converted to use Java 8 lambda expressions so that it is easier to read (taken from the
variant :cp-examples:`WordCountLambdaExample|src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java`):

.. sourcecode:: java

    // Serializers/deserializers (serde) for String and Long types
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    // Construct a `KStream` from the input topic "streams-plaintext-input", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    KStream<String, String> textLines = builder.stream("streams-plaintext-input",
        Consumed.with(stringSerde, stringSerde));

    KTable<String, Long> wordCounts = textLines
        // Split each text line, by whitespace, into words.  The text lines are the message
        // values, i.e. we can ignore whatever data is in the message keys and thus invoke
        // `flatMapValues` instead of the more generic `flatMap`.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        // We use `groupBy` to ensure the words are available as message keys.
        .groupBy((key, value) -> value)
        // Count the occurrences of each word (message key).
        .count();

    // Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

This quick start follows these steps:

#. Start a |ak| cluster on a single machine.
#. Write example input data to a |ak| topic, using the so-called *console producer* included in |ak|.
#. Process the input data with a Java application that uses the |kstreams| library.
   Here, we will leverage a demo application included in |ak| called
   :kafka-file:`WordCount|streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java`.
#. Inspect the output data of the application, using the so-called *console consumer* included in |ak|.
#. Stop the |ak| cluster.
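
A brief note on the code gist above: it shows only the processing topology. A complete |kstreams| application also
needs a small amount of boilerplate, namely the configuration properties and a ``KafkaStreams`` instance that runs the
topology. The following is a minimal sketch of that boilerplate; the class name ``WordCountSketch`` and the
configuration values shown (for example the application id) are illustrative assumptions and are not taken from the
demo's actual source. The bundled WordCount demo already contains code along these lines, so you do not need to write
any of it for this quick start.

.. sourcecode:: java

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Produced;

    public class WordCountSketch {

        public static void main(final String[] args) {
            // Minimal configuration: an application id (hypothetical value) plus the broker to connect to.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-quickstart-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // The `builder` referenced in the gist above comes from here.
            final StreamsBuilder builder = new StreamsBuilder();

            // Topology as in the gist: read lines, split into words, count, write the counts out.
            builder.<String, String>stream("streams-plaintext-input")
                   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                   .groupBy((key, word) -> word)
                   .count()
                   .toStream()
                   .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

            // Build and start the topology; the application runs until it is stopped.
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Close the application cleanly on shutdown (e.g. Ctrl+C).
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }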

Start the |ak| cluster
----------------------

In this section we install and start a |ak| cluster on your local machine. This cluster consists of a single-node
|ak| cluster (i.e., only one broker) alongside a single-node |zk| ensemble. Later on, we will run the WordCount demo
application locally against that cluster. Note that, in production, you'd typically run your |kstreams| applications
on client machines at the perimeter of the |ak| cluster -- they do not run "inside" the |ak| cluster or its brokers.

First, you must install **Oracle Java JRE or JDK 1.8** on your local machine.

Second, you must install Confluent Platform |release| using :ref:`ZIP and TAR archives `. Once installed, change
into the installation directory:

.. codewithvars:: bash

    # *** IMPORTANT STEP ****
    # The subsequent paths and commands used throughout this quick start assume that
    # you are in the following working directory:
    cd confluent-|release|/

    # Note: If you want to uninstall the Confluent Platform at the end of this quick start,
    # run the following commands.
    #
    #     rm -rf confluent-|release|/
    #     rm -rf /tmp/kafka          # Data files of Kafka broker (server)
    #     rm -rf /tmp/kafka-streams  # Data files of applications using Kafka's Streams API
    #     rm -rf /tmp/zookeeper      # Data files of ZooKeeper

.. include:: ../includes/installation-types-zip-tar.rst

We begin by starting the |zk| instance, which will listen on ``localhost:2181``. Since this is a long-running
service, you should run it in its own terminal.

.. codewithvars:: bash

    # Start ZooKeeper.  Run this command in its own terminal.
    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Next we launch the |ak| broker, which will listen on ``localhost:9092`` and connect to the |zk| instance we just
started. Since this is a long-running service, too, you should run it in its own terminal.

.. codewithvars:: bash

    # Start Kafka.  Run this command in its own terminal.
    ./bin/kafka-server-start ./etc/kafka/server.properties

Now that our single-node |ak| cluster is fully up and running, we can proceed to preparing the input data for our
first |kstreams| experiments.

Prepare the topics and the input data
-------------------------------------

.. tip::

    In this section we will use built-in CLI tools to manually write some example data to |ak|.
    In practice, you would rather rely on other means to feed your data into |ak|, for instance via
    :ref:`Kafka Connect ` if you want to move data from other data systems into |ak|,
    or via :ref:`Kafka Clients ` from within your own applications.

We will now send some input data to a |ak| topic, which will be subsequently processed by a |kstreams| application.

First, we need to create the input topic, named ``streams-plaintext-input``, and the output topic, named
``streams-wordcount-output``:

.. codewithvars:: bash

    # Create the input topic
    ./bin/kafka-topics --create \
                       --bootstrap-server localhost:9092 \
                       --replication-factor 1 \
                       --partitions 1 \
                       --topic streams-plaintext-input

    # Create the output topic
    ./bin/kafka-topics --create \
                       --bootstrap-server localhost:9092 \
                       --replication-factor 1 \
                       --partitions 1 \
                       --topic streams-wordcount-output

Next, we generate some input data and store it in a local file at ``/tmp/file-input.txt``:

.. codewithvars:: bash

    echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

The resulting file will have the following contents:

.. sourcecode:: text

    all streams lead to kafka
    hello kafka streams
    join kafka summit

Lastly, we send this input data to the input topic:

.. codewithvars:: bash

    cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

The |ak| console producer reads the data from ``STDIN`` line-by-line, and publishes each line as a separate |ak|
message to the topic ``streams-plaintext-input``, where the message key is ``null`` and the message value is the
respective line such as ``all streams lead to kafka``, encoded as a string.

.. note::

    **This Quick start vs. Stream Data Reality(tm):** You might wonder how this step-by-step quick start compares to a
    "real" stream data platform, where data is always on the move, at large scale and in realtime. Keep in mind that
    the purpose of this quick start is to demonstrate, in simple terms, the various facets of an end-to-end data
    pipeline powered by |ak| and |kstreams|. For didactic reasons we intentionally split the quick start into clearly
    separated, sequential steps. In practice though, these steps will typically look a bit different and will often
    happen in parallel. For example, input data would typically not originate from a local file but would be sent
    directly from distributed devices, and the data would be flowing continuously into |ak|. Similarly, the stream
    processing application (see the next section) might already be up and running before the first input data is
    sent, and so on.
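
For illustration, here is a minimal sketch of how an application could write the same three example lines directly to
the input topic with the Java producer client, instead of piping a file into the console producer. The class name
``PlaintextInputProducer`` is hypothetical, and the configuration assumes the local broker at ``localhost:9092`` used
throughout this quick start:

.. sourcecode:: java

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class PlaintextInputProducer {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Same example lines as in /tmp/file-input.txt; the message key is left null,
                // just like the console producer does.
                producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
                producer.send(new ProducerRecord<>("streams-plaintext-input", "hello kafka streams"));
                producer.send(new ProducerRecord<>("streams-plaintext-input", "join kafka summit"));
            }
        }
    }

Either way, the records end up in ``streams-plaintext-input`` with a ``null`` key and a string value, ready to be
processed in the next section.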

Process the input data with |kstreams|
----------------------------------------

Now that we have generated some input data, we can run our first |kstreams| based Java application.

We will run the :kafka-file:`WordCount demo application|streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java`, which is included in |ak|. It implements the
WordCount algorithm, which computes a word occurrence histogram from an input text. However, unlike other WordCount
examples you might have seen before that operate on *finite, bounded data*, the WordCount demo application behaves
slightly differently because it is designed to operate on an **infinite, unbounded stream** of input data. Similar to
the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must
assume potentially unbounded input data, it will periodically output its current state and results while continuing
to process more data because it cannot know when it has processed "all" the input data. This is a typical difference
between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such
as Hadoop MapReduce. It will be easier to understand this difference once we inspect the actual output data later on.

|ak|'s WordCount demo application is bundled with Confluent Platform, which means we can run it without further ado,
i.e. we do not need to compile any Java sources and so on.

.. codewithvars:: bash

    # Run the WordCount demo application.
    # The application writes its results to a Kafka output topic -- there won't be any STDOUT output in your console.
    # You can safely ignore any WARN log messages.
    ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

.. note::

    **No deployment magic here:** The WordCount demo is a normal Java application that can be started and deployed
    just like any other Java application. The script :kafka-file:`kafka-run-class|bin/kafka-run-class.sh` is nothing
    but a simple wrapper for ``java -cp ...``.

The WordCount demo application will read from the input topic ``streams-plaintext-input``, perform the computations
of the WordCount algorithm on the input data, and continuously write its current results to the output topic
``streams-wordcount-output`` (the names of its input and output topics are hardcoded). To terminate the demo, enter
``Ctrl+C`` from the keyboard.

Inspect the output data
-----------------------

.. tip::

    In this section we will use built-in CLI tools to manually read data from |ak|.
    In practice, you would rather rely on other means to retrieve data from |ak|, for instance via
    :ref:`Kafka Connect ` if you want to move data from |ak| to other data systems,
    or via :ref:`Kafka Clients ` from within your own applications.
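
As a counterpart to the producer sketch shown earlier, reading the output topic from your own application would use
the Java consumer client. The following is a minimal, hypothetical sketch; the class name ``WordCountOutputReader``
and the consumer group id are made up, and it assumes the local broker, a reasonably recent Java client, and the
String-key/Long-value format that the demo writes. For this quick start, however, we stick with the console consumer
shown below.

.. sourcecode:: java

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class WordCountOutputReader {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");  // hypothetical group id
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        // read the topic from the beginning
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

            try (final KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
                // Poll until the process is stopped (e.g. with Ctrl+C), printing each word count.
                while (true) {
                    final ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
                    for (final ConsumerRecord<String, Long> record : records) {
                        System.out.println(record.key() + " " + record.value());
                    }
                }
            }
        }
    }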

We can now inspect the output of the WordCount demo application by reading from its output topic
``streams-wordcount-output``:

.. codewithvars:: bash

    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
                                 --topic streams-wordcount-output \
                                 --from-beginning \
                                 --formatter kafka.tools.DefaultMessageFormatter \
                                 --property print.key=true \
                                 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
                                 --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

with the following output data being printed to the console:

.. sourcecode:: text

    all      1
    streams  1
    lead     1
    to       1
    kafka    1
    hello    1
    kafka    2
    streams  2
    join     1
    kafka    3
    summit   1

Here, the first column is the |ak| message key in ``java.lang.String`` format, and the second column is the message
value in ``java.lang.Long`` format. You can stop the console consumer using ``Ctrl+C``.

As we discussed above, a streaming word count algorithm continuously computes the latest word counts from the input
data, and, in this specific demo application, continuously writes the latest counts of words as its output. We will
talk more about how a stream processing application works in the subsequent chapters of this documentation, where we
notably explain :ref:`the duality between streams and tables `: in fact, the output we have seen above is actually
the changelog stream of a :ref:`KTable `, with the KTable being the result of the :ref:`aggregation ` operation
performed by the WordCount demo application.

..  TODO for later (this entry is hidden in the generated HTML document):
    Decide whether we also want to add another section to document how to remove any input or output data used in
    this quick start so that users can easily start from a blank slate. For example, explain how to purge the local
    state store (Rocks DB), or how to delete the output topic `streams-wordcount-output`.

Stop the |ak| cluster
---------------------

Once you are done with the quick start you can shut down the |ak| cluster in the following order:

1. First, stop the **Kafka broker** by entering ``Ctrl+C`` in the terminal it is running in. Alternatively, you can
   ``kill`` the broker process.
2. Lastly, stop the **ZooKeeper instance** by entering ``Ctrl+C`` in its respective terminal. Alternatively, you can
   ``kill`` the |zk| process.

Congratulations, you have now run your first |kstreams| application against data stored in a single-node |ak|
cluster, yay!

Next steps
----------

As next steps we would recommend you to:

* Read the :ref:`Kafka Streams Architecture ` to understand its key concepts and design principles.
* Take a deep dive into the :ref:`Kafka Streams Developer Guide `, which includes many code examples to get you
  started, as well as the documentation of the :ref:`Kafka Streams DSL `. This will get you started on writing your
  own |kstreams| applications.
* Run through the self-paced :ref:`Kafka Streams tutorial for developers` to apply the basic principles of streaming
  applications in an event-driven architecture.

Beyond |kstreams|, you might be interested in learning more about:

* :ref:`Kafka Connect ` for moving data between |ak| and other data systems such as Hadoop.
* :ref:`Kafka Clients ` for reading and writing data from/to |ak| from within your own applications.