Manage Kafka Topics

Confluent for Kubernetes (CFK) allows you to declaratively create and manage Kafka topics as KafkaTopic custom resources (CRs) in Kubernetes. Each KafkaTopic CR is mapped to a topic and kept in sync with the corresponding Kafka topic. This allows you to have a separate workflow where you create topics as part of Confluent deployment, and your client applications only need to produce and consume from the topics.

Requirement

KafkaTopic communicates with a Kafka cluster through the Confluent REST Class to create, edit, and delete topics. You need to set up Kafka Admin REST Class as described in Kafka Admin REST Class before you create KafkaTopic CRs.

Create Kafka topic

You can create a topic using a KafkaTopic CR in an on-prem or Confluent Cloud Kafka cluster:

kind: KafkaTopic
metadata:
  name:                --- [1]
  namespace:           --- [2]
spec:
  replicas:
  partitionCount:
  kafkaClusterRef:     --- [3]
  kafkaRestClassRef:   --- [4]
  kafkaRest:
    endpoint:          --- [5]
    kafkaClusterID:    --- [6]
    authentication:
       type:           --- [7]
  • [1] The topic name. The _ character in the topic name is not allowed.

    If both metadata.name and spec.name are specified, spec.name is used.

  • [2] The namespace for the topic.

  • Use kafkaClusterRef ([3]), kafkaRestClassRef ([4]), or kafkaRest.endpoint ([5]) to explicitly specify the Confluent REST Class.

    The order of precedence is [4], [5], and [3].

    If none of the above is set, it performs an auto discovery of the Kafka in the same namespace.

  • [3] Name of the Kafka cluster.

  • [4] Name of the KafkaRestClass CR.

  • [5] Confluent REST Class endpoint. See Manage Confluent Admin REST Class for more information.

  • [6] ID of the Kafka cluster. Required when creating a topic in Confluent Cloud.

  • [7] If authentication is required for the Confluent Admin REST Class, specify the authentication type. basic, bearer, mtls are supported.

    If you specified the Confluent Admin REST Class using kafkaRestClassRef, you do not have to set the authentication in kafkaRest. Otherwise specify the authentication in kafkaRest.

The example CR below creates a Kafka topic, named topic-a, in the confluent namespace with 1 replica and 12 partitions. The topic is created in the internal Kafka cluster, kafka, with the bearer authentication.

kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential

For the list of available topics configuration parameters, see Kafka Topics Configurations.

Update Kafka topic

To update a topic, change the topic CR configuration, and apply the changes using the kubectl apply command.

You cannot update topics with the _ character in the topic name.

The following example adds a cleanup policy to the above topic-a topic:

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"

Warning

spec.replicas and spec.partitionCount cannot be updated.

For the list of available topics configuration parameters, see Kafka Topics Configurations.

Delete Kafka topic

To delete a topic, use the kubectl delete command.

For example:

kubectl delete -f <topic-cr-file>