Confluent Platform のデモ(cp-demo)¶
このサンプルでは、ksqlDB と Kafka Streams を使用してストリームを処理する Apache Kafka® イベントストリーミングアプリケーションを含む完全な Confluent Platform デプロイを構築します。また、すべてのコンポーネントのセキュリティはエンドツーエンドで有効にします。このサンプルの作成手順を詳しく説明する付属のチュートリアルで、サンプル全体がどのように連携して機能しているかを確認してください。
概要¶
ユースケース¶
このユースケースは、実際の Wikipedia ページのリアルタイム編集を処理する Apache Kafka® イベントストリーミングアプリケーションです。

Confluent Platform に基づくイベントストリーミングプラットフォーム全体の説明は、次のとおりです。Wikimedia の EventStreams では、実際の wiki ページで発生しているリアルタイム編集の継続的なストリームが公開されます。Kafka ソースコネクター kafka-connect-sse は、https://stream.wikimedia.org/v2/stream/recentchange からサーバーが送信したイベント(SSE)のストリーミングを行います。また、カスタム Connect 変換 kafka-connect-json-schema は、これらのメッセージから JSON を抽出し、これらは Kafka クラスターに書き込まれます。このサンプルでは、ksqlDB と Kafka Streams アプリケーションを使用してデータを処理します。その後、Kafka シンクコネクター kafka-connect-elasticsearch が Kafka からデータのストリーミングを行い、Elasticsearch として有形化され、Kibana によって分析されます。Confluent Replicator は、同じクラスター内のトピック間でもメッセージをコピーしています。すべてのデータは、Confluent Schema Registry と Avro を使用しており、Confluent Control Center がデプロイを管理し、モニタリングしています。
データパターン¶
データパターンは、次のとおりです。
コンポーネント | 消費元 | 生成先 |
---|---|---|
SSE Source Connector | Wikipedia | wikipedia.parsed |
ksqlDB | wikipedia.parsed |
ksqlDB ストリームおよびテーブル |
Kafka Streams アプリケーション | wikipedia.parsed |
wikipedia.parsed.count-by-domain |
Confluent Replicator | wikipedia.parsed |
wikipedia.parsed.replica |
Elasticsearch Sink Connector | WIKIPEDIABOT (ksqlDB から) |
Elasticsearch/Kibana |
cp-demo の実行¶
cp-demo
は Docker 環境であり、すべてのサービスは 1 台のホストで実行されています。これは、Confluent Platform のデモの実行を容易にするためです。本稼働環境では、すべての Confluent Platform サービスを 1 台のホストでデプロイすることは避けてください。
また、本稼働環境では、有効なライセンスを使用して、本稼働データを含むクラスターとは別の専用のメトリクスクラスターで Confluent Control Center をデプロイする必要があります。専用のメトリクスクラスターを使用すると、本稼働環境のトラフィッククラスターで問題が発生した場合でもシステム正常性モニタリングが提供され続けるため、回復性が向上します。
Docker 以外のサンプルを使用する場合は、confluentinc/examples GitHub リポジトリ にアクセスしてください。
以下のチュートリアルの手順をすべて参照した後、学習した概念を応用して、自分のイベントストリーミングパイプラインを Confluent Cloud で構築します。これは、Kafka を使用した、クラウドネイティブのフルマネージド型イベントストリーミングプラットフォームです。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG
を使用すると、$50 相当を無料で使用できます(詳細)。
前提条件¶
このサンプルは、次の環境で検証されています。
- Docker バージョン 17.06.1-ce
- Docker Compose ファイル形式 2.3 の Docker Compose バージョン 1.16.0
- Java バージョン 1.8.0_92
- MacOS 10.15.3(Ubuntu 環境 向けの注意事項あり)
- OpenSSL 1.1.1d
- git
- jq
セットアップ¶
Docker の詳細(Advanced)`設定 <https://docs.docker.com/docker-for-mac/#advanced>`__ で、Docker 専用のメモリーを 8 GB(デフォルトでは 2 GB)以上に増やし、必ず 2 つ以上の CPU コアを Docker に割り当てます。
confluentinc/cp-demo GitHub リポジトリ のクローンを作成します。
git clone https://github.com/confluentinc/cp-demo
cp-demo
ディレクトリに移動し、Confluent Platform リリースブランチに切り替えます。cd cp-demo git checkout 6.0.6-post
起動¶
cp-demo
ディレクトリ内には、cp-demo
ワークフローをエンドツーエンドで実行する 1 つの スクリプト があります。これにより、キーと証明書の生成、Docker コンテナーの起動、および環境の構成と検証が行われます。これは、次のオプション設定で実行できます。
CLEAN
: 実行の合間に、証明書、およびローカルで作成された Connect イメージを再生成するかどうかを制御します。C3_KSQLDB_HTTPS
:HTTP
またはHTTPS
を使用するように、Confluent Control Center と ksqlDB サーバーを設定します(デフォルト:HTTP
)。VIZ
: Elasticsearch と Kibana を有効にします(デフォルト:true
)。
デフォルト設定で初めて
cp-demo
を実行するには、次のコマンドを実行します。完了までに数分かかります。./scripts/start.sh
それ以降の実行では、生成した証明書、およびローカルで作成した Connect イメージを削除していない場合は、それらが再使用されます。それらを強制的に再生成するには、
CLEAN=true
を設定します。CLEAN=true ./scripts/start.sh
cp-demo
は、http://
(デフォルト)またはセキュアなhttps://
経由での Confluent Control Center GUI へのアクセスをサポートします。後者は、自己署名 CA と、デプロイ中に生成された証明書を使用します。Confluent Control Center は ksqlDB サーバーを含む他のコンポーネントと統合されているため、完全にサポートされるのは一度に 1 つのモードのみです。cp-demo
をhttps
モードで実行するには、cp-demo
の起動時にC3_KSQLDB_HTTPS=true
を設定します。C3_KSQLDB_HTTPS=true ./scripts/start.sh
Elasticsearch と Kibana により、
cp-demo
の localhost メモリー要件が増加します。小さいメモリーフットプリントでユーザーがcp-demo
を実行しようとしている場合は、cp-demo
の起動時にVIZ=false
を設定して、これらのコンポーネントをオプトアウトしてください。VIZ=false ./scripts/start.sh
開始前のチェック¶
チュートリアルを開始する前に、環境が正しく起動されていることを確認します。以下の開始前チェックで異常が見つかった場合は、「トラブルシューティング」セクションを参照してください。
Docker コンテナーのステータスが
Up
ステートを示していることを確認します。docker-compose ps
出力は以下のようになります。
Name Command State Ports ------------------------------------------------------------------------------------------------------------------------------------------------------------ connect bash -c sleep 10 && cp /us ... Up 0.0.0.0:8083->8083/tcp, 9092/tcp control-center /etc/confluent/docker/run Up (healthy) 0.0.0.0:9021->9021/tcp, 0.0.0.0:9022->9022/tcp elasticsearch /bin/bash bin/es-docker Up 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp kafka1 bash -c if [ ! -f /etc/kaf ... Up (healthy) 0.0.0.0:10091->10091/tcp, 0.0.0.0:11091->11091/tcp, 0.0.0.0:12091->12091/tcp, 0.0.0.0:8091->8091/tcp, 0.0.0.0:9091->9091/tcp, 9092/tcp kafka2 bash -c if [ ! -f /etc/kaf ... Up (healthy) 0.0.0.0:10092->10092/tcp, 0.0.0.0:11092->11092/tcp, 0.0.0.0:12092->12092/tcp, 0.0.0.0:8092->8092/tcp, 0.0.0.0:9092->9092/tcp kibana /bin/sh -c /usr/local/bin/ ... Up 0.0.0.0:5601->5601/tcp ksqldb-cli /bin/sh Up ksqldb-server /etc/confluent/docker/run Up (healthy) 0.0.0.0:8088->8088/tcp openldap /container/tool/run --copy ... Up 0.0.0.0:389->389/tcp, 636/tcp restproxy /etc/confluent/docker/run Up 8082/tcp, 0.0.0.0:8086->8086/tcp schemaregistry /etc/confluent/docker/run Up 8081/tcp, 0.0.0.0:8085->8085/tcp streams-demo /app/start.sh Up 9092/tcp tools /bin/bash Up zookeeper /etc/confluent/docker/run Up (healthy) 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
cp-demo
パイプライン全体の末尾に移動し、http://localhost:5601/app/kibana#/dashboard/Wikipedia にある Kibana ダッシュボードを表示します。以下は正常な表示で、cp-demo
の起動スクリプトが問題なく完了したことを確認できます。docker-compose.yml ファイルで Confluent Platform 構成全体を確認します。
Kafka クラスターと Schema Registry に対するセキュリティパラメーターが設定されている クライアント構成 ファイルで Kafka Streams アプリケーション構成を確認します。
チュートリアル¶
Confluent Control Center へのログイン¶
デフォルトの
C3_KSQLDB_HTTPS=false
でcp-demo
を実行した場合は、次の URL で Web ブラウザーから Confluent Control Center GUI にログインします。http://localhost:9021
C3_KSQLDB_HTTPS=true
でcp-demo
を実行した場合は、次の URL で Web ブラウザーから Confluent Control Center GUI にログインします。https://localhost:9022
ブラウザーにより、信頼されていない自己署名証明書および認証機関が検出され、次のようなプライバシー警告が発行されます。続行するには、ブラウザーのプロセスに従ってこの証明書を受諾します。この受諾は、このブラウザーセッションが終了するまで持続します。
Chrome の場合:
Advanced
をクリックし、ウィンドウが展開されたら、Proceed to localhost (unsafe)
をクリックします。Safari の場合: 新しいプライベートブラウザーウィンドウを開き(
Shift + ⌘ + N
)、Show Details
をクリックします。ウィンドウが展開されたら、visit this website
をクリックします。
ログイン画面で、クラスターへのスーパーユーザーアクセス権を持つ
superUser
として、パスワードsuperUser
で Confluent Control Center にログインします。また、他のユーザー としてログインし、各ユーザーへの表示がアクセス許可に応じてどのように変わるかを調べることもできます。
ブローカー¶
Kafka Raleigh という名前のクラスターを選択します。
Brokers をクリックします。
クラスター内の各ブローカーのステータスが表示されます。
Production または Consumption をクリックして、生成メトリクスと消費メトリック、ブローカーの稼働時間、パーティション(オンライン、レプリケーション数不足、レプリカの合計、同期されていないレプリカ)、ディスク使用率、システム(ネットワークプール使用率、リクエストプール使用率)を調べます。
トピック¶
Confluent Control Center では、Kafka クラスター内のトピックを管理できます。Topics をクリックします。
下へスクロールし、トピック
wikipedia.parsed
をクリックします。このトピックの概要が表示されます。
- Throughput
- パーティションレプリケーションのステータス
どのブローカーがどのパーティションのリーダーであるか、およびすべてのパーティションがどこに格納されているかを調べます。
このトピックのメッセージをリアルタイムで検査します。
このトピックのスキーマを表示します。
wikipedia.parsed
では、Schema Registry で登録されたスキーマをトピック値が使用しています(トピックキーは単なる文字列です)。このトピックの構成設定を表示します。
All Topics に戻り、
wikipedia.parsed.count-by-domain
をクリックして、Kafka Streams アプリケーションからの出力トピックを表示します。All topics
ビューに戻り、右上にある + Add a topic ボタンをクリックして、新しいトピックを Kafka クラスターで作成します。また、クラスターの Kafka のトピックの設定を表示し、編集することもできます。Confluent Control Center の トピック管理 に関する説明を参照してください。
Kafka Connect¶
このサンプルでは、3 つのコネクターを実行します。
- SSE Source Connector
- Elasticsearch Sink Connector
- Confluent Replicator
これらは、Confluent Platform セキュリティ機能で構成された Connect ワーカーで実行されています。Connect ワーカーの組み込みプロデューサーは、べき等性を持つように、パーティションごとに "順序に従って厳密に 1 回" のセマンティクスで構成されます(エラーにより、プロデューサーが再試行する場合、同じメッセージ(プロデューサーが複数回送信します)は、ブローカーに関して 1 回だけ Kafka ログに書き込まれます)。
Kafka Connect Docker コンテナーは、
cp-server-connect-base
から作成されたカスタムイメージを、このサンプルに固有の追加コネクターおよび変換を使用して実行しています。カスタムイメージは、この Dockerfile による定義に従って、起動スクリプトの一部としてビルドされます。Confluent Control Center は、Kafka Connect API を使用して、複数の connect クラスター を管理します。Connect をクリックします。
Connect ワーカーのクラスターの名前
connect1
を選択します。このサンプルで実行しているコネクターを確認します。
- ソースコネクター
wikipedia-sse
: このサンプルの SSE ソースコネクターの 構成ファイル を表示します。 - ソースコネクター
replicate-topic
: このサンプルの Replicator コネクターの 構成ファイル を表示します。 - Kafka のトピック
WIKIPEDIABOT
から消費しているシンクコネクターelasticsearch-ksqldb
: このサンプルの Elasticsearch Sink Connector の 構成ファイル を表示します。
- ソースコネクター
コネクターの構成とカスタム変換の詳細を表示または変更するには、コネクター名をクリックします。
ksqlDB¶
このサンプルでは、ksqlDB が認証され、セキュアな Kafka クラスターへの接続を認可されており、ksqlDB コマンドファイル で定義されているように既にクエリを実行しています。組み込みプロデューサーは、べき等性を持つように、パーティションごとに "順序に従って厳密に 1 回" のセマンティクスで構成されます(エラーにより、プロデューサーが再試行する場合、同じメッセージ(プロデューサーが複数回送信します)は、ブローカーに関して 1 回だけ Kafka ログに書き込まれます)。
ナビゲーションバーで ksqlDB をクリックします。
ksqlDB アプリケーションのリストで
wikipedia
を選択します。ksqlDB フローを表示して、このサンプルで作成されたストリームとテーブル、およびそれらの相関を調べます。
Confluent Control Center を使用して ksqlDB と連携するか、ksqlDB CLI を実行して ksqlDB CLI プロンプトを表示します。
docker-compose exec ksqldb-cli bash -c 'ksql -u ksqlDBUser -p ksqlDBUser http://ksqldb-server:8088'
既存の ksqlDB ストリームを表示します(ksqlDB CLI を使用している場合は、
ksql>
プロンプトでSHOW STREAMS;
と入力します)。WIKIPEDIA
をクリックして、既存の ksqlDB ストリームのスキーマの説明(フィールドまたは列)を入力します(ksqlDB CLI を使用している場合は、ksql>
プロンプトでDESCRIBE WIKIPEDIA;
と入力します)。既存の ksqlDB テーブルを表示します(ksqlDB CLI を使用している場合は、
ksql>
プロンプトでSHOW TABLES;
と入力します)。既存の ksqlDB クエリを表示します。これは、継続的に実行されています(ksqlDB CLI を使用している場合は、
ksql>
プロンプトでSHOW QUERIES;
と入力します)。さまざまな ksqlDB ストリームおよびテーブルからのメッセージを表示します。目的のストリームをクリックし、Query stream をクリックしてクエリエディターを開きます。エディターには、
select * from WIKIPEDIA EMIT CHANGES;
のような、内容が事前に入力されたクエリが表示され、新着データの結果が表示されます。ksqlDB Editor をクリックし、
SHOW PROPERTIES;
ステートメントを実行します。構成された ksqlDB サーバープロパティが表示されます。これらの値は docker-compose.yml ファイルで確認できます。このサンプルでは、2 つのストリーム
EN_WIKIPEDIA_GT_1
およびEN_WIKIPEDIA_GT_1_COUNTS
を作成し、ksqlDB の各ウィンドウの機能を紹介します。EN_WIKIPEDIA_GT_1
は、タンブリングウィンドウで発生回数をカウントし、特定のキーについて、初めて表示されたメッセージ上のテーブルに null を書き込みます。EN_WIKIPEDIA_GT_1
の基盤となる Kafka のトピックは、これらの null をフィルタリングで排除しませんが、1 を超えるカウントのみをダウンストリームに送信するために、これらの null をフィルタリングで排除するEN_WIKIPEDIA_GT_1_COUNTS
のための別の Kafka のトピックがあります(クエリに句where ROWTIME is not null
がある、など)。bash プロンプトから、これらの基盤となる Kafka のトピックを表示します。
トピック
EN_WIKIPEDIA_GT_1
のメッセージを表示(オフセット 0/パーティション 0 にジャンプ)し、null を確認します。比較のために、トピック
EN_WIKIPEDIA_GT_1_COUNTS
のメッセージを表示(オフセット 0/パーティション 0 にジャンプ)し、null がないことを確認します。
- ksqlDB 処理ログ は、処理中にレコード単位でエラーを取り込みます。これは、開発者による ksqlDB クエリのデバッグに役立ちます。このサンプルでは、カスタム log4j プロパティファイル での構成に従って、処理ログが相互 TLS(mTLS)認証を使用して Kafka のトピックにエントリを書き込みます。実際の動作を確認するために、次の不完全なクエリを ksqlDB エディターで 20 秒間、実行します。
SELECT ucase(cast(null as varchar)) FROM wikipedia EMIT CHANGES;
このクエリからはレコードが返されないはずです。ksqlDB は、レコードごとに処理ログにエラーを書き込みます。処理ログのトピック ksql-clusterksql_processing_log
をトピック検査とともに表示(オフセット 0/パーティション 0 にジャンプ)するか、対応する ksqlDB ストリーム KSQL_PROCESSING_LOG
を ksqlDB エディターで表示(auto.offset.reset=earliest
を設定)します。
SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;
コンシューマー¶
Confluent Control Center では、コンシューマーラグとスループットパフォーマンスをモニタリングできます。コンシューマーラグは、トピックの最高水準点(書き込まれているトピックの最新オフセット)から、現在のコンシューマーオフセット(そのトピックについてコンシューマーグループが読み取った最新オフセット)を減算したものです。コンシューマーラグのサイズの重要性を検討する際は、トピックの書き込み速度とコンシューマーグループの読み取り速度に留意してください。Consumers をクリックします。
コンシューマーラグは、コンシューマーベース で使用できます。たとえば、シンクコネクター用組み込み Connect コンシューマー(
connect-elasticsearch-ksqldb
など)、ksqlDB クエリ(_confluent-ksql-default_query_
で始まる名前のコンシューマーグループなど)、コンソールコンシューマー(WIKIPEDIANOBOT-consumer
など)などです。また、トピックベース でも使用できます。永続的な ksqlDB "Create Stream As Select" クエリ
CSAS_WIKIPEDIABOT
用のコンシューマーラグを表示します。これは、コンシューマーグループのリストに_confluent-ksql-ksql-clusterquery_CSAS_WIKIPEDIABOT_5
として表示されます。コンシューマーグループ ID
wikipedia-activity-monitor
の下の、Kafka Streams アプリケーションのコンシューマーラグを表示します。このアプリケーションは、cnfldemos/cp-demo-kstreams Docker コンテナー(アプリケーションの ソースコード)によって実行されます。Kafka Streams アプリケーションは、次の クライアント構成 ファイルを使用して Kafka クラスターに接続するように構成されています。消費メトリックは、コンシューマーベース で使用できます。これらの消費チャートは、このサンプルのように Confluent モニタリングインターセプター が構成されている場合にのみ入力されます。
% messages consumed
とend-to-end latency
を確認できます。永続的な ksqlDB "Create Stream As Select" クエリCSAS_WIKIPEDIABOT
用の消費メトリクスを表示します。これは、コンシューマーグループのリストに_confluent-ksql-default_query_CSAS_WIKIPEDIABOT_0
として表示されます。Confluent Control Center には、コンシューマーグループのどのコンシューマーがどのパーディションから消費しているか、およびそれらのパーティションが格納されているブローカーが表示されます。Confluent Control Center は、コンシューマーグループでコンシューマーのバランス調整が発生するとアップデートされます。新規コンシューマーグループ
app
と 1 つのコンシューマーconsumer_app_1
で、トピックwikipedia.parsed
からの消費を開始します。これはバックグラウンドで実行されます。./scripts/app/start_consumer_app.sh 1
Confluent Control Center に、コンシューマーグループ
app
の消費が安定したことが表示されるまで、このコンシューマーグループを 2 分間、実行します。このコンシューマーグループapp
には 1 つのコンシューマーconsumer_app_1
があり、トピックwikipedia.parsed
内のすべてのパーティションを消費しています。2 番目のコンシューマー
consumer_app_2
を、既存のコンシューマーグループapp
に追加します。./scripts/app/start_consumer_app.sh 2
Confluent Control Center に、コンシューマーグループ
app
の消費が安定したことが表示されるまで、このコンシューマーグループを 2 分間、実行します。コンシューマーconsumer_app_1
およびconsumer_app_2
は現在、トピックwikipedia.parsed
内のパーティションの消費を共有しています。Brokers -> Consumption ビューで、リクエストレイテンシの線グラフをクリックして、リクエストライフサイクル 全体のレイテンシの内訳を表示します。
Confluent Replicator¶
Confluent Replicator は、コピー元 Kafka クラスターからコピー先 Kafka クラスターにデータをコピーします。コピー元とコピー先のクラスターは通常、異なるクラスターですが、このサンプルでは、Replicator がクラスター間レプリケーションを実行しています。つまり、コピー元とコピー先の Kafka クラスターは同じです。このソリューションの他のコンポーネントと同様に、Confluent Replicator にもセキュリティが構成されています。
Confluent Control Center の専用ビューで、Replicator のステータスとスループットを表示します。
Consumers: Confluent Replicator のスループットとレイテンシをモニタリングします。Replicator は Kafka Connect ソースコネクターであり、対応するコンシューマーグループ
connect-replicator
があります。Replicator のコンシューマーラグを表示します。
Replicator の消費メトリクスを表示します。
Connect: Settings で右上の一時停止アイコンを押し、有効になるまで 10 秒間待って Replicator コネクターを一時停止します。これで、関連するコンシューマーグループの消費が停止します。
connect-replicator
コンシューマーグループの消費が停止するのを確認します。Replicator コネクターを再起動します。
connect-replicator
コンシューマーグループが消費を再開するのを確認します。次の点を確認できます。- コンシューマーグループ connect-replicator が動作していない時間があるにもかかわらず、すべてのメッセージが配信済みとして表示されます。これは、すべてのバーが、生成タイムスタンプに対して相対的な時間ウィンドウであるためです。
- レイテンシはピークに達した後、徐々に減少します。これも、生成タイムスタンプに対して相対的であるためです。
次の手順: Replicator のチュートリアル で、Replicator について詳しく学習します。
セキュリティ¶
このサンプルのすべての Confluent Platform コンポーネントおよびクライアントでは、多くの セキュリティ機能 が有効になっています。
- メタデータサービス(MDS) は、認証と認可の中心的機能です。Confluent Server Authorizer によって構成されており、LDAP と連携してクライアントを認証します。
- SSL による暗号化および mTLS。サンプルでは、SSL 証明書が 自動的に生成 され、キーストアとトラストストアが作成されてパスワードで保護されます。
- ロールベースアクセス制御(RBAC) による認可。リソースに ACL が関連付けられていない場合、スーパーユーザー以外のユーザーはリソースにアクセスできません。
- ZooKeeper が SSL および SASL/DIGEST-MD5 用に構成されています(注: 試用ライセンス では、REST Proxy および Schema Registry の TLS はサポートされていません)。
- Control Center 用の HTTPS。
- スキーマレジストリ用の HTTPS。
- Connect 用の HTTPS。
各コンポーネントのセキュリティ構成は、サンプルの docker-compose.yml ファイルに記載されています。
注釈
このサンプルは、トレーニングを目的としてセキュアな Confluent Platform を紹介しており、一連のベストプラクティスを示すものではありません。このサンプルの内容と本稼働環境での手順では、異なる点があります。
- すべてのユーザーをスーパーユーザーにするのではなく、必要な操作のみをユーザーに認可してください。
PLAINTEXT
セキュリティプロトコルを使用する場合は、このサンプルのANONYMOUS
というユーザー名をスーパーユーザーとして構成しないでください。SSL
またはSASL_SSL
が構成されている場合は、PLAINTEXT
ポートを開くことすらしないことを検討してください。
サンプルでは OpenLDAP サーバーが実行されており、デモの各 Kafka ブローカーは Metadata Service (MDS) で構成され、LDAP と連携してクライアントおよび Confluent Platform のサービスとクライアントを認証することができます。
ZooKeeper には、2 つのリスナーポートがあります。
名前 | プロトコル | このサンプルでの用途 | ZooKeeper |
---|---|---|---|
(なし) | SASL/DIGEST-MD5 | REST Proxy と Schema Registry の試用ライセンスの検証。(TLS サポートなし) | 2181 |
(なし) | mTLS | ブローカーの通信(kafka1、kafka2) | 2182 |
各ブローカーには、5 つのリスナーポートがあります。
名前 | プロトコル | このサンプルでの用途 | kafka1 | kafka2 |
---|---|---|---|---|
(なし) | MDS | RBAC による認可 | 8091 | 8092 |
INTERNAL | SASL_PLAINTEXT | CP Kafka クライアント(Confluent Metrics Reporter など)、SASL_PLAINTEXT | 9091 | 9092 |
TOKEN | SASL_SSL | 偽装の使用が必要な場合の Confluent Platform サービス( Schema Registry など) | 10091 | 10092 |
SSL | SSL | SSL を使用し、SASL を使用しないエンドクライアント(stream-demo など) | 11091 | 11092 |
CLEAR | PLAINTEXT | セキュリティなし、バックドアとして使用可能、デモおよび学習用のみ | 12091 | 12092 |
エンドクライアント(非 CP クライアント):
- ブローカーの SSL リスナー経由で mTLS を使用して認証します。
- Schema Registry も使用している場合は、LDAP 経由で Schema Registry に対して認証します。
- Confluent Monitoring インターセプターも使用している場合は、ブローカーの SSL リスナー経由で mTLS を使用して認証します。
- TOKEN リスナーは決して使用しないでください。このリスナーは、Confluent コンポーネント間の内部通信専用です。
- Kafka Streams アプリケーションの
wikipedia-activity-monitor
を実行しているstreams-demo
コンテナーがこのサンプルで使用する クライアント構成 を参照してください。
次のコマンドで、Kafka ブローカーがリッスンしているポートを確認します。これは、以下のテーブルに一致する必要があります。
docker-compose logs kafka1 | grep "Registered broker 1" docker-compose logs kafka2 | grep "Registered broker 2"
サンプルのみ: PLAINTEXT ポート経由でブローカーと通信します。クライアントのセキュリティ構成は不要です。
# CLEAR/PLAINTEXT port docker-compose exec kafka1 kafka-consumer-groups \ --list \ --bootstrap-server kafka1:12091
エンドクライアント: SSL ポート経由で、および
--command-config
引数(コマンドラインツールの場合)または--consumer.config
(kafka-console-consumer の場合)を使用して構成された SSL パラメーターを介してブローカーと通信します。# SSL/SSL port docker-compose exec kafka1 kafka-consumer-groups \ --list \ --bootstrap-server kafka1:11091 \ --command-config /etc/kafka/secrets/client_without_interceptors_ssl.config
クライアントが SSL ポート経由でブローカーと通信しようとしても、SSL パラメーターを指定していない場合は通信できません。
# SSL/SSL port docker-compose exec kafka1 kafka-consumer-groups \ --list \ --bootstrap-server kafka1:11091
出力は以下のようになります。
ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space ...
SASL_PLAINTEXT ポート経由で、および
--command-config
引数(コマンドラインツールの場合)または--consumer.config
(kafka-console-consumer の場合)を使用して構成された SASL_PLAINTEXT パラメーターを介してブローカーと通信します。# INTERNAL/SASL_PLAIN port docker-compose exec kafka1 kafka-consumer-groups \ --list \ --bootstrap-server kafka1:9091 \ --command-config /etc/kafka/secrets/client_sasl_plain.config
どのユーザーがスーパーユーザーとして構成されているかを確認します。
docker-compose logs kafka1 | grep "super.users ="
この出力は以下のようになります。これにより、それ自体として認証される各サービスの名前と、
ANONYMOUS
として認証される未認証PLAINTEXT
が認可されることに注意してください(デモの用途のみ)。kafka1 | super.users = User:admin;User:mds;User:superUser;User:ANONYMOUS
LDAP ユーザー
appSA
(スーパーユーザーではない)がトピックwikipedia.parsed
からメッセージを消費できることを確認します。ブローカーに対して mTLS で、Schema Registry に対して LDAP で認証されるように構成されていることに注意してください。docker-compose exec connect kafka-avro-console-consumer \ --bootstrap-server kafka1:11091,kafka2:11092 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.location=/etc/kafka/secrets/kafka.appSA.truststore.jks \ --consumer-property ssl.truststore.password=confluent \ --consumer-property ssl.keystore.location=/etc/kafka/secrets/kafka.appSA.keystore.jks \ --consumer-property ssl.keystore.password=confluent \ --consumer-property ssl.key.password=confluent \ --property schema.registry.url=https://schemaregistry:8085 \ --property schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.appSA.truststore.jks \ --property schema.registry.ssl.truststore.password=confluent \ --property basic.auth.credentials.source=USER_INFO \ --property basic.auth.user.info=appSA:appSA \ --group wikipedia.test \ --topic wikipedia.parsed \ --max-messages 5
LDAP ユーザー
badapp
がトピックwikipedia.parsed
からメッセージを消費できないことを確認します。docker-compose exec connect kafka-avro-console-consumer \ --bootstrap-server kafka1:11091,kafka2:11092 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.location=/etc/kafka/secrets/kafka.badapp.truststore.jks \ --consumer-property ssl.truststore.password=confluent \ --consumer-property ssl.keystore.location=/etc/kafka/secrets/kafka.badapp.keystore.jks \ --consumer-property ssl.keystore.password=confluent \ --consumer-property ssl.key.password=confluent \ --property schema.registry.url=https://schemaregistry:8085 \ --property schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.badapp.truststore.jks \ --property schema.registry.ssl.truststore.password=confluent \ --property basic.auth.credentials.source=USER_INFO \ --property basic.auth.user.info=badapp:badapp \ --group wikipedia.test \ --topic wikipedia.parsed \ --max-messages 5
出力は以下のようになります。
ERROR [Consumer clientId=consumer-wikipedia.test-1, groupId=wikipedia.test] Topic authorization failed for topics [wikipedia.parsed] org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [wikipedia.parsed]
Create role bindings to permit
badapp
client to consume from topicwikipedia.parsed
and its related subject in Schema Registry.Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role bindings:
# Create the role binding for the topic ``wikipedia.parsed`` docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:badapp \ --role ResourceOwner \ --resource Topic:wikipedia.parsed \ --kafka-cluster-id $KAFKA_CLUSTER_ID" # Create the role binding for the group ``wikipedia.test`` docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:badapp \ --role ResourceOwner \ --resource Group:wikipedia.test \ --kafka-cluster-id $KAFKA_CLUSTER_ID" # Create the role binding for the subject ``wikipedia.parsed-value``, i.e., the topic-value (versus the topic-key) docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:badapp \ --role ResourceOwner \ --resource Subject:wikipedia.parsed-value \ --kafka-cluster-id $KAFKA_CLUSTER_ID \ --schema-registry-cluster-id schema-registry"
LDAP ユーザー
badapp
がトピックwikipedia.parsed
からメッセージを消費できるようになったことを確認します。docker-compose exec connect kafka-avro-console-consumer \ --bootstrap-server kafka1:11091,kafka2:11092 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.location=/etc/kafka/secrets/kafka.badapp.truststore.jks \ --consumer-property ssl.truststore.password=confluent \ --consumer-property ssl.keystore.location=/etc/kafka/secrets/kafka.badapp.keystore.jks \ --consumer-property ssl.keystore.password=confluent \ --consumer-property ssl.key.password=confluent \ --property schema.registry.url=https://schemaregistry:8085 \ --property schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.badapp.truststore.jks \ --property schema.registry.ssl.truststore.password=confluent \ --property basic.auth.credentials.source=USER_INFO \ --property basic.auth.user.info=badapp:badapp \ --group wikipedia.test \ --topic wikipedia.parsed \ --max-messages 5
このクラスターで RBAC 用に構成された ロールバインディングをすべて表示します。
cd scripts/validate ./validate_bindings.sh
ZooKeeper は SASL/DIGEST-MD5 用に構成されているため、ZooKeeper と通信するあらゆるコマンドには、ZooKeeper 認証用に設定されたプロパティが必要です。この認証構成は、ブローカーの
KAFKA_OPTS
設定により提供されます。たとえば、コンシューマースロットルスクリプト は、適切な KAFKA_OPTS 設定を持つ Docker コンテナーkafka1
で実行されていることに注意してください。そうでない場合は、kafka1
またはkafka2
以外の他のコンテナーでコマンドを実行しても機能しません。次の手順: セキュリティのチュートリアル で、セキュリティについて詳しく学習します。
Schema Registry によるデータガバナンス¶
このサンプルで使用するすべてのアプリケーションとコネクターは、Confluent スキーマレジストリ を活用して、Avro 形式のデータを自動的に読み取り、書き込むように構成されています。
Schema Registry と、appSA
などのエンドクライアントとの間に設定されているセキュリティは、次のとおりです。
- 暗号化: TLS。たとえば、クライアントに
schema.registry.ssl.truststore.*
構成がある場合。 - 認証: HTTP Basic 認証ヘッダーからの Bearer トークン認証。たとえば、クライアントに
basic.auth.user.info
およびbasic.auth.credentials.source
構成がある場合。 - 認可: Schema Registry は、RBAC で Bearer トークンを使用してクライアントを認可します。
キーや値のスキーマを登録している、トピックの Schema Registry サブジェクトを表示します。
curl
引数には、(a) ポート 8085 で HTTPS をリッスンしている Schema Registry と連携するために必要な TLS 情報、および (b) RBAC に必要な認証の資格情報(すべてを見るには superUser:superUser を使用します)が含まれることに注意してください。docker-compose exec schemaregistry curl -X GET \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u superUser:superUser \ https://schemaregistry:8085/subjects | jq .
出力は以下のようになります。
[ "wikipedia.parsed.replica-value", "EN_WIKIPEDIA_GT_1_COUNTS-value", "WIKIPEDIABOT-value", "EN_WIKIPEDIA_GT_1-value", "_confluent-ksql-ksql-clusterquery_CTAS_EN_WIKIPEDIA_GT_1_7-Aggregate-Aggregate-Materialize-changelog-value", "WIKIPEDIANOBOT-value", "_confluent-ksql-ksql-clusterquery_CTAS_EN_WIKIPEDIA_GT_1_7-Aggregate-GroupBy-repartition-value", "wikipedia.parsed-value" ]
superUser 資格情報を使用する代わりに、今度は、クライアント資格情報 noexist:noexist (ユーザーが LDAP に存在しない)を使用して、新しい Avro スキーマ(2 つのフィールド
username
およびuserid
を含むレコード)を Schema Registry に、新しいトピックusers
の値として登録してみてください。これは、認可エラーにより失敗するはずです。docker-compose exec schemaregistry curl -X POST \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"user\", \"fields\": [ {\"name\":\"userid\",\"type\":\"long\"}, {\"name\":\"username\",\"type\":\"string\"} ]} ]" }' \ -u noexist:noexist \ https://schemaregistry:8085/subjects/users-value/versions
出力は以下のようになります。
{"error_code":401,"message":"Unauthorized"}
存在しないユーザーの資格情報を使用する代わりに、今度は、クライアント資格情報 appSA:appSA (ユーザー appSA が LDAP に存在する)を使用して、新しい Avro スキーマ(2 つのフィールド
username
およびuserid
を含むレコード)を Schema Registry に、新しいトピックusers
の値として登録してみてください。これは、認可エラーにより失敗し、前述のものとは異なるメッセージが表示されるはずです。docker-compose exec schemaregistry curl -X POST \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"user\", \"fields\": [ {\"name\":\"userid\",\"type\":\"long\"}, {\"name\":\"username\",\"type\":\"string\"} ]} ]" }' \ -u appSA:appSA \ https://schemaregistry:8085/subjects/users-value/versions
出力は以下のようになります。
{"error_code":40403,"message":"User is denied operation Write on Subject: users-value"}
appSA
クライアントの ロールバインディングを作成して、Schema Registry へのアクセスを許可します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role binding:
# Create the role binding for the subject ``users-value``, i.e., the topic-value (versus the topic-key) docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:appSA \ --role ResourceOwner \ --resource Subject:users-value \ --kafka-cluster-id $KAFKA_CLUSTER_ID \ --schema-registry-cluster-id schema-registry"
もう一度、スキーマの登録を試みます。今度は成功するはずです。返されるスキーマ ID に注意します。たとえば、以下では ID は
11
です。docker-compose exec schemaregistry curl -X POST \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"user\", \"fields\": [ {\"name\":\"userid\",\"type\":\"long\"}, {\"name\":\"username\",\"type\":\"string\"} ]} ]" }' \ -u appSA:appSA \ https://schemaregistry:8085/subjects/users-value/versions
出力は以下のようになります。
{"id":11}
サブジェクト
users-value
の新しいスキーマを表示します。Confluent Control Center で Topics をクリックします。下へスクロールしてトピック users をクリックし、SCHEMA を選択します。または、コマンドライン経由でスキーマをリクエストすることもできます。
docker-compose exec schemaregistry curl -X GET \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://schemaregistry:8085/subjects/users-value/versions/1 | jq .
出力は以下のようになります。
{ "subject": "users-value", "version": 1, "id": 11, "schema": "{\"type\":\"record\",\"name\":\"user\",\"fields\":[{\"name\":\"username\",\"type\":\"string\"},{\"name\":\"userid\",\"type\":\"long\"}]}" }
トピック
users
の説明を入力します。特別な構成confluent.value.schema.validation=true
を使用していることに注意してください。これにより、Confluent サーバーのデータガバナンス機能である スキーマ検証 が有効になります。この機能によりオペレーターに対して、データ形式の正確性を確保するための一元的な場所が、Kafka クラスター自体の内部で提供されます。Schema Validation を有効にすると、confluent.schema.registry.url
で構成されたブローカーは、トピックに対して生成されるデータが有効なスキーマを使用していることを検証できます。docker-compose exec kafka1 kafka-topics \ --describe \ --topic users \ --bootstrap-server kafka1:9091 \ --command-config /etc/kafka/secrets/client_sasl_plain.config
出力は以下のようになります。
Topic: users PartitionCount: 2 ReplicationFactor: 2 Configs: confluent.value.schema.validation=true Topic: users Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: users Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
kafka-console-producer
を使用して、このトピックに対して非 Avro メッセージを生成します。これには、エラーが返されます。docker-compose exec connect kafka-console-producer \ --topic users \ --broker-list kafka1:11091 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.location=/etc/kafka/secrets/kafka.appSA.truststore.jks \ --producer-property ssl.truststore.password=confluent \ --producer-property ssl.keystore.location=/etc/kafka/secrets/kafka.appSA.keystore.jks \ --producer-property ssl.keystore.password=confluent \ --producer-property ssl.key.password=confluent
エラーは、以下のようになります。
ERROR Error when sending message to topic users with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence be rejected.
トピック
wikipedia.parsed
の説明を入力します。このトピックは、kafka-connect-sse ソースコネクターの書き込み先です。ここでも Schema Validation が有効になっていることに注意してください。docker-compose exec kafka1 kafka-topics \ --describe \ --topic wikipedia.parsed \ --bootstrap-server kafka1:9091 \ --command-config /etc/kafka/secrets/client_sasl_plain.config
トピック
wikipedia.parsed.replica
の説明を入力します。これは、Replicator がwikipedia.parsed
からレプリケートしたトピックです。ここでも Schema Validation が有効になっています。これは、Replicator のデフォルト設定がtopic.config.sync=true
であるためです( Replicator の 送信先トピック を参照)。docker-compose exec kafka1 kafka-topics \ --describe \ --topic wikipedia.parsed.replica \ --bootstrap-server kafka1:9091 \ --command-config /etc/kafka/secrets/client_sasl_plain.config
次の手順: スキーマレジストリのチュートリアル で、Schema Registry について詳しく学習します。
Confluent REST Proxy¶
Confluent REST Proxy は、オプションのクライアントアクセスのために実行されています。このデモでは、2 つのモードの Confluent REST Proxy を紹介します。
- スタンドアロンサービス。ポート 8086 で HTTPS リクエストをリッスンしています。
- Kafka ブローカー上の組み込みサービス。
kafka1
のポート 8091 およびkafka2
のポート 8092(これらの REST Proxy ポートは、ブローカーの Metadata Service (MDS) リスナーと共有されています)で HTTPS リクエストをリッスンしています。
スタンドアロン REST Proxy を使用して、トピック
users
に対するメッセージの生成を試みます。この際、スキーマ ID11
を参照します。このスキーマは、前のセクションで Schema Registry に登録されています。これは、認可エラーにより失敗するはずです。docker-compose exec restproxy curl -X POST \ -H "Content-Type: application/vnd.kafka.avro.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{"value_schema_id": 11, "records": [{"value": {"user":{"userid": 1, "username": "Bunny Smith"}}}]}' \ -u appSA:appSA \ https://restproxy:8086/topics/users
出力は以下のようになります。
{"offsets":[{"partition":null,"offset":null,"error_code":40301,"error":"Not authorized to access topics: [users]"}],"key_schema_id":null,"value_schema_id":11}
クライアントの ロールバインディングを作成して、トピック
users
に対する生成を許可します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role binding:
# Create the role binding for the topic ``users`` docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:appSA \ --role DeveloperWrite \ --resource Topic:users \ --kafka-cluster-id $KAFKA_CLUSTER_ID"
もう一度、トピック
users
に対してメッセージの生成を試みます。今度は成功するはずです。docker-compose exec restproxy curl -X POST \ -H "Content-Type: application/vnd.kafka.avro.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{"value_schema_id": 11, "records": [{"value": {"user":{"userid": 1, "username": "Bunny Smith"}}}]}' \ -u appSA:appSA \ https://restproxy:8086/topics/users
出力は以下のようになります。
{"offsets":[{"partition":1,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":11}
コンシューマーインスタンス
my_avro_consumer
を作成します。docker-compose exec restproxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer
出力は以下のようになります。
{"instance_id":"my_consumer_instance","base_uri":"https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance"}
my_avro_consumer
をトピックusers
にサブスクライブします。docker-compose exec restproxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ --data '{"topics":["users"]}' \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
my_avro_consumer
サブスクリプションに対してメッセージの消費を試みます。これは、認可エラーにより失敗するはずです。docker-compose exec restproxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records
出力は以下のようになります。
{"error_code":40301,"message":"Not authorized to access group: my_avro_consumer"}
クライアントの ロールバインディングを作成して、コンシューマーグループ
my_avro_consumer
へのアクセスを許可します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role binding:
# Create the role binding for the group ``my_avro_consumer`` docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:appSA \ --role ResourceOwner \ --resource Group:my_avro_consumer \ --kafka-cluster-id $KAFKA_CLUSTER_ID"
もう一度、
my_avro_consumer
サブスクリプションに対してメッセージの消費を試みます。これは、別の認可エラーにより失敗するはずです。# Note: Issue this command twice due to https://github.com/confluentinc/kafka-rest/issues/432 docker-compose exec restproxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records docker-compose exec restproxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records
出力は以下のようになります。
{"error_code":40301,"message":"Not authorized to access topics: [users]"}
クライアントの ロールバインディングを作成して、トピック
users
へのアクセスを許可します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role binding:
# Create the role binding for the group my_avro_consumer docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:appSA \ --role DeveloperRead \ --resource Topic:users \ --kafka-cluster-id $KAFKA_CLUSTER_ID"
もう一度、
my_avro_consumer
サブスクリプションに対してメッセージの消費を試みます。今度は成功するはずです。# Note: Issue this command twice due to https://github.com/confluentinc/kafka-rest/issues/432 docker-compose exec restproxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records docker-compose exec restproxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance/records
出力は以下のようになります。
[{"topic":"users","key":null,"value":{"userid":1,"username":"Bunny Smith"},"partition":1,"offset":0}]
コンシューマーインスタンス
my_avro_consumer
を削除します。docker-compose exec restproxy curl -X DELETE \ -H "Content-Type: application/vnd.kafka.v2+json" \ --cert /etc/kafka/secrets/restproxy.certificate.pem \ --key /etc/kafka/secrets/restproxy.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://restproxy:8086/consumers/my_avro_consumer/instances/my_consumer_instance
次のいくつかの手順では、Kafka ブローカー上に組み込まれた REST Proxy を使用します。今回は、REST Proxy API v3 のみがサポートされています。クライアントに付与する ロールバインディング、トピック
dev_users
に対するResourceOwner
ロールを作成します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
Create the role binding:
# Create the role binding for the topic ``dev_users`` docker-compose exec tools bash -c "confluent iam rolebinding create \ --principal User:appSA \ --role ResourceOwner \ --resource Topic:dev_users \ --kafka-cluster-id $KAFKA_CLUSTER_ID"
組み込み REST Proxy でトピック
dev_users
を作成します。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
curl
を使用してトピックを作成します。docker-compose exec restproxy curl -X POST \ -H "Content-Type: application/json" \ -H "accept: application/json" \ -d "{\"topic_name\":\"dev_users\",\"partitions_count\":64,\"replication_factor\":2,\"configs\":[{\"name\":\"cleanup.policy\",\"value\":\"compact\"},{\"name\":\"compression.type\",\"value\":\"gzip\"}]}" \ --cert /etc/kafka/secrets/mds.certificate.pem \ --key /etc/kafka/secrets/mds.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ "https://kafka1:8091/kafka/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq
組み込み REST Proxy でトピックのリストを表示して、新しく作成した
dev_users
を見つけます。Kafka クラスター ID を取得します。
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert scripts/security/snakeoil-ca-1.crt | jq -r ".id")
curl
を使用してトピックのリストを表示します。docker-compose exec restproxy curl -X GET \ -H "Content-Type: application/json" \ -H "accept: application/json" \ --cert /etc/kafka/secrets/mds.certificate.pem \ --key /etc/kafka/secrets/mds.key \ --tlsv1.2 \ --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \ -u appSA:appSA \ https://kafka1:8091/kafka/v3/clusters/${KAFKA_CLUSTER_ID}/topics | jq '.data[].topic_name'
出力は以下のようになります。出力は作成済みの他のトピックに応じて異なる場合がありますが、前の手順で作成したトピック
dev_users
は必ず表示されるはずです。"_confluent-monitoring" "dev_users" "users" "wikipedia-activity-monitor-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog" "wikipedia-activity-monitor-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition" "wikipedia.failed" "wikipedia.parsed" "wikipedia.parsed.count-by-domain" "wikipedia.parsed.replica"
ブローカーのエラー¶
ブローカーのエラーのシミュレーションを行うために、2 つの Kafka ブローカーのいずれかで実行されている Docker コンテナーを停止します。
Kafka broker 2 を実行している Docker コンテナーを停止します。
docker-compose stop kafka2
数分後に、ブローカー数が 2 から 1 に減り、レプリケーション数が不足しているパーティションが数多くあることがブローカーサマリーに表示されます。
トピック情報の詳細で、同期されていないレプリカが broker 2 にあることを確認します。
生成と消費のメトリクスを調べると、すべてのクライアントが動作し続けていることがわかります。
Kafka broker 2 を実行している Docker コンテナーを再起動します。
docker-compose start kafka2
約 1 分後に、Confluent Control Center でブローカーサマリーを調べます。ブローカー数は 2 に戻り、トピックのパーティションも元に戻って、レプリケーション数が不足しているパーティションがないことがわかります。
Brokers ボックス内のブローカー数
2
をクリックし、Brokers overview ペインが表示されたら、Partitioning and replication ボックス内をクリックして、いつブローカー数が変わったかを調べます。
アラートの生成¶
多くの Control Center アラート があり、その構成方法も数多くあります。アラート管理ページを使用して、トリガーとアクションを定義するか、個々のリソースをクリックしてそこからアラートをセットアップします。

このサンプルには、事前に構成されたトリガーとアクションが既に存在します。アラートの
Triggers
画面を表示し、各トリガーに対してEdit
をクリックして構成の詳細を確認します。- トリガー
Under Replicated Partitions
は、レプリケーション数が不足しているパーティションがゼロでないことをブローカーが報告すると発生します。これにより、アクションEmail Administrator
が発生します。 - トリガー
Consumption Difference
は、Elasticsearch コネクターコンシューマーグループの消費の差が0
より大きい場合に発生します。これにより、アクションEmail Administrator
が発生します。
- トリガー
「ブローカーのエラー」セクションの手順に従った場合は、アラート履歴を表示して、トリガー
Under Replicated Partitions
の発生が原因で broker 2 を停止したときにアラートが生成されたことを確認してください。また、
Consumption Difference
トリガーも作動させることができます。Kafka Connect -> Sinks 画面で、実行中の Elasticsearch Sink Connector を編集します。Connect ビューで、Settings の右上の一時停止アイコンを押して Elasticsearch Sink Connector を一時停止します。これで、関連するコンシューマーグループの消費が停止します。
アラート履歴を表示して、このトリガーが作動したことが原因でアラートが生成されたことを確認します。
モニタリング¶
このチュートリアルでは、ユーザーが Confluent Platform デプロイを管理するために Confluent Control Center がどのように役立つか、およびクラスターとアプリケーションのモニタリング機能がどのように提供されるかを紹介しました。ほとんどの Confluent Platform ユーザーにとって、Confluent Control Center のモニタリングと統合だけでも本稼働環境での使用に問題はありませんが、一部のユーザーは、Prometheus、Grafana、Datadog、Splunk などの他のモニタリングソリューションとの統合を望んでいます。次の JMX ベースのモニタリングスタックを使用すると、"1 枚のガラス" のようにインターフェイスが統一された、組織のすべてのサービスとアプリケーション(Kafka など)用のモニタリングソリューションをセットアップできます。
次に示すのは、Confluent Platform と統合されたモニタリングスタックのいくつかの例です。
JMX Exporter + Prometheus + Grafana (https://github.com/confluentinc/jmx-monitoring-stacks から cp-demo で実行可能):
Jolokia + Elasticsearch + Kibana (https://github.com/confluentinc/jmx-monitoring-stacks から cp-demo で実行可能):
次の手順: スループット、レイテンシ、耐久性、可用性などのさまざまなサービス目標のための Kafka デプロイの最適化、およびオンプレミス Kafka クラスターのパフォーマンスとクラスター正常性のモニタリングに便利なメトリクスの実用的なガイドについては、『Optimizing Your Apache Kafka Deployment』ホワイトペーパーを参照してください。
トラブルシューティング¶
起動スクリプトが正常に完了しない場合は、次のトラブルシューティング手順を実行してください。
Docker¶
すべての Docker コンテナーのステータスが
Up
ステートを示していることを確認します。docker-compose ps
Docker の詳細設定で、次のリソースが Docker に割り当てられていることを確認します。
- メモリー: 8 GB 以上(デフォルトでは 2 GB)
- CPU コア: 2 つ以上
Docker の "ディスクイメージ" ファイルに必要なディスクストレージが確保されていることを確認します(Docker ドキュメント)。
ログ¶
完了前にスクリプトにエラーが発生した場合、または
Up
ステートでない Docker コンテナーがある場合は、コマンドdocker-compose logs [container]
でコンテナーのログを表示し、エラーメッセージと例外がないかどうかを調べてください。docker-compose logs
TLS 通信の問題を示すエラーがある場合は、TLS 証明書が適正に生成されており、次のファイルにエラーがないことを確認します。
ls scripts/security/*.log
セキュリティ¶
TLS 通信の問題を示すエラーがある場合は、スクリプトの先頭に
CLEAN=true
を追加して TLS 証明書を強制的に再生成します。CLEAN=true ./scripts/start.sh
ZooKeeper と通信するコマンドがエラー
org.apache.zookeeper.KeeperException$NoAuthException
により失敗していると思われる場合は、コマンドの実行元であるコンテナーを、kafka1
またはkafka2
に変更します。これは、ZooKeeper が SASL/DIGEST-MD5 用に構成されており、ZooKeeper と通信するあらゆるコマンドには、ZooKeeper 認証用に設定されたプロパティが必要であるためです。
検証¶
wikipedia.parsed
などの Kafka トピックにメッセージがあることを確認します。./scripts/consumers/listen.sh
検証スクリプト 内のスクリプトを実行して、正常に実行されることを確認します。
cd scripts/validate/
CLI ログイン¶
長時間
cp-demo
を実行しており、tools
コンテナーで Confluent CLI コマンドを発行したときに次のエラーが発生したとします。Your token has expired. You are now logged out. Error: You must log in to run that command.
この場合は、次のコマンドを実行すると、再び MDS にログインしてトークンが更新されます。
./scripts/helper/refresh_mds_login.sh
終了後の手順¶
コンシューマーグループ
app
を停止してトピックwikipedia.parsed
からの消費を停止します。以下のコマンドにより、コンシューマーがkill -15
で円滑に停止し、シャットダウンシーケンスに従います。./scripts/app/stop_consumer_app_group_graceful.sh
Docker 環境を停止し、すべてのコンポーネントを破棄して、すべての Docker ボリュームをクリアします。
./scripts/stop.sh