Confluent Operator でのネットワークの構成

このドキュメントでは、Kafka などの Confluent Platform コンポーネントにアクセスするための、Confluent Operator の構成オプションについて説明します。

このガイドの例では、以下が想定されています。

  • $VALUES_FILE は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。

  • Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル( $VALUES_FILE )に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set または --set-file オプションを使用します。以下に例を示します。

    helm upgrade --install kafka \
     --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \
     --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \
     --set kafka.enabled=true
    
  • operator は Confluent Platform のデプロイ先となる名前空間です。

  • コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある helm ディレクトリで実行されます。

Kafka へのアクセスの構成

Kafka への外部アクセス

外部クライアントによる Kafka との接続用に、以下のいずれかの方法で Kafka を構成する必要があります。

  • ロードバランサー : クライアントによる Kafka との接続に、クラウドプロバイダーのロードバランサーが使用されます。
  • ノードポート : クライアントによる Kafka との接続に、Kubernetes ワーカーノードの指定された静的ポート(NodePort)(または、指定されたポートにトラフィックを転送できるお客様管理のネットワークインフラストラクチャ)が使用されます。
  • 静的ポートベースルーティング : クライアントによる Kafka との接続は、Kubernetes Ingress コントローラーによってポートベースルーティングを使用して管理されます。
  • 静的ホストベースのルーティング : クライアントによる Kafka との接続は、Kubernetes Ingress コントローラーによってホストベースルーティングを使用して管理されます。

Kafka デプロイにおける Operator で有効にできるのは、上記方法のいずれか 1 つのみで、いずれかの方法で外部アクセスを有効にすると、別の外部アクセス方法には変更できません。

ロードバランサーを使用した Kafka への外部アクセス

クライアントによる Kafka クラスターへのアクセスでは、まずブートストラップサーバーへの接続が行われ、クラスターに属するすべてのブローカーのメタデータリストが取得されます。次に、クライアントにより、関心の対象であるブローカーのアドレスが認識され、データを生成または消費するためにブローカーへの直接接続が行われます。ロードバランサーを使用するように構成すると、Operator により、ブートストラップサーバー用のロードバランサーに加えて、各ブローカー用のロードバランサーが作成されます。Kafka ブローカーが N 個であれば、Operator によってロードバランサーサービスが N+1 個作成されます。

  • 1 個は、初期接続と Kafka クラスターに関するメタデータの受信に使用するブートストラップサービスとして作成されます。
  • 残りの N 個のサービスは、各ブローカーにつき 1 個で、ブローカーに直接対応します。

重要

外部ロードバランサーを構成する際には、TLS を使用してエンドポイントを保護することをお勧めします。

