Google Kubernetes Engine 上の Confluent Platform

概要

このデモでは、Confluent for Kubernetes と、Kafka Connect Datagen を介して提供される模擬データ生成を利用して、 Google Kubernetes Engine (GKE)での Confluent Platform のデプロイを説明します。

注釈

Confluent Cloud をターゲットとする本稼働環境に類似した環境で Kubernetes を使用するサンプルについては、「Kubernetes と GitOps を使用する Apache Kafka® 用 DevOps」プロジェクトを参照してください。

このデモの主なコンポーネントは、次のとおりです。

  • GKE 上で実行されている Kubernetes クラスター。
  • 次の Confluent Platform コンポーネントの管理に使用する Confluent for Kubernetes
    • 単一ノードの ZooKeeper
    • 単一ノードの Kafka
    • 単一ノードの Schema Registry
    • Confluent Control Center
    • 単一ノードの Kafka Connect
    • 模擬データを生成するための kafka-connect-datagen のインスタンス 1 つ
Operator

前提条件

デモを正常に実行するために、次のアプリケーションまたはライブラリが、システムパスにインストールされ、使用できる状態である必要があります。

アプリケーション テスト済みバージョン 情報
kubectl 1.18.0 https://kubernetes.io/ja/docs/tasks/tools/install-kubectl/
helm 3.1.2 https://github.com/helm/helm/releases/tag/v3.1.2
gcloud GCP sdk core 286.0.0 2020.03.24 https://cloud.google.com/sdk/install

サンプルの実行

警告

デモでは、実際の GCP SDK を使用して実際のリソースを起動します。想定外の課金を避けるために、デモを起動する前に、慎重にリソースのコストを見積もってください。また、デモの評価が終了したら、すべてのリソースを必ず破棄してください。必要に応じて、CFK の「サイズ設定の推奨事項」および「 変数のリファレンス」セクションで、Kubernetes での Confluent Platform の実行に必要なリソースについて調べてください。

セットアップ

confluentinc/examples GitHub リポジトリのクローンを作成し、ディレクトリを kubernetes/gke-base に変更します。

git clone https://github.com/confluentinc/examples.git
cd examples/kubernetes/gke-base

このデモには、Kubernetes クラスターと、これを管理するために適切に構成された kubectl コンテキストが必要です。

この「セットアップ」セクションの残りの手順に従うと、Google Kubernetes Engine(GKE)で Kubernetes クラスターをビルドできます。デモに使用するクラスターが既にある場合は、このページの「検証」セクションへとスキップできます。

新しいクラスターの作成先となる Google Cloud Platform(GCP)プロジェクトを確認するには、以下を実行し、目的の GCP プロジェクト ID であることを確認します。

gcloud config list --format 'value(core.project)'

注釈

クラスターの作成に関する具体的な情報(サイズ、リージョン、ゾーンなど)については、このページの「変数のリファレンス」セクションを参照してください。これらの変数を使用して、デモのクラスター作成機能のデフォルト動作を変更することもできます。

クラスターを作成するには、次のコマンドを実行します(推定実行時間は 4 分)。

make gke-create-cluster

gcloud でクラスターが正常に作成されていることを確認します。:

