Confluent Operator での Confluent Platform の管理

以降のセクションでは、デプロイされた Confluent Platform のクラスターの管理と運用環境について説明します。

注釈

Confluent では、Confluent Operator と Confluent Platform 1.7 のデプロイに Helm 3 を推奨しています。Helm の移行と Operator のアップグレードの手順については、以下の説明を参照してください。

このガイドの例では、以下が想定されています。

  • $VALUES_FILE は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。

  • Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル( $VALUES_FILE )に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set または --set-file オプションを使用します。以下に例を示します。

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \
     --set kafka.enabled=true
    
    Copy
  • operator は Confluent Platform のデプロイ先となる名前空間です。

  • コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある helm ディレクトリで実行されます。

既存の Kafka クラスターへのコンポーネントの追加

クラスターに他のコンポーネントを追加するには、構成ファイル( $VALUES_FILE )を変更し、追加するコンポーネントについて helm upgrade --install コマンドを実行します。

たとえば、ksqlDB をクラスターに追加する場合は、以下の例のように、ksqlDB 値のブロックを構成ファイル( $VALUES_FILE )に追加し、ksqlDB のみの install コマンドを実行します。

helm upgrade --install \
  ksql \
  --values $VALUES_FILE \
  --namespace operator \
  --set ksql.enabled=true \
  ./confluent-operator
Copy

コンポーネントの削除

コンポーネントリリースをクラスターからアンインストールするには、以下のコマンドを実行します。

helm uninstall <component-release-name> --namespace <namespace-name>
Copy

クラスターから Confluent Platform コンポーネントを削除するには、以下のコマンドを入力します。コンポーネントは、コンポーネントリリース名を使用して、以下に示されている順序で削除する必要があります。以下の例で示されているのはデフォルトのリリース名です。

helm uninstall ksql --namespace <namespace-name>
helm uninstall controlcenter --namespace <namespace-name>
helm uninstall connectors --namespace <namespace-name>
helm uninstall replicator --namespace <namespace-name>
helm uninstall schemaregistry --namespace <namespace-name>
helm uninstall kafka --namespace <namespace-name>
helm uninstall zookeeper --namespace <namespace-name>
helm uninstall operator --namespace <namespace-name>
Copy

クラスターの再起動のトリガー

構成変更を適用するためや、シークレットオブジェクトで認証情報をローテーションした後に変更をコンポーネントに反映するためなどの目的で、クラスターの再起動をトリガーする必要が生じる場合があります。

Confluent Platform コンポーネントクラスターを再起動するには、以下の手順に従います。

  1. 再起動する Confluent Platform クラスターに対応する StatefulSet の名前を検索します。

    kubectl get statefulset --namespace <namespace>
    
    Copy
  2. 前のステップで確認した StatefulSet 名 <name-of-statefulset> を使用して、クラスターをローリングします。

    kubectl rollout restart statefulset/<name-of-statefulset> --namespace <namespace>
    
    Copy

Confluent コネクターのデプロイ

