Confluent Platform Community コンポーネントを使用した Apache Kafka のクイックスタート(ローカル)¶
このクイックスタートに従うことにより、Confluent Platform と Confluent Community のコンポーネントを開発環境で実行することができます。
このクイックスタートでは、Apache Kafka® トピックを作成し、Kafka Connect を使ってそれらのトピックに対する模擬データを生成して、それらのトピックに対する ksqlDB ストリーミングクエリを作成します。
このクイックスタートでは、Confluent Platform CLI、Apache Kafka® CLI、および ksqlDB CLI を活用します。機能豊富な UI ベースのエクスペリエンスの場合は、商用コンポーネントによる Confluent Platform の クイックスタート を試してください。
参考
このクイックスタートの自動化バージョン を実行することもできます。これは、Confluent Platform のローカルインストール向けです。
- 前提条件:
- インターネットに接続されている。
- オペレーティングシステム が現在 Confluent Platform でサポートされている。
- サポートされている Java バージョン をダウンロードしてインストール済みである。
ステップ 1: Confluent Platform のダウンロードおよび起動¶
ダウンロードページ に移動します。
community
バージョンリンクをクリックします。以下を入力します。
- Email: メールアドレス
- File Type:
zip
- Confluent Community ライセンス契約の条件に同意します。
DOWNLOAD をクリックします。
ファイルを解凍します。以下のディレクトリが展開されます。
フォルダー 説明 /bin/ サービスを開始および停止するためのドライバスクリプト /etc/ 構成ファイル /lib/ Systemd サービス /logs/ ログファイル /share/ Jar およびライセンス /src/ プラットフォーム依存のビルドを必要とするソースファイル 以下のシェル変数を設定します。
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
Confluent Hub クライアントをインストールします。次のステップで、これを使用して無償のオープンソース
kafka-source-datagen
コネクターをインストールします。以下のスクリプトを使用して、Confluent CLI (
confluent
)をインストールします。Microsoft Windows の場合、コマンド
curl
とsh
を使用するために、Windows Subsystem for Linux などの適切な Linux 環境をインストールする必要がある場合があります。curl -L --http1.1 https://cnfl.io/cli | sh -s -- -b $CONFLUENT_HOME/bin
詳細については、「Confluent CLI」を参照してください。
Confluent Hub クライアントを使用して、Kafka Connect Datagen Source Connector をインストールします。このコネクターはデモ用の模擬データを生成しますが、これは本稼働環境には適していません。Confluent Hub は、パッケージ済みのオンラインライブラリであり、Confluent Platform および Kafka 用にそのままインストールできる拡張機能またはアドオンです。
confluent-hub install \ --no-prompt confluentinc/kafka-connect-datagen:latest
confluent local services start
コマンドを使用して Confluent Platform を起動します。このコマンドにより、すべての Confluent Platform コンポーネント(Kafka、ZooKeeper、Schema Registry、HTTP REST Proxy for Kafka、Kafka Connect、ksqlDB を含む)が起動します。重要
confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。
confluent local services start
出力は以下のようになります。
Starting Zookeeper Zookeeper is [UP] Starting Kafka Kafka is [UP] Starting Schema Registry Schema Registry is [UP] Starting Kafka REST Kafka REST is [UP] Starting Connect Connect is [UP] Starting KSQL Server KSQL Server is [UP]
ステップ 2: Kafka トピックの作成¶
このステップでは、Kafka CLI を使用して Kafka トピックを作成します。
users
という名前のトピックを作成します。kafka-topics --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic users
pageviews
という名前のトピックを作成します。kafka-topics --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic pageviews
ステップ 3: Kafka コネクターのインストールおよびサンプルデータの生成¶
このステップでは、Kafka Connect を使用して、Kafka トピック pageviews
および users
のサンプルデータを作成する、kafka-connect-datagen
という名前のデモソースコネクターを実行します。
Kafka Connect Datagen コネクターの最初のインスタンスを実行して、
pageviews
トピックに対して Kafka データを AVRO フォーマットで生成します。curl -L -O -H 'Accept: application/vnd.github.v3.raw' \ https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
curl -X POST -H "Content-Type: application/json" \ --data @connector_pageviews_cos.config http://localhost:8083/connectors
Kafka Connect Datagen コネクターの 2 番目のインスタンスを実行して、
users
トピックに対して Kafka データを AVRO フォーマットで生成します。curl -L -O -H 'Accept: application/vnd.github.v3.raw' \ https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
curl -X POST -H "Content-Type: application/json" \ --data @connector_users_cos.config http://localhost:8083/connectors
ちなみに
Kafka Connect Datagen コネクターは「ステップ 1: Confluent Platform のダウンロードおよび起動」で手動でインストールされました。Datagen コネクターの場所を特定する際に問題が発生した場合は、「トラブルシューティング」セクションの「問題: Datagen コネクターの場所を特定できない」を参照してください。
ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み¶
このステップでは、ksqlDB SQL を使用してストリーム、テーブル、およびクエリを作成します。ksqlDB の SQL 構文の詳細については、「ksqlDB 構文リファレンス」を参照してください。
ストリームおよびテーブルの作成¶
このコマンドを使用して、ターミナルで ksqlDB CLI を起動します。
LOG_DIR=$CONFLUENT_HOME/ksql_logs ksql
重要
デフォルトで、ksqlDB は
ksql
実行可能ファイルの場所に対応するlogs
と呼ばれるディレクトリにそのログを保管しようとします。たとえば、ksql
が/usr/local/bin/ksql
にインストールされると、/usr/local/logs
にそのログを保管しようとします。ksql
を Confluent Platform のデフォルトのロケーションである$CONFLUENT_HOME/bin
から実行している場合は、LOG_DIR
変数を使用して、このデフォルトの動作をオーバーライドする必要があります。value_format
にAVRO
を指定して、Kafka トピックpageviews
からPAGEVIEWS
ストリームを作成します。CREATE STREAM PAGEVIEWS (VIEWTIME bigint, USERID varchar, PAGEID varchar) WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
Kafka トピック
users
から複数の列を含むUSERS
テーブルを作成し、value_format
にAVRO
を指定します。CREATE TABLE USERS (USERID VARCHAR PRIMARY KEY, REGISTERTIME BIGINT, GENDER VARCHAR, REGIONID VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
クエリの記述¶
このステップでは、ksqlDB SQL クエリを実行します。
auto.offset.reset1
クエリプロパティをearliest
に設定します。これにより、ksqlDB クエリは、使用可能なすべてのトピックデータを先頭から読み取ります。この構成は、以降の各クエリに対して使用されます。詳細については、「ksqlDB 構成パラメーターリファレンス」を参照してください。
SET 'auto.offset.reset'='earliest';
最大 3 行までに制限された結果とともにストリームからデータを返す、非永続的なクエリを作成します。
SELECT PAGEID FROM PAGEVIEWS EMIT CHANGES LIMIT 3;
出力は以下のようになります。
Page_45 Page_38 Page_11 LIMIT reached Query terminated
女性(female)ユーザーの
PAGEVIEWS
ストリームをフィルター処理する永続的なクエリを(ストリームとして)作成します。クエリの結果は Kafka のPAGEVIEWS_FEMALE
トピックに書き込まれます。CREATE STREAM PAGEVIEWS_FEMALE \ AS SELECT USERS.USERID AS USERID, PAGEID, REGIONID \ FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \ WHERE GENDER = 'FEMALE' EMIT CHANGES;
REGIONID
が8
または9
で終わる永続的なクエリを作成します。このクエリの結果は、クエリで明示的に指定されたように、pageviews_enriched_r8_r9
という名前の Kafka トピックに書き込まれます。CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 \ WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') \ AS SELECT * FROM PAGEVIEWS_FEMALE \ WHERE REGIONID LIKE '%_8' OR REGIONID LIKE '%_9' EMIT CHANGES;
カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で、
REGION
とGENDER
の組み合わせごとにPAGEVIEWS
をカウントする永続的なクエリを作成します。手順がグループ化およびカウントであるため、結果はストリームではなく、テーブルになります。このクエリの結果は、PAGEVIEWS_REGIONS
という名前の Kafka トピックに書き込まれます。CREATE TABLE PAGEVIEWS_REGIONS \ AS SELECT GENDER, REGIONID , COUNT(*) AS NUMBERS \ FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \ WINDOW TUMBLING (size 30 second) \ GROUP BY GENDER, REGIONID \ HAVING COUNT(*) > 1 EMIT CHANGES;
ストリーム、テーブル、およびクエリの確認¶
ストリームを一覧表示します。
SHOW STREAMS;
テーブルを一覧表示します。
SHOW TABLES;
ストリームまたはテーブルの詳細を表示します。
DESCRIBE EXTENDED <stream-or-table-name>;
たとえば、
users
テーブルの詳細を表示するには、以下を実行します。DESCRIBE EXTENDED USERS;
実行中のクエリを一覧表示します。
SHOW QUERIES;
クエリ実行プランを確認します。
SHOW QUERIES
の出力からクエリ ID を取得し、EXPLAIN
を実行して、そのクエリ ID のクエリ実行プランを表示します。EXPLAIN <Query ID>;
ステップ 5: ストリーミングデータのモニタリング¶
これで、ストリームまたはテーブルとして作成された実行中のクエリをモニタリングできます。
以下のクエリは、女性(female)ユーザーのページビュー情報を返します。
SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES;
以下のクエリは、
regionid
が8
または9
で終わる地域における女性(female)ユーザーのページビュー情報を返します。SELECT * FROM PAGEVIEWS_FEMALE_LIKE_89 EMIT CHANGES;
以下のクエリは、30 秒のタンブリングウィンドウでの地域と性別の組み合わせごとのページビューのカウントを返します。
SELECT * FROM PAGEVIEWS_REGIONS EMIT CHANGES;
ステップ 6: Confluent Platform の停止¶
ローカルインストールでの作業が完了したら、Confluent Platform を停止できます。
Confluent CLI confluent local services connect stop コマンドを使用して、Confluent Platform を停止します。
confluent local services stop
confluent local destroy コマンドを使用して、Confluent Platform インスタンスのデータを破棄します。
confluent local destroy
confluent local services start コマンドを使用して、Confluent Platform のローカルインストールを再度起動することができます。
次のステップ¶
このクイックスタートで示したコンポーネントについて、さらに詳しく確認します。
- ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースにおける、ksqlDB を使用したデータの処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
- Kafka チュートリアル : ステップごとの手順に従って、Kafka、Kafka Streams、および ksqlDB の基本的なチュートリアルを試すことができます。
- Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
- Kafka Connect ドキュメント: Kafka を他のシステムに統合し、すぐに使用できるコネクター をダウンロードして、 Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
- Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
- ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで学べます。
トラブルシューティング¶
クイックスタートのワークフローの進行中に問題が発生した場合は、ステップを再試行する前に、以下の解決策を確認してください。
問題: Datagen コネクターの場所を特定できない¶
解決策: DataGen コネクターがインストールされており、実行中であることを確認します。
「ステップ 1: Confluent Platform のダウンロードおよび起動」の説明に従って、kafka-connect-datagen
がインストールされており、実行中であることを確認します。
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
出力は以下のようになります。
Running in a "--no-prompt" mode
...
Completed
解決策: Confluent CLI confluent local services connect log コマンドを使用して、Datagen
の connect ログを確認します。
confluent local services connect log | grep -i Datagen
出力は以下のようになります。
[2019-04-18 14:21:08,840] INFO Loading plugin from: /Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:215)
[2019-04-18 14:21:08,894] INFO Registered loader: PluginClassLoader{pluginLocation=file:/Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:238)
[2019-04-18 14:21:08,894] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:167)
[2019-04-18 14:21:09,882] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:386)
解決策 : kafka-connect-datagen
の .jar
ファイルが追加されており、lib
サブフォルダー内に存在するかを確認します。
ls $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/
出力は以下のようになります。
...
kafka-connect-datagen-0.1.0.jar
...
解決策 : プラグインがコネクターのパスに存在するかを検証します。
Confluent Hub から kafka-connect-datagen
ファイルをインストールした場合、インストールディレクトリは複数のプロパティファイルのプラグインのパスに追加されます。
Adding installation directory to plugin path in the following files:
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-standalone.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-standalone.properties
...
これらのいずれかを使用して、コネクターのパスを確認できます。この例では、connect-avro-distributed.properties
ファイルを使用します。
grep plugin.path $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties
出力は以下のようになります。
plugin.path=share/java,/Users/user.name/Confluent/confluent-version/share/confluent-hub-components
その内容が存在することを確認します。
ls $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-datagen
出力は以下のようになります。
assets doc lib manifest.json
問題: ストリームとストリームの結合に関するエラー¶
エラーにより、ストリームとストリームの結合には WITHIN
句の指定が必要であることが通知されます。このエラーは pageviews
と users
の両方をストリームとして誤って作成した場合に発生する可能性があります。
解決策: 「ステップ 4: ksqlDB を使用したストリームおよびテーブルの作成および書き込み」で pageviews
の "ストリーム" を作成し、users
の "テーブル" を作成したことを確認します。
問題: ksqlDB クエリのステップを正常に完了できない¶
Java エラーまたはその他の重大なエラーが発生しました。
解決策 : Confluent Platform により現在サポートされている オペレーティングシステム で作業していることを確認します。
ksqlDB のエラーが発生しました。
解決策: ksqlDB CLI のヘルプで、コマンドを成功させるためのヒントや、詳細なドキュメントへのリンクを確認します。
ksql> help