Manage Kafka Topics for Confluent Platform Using Confluent for Kubernetes
Confluent for Kubernetes (CFK) allows you to declaratively create and manage Kafka topics as KafkaTopic custom resources (CRs) in Kubernetes. Each KafkaTopic CR is mapped to a topic and kept in sync with the corresponding Kafka topic. This gives you a separate workflow in which you create topics as part of the Confluent deployment, so your client applications only need to produce to and consume from the topics.
Note
KafkaTopic CRs stay in sync with the Kafka topics only if the topics are created and modified through KafkaTopic CRs. If you modify the topic configuration outside of the KafkaTopic CR, for example with Confluent Control Center or the Confluent CLI, the change is not reconciled, and the KafkaTopic CR and the topic configuration get out of sync.
Requirement
KafkaTopic communicates with a Kafka cluster through the Confluent REST Class to create, edit, and delete topics. Before you create KafkaTopic CRs, set up the Kafka Admin REST Class as described in Kafka Admin REST Class.
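As a rough illustration of this prerequisite, a KafkaRestClass CR might look like the sketch below. The resource name default is hypothetical, and the field layout is assumed to mirror the kafkaClusterRef and kafkaRest fields that KafkaTopic uses. Confirm the exact schema against the Kafka Admin REST Class page before relying on it.

```yaml
# Minimal sketch, not a definitive configuration.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaRestClass
metadata:
  name: default              # hypothetical name, chosen for illustration
  namespace: confluent
spec:
  kafkaClusterRef:
    name: kafka              # Kafka cluster in the same namespace
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential   # assumed to be an existing Secret
```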
Create Kafka topic
You can create a topic using a KafkaTopic CR in an on-prem or Confluent Cloud Kafka cluster:
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name:                   --- [1]
  namespace:              --- [2]
spec:
  replicas:
  partitionCount:
  kafkaClusterRef:        --- [3]
  kafkaRestClassRef:      --- [4]
  kafkaRest:
    endpoint:             --- [5]
    kafkaClusterID:       --- [6]
    authentication:
      type:               --- [7]
      basic:              --- [8]
      bearer:             --- [9]
      oauth:              --- [10]
  configs:                --- [11]
[1] The topic name. If both metadata.name and spec.name are specified, spec.name is used.
[2] The namespace for the topic.
Use kafkaClusterRef ([3]), kafkaRestClassRef ([4]), or kafkaRest.endpoint ([5]) to explicitly specify the Confluent REST Class. The order of precedence is [4], [5], and [3]. If none of these is set, CFK automatically discovers the Kafka cluster in the same namespace.
[3] Name of the Kafka cluster.
[4] Name of the KafkaRestClass CR.
[5] Confluent REST Class endpoint. See Manage Confluent Admin REST Class for Confluent Platform Using Confluent for Kubernetes for more information.
[6] ID of the Kafka cluster. Required when creating a topic in Confluent Cloud.
[7] If authentication is required for the Confluent Admin REST Class, specify the authentication type. basic, bearer, mtls, and oauth are supported. If you specified the Confluent Admin REST Class using kafkaRestClassRef, you do not have to set the authentication in kafkaRest. Otherwise, specify the authentication in kafkaRest.
[8] For information about the basic settings, see Basic authentication.
[9] For information about the bearer settings, see Bearer authentication.
[10] For information about the OAuth settings, see OAuth/OIDC authentication.
[11] Specify additional topic configuration settings as key-value pairs, for example, cleanup.policy: "compact". For the list of available topic configuration parameters, see Kafka Topics Configurations.
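To illustrate how [5], [6], [7], and [8] fit together, the sketch below targets a Confluent Cloud cluster through kafkaRest.endpoint. The topic name, replica and partition counts, and the angle-bracket placeholders are all assumptions for illustration. Substitute the values for your own cluster, and check the Basic authentication page for the exact shape of the basic block.

```yaml
# Minimal sketch, assuming basic authentication backed by a Secret.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-cloud                    # hypothetical topic name
  namespace: confluent
spec:
  replicas: 3                          # illustrative value
  partitionCount: 6                    # illustrative value
  kafkaRest:
    endpoint: <cloud-rest-endpoint>    # [5] placeholder, not a real URL
    kafkaClusterID: <cluster-id>       # [6] required for Confluent Cloud
    authentication:
      type: basic                      # [7]
      basic:                           # [8]
        secretRef: <credential-secret> # placeholder Secret name
```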
Important
After you create a topic, do NOT change the Kafka cluster that the topic CR uses. Doing so can lead to data loss.
The example CR below creates a Kafka topic, named topic-a, in the confluent namespace with 1 replica and 12 partitions. The topic is created in the internal Kafka cluster, kafka, with bearer authentication.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"
  kafkaClusterRef:
    name: kafka
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
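Because kafkaRestClassRef ([4]) takes precedence and carries its own authentication settings, the same topic could instead point at an existing KafkaRestClass CR. The sketch below assumes that a KafkaRestClass named default already exists in the confluent namespace. That name is hypothetical.

```yaml
# Minimal sketch: topic-a resolved through a KafkaRestClass instead of inline kafkaRest settings.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"
  kafkaRestClassRef:
    name: default    # hypothetical KafkaRestClass CR; authentication is defined there
```

With this form, the kafkaRest.authentication block is omitted from the topic CR, so the credentials are managed in one place.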
Update Kafka topic
To update a topic, change the topic CR configuration, and apply the changes using the kubectl apply command.
The following example adds a cleanup policy to the above topic-a topic:
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1
  partitionCount: 12
  configs:
    cleanup.policy: "compact"
Warning
spec.replicas and spec.partitionCount cannot be updated.
For the list of available topic configuration parameters, see Kafka Topics Configurations.
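As a further illustration of an in-place update, the sketch below leaves spec.replicas and spec.partitionCount untouched and changes only the configs map. The retention.ms setting and its value (7 days in milliseconds) are assumptions chosen for illustration. The value is quoted as a string to match the quoted style of the cleanup.policy example.

```yaml
# Minimal sketch: only the configs entries differ from the original topic-a CR.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-a
  namespace: confluent
spec:
  replicas: 1            # unchanged, cannot be updated
  partitionCount: 12     # unchanged, cannot be updated
  configs:
    cleanup.policy: "compact"
    retention.ms: "604800000"   # illustrative value: 7 * 24 * 60 * 60 * 1000 ms
```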
Delete Kafka topic
To delete a topic, use the kubectl delete command.
For example:
kubectl delete -f <topic-cr-file>