Operator 1.7 から、コネクターは Connect イメージにパッケージ化されなくなりました。コネクターをデプロイするには、コネクターを Connect イメージ cp-server-connect-operator に追加する必要があります。Docker イメージの拡張の詳細については、「Confluent Platform イメージのビルド」を参照してください。

  1. Operator init コンテナーイメージをプルします。コネクターイメージは Operator init イメージによって異なるため、この両方のイメージが、同じリポジトリ内にある必要があります。

    docker pull confluentinc/cp-init-container-operator:<version-tag>
    
    Copy
  2. 1 つまたは複数のコネクターを cp-server-connect-operator イメージに追加するには、<dockerfile-dir>Dockerfile を作成します。

    Confluent Hub からコネクターをプルするか、Docker ビルドの実行元であるマシンにダウンロードしたコネクター JAR を使用することができます。

    • コネクターを Confluent Hub からプルするには、以下のように Dockerfile を作成します。

      FROM confluentinc/cp-server-connect-operator:<version-tag>
      USER root
      RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
        && confluent-hub install --no-prompt <connector2>:<connector2-version> \
        && ...
      USER 1001
      
      Copy

      この Dockerfile の例では、Docker イメージの作成に Confluent Hub の data-gen コネクターを使用しています。

      FROM confluentinc/cp-server-connect-operator:6.0.0.0
      USER root
      RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
      USER 1001
      
      Copy
    • Docker ビルドの実行元であるマシン上のコネクター JAR ファイルを使用するには、以下のように Dockerfile を作成します。

      FROM confluentinc/cp-server-connect-operator:<version-tag>
      ADD <local-connector1-path> /usr/share/java/<connector1> \
        && <local-connector2-path> /usr/share/java/<connector2> \
        && ...
      USER 1001
      
      Copy

      この例の Dockerfile では、ローカルマシンの <connector-dir> ディレクトリにある data-gen コネクターを使用します。

      FROM confluentinc/cp-server-connect-operator:6.0.0.0
      ADD​ my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
      USER 1001
      
      Copy
  3. 以下のコマンドを実行して、イメージをビルドし、プッシュします。

    docker build <dockerfile-dir> -t <custom-registry>/<custom-repository>:<custom-tag>
    
    docker push <custom-registry>/<custom-repository>:<custom-tag>
    
    Copy
  4. 上記のプロセスの出力から Docker イメージの詳細を入手し、次の手順で使用します。

  5. helm upgrade を実行して変更を適用します。

    helm upgrade --install \
      <connect-helm-release-name> \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set connect.enabled=true \
      --set connect.name=connect \
      --set global.provider.registry.fqdn=<custom-registry-endpoint> \
      --set global.provider.registry.credential.required=<true or false> \
      --set global.provider.registry.credential.username=<value if required> \
      --set global.provider.registry.credential.password=<value if required> \
      --set connect.image.repository=<custom-registry>/<custom-repository> \
      --set connect.image.tag=<custom-tag> \
      --set global.initContainer.image.repository=<mypath>/<name> \
      --set global.initContainer.image.tag=<custom-tag>
    
    Copy

Kafka メトリクスのモニタリング

Kafka values.yaml ファイルに構成パラメーター metricReporter が用意されています。以下のパラメーターを構成ファイル( $VALUES_FILE )に追加すると、メッセージがブートストラップエンドポイントへ 30000 ミリ秒(30 秒)ごとにパブリッシュされます。

metricReporter:
  enabled: true
  ## Period (millisecond) the report should use to publish messages to bootstrapEndpoint
  ##
  publishMs: 30000
  ## ReplicationFactor, if empty, the value is based on the replicas count
  ##
  replicationFactor: ""
  tls:
    enabled: false
    ## If true, inter-communication will be encrypted if TLS is enabled. The bootstrapEndpoint will have FQDN name.
    ## If false, the security setting is configured to use either SASL_PLAINTEXT or PLAINTEXT
    internal: false
    authentication:
      type: ""
  ## If above tls.internal is true, configure with Kafka bootstrap DNS configuration on port 9092 e.g <kafka.name>.<domain>:9092
  ## If above tls.internal is false, configure with Kafka service name on port 9071 eg. <kafka.name>:9071 or FQDN name of Kafka serivce name e.g <name>.<namespace>.svc.cluster.local:9071
  bootstrapEndpoint: ""
Copy

重要

bootstrapEndpoint を DNS 名として metricReporter を有効にする場合は、DNS が正常に解決されることが必要です。解決されないと、Kafka ポッドがクラッシュします(CrashLoopBackOff)。boostrapEndpoint で内部アドレス( kafka:9071 など)を使用している場合は、問題になりません。セキュリティに関するその他の情報については、values.yaml ファイルのコメントを参照してください。

Kafka クラスターのスケーリングとデータのバランス調整

スケールアップ

Confluent Platform 6.0.0 以降、Kafka ブローカーの追加または削除時に負荷のバランス調整を行う場合は、Self-Balancing が推奨方法です。Self-Balancing はデフォルトで無効になっています。