Kafka 用の外部ロードバランサーを構成するには、以下の手順に従います。

  1. 構成ファイル( $VALUES_FILE )の以下のパラメーターを使用して、Kafka を構成し、デプロイします。

    kafka:
      loadBalancer:
        enabled: true          ----- [1]
        type:                  ----- [2]
        domain:                ----- [3]
        bootstrapPrefix:       ----- [4]
        brokerPrefix:          ----- [5]
        annotations:           ----- [6]
    
    • [1] enabled: true と設定して、ロードバランサーを有効にします。

    • [2](省略可能)外部ロードバランサーについて、type: external と設定するか、設定を省略します。ロードバランサーは、デフォルトで外部ロードバランサーです。

    • [3](必須) domain を、クラスターが実行されているドメインに設定します。

    • [4](省略可能) bootstrapPrefix を使用して、デフォルトの Kafka ブートストラップロードバランサープレフィックスを変更します。デフォルトのブートストラッププレフィックスは Kafka コンポーネント名( kafka )です。

      この値は、使用ドメインの DNS エントリに使用されます。ブートストラップ DNS 名は <bootstrapPrefix>.<domain> となります。

      設定されていない場合、デフォルトの Kafka ブートストラップ DNS 名は kafka.<domain> です。

      ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。

    • [5](省略可能) brokerPrefix: を使用して、デフォルトの Kafka ブローカープレフィックスを変更します。デフォルトの Kafka ブローカープレフィックスは b です。

      これらは、使用ドメインの DNS エントリに使用されます。ブローカー DNS 名は <brokerPrefix>0.<domain><brokerPrefix>1.<domain>、などとなります。

      設定されていない場合、デフォルトのブローカー DNS 名は b0.<domain>b1.<domain>、などとなります。

      ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。

    • [6](省略可能)アプリケーション固有またはプロバイダー固有の設定を追加するには、annotations: を使用します。

      AWS 用の Kubernetes ロードバランサーアノテーションの詳細については、「Service」を参照してください。

    以下に示すのは、Kafka 用の外部ロードバランサーを作成するための構成ファイル( $VALUES_FILE )のセクション例です。

    kafka:
      loadBalancer:
        enabled: true
        domain: mydevplatform.gcp.cloud
        bootstrapPrefix: kafka-lb
        brokerPrefix: kafka-broker
    
  2. Kafka のブートストラップサーバーとブローカー用に DNS エントリを追加します。

    外部ロードバランサーが作成されたら、Kafka ブローカーロードバランサーと Kafka ブートストラップロードバランサーそれぞれのパブリック IP と関連付けられた DNS エントリを、DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。

    重要

    複数のクラスターを単一のドメインに追加する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えてください。

    Kafka DNS エントリを取得するには以下が必要です。

    • ステップ #1 の構成ファイル( $VALUES_FILE )で指定した domain ([2])名。

    • Kafka ロードバランサーの外部 IP。

      外部 IP を取得するには、以下のコマンドを使用します。

      kubectl get services -n <namespace>
      
    • Kafka のブートストラッププレフィックスとブローカープレフィックス

    追加する DNS テーブルエントリを以下の出力例に示します。使用パラメーターは次のとおりです。

    • ドメイン : mydevplatform.gcp.cloud
    • デフォルトのプレフィックス/レプリカ番号を使用するブローカーレプリカ 3 個 : b0b1b2
    • Kafka ブートストラッププレフィックス : kafka
    DNS name                           Internal IP             External IP
    b0.mydevplatform.gcp.cloud         10.47.245.57            192.50.14.35
    b1.mydevplatform.gcp.cloud         10.47.240.85            192.50.28.28
    b2.mydevplatform.gcp.cloud         10.35.186.46            192.50.64.18
    kafka.mydevplatform.gcp.cloud      10.47.250.36            192.50.34.20
    

NodePort サービスを使用した、Kafka への外部アクセス

Kafka を外部クライアントに公開するために、NodePort を使用して Operator を構成する場合、Kubernetes では Kafka のブートストラップサーバーとブローカーの各ノードに異なるポートが割り当てられます。特定のポートに対するリクエストは、そのポートが構成されたブートストラップサーバーまたはブローカーに転送されます。

Kafka を NodePort 経由で外部クライアントに公開するために、Operator を構成する場合、Kubernetes では Kafka のブートストラップサーバーと Kafka の各ブローカーに一意のポートが割り当てられます。ブートストラップサーバーと Kafka のブローカーはそれぞれ別個のポートからアクセスできます。

ブローカーが N 個ある Kafka クラスターには、N+1 個の NodePort サービスが作成されます。

  • ブートストラップサーバー用に 1 個。初期接続用
  • N 個のサービス。各ブローカーにつき 1 個で、その後ブローカーとの直接接続で使用

ブローカーが N 個ある、RBAC が有効の Kafka クラスターには、(N+1)*2 個の NodePort サービスが作成されます。

  • ブートストラップサーバー用に 1 個。初期接続用
  • ブートストラップサーバー上の MDS 用に 1 個
  • N 個のサービス。各ブローカーにつき 1 個で、その後ブローカーとの直接接続で使用
  • N 個のサービス。各ブローカー上の MDS につき 1 個

ポートへのトラフィックはすべて、以下のように Kafka ポッドへルーティングされます。portOffset は、Kafka に構成した開始ポート番号です。

  • ブートストラップエンドポイント:

    <host>:<portOffset>
    
  • N 番目のブローカーレプリカの、外部にアドバタイズされるエンドポイント:

    <host>:<portOffset + N*2>
    
  • RBAC が有効の場合の MDS ブートストラップエンドポイント:

    <host>:<portOffset + 1>
    
  • RBAC が有効の場合の、N 番目のブローカーの、MDS にアドバタイズされるリスナーエンドポイント:

    <host>:<portOffset + 1 + N*2>
    

