Kafka Connect ChangeTopicCase SMT Usage Reference for Confluent Cloud
The ChangeTopicCase Single Message Transform (SMT) automatically changes the case format of a record's Kafka topic name before the record is produced or consumed. For example, it can convert the topic name KAFKA_TOPIC to kafkaTopic or kafka_topic.
Note
The ChangeTopicCase SMT is supported on the JDBC Source, CDC Source, and MongoDB Source connectors.
To apply the ChangeTopicCase SMT, add the following to your connector configuration. Update the placeholders to specify the topic’s current casing (transforms.changeTopicCase.from) and the required output casing (transforms.changeTopicCase.to).
{
"transforms" : "changeTopicCase",
"transforms.changeTopicCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
"transforms.changeTopicCase.from" : "<VALID_CASE_NAME>",
"transforms.changeTopicCase.to" : "<TARGET_CASE_TYPE>"
}
Examples
The example below shows how to use the ChangeTopicCase SMT to convert a topic name from LOWER_CAMEL to UPPER_UNDERSCORE.
Before:
{ "topic" : "topicName", "kafkaPartition" : 1, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 12345, "headers" : [ ] }
Adding the SMT to your connector configuration:
{
"transforms" : "changeTopicCase",
"transforms.changeTopicCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
"transforms.changeTopicCase.from" : "LOWER_CAMEL",
"transforms.changeTopicCase.to" : "UPPER_UNDERSCORE"
}
After:
After the ChangeTopicCase SMT is applied, the topic name changes as follows:
{ "topic" : "TOPIC_NAME", "kafkaPartition" : 1, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 12345, "headers" : [ ] }
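The reverse conversion mentioned in the introduction (KAFKA_TOPIC to kafkaTopic) can be sketched the same way. This is an illustrative configuration only: it assumes that UPPER_UNDERSCORE is accepted as a from value and LOWER_CAMEL as a to value, which the examples above imply but this page does not state outright.

```json
{
  "transforms" : "changeTopicCase",
  "transforms.changeTopicCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
  "transforms.changeTopicCase.from" : "UPPER_UNDERSCORE",
  "transforms.changeTopicCase.to" : "LOWER_CAMEL"
}
```

Under that assumption, a record whose topic is KAFKA_TOPIC would be routed to kafkaTopic, while the other record fields (partition, offset, headers) would stay unchanged.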
Properties
| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| from | The format of the incoming topic name. | STRING | | | HIGH |
| to | The format of the outgoing topic name. | STRING | | | HIGH |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Kafka) SMT, predicates can conditionally filter out specific records. For details and examples, see Predicates.
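As a minimal sketch of how a predicate might restrict this SMT, the configuration below applies ChangeTopicCase only to records whose topic name matches a pattern. The predicate alias isOrdersTopic and the pattern orders.* are hypothetical names chosen for illustration. The predicate class is the TopicNameMatches predicate that ships with Apache Kafka; whether it is available for your specific connector is an assumption to verify.

```json
{
  "transforms" : "changeTopicCase",
  "transforms.changeTopicCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
  "transforms.changeTopicCase.from" : "LOWER_CAMEL",
  "transforms.changeTopicCase.to" : "UPPER_UNDERSCORE",
  "transforms.changeTopicCase.predicate" : "isOrdersTopic",
  "predicates" : "isOrdersTopic",
  "predicates.isOrdersTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isOrdersTopic.pattern" : "orders.*"
}
```

With this configuration, only records on topics whose full name matches orders.* would have their topic name converted. Records on all other topics would pass through with the topic name unchanged.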