Confluent Operator による共通デプロイアーキテクチャ

Confluent Operator と Confluent Platform の基本的なデプロイ例については、「Confluent Operator および Confluent Platform のインストール」で説明しています。以降のセクションでは、他のデプロイ構成について説明します。

このガイドの例では、以下が想定されています。

  • $VALUES_FILE は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。

  • Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル( $VALUES_FILE )に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set または --set-file オプションを使用します。以下に例を示します。

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \
     --set kafka.enabled=true
    
    Copy
  • operator は Confluent Platform のデプロイ先となる名前空間です。

  • コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある helm ディレクトリで実行されます。

複数の Confluent Platform クラスターのデプロイ

複数のクラスターをデプロイするには、追加する Confluent Platform クラスターを別の名前空間にデプロイします。追加するクラスターは、同じ Kubernetes クラスターにある他のどのクラスターとも異なる名前にしてください。複数のクラスターを単一の Kubernetes クラスターで実行する場合、Confluent Operator インスタンスの追加インストールは しません

たとえば、Operator インスタンスが 1 つあり、2 つの Apache Kafka® クラスターをデプロイする場合は、1 番目の Kafka クラスターを kafka1、2 番目の Kafka クラスターを kafka2 のように命名します。そうしたうえで、それぞれを異なる名前空間にデプロイします。

また、Operator インスタンスを追加インストールしていないので、Docker レジストリシークレットが新しい名前空間にインストールされていることを確認する必要があります。これを Helm の install コマンドで実行するには、コンポーネントを有効化する際に global.injectPullSecret=true を追加します。

注釈

パラメーター global.injectPullSecret=true が必要になるのは、新しい名前空間に Docker シークレットがないか、またはイメージをプルするために Docker シークレットが実際に必要か、どちらか一方の場合に限られます。シークレットのある名前空間への新しいコンポーネントのインストールの実行を global.injectPullSecret=true で試行すると、Helm からリソースが既に存在している旨のエラーが返されます。

たとえば、名前空間 operator2kafka2 を使用する場合、コマンドは次のようになります。

helm upgrade --install \
  kafka2
  --values $VALUES_FILE \
  --namespace operator2 \
  --set kafka.enabled=true,global.injectPullSecret=true \
  ./confluent-operator/
Copy

新しい名前空間に Docker シークレットが作成されたら、サービスアカウントをパッチします。

kubectl -n operator patch serviceaccount default -p '{"imagePullSecrets": [{"name": "confluent-docker-registry" }]}'
Copy

基本認証のプライベートまたはローカルレジストリを使用している場合は、次のコマンドを使用します。

kubectl -n operator patch serviceaccount default -p '{"imagePullSecrets": [{"name": "<your registry name here>" }]}'
Copy

複数のアベイラビリティゾーンの使用

複数のアベイラビリティゾーン(AZ)を使用する場合は、構成ファイル( $VALUES_FILE )でまず zones 値のブロックを構成する必要があります。次の例には 3 つのゾーン(us-central-a/-b/-c)が示されています。

provider:
name: gcp
region: us-central1
kubernetes:
   deployment:
     zones:
      - us-central1-a
      - us-central1-b
      - us-central1-c
Copy

重要

Kubernetes クラスターがゾーン a、b、c にわたっている一方、上記の yaml ブロックで a と b のみ構成されていた場合でも、Confluent Operator によって a と b だけではなく a、b、c に含まれるポッドがスケジュールされます。加えて、このように構成すると、全ポッドのストレージディスクはゾーン a と b のみに存在しますが、ポッドはゾーン a、b、c すべてに分散されます。

注釈

パブリッククラウドにある Kubernetes ノードは、その AZ でタグ付けされます。Kubernetes では、ポッドの該当ゾーンでの分散が試行されます。詳細については、「複数のゾーンで動かす」を参照してください。

複数の Kafka クラスターでの Replicator の使用

以下の手順では、Replicator を複数の Kafka クラスターにデプロイする方法を説明します。この例は、開発とテストの場合のみ参考にしてください。

前提条件
  • 外部 DNS が開発プラットフォーム上で構成および実行されていることを確認します。
  • DNS 名の作成が許可されていることを確認します。

また、これから示す手順は以下の条件のもとで実行されています。

  • GCP 環境に基づいています。
  • サンプル DNS 名として mydevplatform.gcp.cloud が使用されます。
  • 双方向 TLS セキュリティが使用されます。

クラスターのデプロイ

次の手順を完了して、クラスターをデプロイします。こちらの デプロイ手順の例 を参考にしてください。

  1. Operator を名前空間 operator にデプロイします。
  2. 送信先となる Kafka および ZooKeeper クラスターを名前空間 kafka-dest にデプロイします。以下の名前を使用します。
    • ZooKeeper クラスター : zookeeper-dest
    • Kafka クラスター: kafka-dest
  3. ソースとなる Kafka および ZooKeeper クラスターを名前空間 kafka-src にデプロイします。以下の名前を使用します。
    • ZooKeeper クラスター : zookeeper-src
    • Kafka クラスター: kafka-src
  4. Replicator を、名前空間 kafka-dest に、デフォルト名 replicator を使用してデプロイします。Replicator エンドポイントを replicator.dependencies.kafka.bootstrapEndpoint:kafka-dest:9071 に設定します。TLS 一方向セキュリティ用のエンドポイントを構成します。