Kafka との外部通信に NodePort サービスを使用するには、以下の手順に従います。

  1. Kubernetes クラスターに属する単一または複数のノードのアドレスを使用して、DNS レコードを作成します。

  2. ノードポートを持つ Kafka を構成し、デプロイします。

    kafka:
      nodePort:
        enabled: true            ----- [1]
        host:                    ----- [2]
        portOffset:              ----- [3]
        annotations: {}          ----- [4]
    
    • [1] enabled: true と設定して、NodePort ベースの外部アクセスを有効にします。

    • [2](必須) host に、ステップ #1 で作成した DNS レコードのホスト名を指定します。

    • [3](必須)最初のノードポート portOffset を指定します。Kafka ブートストラップサービスによって、この portOffset ポートが取得されます。Kafka ブローカーが、ポート portOffet+2portOffset+4 のように N 番目のブローカーの portOffet+2*N まで割り当てられます。

      RBAC が有効な場合、ブートストラップサービスの MDS ポートは portOffset+1 に、N 番目のブローカーの MDS ポートは portOffset+1+N*2 になります。

      portOffset をポートとして選択する場合は、Kafka に必要となるポートの数を考慮します。ポートが、構成済みの Kubernetes ノードポートの範囲内に収まっている必要があるからです。

      ノードのポート番号が使用中だった場合、該当する Kafka サービスの作成は失敗します。

    • [4](省略可能)指定する annotations は、Confluent Operator によってこの Kafka クラスター用に作成されるすべての NodePort サービスに追加されます。

  3. 使用予定の NodePort 範囲で接続できるように、ファイアウォールルールを作成します。ファイアウォールルールの作成については、「ファイアウォールルールの使用」を参照してください。

  4. NodePort サービスが正しく作成されていることを確認するには、以下のコマンドを使用して、名前空間内の Kafka のサービスをリストします。

    kubectl get services -n <namespace>
    

    以下のような Kafka サービスリストが出力されます。この例では portOffset として 31000 が使用されています。

    NAME                   TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)
    kafka                  ClusterIP   None           <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-0-internal       ClusterIP   10.71.10.235   <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-0-np             NodePort    10.71.7.28     <none>        9092:31002/TCP
    kafka-1-internal       ClusterIP   10.71.8.62     <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-1-np             NodePort    10.71.6.82     <none>        9092:31004/TCP
    kafka-2-internal       ClusterIP   10.71.6.0      <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-2-np             NodePort    10.71.12.57    <none>        9092:31006/TCP
    kafka-bootstrap-np     NodePort    10.71.15.28    <none>        9092:31000/TCP
    

静的ポートベースルーティングを使用した Kafka への外部アクセス

Kubernetes Ingress のサポート対象が HTTP ベースのサービスのみであるのに対し、 Kafka は TCP ベースです。ただし、エコシステムには Ingress コントローラーの実装があり、たとえば NGINX Ingress Controller では、Kafka のような非 HTTP ベースのサービスがサポートされています。Kafka への HTTP 経由の外部アクセスを実現するには、そうした Ingress コントローラー の 1 つを使用します。

RBAC が有効な Kafka を公開する方法については、「静的ポートベースルーティングと RBAC」を参照してください。