...
Created [https://container.googleapis.com/v1/projects/<project-id>/zones/us-central1-a/clusters/cp-examples-operator-<username>].
To inspect the contents of your cluster, go to: <link>
kubeconfig entry generated for cp-examples-operator-<username>.
NAME                            LOCATION  MASTER_VERSION  MASTER_IP     MACHINE_TYPE  NODE_VERSION   NUM_NODES  STATUS
cp-examples-operator-<username> <zone>    1.12.8-gke.10   <ip-address>  n1-highmem-2  1.12.8-gke.10  3          RUNNING
✔  ++++++++++ GKE Cluster Created

検証

デモでは、kubectl を使用してクラスターを制御します。ローカルの kubectl が、意図したとおりに構成されていることを確認するには、以下を実行します。

kubectl config current-context

コンテキストには、適切なリージョンとクラスター名が含まれている必要があります。デモの gke-create-cluster 関数を使用してクラスターを作成した場合、コンテキスト名の形式は gke_<google-project-id>_<region>_<cp-examples-operator>-<username> になります。

実行

Confluent Platform をデプロイするために、以下を実行します(推定実行時間は 7 分)。

make demo

最後に表示される出力メッセージは、次のようになります。

✔ GKE Base Demo running

検証

Kubernetes デプロイの検証

次のコマンドを実行すると、デプロイしたコンポーネントを表示できます。

kubectl -n operator get all

デフォルトのサンプル変数値を使用した場合は、kubectl により、次のように報告されます。

NAME                                        READY   STATUS      RESTARTS   AGE
pod/cc-operator-76c54d65cd-28czd            1/1     Running     0          11m
pod/clicks-datagen-connector-deploy-2vd8q   0/1     Completed   0          8m6s
pod/connectors-0                            1/1     Running     0          9m36s
pod/controlcenter-0                         1/1     Running     0          8m4s
pod/client-console                          1/1     Running     0          10m
pod/kafka-0                                 1/1     Running     0          10m
pod/schemaregistry-0                        1/1     Running     0          9m59s
pod/zookeeper-0                             1/1     Running     0          11m

NAME                                TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)                                        AGE
service/connectors                  ClusterIP   None          <none>        8083/TCP,7203/TCP,7777/TCP                     9m36s
service/connectors-0-internal       ClusterIP   10.0.8.147    <none>        8083/TCP,7203/TCP,7777/TCP                     9m36s
service/controlcenter               ClusterIP   None          <none>        9021/TCP,7203/TCP,7777/TCP                     8m5s
service/controlcenter-0-internal    ClusterIP   10.0.14.242   <none>        9021/TCP,7203/TCP,7777/TCP                     8m5s
service/kafka                       ClusterIP   None          <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP   10m
service/kafka-0-internal            ClusterIP   10.0.14.239   <none>        9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP   10m
service/schemaregistry              ClusterIP   None          <none>        8081/TCP,7203/TCP,7777/TCP                     10m
service/schemaregistry-0-internal   ClusterIP   10.0.6.93     <none>        8081/TCP,7203/TCP,7777/TCP                     10m
service/zookeeper                   ClusterIP   None          <none>        3888/TCP,2888/TCP,2181/TCP,7203/TCP,7777/TCP   11m
service/zookeeper-0-internal        ClusterIP   10.0.8.51     <none>        3888/TCP,2888/TCP,2181/TCP,7203/TCP,7777/TCP   11m

NAME                          DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/cc-operator   1         1         1            1           11m

NAME                                     DESIRED   CURRENT   READY   AGE
replicaset.apps/cc-operator-76c54d65cd   1         1         1       11m

NAME                              DESIRED   CURRENT   AGE
statefulset.apps/connectors       1         1         9m36s
statefulset.apps/controlcenter    1         1         8m4s
statefulset.apps/kafka            1         1         10m
statefulset.apps/schemaregistry   1         1         9m59s
statefulset.apps/zookeeper        1         1         11m

NAME                                        COMPLETIONS   DURATION   AGE
job.batch/clicks-datagen-connector-deploy   1/1           4s         8m6s

NAME                                               AGE
zookeepercluster.cluster.confluent.com/zookeeper   11m

NAME                                       AGE
kafkacluster.cluster.confluent.com/kafka   10m

CLI での Confluent Platform の検証

デフォルトでは、Kubernetes Ingress なしでサンプルがデプロイされます。つまり、外部クライアントからは、Kubernetes クラスター内の Confluent Platform リソースにアクセスできません。Ingress が有効になっている既存のクラスターを使用した場合は、以降の手順が、ご使用のセットアップに適用されない可能性があります。

サンプルでは、client-console ポッドをデプロイします。これを使用して、Confluent Platform サービスへのネットワーク接続があるクラスター内のターミナルを開くことができます。以下に例を示します。

kubectl -n operator exec -it client-console -- bash

ここで、標準 Kafka コマンドを実行してクラスターを検証できます。コマンドには、client-console ポッドのマッピング済みファイルに含まれている、必須の接続性とセキュリティの構成を指定する必要があります。詳細については、クライアントの構成 の「重要ポイント」を参照してください。

kafka-topics --bootstrap-server kafka:9071 --command-config /etc/kafka-client-properties/kafka-client.properties --list

コンソールコンシューマーで、模擬クリックデータジェネレーターの出力を表示できます。

kafka-console-consumer --bootstrap-server kafka:9071 --consumer.config /etc/kafka-client-properties/kafka-client.properties --topic clicks

サンプルの出力は以下のようになります。

222.152.45.45F-
16141<GET /images/track.png HTTP/1.1204006-Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36
122.173.165.203L-
16151FGET /site/user_status.html HTTP/1.1401289-Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)
...

