Kafka Streams のクイックスタート

このクイックスタートでは、Kafka Streams API を実際に使用するための最初のステップを案内します。Apache Kafka® を利用する単純なエンドツーエンドのデータパイプラインを例として、Kafka Streams ライブラリを使用する初めての Java アプリケーションの実行方法を示します。

このクイックスタートは、Streams API の大まかな概要を紹介するだけです。詳細については、他の Kafka Streams ドキュメント を参照してください。

目的

このクイックスタートでは、Kafka に含まれている WordCount デモアプリケーション の実行方法を示します。コードの要点は次のようになります。ここでは、読みやすいように Java 8 のラムダ式を使用する形に変換されています(バリエーションである WordCountLambdaExample から引用)。

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.  The text lines are the message
    // values, i.e. we can ignore whatever data is in the message keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // We use `groupBy` to ensure the words are available as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

このクイックスタートでは、次の手順に従います。

  1. 1 台のマシンで Kafka クラスターを起動します。
  2. Kafka に付属する コンソールプロデューサー を使用して、Kafka トピックにサンプルの入力データを書き込みます。
  3. Kafka Streams ライブラリを使用する Java アプリケーションで入力データを処理します。ここでは、Kafka に含まれている WordCount というデモアプリケーションを使用します。
  4. Kafka に付属する "コンソールコンシューマー" を使用して、アプリケーションの出力データを調べます。
  5. Kafka クラスターを停止します。

Kafka クラスターを起動します。

このセクションでは、Kafka クラスターをローカルマシンにインストールして起動します。このクラスターは、単一ノードの Kafka クラスター(唯一のブローカー)と単一ノードの ZooKeeper アンサンブルで構成されます。その後、このクラスターに対してローカルで WordCount デモアプリケーションを実行します。本稼働環境では、通常、Kafka クラスターの境界にあるクライアントマシンで Kafka Streams アプリケーションを実行します。これらのアプリケーションは、Kafka クラスターやそのブローカーの "内部" で実行されるものではありません。

まず、Oracle Java JRE または JDK 1.8 をローカルマシンにインストールする必要があります。

次に、ZIP および TAR アーカイブ を使用して Confluent Platform 6.2.4 をインストールする必要があります。インストールが完了したら、インストールディレクトリに移動します。

# *** IMPORTANT STEP ****
# The subsequent paths and commands used throughout this quick start assume that
# your are in the following working directory:
  cd confluent-6.2.4/

# Note: If you want to uninstall the Confluent Platform at the end of this quick start,
# run the following commands.
#
#     rm -rf confluent-6.2.4/
#     rm -rf /tmp/kafka          # Data files of Kafka broker (server)
#     rm -rf /tmp/kafka-streams  # Data files of applications using Kafka's Streams API
#     rm -rf /tmp/zookeeper      # Data files of ZooKeeper

ちなみに

これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。

最初に ZooKeeper インスタンスを起動します。このインスタンスは localhost:2181 をリッスンします。これは長時間動作するサービスなので、独自のターミナルで実行する必要があります。

# Start ZooKeeper.  Run this command in its own terminal.
  ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

次に Kafka ブローカーを起動します。これは localhost:9092 をリッスンし、先ほど起動した ZooKeeper インスタンスに接続します。これも長時間動作するサービスなので、独自のターミナルで実行する必要があります。

# Start Kafka.  Run this command in its own terminal
  ./bin/kafka-server-start ./etc/kafka/server.properties

これで単一ノードの Kafka クラスターが起動しました。次は、初めて Kafka Streams を試すための入力データを準備します。

トピックと入力データの準備

ちなみに

このセクションでは、組み込みの CLI ツールを使用して、サンプルデータを手動で Kafka に書き込みます。実際には、Kafka にデータを供給するには他の方法を利用することをお勧めします。たとえば、Kafka Connect を通じて他のデータシステムから Kafka にデータを移行する方法や、独自のアプリケーション内から Kafka クライアント を使用する方法があります。

ここでは、Kafka トピックに入力データを送信します。このデータは、後で Kafka Streams アプリケーションによって処理されます。

最初に、streams-plaintext-input という名前の入力トピックと、streams-wordcount-output という名前の出力トピックを作成する必要があります。

# Create the input topic
  ./bin/kafka-topics --create \
          --bootstrap-server localhost:9092 \
          --replication-factor 1 \
          --partitions 1 \
          --topic streams-plaintext-input

# Create the output topic
  ./bin/kafka-topics --create \
          --bootstrap-server localhost:9092 \
          --replication-factor 1 \
          --partitions 1 \
          --topic streams-wordcount-output

次に、入力データを生成し、/tmp/file-input.txt というローカルファイルに保存します。

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

作成されたファイルの内容は次のようになります。

all streams lead to kafka
hello kafka streams
join kafka summit

最後に、この入力データを入力トピックに送信します。

cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

Kafka コンソールプロデューサーは、STDIN から 1 行ずつデータを読み取り、各行を個別の Kafka メッセージとして streams-plaintext-input トピックにパブリッシュします。このメッセージのキーは null で、メッセージの値は、all streams lead to kafka などの各行を文字列としてエンコードしたものになります。

注釈