Kafka への外部アクセスに静的ポートベースルーティングを使用するよう構成するには、以下の手順に従います。

  1. 外部アクセスの種類として staticForPortBasedRouting を使用して Kafka をデプロイします。

    kafka:
      staticForPortBasedRouting:
        enabled: true            ----- [1]
        host:                    ----- [2]
        portOffset:              ----- [3]
    
    • [1] enabled: true と設定して、静的ポートベースルーティングを有効にします。

    • [2](必須) host に、使用するホスト名を指定します。このホスト名と、Ingress コントローラーの外部 IP アドレスを使用して、このワークフローの後のほうで Kafka 用の DNS レコードをセットアップします。

    • [3](必須) PortOffset は開始ポート番号で、9092 より大きい必要があります。

      portOffset ポートには Kafka ブートストラップサーバーが割り当てられます。Kafka ブローカーが、ポート portOffet+2portOffset+4、のように N 番目のブローカーの portOffet+2*N まで割り当てられます。

      RBAC が有効な場合、ブートストラップサービスの MDS ポートは portOffset+1 に、N 番目のブローカーの MDS ポートは portOffset+1+N*2 になります。

  2. 種類が ClusterIP であるブートストラップサービスを作成します。

    以下に例を示します。

    1. bootstrap.yaml ファイルを以下の内容で作成します。

      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-bootstrap
        namespace: operator
        labels:
          app: kafka-bootstrap
      spec:
        ports:
          - name: external
            port: 9092
            protocol: TCP
            targetPort: 9092
        selector:
          physicalstatefulcluster.core.confluent.cloud/name: kafka
          physicalstatefulcluster.core.confluent.cloud/version: v1
        type: ClusterIP
      
    2. 次のコマンドを実行して、上記のように設定されたブートストラップサービスを作成します。

      kubectl apply -f bootstrap.yaml -n <namespace>
      
  3. ingress-nginx などの Ingress コントローラーをデプロイします。利用可能なコントローラーのリストについては、「Ingress コントローラー」を参照してください。

    以下のコマンド例を参考に、TCP ポートと Kafka サービスとのマッピングを指定します。各ブローカーおよびブートストラップサーバーは、ステップ #1 で指定したポートオフセットに基づいて TCP ポートにマップされている必要があります。Kafka clusterIP サービスとポートを表示するには、以下のコマンドを使用します。

    kubectl get services -n <namespace>
    

    以下の Helm コマンドの例では、NGINX Ingress コントローラーをインストールし、ポート 9093、9095、9097、9099 を Kafka のサービスとサービスポート(ブートストラップサーバー 1 個とブローカー 3 個)にマップしています。

    helm install <release name>  stable/nginx-ingress -n <namespace> \
      --set controller.ingressClass=kafka \
      --set tcp.9093="operator/kafka-bootstrap:9092" \
      --set tcp.9095="operator/kafka-0-internal:9092" \
      --set tcp.9097="operator/kafka-1-internal:9092" \
      --set tcp.9099="operator/kafka-2-internal:9092"
    
  4. Ingress コントローラーが適切に構成されていることを確認します。詳細については、使用している Ingress コントローラーのドキュメントを参照してください。

    NGINX の場合は、以下のコマンドを実行して、configmap 名を取得し、Ingress コントローラーによって作成された configmap を検証します。

    kubectl get configmap -n <namespace>
    
    kubectl describe configmap <configmap name> -n <namespace>
    

    出力には、上記の Helm コマンドの場合と同様、名前空間名、Kafka ブローカーサービス名、ポートが示されます。たとえば、operator/kafka-bootstrap:9093 となります。

  5. インバウンドトラフィックの Kafka へのルーティングで Ingress コントローラーによって使用されるルールのコレクションが含まれた Ingress リソース を作成します。

    Ingress では、Ingress コントローラーに応じて一部のオプションの構成にアノテーションを使用します。その一例が rewrite-target アノテーション です。サポートされているアノテーションについては、使用している Ingress コントローラーのドキュメントを参照してください。

    NGINX コントローラーのデプロイと Ingress リソースの構成の詳細については、この チュートリアル を参照してください。

    以下に例を示します。

    1. NGINX Ingress コントローラーによって Kafka ブートストラップサーバーと 3 個のブローカーが公開されるよう、Ingress リソース用の ingress-resource.yaml を以下の内容で作成します。

      apiVersion: networking.k8s.io/v1beta1
      kind: Ingress
      metadata:
        name: <Ingress resource>
        annotations:
          kubernetes.io/ingress.class: nginx
          nginx.ingress.kubernetes.io/rewrite-target: /
      spec:
        rules:
          - host: example.mydevplatform.gcp.cloud
            http:
              paths:
                - path:
                  backend:
                    serviceName: kafka-bootstrap
                    servicePort: 9092
                - path:
                  backend:
                    serviceName: kafka-0-internal
                    servicePort: 9092
                - path:
                  backend:
                    serviceName: kafka-1-internal
                    servicePort: 9092
                - path:
                  backend:
                    serviceName: kafka-0-internal
                    servicePort: 9092
      
    2. 次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。

      kubectl apply -f ingress-service.yaml -n <namespace>
      
  6. Kafka 用にステップ #1 で指定したホスト名と、Ingress コントローラーの外部ロードバランサー IP アドレスを使用して、DNS レコードを作成します。

    外部 IP を取得するには、以下のコマンドを使用します。

    kubectl get services -n <namespace>
    

静的ホストベースルーティングを使用した Kafka への外部アクセス

Kubernetes Ingress のサポート対象が HTTP ベースのサービスのみであるのに対し、 Kafka は TCP ベースです。ただし、エコシステムには Ingress コントローラーの実装があり、たとえば NGINX Ingress Controller では、Kafka のような非 HTTP ベースのサービスがサポートされています。Kafka への HTTP 経由の外部アクセスを実現するには、そうした Ingress コントローラー の 1 つを使用します。

