Confluent Operator での認可の構成

ロールベースアクセス制御

このガイドでは、Operator を使用した Confluent Platform 用ロールベースアクセス制御(RBAC)のエンドツーエンドのセットアップについて詳しく説明します。このプロセスは、実際のユースケースや環境に合わせてさまざまな形に変更できます。

注釈

  • 現在、Operator を使用した RBAC は新規インストールで利用できます。
  • Operator では、複数の Kafka クラスターにわたる RBAC の一元管理はサポートされていません。

このガイドの例では、以下が想定されています。

  • $VALUES_FILE は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。

  • Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル( $VALUES_FILE )に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set または --set-file オプションを使用します。以下に例を示します。

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \
     --set kafka.enabled=true
    
  • operator は Confluent Platform のデプロイ先となる名前空間です。

  • コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある helm ディレクトリで実行されます。

このガイドで使用される例に限って、以下が想定されています。

  • Control Center にログインしてすべての Confluent Platform コンポーネントを正常に表示できるサンプルユーザーとして、ユーザー testadmin、パスワード testadmin という LDAP ユーザー/パスワードが作成済みです。
  • Metadata Service (MDS) で他のユーザーに関する LDAP クエリができるよう、LDAP の読み取り専用という最小限のアクセス許可を持つ LDAP ユーザー/パスワードとしてユーザー mds、パスワード Developer! を作成済みです。
  • すべての Confluent Platform コンポーネントについて、以下の LDAP ユーザー/パスワードを作成済みです。
    • Kafka: kafka/kafka-secret
    • Confluent REST API: erp/erp-secret
    • Confluent Control Center: c3/c3-secret
    • ksqlDB: ksql/ksql-secret
    • Schema Registry: sr/sr-secret
    • Replicator: replicator/replicator-secret
    • Connect: connect/connect-secret
  • Confluent Platform 内での RBAC のブートストラップ用にスーパーユーザー kafka を定義済みです。
  • ロールベースアクセス制御(RBAC)を使用した認可」で説明されている、Confluent Platform の RBAC 機能の概念やユースケースに習熟しています。

構成およびデプロイプロセスの概要は以下のとおりです。

  1. Confluent Operator 構成ファイル( $VALUES_FILE )で、必要なフィールドを有効にして設定します。
  2. Confluent Operator、ZooKeeper、Kafka をデプロイします。
  3. Confluent Platform コンポーネント Schema Registry、Connect、Replicator、ksqlDB、Control Center に必要なロールバインディングを追加します。
  4. Confluent Platform コンポーネントをデプロイします。

要件

Operator で RBAC を有効にして使用するための要件は以下のとおりです。

  • Confluent Platform で認証に使用できる LDAP サーバーが必要です。

  • Confluent REST API は、RBAC で自動で有効になり、RBAC が有効の場合には無効にできません。

    Confluent REST API へのアクセスには Kafka ブートストラップエンドポイント(MDS エンドポイントと同じ)を使用します。

  • LDAP に Confluent REST API 用のユーザーを作成し、「Kafka の構成」の説明に従ってユーザー名とパスワードを指定する必要があります。

  • ライセンスキーの追加」の説明に従って、有効な Schema Registry ライセンスを指定する必要があります。

グローバル構成

Operator 構成ファイル($VALUES_FILE)で、以下を設定します。