Kafka クラスターをスケールアップするには、以下の手順に従います。

  1. Self-Balancing を有効にして、均等ではない負荷について Kafka のバランス調整を行います。

    Self-Balancing の管理に使用できる設定の網羅的なリストについては、「Self-Balancing Cluster の構成オプションおよびコマンド」を参照してください。設定は、構成ファイル( $VALUES_FILE )の kafka セクションの configOverrides で渡します。

    kafka:
      configOverrides:
        server:
          - confluent.balancer.enable=                   ----- [1]
          - confluent.balancer.heal.uneven.load.trigger= ----- [2]
    
    Copy
  2. Kafka レプリカの数を増やします。

    kafka:
      replicas:
    
    Copy
  3. 新しいブローカーポッドの DNS チェックを Operator で実行するかどうかを指定します。

    kafka:
      scalingDnsPreCheck:
    
    Copy
    • DNS のチェックまたは解決を Operator ノードから実行できることが保証できない環境、つまり、Operator ノードから DNS を解決できるかどうかを、Kafka ブローカーをスケールアップする前に Operator でチェックすることが望ましくない環境では、DNS チェックを無効(scalingDnsPreCheck: false)にします。このバージョンの Operator では、DNS チェックがデフォルトで無効になっています。

    • DNS が完全マネージド型で、全ノード(Operator ポッドを含む)から解決可能である環境では、DNS チェックを有効(scalingDnsPreCheck: true)にします。つまり、スケールアップ前に DNS をチェックするように Operator を構成します。

      この設定が有効になっている場合、ブローカーのレプリカ数が増やされたときに、Operator は新たに起動するブローカーごとに DNS チェックを実行して正常性を確認します。この DNS チェックがエラーになると、スケーリング操作がエラーになります。

  4. 前の手順で DNS チェックを有効にした場合、新しいブローカー用に適切な DNS レコードが構成されていることを確認します。また、nslookup などのコマンドを使用して、Operator が新しいブローカーのホスト名を解決できることを確認します。

    DNS サービスではなく hosts ファイルを使用している場合は、hosts ファイルを新しいブローカーの情報でアップデートする必要があります。以下に例を示します。

    1. 新しいブローカーの IP アドレスを取得します。

      kubectl get services -n <namespace>
      
      Copy
    2. ブローカープレフィックスを使用して既存のブローカーホスト名を参照し、新しいブローカーのホスト名を取得します。

    3. 追加エントリーをhostAliasesに追加する」の説明に従って、新しいブローカーホストを /etc/hosts ファイルに追加し、アップデートされたファイルを Operator ポッドに挿入します。

  5. 新しい設定で Kafka をアップデートします。

    helm upgrade --install kafka ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.enabled=true
    
    Copy

注釈

Auto Data Balancer を使用する必要がある場合は、あらかじめ Self-Balancing をオフにしておきます。Self-Balancing と Auto Data Balancer は同時に使用できないからです。Auto Data Balancer を使用したスケールアップ処理の詳細については、5.5.1 以前のバージョンのドキュメントを参照してください。

スケールダウン

Kafka クラスターのスケールダウンは、Confluent Operator の現行バージョンではサポートされません。

ストレージのスケール

ディスクのスケールアップ

Confluent Platform の大規模な使用を成功させるためには、ディスクのスケールアップ機能が重要です。たとえば、組織における Kafka の使用量が増加するにつれて、Kafka トピックの数が大幅に増加することが考えられ、それに合わせてディスクを拡張できれば、該当するトピックにデータを格納するのに十分なディスク容量を確保できます。

Confluent Operator を使用すると、シンプルで自動化されたストレージ拡張が実現されます。環境によっては、クラスター(ZooKeeper や Kafka など)に関連付けられている全ディスクの拡張を非常に簡素化でき、構成ファイル( $VALUES_FILE )の値を 1 つ変えて helm upgrade コマンドを実行するだけになります。しばらくすると、拡張されたディスク容量をクラスター内の全ポッドで中断なく活用できる状態になります。場合によっては、この拡張をクラスター全体についてものの数秒で完了できます。

このセクションでは、ディスク拡張の基盤となるプロセス、どの処理を自動化できるかを左右する設定や構成(ストレージプロビジョナー、Kubernetes のバージョン、StorageClass の構成など)、設定や構成に応じてディスクを拡張するために従う必要のある具体的なワークフローについて詳細に説明します。

環境と構成に関する考慮事項

ディスクを拡張するには、PersistentVolumeClaim(PVC)のサイズ変更と、所定の関連 PersistentVolume(PV)のマウント先ポッドに公開されているファイルシステムの拡張の両方が必要です。

PVC のサイズを自動変更する機能は、使用しているボリュームの種類と、PVC に関連付けられている StorageClass の構成に応じて異なります。PVC 拡張サポートの詳細については、「永続ボリューム」の「永続ボリュームクレームの拡大」を参照してください。