RBAC 対応の Kafka を公開する方法については、静的ホストベースルーティングと RBAC を参照してください。

Kafka への外部アクセスに静的ホストベースルーティングを使用するよう構成するには、以下の手順に従います。

  1. モードが TLS でアクセスの種類が staticForHostBasedRouting の Kafka を構成してデプロイします。

    kafka:
      staticForHostBasedRouting:
        enabled: true          ----- [1]
        domain:                ----- [2]
        bootstrapPrefix:       ----- [3]
        brokerPrefix:          ----- [4]
        port:                  ----- [5]
      tls:
        enabled: true          ----- [6]
    
    • [1] enabled: true と設定して、静的ホストベースルーティングを有効にします。

    • [2](必須)クラスターの domain 名を指定します。

    • [3](省略可能) bootstrapPrefix を使用して、デフォルトの Kafka ブートストラッププレフィックスを変更します。デフォルトのブートストラッププレフィックスは Kafka コンポーネント名( kafka )です。

      この値は、使用ドメインの DNS エントリに使用されます。ブートストラップ DNS 名は <bootstrapPrefix>.<domain> となります。

      設定されていない場合、デフォルトの Kafka ブートストラップ DNS 名は kafka.<domain> です。

      ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。

    • [4](省略可能) brokerPrefix: を使用して、デフォルトの Kafka ブローカープレフィックスを変更します。

      デフォルトの Kafka ブローカープレフィックスは b です。

      これらは、使用ドメインの DNS エントリに使用されます。ブローカー DNS 名は <brokerPrefix>0.<domain><brokerPrefix>1.<domain>、などとなります。

      設定されていない場合、デフォルトのブローカー DNS 名は b0.<domain>b1.<domain>、などとなります。

    • [5] port は必須です。デフォルトの TLS ポートである 443 を指定できます。

    • [6] enabled: true と設定して、Kafka での TLS 暗号化の使用を構成します。Operator では、TLS パススルー機能が使用されることから、静的ホストベースルーティングを使用して Kafka を公開する場合は TLS 暗号化を有効にする必要があります。

  2. 種類が ClusterIP であるブートストラップサービスを作成します。

    以下に例を示します。

    1. YAML ファイル bootstrap.yaml を以下の内容で作成します。

      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-bootstrap
        namespace: operator
        labels:
          app: kafka-bootstrap
      spec:
        ports:
          - name: external
            port: 9092
            protocol: TCP
            targetPort: 9092
        selector:
          physicalstatefulcluster.core.confluent.cloud/name: kafka
          physicalstatefulcluster.core.confluent.cloud/version: v1
        type: ClusterIP
      
    2. 次のコマンドを実行して、上記のように設定されたブートストラップサービスを作成します。

      kubectl apply -f bootstrap.yaml -n <namespace>
      
  3. ingress-nginx などの Ingress コントローラーをデプロイします。利用可能なコントローラーのリストについては、「Ingress コントローラー」を参照してください。

    使用する Ingress コントローラーには、構成された HTTPS ポート(デフォルト: 443)ですべてのトラフィックをインターセプトして Kafka TCP プロキシに渡す SSL パススルーのサポートが必要です。

    たとえば、以下の Helm コマンドでは、SSL パススルーが有効の NGINX Ingress コントローラーがインストールされます。

    helm upgrade --install <release name> stable/nginx-ingress \
      --set rbac.create=true \
      --set controller.publishService.enabled=true \
      --set controller.extraArgs.enable-ssl-passthrough="true"
    
  4. インバウンドトラフィックの Kafka へのルーティングで Ingress のコントロール機能によって使用されるルールのコレクションが含まれた Ingress リソース を作成します。

    hosts は、前述の DNS セットアップで設定した、Ingress ロードバランサーの外部 IP に解決されることが必要です。

    NGINX コントローラーのデプロイと Ingress リソースの構成の詳細については、この チュートリアル を参照してください。

    以下に例を示します。

    1. NGINX Ingress コントローラーによって Kafka ブートストラップサーバーと 3 個のブローカーが公開されるよう、Ingress リソース用の ingress-resource.yaml を以下の内容で作成します。ドメイン名は mydevplatform.gcp.cloud です。ブートストラップの prefix/brokerPrefix/port はデフォルト値に設定されます。ルールは、指定された Kafka ブローカーとブートストラップサーバーホストに適用されます。

      apiVersion: extensions/v1beta1
      kind: Ingress
      metadata:
        name: <ingress service name>
      
        annotations:
          nginx.ingress.kubernetes.io/ssl-passthrough: "true" ---[1]
          nginx.ingress.kubernetes.io/ssl-redirect: "false"   ---[2]
          nginx.ingress.kubernetes.io/backend-protocol: HTTPS ---[3]
          ingress.kubernetes.io/ssl-passthrough: "true"       ---[4]
          kubernetes.io/ingress.class: nginx                  ---[5]
      spec:
        tls:
          - hosts:
            - b0.mydevplatform.gcp.cloud
            - b1.mydevplatform.gcp.cloud
            - b2.mydevplatform.gcp.cloud
            - kafka.mydevplatform.gcp.cloud
        rules:
          - host: kafka.mydevplatform.gcp.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-bootstrap
                    servicePort: 9092
          - host: b0.mydevplatform.gcp.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-0-internal
                    servicePort: 9092
          - host: b1.mydevplatform.gcp.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-1-internal
                    servicePort: 9092
          - host: b2.mydevplatform.gcp.cloud
            http:
              paths:
                - backend:
                    serviceName: kafka-2-internal
                    servicePort: 9092
      
      • アノテーション [1] ではコントローラーに対して、TLS 接続を直接バックエンドへ送信し、NGINX での通信の復号は行わないよう指示します。
      • アノテーション [2] では、デフォルト値を無効にします。
      • アノテーション [3] には、NGINX によるバックエンドサービスとの通信方法を指定します。
      • アノテーション [4] の ssl-passthrough は必須です。
      • アノテーション [5] では、NGINX コントローラーの使用を指定しています。
    2. 次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。

      kubectl apply -f Ingress-resource.yaml -n <namespace>
      
  5. Kafka のブートストラップとブローカーの DNS アドレスが Ingress コントローラーを指すように構成します。Kafka DNS エントリを取得するには以下が必要です。

    • ステップ #1 の構成ファイル( $VALUES_FILE )で指定した domain ([2])名。

    • Ingress コントローラーのロードバランサーの外部 IP。

      外部 IP を取得するには、以下のコマンドを使用します。

      kubectl get services -n <namespace>
      
    • Kafka のブートストラッププレフィックスとブローカープレフィックス

    追加する DNS テーブルエントリを以下の出力例に示します。使用パラメーターは次のとおりです。

    • ドメイン : mydevplatform.gcp.cloud
    • デフォルトのプレフィックス/レプリカ番号を使用するブローカーレプリカ 3 個 : b0b1b2
    • Kafka ブートストラッププレフィックス : kafka
    DNS name                         ExternalIP
    b0.mydevplatform.gcp.cloud       34.71.198.214
    b1.mydevplatform.gcp.cloud       34.71.198.214
    b2.mydevplatform.gcp.cloud       34.71.198.214
    kafka.mydevplatform.gcp.cloud    34.71.198.214
    