global:
  sasl:
    plain:
      username: kafka               ----- [1]
      password: kafka-secret        ----- [2]
  authorization:
    superUsers:                     ----- [3]
    - User:kafka
    rbac:
      enabled: true                 ----- [4]
  dependencies:
    mds:
      endpoint:                     ----- [5]
      publicKey: |-                 ----- [6]
        -----BEGIN PUBLIC KEY-----
        ...
        -----END PUBLIC KEY-----
  • [1] username を、ブローカー間通信や内部通信で使用されるユーザー ID に設定します( kafka など)。

  • [2] password を、[1] のユーザーのパスワードに設定します( kafka-secret など)。

  • [3] Kafka クラスターのブートストラップに使用するスーパーユーザーを指定します( kafka など)。

  • [4] enabled: true と設定して、RBAC を有効にします。

  • [5] endpoint に Kafka ブートストラップのエンドポイントを以下のように設定します。ロードバランサーと静的ホストベースルーティングについては、<kafka-bootstrap-host> はブートストラッププレフィックスとドメインから導出されます。それ以外については、<kafka-bootstrap-host> を指定してください。

    • MDS に HTTPS で内部アクセスする場合 : https://<kafka_name>.svc.cluster.local:8090
    • MDS に HTTP で内部アクセスする場合 : http://<kafka_name>.svc.cluster.local:8091
    • MDS に HTTPS でロードバランサーを使用して外部アクセスする場合 : https://<kafka_bootstrap_host> :443
    • MDS に HTTP でロードバランサーを使用して外部アクセスする場合 : http://<kafka_bootstrap_host> :80
    • MDS に NodePort を使用して外部アクセスする場合 : http(s) ://<kafka_bootstrap_host>:<portOffset+1>
    • MDS に静的ホストベースルーティングを使用して外部アクセスする場合 : https://<kafka_bootstrap_host> :8090
    • MDS に静的ポートベースルーティングを使用して外部アクセスする場合 : http(s) ://<kafka_bootstrap_host>:<portOffset+1>

    このエンドポイントは、Confluent REST API へのアクセスにも使用されます。

  • [6] パブリック/プライベートキーペアの作成の詳細については、「PEM キーペアの作成」を参照してください。

Kafka の構成

Confluent Operator の構成ファイル( $VALUES_FILE )で、Kafka に関する以下の値を設定します。

kafka:
  services:
    restProxy:
      authentication:
        username:                                            ----- [1]
        password:                                            ----- [2]
   password: ""
    mds:
      https:                                                 ----- [3]
      tokenKeyPair: |-                                       ----- [4]
        ----- BEGIN RSA PRIVATE KEY -----
        ...
        ----- END RSA PRIVATE KEY -----
      ldap:
        address: ldaps://ldap.operator.svc.cluster.local:636 ----- [5]
        authentication:
          simple:                                            ----- [6]
            principal: cn=mds,dc=test,dc=com
            credentials: Developer!
        configurations:                                      ----- [7]
          groupNameAttribute: cn
          groupObjectClass: group
          groupMemberAttribute: member
          groupMemberAttributePattern: CN=(.*),DC=test,DC=com
          groupSearchBase: dc=test,dc=com
          userNameAttribute: cn
          userMemberOfAttributePattern: CN=(.*),DC=test,DC=com
          userObjectClass: organizationalRole
          userSearchBase: dc=test,dc=com
  tls:
    enabled: true
    internalTLS: true                                       ----- [8]
    authentication:
      principalMappingRules:                                ----- [9]
      - RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L
      - DEFAULT
    cacerts: |-
  • [1] username を、LDAP にセットアップした Confluent REST API ユーザーに設定します。
  • [2] password を、[1] で指定したユーザーのパスワードに設定します。
  • [3] MDS で HTTPS トラフィックを処理する場合は、https: true と設定します。
  • [4] tokenKeyPair を、MDS でトークンの署名に使用できる、PEM エンコードされた RSA キーペアに設定します。これらは、Confluent Platform コンポーネント間のトークン認証に必要です。パブリック/プライベートキーペアの作成の詳細については、「PEM キーペアの作成」を参照してください。
  • [5] address は、Confluent Platform で認証に使用される LDAP サーバーの URL です。セキュアな LDAPS URL を指定する場合は、使用する LDAP サーバーによって提示される証明書が MDS で信頼されるように kafka.tls.cacerts を構成する必要があります。
  • [6] principalcredentials は、MDS によって、LDAP サーバーでの自身の認証に使用されます。
  • [7] configurations は、使用する LDAP 設定に応じて構成してください。
  • [8] 外部アクセスに NodePort または静的ルーティングを使用する場合は、internalTLS: true と設定します。
  • [9] mTLS( kafka.tls.authentication.type: tls )の場合、プリンシパルは証明書から取得され、ユーザーは LDAP に存在している必要があります。プリンシパルのマッピングルールの使用の詳細については、「SSL リスナー用プリンシパルのマッピングルール(証明書からのプリンシパルの抽出)」を参照してください。

静的ホストベースルーティングを使用する RBAC の構成

静的ホストベースルーティングを使用する RBAC を有効にする場合は、構成情報の追加指定が必要です。

