Confluent Operator および Confluent Platform のインストール

このトピックでは Confluent Operator および Confluent Platform のデプロイ手順について説明します。

このガイドの例では、以下が想定されています。

  • $VALUES_FILE は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。

  • Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル( $VALUES_FILE )に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set または --set-file オプションを使用します。以下に例を示します。

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \
     --set kafka.enabled=true
    
    Copy
  • operator は Confluent Platform のデプロイ先となる名前空間です。

  • コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある helm ディレクトリで実行されます。

ステップ 1. Confluent Operator のインストール

  1. Confluent Operator バンドルのダウンロード先ディレクトリの helm ディレクトリに移動します。

  2. 以下のコマンドを使用して、Operator をインストールします(例として operator 名前空間を使用)。

    helm upgrade --install \
      operator \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set operator.enabled=true
    
    Copy

    上記コマンドの --set operator.enabled=true で、デフォルト設定の false を変更しています。このパラメーター変更により、コンポーネントはインストール直後に有効となって起動されます。

  3. Operator が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Operator サービスについて以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    cc-operator-76c54d65cd-vgm5w   1/1     Running   0          7m28s
    
    Copy

ステップ 2. ZooKeeper のインストール

  1. Operator が実行中であることを確認したら、以下のコマンドを入力して ZooKeeper をインストールします。

    helm upgrade --install \
      zookeeper \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set zookeeper.enabled=true
    
    Copy

    以下のような出力が表示されます。

    NAME: zookeeper
    LAST DEPLOYED: Wed Jan  8 14:51:26 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Zookeeper Cluster Deployment
    
    Zookeeper cluster is deployed through CR.
    
      1. Validate if Zookeeper Custom Resource (CR) is created
    
         kubectl get zookeeper -n operator | grep zookeeper
    
      2. Check the status/events of CR: zookeeper
    
         kubectl describe zookeeper zookeeper -n operator
    
      3. Check if Zookeeper cluster is Ready
    
         kubectl get zookeeper zookeeper -ojson -n operator
    
         kubectl get zookeeper zookeeper -ojsonpath='{.status.phase}' -n operator
    
      4. Update/Upgrade Zookeeper Cluster
    
         The upgrade can be done either through the ``helm upgrade`` command or by editing the CR directly as below;
    
         kubectl edit zookeeper zookeeper  -n operator
    
    Copy
  2. ZooKeeper が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    ZooKeeper について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    zookeeper-0                    1/1     Running   0          4h30m
    zookeeper-1                    1/1     Running   0          4h30m
    zookeeper-2                    1/1     Running   0          4h30m
    
    Copy

ステップ 3. Kafka ブローカーのインストール

  1. Kafka をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.enabled=true
    
    Copy

    以下のような出力が表示されます。

    NAME: kafka
    LAST DEPLOYED: Wed Jan  8 15:07:46 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Kafka Cluster Deployment
    
    Kafka Cluster is deployed to kubernetes through CR Object
    
    
      1. Validate if Kafka Custom Resource (CR) is created
    
         kubectl get kafka -n operator | grep kafka
    
      2. Check the status/events of CR: kafka
    
         kubectl describe kafka kafka -n operator
    
      3. Check if Kafka cluster is Ready
    
         kubectl get kafka kafka -ojson -n operator
    
         kubectl get kafka kafka -ojsonpath='{.status.phase}' -n operator
    
    ... output omitted
    
    Copy
  2. Kafka が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Kafka について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    kafka-0                        1/1     Running   0          4h20m
    kafka-1                        1/1     Running   0          4h20m
    kafka-2                        1/1     Running   0          4h20m
    
    Copy

ステップ 4. Schema Registry のインストール

  1. Schema Registry をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      schemaregistry \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set schemaregistry.enabled=true
    
    Copy

    以下のような出力が表示されます。

    NAME: schemaregistry
    LAST DEPLOYED: Thu Jan  9 15:51:21 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Schema Registry is deployed through PSC. Configure Schema Registry through REST Endpoint
    
      1. Validate if schema registry cluster is running
    
         kubectl get pods -n operator | grep schemaregistry
    
      2. Access
    
        Internal REST Endpoint : http://schemaregistry:8081  (Inside kubernetes)
    
        OR
    
        http://localhost:8081 (Inside Pod)
    
        More information about schema registry REST API can be found here,
    
        https://docs.confluent.io/platform/current/schema-registry/docs/api.html
    
    Copy
  2. Schema Registry が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Schema Registry について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    schemaregistry-0               1/1     Running   0          4h18m
    schemaregistry-1               1/1     Running   0          4h18m
    
    Copy

