Confluent Platform のクイックスタート(Docker)

このクイックスタートに従うことにより、Confluent Platform とその主要コンポーネントを Docker コンテナーを使用して実行することができます。このクイックスタートでは、Confluent Platform に含まれている Confluent Control Center を使用したトピック管理と ksqlDB を利用したイベントストリーム処理を行います。

このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。Control Center に移動して、イベントストリーミングクエリをモニタリングし、分析します。

参考

このクイックスタートの自動化バージョン を実行することもできます。これは、Confluent Platform のローカルインストール向けです。

前提条件:
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
    • Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。PreferencesResourcesAdvanced の順に移動します。
  • インターネット接続
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。

ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動

  1. 次のように、Confluent Platform all-in-one Docker Compose ファイル の内容をダウンロードするか、コピーします。

    curl --silent --output docker-compose.yml \
      https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.2.4-post/cp-all-in-one/docker-compose.yml
    
  2. -d オプションを使用して Confluent Platform を起動し、デタッチモードで実行します。

    docker-compose up -d
    

    上記のコマンドで、各 Confluent Platform コンポーネントの別のコンテナーとともに Confluent Platform が起動されます。出力は次のようになります。

    Creating network "cp-all-in-one_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-server   ... done
    Creating control-center  ... done
    Creating ksqldb-cli      ... done
    
  3. サービスが稼働状態であることを検証するには、以下のコマンドを実行します。

    docker-compose ps
    

    出力は次のようになります。

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
    ksqldb-cli        /bin/sh                          Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    ステートが Up ではない場合は、docker-compose up -d コマンドを再度実行します。

ステップ 2: Kafka トピックの作成

このステップでは、Confluent Control Center を活用して Kafka トピックを作成します。Confluent Control Center は、本稼働環境データパイプラインおよびイベントストリーミングアプリケーションを構築およびモニタリングするための機能を備えています。

  1. http://localhost:9021 にある Control Center ウェブインターフェイスに移動します。

    異なるホストに Confluent Platform をインストールした場合は、localhost をアドレスのホスト名と置き換えます。

    Control Center がオンラインになるまでに 1 ~ 2 分かかる場合があります。

    注釈

    Control Center が localhost ブラウザーセッションで開いておらず、実行中ではない場合、Control Center は ksqlDB に接続されません。

  2. controlcenter.cluster タイルをクリックします。

    ../_images/c3-landing-page.png
  3. ナビゲーションバーで Topics をクリックしてトピックリストを開いた後、Add a topic をクリックします。

    ../_images/c3-create-topic.png
  4. Topic name フィールドで pageviews を指定し、Create with defaults をクリックします。

    トピック名では、大文字と小文字が区別されます。

    ../_images/c3-create-topic-name.png
  5. ナビゲーションバーで Topics をクリックしてトピックリストを開いた後、Add a topic をクリックします。

  6. Topic name フィールドで users を指定し、Create with defaults をクリックします。

ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成

このステップでは、Kafka Connect を使用して、Kafka トピック pageviews および users のサンプルデータを作成する、kafka-connect-datagen という名前のデモソースコネクターを実行します。

ちなみに

ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」での Docker Compose の起動時に、Kafka Connect Datagen コネクターは自動的にインストールされました。Datagen コネクターの場所を特定する際に問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。

  1. Kafka Connect Datagen コネクターの最初のインスタンスを実行して、 pageviews トピックに対して Kafka データを AVRO フォーマットで生成します。

    1. ナビゲーションバーで Connect をクリックします。

    2. Connect Clusters リストで connect-default クラスターをクリックします。

    3. Add connector をクリックします。

    4. DatagenConnector タイルを選択します。

      ちなみに

      表示されるコネクターを絞り込むには、Filter by category をクリックし、Sources をクリックします。

    5. 名前 フィールドで、コネクターの名前として datagen-pageviews を入力します。

    6. 次の構成値を入力します。

      • Key converter class: org.apache.kafka.connect.storage.StringConverter
      • kafka.topic: pageviews
      • max.interval: 100
      • quickstart: pageviews
    7. Next をクリックします。

    8. コネクター構成を確認し、Launch をクリックします。

      ../_images/connect-review-pageviews.png
  2. Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、users トピックに対して Kafka データを AVRO フォーマットで生成します。

    1. Add connector をクリックします。

    2. DatagenConnector タイルを選択します。

      ちなみに

      表示されるコネクターを絞り込むには、Filter by category をクリックし、Sources をクリックします。

    3. 名前 フィールドで、コネクターの名前として datagen-users を入力します。

    4. 次の構成値を入力します。

      • Key converter class: org.apache.kafka.connect.storage.StringConverter
      • kafka.topic: users
      • max.interval: 1000
      • quickstart: users
    5. Next をクリックします。

    6. コネクター構成を確認し、Launch をクリックします。

ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み