RBAC と外部アクセスに静的ホストベースルーティングを使用するよう Kafka を構成するには、以下の手順に従います。

  1. Kafka の構成」の説明に従って Kafka を構成およびデプロイします。

  2. internalTLS を有効にします。

    kafka:
      tls:
        internalTLS: true
    
  3. Ingress コントローラーをデプロイします。

    RBAC 対応の Kafka を外部に公開する場合、Ingress コントローラーをデプロイするとともに、SSL パススルー機能を有効にする必要があります。

    たとえば、次のコマンドでは、Kafka 用に NGINX コントローラーをインストールしています。

    helm upgrade --install ingress-with-sni stable/nginx-ingress \
      --set rbac.create=true \
      --set controller.publishService.enabled=true \
      --set controller.extraArgs.enable-ssl-passthrough="true" \
      --set tcp.8090="operator/kafka-bootstrap:8090"
    

    MDS に外部からアクセスできるよう、--set tcp.8090="operator/kafka-bootstrap-sni:8090" フラグが設定されています。

  4. 外部ポートに加えて MDS ポートを持つ、ClusterIP タイプのブートストラップサービスを作成します。

    以下に例を示します。

    1. bootstrap.yaml ファイルを以下の内容で作成します。

      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-bootstrap
        namespace: operator
        labels:
          app: kafka-bootstrap
      spec:
        ports:
          - name: external
            port: 9092
            protocol: TCP
            targetPort: 9092
      selector:
        physicalstatefulcluster.core.confluent.cloud/name: kafka
        physicalstatefulcluster.core.confluent.cloud/version: v1
      type: ClusterIP
      
    2. 次のコマンドを実行して、上記のように設定されたブートストラップサービスを作成します。

      kubectl apply -f bootstrap.yaml -n <namespace>
      
  5. すべてのブローカーおよびブートストラップサービスについて、MDS ポートを Ingress リソースに追加します。

    以下に例を示します。

    1. NGINX Ingress コントローラーによって Kafka のブートストラップサービスと 3 つのブローカーが公開されるように、Ingress リソース用の ingress-resource.yaml を作成します。ドメイン名は mydevplatform.gcp.cloud です。ブートストラップの prefix/brokerPrefix/port はデフォルト値に設定されます。ルールは、指定された Kafka のブローカーとブートストラップサービスホストに適用されます。

      ホストは、同じ Ingress ロードバランサーの外部 IP に解決されることが必要です。

      apiVersion: extensions/v1beta1
      kind: Ingress
      metadata:
        name: ingress-with-sni
        annotations:
          nginx.ingress.kubernetes.io/ssl-passthrough: "true"
          kubernetes.io/ingress.class: nginx
          nginx.ingress.kubernetes.io/ssl-redirect: "false"
          ingress.kubernetes.io/ssl-passthrough: "true"
          nginx.ingress.kubernetes.io/backend-protocol: HTTPS
      spec:
        tls:
          - hosts:
              - b0.platformops.dev.gcp.devel.cpdev.cloud
              - b1.platformops.dev.gcp.devel.cpdev.cloud
              - b2.platformops.dev.gcp.devel.cpdev.cloud
              - kafka.platformops.dev.gcp.devel.cpdev.cloud
        rules:
          - host: kafka.platformops.dev.gcp.devel.cpdev.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-bootstrap
                    servicePort: 9092
                - backend:
                    serviceName: kafka-bootstrap
                    servicePort: 8090
          - host: b0.platformops.dev.gcp.devel.cpdev.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-0-internal
                    servicePort: 9092
          - host: b1.platformops.dev.gcp.devel.cpdev.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-1-internal
                    servicePort: 9092
          - host: b2.platformops.dev.gcp.devel.cpdev.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-2-internal
                    servicePort: 9092
      
    2. 次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。

      kubectl apply -f ingress-resource.yaml -n <namespace>
      

静的ポートベースルーティングを使用する RBAC の構成

静的ポートベースルーティングを使用する RBAC を有効にする場合は、構成情報の追加指定が必要です。

RBAC と外部アクセスに静的ポートベースルーティングを使用するよう Kafka を構成するには、以下の手順に従います。

