Confluent コンポーネントにアクセスするための OpenShift ルートの構成

Confluent for Kubernetes (CFK)では、OpenShift プラットフォーム以外に Confluent Platform コンポーネントサービスを公開するために、OpenShift ルート をサポートしています。

ルートを使用して Confluent コンポーネントを構成した場合、Confluent コンポーネントサービスのルートリソースが CFK によって作成され、外部のクライアントは HTTPS ポート 443 でサービスにアクセスすることになります。

ルートを使用するときは、Confluent コンポーネントを TLS で構成する必要があります。

注: このセクションの例では、CFK の名前空間の作成 で設定したデフォルトの名前空間を想定しています。

ルートを使用した Kafka への外部アクセスの構成

ルートを使用するように構成すると、CFK により、ブートストラップサーバー用のサービスに加えて、各ブローカー用のサービスが作成されます。Kafka ブローカーが N 個であれば、CFK によってルートサービスが N+1 個作成されます。

  • 1 個は、初期接続と Kafka クラスターに関するメタデータの受信に使用するブートストラップサービスとして作成されます。
  • 残りの N 個のサービスは、各ブローカーにつき 1 個で、ブローカーに直接対応します。

クライアントによる Kafka クラスターへのアクセスでは、まずブートストラップサーバールートへの接続が行われ、クラスターに属するすべてのブローカーのメタデータリストが取得されます。次に、クライアントにより対象のブローカーのアドレスが検出され、データを生成または消費するためにブローカーへの直接接続が行われます。

