Confluent Operator による共通デプロイアーキテクチャ¶
Confluent Operator と Confluent Platform の基本的なデプロイ例については、「Confluent Operator および Confluent Platform のインストール」で説明しています。以降のセクションでは、他のデプロイ構成について説明します。
このガイドの例では、以下が想定されています。
$VALUES_FILE
は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル(
$VALUES_FILE
)に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set
または--set-file
オプションを使用します。以下に例を示します。helm upgrade --install kafka \ --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \ --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \ --set kafka.enabled=true
operator
は Confluent Platform のデプロイ先となる名前空間です。コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある
helm
ディレクトリで実行されます。
複数の Confluent Platform クラスターのデプロイ¶
複数のクラスターをデプロイするには、追加する Confluent Platform クラスターを別の名前空間にデプロイします。追加するクラスターは、同じ Kubernetes クラスターにある他のどのクラスターとも異なる名前にしてください。複数のクラスターを単一の Kubernetes クラスターで実行する場合、Confluent Operator インスタンスの追加インストールは しません。
たとえば、Operator インスタンスが 1 つあり、2 つの Apache Kafka® クラスターをデプロイする場合は、1 番目の Kafka クラスターを kafka1、2 番目の Kafka クラスターを kafka2 のように命名します。そうしたうえで、それぞれを異なる名前空間にデプロイします。
また、Operator インスタンスを追加インストールしていないので、Docker レジストリシークレットが新しい名前空間にインストールされていることを確認する必要があります。これを Helm の install コマンドで実行するには、コンポーネントを有効化する際に global.injectPullSecret=true
を追加します。
注釈
パラメーター global.injectPullSecret=true
が必要になるのは、新しい名前空間に Docker シークレットがないか、またはイメージをプルするために Docker シークレットが実際に必要か、どちらか一方の場合に限られます。シークレットのある名前空間への新しいコンポーネントのインストールの実行を global.injectPullSecret=true
で試行すると、Helm からリソースが既に存在している旨のエラーが返されます。
たとえば、名前空間 operator2
で kafka2
を使用する場合、コマンドは次のようになります。
helm upgrade --install \
kafka2
--values $VALUES_FILE \
--namespace operator2 \
--set kafka.enabled=true,global.injectPullSecret=true \
./confluent-operator/
新しい名前空間に Docker シークレットが作成されたら、サービスアカウントをパッチします。
kubectl -n operator patch serviceaccount default -p '{"imagePullSecrets": [{"name": "confluent-docker-registry" }]}'
基本認証のプライベートまたはローカルレジストリを使用している場合は、次のコマンドを使用します。
kubectl -n operator patch serviceaccount default -p '{"imagePullSecrets": [{"name": "<your registry name here>" }]}'
複数のアベイラビリティゾーンの使用¶
複数のアベイラビリティゾーン(AZ)を使用する場合は、構成ファイル( $VALUES_FILE
)でまず zones 値のブロックを構成する必要があります。次の例には 3 つのゾーン(us-central-a/-b/-c)が示されています。
provider:
name: gcp
region: us-central1
kubernetes:
deployment:
zones:
- us-central1-a
- us-central1-b
- us-central1-c
重要
Kubernetes クラスターがゾーン a、b、c にわたっている一方、上記の yaml ブロックで a と b のみ構成されていた場合でも、Confluent Operator によって a と b だけではなく a、b、c に含まれるポッドがスケジュールされます。加えて、このように構成すると、全ポッドのストレージディスクはゾーン a と b のみに存在しますが、ポッドはゾーン a、b、c すべてに分散されます。
注釈
パブリッククラウドにある Kubernetes ノードは、その AZ でタグ付けされます。Kubernetes では、ポッドの該当ゾーンでの分散が試行されます。詳細については、「複数のゾーンで動かす」を参照してください。
複数の Kafka クラスターでの Replicator の使用¶
以下の手順では、Replicator を複数の Kafka クラスターにデプロイする方法を説明します。この例は、開発とテストの場合のみ参考にしてください。
- 前提条件
- 外部 DNS が開発プラットフォーム上で構成および実行されていることを確認します。
- DNS 名の作成が許可されていることを確認します。
また、これから示す手順は以下の条件のもとで実行されています。
- GCP 環境に基づいています。
- サンプル DNS 名として
mydevplatform.gcp.cloud
が使用されます。 - 双方向 TLS セキュリティが使用されます。
クラスターのデプロイ¶
次の手順を完了して、クラスターをデプロイします。こちらの デプロイ手順の例 を参考にしてください。
- Operator を名前空間
operator
にデプロイします。 - 送信先となる Kafka および ZooKeeper クラスターを名前空間
kafka-dest
にデプロイします。以下の名前を使用します。- ZooKeeper クラスター :
zookeeper-dest
- Kafka クラスター:
kafka-dest
- ZooKeeper クラスター :
- ソースとなる Kafka および ZooKeeper クラスターを名前空間
kafka-src
にデプロイします。以下の名前を使用します。- ZooKeeper クラスター :
zookeeper-src
- Kafka クラスター:
kafka-src
- ZooKeeper クラスター :
- Replicator を、名前空間
kafka-dest
に、デフォルト名replicator
を使用してデプロイします。Replicator エンドポイントをreplicator.dependencies.kafka.bootstrapEndpoint:kafka-dest:9071
に設定します。TLS 一方向セキュリティ用のエンドポイントを構成します。
Replicator の構成¶
以下の手順を実行して Replicator を構成します。
ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名
replicator-0
のポッドを、kafka-dest
という名前の Kafka クラスターで使用します。kubectl -n kafka-dest exec -it replicator-0 bash
Replicator ポッドで、
test.json
という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D
キーを押します。cat <<EOF >> test.json { "name": "test-replicator", "config": { "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector", "tasks.max": "4", "topic.whitelist": "example", "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter", "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter", "src.kafka.bootstrap.servers": "kafka-src.mydevplatform.gcp.cloud:9092", "src.kafka.security.protocol": "SSL", "src.kafka.ssl.keystore.location": "/tmp/keystore.jks", "src.kafka.ssl.keystore.password": "mystorepassword", "src.kafka.ssl.key.password": "mykeypassword", "src.kafka.ssl.truststore.location": "/tmp/truststore.jks", "src.kafka.ssl.truststore.password": "mystorepassword", "dest.kafka.bootstrap.servers": "kafka-dest:9071", "dest.kafka.security.protocol": "PLAINTEXT", "confluent.license": "", "confluent.topic.replication.factor": "3" } } EOF
いずれかの Replicator ポッドから以下のコマンドを入力して、Replicator 構成の POST テストを実行します。
curl -XPOST -H "Content-Type: application/json" --data @test.json https://localhost:8083/connectors -kv
Replicator コネクターが作成されていることを確認します。以下のコマンドを実行すると、
[''test-replicator'']
が返されます。curl -XGET -H "Content-Type: application/json" https://localhost:8443/connectors -kv
Replicator のテスト¶
以下の手順を実行して Replicator をテストします。
ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名
kafka-src-0
のポッドを、kafka-src
という名前の Kafka クラスターで使用します。kubectl -n kafka-src exec -it kafka-src-0 bash
ポッドで、
kafka.properties
という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D
キーを押します。cat <<EOF > kafka.properties bootstrap.servers=kafka-src.mydevplatform.gcp.cloud:9092 security.protocol=SSL ssl.keystore.location=/tmp/keystore.jks ssl.keystore.password=mystorepassword ssl.key.password=mystorepassword ssl.truststore.location=/tmp/truststore.jks ssl.truststore.password=mystorepassword EOF
kafka.properties
を確認します(DNS が正しく構成されていることを確認します)。kafka-broker-api-versions \ --command-config kafka.properties \ --bootstrap-server kafka-src.mydevplatform.gcp.cloud:9092
ソース Kafka クラスターでトピックを作成します。Kafka ポッドで以下のコマンドを入力します。
kafka-topics --create --topic test-topic \ --replication-factor 1 --partitions 4 \ --bootstrap-server kafka-src.mydevplatform.gcp.cloud:9092
ソース Kafka クラスター
kafka-src
への生成を行います。seq 10000 | kafka-console-producer --topic test-topic \ --broker-list kafka-src.mydevplatform.gcp.cloud:9092 \ --producer.config kafka.properties
新しいターミナルから、bash セッションを
kafka-dest-0
上で開始します。kubectl -n kafka-dest exec -it kafka-dest-0 bash
ポッドで、次のような
kafka.properties
ファイルを作成します。cat <<EOF > kafka.properties sasl.mechanism=PLAIN bootstrap.servers=kafka-dest:9071 security.protocol=PLAINTEXT EOF
kafka.properties
を確認します(DNS が正しく構成されていることを確認します)。kafka-broker-api-versions --command-config kafka.properties \ --bootstrap-server kafka-dest:9071
テストトピックが
kafka-dest
に作成されたことを確認します。kafka-topics --describe --topic test-topic.replica \ --bootstrap-server kafka-dest:9071
送信先 Kafka クラスター
kafka-dest
でデリバリーを確認します。kafka-console-consumer --from-beginning --topic test-topic \ --bootstrap-server kafka-dest:9071 \ --consumer.config kafka.properties