注意: MDS で処理するトラフィックが HTTPS ではなく HTTP の場合は、ポートの 8090 を 8091 に置き換えます。

  1. Kafka の構成」の説明に従って Kafka を構成およびデプロイします。

  2. 追加の MDS ポートがマップされた Ingress コントローラーをインストールします。

    たとえば、以下のコマンドでは、Helm を使用して NGINX コントローラーがインストールされます。

    helm install <release name>  stable/nginx-ingress --set controller.ingressClass=kafka \
      --set tcp.9093="operator/kafka-bootstrap:9092" \
      --set tcp.9095="operator/kafka-0-internal:9092" \
      --set tcp.9097="operator/kafka-1-internal:9092" \
      --set tcp.9099="operator/kafka-2-internal:9092" \
      --set tcp.9094="operator/kafka-bootstrap:8090" \
      --set tcp.9096="operator/kafka-0-internal:8090" \
      --set tcp.9098="operator/kafka-1-internal:8090" \
      --set tcp.9100="operator/kafka-2-internal:8090"
    
  3. MDS ポートを持つブートストラップサービスを作成します。

    以下に例を示します。

    1. bootstrap.yaml ファイルを以下の内容で作成します。

      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-bootstrap
        namespace: operator
       labels:
       app: kafka-bootstrap
      spec:
        ports:
        - name: external
          port: 9092
          protocol: TCP
          targetPort: 9092
        - name: metadata
          port: 8090
          protocol: TCP
          targetPort: 8090
        selector:
          physicalstatefulcluster.core.confluent.cloud/name: kafka
          physicalstatefulcluster.core.confluent.cloud/version: v1
        type: ClusterIP
      
    2. 次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。

      kubectl apply -f bootstrap.yaml -n <namespace>
      
  4. すべてのブローカーおよびブートストラップサービスについて、MDS ポートを Ingress リソースに追加します。

    以下に例を示します。

    1. NGINX Ingress コントローラーによって Kafka のブートストラップサービスと 3 つのブローカーが公開されるように、Ingress リソース用の ingress-resource.yaml を作成します。ドメイン名は mydevplatform.gcp.cloud です。ブートストラップの prefix/brokerPrefix/port はデフォルト値に設定されます。ルールは、指定された Kafka ブローカーとブートストラップホストに適用されます。

      apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        metadata:
          name: <ingress resource name>
          annotations:
            kubernetes.io/ingress.class: nginx
            nginx.ingress.kubernetes.io/rewrite-target: /
        spec:
          rules:
            - host: <host name>
              http:
                paths:
                  - path:
                    backend:
                      serviceName: kafka-bootstrap
                      servicePort: 9092
                  - path:
                    backend:
                      serviceName: kafka-0-internal
                      servicePort: 9092
                  - path:
                    backend:
                      serviceName: kafka-1-internal
                      servicePort: 9092
                  - path:
                    backend:
                      serviceName: kafka-2-internal
                      servicePort: 9092
                  - path:
                    backend:
                      serviceName: kafka-bootstrap
                      servicePort: 8090
                  - path:
                    backend:
                      serviceName: kafka-0-internal
                      servicePort: 8090
                  - path:
                    backend:
                      serviceName: kafka-1-internal
                      servicePort: 8090
                  - path:
                    backend:
                      serviceName: kafka-2-internal
                      servicePort: 8090
      
    2. 次のコマンドを実行して Ingress リソースを作成します。

      kubectl apply -f ingress-resource.yaml -n <namespace>
      

Confluent Platform コンポーネントの構成

Confluent Operator の構成ファイル( $VALUES_FILE )で、各 Confluent Platform コンポーネントに以下の値を設定します。

<component>:
  dependencies:
    mds:
      authentication:
        username:
        password:

LDAP サーバーのユーザー名とパスワードが、想定事項 の説明に従って設定済みであることが必要です。

この機密データを $VALUES_FILE には入力しない場合は、各 Confluent Platform コンポーネントに helm upgrade --install コマンドで構成を適用する際に --set フラグを使用します。

RBAC を使用する ksqlDB 構成