ステップ 5. Kafka Connect のインストール

注釈

コネクターを使用する予定の場合は、Connect をインストールする前に、「Confluent コネクターのデプロイ」の手順に従って、コネクターが含まれるように Connect イメージを拡張します。

  1. Connect をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      connectors \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set connect.enabled=true
    
    Copy
  2. Connect が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Connect について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    connectors-0                   1/1     Running   0          4h15m
    connectors-1                   1/1     Running   0          4h15m
    
    Copy

ステップ 6. Confluent Replicator のインストール

  1. Replicator をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      replicator \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set replicator.enabled=true
    
    Copy
  2. Replicator が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Replicator について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    replicator-0                   1/1     Running   0          4h18m
    replicator-1                   1/1     Running   0          4h18m
    
    Copy

ステップ 7. Confluent Control Center のインストール

  1. Confluent Control Center をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      controlcenter \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set controlcenter.enabled=true
    
    Copy
  2. Confluent Control Center が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    Confluent Control Center について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    controlcenter-0                1/1     Running   0          4h18m
    
    Copy

ステップ 8. ksqlDB のインストール

  1. ksqlDB をインストールするには、以下のコマンドを入力します。

    helm upgrade --install \
      ksql \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set ksql.enabled=true
    
    Copy
  2. ksqlDB が正常にインストールされ、実行されていることを確認するには、以下のコマンドを入力します。

    kubectl get pods -n operator
    
    Copy

    ksqlDB について以下の例のような出力が表示され、ステータスが Running になっています。

    NAME                           READY   STATUS    RESTARTS   AGE
    ksql-0                         1/1     Running   0          21m
    ksql-1                         1/1     Running   0          21m
    
    Copy

ステップ 9. デプロイのテスト

デプロイをテストおよび検証するには、以下の手順に従います。

内部アクセスの検証

