Create Custom Kafka Connect Single Message Transforms for Confluent Platform

If none of the available Single Message Transformations (SMTs) provide the necessary transformation, you can create your own.

Before you begin, note that most SMT implementations put the bulk of their logic in an abstract class. The implementation then provides two concrete subclasses, Key and Value, that specify whether to process the Connect record’s key or value. When using a transformation, users specify the fully qualified class name of either the Key or the Value class.
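
The pattern can be sketched in plain Java as follows. This is a minimal illustration, not the Kafka Connect API: `SimpleRecord`, `MaskField`, and the `replacement` setting are hypothetical stand-ins chosen so that the example compiles without Connect on the classpath. A real SMT would instead implement `org.apache.kafka.connect.transforms.Transformation<R>` and operate on `ConnectRecord` objects.

```java
import java.util.Map;

// Stand-in for a Connect record (assumption); a real SMT receives a ConnectRecord.
final class SimpleRecord {
    final Object key;
    final Object value;

    SimpleRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical transformation: the shared logic lives in the abstract class.
abstract class MaskField {
    private String replacement = "****";

    // Reads settings once, before any record is processed.
    void configure(Map<String, ?> configs) {
        Object configured = configs.get("replacement"); // hypothetical setting name
        if (configured != null) {
            replacement = configured.toString();
        }
    }

    // Shared logic: mask whichever part of the record the subclass selects.
    SimpleRecord apply(SimpleRecord record) {
        if (operatingValue(record) == null) {
            return record; // nothing to mask, pass the record through unchanged
        }
        return newRecord(record, replacement);
    }

    // Hooks that the Key and Value subclasses fill in.
    protected abstract Object operatingValue(SimpleRecord record);

    protected abstract SimpleRecord newRecord(SimpleRecord record, Object updated);

    // Processes the record key and leaves the value untouched.
    static final class Key extends MaskField {
        @Override
        protected Object operatingValue(SimpleRecord record) {
            return record.key;
        }

        @Override
        protected SimpleRecord newRecord(SimpleRecord record, Object updated) {
            return new SimpleRecord(updated, record.value);
        }
    }

    // Processes the record value and leaves the key untouched.
    static final class Value extends MaskField {
        @Override
        protected Object operatingValue(SimpleRecord record) {
            return record.value;
        }

        @Override
        protected SimpleRecord newRecord(SimpleRecord record, Object updated) {
            return new SimpleRecord(record.key, updated);
        }
    }
}
```

With a layout like this, a connector configuration would reference the nested class by its binary name, for example `transforms.mask.type=com.example.MaskField$Value`, where the `mask` alias and the `com.example` package are assumptions made for illustration.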

Caution

Custom transformations are not currently available for Confluent Cloud connectors.

Tip

See How to Use Single Message Transforms in Kafka Connect. This tutorial provides a deep dive into building your own custom transformation.

The following are the high-level steps necessary to create and use a custom SMT.

  1. Review the Java source files for the default Kafka Connect transformations. Use one of these as the basis for your new custom transformation.

    Note the following methods when reviewing the Java source files:

    • Search for apply() and see how this method is implemented.

    • Search for configure() and see how this method is implemented.

      Note

      For additional details, see Interface Transformation.

  2. Write and compile your source code and unit tests. Sample unit tests for SMTs can be found in the Apache Kafka GitHub project.

  3. Create your JAR file.

  4. Install the JAR file. Copy your custom SMT JAR file (and any non-Kafka JAR files required by the transformation) into a directory that is under one of the directories listed in the plugin.path property in the Connect worker configuration file as shown below:

    plugin.path=/usr/local/share/kafka/plugins
    

    For example, create a directory named my-custom-smt under /usr/local/share/kafka/plugins and copy the JAR files into the my-custom-smt directory.

    Important

    Make sure to do this on all worker nodes. See Installing Connect Plugins for details.

  5. Start up the workers and the connector, and then try out your custom transformation.

    The Connect worker logs each transformation class it finds at the DEBUG level. Enable DEBUG logging and verify that your transformation was found. If it was not, check the JAR installation and make sure the JAR file is in the correct location.
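
If the transformation does not appear in the worker log, one way to check the installation from step 4 is to list the JAR files under each plugin directory. The following is a minimal sketch that uses only the Java standard library. `PluginPathCheck` and `findPluginJars` are hypothetical names, and the sketch assumes the layout described in step 4, with one subdirectory per plugin under a `plugin.path` entry.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Kafka Connect.
final class PluginPathCheck {

    // Returns the JAR files found in each immediate subdirectory
    // of a single plugin.path entry, sorted by path.
    static List<Path> findPluginJars(Path pluginPathEntry) throws IOException {
        List<Path> jars = new ArrayList<>();
        try (DirectoryStream<Path> pluginDirs =
                 Files.newDirectoryStream(pluginPathEntry, entry -> Files.isDirectory(entry))) {
            for (Path pluginDir : pluginDirs) {
                // The glob matches file names ending in .jar inside the plugin directory.
                try (DirectoryStream<Path> files = Files.newDirectoryStream(pluginDir, "*.jar")) {
                    for (Path jar : files) {
                        jars.add(jar);
                    }
                }
            }
        }
        Collections.sort(jars);
        return jars;
    }
}
```

For example, calling `PluginPathCheck.findPluginJars(Paths.get("/usr/local/share/kafka/plugins"))` on a worker node would be expected to include the JAR files copied into the my-custom-smt directory.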

Note

Custom partitioners

By default, connectors inherit the partitioner used for the Kafka topic. You can create a custom partitioner for a connector. The custom partitioner must be placed in a connector’s /lib folder.

Alternatively, you can place partitioners in a common location of your choice and add a symlink to that location from each connector’s /lib folder. For example, if you place a custom partitioner in share/confluent-hub-components/partitioners, you would add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.
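
Any tool that creates symlinks works here. As one illustration, the following sketch creates the relative link with the Java standard library. `PartitionerLink` and `linkSharedPartitioners` are hypothetical names, and the sketch assumes both directories already exist and that the file system supports symbolic links.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Confluent Platform.
final class PartitionerLink {

    // Creates <connectorLib>/partitioners as a relative symlink that points
    // at the shared partitioner directory, and returns the path of the link.
    // Both arguments must be of the same kind (both absolute or both relative).
    static Path linkSharedPartitioners(Path connectorLib, Path sharedPartitioners) throws IOException {
        Path link = connectorLib.resolve("partitioners");
        // From .../kafka-connect-s3/lib to .../partitioners this yields ../../partitioners.
        Path relativeTarget = connectorLib.relativize(sharedPartitioners);
        return Files.createSymbolicLink(link, relativeTarget);
    }
}
```

A relative target is resolved against the directory that contains the link, which is why ../../partitioners reaches the shared directory from inside the connector’s lib folder.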