ちなみに

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088 コマンドにより、Docker コンテナーから ksqlDB CLI を使用して、これらのコマンドを実行することもできます。

ストリームおよびテーブルの作成

このステップでは、ksqlDB を使用して、pageviews トピックのストリームおよび users トピックのテーブルを作成します。

  1. ナビゲーションバーで ksqlDB をクリックします。

  2. ksqlDB アプリケーションを選択します。

  3. 以下のコードをエディターのウィンドウにコピーします。Run query をクリックして、pageviews ストリームを作成します。ストリーム名では、大文字と小文字が区別されません。

    CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
  4. 以下のコードをエディターのウィンドウにコピーします。Run query をクリックして、users テーブルを作成します。テーブル名では、大文字と小文字が区別されません。

    CREATE TABLE users (id VARCHAR PRIMARY KEY)
      WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

クエリの記述

このステップでは、上記で作成したストリームおよびテーブルに対する ksqlDB クエリを作成します。

  1. Editor タブで Add query properties をクリックして、カスタムクエリプロパティを追加します。

  2. auto.offset.reset パラメーターを Earliest に設定します。

    この設定により ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。

  3. 以下のクエリを作成します。

    1. Stop をクリックして、現時点で実行中のクエリを停止します。

    2. 最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。

      エディターで以下のクエリを入力します。

      SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
      
    3. Run query をクリックします。出力は以下のようになります。

      ../_images/c3-ksql-query-results-pageid.png

      Card view アイコンまたは Table view アイコンをクリックして、出力レイアウトを変更します。

    4. 女性(female)ユーザーの PAGEVIEWS ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka の PAGEVIEWS_FEMALE トピックに書き込まれます。

      エディターで以下のクエリを入力します。

      CREATE STREAM pageviews_female
        AS SELECT users.id AS userid, pageid, regionid
        FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
        WHERE gender = 'FEMALE'
        EMIT CHANGES;
      
    5. Run query をクリックします。出力は以下のようになります。

      ../_images/c3-ksql-persist-query-pv-female-results.png
    6. REGIONID8 または 9 で終わる永続的なクエリを作成します。このクエリの結果は、クエリで明示的に指定されたように、pageviews_enriched_r8_r9 という名前の Kafka トピックに書き込まれます。

      エディターで以下のクエリを入力します。

      CREATE STREAM pageviews_female_like_89
        WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', VALUE_FORMAT='AVRO')
        AS SELECT * FROM pageviews_female
        WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
        EMIT CHANGES;
      
    7. Run query をクリックします。出力は以下のようになります。

      ../_images/c3-ksql-persist-query-pv-female89-results.png
    8. カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で、REGIONGENDER の組み合わせごとに PAGEVIEWS をカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなく、テーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS という名前の Kafka トピックに書き込まれます。

      エディターで以下のクエリを入力します。

      CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')
        AS SELECT gender, regionid, COUNT(*) AS numusers
        FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
        WINDOW TUMBLING (SIZE 30 SECOND)
        GROUP BY gender, regionid
        HAVING COUNT(*) > 1
        EMIT CHANGES;
      
    9. Run query をクリックします。出力は以下のようになります。

      ../_images/c3-ksql-persist-query-table-results.png
    10. Persistent queries タブをクリックします。以下の永続的なクエリが表示されます。

      • PAGEVIEWS_FEMALE
      • PAGEVIEWS_FEMALE_LIKE_89
      • PAGEVIEWS_REGIONS
    11. Editor タブをクリックします。All available streams and tables ペインには、アクセスできるすべてのストリームおよびテーブルが表示されます。

      ../_images/c3-ksql-stream-table-view-1.png
    12. All available streams and tables セクションで KSQL_PROCESSING_LOG をクリックすると、ストリームのスキーマ(ネスト化されたデータ構造など)が表示されます。