RBAC を使用する Confluent Platform 5.5.x イメージを Operator 1.6.x でデプロイすると、ksqlDB ポッドは Liveness Probe および Readiness Probe のチェックでエラーになります。この問題を回避するには、<operator home directory>/helm/confluent-operator/charts/ksql/templates/ksql-psc.yaml ファイルで /v1/metadata/id を Liveness Probe および Readiness Probe として構成します。次に例を示します。

liveness_probe:
  http:
    path: /v1/metadata/id
    port: 9088
  initial_delay_seconds: 30
  timeout_seconds: 10
readiness_probe:
  http:
    path: /v1/metadata/id
    port: 9088
  initial_delay_seconds: 30
  timeout_seconds: 10

Metadata Service (MDS) を使用する Kafka のデプロイ

MDS を構成したら、Kafka コンポーネントを「Confluent Operator および Confluent Platform のインストール」の説明に従って次の順序でデプロイします。

  1. Operator
  2. ZooKeeper
  3. Kafka

MDS 構成の確認

MDS にログインして、構成が正しいことを確認し、Kafka クラスター ID を取得します。コンポーネントのロールバインディングには Kafka クラスター ID が必要です。

次のコマンドで、https://<kafka_bootstrap_endpoint> を、構成ファイル( $VALUES_FILE )で global.dependencies.mds.endpoint に設定した値に置き換えます。

  1. MDS に Kafka スーパーユーザーとしてログインします。次に例を示します。

    confluent login \
     --url https://<kafka_bootstrap_endpoint> \
     --ca-cert-path <path-to-cacerts.pem>
    

    以下の場合は、--ca-cert-path フラグを渡す必要があります。

    • Kafka の構成 で、HTTPS トラフィックを処理する(kafka.services.mds.https: true)よう MDS を構成した場合。
    • MDS 証明書の発行に使用される CA が、コマンドを実行しているシステムから信頼されない場合。

    Kafka のユーザー名とパスワードを要求された場合は指定します。この例では kafkakafka-secret です。

    ログイン成功の確認応答があります。

  2. 以下のコマンドを実行して、公開リスナーが正しく構成されていることを確認します。

    curl -ik \
     -u '<kafka-user>:<kafka-user-password>' \
     https://<kafka_bootstrap_endpoint>/security/1.0/activenodes/https
    
  3. 以下のいずれかのコマンドを実行して、Kafka クラスター ID を取得します。

    confluent cluster describe --url https://<kafka_bootstrap_endpoint>
    
    curl -ik \
     https://<kafka_bootstrap_endpoint>/v1/metadata/id
    

    このトピックの以降の例で使用するために、上記コマンドで得られた出力 Kafka ID を環境変数 $KAFKA_ID として保存します。

Confluent Platform コンポーネントのプリンシパルへのロールの付与

このセクションでは、Confluent Platform コンポーネントのプリンシパルが正しくデプロイされ機能するようなロールバインディングを作成するワークフローについて詳しく見ていきます。

MDS 構成の確認」の最初の手順で説明しているように MDS にログインし、confluent iam rolebinding コマンドを以下のセクションに示すように実行します。

--principal オプションを、コンポーネント LDAP ユーザーを使用して User:<component-ldap-user> に設定します。このセクションのコマンドでは、想定事項 にリストされているコンポーネントユーザー例を使用します。

Schema Registry ロールバインディング

Schema Registry サービスをデプロイするため、必要なロールを Schema Registry ユーザーに付与します。

--schema-registry-cluster-idid_schemaregistry_operator に、より一般的には id_<SR-component-name>_<namespace> に設定します。 <SR-component-name> は構成ファイル( $VALUES_FILE )の schemaregistry.name の値、<namespace> は Schema Registry のデプロイ先である Kubernetes 名前空間です。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:sr \
  --role SecurityAdmin \
  --schema-registry-cluster-id id_schemaregistry_operator

--resourceGroup:id_schemaregistry_operator に、より一般的には Group:id_<SR-component-name>_<namespace> に設定します。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:sr \
  --role ResourceOwner \
  --resource Group:id_schemaregistry_operator

--resourceTopic:_schemas_schemaregistry_operator に、より一般的には Topic:_schemas_<SR-component-name>_<namespace> に設定します。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:sr \
  --role ResourceOwner \
  --resource Topic:_schemas_schemaregistry_operator
confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:sr \
  --role ResourceOwner  \
  --resource Topic:_confluent-license

