Kafka のトピックの管理

Confluent for Kubernetes (CFK)を使用すると、Kafka のトピックを、Kubernetes における KafkaTopic カスタムリソース(CR)として宣言的に作成、管理することができます。KafkaTopic の CR はそれぞれ 1 つのトピックにマッピングされ、対応する Kafka トピックと同期された状態に保たれます。これによりワークフローに独立性が生まれ、Confluent をデプロイする過程でトピックを作成しておけば、クライアントアプリケーションでは、そのトピックに対してデータを生成したり、またそのトピックからデータを消費したりするだけで済みます。

KafkaTopic は Kafka クラスターとやり取りしながら、Admin REST APIs を通じてトピックの作成、編集、削除を実行します。Admin REST APIs は、KafkaTopic の CR を作成する前に、Kafka Admin REST APIs の説明に従ってセットアップしておく必要があります。

Kafka トピックの作成

Create a topic as a KafkaTopic CR:

kind: KafkaTopic
metadata:
  name:              ----- [1]
  namespace:
spec:
  replicas:
  partitionCount:
  kafkaClusterRef:     --- [2]
  kafkaRestClassRef:   --- [3]
  kafkaRest:
    endpoint:          --- [4]
    kafkaClusterID:    --- [5]
    authentication:
       type:           --- [6]
  • [1] トピック名。トピック名に _ 文字は使用できません。

    metadata.namespec.name の両方を指定した場合、spec.name が使用されます。

  • kafkaClusterRef ([2])、kafkaRestClassRef ([3])、または kafkaRest.endpoint ([4])を使用して、Admin REST APIs を明示的に指定します。

    優先順位の高い順に、[3]、[4]、[2] です。

    前述のどれも設定されていない場合、同じ名前空間の Kafka が自動的に検出されます。

  • [2] Kafka クラスターの名前。

  • [3] KafkaRestClass の CR の名前。

  • [4] Admin REST APIs エンドポイント。詳細については、「Confluent Admin REST API の管理」を参照してください。

  • [5] Kafka クラスターの ID。Confluent Cloud でのトピックの作成時に必須です。

  • [6] Admin REST APIs の認証が必要な場合は、認証タイプを指定します。basicbearermtls がサポートされています。

    kafkaRestClassRef を使用して Admin REST APIs を指定した場合は、kafkaRest で認証を設定する必要はありません。それ以外の場合は、kafkaRest で認証を指定します。

以下のサンプルの CR では、topic-a という名前の Kafka のトピックを confluent 名前空間に作成しています。レプリカ数は 1、パーティション数は 12 です。このトピックは、認証タイプを bearer とし、内部 Kafka クラスターである kafka に作成されます。

kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential

利用可能なトピック構成パラメーターのリストについては、「Kafka のトピックの構成」を参照してください。

Kafka トピックのアップデート

トピックをアップデートするには、トピックの CR 構成に変更を加え、kubectl apply コマンドを使用して、その変更を適用します。

トピック名に _ 文字を使用してトピックをアップデートすることはできません。

次の例では、前出の topic-a トピックにクリーンアップポリシーを追加しています。

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: operator
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"

警告

spec.replicasspec.partitionCount をアップデートすることはできません。

利用可能なトピック構成パラメーターのリストについては、「Kafka のトピックの構成」を参照してください。

Kafka のトピックの削除

トピックを削除するには、kubectl delete コマンドを使用します。

以下に例を示します。

kubectl delete -f <topic-cr-file>