クエリの実行

このステップでは、前のセクションでストリームおよびテーブルとして保存した ksqlDB クエリを実行します。

  1. Streams タブで PAGEVIEWS_FEMALE ストリームを選択します。

  2. Query stream をクリックします。

    エディターが開き、クエリのストリーミング出力が表示されます。

  3. Stop をクリックして、出力の生成を停止します。

  4. Tables タブで PAGEVIEWS_REGIONS テーブルを選択します。

  5. Query table をクリックします。

    エディターが開き、クエリのストリーミング出力が表示されます。

  6. Stop をクリックして、出力の生成を停止します。

ステップ 5: コンシューマーラグのモニタリング

  1. ナビゲーションバーで Consumers をクリックして、ksqlDB により作成されたコンシューマーを表示します。

  2. コンシューマーグループ ID をクリックして、_confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_5 コンシューマーグループの詳細を表示します。

    このページで、ストリーミングクエリのコンシューマーラグおよび消費量の値を確認できます。

詳細については、Control Center の「Consumers」ドキュメントを参照してください。

ステップ 6: Confluent コンテナーの停止およびクリーンアップ

Docker での作業が完了したら、Docker コンテナーおよびイメージを停止し、削除できます。

  1. 以下のコマンドを実行して、Confluent の Docker コンテナーを停止します。

    docker-compose stop
    
  2. Docker コンテナーを停止した後に、以下のコマンドを実行して Docker システムを削除します。これらのコマンドを実行すると、コンテナー、ネットワーク、ボリューム、イメージが削除され、ディスク領域が解放されます。

    docker system prune -a --volumes --filter "label=io.confluent.docker"
    

詳細については、Docker の公式ドキュメントを参照してください。

次のステップ

このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。

  • ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
  • Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
  • Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
  • Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
  • Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
  • ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。

トラブルシューティング

クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。

問題: Datagen コネクターの場所を特定できない

詳細については、「ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」を参照してください。

解決策 : connect に対してのみ build コマンドを実行して、connect コンテナーが正常に構築されたかどうか確認します。

docker-compose build --no-cache connect

出力は以下のようになります。

Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest

connect コンテナーが既に正常に構築されていた場合は、以下のような出力になります。

connect uses an image, skipping

解決策 : Connect ログで Datagen を確認します。

docker-compose logs connect | grep -i Datagen

出力は以下のようになります。

connect  | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

解決策 : Connect ログで docker-compose up -d コマンドを適切に実行するための警告およびリマインダーを確認します。

docker-compose logs connect | grep -i Datagen

解決策 : kafka-connect-datagen.jar ファイルが追加されており、lib サブフォルダー内に存在するかを確認します。

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

出力は以下のようになります。

...
kafka-connect-datagen-0.1.0.jar
...

解決策 : プラグインがコネクターのパスに存在するかを検証します。

docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'

出力は以下のようになります。

/usr/share/java,/usr/share/confluent-hub-components

その内容が存在することを確認します。

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen

出力は以下のようになります。

assets   doc  etc  lib  manifest.json

問題: ストリームとストリームの結合に関するエラー

エラーにより、ストリームとストリームの結合には WITHIN 句の指定が必要であることが通知されます。このエラーは pageviewsusers の両方をストリームとして誤って作成した場合に発生する可能性があります。

../_images/c3-ksql-stream-stream-join-error.png

解決策 :ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews の "ストリーム" を作成し、users の "テーブル" を作成したことを確認します。

問題: ksqlDB クエリのステップを正常に完了できない

Java エラーまたはその他の重大なエラーが発生しました。

解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。

解決策 : Docker メモリーが 8 MB まで増設されたかを確認します。Docker > Preferences > Advanced に移動してください。Docker メモリーが不十分な場合は、その他の予期しない問題が発生する可能性があります。

問題: デモのタイムアウト、一部またはすべてのコンポーネントが起動しない

最小 6 GB の Docker メモリーリソースを割り当てる必要があります。Mac 版 Docker Desktop のデフォルトのメモリー割り当ては 2 GB であるため、変更する必要があります。Docker メモリー割り当てが最小要件を満たしていない場合は、Docker 上で実行される Confluent Platform のデモおよびサンプルが正しく機能しないことがあります。

../_images/quickstart-docker-memory-rqmts.ja.png

Docker のリソースに関する設定でのメモリー設定