Kafka Connect ロールバインディング

Connect サービスをデプロイするため、必要なロールを Connect ユーザーに付与します。

--connect-cluster-id を、<namespace>.<Connect-component-name> という形式で設定します。<Connect-component-name> は構成ファイル($VALUES_FILE)の connect.name の値、<namespace> は Connect のデプロイ先である Kubernetes 名前空間です。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:connect \
  --role SecurityAdmin \
  --connect-cluster-id operator.connectors

--resourceGroup:operator.connectors に、より一般的には <namespace>.<Connect-component-name> に設定します。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:connect \
  --role ResourceOwner \
  --resource Group:operator.connectors
confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:connect \
  --role DeveloperWrite \
  --resource Topic:_confluent-monitoring \
  --prefix

--resourceTopic:operator.connectors- に、より一般的には <namespace>.<Connect-component-name>- に設定します。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:connect \
  --role ResourceOwner \
  --resource Topic:operator.connectors- \
  --prefix

Confluent Replicator ロールバインディング

Replicator サービスをデプロイするため、必要なロールを Replicator ユーザーに付与します。

--resourceGroup:operator.replicator に、より一般的には <namespace>.<Replicator-component-name> に設定します。 <Replicator-component-name> は構成ファイル( $VALUES_FILE )の replicator.name の値、<namespace> は Replicator のデプロイ先である Kubernetes 名前空間です。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:replicator \
  --role ResourceOwner \
  --resource Group:operator.replicator
confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:replicator \
  --role DeveloperWrite \
  --resource Topic:_confluent-monitoring \
  --prefix

--resourceTopic:operator.replicator- に、より一般的には <namespace>.<Replicator-component-name>- に設定します。 <Replicator-component-name> は構成ファイル( $VALUES_FILE )の replicator.name の値、<namespace> は Replicator のデプロイ先である Kubernetes 名前空間です。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:replicator \
  --role ResourceOwner \
  --resource Topic:operator.replicator- \
  --prefix

ksqlDB ロールバインディング

ksqlDB サービスをデプロイするため、必要なロールを ksqlDB ユーザーに付与します。

--ksql-cluster-idoperator.ksql_ に、より一般的には <namespace>.<ksqldb-component-name>_ に設定します。 <ksql-component-name> は構成ファイル( $VALUES_FILE )の ksql.name の値、<namespace> は ksqlDB のデプロイ先である Kubernetes 名前空間です。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:ksql \
  --role ResourceOwner \
  --ksql-cluster-id operator.ksql_ \
  --resource KsqlCluster:ksql-cluster
confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Topic:_confluent-ksql-operator.ksql_ \
  --prefix

Confluent Control Center ロールバインディング

Control Center サービスをデプロイするため、必要なロールを Control Center ユーザーに付与します。

confluent iam rolebinding create \
  --principal User:c3 \
  --role SystemAdmin \
  --kafka-cluster-id $KAFKA_ID

その他の Confluent Platform コンポーネントのデプロイ

各種 Confluent Platform コンポーネントにロールを付与すると、これらのコンポーネントを Confluent Operator で正常にデプロイすること、およびコンポーネントの相互通信を必要に応じて認可することができます。

Confluent Operator および Confluent Platform のインストール」の手順に従って、その他の Confluent Platform をデプロイします。

Confluent Platform の管理を可能にするための、Confluent Control Center ユーザーへのロール付与

Control Center ユーザーには、Control Center UI で表示および管理する Confluent Platform コンポーネントおよびリソースごとに個別のロールが必要です。これから説明するように、ユーザーに明示的なアクセス許可を付与してください。

以降の例では、testadmin プリンシパルが Control Center UI ユーザーとして使用されています。

testadmin にはアクセス許可がまだ与えられていないので、このユーザーは Confluent Control Center にログインできますが、以降のセクションの説明に従って適切なアクセス許可が付与されるまでは何も表示されません。

Kafka クラスターを表示および管理するためのアクセス許可の付与

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --role ClusterAdmin \
  --principal User:testadmin

Schema Registry 情報を表示および管理するためのアクセス許可の付与

