Confluent Operator でのネットワークの構成¶
このドキュメントでは、Kafka などの Confluent Platform コンポーネントにアクセスするための、Confluent Operator の構成オプションについて説明します。
このガイドの例では、以下が想定されています。
$VALUES_FILE
は「グローバル構成ファイルの作成」でセットアップする構成ファイルのことです。Operator ドキュメントで簡潔で明快な例を提示できるよう、すべての構成パラメーターが構成ファイル(
$VALUES_FILE
)に指定されます。ただし、本稼働環境でのデプロイにおいて、Helm を使用して機密データを扱う場合には、--set
または--set-file
オプションを使用します。以下に例を示します。helm upgrade --install kafka \ --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds\,dc=test\,dc=com" \ --set kafka.services.mds.ldap.authentication.simple.credentials=”Developer!” \ --set kafka.enabled=true
operator
は Confluent Platform のデプロイ先となる名前空間です。コマンドはすべて、Confluent Operator のダウンロード先ディレクトリにある
helm
ディレクトリで実行されます。
Kafka へのアクセスの構成¶
Kafka への外部アクセス¶
外部クライアントによる Kafka との接続用に、以下のいずれかの方法で Kafka を構成する必要があります。
- ロードバランサー : クライアントによる Kafka との接続に、クラウドプロバイダーのロードバランサーが使用されます。
- ノードポート : クライアントによる Kafka との接続に、Kubernetes ワーカーノードの指定された静的ポート(NodePort)(または、指定されたポートにトラフィックを転送できるお客様管理のネットワークインフラストラクチャ)が使用されます。
- 静的ポートベースルーティング : クライアントによる Kafka との接続は、Kubernetes Ingress コントローラーによってポートベースルーティングを使用して管理されます。
- 静的ホストベースのルーティング : クライアントによる Kafka との接続は、Kubernetes Ingress コントローラーによってホストベースルーティングを使用して管理されます。
Kafka デプロイにおける Operator で有効にできるのは、上記方法のいずれか 1 つのみで、いずれかの方法で外部アクセスを有効にすると、別の外部アクセス方法には変更できません。
ロードバランサーを使用した Kafka への外部アクセス¶
クライアントによる Kafka クラスターへのアクセスでは、まずブートストラップサーバーへの接続が行われ、クラスターに属するすべてのブローカーのメタデータリストが取得されます。次に、クライアントにより、関心の対象であるブローカーのアドレスが認識され、データを生成または消費するためにブローカーへの直接接続が行われます。ロードバランサーを使用するように構成すると、Operator により、ブートストラップサーバー用のロードバランサーに加えて、各ブローカー用のロードバランサーが作成されます。Kafka ブローカーが N 個であれば、Operator によってロードバランサーサービスが N+1 個作成されます。
- 1 個は、初期接続と Kafka クラスターに関するメタデータの受信に使用するブートストラップサービスとして作成されます。
- 残りの N 個のサービスは、各ブローカーにつき 1 個で、ブローカーに直接対応します。
重要
外部ロードバランサーを構成する際には、TLS を使用してエンドポイントを保護することをお勧めします。
Kafka 用の外部ロードバランサーを構成するには、以下の手順に従います。
構成ファイル(
$VALUES_FILE
)の以下のパラメーターを使用して、Kafka を構成し、デプロイします。kafka: loadBalancer: enabled: true ----- [1] type: ----- [2] domain: ----- [3] bootstrapPrefix: ----- [4] brokerPrefix: ----- [5] annotations: ----- [6]
[1]
enabled: true
と設定して、ロードバランサーを有効にします。[2](省略可能)外部ロードバランサーについて、
type: external
と設定するか、設定を省略します。ロードバランサーは、デフォルトで外部ロードバランサーです。[3](必須)
domain
を、クラスターが実行されているドメインに設定します。[4](省略可能)
bootstrapPrefix
を使用して、デフォルトの Kafka ブートストラップロードバランサープレフィックスを変更します。デフォルトのブートストラッププレフィックスは Kafka コンポーネント名(kafka
)です。この値は、使用ドメインの DNS エントリに使用されます。ブートストラップ DNS 名は
<bootstrapPrefix>.<domain>
となります。設定されていない場合、デフォルトの Kafka ブートストラップ DNS 名は
kafka.<domain>
です。ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。
[5](省略可能)
brokerPrefix:
を使用して、デフォルトの Kafka ブローカープレフィックスを変更します。デフォルトの Kafka ブローカープレフィックスはb
です。これらは、使用ドメインの DNS エントリに使用されます。ブローカー DNS 名は
<brokerPrefix>0.<domain>
、<brokerPrefix>1.<domain>
、などとなります。設定されていない場合、デフォルトのブローカー DNS 名は
b0.<domain>
、b1.<domain>
、などとなります。ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。
[6](省略可能)アプリケーション固有またはプロバイダー固有の設定を追加するには、
annotations:
を使用します。AWS 用の Kubernetes ロードバランサーアノテーションの詳細については、「Service」を参照してください。
以下に示すのは、Kafka 用の外部ロードバランサーを作成するための構成ファイル(
$VALUES_FILE
)のセクション例です。kafka: loadBalancer: enabled: true domain: mydevplatform.gcp.cloud bootstrapPrefix: kafka-lb brokerPrefix: kafka-broker
Kafka のブートストラップサーバーとブローカー用に DNS エントリを追加します。
外部ロードバランサーが作成されたら、Kafka ブローカーロードバランサーと Kafka ブートストラップロードバランサーそれぞれのパブリック IP と関連付けられた DNS エントリを、DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。
重要
複数のクラスターを単一のドメインに追加する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えてください。
Kafka DNS エントリを取得するには以下が必要です。
ステップ #1 の構成ファイル(
$VALUES_FILE
)で指定したdomain
([2])名。Kafka ロードバランサーの外部 IP。
外部 IP を取得するには、以下のコマンドを使用します。
kubectl get services -n <namespace>
Kafka のブートストラッププレフィックスとブローカープレフィックス
追加する DNS テーブルエントリを以下の出力例に示します。使用パラメーターは次のとおりです。
- ドメイン :
mydevplatform.gcp.cloud
- デフォルトのプレフィックス/レプリカ番号を使用するブローカーレプリカ 3 個 :
b0
、b1
、b2
- Kafka ブートストラッププレフィックス :
kafka
DNS name Internal IP External IP b0.mydevplatform.gcp.cloud 10.47.245.57 192.50.14.35 b1.mydevplatform.gcp.cloud 10.47.240.85 192.50.28.28 b2.mydevplatform.gcp.cloud 10.35.186.46 192.50.64.18 kafka.mydevplatform.gcp.cloud 10.47.250.36 192.50.34.20
NodePort サービスを使用した、Kafka への外部アクセス¶
Kafka を外部クライアントに公開するために、NodePort を使用して Operator を構成する場合、Kubernetes では Kafka のブートストラップサーバーとブローカーの各ノードに異なるポートが割り当てられます。特定のポートに対するリクエストは、そのポートが構成されたブートストラップサーバーまたはブローカーに転送されます。
Kafka を NodePort 経由で外部クライアントに公開するために、Operator を構成する場合、Kubernetes では Kafka のブートストラップサーバーと Kafka の各ブローカーに一意のポートが割り当てられます。ブートストラップサーバーと Kafka のブローカーはそれぞれ別個のポートからアクセスできます。
ブローカーが N 個ある Kafka クラスターには、N+1 個の NodePort サービスが作成されます。
- ブートストラップサーバー用に 1 個。初期接続用
- N 個のサービス。各ブローカーにつき 1 個で、その後ブローカーとの直接接続で使用
ブローカーが N 個ある、RBAC が有効の Kafka クラスターには、(N+1)*2 個の NodePort サービスが作成されます。
- ブートストラップサーバー用に 1 個。初期接続用
- ブートストラップサーバー上の MDS 用に 1 個
- N 個のサービス。各ブローカーにつき 1 個で、その後ブローカーとの直接接続で使用
- N 個のサービス。各ブローカー上の MDS につき 1 個
ポートへのトラフィックはすべて、以下のように Kafka ポッドへルーティングされます。portOffset
は、Kafka に構成した開始ポート番号です。
ブートストラップエンドポイント:
<host>:<portOffset>
N 番目のブローカーレプリカの、外部にアドバタイズされるエンドポイント:
<host>:<portOffset + N*2>
RBAC が有効の場合の MDS ブートストラップエンドポイント:
<host>:<portOffset + 1>
RBAC が有効の場合の、N 番目のブローカーの、MDS にアドバタイズされるリスナーエンドポイント:
<host>:<portOffset + 1 + N*2>
Kafka との外部通信に NodePort サービスを使用するには、以下の手順に従います。
Kubernetes クラスターに属する単一または複数のノードのアドレスを使用して、DNS レコードを作成します。
ノードポートを持つ Kafka を構成し、デプロイします。
kafka: nodePort: enabled: true ----- [1] host: ----- [2] portOffset: ----- [3] annotations: {} ----- [4]
[1]
enabled: true
と設定して、NodePort ベースの外部アクセスを有効にします。[2](必須)
host
に、ステップ #1 で作成した DNS レコードのホスト名を指定します。[3](必須)最初のノードポート
portOffset
を指定します。Kafka ブートストラップサービスによって、このportOffset
ポートが取得されます。Kafka ブローカーが、ポートportOffet+2
、portOffset+4
のように N 番目のブローカーのportOffet+2*N
まで割り当てられます。RBAC が有効な場合、ブートストラップサービスの MDS ポートは
portOffset+1
に、N 番目のブローカーの MDS ポートはportOffset+1+N*2
になります。portOffset
をポートとして選択する場合は、Kafka に必要となるポートの数を考慮します。ポートが、構成済みの Kubernetes ノードポートの範囲内に収まっている必要があるからです。ノードのポート番号が使用中だった場合、該当する Kafka サービスの作成は失敗します。
[4](省略可能)指定する
annotations
は、Confluent Operator によってこの Kafka クラスター用に作成されるすべての NodePort サービスに追加されます。
使用予定の NodePort 範囲で接続できるように、ファイアウォールルールを作成します。ファイアウォールルールの作成については、「ファイアウォールルールの使用」を参照してください。
NodePort サービスが正しく作成されていることを確認するには、以下のコマンドを使用して、名前空間内の Kafka のサービスをリストします。
kubectl get services -n <namespace>
以下のような Kafka サービスリストが出力されます。この例では
portOffset
として 31000 が使用されています。NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) kafka ClusterIP None <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-0-internal ClusterIP 10.71.10.235 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-0-np NodePort 10.71.7.28 <none> 9092:31002/TCP kafka-1-internal ClusterIP 10.71.8.62 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-1-np NodePort 10.71.6.82 <none> 9092:31004/TCP kafka-2-internal ClusterIP 10.71.6.0 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-2-np NodePort 10.71.12.57 <none> 9092:31006/TCP kafka-bootstrap-np NodePort 10.71.15.28 <none> 9092:31000/TCP
静的ポートベースルーティングを使用した Kafka への外部アクセス¶
Kubernetes Ingress のサポート対象が HTTP ベースのサービスのみであるのに対し、 Kafka は TCP ベースです。ただし、エコシステムには Ingress コントローラーの実装があり、たとえば NGINX Ingress Controller では、Kafka のような非 HTTP ベースのサービスがサポートされています。Kafka への HTTP 経由の外部アクセスを実現するには、そうした Ingress コントローラー の 1 つを使用します。
RBAC が有効な Kafka を公開する方法については、「静的ポートベースルーティングと RBAC」を参照してください。
Kafka への外部アクセスに静的ポートベースルーティングを使用するよう構成するには、以下の手順に従います。
外部アクセスの種類として
staticForPortBasedRouting
を使用して Kafka をデプロイします。kafka: staticForPortBasedRouting: enabled: true ----- [1] host: ----- [2] portOffset: ----- [3]
[1]
enabled: true
と設定して、静的ポートベースルーティングを有効にします。[2](必須)
host
に、使用するホスト名を指定します。このホスト名と、Ingress コントローラーの外部 IP アドレスを使用して、このワークフローの後のほうで Kafka 用の DNS レコードをセットアップします。[3](必須)
PortOffset
は開始ポート番号で、9092 より大きい必要があります。portOffset
ポートには Kafka ブートストラップサーバーが割り当てられます。Kafka ブローカーが、ポートportOffet+2
、portOffset+4
、のように N 番目のブローカーのportOffet+2*N
まで割り当てられます。RBAC が有効な場合、ブートストラップサービスの MDS ポートは
portOffset+1
に、N 番目のブローカーの MDS ポートはportOffset+1+N*2
になります。
種類が
ClusterIP
であるブートストラップサービスを作成します。以下に例を示します。
bootstrap.yaml
ファイルを以下の内容で作成します。apiVersion: v1 kind: Service metadata: name: kafka-bootstrap namespace: operator labels: app: kafka-bootstrap spec: ports: - name: external port: 9092 protocol: TCP targetPort: 9092 selector: physicalstatefulcluster.core.confluent.cloud/name: kafka physicalstatefulcluster.core.confluent.cloud/version: v1 type: ClusterIP
次のコマンドを実行して、上記のように設定されたブートストラップサービスを作成します。
kubectl apply -f bootstrap.yaml -n <namespace>
ingress-nginx などの Ingress コントローラーをデプロイします。利用可能なコントローラーのリストについては、「Ingress コントローラー」を参照してください。
以下のコマンド例を参考に、TCP ポートと Kafka サービスとのマッピングを指定します。各ブローカーおよびブートストラップサーバーは、ステップ #1 で指定したポートオフセットに基づいて TCP ポートにマップされている必要があります。Kafka clusterIP サービスとポートを表示するには、以下のコマンドを使用します。
kubectl get services -n <namespace>
以下の Helm コマンドの例では、NGINX Ingress コントローラーをインストールし、ポート 9093、9095、9097、9099 を Kafka のサービスとサービスポート(ブートストラップサーバー 1 個とブローカー 3 個)にマップしています。
helm install <release name> stable/nginx-ingress -n <namespace> \ --set controller.ingressClass=kafka \ --set tcp.9093="operator/kafka-bootstrap:9092" \ --set tcp.9095="operator/kafka-0-internal:9092" \ --set tcp.9097="operator/kafka-1-internal:9092" \ --set tcp.9099="operator/kafka-2-internal:9092"
Ingress コントローラーが適切に構成されていることを確認します。詳細については、使用している Ingress コントローラーのドキュメントを参照してください。
NGINX の場合は、以下のコマンドを実行して、configmap 名を取得し、Ingress コントローラーによって作成された configmap を検証します。
kubectl get configmap -n <namespace>
kubectl describe configmap <configmap name> -n <namespace>
出力には、上記の Helm コマンドの場合と同様、名前空間名、Kafka ブローカーサービス名、ポートが示されます。たとえば、
operator/kafka-bootstrap:9093
となります。インバウンドトラフィックの Kafka へのルーティングで Ingress コントローラーによって使用されるルールのコレクションが含まれた Ingress リソース を作成します。
Ingress では、Ingress コントローラーに応じて一部のオプションの構成にアノテーションを使用します。その一例が rewrite-target アノテーション です。サポートされているアノテーションについては、使用している Ingress コントローラーのドキュメントを参照してください。
NGINX コントローラーのデプロイと Ingress リソースの構成の詳細については、この チュートリアル を参照してください。
以下に例を示します。
NGINX Ingress コントローラーによって Kafka ブートストラップサーバーと 3 個のブローカーが公開されるよう、Ingress リソース用の
ingress-resource.yaml
を以下の内容で作成します。apiVersion: networking.k8s.io/v1beta1 kind: Ingress metadata: name: <Ingress resource> annotations: kubernetes.io/ingress.class: nginx nginx.ingress.kubernetes.io/rewrite-target: / spec: rules: - host: example.mydevplatform.gcp.cloud http: paths: - path: backend: serviceName: kafka-bootstrap servicePort: 9092 - path: backend: serviceName: kafka-0-internal servicePort: 9092 - path: backend: serviceName: kafka-1-internal servicePort: 9092 - path: backend: serviceName: kafka-0-internal servicePort: 9092
次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。
kubectl apply -f ingress-service.yaml -n <namespace>
Kafka 用にステップ #1 で指定したホスト名と、Ingress コントローラーの外部ロードバランサー IP アドレスを使用して、DNS レコードを作成します。
外部 IP を取得するには、以下のコマンドを使用します。
kubectl get services -n <namespace>
静的ホストベースルーティングを使用した Kafka への外部アクセス¶
Kubernetes Ingress のサポート対象が HTTP ベースのサービスのみであるのに対し、 Kafka は TCP ベースです。ただし、エコシステムには Ingress コントローラーの実装があり、たとえば NGINX Ingress Controller では、Kafka のような非 HTTP ベースのサービスがサポートされています。Kafka への HTTP 経由の外部アクセスを実現するには、そうした Ingress コントローラー の 1 つを使用します。
RBAC 対応の Kafka を公開する方法については、静的ホストベースルーティングと RBAC を参照してください。
Kafka への外部アクセスに静的ホストベースルーティングを使用するよう構成するには、以下の手順に従います。
モードが TLS でアクセスの種類が
staticForHostBasedRouting
の Kafka を構成してデプロイします。kafka: staticForHostBasedRouting: enabled: true ----- [1] domain: ----- [2] bootstrapPrefix: ----- [3] brokerPrefix: ----- [4] port: ----- [5] tls: enabled: true ----- [6]
[1]
enabled: true
と設定して、静的ホストベースルーティングを有効にします。[2](必須)クラスターの
domain
名を指定します。[3](省略可能)
bootstrapPrefix
を使用して、デフォルトの Kafka ブートストラッププレフィックスを変更します。デフォルトのブートストラッププレフィックスは Kafka コンポーネント名(kafka
)です。この値は、使用ドメインの DNS エントリに使用されます。ブートストラップ DNS 名は
<bootstrapPrefix>.<domain>
となります。設定されていない場合、デフォルトの Kafka ブートストラップ DNS 名は
kafka.<domain>
です。ネットワーク計画の一環として、複数の Kafka クラスターを実行する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えることを検討してください。
[4](省略可能)
brokerPrefix:
を使用して、デフォルトの Kafka ブローカープレフィックスを変更します。デフォルトの Kafka ブローカープレフィックスは
b
です。これらは、使用ドメインの DNS エントリに使用されます。ブローカー DNS 名は
<brokerPrefix>0.<domain>
、<brokerPrefix>1.<domain>
、などとなります。設定されていない場合、デフォルトのブローカー DNS 名は
b0.<domain>
、b1.<domain>
、などとなります。[5]
port
は必須です。デフォルトの TLS ポートである 443 を指定できます。[6]
enabled: true
と設定して、Kafka での TLS 暗号化の使用を構成します。Operator では、TLS パススルー機能が使用されることから、静的ホストベースルーティングを使用して Kafka を公開する場合は TLS 暗号化を有効にする必要があります。
種類が ClusterIP であるブートストラップサービスを作成します。
以下に例を示します。
YAML ファイル
bootstrap.yaml
を以下の内容で作成します。apiVersion: v1 kind: Service metadata: name: kafka-bootstrap namespace: operator labels: app: kafka-bootstrap spec: ports: - name: external port: 9092 protocol: TCP targetPort: 9092 selector: physicalstatefulcluster.core.confluent.cloud/name: kafka physicalstatefulcluster.core.confluent.cloud/version: v1 type: ClusterIP
次のコマンドを実行して、上記のように設定されたブートストラップサービスを作成します。
kubectl apply -f bootstrap.yaml -n <namespace>
ingress-nginx などの Ingress コントローラーをデプロイします。利用可能なコントローラーのリストについては、「Ingress コントローラー」を参照してください。
使用する Ingress コントローラーには、構成された HTTPS ポート(デフォルト: 443)ですべてのトラフィックをインターセプトして Kafka TCP プロキシに渡す SSL パススルーのサポートが必要です。
たとえば、以下の Helm コマンドでは、SSL パススルーが有効の NGINX Ingress コントローラーがインストールされます。
helm upgrade --install <release name> stable/nginx-ingress \ --set rbac.create=true \ --set controller.publishService.enabled=true \ --set controller.extraArgs.enable-ssl-passthrough="true"
インバウンドトラフィックの Kafka へのルーティングで Ingress のコントロール機能によって使用されるルールのコレクションが含まれた Ingress リソース を作成します。
hosts
は、前述の DNS セットアップで設定した、Ingress ロードバランサーの外部 IP に解決されることが必要です。NGINX コントローラーのデプロイと Ingress リソースの構成の詳細については、この チュートリアル を参照してください。
以下に例を示します。
NGINX Ingress コントローラーによって Kafka ブートストラップサーバーと 3 個のブローカーが公開されるよう、Ingress リソース用の
ingress-resource.yaml
を以下の内容で作成します。ドメイン名はmydevplatform.gcp.cloud
です。ブートストラップの prefix/brokerPrefix/port はデフォルト値に設定されます。ルールは、指定された Kafka ブローカーとブートストラップサーバーホストに適用されます。apiVersion: extensions/v1beta1 kind: Ingress metadata: name: <ingress service name> annotations: nginx.ingress.kubernetes.io/ssl-passthrough: "true" ---[1] nginx.ingress.kubernetes.io/ssl-redirect: "false" ---[2] nginx.ingress.kubernetes.io/backend-protocol: HTTPS ---[3] ingress.kubernetes.io/ssl-passthrough: "true" ---[4] kubernetes.io/ingress.class: nginx ---[5] spec: tls: - hosts: - b0.mydevplatform.gcp.cloud - b1.mydevplatform.gcp.cloud - b2.mydevplatform.gcp.cloud - kafka.mydevplatform.gcp.cloud rules: - host: kafka.mydevplatform.gcp.cloud http: paths: - backend: serviceName: kafka-bootstrap servicePort: 9092 - host: b0.mydevplatform.gcp.cloud http: paths: - backend: serviceName: kafka-0-internal servicePort: 9092 - host: b1.mydevplatform.gcp.cloud http: paths: - backend: serviceName: kafka-1-internal servicePort: 9092 - host: b2.mydevplatform.gcp.cloud http: paths: - backend: serviceName: kafka-2-internal servicePort: 9092
- アノテーション [1] ではコントローラーに対して、TLS 接続を直接バックエンドへ送信し、NGINX での通信の復号は行わないよう指示します。
- アノテーション [2] では、デフォルト値を無効にします。
- アノテーション [3] には、NGINX によるバックエンドサービスとの通信方法を指定します。
- アノテーション [4] の
ssl-passthrough
は必須です。 - アノテーション [5] では、NGINX コントローラーの使用を指定しています。
次のコマンドを実行して、上記のように設定された Ingress リソースを作成します。
kubectl apply -f Ingress-resource.yaml -n <namespace>
Kafka のブートストラップとブローカーの DNS アドレスが Ingress コントローラーを指すように構成します。Kafka DNS エントリを取得するには以下が必要です。
ステップ #1 の構成ファイル(
$VALUES_FILE
)で指定したdomain
([2])名。Ingress コントローラーのロードバランサーの外部 IP。
外部 IP を取得するには、以下のコマンドを使用します。
kubectl get services -n <namespace>
Kafka のブートストラッププレフィックスとブローカープレフィックス
追加する DNS テーブルエントリを以下の出力例に示します。使用パラメーターは次のとおりです。
- ドメイン :
mydevplatform.gcp.cloud
- デフォルトのプレフィックス/レプリカ番号を使用するブローカーレプリカ 3 個 :
b0
、b1
、b2
- Kafka ブートストラッププレフィックス :
kafka
DNS name ExternalIP b0.mydevplatform.gcp.cloud 34.71.198.214 b1.mydevplatform.gcp.cloud 34.71.198.214 b2.mydevplatform.gcp.cloud 34.71.198.214 kafka.mydevplatform.gcp.cloud 34.71.198.214
Kafka への内部アクセス¶
Operator によって Kubernetes クラスター内にデプロイされる Confluent Platform コンポーネントと、Kubernetes クラスター内のユーザークライアントアプリケーションは、Kafka の内部リスナー経由での Kafka との接続に以下のアドレスを使用します。
Kafka クラスターがこのクライアントやコンポーネントと同じ名前空間にデプロイされる場合:
<kafka-component-name>:9071
Kafka クラスターがこのクライアントやコンポーネントとは別の名前空間にデプロイされる場合:
<kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
<kafka-component-name>
は、構成ファイル( $VALUES_FILE
)の kafka
セクションの name:
に設定される値です。
ロードバランサーを使用した Kafka への内部アクセス¶
Kubernetes がプライベートサブネット内または VPC ピアリングされたクラスターの範囲内で実行されている場合は、Kafka や他の Confluent Platform コンポーネントの間で VPC 対 VPC のピアリング接続に使用される内部ロードバランサーを構成します。
内部ロードバランサーを作成するには、構成ファイル( $VALUES_FILE
)の Kafka セクションに以下のエントリを追加します。
kafka:
loadBalancer:
enabled: true ----- [1]
type: internal ----- [2]
domain: ----- [3]
- [1]
enabled: true
と設定して、ロードバランサーを有効にします。 - [2]
type: internal
と設定して、内部ロードバランサーを作成します。 - [3]
domain
を、クラスターのドメイン名に設定します。
内部ロードバランサーにより、VPC ピアリング 用のコンポーネントサポートが追加されます。
インストーラーにより、以下のプロバイダー固有アノテーションが自動的に構成され、該当プロバイダー用の内部ロードバランサーが作成されます。
Azure
kafka: loadBalancer: annotations: service.beta.kubernetes.io/azure-load-balancer-internal: "true"
AWS
kafka: loadBalancer: annotations: service.beta.kubernetes.io/aws-load-balancer-internal: "0.0.0.0/0"
GCP
kafka: loadBalancer: annotations: cloud.google.com/load-balancer-type: "Internal"
他の Confluent Platform コンポーネントによる Kafka へのアクセス¶
Confluent Platform コンポーネントによる Kafka との通信を実現するには、構成ファイル( $VALUES_FILE
)のコンポーネントセクションの bootstrapEndpoint:
を使用して、以下の例のように Kafka ブローカーエンドポイントを設定します。Kafka の dependencies セクションは、Confluent Control Center の場合は c3KafkaCluster
に、他のコンポーネントの場合は kafka
にあります。
<component>
dependencies:
kafka:
bootstrapEndpoint:
コンポーネントと Operator でデプロイされた Kafka との通信が Kafka の内部リスナー経由の場合:
Kafka クラスターがこのコンポーネントと同じ名前空間にデプロイされる場合:
<kafka-component-name>:9071
Kafka クラスターがこのコンポーネントとは別の名前空間にデプロイされる場合:
<kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
<kafka-component-name>
は、構成ファイル($VALUES_FILE
)のkafka
セクションのname
に設定される値です。
他の Confluent Platform コンポーネントへのアクセスの構成¶
Confluent Operator では、外部クライアントと他の Confluent Platform コンポーネントとの接続用にロードバランサーがサポートされています。
注釈
Kafka ブローカーと ZooKeeper との間には常時接続が維持されますが、ロードバランサーが ZooKeeper 用に使用されると切断のおそれが生じます。ロードバランサーを ZooKeeper 用には構成しないでください。
ロードバランサーが有効の場合、Confluent Platform コンポーネントへのアクセスには、以下のエンドポイントのいずれかを使用します。
- TLS が有効の場合 :
https://<component>.<domain>
- TLS が無効の場合 :
http://<component>.<domain>
たとえば、TLS が有効の mydevplatform.gcp.cloud
ドメインでは、Confluent Platform コンポーネントへのアクセスに以下のエンドポイントを使用します。
https://connectors.mydevplatform.gcp.cloud
https://replicator.mydevplatform.gcp.cloud
https://schemaregistry.mydevplatform.gcp.cloud
https://ksql.mydevplatform.gcp.cloud
https://controlcenter.mydevplatform.gcp.cloud
外部ロードバランサーは、グローバル構成ファイル( $VALUES_FILE
)に構成されます。コンポーネントの loadBalancer:
セクションで、外部ロードバランサーを構成するために以下のパラメーターを使用します。
<component>:
loadBalancer:
enabled: true ----- [1]
type: ----- [2]
domain: ----- [3]
prefix: ----- [4]
[1]
enabled: true
と設定して、ロードバランサーを有効にします。[2] 外部ロードバランサーについて、
type: external
と設定するか、設定を省略します。ロードバランサーは、デフォルトで外部ロードバランサーです。[3]
domain
を、クラスターのドメイン名に設定します。[4]
prefix
を設定して、デフォルトのロードバランサープレフィックスを変更します。デフォルトはコンポーネント名です。prefix: <myprefix>
のように指定すると、コンポーネント DNS 名は<myprefix>.<domain>
のように構成されます。prefix
の設定がない場合、コンポーネント DNS 名は<component-name>.<domain>
のように構成されます。
コンポーネント DNS エントリの追加¶
外部ロードバランサーが作成されたら、コンポーネントロードバランサーに関連付けられた DNS エントリを DNS テーブル(または、プロバイダー環境によって認識された DNS エントリの取得に使用する手段)に追加します。
重要
複数のクラスターを単一のドメインに追加する場合に DNS の競合を回避できるよう、デフォルトプレフィックスをコンポーネントごとに変えてください。
DNS エントリの追加に必要な情報を取得するには、まず名前空間内のコンポーネントで使用するサービスをリストします。
kubectl get services -n <namespace>
ロードバランサーを追加した各 Confluent Platform コンポーネントに DNS エントリを追加します。フォーマットは以下のとおりです。
- DNS 名
- コンポーネント名。例:
controlcenter
、connector
、replicator
、schemaregistry
、ksql
。 - 外部 IP
kubectl get services -n <namespace>
の出力から得られるコンポーネントのブートストラップロードバランサーの "外部 IP"。- 内部 IP
kubectl get services -n <namespace>
の出力から得られるコンポーネントのブートストラップロードバランサーの Cluster IP。
非 DNS 開発アクセス¶
所属組織の DNS プロセスを経由しない直接外部アクセスが必要な場合は、ローカルワークステーションの hosts ファイルを編集します。Linux ディストリビューションの場合、通常このファイルは /etc/hosts
にあります。
重要
非 DNS アクセスは開発やテスト専用であり、本稼働環境では使用しないでください。
hosts ファイルを構成すると、Kafka ブートストラップサーバーにローカルワークステーションからアクセスできるようになります。
ロードバランサーアクセス¶
以下のコマンドを実行して、ロードバランサーの外部 IP を取得します。
kubectl get services -n <namespace>
出力例:
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) controlcenter ClusterIP None <none> 9021/TCP,7203/TCP,7777/TCP controlcenter-0-internal ClusterIP 10.77.12.166 <none> 9021/TCP,7203/TCP,7777/TCP controlcenter-bootstrap-lb LoadBalancer 10.77.11.31 35.224.122.181 80:31358/TCP kafka ClusterIP None <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-0-internal ClusterIP 10.77.8.20 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-0-lb LoadBalancer 10.77.3.227 34.72.187.157 9092:31977/TCP kafka-1-internal ClusterIP 10.77.4.190 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-1-lb LoadBalancer 10.77.15.119 34.72.76.66 9092:30244/TCP kafka-2-internal ClusterIP 10.77.4.234 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP kafka-2-lb LoadBalancer 10.77.14.14 35.239.218.11 9092:30695/TCP kafka-bootstrap-lb LoadBalancer 10.77.0.47 35.232.216.94 9092:30601/TC
ロードバランサー IP を
/etc/hosts
ファイルに追加します。Kafka ロードバランサーの
/etc/hosts
ファイルのエントリは、上記の出力例を使用すると以下のようになります。34.72.187.157 b0.mydevplatform.gcp.cloud b0 34.72.76.66 b1.mydevplatform.gcp.cloud b1 35.239.218.11 b2.mydevplatform.gcp.cloud b2 35.232.216.94 kafka.mydevplatform.gcp.cloud kafka 35.224.122.181 controlcenter.mydevplatform.gcp.cloud
NodePort アクセス¶
以下を使用して、DNS エントリを /etc/hosts
ファイルに追加します。
- Kafka ノード IP のいずれか
- 構成ファイル(
$VALUES_FILE
)のkafka.nodePort.host
に指定したホスト名。kafka.nodePort.host
の詳細については、「NodePort サービスを使用した、Kafka への外部アクセス」を参照してください。
以下に例を示します。
34.68.103.48 kafka-nodeport.mydevplatform.gcp.cloud
ホストベースルーティングを使用した静的アクセス¶
Ingress コントローラーのロードバランサーの外部 IP を使用して、Kafka の IP を /etc/hosts
ファイルに追加します。
後述のエントリ例では、以下を使用しています。
- デフォルトのブートストラッププレフィックス:
kafka
- デフォルトのブローカープレフィックス :
b
- ドメイン名 :
mydevplatform.gcp.cloud
- Ingress コントローラーのロードバランサーの外部 IP:
34.68.103.48
Kafka ロードバランサーの /etc/hosts
ファイルエントリは以下のようになります。
34.68.103.48 b0.mydevplatform.gcp.cloud b0
34.68.103.48 b1.mydevplatform.gcp.cloud b1
34.68.103.48 b2.mydevplatform.gcp.cloud b2
34.68.103.48 kafka.mydevplatform.gcp.cloud kafka
ポートベースルーティングを使用した静的アクセス¶
kafka.staticForPortBasedRouting.host
に指定したホスト名と Ingress コントローラーの外部 IP を /etc/hosts
ファイルに追加します。
以下に例を示します。
34.68.103.48 kafka-ingress.mydevplatform.gcp.cloud