内部通信を検証するには、以下の手順に従います。

  1. ローカルマシンで以下のコマンドを入力して、クラスターの名前空間情報を表示します(例として名前空間 operator を使用)。この情報には、内部通信の検証に必要なブートストラップエンドポイントが含まれています。

    kubectl get kafka -n operator -oyaml
    
    Copy

    ブートストラップエンドポイントは bootstrap.servers の行に示されます。

    ... omitted
    
       internalClient: |-
          bootstrap.servers=kafka:9071
    
    Copy
  2. ローカルマシンで kubectl exec を使用して、クラスター内のポッドのいずれかで bash セッションを開始します。この例では、デフォルト名 kafka-0 のポッドを、デフォルト名 kafka の Kafka クラスターで使用します。

    kubectl -n operator exec -it kafka-0 bash
    
    Copy
  3. ポッドで、kafka.properties という名前のファイルを作成し、内容を入力します。コンテナーにはテキストエディターがインストールされていないので、このファイルの作成には以下の例に示すように cat コマンドを使用します。ファイルを保存するには、Ctrl + D キーを押します。

    注釈

    この例には、デフォルトの SASL/PLAIN セキュリティパラメーターが示されています。本稼働環境では、追加のセキュリティが必要です。詳細については、「Confluent Operator でのセキュリティの構成」を参照してください。

    cat << EOF > kafka.properties
    bootstrap.servers=kafka:9071
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    EOF
    
    Copy
  4. ポッドで、以下のコマンドを使用してブートストラップサーバーへのクエリを実行します。

    kafka-broker-api-versions --command-config kafka.properties --bootstrap-server kafka:9071
    
    Copy

    3 つの Kafka ブローカーそれぞれにつき、以下のような出力が表示されます。

    kafka-1.kafka.operator.svc.cluster.local:9071 (id: 1 rack: 0) -> (
       Produce(0): 0 to 7 [usable: 7],
       Fetch(1): 0 to 10 [usable: 10],
       ListOffsets(2): 0 to 4 [usable: 4],
       Metadata(3): 0 to 7 [usable: 7],
       LeaderAndIsr(4): 0 to 1 [usable: 1],
       StopReplica(5): 0 [usable: 0],
       UpdateMetadata(6): 0 to 4 [usable: 4],
       ControlledShutdown(7): 0 to 1 [usable: 1],
       OffsetCommit(8): 0 to 6 [usable: 6],
       OffsetFetch(9): 0 to 5 [usable: 5],
       FindCoordinator(10): 0 to 2 [usable: 2],
       JoinGroup(11): 0 to 3 [usable: 3],
       Heartbeat(12): 0 to 2 [usable: 2],
    
    ... omitted
    
    Copy

    この出力をもって、クラスター内での内部通信の検証となります。

外部アクセスの検証

Kafka への外部アクセス」の説明に従って、Kafka への外部アクセスと追加した DNS エントリを有効にしたら、外部通信を検証するために以下の手順に従います。

注釈

この例では、デフォルトの Confluent Platform コンポーネント名とデフォルトの Kafka ブートストラッププレフィックス kafka が使用されます。

  1. ローカルマシンで、 Confluent Platform をダウンロード します。Confluent CLI は、ダウンロードして PATH と必要な環境変数を設定するだけで使用できます。ローカルマシンで Confluent Platform を起動する必要はありません。

    外部通信の検証には、ローカルマシンで実行されている Confluent CLI を使用します。Confluent CLI は Confluent Platform に含まれています。

  2. ローカルマシンで、外部クライアント用のブートストラップサーバーエンドポイントを取得するためのコマンドを実行します。

    kubectl get kafka -n operator -oyaml
    
    Copy

    以下の出力例で、ブートストラップサーバーエンドポイントは kafka.mydomain:9092 です。

    ... omitted
    
    externalClient: |-
       bootstrap.servers=kafka.mydomain:9092
       sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
       sasl.mechanism=PLAIN
       security.protocol=SASL_PLAINTEXT
    
    Copy
  3. Confluent Platform がローカル実行されているローカルマシンで、kafka.properties という名前のファイルを作成し、以下の内容を入力します。上記の手順で取得した外部エンドポイントを bootstrap.servers に割り当てます。

    bootstrap.servers=<kafka bootstrap endpoint>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    
    Copy

    注釈

    この例には、デフォルトの SASL/PLAIN セキュリティパラメーターが示されています。本稼働環境では、追加のセキュリティが必要です。詳細については、「Confluent Operator でのセキュリティの構成」を参照してください。

  4. ローカルマシンで、ブートストラップエンドポイント <kafka bootstrap endpoint> を使用してトピックを作成します。以下の例では、パーティションが 1 つ、レプリカが 3 つのトピックが作成されます。

    kafka-topics --bootstrap-server <kafka bootstrap endpoint> \
    --command-config kafka.properties \
    --create --replication-factor 3 \
    --partitions 1 --topic example
    
    Copy
  5. ローカルマシンで、ブートストラップエンドポイント <kafka bootstrap endpoint> を使用して新しいトピックを生成します。これ 1 つですべての Kafka ブローカーにゲートウェイアクセスが提供されることから、ブートストラップサーバーエンドポイントは必要な唯一の Kafka ブローカーエンドポイントであることにご注意ください。

    seq 10000 | kafka-console-producer \
    --topic example --broker-list <kafka bootstrap endpoint> \
    --producer.config kafka.properties
    
    Copy
  6. ローカルマシンの 別のターミナル で、kafka.properties を作成したディレクトリから、Confluent CLI コマンドを発行して新しいトピックを消費します。

    kafka-console-consumer --from-beginning \
    --topic example --bootstrap-server <kafka bootstrap endpoint> \
    --consumer.config kafka.properties
    
    Copy

これらの手順が正常に完了したことをもって、クラスターとの外部通信の検証となります。

Confluent Control Center の外部アクセスの検証

Confluent Platform クラスターに Control Center を使用してアクセスするには、以下の手順に従います。この手順に先立ち、外部ロードバランサーの構成 の説明に従って、Confluent Control Center 用の外部ロードバランサーを有効にし、DNS エントリを追加します。

  1. ローカルマシンで、以下のコマンドを入力して、デフォルトの Confluent Control Center エンドポイントへのポートフォワーディングをセットアップします。

    kubectl port-forward svc/controlcenter 9021:9021 -n operator
    
    Copy
  2. ブラウザーで Control Center に接続します。

    http://localhost:9021/
    
    Copy
  3. Control Center にログインします。基本認証の認証情報は構成ファイル($VALUES_FILE)に設定されています。以下の例で、userID は admin、パスワードは Developer1 です。

    ##
    ## C3 authentication
    ##
    auth:
      basic:
        enabled: true
        ##
        ## map with key as user and value as password and role
        property:
          admin: Developer1,Administrators
          disallowed: no_access
    
    Copy

重要

Confluent Control Center の基本認証は、開発のテストに使用できます。一般に、この種類の認証は本稼働環境では無効にされ、LDAP はユーザーアクセス用に構成されます。LDAP パラメーターは、Control Center の値ファイルで提供されます。