--schema-registry-cluster-id を、id_.<Schema-Registry-component-name>_<namespace> という形式で設定します。以下の例では id_schemaregistry_operator と指定しています。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id id_schemaregistry_operator \
  --principal User:testadmin \
  --role SystemAdmin

Connect クラスターを表示および管理するためのアクセス許可の付与

--connect-cluster-id を、<namespace>.<Connect-component-name> という形式で設定します。以下の例では operator.connectors と指定しています。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --connect-cluster-id operator.connectors \
  --principal User:testadmin \
  --role SystemAdmin

Replicator クラスターを表示および管理するためのアクセス許可の付与

--connect-cluster-id を、<namespace>.<Connect-component-name> という形式で設定します。Replicator は Connect クラスター内で実行されることから、Replicator のロールバインディングには Connect のクラスター ID が必要です。以下の例では operator.connectors と指定しています。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --connect-cluster-id operator.replicator \
  --principal User:testadmin \
  --role SystemAdmin

ksqlDB クラスターを表示および管理するためのアクセス許可の付与

--ksql-cluster-id を、<namespace>.<ksqldb-component-name>_ という形式で設定します。以下の例では operator.ksql_ と指定しています。

confluent iam rolebinding create \
  --kafka-cluster-id $KAFKA_ID \
  --ksql-cluster-id operator.ksql_ \
  --resource KsqlCluster:ksql-cluster \
  --principal User:testadmin \
  --role ResourceOwner

アクセス制御リスト

アクセス制御リスト認可 を使用する Kafka をデプロイするには、設定を構成ファイル( $VALUES_FILE )に指定します。

認可に ACL を使用するよう Confluent Platform を構成する場合は、以下のような設定が推奨されます。

global:
  authorization:
    simple:
      enabled: true
    superUsers:
    - User:test

以下の設定は、後方互換性のために引き続きサポートされています。上記の構成と下記の構成を両方使用して ACL を有効にした場合は、上記の構成が優先されます。

kafka:
  options:
    acl:
      enabled: true
      # Value for super.users server property in the format, User:UserName;
      supers: "User:test"

注釈

options:acl:supers: が変更されると、Kafka クラスターのローリングアップグレードがトリガーされます。

上記のどちらの例でもスーパーユーザー test が構成され、Kafka クラスターとの通信時にすべての Confluent Platform コンポーネントによって使用されます。このスーパーユーザーは、コンポーネントによる内部トピックの作成を可能にするために必要です。

ACL 対応の mTLS 認証

mTLS 認証が有効の場合、Confluent Platform では認証されるプリンシパルの ID が、クライアント証明書から取得されるデータをもとに決定されます。ユーザー名の決定には、証明書の Subject セクションが使用されます。たとえば、次の証明書のサブジェクト( C=US, ST=CA, L=Palo Alto )の場合、ユーザー名は User:L=Palo Alto,ST=CA,C=US のように取得されます。

Certificate:
    Data:
        Version: 3 (0x2)
        Serial Number:
            omitted...
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: C=US, ST=Palo Alto, L=CA, O=Company, OU=Engineering, CN=TestCA
        Validity
            Not Before: Mar 28 16:37:00 2019 GMT
            Not After : Mar 26 16:37:00 2024 GMT
        Subject: C=US, ST=CA, L=Palo Alto
        Subject Public Key Info:
            Public Key Algorithm: rsaEncryption
                Public-Key: (2048 bit)
                Modulus:

User:ANONYMOUS は、内部クライアントやブローカー間の通信に使用されるデフォルトユーザーです。

Kafka が mTLS 認証 を使用するように構成されている場合は、User:ANONYMOUS に加えて証明書ユーザー名が必要です。

一般に、Confluent Platform のユーザーやクライアントをいくつも持つことになります。複数のユーザー/クライアントの間でクライアント証明書を共有することも考えられますが、ベストプラクティスとしては、ユーザーや証明書それぞれに一意のクライアント証明書を発行し、各証明書が一意のサブジェクト/ユーザー名を持つようにしてください。各ユーザー/クライアントについて、Confluent Platform 内でアクセスが許可される対象を決定し、該当するクライアント証明書のサブジェクト/ユーザー名に対応する ACL が作成されていることを確認してください。「ACL を使用した認可」の手順に従って、Kafka オブジェクトの ACL 認可を有効にしてください。