Confluent Platform のクイックスタート - コミュニティコンポーネント(ローカルインストール)

このクイックスタートに従うことにより、Confluent Platform と Confluent Community のコンポーネントを開発環境で実行することができます。

このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。

このクイックスタートでは、Confluent Platform CLI、Apache Kafka® CLI、および ksqlDB CLI を活用します。機能豊富な UI ベースのエクスペリエンスの場合は、商用コンポーネントによる Confluent Platform の クイックスタート を試してください。

参考

このクイックスタートの自動化バージョン を実行することもできます。これは、Confluent Platform のローカルインストール向けです。

前提条件:

ステップ 1: Confluent Platform のダウンロードおよび起動

  1. ダウンロードページ に移動します。

  2. Download Confluent Platform セクションまでスクロールし、here リンクをクリックして、無料のコミュニティ機能をダウンロードします。

  3. 以下を入力します。

    • Email: メールアドレス
    • File Type: debrpmtar、または zip
    • Confluent Community ライセンス契約の条件に同意します。
  4. DOWNLOAD をクリックします。

  5. ファイルを解凍します。以下のディレクトリが展開されます。

    フォルダー 説明
    /bin/ サービスを開始および停止するためのドライバスクリプト
    /etc/ 構成ファイル
    /lib/ Systemd サービス
    /logs/ ログファイル
    /share/ Jar およびライセンス
    /src/ プラットフォーム依存のビルドを必要とするソースファイル
  6. 以下のシェル変数を設定します。

    export CONFLUENT_HOME=<path-to-confluent>
    
    export PATH="${CONFLUENT_HOME}/bin:$PATH"
    
  7. Confluent Hub クライアントをインストールします。次のステップで、これを使用して無償のオープンソース kafka-source-datagen コネクターをインストールします。

    • MacOS に Confluent Hub をインストールする。
    • Linux に Confluent Hub をインストールする。
  8. 以下のスクリプトを使用して、Confluent CLI (confluent)をインストールします。

    Microsoft Windows の場合、コマンド curlsh を使用するために、Windows Subsystem for Linux などの適切な Linux 環境をインストールする必要がある場合があります。

    curl -L --http1.1 https://cnfl.io/cli | sh -s -- -b $CONFLUENT_HOME/bin
    

    詳細については、「Confluent CLI」を参照してください。

  9. Confluent Hub クライアントを使用して、Kafka Connect Datagen Source Connector をインストールします。このコネクターはデモ用の模擬データを生成しますが、これは本稼働環境には適していません。Confluent Hub は、パッケージ済みのオンラインライブラリであり、Confluent Platform および Kafka 用にそのままインストールできる拡張機能またはアドオンです。

    confluent-hub install \
     --no-prompt confluentinc/kafka-connect-datagen:latest
    
  10. confluent local services start コマンドを使用して Confluent Platform を起動します。このコマンドにより、すべての Confluent Platform コンポーネント(Kafka、ZooKeeper、Schema Registry、HTTP REST Proxy for Kafka、Kafka Connect、ksqlDB を含む)が起動します。

    重要

    confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。

    confluent local services start
    

    出力は以下のようになります。

    Starting Zookeeper
    Zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting Schema Registry
    Schema Registry is [UP]
    Starting Kafka REST
    Kafka REST is [UP]
    Starting Connect
    Connect is [UP]
    Starting KSQL Server
    KSQL Server is [UP]
    

ステップ 2: Kafka トピックの作成

このステップでは、Kafka CLI を使用して Kafka トピックを作成します。

  1. users という名前のトピックを作成します。

    kafka-topics --create \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic users
    
  2. pageviews という名前のトピックを作成します。

    kafka-topics --create \
      --bootstrap-server localhost:9092   \
      --replication-factor 1 \
      --partitions 1 \
      --topic pageviews
    

ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成

