Confluent Platform Community コンポーネントを使用した Apache Kafka のクイックスタート(Docker)

このクイックスタートに従うことにより、 Confluent Platform と Confluent Community のコンポーネントをDocker コンテナーを活用した開発環境で実行することができます。

このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。

このクイックスタートでは、Confluent Platform CLI、Apache Kafka® CLI、および ksqlDB CLI を活用します。機能豊富な UI ベースのエクスペリエンスの場合は、商用コンポーネントによる Confluent Platform の クイックスタート を試してください。

前提条件:
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
    • Docker メモリーに最小でも 8 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker でデフォルトの割り当てを 8 GB に変更できます。PreferencesResourcesAdvanced の順に移動します。
  • Git
  • インターネットに接続されている
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。

ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動

  1. confluentinc/cp-all-in-one GitHub リポジトリのクローンを作成します。

  2. 6.0.6-post ブランチをチェックアウトします。

    cd cp-all-in-one
    
    git checkout 6.0.6-post
    
  3. cp-all-in-one-community ディレクトリに移動します。

    cd cp-all-in-one-community
    
  4. -d オプションを指定して Confluent Platform を起動し、デタッチモードで実行します。

    docker-compose up -d
    

    上記のコマンドで、すべての Confluent Platform コンポーネントの個別のコンテナーとともに Confluent Platform が起動します。出力は次のようになります。

    Creating network "cp-all-in-one-community_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-server   ... done
    Creating ksqldb-cli      ... done
    
  5. サービスが稼働中であるかを検証します。

    docker-compose ps
    

    次のように表示されます。

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    ksqldb-cli        ksql http://localhost:8088       Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    ステートが Up ではない場合は、docker-compose up -d コマンドを再度実行します。

ステップ 2: Kafka トピックの作成

このステップでは、Kafka CLI を使用して Kafka トピックを作成します。

  1. users という名前のトピックを作成します。

    docker-compose exec broker kafka-topics \
      --create \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic users
    
  2. pageviews という名前のトピックを作成します。

    docker-compose exec broker kafka-topics \
      --create \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pageviews
    

ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成

このステップでは、Kafka Connect を使用して、Kafka トピック pageviews および users のサンプルデータを作成する、kafka-connect-datagen という名前のデモソースコネクターを実行します。

  1. Kafka Connect Datagen コネクターの最初のインスタンスを実行して、 pageviews トピックに対して Kafka データを AVRO フォーマットで生成します。

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
    
    curl -X POST -H 'Content-Type: application/json' \
      --data @connector_pageviews_cos.config \
      http://localhost:8083/connectors
    
  2. Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、users トピックに対して Kafka データを AVRO フォーマットで生成します。

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
    
    curl -X POST -H 'Content-Type: application/json' \
      --data @connector_users_cos.config \
      http://localhost:8083/connectors
    

ちなみに

ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」での Docker Compose の起動時に、Kafka Connect Datagen コネクターは自動的にインストールされました。Datagen コネクターに関する問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。

ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み

このステップでは、ksqlDB SQL を使用してストリーム、テーブル、およびクエリを作成します。ksqlDB の SQL 構文の詳細については、「ksqlDB 構文リファレンス」を参照してください。

