Confluent Operator および Confluent Platform のインストール¶
このトピックでは Confluent Operator および Confluent Platform のデプロイ手順について説明します。
このガイドの例では、以下が想定されています。
$VALUES_FILE
は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル(
$VALUES_FILE
)に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set
または--set-file
オプションを使用します。以下に例を示します。helm upgrade --install kafka \ --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \ --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \ --set kafka.enabled=true
operator
は Confluent Platform のデプロイ先となる名前空間です。コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある
helm
ディレクトリで実行されます。
ステップ 1. Confluent Operator のインストール¶
Confluent Operator バンドルのダウンロード先ディレクトリの
helm
ディレクトリに移動します。以下のコマンドを使用して、Operator をインストールします(例として
operator
名前空間を使用)。helm upgrade --install \ operator \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set operator.enabled=true
上記コマンドの
--set operator.enabled=true
で、デフォルト設定のfalse
を変更しています。このパラメーター変更により、コンポーネントはインストール直後に有効となって起動されます。Operator が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Operator サービスについて以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE cc-operator-76c54d65cd-vgm5w 1/1 Running 0 7m28s
ステップ 2. ZooKeeper のインストール¶
Operator が実行中であることを確認したら、以下のコマンドを入力して ZooKeeper をインストールします。
helm upgrade --install \ zookeeper \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set zookeeper.enabled=true
以下のような出力が表示されます。
NAME: zookeeper LAST DEPLOYED: Wed Jan 8 14:51:26 2020 NAMESPACE: operator STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: Zookeeper Cluster Deployment Zookeeper cluster is deployed through CR. 1. Validate if Zookeeper Custom Resource (CR) is created kubectl get zookeeper -n operator | grep zookeeper 2. Check the status/events of CR: zookeeper kubectl describe zookeeper zookeeper -n operator 3. Check if Zookeeper cluster is Ready kubectl get zookeeper zookeeper -ojson -n operator kubectl get zookeeper zookeeper -ojsonpath='{.status.phase}' -n operator 4. Update/Upgrade Zookeeper Cluster The upgrade can be done either through the ``helm upgrade`` command or by editing the CR directly as below; kubectl edit zookeeper zookeeper -n operator
ZooKeeper が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
ZooKeeper について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE zookeeper-0 1/1 Running 0 4h30m zookeeper-1 1/1 Running 0 4h30m zookeeper-2 1/1 Running 0 4h30m
ステップ 3. Kafka ブローカーのインストール¶
Kafka をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ kafka \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set kafka.enabled=true
以下のような出力が表示されます。
NAME: kafka LAST DEPLOYED: Wed Jan 8 15:07:46 2020 NAMESPACE: operator STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: Kafka Cluster Deployment Kafka Cluster is deployed to kubernetes through CR Object 1. Validate if Kafka Custom Resource (CR) is created kubectl get kafka -n operator | grep kafka 2. Check the status/events of CR: kafka kubectl describe kafka kafka -n operator 3. Check if Kafka cluster is Ready kubectl get kafka kafka -ojson -n operator kubectl get kafka kafka -ojsonpath='{.status.phase}' -n operator ... output omitted
Kafka が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Kafka について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE kafka-0 1/1 Running 0 4h20m kafka-1 1/1 Running 0 4h20m kafka-2 1/1 Running 0 4h20m
ステップ 4. Schema Registry のインストール¶
Schema Registry をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ schemaregistry \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set schemaregistry.enabled=true
以下のような出力が表示されます。
NAME: schemaregistry LAST DEPLOYED: Thu Jan 9 15:51:21 2020 NAMESPACE: operator STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: Schema Registry is deployed through PSC. Configure Schema Registry through REST Endpoint 1. Validate if schema registry cluster is running kubectl get pods -n operator | grep schemaregistry 2. Access Internal REST Endpoint : http://schemaregistry:8081 (Inside kubernetes) OR http://localhost:8081 (Inside Pod) More information about schema registry REST API can be found here, https://docs.confluent.io/platform/current/schema-registry/docs/api.html
Schema Registry が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Schema Registry について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE schemaregistry-0 1/1 Running 0 4h18m schemaregistry-1 1/1 Running 0 4h18m
ステップ 5. Kafka Connect のインストール¶
注釈
コネクターを使用する予定の場合は、Connect をインストールする前に、「Confluent コネクターのデプロイ」の手順に従って、コネクターが含まれるように Connect イメージを拡張します。
Connect をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ connectors \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set connect.enabled=true
Connect が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Connect について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE connectors-0 1/1 Running 0 4h15m connectors-1 1/1 Running 0 4h15m
ステップ 6. Confluent Replicator のインストール¶
Replicator をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ replicator \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set replicator.enabled=true
Replicator が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Replicator について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE replicator-0 1/1 Running 0 4h18m replicator-1 1/1 Running 0 4h18m
ステップ 7. Confluent Control Center のインストール¶
Confluent Control Center をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ controlcenter \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set controlcenter.enabled=true
Confluent Control Center が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
Confluent Control Center について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE controlcenter-0 1/1 Running 0 4h18m
ステップ 8. ksqlDB のインストール¶
ksqlDB をインストールするには、以下のコマンドを入力します。
helm upgrade --install \ ksql \ ./confluent-operator \ --values $VALUES_FILE \ --namespace operator \ --set ksql.enabled=true
ksqlDB が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。
kubectl get pods -n operator
ksqlDB について以下の例のような出力が表示され、ステータスが
Running
になっています。NAME READY STATUS RESTARTS AGE ksql-0 1/1 Running 0 21m ksql-1 1/1 Running 0 21m
ステップ 9. デプロイのテスト¶
デプロイをテストおよび検証するには、以下の手順に従います。
内部アクセスの検証¶
内部通信を検証するには、以下の手順に従います。
ローカルマシンで以下のコマンドを入力して、クラスターの名前空間情報を表示します(例として名前空間 operator を使用)。この情報には、内部通信の検証に必要なブートストラップエンドポイントが含まれています。
kubectl get kafka -n operator -oyaml
ブートストラップエンドポイントは
bootstrap.servers
の行に示されます。... omitted internalClient: |- bootstrap.servers=kafka:9071
ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名
kafka-0
のポッドを、デフォルト名kafka
の Kafka クラスターで使用します。kubectl -n operator exec -it kafka-0 bash
ポッドで、
kafka.properties
という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D
キーを押します。注釈
この例には、デフォルトの SASL/PLAIN セキュリティパラメーターが示されています。本稼働環境では、追加のセキュリティが必要です。詳細については、「Confluent Operator でのセキュリティの構成」を参照してください。
cat << EOF > kafka.properties bootstrap.servers=kafka:9071 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123"; sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT EOF
ポッドで、以下のコマンドを使用してブートストラップサーバーへのクエリを実行します。
kafka-broker-api-versions --command-config kafka.properties --bootstrap-server kafka:9071
3 つの Kafka ブローカーそれぞれにつき、以下のような出力が表示されます。
kafka-1.kafka.operator.svc.cluster.local:9071 (id: 1 rack: 0) -> ( Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 4 [usable: 4], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 3 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], ... omitted
この出力をもって、クラスター内での内部通信の検証となります。
外部アクセスの検証¶
「Kafka への外部アクセス」の説明に従って、Kafka への外部アクセスと追加した DNS エントリを有効にしたら、外部通信を検証するために以下の手順に従います。
注釈
この例では、デフォルトの Confluent Platform コンポーネント名とデフォルトの Kafka ブートストラッププレフィックス kafka
が使用されます。
ローカルマシンで、 Confluent Platform をダウンロード します。Confluent CLI は、ダウンロードして
PATH
と必要な環境変数を設定するだけで使用できます。ローカルマシンで Confluent Platform を起動する必要はありません。外部通信の検証には、ローカルマシンで実行されている Confluent CLI を使用します。Confluent CLI は Confluent Platform に含まれています。
ローカルマシンで、外部クライアント用のブートストラップサーバーエンドポイントを取得するためのコマンドを実行します。
kubectl get kafka -n operator -oyaml
以下の出力例で、ブートストラップサーバーエンドポイントは
kafka.mydomain:9092
です。... omitted externalClient: |- bootstrap.servers=kafka.mydomain:9092 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123"; sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT
Confluent Platform がローカル実行されているローカルマシンで、
kafka.properties
という名前のファイルを作成し、以下の内容を入力します。上記の手順で取得した外部エンドポイントをbootstrap.servers
に割り当てます。bootstrap.servers=<kafka bootstrap endpoint> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123"; sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT
注釈
この例には、デフォルトの SASL/PLAIN セキュリティパラメーターが示されています。本稼働環境では、追加のセキュリティが必要です。詳細については、「Confluent Operator でのセキュリティの構成」を参照してください。
ローカルマシンで、ブートストラップエンドポイント
<kafka bootstrap endpoint>
を使用してトピックを作成します。以下の例では、パーティションが 1 つ、レプリカが 3 つのトピックが作成されます。kafka-topics --bootstrap-server <kafka bootstrap endpoint> \ --command-config kafka.properties \ --create --replication-factor 3 \ --partitions 1 --topic example
ローカルマシンで、ブートストラップエンドポイント
<kafka bootstrap endpoint>
を使用して新しいトピックを生成します。これ 1 つですべての Kafka ブローカーにゲートウェイアクセスが提供されることから、ブートストラップサーバーエンドポイントは必要な唯一の Kafka ブローカーエンドポイントであることにご注意ください。seq 10000 | kafka-console-producer \ --topic example --broker-list <kafka bootstrap endpoint> \ --producer.config kafka.properties
ローカルマシンの 別のターミナル で、
kafka.properties
を作成したディレクトリから、Confluent CLI コマンドを発行して新しいトピックを消費します。kafka-console-consumer --from-beginning \ --topic example --bootstrap-server <kafka bootstrap endpoint> \ --consumer.config kafka.properties
これらの手順が正常に完了したことをもって、クラスターとの外部通信の検証となります。
Confluent Control Center の外部アクセスの検証¶
Confluent Platform クラスターに Control Center を使用してアクセスするには、以下の手順に従います。この手順に先立ち、外部ロードバランサーの構成 の説明に従って、Confluent Control Center 用の外部ロードバランサーを有効にし、DNS エントリを追加します。
ローカルマシンで、以下のコマンドを入力して、デフォルトの Confluent Control Center エンドポイントへのポートフォワーディングをセットアップします。
kubectl port-forward svc/controlcenter 9021:9021 -n operator
ブラウザーで Control Center に接続します。
http://localhost:9021/
Control Center にログインします。基本認証の認証情報は構成ファイル(
$VALUES_FILE
)に設定されています。以下の例で、userID は admin、パスワードは Developer1 です。## ## C3 authentication ## auth: basic: enabled: true ## ## map with key as user and value as password and role property: admin: Developer1,Administrators disallowed: no_access
重要
Confluent Control Center の基本認証は、開発のテストに使用できます。一般に、この種類の認証は本稼働環境では無効にされ、LDAP はユーザーアクセス用に構成されます。LDAP パラメーターは、Control Center の値ファイルで提供されます。