このステップでは、Kafka Connect を使用して、Kafka トピック pageviews および users のサンプルデータを作成する、kafka-connect-datagen という名前のデモソースコネクターを実行します。

  1. Kafka Connect Datagen コネクターの最初のインスタンスを実行して、 pageviews トピックに対して Kafka データを AVRO フォーマットで生成します。

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
    
    curl -X POST -H "Content-Type: application/json" \
      --data @connector_pageviews_cos.config http://localhost:8083/connectors
    
  2. Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、users トピックに対して Kafka データを AVRO フォーマットで生成します。

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
    
    curl -X POST -H "Content-Type: application/json" \
      --data @connector_users_cos.config http://localhost:8083/connectors
    

ちなみに

Kafka Connect Datagen コネクターは「ステップ 1: Confluent Platform のダウンロードおよび起動」で手動でインストールされました。Datagen コネクターの場所を特定する際に問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。

ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み

このステップでは、ksqlDB SQL を使用してストリーム、テーブル、およびクエリを作成します。ksqlDB の SQL 構文の詳細については、「ksqlDB 構文リファレンス」を参照してください。

ストリームおよびテーブルの作成

  1. このコマンドを使用して、ターミナルで ksqlDB CLI を起動します。

    LOG_DIR=$CONFLUENT_HOME/ksql_logs ksql
    

    重要

    デフォルトで、ksqlDB は ksql 実行可能ファイルの場所に対応する logs と呼ばれるディレクトリにそのログを保管しようとします。たとえば、 ksql/usr/local/bin/ksql にインストールされると、/usr/local/logs にそのログを保管しようとします。ksql を Confluent Platform のデフォルトのロケーションである $CONFLUENT_HOME/bin から実行している場合は、LOG_DIR 変数を使用して、このデフォルトの動作をオーバーライドする必要があります。

  2. value_formatAVRO を指定して、Kafka トピック pageviews から PAGEVIEWS ストリームを作成します。

    CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
  3. Kafka トピック users から複数の列を含む USERS テーブルを作成し、value_formatAVRO を指定します。

    CREATE TABLE users (id VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

クエリの記述

このステップでは、ksqlDB SQL クエリを実行します。

  1. auto.offset.reset1 クエリプロパティを earliest に設定します。

    これにより、ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。

    SET 'auto.offset.reset'='earliest';
    
  2. 最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。

    SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
    

    出力は以下のようになります。

    Page_45
    Page_38
    Page_11
    LIMIT reached
    Query terminated
    
  3. 女性(female)ユーザーの PAGEVIEWS ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka の PAGEVIEWS_FEMALE トピックに書き込まれます。

    CREATE STREAM pageviews_female \
      AS SELECT users.id AS userid, pageid, regionid \
      FROM pageviews LEFT JOIN users ON pageviews.userid = users.id \
      WHERE gender = 'FEMALE'
      EMIT CHANGES;
    
  4. REGIONID8 または 9 で終わる永続的なクエリを作成します。このクエリの結果は、クエリで明示的に指定されたように、pageviews_enriched_r8_r9 という名前の Kafka トピックに書き込まれます。

    CREATE STREAM pageviews_female_like_89 \
      WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', value_format='AVRO') \
      AS SELECT * FROM pageviews_female \
        WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
        EMIT CHANGES;
    
  5. カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で、REGIONGENDER の組み合わせごとに PAGEVIEWS をカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなく、テーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS という名前の Kafka トピックに書き込まれます。

    CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON') \
      AS SELECT gender, regionid , COUNT(*) AS numbers \
      FROM pageviews LEFT JOIN users ON pageviews.userid = users.id \
      WINDOW TUMBLING (SIZE 30 SECOND) \
      GROUP BY gender, regionid \
      HAVING COUNT(*) > 1
      EMIT CHANGES;
    

ストリーム、テーブル、およびクエリの確認

  • ストリームを一覧表示します。

    SHOW STREAMS;
    
  • テーブルを一覧表示します。

    SHOW TABLES;
    
  • ストリームまたはテーブルの詳細を表示します。

    DESCRIBE <stream-or-table-name> EXTENDED;
    

    たとえば、users テーブルの詳細を表示するには、以下を実行します。

    DESCRIBE USERS EXTENDED;
    
  • 実行中のクエリを一覧表示します。

    SHOW QUERIES;
    
  • クエリ実行プランを確認します。

    SHOW QUERIES の出力からクエリ ID を取得し、EXPLAIN を実行して、そのクエリ ID のクエリ実行プランを表示します。

    EXPLAIN <Query ID>;
    

ステップ 5: ストリーミングデータのモニタリング

これで、ストリームまたはテーブルとして作成された実行中のクエリをモニタリングできます。

  • 以下のクエリは、女性(female)ユーザーのページビュー情報を返します。

    SELECT * FROM pageviews_female EMIT CHANGES LIMIT 5;
    
  • 以下のクエリは、regionid8 または 9 で終わる地域における女性(female)ユーザーのページビュー情報を返します。

    SELECT * FROM pageviews_female_like_89 EMIT CHANGES LIMIT 5;
    
  • 以下のクエリは、30 秒のタンブリングウィンドウでの地域と性別の組み合わせごとのページビューのカウントを返します。テーブルのアップデートを確認するには、クエリを数秒間実行したままにします。Ctrl キーを押しながら C キーを押して、クエリを停止します。

    SELECT * FROM pageviews_regions EMIT CHANGES;
    

ステップ 6: Confluent Platform の停止

ローカルインストールでの作業が完了したら、Confluent Platform を停止できます。

  1. Confluent CLI confluent local services connect stop コマンドを使用して、Confluent Platform を停止します。

    confluent local services stop
    
  2. confluent local destroy コマンドを使用して、Confluent Platform インスタンスのデータを破棄します。

    confluent local destroy
    

confluent local services start コマンドを使用して、Confluent Platform のローカルインストールを再度起動することができます。

次のステップ

このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。

  • ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
  • Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
  • Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
  • Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
  • Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
  • ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。

トラブルシューティング

クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。

問題: Datagen コネクターの場所を特定できない

解決策: DataGen コネクターがインストールされており、実行中であることを確認します。

ステップ 1: Confluent Platform のダウンロードおよび起動」の説明に従って、kafka-connect-datagen がインストールされており、実行中であることを確認します。

confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

出力は以下のようになります。

Running in a "--no-prompt" mode
...
Completed

解決策: Confluent CLI confluent local services connect log コマンドを使用して、Datagen の connect ログを確認します。

confluent local services connect log | grep -i Datagen

出力は以下のようになります。

[2019-04-18 14:21:08,840] INFO Loading plugin from: /Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:215)
[2019-04-18 14:21:08,894] INFO Registered loader: PluginClassLoader{pluginLocation=file:/Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:238)
[2019-04-18 14:21:08,894] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:167)
[2019-04-18 14:21:09,882] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:386)