Kafka への内部アクセス

Operator によって Kubernetes クラスター内にデプロイされる Confluent Platform コンポーネントと、Kubernetes クラスター内のユーザークライアントアプリケーションは、Kafka の内部リスナー経由での Kafka との接続に以下のアドレスを使用します。

  • Kafka クラスターがこのクライアントやコンポーネントと同じ名前空間にデプロイされる場合:

    <kafka-component-name>:9071
    
  • Kafka クラスターがこのクライアントやコンポーネントとは別の名前空間にデプロイされる場合:

    <kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
    

<kafka-component-name> は、構成ファイル( $VALUES_FILE )の kafka セクションの name: に設定される値です。

ロードバランサーを使用した Kafka への内部アクセス

Kubernetes がプライベートサブネット内または VPC ピアリングされたクラスターの範囲内で実行されている場合は、Kafka や他の Confluent Platform コンポーネントの間で VPC 対 VPC のピアリング接続に使用される内部ロードバランサーを構成します。

内部ロードバランサーを作成するには、構成ファイル( $VALUES_FILE )の Kafka セクションに以下のエントリを追加します。

kafka:
  loadBalancer:
    enabled: true          ----- [1]
    type: internal         ----- [2]
    domain:                ----- [3]
  • [1] enabled: true と設定して、ロードバランサーを有効にします。
  • [2] type: internal と設定して、内部ロードバランサーを作成します。
  • [3] domain を、クラスターのドメイン名に設定します。

