Confluent Platform を使用した Apache Kafka のクイックスタート(Docker)¶
このクイックスタートに従うことにより、 Confluent Platform とその主要コンポーネントをDocker コンテナーを使用して実行することができます。このクイックスタートでは、Confluent Platform に含まれている Confluent Control Center を使用したトピック管理と ksqlDB を利用したイベントストリーム処理を行います。
このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。Control Center に移動して、イベントストリーミングクエリをモニタリングし、分析します。
参考
このクイックスタートの自動化バージョン を実行することもできます。これは、Confluent Platform のローカルインストール向けです。
- 前提条件:
- Docker
- Docker バージョン 1.11 またはそれ以降が インストールされ動作している。
- Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
- Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。Preferences、Resources、Advanced の順に移動します。
- インターネット接続
- Confluent Platform で現在サポートされる オペレーティングシステム
- Docker でのネットワークと Kafka
- 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
- Docker
ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動¶
次のように、Confluent Platform all-in-one Docker Compose ファイル の内容をダウンロードするか、コピーします。
curl --silent --output docker-compose.yml \ https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.5-post/cp-all-in-one/docker-compose.yml
-d
オプションを使用して Confluent Platform を起動し、デタッチモードで実行します。docker-compose up -d
上記のコマンドで、各 Confluent Platform コンポーネントの別のコンテナーとともに Confluent Platform が起動されます。出力は次のようになります。
Creating network "cp-all-in-one_default" with the default driver Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating rest-proxy ... done Creating connect ... done Creating ksql-datagen ... done Creating ksqldb-server ... done Creating control-center ... done Creating ksqldb-cli ... done
サービスが稼働状態であることを検証するには、以下のコマンドを実行します。
docker-compose ps
出力は次のようになります。
Name Command State Ports ------------------------------------------------------------------------------------------ broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp ksqldb-cli /bin/sh Up ksql-datagen bash -c echo Waiting for K ... Up ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
ステートが
Up
ではない場合は、docker-compose up -d
コマンドを再度実行します。
ステップ 2: Kafka トピックの作成¶
このステップでは、Confluent Control Center を活用して Kafka トピックを作成します。Confluent Control Center は、本稼働環境データパイプラインおよびイベントストリーミングアプリケーションを構築およびモニタリングするための機能を備えています。
http://localhost:9021 にある Control Center ウェブインターフェイスに移動します。
異なるホストに Confluent Platform をインストールした場合は、
localhost
をアドレスのホスト名と置き換えます。Control Center がオンラインになるまでに 1 ~ 2 分かかる場合があります。
注釈
Control Center が
localhost
ブラウザーセッションで開いておらず、実行中ではない場合、Control Center は ksqlDB に接続されません。controlcenter.cluster タイルをクリックします。
ナビゲーションバーで Topics をクリックしてトピックリストを開いた後、Add a topic をクリックします。
Topic name フィールドで
pageviews
を指定し、Create with defaults をクリックします。トピック名では、大文字と小文字が区別されます。
ナビゲーションバーで Topics をクリックしてトピックリストを開いた後、Add a topic をクリックします。
Topic name フィールドで
users
を指定し、Create with defaults をクリックします。
ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成¶
このステップでは、Kafka Connect を使用して、Kafka トピック pageviews
および users
のサンプルデータを作成する、kafka-connect-datagen
という名前のデモソースコネクターを実行します。
ちなみに
「ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」での Docker Compose の起動時に、Kafka Connect Datagen コネクターは自動的にインストールされました。Datagen コネクターの場所を特定する際に問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。
Kafka Connect Datagen コネクターの最初のインスタンスを実行して、
pageviews
トピックに対して Kafka データを AVRO フォーマットで生成します。ナビゲーションバーで Connect をクリックします。
Connect Clusters リストで
connect-default
クラスターをクリックします。Add connector をクリックします。
DatagenConnector
タイルを選択します。ちなみに
表示されるコネクターを絞り込むには、Filter by category をクリックし、Sources をクリックします。
名前 フィールドで、コネクターの名前として
datagen-pageviews
を入力します。次の構成値を入力します。
- Key converter class:
org.apache.kafka.connect.storage.StringConverter
。 - kafka.topic:
pageviews
。 - max.interval:
100
。 - quickstart:
pageviews
。
- Key converter class:
Continue をクリックします。
コネクター構成を確認し、Launch をクリックします。
Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、
users
トピックに対して Kafka データを AVRO フォーマットで生成します。ナビゲーションバーで Connect をクリックします。
Connect Clusters リストで
connect-default
クラスターをクリックします。Add connector をクリックします。
DatagenConnector
タイルを選択します。ちなみに
表示されるコネクターを絞り込むには、Filter by category をクリックし、Sources をクリックします。
名前 フィールドで、コネクターの名前として
datagen-users
を入力します。次の構成値を入力します。
- Key converter class:
org.apache.kafka.connect.storage.StringConverter
- kafka.topic:
users
- max.interval:
1000
- quickstart:
users
- Key converter class:
Continue をクリックします。
コネクター構成を確認し、Launch をクリックします。
ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み¶
ちなみに
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
コマンドにより、Docker コンテナーから ksqlDB CLI を使用して、これらのコマンドを実行することもできます。
ストリームおよびテーブルの作成¶
このステップでは、ksqlDB を使用して、pageviews
トピックのストリームおよび users
トピックのテーブルを作成します。
ナビゲーションバーで ksqlDB をクリックします。
ksqlDB
アプリケーションを選択します。以下のコードをエディターのウィンドウにコピーします。Run query をクリックして
PAGEVIEWS
ストリームを作成します。ストリーム名では、大文字と小文字が区別されません。CREATE STREAM PAGEVIEWS (VIEWTIME BIGINT, USERID VARCHAR, PAGEID varchar) WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
以下のコードをエディターのウィンドウにコピーします。Run query をクリックして
USERS
テーブルを作成します。テーブル名では、大文字と小文字が区別されません。CREATE TABLE USERS (USERID VARCHAR PRIMARY KEY, REGISTERTIME BIGINT, GENDER VARCHAR, REGIONID VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
クエリの記述¶
このステップでは、上記で作成したストリームおよびテーブルに対する ksqlDB クエリを作成します。
Editor タブで Add query properties をクリックして、カスタムクエリプロパティを追加します。
auto.offset.reset
パラメーターをEarliest
に設定します。この設定により ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。
以下のクエリを作成します。
Stop をクリックして、現時点で実行中のクエリを停止します。
最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。
エディターで以下のクエリを入力します。
SELECT PAGEID FROM PAGEVIEWS EMIT CHANGES LIMIT 3;
Run query をクリックします。出力は以下のようになります。
Card view アイコンまたは Table view アイコンをクリックして、出力レイアウトを変更します。
女性(female)ユーザーの
PAGEVIEWS
ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka のPAGEVIEWS_FEMALE
トピックに書き込まれます。エディターで以下のクエリを入力します。
CREATE STREAM PAGEVIEWS_FEMALE AS SELECT USERS.USERID AS USERID, PAGEID, REGIONID FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID WHERE GENDER = 'FEMALE' EMIT CHANGES;
Run query をクリックします。出力は以下のようになります。
REGIONID
が8
または9
で終わる永続的なクエリを作成します。このクエリの結果は、クエリで明示的に指定されたように、pageviews_enriched_r8_r9
という名前の Kafka トピックに書き込まれます。エディターで以下のクエリを入力します。
CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') AS SELECT * FROM PAGEVIEWS_FEMALE WHERE REGIONID LIKE '%_8' OR REGIONID LIKE '%_9' EMIT CHANGES;
Run query をクリックします。出力は以下のようになります。
カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で、
REGION
とGENDER
の組み合わせごとにPAGEVIEWS
をカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなく、テーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS
という名前の Kafka トピックに書き込まれます。エディターで以下のクエリを入力します。
CREATE TABLE PAGEVIEWS_REGIONS AS SELECT GENDER, REGIONID , COUNT(*) AS NUMUSERS FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID WINDOW TUMBLING (size 30 second) GROUP BY GENDER, REGIONID HAVING COUNT(*) > 1 EMIT CHANGES;
Run query をクリックします。出力は以下のようになります。
Running queries タブをクリックします。以下の永続的なクエリが表示されます。
- PAGEVIEWS_FEMALE
- PAGEVIEWS_FEMALE_LIKE_89
- PAGEVIEWS_REGIONS
Editor タブをクリックします。All available streams and tables ペインには、アクセスできるすべてのストリームおよびテーブルが表示されます。
All available streams and tables セクションで KSQL_PROCESSING_LOG をクリックすると、ストリームのスキーマ(ネスト化されたデータ構造など)が表示されます。
クエリの実行¶
このステップでは、前のセクションでストリームおよびテーブルとして保存した ksqlDB クエリを実行します。
Streams タブで
PAGEVIEWS_FEMALE
ストリームを選択します。Query stream をクリックします。
エディターが開き、クエリのストリーミング出力が表示されます。
Stop をクリックして、出力の生成を停止します。
Tables タブで
PAGEVIEWS_REGIONS
テーブルを選択します。Query table をクリックします。
エディターが開き、クエリのストリーミング出力が表示されます。
Stop をクリックして、出力の生成を停止します。
ステップ 5: コンシューマーラグのモニタリング¶
ナビゲーションバーで Consumers をクリックして、ksqlDB により作成されたコンシューマーを表示します。
コンシューマーグループ ID をクリックして、
_confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_5
コンシューマーグループの詳細を表示します。このページで、ストリーミングクエリのコンシューマーラグおよび消費量の値を確認できます。
詳細については、Control Center の「Consumers」ドキュメントを参照してください。
ステップ 6: Confluent コンテナーの停止およびクリーンアップ¶
Docker での作業が完了したら、Docker コンテナーおよびイメージを停止し、削除できます。
以下のコマンドを実行して、Confluent の Docker コンテナーを停止します。
docker-compose stop
Docker コンテナーを停止した後に、以下のコマンドを実行して Docker システムを削除します。これらのコマンドを実行すると、コンテナー、ネットワーク、ボリューム、イメージが削除され、ディスク領域が解放されます。
docker system prune -a --volumes --filter "label=io.confluent.docker"
詳細については、Docker の公式ドキュメントを参照してください。
次のステップ¶
このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。
- ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
- Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
- Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
- Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
- Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
- ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。
トラブルシューティング¶
クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。
問題: Datagen コネクターの場所を特定できない¶
詳細については、「ステップ 1: Docker を使用した Confluent Platform のダウンロードおよび起動」を参照してください。
解決策 : connect に対してのみ build
コマンドを実行して、connect コンテナーが正常に構築されたかどうか確認します。
docker-compose build --no-cache connect
出力は以下のようになります。
Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest
connect コンテナーが既に正常に構築されていた場合は、以下のような出力になります。
connect uses an image, skipping
解決策 : Connect ログで Datagen
を確認します。
docker-compose logs connect | grep -i Datagen
出力は以下のようになります。
connect | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
解決策 : Connect ログで docker-compose up -d
コマンドを適切に実行するための警告およびリマインダーを確認します。
docker-compose logs connect | grep -i Datagen
解決策 : kafka-connect-datagen
の .jar
ファイルが追加されており、lib
サブフォルダー内に存在するかを確認します。
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/
出力は以下のようになります。
...
kafka-connect-datagen-0.1.0.jar
...
解決策 : プラグインがコネクターのパスに存在するかを検証します。
docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'
出力は以下のようになります。
/usr/share/java,/usr/share/confluent-hub-components
その内容が存在することを確認します。
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen
出力は以下のようになります。
assets doc etc lib manifest.json
問題: ストリームとストリームの結合に関するエラー¶
エラーにより、ストリームとストリームの結合には WITHIN
句の指定が必要であることが通知されます。このエラーは pageviews
と users
の両方をストリームとして誤って作成した場合に発生する可能性があります。

解決策 : 「ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews
の "ストリーム" を作成し、users
の "テーブル" を作成したことを確認します。
問題: ksqlDB クエリのステップを正常に完了できない¶
Java エラーまたはその他の重大なエラーが発生しました。
解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。
解決策 : Docker メモリーが 8 MB まで増設されたかを確認します。Docker > Preferences > Advanced に移動してください。Docker メモリーが不十分な場合は、その他の予期しない問題が発生する可能性があります。