解決策 : kafka-connect-datagen.jar ファイルが追加されており、lib サブフォルダー内に存在するかを確認します。

ls $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

出力は以下のようになります。

...
kafka-connect-datagen-0.1.0.jar
...

解決策 : プラグインがコネクターのパスに存在するかを検証します。

Confluent Hub から kafka-connect-datagen ファイルをインストールした場合、インストールディレクトリは複数のプロパティファイルのプラグインのパスに追加されます。

 Adding installation directory to plugin path in the following files:
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-standalone.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-standalone.properties
...

これらのいずれかを使用して、コネクターのパスを確認できます。この例では、connect-avro-distributed.properties ファイルを使用します。

grep plugin.path $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties

出力は以下のようになります。

plugin.path=share/java,/Users/user.name/Confluent/confluent-version/share/confluent-hub-components

その内容が存在することを確認します。

ls $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-datagen

出力は以下のようになります。

assets   doc  lib  manifest.json

問題: ストリームとストリームの結合に関するエラー

エラーにより、ストリームとストリームの結合には WITHIN 句の指定が必要であることが通知されます。このエラーは pageviewsusers の両方をストリームとして誤って作成した場合に発生する可能性があります。

解決策:ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews の "ストリーム" を作成し、users の "テーブル" を作成したことを確認します。

問題: ksqlDB クエリのステップを正常に完了できない

Java エラーまたはその他の重大なエラーが発生しました。

解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。

ksqlDB のエラーが発生しました。

解決策: ksqlDB CLI のヘルプで、コマンドを成功させるためのヒントや、詳細なドキュメントへのリンクを確認します。

ksql> help