内部ロードバランサーにより、VPC ピアリング 用のコンポーネントサポートが追加されます。

インストーラーにより、以下のプロバイダー固有アノテーションが自動的に構成され、該当プロバイダー用の内部ロードバランサーが作成されます。

  • Azure

    kafka:
      loadBalancer:
        annotations:
          service.beta.kubernetes.io/azure-load-balancer-internal: "true"
    
  • AWS

    kafka:
      loadBalancer:
        annotations:
          service.beta.kubernetes.io/aws-load-balancer-internal: "0.0.0.0/0"
    
  • GCP

    kafka:
      loadBalancer:
        annotations:
          cloud.google.com/load-balancer-type: "Internal"
    

他の Confluent Platform コンポーネントによる Kafka へのアクセス

Confluent Platform コンポーネントによる Kafka との通信を実現するには、構成ファイル( $VALUES_FILE )のコンポーネントセクションの bootstrapEndpoint: を使用して、以下の例のように Kafka ブローカーエンドポイントを設定します。Kafka の dependencies セクションは、Confluent Control Center の場合は c3KafkaCluster に、他のコンポーネントの場合は kafka にあります。

<component>
  dependencies:
    kafka:
      bootstrapEndpoint:

コンポーネントと Operator でデプロイされた Kafka との通信が Kafka の内部リスナー経由の場合:

  • Kafka クラスターがこのコンポーネントと同じ名前空間にデプロイされる場合:

    <kafka-component-name>:9071
    
  • Kafka クラスターがこのコンポーネントとは別の名前空間にデプロイされる場合:

    <kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
    

    <kafka-component-name> は、構成ファイル( $VALUES_FILE )の kafka セクションの name に設定される値です。

他の Confluent Platform コンポーネントへのアクセスの構成

Confluent Operator では、外部クライアントと他の Confluent Platform コンポーネントとの接続用にロードバランサーがサポートされています。

注釈

Kafka ブローカーと ZooKeeper との間には常時接続が維持されますが、ロードバランサーが ZooKeeper 用に使用されると切断のおそれが生じます。ロードバランサーを ZooKeeper 用には構成しないでください。

ロードバランサーが有効の場合、Confluent Platform コンポーネントへのアクセスには、以下のエンドポイントのいずれかを使用します。

  • TLS が有効の場合 : https://<component>.<domain>
  • TLS が無効の場合 : http://<component>.<domain>

たとえば、TLS が有効の mydevplatform.gcp.cloud ドメインでは、Confluent Platform コンポーネントへのアクセスに以下のエンドポイントを使用します。

https://connectors.mydevplatform.gcp.cloud
https://replicator.mydevplatform.gcp.cloud
https://schemaregistry.mydevplatform.gcp.cloud
https://ksql.mydevplatform.gcp.cloud
https://controlcenter.mydevplatform.gcp.cloud

外部ロードバランサーは、グローバル構成ファイル( $VALUES_FILE )に構成されます。コンポーネントの loadBalancer: セクションで、外部ロードバランサーを構成するために以下のパラメーターを使用します。

<component>:
  loadBalancer:
    enabled: true          ----- [1]
    type:                  ----- [2]
    domain:                ----- [3]
    prefix:                ----- [4]
  • [1] enabled: true と設定して、ロードバランサーを有効にします。

  • [2] 外部ロードバランサーについて、type: external と設定するか、設定を省略します。ロードバランサーは、デフォルトで外部ロードバランサーです。

  • [3] domain を、クラスターのドメイン名に設定します。

  • [4] prefix を設定して、デフォルトのロードバランサープレフィックスを変更します。デフォルトはコンポーネント名です。

    prefix: <myprefix> のように指定すると、コンポーネント DNS 名は <myprefix>.<domain> のように構成されます。prefix の設定がない場合、コンポーネント DNS 名は <component-name>.<domain> のように構成されます。

コンポーネント DNS エントリの追加

外部ロードバランサーが作成されたら、コンポーネントロードバランサーに関連付けられた DNS エントリを DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。

重要

複数のクラスターを単一のドメインに追加する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えてください。

