Kafka のトピックの管理

Confluent for Kubernetes (CFK)を使用すると、Kafka のトピックを、Kubernetes における KafkaTopic カスタムリソース(CR)として宣言的に作成、管理することができます。KafkaTopic の CR はそれぞれ 1 つのトピックにマッピングされ、対応する Kafka トピックと同期された状態に保たれます。これによりワークフローに独立性が生まれ、Confluent をデプロイする過程でトピックを作成しておけば、クライアントアプリケーションでは、そのトピックに対してデータを生成したり、またそのトピックからデータを消費したりするだけで済みます。

KafkaTopic は Kafka クラスターとやり取りしながら、Admin REST APIs を通じてトピックの作成、編集、削除を実行します。Admin REST APIs は、KafkaTopic の CR を作成する前に、Kafka Admin REST APIs の説明に従ってセットアップしておく必要があります。

Kafka トピックの作成

KafkaTopic の CR としてトピックを作成します。

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name:              ----- [1]
  namespace:
spec:
  replicas:
  partitionCount:
  kafkaClusterRef:
    name:            ----- [2]
    namespace:       ----- [3]
  kafkaRestClassRef:
    name:            ----- [4]
    namespace:       ----- [5]
  kafkaRest:
    endpoint:        ----- [6]
    authentication:
       type:         ----- [7]
  • [1] トピック名。トピック名に _ 文字は使用できません。

    metadata.namespec.name の両方を指定した場合、spec.name が使用されます。

  • [2] Kafka クラスターの名前。

  • [3] Kafka クラスターが実行されている名前空間。Kafka クラスターが、作成中の KafkaTopic と同じ名前空間にある場合は、省略できます。

  • [4] KafkaRestClass の CR の名前。

  • [2] [4] トピックの CR で kafkaRestClassRefkafkaClusterRef が指定されていない場合、CFK が、kafka の CR タイプを通じて同じ名前空間から Kafka クラスターの検出を試みます。

  • [5] KafkaRestClass の名前空間。KafkaRestClass の CR が、作成中の KafkaTopic と同じ名前空間にある場合は、省略できます。

  • [6] Kafka エンドポイント。詳細については、「Confluent Admin REST API の管理」を参照してください。

  • [7] Admin REST APIs の認証が必要な場合、認証タイプを指定します。basicbearer がサポートされます。

    検出中、Kafka クラスターに authenticationType が見つかった場合、トピックの CR 構成にある spec.kafkaRest.authentication が一致する必要があります。それ以外の場合、トピックは作成されません。

以下のサンプル CR では、topic-a という名前の Kafka のトピックを confluent 名前空間に作成しています。レプリカ数は 1、パーティション数は 12 です。このトピックは、認証タイプを bearer とし、内部 Kafka クラスターである kafka に作成されます。

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a           ----- [1]
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    endpoint: https://kafka.confluent.svc.cluster.local:8090
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential

利用可能なトピック構成パラメーターのリストについては、「Kafka のトピックの構成」を参照してください。

Kafka トピックのアップデート

トピックをアップデートするには、トピックの CR 構成に変更を加え、kubectl apply コマンドを使用して、その変更を適用します。

トピック名に _ 文字を使用してトピックをアップデートすることはできません。

次の例では、前出の topic-a トピックにクリーンアップポリシーを追加しています。

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: operator
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"

警告

spec.replicasspec.partitionCount をアップデートすることはできません。

利用可能なトピック構成パラメーターのリストについては、「Kafka のトピックの構成」を参照してください。

Kafka のトピックの削除

トピックを削除するには、kubectl delete コマンドを使用します。

以下に例を示します。

kubectl delete -f <topic-cr-file>