Confluent Platform Control Center の検証

Confluent Control Center を表示するには、ローカルマシンと、Confluent Control Center サービスを実行している Kubernetes ポッドの間に、有効なネットワーク接続が必要になります。既存のクラスターを使用した場合は、外部クラスターアクセスが既に構成されている可能性があります。そうでない場合は、次の kubectl コマンドを使用して、ローカルホストと Confluent Control Center の間の転送されたポート接続を開くことができます。

kubectl -n operator port-forward controlcenter-0 12345:9021

次に、Web ブラウザーを開いて http://localhost:12345 にアクセスします。ここには、Confluent Control Center と稼働中の Kafka クラスター、Schema Registry、Kafka Connect と実行中の clicks コネクターが表示されます。

c3

ハイライト

サービスの構成

Confluent Platform Helm チャートは、ほとんどのデプロイに有効なベース構成を提供します。ユーザーが行う必要があるのは、自分の環境に特有の構成の最終調整のみです。このサンプルでは、values.yaml ファイルで、デフォルト以外の構成が指定されています。YAML ファイルを使用すると、宣言型インフラストラクチャアプローチが容易になりますが、1 つの場所でのデフォルト以外の構成の表示、新しい環境のブートストラップ、または一般的な共有を行う場合にも便利です。

以下に示すのは、Helm チャートを使用した Kafka サーバープロパティ( configOverrides )の構成方法を示す、values.yaml ファイルのサンプルセクションです。このサンプルは、YAML ファイル自体の内部での再利用を促す YAML アンカー( << : *cpImage )も示しています。詳細については、values.yaml を参照してください。

kafka:
  <<: *cpImage
  resources:
    cpu: 200m
    memory: 1Gi
  loadBalancer:
    enabled: false
  tls:
    enabled: false
  metricReporter:
    enabled: true
  configOverrides:
    server:
    - "auto.create.topics.enabled=true"

残りの構成詳細は、個々の helm コマンドで指定します。以下のサンプルは、helm upgrade コマンドの --set 引数により、Zookeeper デプロイを実際に有効にする設定を示しています。すべてのコマンドについては、Makefile を参照してください。

helm upgrade --install --namespace operator --set zookeeper.enabled=true ...

クライアントの構成

警告

Confluent Platform Helm チャートのデフォルトセキュリティデプロイでは、SASL/PLAIN セキュリティを使用します。これは、デモを行う場合は便利ですが、本稼働環境には、より厳格なセキュリティを使用する必要があります。詳細については、「セキュリティの構成」を参照してください。

Confluent Platform Helm チャート を使用すると、 Kafka は Plaintext SASL セキュリティが有効な状態でデプロイされます。クライアントが認証するには、SASL 資格情報を含む構成値が必要になります。Kubernetes API は、Secrets および ConfigMap タイプをサポートしています。これらのタイプを使用すると、ポッド上のアプリケーションが使用できるファイルに構成値をプッシュできます。このデモでは、このメカニズムを使用して、必要なクライアントプロパティファイルが事前構成された client-console ポッドを起動します。ポッド上のプロパティファイルは、一元的に保存されたシークレットのマッピング済みバージョンです。

以下で、そのしくみを説明します。

SASL シークレットなどの構成ファイルの値は、次に示すように、Kubernetes オブジェクトファイルで定義されます。kafka-client.properties の後の内容はすべて、典型的な Java プロパティファイルと同様であることに注意してください。

apiVersion: v1
kind: Secret
metadata:
  name: kafka-client.properties
type: Opaque
stringData:
  kafka-client.properties: |-
    bootstrap.servers=kafka:9071
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT

デモでは、kubectl apply コマンドで、このオブジェクトをクラスターに適用します。

kubectl --context <k8s-context> -n operator apply -f <path-to-examples-repo>kubernetes/common/cfg/kafka-client-secrets.yaml

client-console は、このシークレットオブジェクトをボリュームとしてポッドにマッピングした状態でデプロイされます。

apiVersion: v1
kind: Pod
metadata:
  namespace: operator
  name: client-console
spec:
  containers:
  - name: client-console
    image: docker.io/confluentinc/cp-server-operator:5.3.0.0
    command: [sleep, "86400"]
    volumeMounts:
    - name: kafka-client-properties
      mountPath: /etc/kafka-client-properties/
  volumes:
  - name: kafka-client-properties
    secret:
      secretName: kafka-client.properties