クイックスタートとストリームデータの現実: このステップバイステップのクイックスタートは、"現実" のストリームデータプラットフォームと比べてどのように違うのでしょうか。現実では、データが大規模かつリアルタイムに常に流れています。このクイックスタートの目的は、簡単に言えば、Kafka と Kafka Streams に基づくエンドツーエンドのデータパイプラインをさまざまな面からデモンストレーションすることです。学習しやすいように、このクイックスタートは連続する別々のステップに意図的に分かれています。

ただし、現実のステップは少し異なるのが通例で、特に大きな違いは各ステップが並列して実行される点にあります。たとえば、入力データはローカルファイルから取り込まれるのではなく、分散デバイスから直接送信され、そのデータは継続的に Kafka に流し込まれるのが一般的です。同様に、ストリーム処理アプリケーション(次のセクションを参照)は、最初の入力データが送信される前から既に実行されている可能性があるなど、さまざまな場合があります。

Kafka Streams による入力データの処理

これで入力データが生成されたので、初めての Kafka Streams ベースの Java アプリケーションを実行する準備ができました。

ここでは、Kafka に含まれている WordCount デモアプリケーション を実行します。このアプリケーションには、入力テキストから単語の出現頻度のヒストグラムを計算する WordCount アルゴリズムが実装されています。ただし、通常、他の WordCount の例が処理するのは "境界のある有限データ" ですが、この WordCount デモアプリケーションは少し動作が異なり、入力データの 境界のない無限のストリーム を処理するように設計されています。境界がある場合と同様に、これは単語数を追跡して更新するステートフルなアルゴリズムです。ただし、入力データに境界がない可能性を想定する必要があるため、データを継続的に処理しながら、現在のステートと結果を定期的に出力します。これは、"すべて" の入力データが処理されたかどうかを知る方法がないからです。境界のないデータストリームを処理するアルゴリズムのクラスと、Hadoop MapReduce などのバッチ処理アルゴリズムとの典型的な違いはここにあります。後で実際の出力データを調べると、この違いがよくわかります。

Kafka の WordCount デモアプリケーションは Confluent Platform にバンドルされているため、すぐに実行できます。Java ソースのコンパイルなどの作業は必要ありません。

# Run the WordCount demo application.
# The application writes its results to a Kafka output topic -- there won't be any STDOUT output in your console.
# You can safely ignore any WARN log messages.
  ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

注釈

特別なデプロイは不要: WordCount デモは通常の Java アプリケーションであり、他の Java アプリケーションと同じように起動すればデプロイできます。kafka-run-class スクリプトは、java -cp ... の単純なラッパーに過ぎません。

WordCount デモアプリケーションは、入力トピック streams-plaintext-input から読み取り、入力データに対して WordCount アルゴリズムの計算を実行して、現在の結果を継続的に出力トピック streams-wordcount-output に書き込みます(入力トピックと出力トピックの名前はハードコードされています)。デモを終了するには、キーボードから control-c を入力します。

出力データの検査

ちなみに

このセクションでは、組み込みの CLI ツールを使用して、Kafka から手動でデータを読み取ります。実際には、Kafka からデータを取得するには他の方法を利用することをお勧めします。たとば、Kafka Connect を通じて Kafka から他のデータシステムにデータを移行する方法や、独自のアプリケーション内から Kafka クライアント を使用する方法があります。

次のように出力トピック streams-wordcount-output を読み取ることで、WordCount デモアプリケーションの出力を検査できます。

./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic streams-wordcount-output \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

次の出力データがコンソールに書き出されます。

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

最初の列は、java.lang.String 形式の Kafka メッセージキーです。2 番目の列は java.lang.Long 形式のメッセージの値です。コンソールコンシューマーを停止するには、Ctrl+C を使用します。

既に説明したように、ストリーミングワードカウントのアルゴリズムは、入力データから最新の単語数を継続的に計算します。そして、この特定のデモアプリケーションでは、出力として最新のワード数を継続的に書き込みます。このドキュメントの後の章では、ストリーム処理アプリケーションの動作について詳しく取り上げ、特に ストリームとテーブルの二重性 について説明します。実際、上記の出力は KTable の changelog ストリームであり、その KTable は WordCount デモアプリケーションによる 集約 操作の結果です。

Kafka クラスターの停止

クイックスタートが完了したら、次の順序で Kafka クラスターをシャットダウンできます。

  1. まず Kafka ブローカー を停止します。そのためには、ブローカーが実行されているターミナルで Ctrl+C を入力します。または、kill を使用してブローカープロセスを終了することもできます。
  2. 最後に、ZooKeeper インスタンス を停止します。そのためには、該当するターミナルで Ctrl+C を入力します。または、kill を使用して ZooKeeper プロセスを終了することもできます。

これで、単一ノードの Kafka クラスターに格納されたデータに対して、初めての Kafka Streams アプリケーションを実行する手順が完了しました。

次のステップ

次のステップとして、以下の作業を行うことをお勧めします。

Kafka Streams の他に、以下についても必要に応じて詳しく見ることをお勧めします。

  • Kafka Connect を使用すると、Kafka と Hadoop などの他のデータシステムとの間でデータを移行できます。
  • Kafka クライアント を使用すると、独自のアプリケーション内から Kafka に対してデータの読み書きを行うことができます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。