PVC を拡張した場合、ポッドに公開されるファイルシステムをポッドの再起動を要さずに自動拡張する機能を使用するには、Kubernetes クラスターの ExpandInUsePersistentVolumes 機能を有効にする必要があります。バージョン 1.15 以降の Kubernetes クラスターでは、この機能はデフォルトで有効です。PVC 拡張サポートの詳細については、「永続ボリューム」の「使用中の永続ボリュームクレームのリサイズ」を参照してください。

1 コマンドによるディスク拡張ワークフロー

ボリュームが自動拡張され、ポッドにマウントされているファイルシステムのサイズが自動で変更される環境では、ディスク拡張に以下の非常に簡潔なワークフローを採用できます。新しい構成を適用するには、構成ファイル( $VALUES_FILE )を編集し、コマンドを 1 つ実行します。

以下の例では、Kafka クラスターの各ブローカーに関連付けられているディスクを 10 GB から 100 GB に拡張しています。同じアプローチは、あらゆる Confluent Platform コンポーネントに適用できます。

  1. 構成ファイル( $VALUES_FILE )で、ストレージボリュームをアップデートします。

    以下は、Kafka ブローカー用のディスクを 100 Gi に増強する場合の例です。

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
    Copy
  2. helm upgrade コマンドを実行して、変更を適用します。

    以下に例を示します。

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    
    Copy

このワークフローへの対応には、使用環境で以下が true に設定されていることが必要です。

  • Kafka (または他の Confluent Platform コンポーネント)のクラスターに関連付けられている StorageClass について、$VALUES_FILEallowVolumeExpansion: true と設定されている必要があります。

  • 使用するボリュームの種類で拡張がサポートされていることが必要です。

    本ドキュメントの制作時点で、拡張は GCE Persistent Disk、AWS EBS、Azure Disk などでサポートされている一方、Host Path などの種類のボリュームではサポートされていません。

  • ExpandInUsePersistentVolumes 機能が Kubernetes クラスターで有効になっている必要があります。

    バージョン 1.15 以降の Kubernetes クラスターでは、この機能はデフォルトで有効です。

2 コマンドによるディスク拡張ワークフロー

自動拡張の対象はボリュームだけであり、Kubernetes のバージョン 1.15 未満など、ポッド内にマウントされているファイルシステムのサイズが変更されない環境の場合は、上記のワークフローに加えて、新しいサイズがポッドのファイルシステムレベルで反映されるよう、クラスターをローリングする必要があります。

  1. 構成ファイル( $VALUES_FILE )で、ストレージボリュームをアップデートします。

    以下は、Kafka ブローカー用のディスクを 100 Gi に増強する場合の例です。

    kafka:
      volume:
        data0: 100Gi # defaults to 10Gi if this wasn't specified before
    
    Copy
  2. helm upgrade コマンドを実行して、変更を適用します。以下に例を示します。

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    
    Copy
  3. 構成ファイル( $VALUES_FILE )に "ダミー" のアノテーションを設定して、クラスターのローリング再起動が安全に実行されるように Operator のトリガーを準備します。

    kafka:
      podAnnotations:
        platform.confluent.io/dummy: "any-string"
    
    Copy
  4. helm upgrade コマンドを実行して、変更を適用します。以下に例を示します。

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --set kafka.enabled=true \
      --namespace operator
    
    Copy

クラスターをローリングすると、拡張されたディスクがクラスターで使用可能になります。この段階で、"ダミー" アノテーションを削除する場合は、アノテーションを構成ファイル($VALUES_FILE)から削除し、helm upgrade コマンドを使用して変更を適用します。こうすることで、クラスターが再び安全にローリングされます。

このワークフローへの対応には、使用環境で以下が true に設定されていることが必要です。

  • Kafka (または他の Confluent Platform コンポーネント)クラスターに関連付けられている StorageClass について allowVolumeExpansion: true と設定されていることが必要です。

  • 使用するボリュームの種類で拡張がサポートされていることが必要です。

    本ドキュメントの制作時点で、拡張は GCE Persistent Disk、AWS EBS、Azure Disk などでサポートされている一方、Host Path などの種類のボリュームではサポートされていません。詳細については、ストレージプロバイダーのドキュメントを参照してください。

ディスクのスケールダウン

ディスクのスケールダウンは、Confluent Operator の現行バージョンではサポートされません。