その結果、kafka-client.properties という名前のシークレットオブジェクトが、ポッド上のファイルの場所 /etc/kafka-client-properties/kafka-client.properties に配置されます。

kubectl -n operator exec -it client-console bash

root@client-console:/opt# cat /etc/kafka-client-properties/kafka-client.properties
bootstrap.servers=kafka:9071
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

コネクターのデプロイ

Kafka Connect は、コネクターのデプロイ用に JSON オブジェクトを受け入れる REST エンドポイント を使用します。このデモでは、Kubernetes の ConfigMap オブジェクト、オーバーライドされたコマンドによる標準 Docker イメージ、Kubernetes の バッチジョブ API を使用して Kubernetes クラスター内にコネクターをデプロイするアプローチを示します。

最初に、コネクターの定義が ConfigMap オブジェクト内で定義されます。clicks-datagen-connector.json という名前の後は、完全な JSON オブジェクトになっています。

apiVersion: v1
kind: ConfigMap
metadata:
  name: clicks-datagen-connector
data:
  clicks-datagen-connector.json: '{
    "name":"clicks",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "clicks",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schemaregistry:8081",
      "value.converter.schemas.enable": "true",
      "quickstart": "clickstream",
      "max.interval": 1000,
      "iterations": -1,
      "tasks.max": "1"
    }
  }'

この ConfigMap は、次のコマンドでクラスターに適用されます。

kubectl --context <k8s-context> -n operator apply -f <path-to-examples-repo>kubernetes/common/cfg/clicks-datagen-connector-configmap.yaml

次に、Kubernetes ジョブオブジェクトが定義されます。このジョブは、curl プログラムがインストールされた Docker イメージを使用して、コネクター構成をデプロイするために curl コマンドに引数を追加します。前述の定義済み ConfigMap がどのようにジョブ仕様にマウントされ、curl コマンドに渡された構成ファイルが、マウントされたファイルのパスとどのように照合されるかに注意してください。

apiVersion: batch/v1
kind: Job
metadata:
  name: clicks-datagen-connector-deploy
spec:
  ttlSecondsAfterFinished: 5
  template:
    spec:
      volumes:
      - name: clicks-datagen-connector
        configMap:
          name: clicks-datagen-connector
      containers:
      - name: clicks-datagen-connector-deploy
        image: cnfldemos/alpine-curl:3.10.2_7.65.1
        args: [
          "-s",
          "-X", "POST",
          "-H", "Content-Type: application/json",
          "--data", "@/etc/config/connector/clicks-datagen-connector.json",
          "http://connectors:8083/connectors"
        ]
        volumeMounts:
          - name: clicks-datagen-connector
            mountPath: /etc/config/connector
      restartPolicy: Never
  backoffLimit: 1

次のコマンドで Kafka Connect システムがデプロイされた後に、ジョブがクラスターに適用されます。

kubectl --context <k8s-context> -n operator apply -f <path-to-examples-repo>kubernetes/common/cfg/clicks-datagen-connector-deploy-job.yaml

ジョブが適用された後に、以下のコマンドを実行して、デプロイされたコネクターを表示します。

kubectl -n operator exec -it client-console bash
root@client-console:/opt# curl http://connectors:8083/connectors;echo;
["clicks"]
オペレーターによるコネクターのデプロイ

任意の Kafka コネクター(または Single Message Transformation(SMT) あるいは コンバーター)を Kubernetes 環境にデプロイできます。Confluent Hub は、事前パッケージ済みですぐにインストールできるコネクター、変換、およびコンバーターのオンラインライブラリです。これを検索して、必要なものを見つけることができます。Kafka Connect の Confluent for Kubernetes イメージ、confluentinc/cp-server-connect-operator には、このようなコネクターがいくつか含まれていますが、デプロイしようとしている特定のコネクターは含まれていない場合があります。したがって、新しいコネクタータイプを Kubernetes 環境にデプロイするには、Connect イメージ用の JAR を入手する必要があります。