DNS エントリの追加に必要な情報を取得するには、まず名前空間内のコンポーネントで使用するサービスをリストします。

kubectl get services -n <namespace>

ロードバランサーを追加した各 Confluent Platform コンポーネントに DNS エントリを追加します。フォーマットは以下のとおりです。

DNS 名
コンポーネント名。例: controlcenterconnectorreplicatorschemaregistryksql
外部 IP
kubectl get services -n <namespace> の出力から得られるコンポーネントのブートストラップロードバランサーの "外部 IP"。
内部 IP
kubectl get services -n <namespace> の出力から得られるコンポーネントのブートストラップロードバランサーの Cluster IP

非 DNS 開発アクセス

所属組織の DNS プロセスを経由しない直接外部アクセスが必要な場合は、ローカルワークステーションの hosts ファイルを編集します。Linux ディストリビューションの場合、通常このファイルは /etc/hosts にあります。

重要

非 DNS アクセスは開発やテスト専用であり、本稼働環境では使用しないでください。

hosts ファイルを構成すると、Kafka ブートストラップサーバーにローカルワークステーションからアクセスできるようになります。

ロードバランサーアクセス

  1. 以下のコマンドを実行して、ロードバランサーの外部 IP を取得します。

    kubectl get services -n <namespace>
    

    出力例:

    NAME                         TYPE           CLUSTER-IP     EXTERNAL-IP       PORT(S)
    controlcenter                ClusterIP      None           <none>           9021/TCP,7203/TCP,7777/TCP
    controlcenter-0-internal     ClusterIP      10.77.12.166   <none>           9021/TCP,7203/TCP,7777/TCP
    controlcenter-bootstrap-lb   LoadBalancer   10.77.11.31    35.224.122.181   80:31358/TCP
    kafka                        ClusterIP      None           <none>           9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-0-internal             ClusterIP      10.77.8.20     <none>           9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-0-lb                   LoadBalancer   10.77.3.227    34.72.187.157    9092:31977/TCP
    kafka-1-internal             ClusterIP      10.77.4.190    <none>           9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-1-lb                   LoadBalancer   10.77.15.119   34.72.76.66      9092:30244/TCP
    kafka-2-internal             ClusterIP      10.77.4.234    <none>           9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
    kafka-2-lb                   LoadBalancer   10.77.14.14    35.239.218.11    9092:30695/TCP
    kafka-bootstrap-lb           LoadBalancer   10.77.0.47     35.232.216.94    9092:30601/TC
    
  2. ロードバランサー IP を /etc/hosts ファイルに追加します。

    Kafka ロードバランサーの /etc/hosts ファイルのエントリは、上記の出力例を使用すると以下のようになります。

    34.72.187.157  b0.mydevplatform.gcp.cloud b0
    34.72.76.66    b1.mydevplatform.gcp.cloud b1
    35.239.218.11  b2.mydevplatform.gcp.cloud b2
    35.232.216.94  kafka.mydevplatform.gcp.cloud kafka
    35.224.122.181 controlcenter.mydevplatform.gcp.cloud
    

NodePort アクセス

以下を使用して、DNS エントリを /etc/hosts ファイルに追加します。

以下に例を示します。

34.68.103.48  kafka-nodeport.mydevplatform.gcp.cloud

ホストベースルーティングを使用した静的アクセス

Ingress コントローラーのロードバランサーの外部 IP を使用して、Kafka の IP を /etc/hosts ファイルに追加します。

後述のエントリ例では、以下を使用しています。

  • デフォルトのブートストラッププレフィックス: kafka
  • デフォルトのブローカープレフィックス : b
  • ドメイン名 : mydevplatform.gcp.cloud
  • Ingress コントローラーのロードバランサーの外部 IP: 34.68.103.48

Kafka ロードバランサーの /etc/hosts ファイルエントリは以下のようになります。

34.68.103.48  b0.mydevplatform.gcp.cloud b0
34.68.103.48  b1.mydevplatform.gcp.cloud b1
34.68.103.48  b2.mydevplatform.gcp.cloud b2
34.68.103.48  kafka.mydevplatform.gcp.cloud kafka

ポートベースルーティングを使用した静的アクセス

kafka.staticForPortBasedRouting.host に指定したホスト名と Ingress コントローラーの外部 IP を /etc/hosts ファイルに追加します。

以下に例を示します。

34.68.103.48  kafka-ingress.mydevplatform.gcp.cloud