.. title:: Guide for Kafka Connector Developers
.. meta::
   :description: Learn how to develop a new Kafka connector.
.. _connect_devguide:
Connector Developer Guide
=========================
This guide describes how developers can write new connectors for |kconnect-long| to move data between
|ak-tm| and other systems. It briefly reviews a few key |kconnect-long| concepts and then describes how to
create a simple connector.
.. tip::

   * For more details about how to create a connector, see `How to Write a
     Connector for Kafka Connect – Deep Dive into Configuration Handling
     `__
   * `Confluent Hub `__ welcomes new connector
     development. To have your connector hosted on |c-hub|, see
     :connect-common:`Contributing to Confluent
     Hub|confluent-hub/contributing.html`.
Core Concepts and APIs
----------------------
Connectors and tasks
^^^^^^^^^^^^^^^^^^^^
To copy data between |ak| and another system, users instantiate |ak|
:platform:`Connectors|connect/javadocs/javadoc/org/apache/kafka/connect/connector/Connector.html` for the
systems they want to pull data from or push data to. Connectors come in two flavors:
:platform:`SourceConnectors|connect/javadocs/javadoc/org/apache/kafka/connect/source/SourceConnector.html`,
which import data from another system, and :platform:`SinkConnectors|connect/javadocs/javadoc/org/apache/kafka/connect/sink/SinkConnector.html`,
which export data to another system. For example, ``JDBCSourceConnector`` would import a relational
database into |ak|, and ``HDFSSinkConnector`` would export the contents of a |ak| topic to
HDFS files.
Implementations of the ``Connector`` class do not perform data copying themselves: their configuration
describes the set of data to be copied, and the ``Connector`` is responsible for breaking that job into a set of
:platform:`Tasks|connect/javadocs/javadoc/org/apache/kafka/connect/connector/Task.html` that can be
distributed to |kconnect-long| workers. Tasks also come in two corresponding flavors:
:platform:`SourceTask|connect/javadocs/javadoc/org/apache/kafka/connect/source/SourceTask.html` and
:platform:`SinkTask|connect/javadocs/javadoc/org/apache/kafka/connect/sink/SinkTask.html`. Optionally, the
``Connector`` implementation can monitor the external system for data changes and request task reconfiguration.
With an assignment of data to be copied in hand, each ``Task`` must copy its subset of the data to or from
|ak|. The data that a connector copies must be represented as a **partitioned stream**, similar
to the model of a |ak| topic, where each partition is an ordered sequence of records with offsets. Each task is
assigned a subset of the partitions to process. Sometimes this mapping is clear:
each file in a set of log files can be considered a partition, each line within a file is a record,
and offsets are simply the position in the file. In other cases it may
require a bit more effort to map to this model: a JDBC connector can map each table to a partition, but
the offset is less clear. One possible mapping uses a timestamp column to generate queries
to incrementally return new data, and the last queried timestamp can be used as the offset.
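In the log-file case, for example, the partition and offset are just small maps that identify the file
and the position within it; the framework persists them on the task's behalf. The following sketch shows
how such a record could be constructed (the ``filename`` and ``position`` keys and the ``lines`` topic are
illustrative choices, not names required by the API):

.. sourcecode:: java

   import java.util.Collections;
   import java.util.Map;

   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.source.SourceRecord;

   // Inside a SourceTask.poll() implementation:
   // the partition identifies the file, the offset is the position within it.
   Map<String, String> sourcePartition = Collections.singletonMap("filename", "/var/log/app.log");
   Map<String, Long> sourceOffset = Collections.singletonMap("position", 4096L);

   SourceRecord record = new SourceRecord(
       sourcePartition, sourceOffset,
       "lines",                  // destination topic
       Schema.STRING_SCHEMA,     // value schema
       "a line read from the file");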
.. figure:: images/connector-model.png
   :align: center

   Example of a source *connector* which has created two *tasks*, which copy data from input *partitions* and write
   *records* to |ak|.
Partitions and records
^^^^^^^^^^^^^^^^^^^^^^
Each partition is an ordered sequence of key-value records. Both the keys and values can have
complex structures, represented by the data structures in the
:platform:`org.apache.kafka.connect.data|connect/javadocs/javadoc/org/apache/kafka/connect/data/package-summary.html` package.
Many primitive types as well as arrays, structs, and nested data structures are supported.
For most types, standard Java types like ``java.lang.Integer``,
``java.lang.Map``, and ``java.lang.Collection`` can be used directly. For structured records, the
:platform:`Struct|connect/javadocs/javadoc/org/apache/kafka/connect/data/Struct.html` class should be used.
.. figure:: images/data-model-stream-partition-record.png
   :align: center

   A partitioned stream: the data model that connectors must map all source and sink systems to. Each record contains
   keys and values (with schemas), a partition ID, and offsets within that partition.
In order to track the structure and compatibility of records in
partitions, :platform:`Schemas|connect/javadocs/javadoc/org/apache/kafka/connect/data/Schema.html`
may be included with each record. Because schemas are commonly generated on the fly, based on the data source,
a :platform:`SchemaBuilder|connect/javadocs/javadoc/org/apache/kafka/connect/data/SchemaBuilder.html` class is included
which makes constructing schemas very easy.
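For example, a connector that discovers a table's columns at runtime could build a matching schema and then
populate records against it. A minimal sketch (the ``com.example.User`` schema and its fields are
hypothetical):

.. sourcecode:: java

   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;

   // Build a schema on the fly, e.g. from a database table's metadata.
   Schema userSchema = SchemaBuilder.struct().name("com.example.User")
       .field("id", Schema.INT64_SCHEMA)
       .field("name", Schema.STRING_SCHEMA)
       .field("email", Schema.OPTIONAL_STRING_SCHEMA)
       .build();

   // Populate a record value that conforms to the schema.
   Struct user = new Struct(userSchema)
       .put("id", 42L)
       .put("name", "alice")
       .put("email", "alice@example.com");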
This runtime data format does not assume any particular serialization format; this conversion is handled by
:platform:`Converter|connect/javadocs/javadoc/org/apache/kafka/connect/storage/Converter.html` implementations, which convert
between ``org.apache.kafka.connect.data`` runtime format and serialized data represented as ``byte[]``.
Connector developers should not have to worry about the details of this conversion.
In addition to the key and value, records have partition IDs and offsets. These are used by the framework to periodically
commit the offsets of data that have been processed. In the event of a failure, processing can
resume from the last committed offsets, avoiding unnecessary reprocessing and duplication of
events.
Dynamic connectors
^^^^^^^^^^^^^^^^^^
Not all connectors have a static set of partitions, so ``Connector`` implementations are also responsible for
monitoring the external system for any changes that might require reconfiguration. For example, in the
``JDBCSourceConnector`` example, the ``Connector`` might assign a set of tables to each ``Task``.
When a new table is created, the connector must discover the change so it can assign the new table to one of the
``Tasks`` by updating its configuration. When the connector notices a change that requires reconfiguration
(or a change in the number of ``Tasks``), it notifies the framework and the framework updates any
corresponding ``Tasks``.
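One possible shape for this logic is sketched below: a hypothetical JDBC-style connector distributes its
current tables across tasks in ``taskConfigs()`` and asks the framework to re-run that method whenever the
table set changes. ``requestTaskReconfiguration()`` and ``ConnectorUtils.groupPartitions()`` are part of the
Connect API; the ``currentTables()`` helper and the ``tables`` configuration key are illustrative:

.. sourcecode:: java

   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.connect.util.ConnectorUtils;

   // Called from a background monitoring thread when the table set changes.
   private void onTablesChanged() {
       // The framework responds by calling taskConfigs() again and
       // restarting the tasks with the new assignments.
       context.requestTaskReconfiguration();
   }

   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
       // Divide the tables as evenly as possible among the available tasks.
       List<List<String>> grouped = ConnectorUtils.groupPartitions(currentTables(), maxTasks);
       List<Map<String, String>> configs = new ArrayList<>();
       for (List<String> tables : grouped) {
           if (tables.isEmpty())
               continue;
           Map<String, String> config = new HashMap<>();
           config.put("tables", String.join(",", tables));
           configs.add(config);
       }
       return configs;
   }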
.. _connect_developing_simple_connector:
Develop a Simple Connector
--------------------------
Developing a connector only requires implementing two interfaces, the ``Connector`` and ``Task``.
A simple example of connectors that read and write lines from and to files is included in the
source code for |kconnect-long| in the ``org.apache.kafka.connect.file`` package. The
``FileStreamSourceConnector``/``FileStreamSourceTask`` classes implement a source connector that reads
lines from files, and ``FileStreamSinkConnector``/``FileStreamSinkTask`` implement a sink connector that
writes each record to a file.
.. tip::

   Refer to the :kafka-file:`example source code|connect/file/src/main/java/org/apache/kafka/connect/file` for full examples. The following sections provide key steps and code snippets only.
Connector example
^^^^^^^^^^^^^^^^^
We'll cover the ``SourceConnector`` as a simple example. ``SinkConnector`` implementations are
very similar. Start by creating the class that inherits from ``SourceConnector`` and add a couple
of fields that will store parsed configuration information (the filename to read from and the
topic to send data to):
.. sourcecode:: java

   public class FileStreamSourceConnector extends SourceConnector {
       private String filename;
       private String topic;
The easiest method to fill in is ``getTaskClass()``, which defines the class that should be
instantiated in worker processes to actually read the data:
.. sourcecode:: java

   @Override
   public Class<? extends Task> getTaskClass() {
       return FileStreamSourceTask.class;
   }
The ``FileStreamSourceTask`` class referenced above is the corresponding ``SourceTask`` implementation.
Next, add the standard lifecycle methods, ``start()`` and ``stop()``, to the connector:
.. sourcecode:: java

   @Override
   public void start(Map<String, String> props) {
       // The complete version includes error handling as well.
       filename = props.get(FILE_CONFIG);
       topic = props.get(TOPIC_CONFIG);
   }

   @Override
   public void stop() {
       // Nothing to do since no background monitoring is required.
   }
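The ``FILE_CONFIG`` and ``TOPIC_CONFIG`` keys parsed in ``start()`` are string constants on the connector
class. A complete connector also describes its settings through ``config()``, which returns a ``ConfigDef``
that the framework uses to validate submitted configurations. A minimal sketch along these lines:

.. sourcecode:: java

   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigDef.Importance;
   import org.apache.kafka.common.config.ConfigDef.Type;

   // Configuration keys parsed in start().
   public static final String FILE_CONFIG = "file";
   public static final String TOPIC_CONFIG = "topic";

   private static final ConfigDef CONFIG_DEF = new ConfigDef()
       .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
       .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "Topic to publish the data to.");

   @Override
   public ConfigDef config() {
       return CONFIG_DEF;
   }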
Finally, the real core of the implementation is in ``taskConfigs()``. In this
case, the connector handles only a single file, so even though the ``maxTasks``
argument may permit more tasks, it returns a list with only one entry:
.. sourcecode:: java

   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
       ArrayList<Map<String, String>> configs = new ArrayList<>();
       // Only one input partition makes sense, so at most one task
       // configuration is returned regardless of maxTasks.
       Map<String, String> config = new HashMap<>();
       if (filename != null)
           config.put(FILE_CONFIG, filename);
       config.put(TOPIC_CONFIG, topic);
       configs.add(config);
       return configs;
   }
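Although not shown above, every connector must also implement ``version()``, which reports a version string
to the framework. A minimal sketch:

.. sourcecode:: java

   @Override
   public String version() {
       // Typically sourced from the project's build metadata.
       return "1.0";
   }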