推奨される方法は、必要な JAR でベース Connect Docker イメージを拡張するカスタム Docker イメージを作成することです。カスタム Docker イメージでは、依存関係から 1 つのアーティファクトが作成されます。これは自立性と移植性が高いため、エフェメラル(揮発性)ディスクであるにもかかわらず、あらゆるポッドで実行できます。Confluent Hub クライアントを使用してカスタム Docker イメージを作成し、すぐにインストールできるコネクターの特定のセットで、Confluent の Kafka Connect イメージのいずれかを拡張する方法については、このドキュメント を参照してください。例として、模擬イベントを生成する Kafka Connect Datagen コネクター を Confluent Hub からプルし、この Dockerfile を使用して Docker イメージにバンドルする方法をご覧ください。カスタム Docker イメージをビルドしたら、Kubernetes が、このイメージを Docker レジストリからプルしてポッドを作成する必要があります。

注釈

複数のボリュームを使用して、必要な JAR を Connect イメージに配置することは推奨されません。自立性と移植性が低下し、ベースイメージと JAR の間でのバージョンの一致が困難になるためです。

Confluent Hub で用意されている事前パッケージ済みコネクターではなく、カスタムコネクターを使用する高度なユースケースでは、ローカルアーカイブ のカスタムコネクターを含む Docker イメージを作成する場合があります。デモでは、この高度なワークフローを使用します。Kafka Connect Datagen コネクター を使用して模擬イベントを生成します。この Dockerfile が、( Confluent Hub から直接プルされたのではなく)ソースコードからコンパイルされた Kafka Connect Datagen コネクターのローカルアーカイブで Docker イメージをビルドします。このイメージを Docker Hub にパブリッシュしますが、ご使用の環境では、各自の Docker Hub リポジトリにパブリッシュしてください。

オペレーター Helm の値をアップデートして、ポッド用のカスタム Connect Docker イメージをプルする必要があります。これを行うには、connect イメージをオーバーライドして、デモの value.yaml 構成ファイルの Docker Hub にパブリッシュされたものを代わりに使用します。

connect:
image:
  repository: cnfldemos/cp-server-connect-operator-datagen
  tag: 0.3.1-5.4.1.0

終了後の手順

クラスター内の Confluent Platform コンポーネントを強制的に停止するには、以下を実行します(推定実行時間は 4 分)。

make destroy-demo

次のコマンドで、すべてのリソースが削除されたことを確認できます。:

kubectl -n operator get all

サンプルを使用して自分の Kubernetes クラスターを作成した場合は、以下のコマンドで、そのクラスターを破棄します(推定実行時間は 3 分)。

make gke-destroy-cluster

高度な使用方法

変数のリファレンス

以下の表では、さまざまな動作を構成するために使用できる変数を説明しています。変数は、エクスポート するか、次のサンプル構文のいずれかで個々の make コマンドに設定することができます。

VARIABLE=value make <make-target>
make <make-target> VARIABLE=value
変数 説明 デフォルト
GCP_PROJECT_ID GCP プロジェクト ID にマッピングします。サンプルで、新しい GKE クラスターをビルドしたり、kubectl コンテキストを構成したりするために使用します。glcoud の現在のアクティブな構成とは異なるプロジェクト ID を使用する場合は、この値を、サンプルを実行している現在のシェルにエクスポートする必要があります。 コマンド gcloud config list --format 'value(core.project) の出力
GKE_BASE_CLUSTER_ID GKE クラスターを特定します。現在のユーザーの代わりとなり、GCP でプロジェクトが一意になるようにします。 cp-examples-operator-$USER
GKE_BASE_REGION --subnetwork フラグで使用してネットワーキングリージョンを定義します。 us-central1
GKE_BASE_ZONE --zone フラグにマッピングします。 us-central1-a
GKE_BASE_SUBNET --subnetwork フラグで使用してサブネットを定義します。 default
GKE_BASE_CLUSTER_VERSION --cluster-version フラグにマッピングします。 1.12.8-gke.10
GKE_BASE_MACHINE_TYPE --machine-type フラグにマッピングします。 n1-highmem-2
GKE_BASE_IMAGE_TYPE --image-type フラグにマッピングします。変化している場合は、CPU プラットフォームの最小値を確認します。 COS
GKE_BASE_DISK_TYPE --disk-type フラグにマッピングします。 pd-standard
GKE_BASE_DISK_SIZE --disksize フラグにマッピングします。 100
GKE_BASE_NUM_NODES --num-nodes フラグにマッピングします。 3
KUBECTL_CONTEXT サンプル内で kubectl コンテキストを明示的に設定するために使用します。 gke_$(GCP_PROJECT_ID)_$(GKE_BASE_ZONE)_$(GKE_BASE_CLUSTER_ID)