ルートを使用して Kafka への外部アクセスを実現するには

  1. Kafka のカスタムリソース(CR)で route プロパティを設定します。

    Kafka の CR のスニペットを次に示します。

    spec:
      listeners:
        external:
          tls:
            enabled: true       --- [1]
          externalAccess:
            type: route
            route:
              domain:           --- [2]
              wildcardPolicy:   --- [3]
              bootstrapPrefix:  --- [4]
              brokerPrefix:     --- [5]
              annotations:      --- [6]
    
    • [1](必須)

    • [2](必須)``domain`` を OpenShift ドメインに設定します。

      実行中のクラスターでこの値を変更する場合、クラスターをローリングする必要があります。

    • [3](省略可能)未構成の場合、デフォルトで None に設定されます。使用できる値は SubdomainNone です。

    • [4](省略可)ブートストラップのアドバタイズドエンドポイントに使用されるプレフィックスで、bootstrapPrefix.domain として追加されます。設定しなかった場合は、コンポーネントのクラスター名になります(この例では kafka)。

    • [5](省略可)ブローカーのアドバタイズドエンドポイントに使用されるプレフィックスで、brokerPrefix.domain として追加されます。未構成の場合、プレフィックスとして b が追加されます(例: b1.domainb2.domain)。

      実行中のクラスターでこの値を変更する場合、クラスターをローリングする必要があります。

    • [6](省略可)

  2. (省略可)ルートを使用して複数のリスナーを設定する場合は、Kafka CR で 1 つまたは複数のカスタムリスナーを追加します。

    注釈

    ルートを使用した複数のリスナーは、Confluent Platform 6.1 移行のバージョンでサポートされています。

    spec:
      listeners:
        custom:
        - name:                 --- [1]
          port:                 --- [2]
          tls:
            enabled: true       --- [3]
          externalAccess:
            type: route         --- [4]
            route:
              domain:           --- [5]
              bootstrapPrefix:  --- [6]
              brokerPrefix:     --- [7]
    
    • [1](必須)カスタムリスナーの名前。予約語の internalexternaltoken は使用しないでください。
    • [2](必須) カスタムリスナーにバインドされるポート。9093 未満の予約値は使用しないでください。
    • [3](必須)
    • [4](必須)
    • [5](必須)OpenShift ドメインの名前。
    • [6](必須)ブートストラップのアドバタイズエンドポイントに使用されるプレフィックス。すべてのリスナー間で一意である必要があります。
    • [7](必須)ブローカーのアドバタイズエンドポイントに使用されるプレフィックス。すべてのリスナー間で一意である必要があります。
  3. 構成を適用します。

    oc apply -f <Kafka CR>
    
  4. Kafka のブートストラップサーバーとブローカー用に DNS エントリを追加します。

    ルートが作成されたら、Kafka ブローカーと Kafka ブートストラップサービスの DNS エントリを DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。

    Kafka ブローカーの DNS エントリを取得するには以下が必要です。

    • OpenShift クラスターのドメイン名(ステップ 1 の domain
    • OpenShift ルーターのロードバランサーの外部 IP
    • Kafka のブートストラッププレフィックスとブローカープレフィックス

    Kafka 用に DNS エントリを追加する

    1. OpenShift ルーターのロードバランサーの IP アドレスを取得します。

      HAProxy ロードバランサーサービスはルートサービスのルーターとして機能し、HAProxy は一般に openshift-ingress 名前空間内で動作します。

      oc get svc --namespace openshift-ingress
      

      出力例:

      NAME                      TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                      AGE
      router-default            LoadBalancer   172.30.84.52     20.189.181.8   80:31294/TCP,443:32145/TCP   42h
      router-internal-default   ClusterIP      172.30.184.233   <none>         80/TCP,443/TCP,1936/TCP      42h
      

      上記の例では、ルーターの外部 IP は 20.189.181.8 です。

    2. Kafka の DNS 名を取得します。

      oc get routes | awk '{print $2}'
      
    3. 外部 IP を使用して、DNS サービスプロバイダーにすべての Kafka の DNS 名を指定します。

      以下は、DNS テーブルのエントリの例です。各項目が次のように指定されています。

      • ドメイン: example.com
      • デフォルトのプレフィックスを使用するブローカーレプリカ 3 個: b
      • Kafka ブートストラッププレフィックス: kafka
      • ロードバランサールーターの外部 IP: 20.189.181.8
      20.189.181.8 b0.example.com b1.example.com b2.example.com kafka.example.com
      
  5. 接続の確認 を行います。

ルートを使用した MDS への外部アクセスの構成

ロールベースアクセス制御(RBAC)を有効にして MDS への外部アクセスをセットアップする場合、追加でネットワーク構成の手順が必要になります。

Kafka Metadata Service (MDS) への外部アクセスをサポートするには、Kafka のカスタムリソース(CR)で以下のように構成します。

spec
  services
    mds:
      externalAccess:
        type: route
        route:
          domain:              --- [1]
          prefix:              --- [2]
  • [1](必須)MDS のドメイン名。
  • [2] MDS のプレフィックス。設定されている場合、MDS エンドポイントは <prefix>.<domain> です。省略されている場合、MDS エンドポイントはデフォルトの <domain> です。

ルートを使用して外部から MDS にアクセスするためのエンドポイントは、次のとおりです。

  • HTTP の場合: http://<domain>:80
  • HTTPS の場合: https://<domain>:443

ルーターを使用した他の Confluent Platform コンポーネントへの外部アクセスの構成

外部クライアントは、ルーターを使用して他の Confluent Platform コンポーネントに接続することができます。

各 Confluent Platform コンポーネントのアクセスエンドポイントは <component CR name>.<Kubernetes domain>:443 となります。

たとえば、TLS が有効な example.com ドメインでは、Confluent Platform コンポーネントへのアクセスに以下のエンドポイントを使用します。

  • https://connect.example.com:443
  • https://replicator.example.com:443
  • https://schemaregistry.example.com:443
  • https://ksqldb.example.com:443
  • https://controlcenter.example.com:443

ルートを使用して Confluent コンポーネントへの外部アクセスを実現するには

  1. Enable TLS for the component as described Confluent for Kubernetes でのネットワーク暗号化の構成.

  2. コンポーネントのカスタムリソース(CR)で次のように設定し、構成を適用します。

    spec:
      externalAccess:
        type: route
        route:
          domain:          --- [1]
          prefix:          --- [2]
          wildcardPolicy:  --- [3]
          annotations:     --- [4]
    
    • [1](必須)``domain`` を、Kubernetes クラスターのドメイン名に設定します。

      実行中のクラスターでこの値を変更する場合、クラスターをローリングする必要があります。

    • [2](省略可) prefix を設定して、デフォルトのルートプレフィックスを変更します。デフォルトでは、コンポーネント名となります(例: controlcenterconnectorreplicatorschemaregistryksql)。

      この値は DNS エントリに使用されます。コンポーネントの DNS 名は <prefix>.<domain> となります。

      設定されていない場合のデフォルトの DNS 名は <component name>.<domain> になります(例: controlcener.example.com)。

      複数の Kafka クラスターを実行しているときは、DNS の競合を避けるために、コンポーネントごとにデフォルトプレフィックスの変更が必要となる場合があります。

      実行中のクラスターでこの値を変更する場合、クラスターをローリングする必要があります。

    • [3](省略可能)未構成の場合、デフォルトで None に設定されます。使用できる値は SubdomainNone です。

    • [4] REST Proxy をコンシューマーとして使用する場合に必須です。それ以外の場合は、省略可能です。

      Openshift ルートでは、Cookie に基づくスティッキーセッションがデフォルトでサポートされます。REST Proxy で必要になる clientIp ベースのセッションアフィニティを使用するには、次の手順に従います。

      • アノテーション haproxy.router.openshift.io/disable_cookies: true を設定して Cookie を無効にします。
      • アノテーション haproxy.router.openshift.io/balance: source を設定して、 sourceIP ベースのロードバランシングを有効にします。
  3. 構成を適用します。

    oc apply -f <component CR>
    
  4. ルートを追加した各 Confluent Platform コンポーネントの DNS エントリを追加します。

    ルートが作成されたら、コンポーネントルートに関連付けられた DNS エントリを DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。

    Confluent Platform コンポーネントの DNS エントリを取得するには以下が必要です。

    • ステップ 1 で設定した OpenShift クラスターのドメイン名。

    • OpenShift ルーターのロードバランサーの外部 IP

    • コンポーネントの prefix (前述のステップ 1 で設定した場合)。それ以外の場合は、デフォルトのコンポーネント名。

      DNS 名は prefixdomain 名から成ります(例: controlcenter.example.com)。


    Confluent コンポーネントの DNS エントリを追加する

    1. OpenShift ルーターのロードバランサーの IP アドレスを取得します。

      HAProxy ロードバランサーサービスはルートサービスのルーターとして機能し、HAProxy は一般に openshift-ingress 名前空間内で動作します。

      oc get svc --namespace openshift-ingress
      

      出力例:

      NAME                      TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                      AGE
      router-default            LoadBalancer   172.30.84.52     20.189.181.8   80:31294/TCP,443:32145/TCP   42h
      router-internal-default   ClusterIP      172.30.184.233   <none>         80/TCP,443/TCP,1936/TCP      42h
      
    2. コンポーネントの DNS 名を取得します。

      oc get routes | awk '{print $2}'
      
    3. 外部 IP を使用して、DNS サービスプロバイダーにすべての Confluent コンポーネントの DNS 名を指定します。

      以下は、DNS テーブルのエントリの例です。各項目が次のように指定されています。

      • ドメイン: example.com
      • デフォルトのコンポーネントプレフィックス
      • ロードバランサールーターの外部 IP: 20.189.181.8
      20.189.181.8 connect.example.com controlcenter.example.com ksqldb.example.com schemaregistry.example.com
      
  5. 接続の確認 を行います。

接続の確認

example.com ドメインとデフォルトのコンポーネントプレフィックスを使用して、Confluent Platform コンポーネントへの外部アクセスを確認する手順の例を以下に示します。

Control Center の UI
ブラウザーで、https://controlcenter.example.com:443 に移動します。
Kafka
  1. Kafka の外部エンドポイントを取得します。

    • ブローカーのエンドポイントを取得するには、次のコマンドを使用します。

      oc get kafka kafka -ojsonpath='{.status.listeners.external.advertisedExternalEndpoints}'
      
    • Kafka ブートストラップサーバーのエンドポイントを取得するには、次のコマンドを使用します。

      oc get kafka kafka -ojsonpath='{.status.listeners.external.externalEndpoint}'
      
  2. トピックを作成します。

    For this step, you need the Confluent CLI tool on your local system. Install Confluent CLI on your local system to get access to the tool.

    以下に例を示します。

    confluent kafka topic create mytest \
      --partitions 3 --replication-factor 2 \
      --url kafka.example.com:443
    
  3. Control Center で、mytest トピックが作成されていることを確認します。

Connect
  1. コンポーネントの外部エンドポイントを取得します。

    oc get connect connect -ojsonpath='{.status.restConfig.externalEndpoint}'
    
  2. コンポーネントのエンドポイントにアクセスできることを確認します。以下に例を示します。

    curl https://connect.example.com:443 -ik -s -H "Content-Type: application/json"
    
ksqlDB
  1. コンポーネントの外部エンドポイントを取得します。

    oc get ksqldb ksqldb -ojsonpath='{.status.restConfig.externalEndpoint}'
    
  2. コンポーネントのエンドポイントにアクセスできることを確認します。以下に例を示します。

    curl https://ksqldb.example.com:443/ksql -ik -s -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" -X POST --data '{"ksql": "LIST ALL TOPICS;", "streamsProperties": {}}'
    
Schema Registry
  1. コンポーネントの外部エンドポイントを取得します。

    oc get schemaregistry schemaregistry -ojsonpath='{.status.restConfig.externalEndpoint}'
    
  2. コンポーネントのエンドポイントにアクセスできることを確認します。以下に例を示します。

    curl -ik https://schemaregistry.example.com:443/subjects
    
Control Center
  1. コンポーネントの外部エンドポイントを取得します。

    oc get controlcenter controlcenter -ojsonpath='{.status.restConfig.externalEndpoint}'
    
  2. コンポーネントのエンドポイントにアクセスできることを確認します。以下に例を示します。

    curl https://controlcenter.example.com:443/2.0/health/status -ik -s -H "Content-Type: application/json"
    
REST Proxy
  1. コンポーネントの外部エンドポイントを取得します。

    oc get kafkarestproxy kafkarestproxy -ojsonpath='{.status.restConfig.externalEndpoint}'
    
  2. コンポーネントのエンドポイントにアクセスできることを確認します。以下に例を示します。

    curl -ik https://kafkarestproxy.example.com:443/v3/clusters