Confluent Platform Community コンポーネントを使用した Apache Kafka のクイックスタート(Docker)¶
このクイックスタートに従うことにより、 Confluent Platform と Confluent Community のコンポーネントをDocker コンテナーを活用した開発環境で実行することができます。
このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。
このクイックスタートでは、Confluent Platform CLI、Apache Kafka® CLI、および ksqlDB CLI を活用します。機能豊富な UI ベースのエクスペリエンスの場合は、商用コンポーネントによる Confluent Platform の クイックスタート を試してください。
- 前提条件:
- Docker
- Docker バージョン 1.11 またはそれ以降が インストールされ動作している。
- Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
- Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。Preferences、Resources、Advanced の順に移動します。
- インターネット接続
- Confluent Platform で現在サポートされる オペレーティングシステム
- Docker でのネットワークと Kafka
- 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
- (オプション)`curl <https://curl.se/>`__.
- 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる明示的な curl コマンドを説明します。
- Docker
ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動¶
次のように、Confluent Community all-in-one Docker Compose ファイル の内容をダウンロードするか、コピーします。
curl --silent --output docker-compose.yml \ https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.5-post/cp-all-in-one-community/docker-compose.yml
-d
オプションを指定して Confluent Platform を起動し、デタッチモードで実行します。docker-compose up -d
上記のコマンドで、すべての Confluent Platform コンポーネントの個別のコンテナーとともに Confluent Platform が起動します。出力は次のようになります。
Creating network "cp-all-in-one-community_default" with the default driver Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating rest-proxy ... done Creating connect ... done Creating ksql-datagen ... done Creating ksqldb-server ... done Creating ksqldb-cli ... done
サービスが稼働中であるかを検証します。
docker-compose ps
次のように表示されます。
Name Command State Ports ------------------------------------------------------------------------------------------ broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp ksqldb-cli ksql http://localhost:8088 Up ksql-datagen bash -c echo Waiting for K ... Up ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
ステートが
Up
ではない場合は、docker-compose up -d
コマンドを再度実行します。
ステップ 2: Kafka トピックの作成¶
このステップでは、Kafka CLI を使用して Kafka トピックを作成します。
users
という名前のトピックを作成します。docker-compose exec broker kafka-topics \ --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic users
pageviews
という名前のトピックを作成します。docker-compose exec broker kafka-topics \ --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic pageviews
ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成¶
このステップでは、Kafka Connect を使用して、Kafka トピック pageviews
および users
のサンプルデータを作成する、kafka-connect-datagen
という名前のデモソースコネクターを実行します。
Kafka Connect Datagen コネクターの最初のインスタンスを実行して、
pageviews
トピックに対して Kafka データを AVRO フォーマットで生成します。curl -L -O -H 'Accept: application/vnd.github.v3.raw' \ https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
curl -X POST -H 'Content-Type: application/json' \ --data @connector_pageviews_cos.config \ http://localhost:8083/connectors
Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、
users
トピックに対して Kafka データを AVRO フォーマットで生成します。curl -L -O -H 'Accept: application/vnd.github.v3.raw' \ https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
curl -X POST -H 'Content-Type: application/json' \ --data @connector_users_cos.config \ http://localhost:8083/connectors
ちなみに
「ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」での Docker Compose の起動時に、Kafka Connect Datagen コネクターは自動的にインストールされました。Datagen コネクターに関する問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。
ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み¶
このステップでは、ksqlDB SQL を使用してストリーム、テーブル、およびクエリを作成します。ksqlDB の SQL 構文の詳細については、「ksqlDB 構文リファレンス」を参照してください。
ストリームおよびテーブルの作成¶
このコマンドを使用して、ターミナルで ksqlDB CLI を起動します。
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
重要
デフォルトで、ksqlDB は
ksql
実行可能ファイルの場所に対応するlogs
と呼ばれるディレクトリにそのログを保管しようとします。たとえば、ksql
が/usr/local/bin/ksql
にインストールされると、/usr/local/logs
にそのログを保管しようとします。ksql
を Confluent Platform のデフォルトのロケーションである$CONFLUENT_HOME/bin
から実行している場合は、LOG_DIR
変数を使用して、このデフォルトの動作をオーバーライドする必要があります。value_format
にAVRO
を指定して、Kafka トピックpageviews
からPAGEVIEWS
ストリームを作成します。CREATE STREAM PAGEVIEWS (VIEWTIME bigint, USERID varchar, PAGEID varchar) WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
Kafka トピック
users
から複数の列を含むUSERS
テーブルを作成し、value_format
にAVRO
を指定します。CREATE TABLE USERS (USERID VARCHAR PRIMARY KEY, REGISTERTIME BIGINT, GENDER VARCHAR, REGIONID VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
クエリの記述¶
このステップでは、ksqlDB SQL クエリを実行します。
auto.offset.reset1
クエリプロパティをearliest
に設定します。これにより、ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。
SET 'auto.offset.reset'='earliest';
最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。
SELECT PAGEID FROM PAGEVIEWS EMIT CHANGES LIMIT 3;
出力は以下のようになります。
Page_45 Page_38 Page_11 LIMIT reached Query terminated
女性(female)ユーザーの
PAGEVIEWS
ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka のPAGEVIEWS_FEMALE
トピックに書き込まれます。CREATE STREAM PAGEVIEWS_FEMALE \ AS SELECT USERS.USERID AS USERID, PAGEID, REGIONID \ FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \ WHERE GENDER = 'FEMALE' EMIT CHANGES;
REGIONID
が8
または9
で終わる永続的なクエリを作成します。このクエリの結果は、クエリで明示的に指定されたように、pageviews_enriched_r8_r9
という名前の Kafka トピックに書き込まれます。CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 \ WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') \ AS SELECT * FROM PAGEVIEWS_FEMALE \ WHERE REGIONID LIKE '%_8' OR REGIONID LIKE '%_9' EMIT CHANGES;
カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で、
REGION
とGENDER
の組み合わせごとにPAGEVIEWS
をカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなく、テーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS
という名前の Kafka トピックに書き込まれます。CREATE TABLE PAGEVIEWS_REGIONS \ AS SELECT GENDER, REGIONID , COUNT(*) AS NUMBERS \ FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \ WINDOW TUMBLING (size 30 second) \ GROUP BY GENDER, REGIONID \ HAVING COUNT(*) > 1 EMIT CHANGES;
ストリーム、テーブル、およびクエリの確認¶
ストリームを一覧表示します。
SHOW STREAMS;
テーブルを一覧表示します。
SHOW TABLES;
ストリームまたはテーブルの詳細を表示します。
DESCRIBE EXTENDED <stream-or-table-name>;
たとえば、
users
テーブルの詳細を表示するには、以下を実行します。DESCRIBE EXTENDED USERS;
実行中のクエリを一覧表示します。
SHOW QUERIES;
クエリ実行プランを確認します。
SHOW QUERIES
の出力からクエリ ID を取得し、EXPLAIN
を実行して、そのクエリ ID のクエリ実行プランを表示します。EXPLAIN <Query ID>;
ステップ 5: ストリーミングデータのモニタリング¶
これで、ストリームまたはテーブルとして作成された実行中のクエリをモニタリングできます。
以下のクエリは、女性(female)ユーザーのページビュー情報を返します。
SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES;
以下のクエリは、
regionid
が8
または9
で終わる地域における女性(female)ユーザーのページビュー情報を返します。SELECT * FROM PAGEVIEWS_FEMALE_LIKE_89 EMIT CHANGES;
以下のクエリは、30 秒のタンブリングウィンドウでの地域と性別の組み合わせごとのページビューのカウントを返します。
SELECT * FROM PAGEVIEWS_REGIONS EMIT CHANGES;
ステップ 6: Confluent コンテナーの停止およびクリーンアップ¶
Docker での作業が完了したら、Docker コンテナーおよびイメージを停止し、削除できます。
以下のコマンドを実行して、Confluent の Docker コンテナーを停止します。
docker-compose stop
Docker コンテナーを停止した後に、以下のコマンドを実行して Docker システムを削除します。これらのコマンドを実行すると、コンテナー、ネットワーク、ボリューム、イメージが削除され、ディスク領域が解放されます。
docker system prune -a --volumes --filter "label=io.confluent.docker"
詳細については、Docker の公式ドキュメントを参照してください。
次のステップ¶
このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。
- ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
- Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
- Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
- Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
- Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
- ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。
トラブルシューティング¶
クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。
問題: Datagen コネクターの場所を特定できない¶
解決策 : connect に対してのみ build
コマンドを実行して、connect コンテナーが正常に構築されたかどうか確認します。
docker-compose build --no-cache connect
出力は以下のようになります。
Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest
connect コンテナーが既に正常に構築されていた場合は、以下のような出力になります。
connect uses an image, skipping
解決策 : Connect ログで Datagen
を確認します。
docker-compose logs connect | grep -i Datagen
出力は以下のようになります。
connect | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
解決策 : Connect ログで docker-compose up -d
コマンドを適切に実行するための警告およびリマインダーを確認します。
docker-compose logs connect | grep -i Datagen
解決策 : kafka-connect-datagen
の .jar
ファイルが追加されており、lib
サブフォルダー内に存在するかを確認します。
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/
出力は以下のようになります。
...
kafka-connect-datagen-0.1.0.jar
...
解決策 : プラグインがコネクターのパスに存在するかを検証します。
docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'
出力は以下のようになります。
/usr/share/java,/usr/share/confluent-hub-components
その内容が存在することを確認します。
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen
出力は以下のようになります。
assets doc etc lib manifest.json
問題: ストリームとストリームの結合に関するエラー¶
エラーにより、ストリームとストリームの結合には WITHIN
句の指定が必要であることが通知されます。このエラーは pageviews
と users
の両方をストリームとして誤って作成した場合に発生する可能性があります。
解決策: 「ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews
の "ストリーム" を作成し、users
の "テーブル" を作成したことを確認します。
問題: ksqlDB クエリのステップを正常に完了できない¶
Java エラーまたはその他の重大なエラーが発生しました。
解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。
解決策 : Docker メモリーが 8 MB まで増設されたかを確認します。Docker > Preferences > Advanced に移動してください。Docker メモリーが不十分な場合は、その他の予期しない問題が発生する可能性があります。
ksqlDB のエラーが発生しました。
解決策: ksqlDB CLI のヘルプで、コマンドを成功させるためのヒントや、詳細なドキュメントへのリンクを確認します。
ksql> help