ストリームおよびテーブルの作成

  1. このコマンドを使用して、ターミナルで ksqlDB CLI を起動します。

    docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
    

    重要

    デフォルトで、ksqlDB は ksql 実行可能ファイルの場所に対応する logs と呼ばれるディレクトリにそのログを保管しようとします。たとえば、 ksql/usr/local/bin/ksql にインストールされると、/usr/local/logs にそのログを保管しようとします。ksql を Confluent Platform のデフォルトのロケーションである $CONFLUENT_HOME/bin から実行している場合は、LOG_DIR 変数を使用して、このデフォルトの動作をオーバーライドする必要があります。

  2. Kafka トピック pageviews から pageviews ストリームを作成し、value_format には AVRO を指定します。

    CREATE STREAM pageviews \
      (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
      WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
  3. Kafka トピック users から複数の列を含む users テーブルを作成し、value_format には AVRO を指定します。

    CREATE TABLE users \
      (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY) \
      WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

クエリの記述

このステップでは、ksqlDB SQL クエリを実行します。

  1. auto.offset.reset1 クエリプロパティを earliest に設定します。これにより、ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。

    SET 'auto.offset.reset'='earliest';
    
  2. 最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。

    SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
    

    出力は以下のようになります。

    Page_45
    Page_38
    Page_11
    LIMIT reached for the partition.
    Query terminated
    
  3. 女性(female)ユーザーの pageviews ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka の PAGEVIEWS_FEMALE トピックに書き込まれます。

    CREATE STREAM pageviews_female \
      AS SELECT users.userid AS userid, pageid, regionid, gender \
      FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
      WHERE gender = 'FEMALE';
    
  4. regionid8 または 9 で終わる永続的なクエリを作成します。クエリの結果は pageviews_enriched_r8_r9 という名前の Kafka トピックに書き込まれます。

    CREATE STREAM pageviews_female_like_89 \
      WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') \
      AS SELECT * FROM pageviews_female \
        WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    
  5. カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で地域と性別の組み合わせごとにページビューをカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなくテーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS という名前の Kafka トピックに書き込まれます。

    CREATE TABLE pageviews_regions \
      AS SELECT gender, regionid , COUNT(*) AS numusers \
      FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
      WINDOW TUMBLING (size 30 second) \
      GROUP BY gender, regionid \
      HAVING COUNT(*) > 1;
    

ストリーム、テーブル、およびクエリの確認

  • ストリームを一覧表示します。

    SHOW STREAMS;
    
  • テーブルを一覧表示します。

    SHOW TABLES;
    
  • ストリームまたはテーブルの詳細を表示します。

    DESCRIBE EXTENDED <stream-or-table-name>;
    

    たとえば、users テーブルの詳細を表示するには、以下を実行します。

    DESCRIBE EXTENDED users;
    
  • 実行中のクエリを一覧表示します。

    SHOW QUERIES;
    
  • クエリ実行プランを確認します。

    SHOW QUERIES の出力からクエリ ID を取得し、EXPLAIN を実行して、そのクエリ ID のクエリ実行プランを表示します。

    EXPLAIN <Query ID>;
    

ステップ 5: ストリーミングデータのモニタリング

これで、ストリームまたはテーブルとして作成された実行中のクエリをモニタリングできます。

  • 以下のクエリは、女性(female)ユーザーのページビュー情報を返します。

    SELECT * FROM pageviews_female EMIT CHANGES;
    
  • 以下のクエリは、regionid8 または 9 で終わる地域における女性(female)ユーザーのページビュー情報を返します。

    SELECT * FROM pageviews_female_like_89 EMIT CHANGES;
    
  • 以下のクエリは、30 秒のタンブリングウィンドウでの地域と性別の組み合わせごとのページビューのカウントを返します。

    SELECT * FROM pageviews_regions EMIT CHANGES;
    

ステップ 6: Docker の停止

Docker での作業が完了したら、Docker コンテナーおよびイメージを停止し、削除できます。

  1. すべての Docker コンテナー ID のリストを表示します。

    docker container ls -a -q
    
  2. 以下のコマンドを実行して、Confluent の Docker コンテナーを停止します。

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
    
  3. Docker コンテナーを停止した後に、以下のコマンドを実行して Docker システムを削除します。これらのコマンドを実行すると、コンテナー、ネットワーク、ボリューム、イメージが削除され、ディスク領域が解放されます。

    docker system prune -a -f --volumes
    

    ちなみに

    すべての Confluent Platform Docker コンテナーをシステムから消去するには、Confluent Docker のフィルターラベル(-f "label=io.confluent.docker")を削除します。

詳細については、Docker の公式ドキュメントを参照してください。

トラブルシューティング

クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。

問題: Datagen コネクターの場所を特定できない

解決策 : connect に対してのみ build コマンドを実行して、connect コンテナーが正常に構築されたかどうか確認します。

docker-compose build --no-cache connect

出力は以下のようになります。

Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest

connect コンテナーが既に正常に構築されていた場合は、以下のような出力になります。

connect uses an image, skipping

解決策 : Connect ログで Datagen を確認します。

docker-compose logs connect | grep -i Datagen

出力は以下のようになります。

connect  | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

解決策 : Connect ログで docker-compose up -d コマンドを適切に実行するための警告およびリマインダーを確認します。

docker-compose logs connect | grep -i Datagen

解決策 : kafka-connect-datagen.jar ファイルが追加されており、lib サブフォルダー内に存在するかを確認します。

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

出力は以下のようになります。

...
kafka-connect-datagen-0.1.0.jar
...

解決策 : プラグインがコネクターのパスに存在するかを検証します。

docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'

出力は以下のようになります。

/usr/share/java,/usr/share/confluent-hub-components

その内容が存在することを確認します。

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen

出力は以下のようになります。

assets   doc  etc  lib  manifest.json

問題: ストリームとストリームの結合に関するエラー

エラーにより、ストリームとストリームの結合には WITHIN 句の指定が必要であることが通知されます。このエラーは pageviewsusers の両方をストリームとして誤って作成した場合に発生する可能性があります。

解決策:ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews の "ストリーム" を作成し、users の "テーブル" を作成したことを確認します。

問題: ksqlDB クエリのステップを正常に完了できない

Java エラーまたはその他の重大なエラーが発生しました。

解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。

解決策 : Docker メモリーが 8 MB まで増設されたかを確認します。Docker > Preferences > Advanced に移動してください。Docker メモリーが不十分な場合は、その他の予期しない問題が発生する可能性があります。

ksqlDB のエラーが発生しました。

解決策: ksqlDB CLI のヘルプで、コマンドを成功させるためのヒントや、詳細なドキュメントへのリンクを確認します。

ksql> help

問題: デモのタイムアウト、一部またはすべてのコンポーネントが起動しない

最小 8 GB の Docker メモリーリソースを割り当てる必要があります。Mac 版 Docker Desktop のデフォルトのメモリー割り当ては 2 GB であるため、変更する必要があります。Docker メモリー割り当てが最小要件を満たしていない場合は、Docker 上で実行される Confluent Platform のデモおよびサンプルが正しく機能しないことがあります。

../_images/quickstart-docker-memory-rqmts.ja.png

Docker のリソースに関する設定でのメモリー設定

次のステップ

このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。

  • ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
  • Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
  • Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
  • Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
  • Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
  • ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。