Manage Kafka Topics
Confluent for Kubernetes (CFK) allows you to declaratively create and manage Kafka topics as KafkaTopic custom resources (CRs) in Kubernetes. Each KafkaTopic CR is mapped to a Kafka topic and kept in sync with it. This lets you create topics as part of the Confluent deployment workflow, so that your client applications only need to produce to and consume from those topics.
KafkaTopic communicates with a Kafka cluster to create, edit, and delete topics through Admin REST APIs. You need to set up Admin REST APIs as described in Kafka Admin REST API before you create KafkaTopic CRs.
Create Kafka topic
Create a topic as a KafkaTopic CR:
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name:                  # [1]
  namespace:
spec:
  replicas:
  partitionCount:
  kafkaClusterRef:
    name:                # [2]
    namespace:           # [3]
  kafkaRestClassRef:
    name:                # [4]
    namespace:           # [5]
  kafkaRest:
    endpoint:            # [6]
    authentication:
      type:              # [7]
[1] The topic name. The _ character is not allowed in the topic name. If both metadata.name and spec.name are specified, spec.name is used.
[2] Name of the Kafka cluster.
[3] The namespace where the Kafka cluster is running. It can be omitted if the Kafka cluster is in the same namespace as the KafkaTopic being created.
[4] Name of the KafkaRestClass CR.
[2] [4] If you do not provide kafkaRestClassRef and kafkaClusterRef in the topic CR, CFK tries to discover the Kafka cluster in the same namespace through the kafka CR type.
[5] The namespace of the KafkaRestClass. It can be omitted if the KafkaRestClass CR is in the same namespace as the KafkaTopic being created.
[6] Kafka endpoint. See Manage Confluent Admin REST API for more information.
[7] If authentication is required for Admin REST APIs, specify the authentication type. basic and bearer are supported. During discovery, if authenticationType is found in the Kafka cluster, spec.kafkaRest.authentication in the topic CR configuration must match it. Otherwise, the topic is not created.
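As an illustration of the kafkaRestClassRef and spec.name fields described above, the sketch below shows a topic CR that points at a KafkaRestClass instead of carrying an inline kafkaRest block, and sets spec.name so that the Kafka topic name differs from the CR name. The names topic-b-cr, topic-b, and default, as well as the replica and partition counts, are hypothetical, and the sketch assumes a KafkaRestClass CR named default already exists in the confluent namespace.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-b-cr          # hypothetical CR name
  namespace: confluent
spec:
  name: topic-b             # hypothetical; used as the topic name when both names are set
  replicas: 3               # illustrative value
  partitionCount: 6         # illustrative value
  kafkaRestClassRef:
    name: default           # hypothetical KafkaRestClass CR, assumed to exist
    namespace: confluent    # may be omitted when it matches the topic's namespace
```

Because the connection details live in the KafkaRestClass, several topic CRs can reference the same class rather than repeating the endpoint and authentication settings.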
The example CR below creates a Kafka topic named topic-a in the
confluent namespace, with 1 replica and 12 partitions. The topic is
created in the internal Kafka cluster, kafka, using bearer
authentication.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    endpoint: https://kafka.confluent.svc.cluster.local:8090
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
For the list of available topic configuration parameters, see Kafka Topics Configurations.
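The example above uses bearer authentication; basic is the other supported type. A minimal sketch of the same topic with basic authentication might look like the following. It assumes that the basic block takes a secretRef in the same way the bearer block does, and the secret name rest-basic-credential is hypothetical, so verify the exact fields against the Admin REST API documentation before relying on it.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    endpoint: https://kafka.confluent.svc.cluster.local:8090
    authentication:
      type: basic
      basic:                                # assumed to mirror the bearer block
        secretRef: rest-basic-credential    # hypothetical secret name
```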
Update Kafka topic
To update a topic, change the topic CR configuration, and apply the changes
using the kubectl apply command.
The following example adds a cleanup policy to the above topic-a topic:
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"
Warning
spec.replicas and spec.partitionCount cannot be updated after the topic is created.
For the list of available topic configuration parameters, see Kafka Topics Configurations.
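Several settings can be changed in one update by listing them together under spec.configs. The sketch below is illustrative only: min.cleanable.dirty.ratio and min.insync.replicas are standard Kafka topic-level settings, the values are arbitrary, and the connection details are copied from the create example so that the applied CR stays complete. Values are quoted on the assumption that spec.configs expects strings.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1               # unchanged from the original CR
  partitionCount: 12        # unchanged from the original CR
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    endpoint: https://kafka.confluent.svc.cluster.local:8090
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
  configs:
    cleanup.policy: "compact"
    min.cleanable.dirty.ratio: "0.1"   # illustrative value
    min.insync.replicas: "1"           # illustrative value
```

As with any other update, the change takes effect once the file is applied with the kubectl apply command.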
Delete Kafka topic
To delete a topic, use the kubectl delete command.
For example:
kubectl delete -f <topic-cr-file>