Replicator の構成

以下の手順を実行して Replicator を構成します。

  1. ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名 replicator-0 のポッドを、kafka-dest という名前の Kafka クラスターで使用します。

    kubectl -n kafka-dest exec -it replicator-0 bash
    
    Copy
  2. Replicator ポッドで、test.json という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D キーを押します。

    cat <<EOF >> test.json
    {
    "name": "test-replicator",
    "config": {
       "connector.class":
       "io.confluent.connect.replicator.ReplicatorSourceConnector",
       "tasks.max": "4",
       "topic.whitelist": "example",
       "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
       "value.converter":
       "io.confluent.connect.replicator.util.ByteArrayConverter",
       "src.kafka.bootstrap.servers": "kafka-src.mydevplatform.gcp.cloud:9092",
       "src.kafka.security.protocol": "SSL",
       "src.kafka.ssl.keystore.location": "/tmp/keystore.jks",
       "src.kafka.ssl.keystore.password": "mystorepassword",
       "src.kafka.ssl.key.password": "mykeypassword",
       "src.kafka.ssl.truststore.location": "/tmp/truststore.jks",
       "src.kafka.ssl.truststore.password": "mystorepassword",
       "dest.kafka.bootstrap.servers": "kafka-dest:9071",
       "dest.kafka.security.protocol": "PLAINTEXT",
       "confluent.license": "",
       "confluent.topic.replication.factor": "3"
      }
    }
    EOF
    
    Copy
  3. いずれかの Replicator ポッドから以下のコマンドを入力して、Replicator 構成の POST テストを実行します。

    curl -XPOST -H "Content-Type: application/json" --data @test.json https://localhost:8083/connectors -kv
    
    Copy
  4. Replicator コネクターが作成されていることを確認します。以下のコマンドを実行すると、[''test-replicator''] が返されます。

    curl -XGET -H "Content-Type: application/json" https://localhost:8443/connectors -kv
    
    Copy

Replicator のテスト

以下の手順を実行して Replicator をテストします。

  1. ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名 kafka-src-0 のポッドを、kafka-src という名前の Kafka クラスターで使用します。

    kubectl -n kafka-src exec -it kafka-src-0 bash
    
    Copy
  2. ポッドで、kafka.properties という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D キーを押します。

    cat <<EOF > kafka.properties
    bootstrap.servers=kafka-src.mydevplatform.gcp.cloud:9092
    security.protocol=SSL
    ssl.keystore.location=/tmp/keystore.jks
    ssl.keystore.password=mystorepassword
    ssl.key.password=mystorepassword
    ssl.truststore.location=/tmp/truststore.jks
    ssl.truststore.password=mystorepassword
    EOF
    
    Copy
  3. kafka.properties を確認します(DNS が正しく構成されていることを確認します)。

    kafka-broker-api-versions \
    --command-config kafka.properties \
    --bootstrap-server kafka-src.mydevplatform.gcp.cloud:9092
    
    Copy
  4. ソース Kafka クラスターでトピックを作成します。Kafka ポッドで以下のコマンドを入力します。

    kafka-topics --create --topic test-topic \
    --replication-factor 1 --partitions 4 \
    --bootstrap-server kafka-src.mydevplatform.gcp.cloud:9092
    
    Copy
  5. ソース Kafka クラスター kafka-src への生成を行います。

    seq 10000 | kafka-console-producer --topic test-topic \
    --broker-list kafka-src.mydevplatform.gcp.cloud:9092 \
    --producer.config kafka.properties
    
    Copy
  6. 新しいターミナルから、bash セッションを kafka-dest-0 上で開始します。

    kubectl -n kafka-dest exec -it kafka-dest-0 bash
    
    Copy
  7. ポッドで、次のような kafka.properties ファイルを作成します。

    cat <<EOF > kafka.properties
    sasl.mechanism=PLAIN
    bootstrap.servers=kafka-dest:9071
    security.protocol=PLAINTEXT
    EOF
    
    Copy
  8. kafka.properties を確認します(DNS が正しく構成されていることを確認します)。

    kafka-broker-api-versions --command-config kafka.properties \
    --bootstrap-server kafka-dest:9071
    
    Copy
  9. テストトピックが kafka-dest に作成されたことを確認します。

    kafka-topics --describe --topic test-topic.replica \
    --bootstrap-server kafka-dest:9071
    
    Copy
  10. 送信先 Kafka クラスター kafka-dest でデリバリーを確認します。

    kafka-console-consumer --from-beginning --topic test-topic \
    --bootstrap-server kafka-dest:9071 \
    --consumer.config kafka.properties
    
    Copy