.. _connect_concepts: |kconnect-long| Concepts ======================== |kconnect-long| is a framework to stream data into and out of |ak-tm|. The |cp| ships with several `built-in connectors `_ that can be used to stream data to or from commonly used systems such as relational databases or HDFS. In order to efficiently discuss the inner workings of |kconnect-long|, it is helpful to establish a few major concepts. * :ref:`Connectors ` -- the high level abstraction that coordinates data streaming by managing tasks * :ref:`Tasks ` -- the implementation of how data is copied to or from |ak| * :ref:`Workers ` -- the running processes that execute connectors and tasks * :ref:`Converters ` -- the code used to translate data between |kconnect| and the system sending or receiving data * :ref:`Transforms ` -- simple logic to alter each message produced by or sent to a connector * :ref:`dead-letter-queues` -- how |kconnect| handles connector errors .. _connect_connectors: Connectors ---------- Connectors in |kconnect-long| define where data should be copied to and from. A **connector instance** is a logical job that is responsible for managing the copying of data between |ak| and another system. All of the classes that implement or are used by a connector are defined in a **connector plugin**. Both connector instances and connector plugins may be referred to as "connectors", but it should always be clear from the context which is being referred to (e.g., :ref:`"install a connector" ` refers to the plugin, and "check the status of a connector" refers to a connector instance). We encourage users to leverage `existing connectors `_. However, it is possible to write a new connector plugin from scratch. At a high level, a developer who wishes to write a new connector plugin follows the workflow below. Further information is available in the :ref:`developer guide `. .. figure:: images/connector-model-simple.png :align: center .. _connect_tasks: Tasks ----- Tasks are the main actor in the data model for |kconnect|. Each connector instance coordinates a set of **tasks** that actually copy the data. By allowing the connector to break a single job into many tasks, |kconnect-long| provides built-in support for parallelism and scalable data copying with very little configuration. These tasks have no state stored within them. Task state is stored in |ak| in special topics ``config.storage.topic`` and ``status.storage.topic`` and managed by the associated connector. As such, tasks may be started, stopped, or restarted at any time in order to provide a resilient, scalable data pipeline. .. figure:: images/data-model-simple.png :align: center High level representation of data passing through a |kconnect| source task into |ak|. Note that internal offsets are stored either in |ak| or on disk rather than within the task itself. Task Rebalancing ~~~~~~~~~~~~~~~~ When a connector is first submitted to the cluster, the workers rebalance the full set of connectors in the cluster and their tasks so that each worker has approximately the same amount of work. This same rebalancing procedure is also used when connectors increase or decrease the number of tasks they require, or when a connector's configuration is changed. When a worker fails, tasks are rebalanced across the active workers. When a task fails, no rebalance is triggered as a task failure is considered an exceptional case. As such, failed tasks are not automatically restarted by the framework and should be restarted via the :ref:`REST API `. .. figure:: images/task-failover.png :align: center Task failover example showing how tasks rebalance in the event of a worker failure. .. _connect_workers: Workers ------- Connectors and tasks are logical units of work and must be scheduled to execute in a process. |kconnect-long| calls these processes **workers** and has two types of workers: standalone and distributed. .. _standalone-workers: Standalone Workers ~~~~~~~~~~~~~~~~~~ Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks. Since it is a single process, it requires minimal configuration. Standalone mode is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host. However, because there is only a single process, it also has more limited functionality: scalability is limited to the single process and there is no fault tolerance beyond any monitoring you add to the single process. .. _distributed-workers: Distributed Workers ~~~~~~~~~~~~~~~~~~~ Distributed mode provides scalability and automatic fault tolerance for |kconnect-long|. In distributed mode, you start many worker processes using the same ``group.id`` and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers. Note the similarity to consumer group rebalance. Under the covers, connect workers are using consumer groups to coordinate and rebalance. .. important:: All workers with the same ``group.id`` will be in the same connect cluster. For example, if worker-a has ``group.id=connect-cluster-a`` and worker-b has the same ``group.id``, worker-a and worker-b will form a cluster called ``connect-cluster-a``. .. figure:: images/worker-model-basics.png :align: center A three-node |kconnect-long| distributed mode cluster. Connectors (monitoring the source or sink system for changes that require reconfiguring tasks) and tasks (copying a subset of a connector's data) are automatically balanced across the active workers. The division of work between tasks is shown by the partitions that each task is assigned. .. _connect_converters: Converters ---------- Converters are necessary to have a |kconnect-long| deployment support a particular data format when writing to or reading from |ak|. Tasks use converters to change the format of data from bytes to a |kconnect| internal data format and vice versa. By default, |cp| provides the following converters: .. include:: includes/converter-list.rst Converters are decoupled from connectors themselves to allow for reuse of converters between connectors naturally. For example, using the same Avro converter, the JDBC Source Connector can write Avro data to |ak| and the HDFS Sink Connector can read Avro data from |ak|. This means the same converter can be used even though, for example, the JDBC source returns a ``ResultSet`` that is eventually written to HDFS as a parquet file. The following graphic shows how converters are used when reading from a database using a JDBC Source Connector, writing to |ak|, and finally, writing to HDFS with an HDFS Sink Connector. .. figure:: images/converter-basics.png :width: 800px :alt: How converters are used for a source and sink data transfer For detailed information about converters, see :ref:`connect_configuring_converters`. For more information about how converters and |sr| work, see :ref:`schemaregistry_kafka_connect`. .. tip:: For a deep dive into converters, see: `Converters and Serialization Explained `__. .. _connect_transforms: Transforms ---------- Connectors can be configured with transformations to make simple and lightweight modifications to individual messages. This can be convenient for minor data adjustments and event routing, and multiple transformations can be chained together in the connector configuration. However, more complex transformations and operations that apply to multiple messages are best implemented with :ref:`ksql_home` and :ref:`kafka_streams`. A transform is a simple function that accepts one record as an input and outputs a modified record. All transforms provided by |kconnect-long| perform simple but commonly useful modifications. Note that you can implement the `Transformation `__ interface with your own custom logic, package them as a :ref:`Kafka Connect plugin `, and use them with any connectors. When transforms are used with a source connector, |kconnect-long| passes each source record produced by the connector through the first transformation, which makes its modifications and outputs a new source record. This updated source record is then passed to the next transform in the chain, which generates a new modified source record. This continues for the remaining transforms. The final updated source record is :ref:`converted to the binary form ` and written to |ak|. Transforms can also be used with sink connectors. |kconnect-long| reads message from |ak| and :ref:`converts the binary representation to a sink record `. If there is a transform, |kconnect-long| and passes the record through the first transformation, which makes its modifications and outputs a new, updated sink record. The updated sink record is then passed through the next transform in the chain, which generates a new sink record. This continues for the remaining transforms, and the final updated sink record is then passed to the sink connector for processing. For more information, see :ref:`connect_transforms_supported`. .. include:: includes/transforms-list.rst .. _dead-letter-queues: Dead Letter Queue ----------------- An invalid record may occur for a number of reasons. One example is when a record arrives at the sink connector serialized in JSON format, but the sink connector configuration is expecting Avro format. When an invalid record cannot be processed by a sink connector, the error is handled based on the connector configuration property ``errors.tolerance``. .. note:: A Dead Letter Queue topic is autogenerated for |ccloud| sink connectors. For more information, see `Dead Letter Queue __`. There are two valid values for this configuration property: ``none`` (default) or ``all``. The table below shows the different states a connector may be in while operating and whether or not |kconnect-long| handles the errors. +-------------------+----------------------------------+-----------------------+ | Connector State | Description | Errors Handled by | | | | |kconnect| | +===================+==================================+=======================+ | Starting | Can't connect to the datastore | No | | | at connector startup | | +-------------------+----------------------------------+-----------------------+ | Polling (source | Can't read records from the | No | | connector) | source database | | +-------------------+----------------------------------+-----------------------+ | Converting data | Can't read from or write to a | Yes | | | |ak| topic, or deserialize or | | | | serialize JSON, Avro, etc. | | +-------------------+----------------------------------+-----------------------+ | Transforming data | Can't apply a single message | Yes | | | transform (SMT) | | +-------------------+----------------------------------+-----------------------+ | Putting (sink | Can't write records to the | No | | connector) | target dataset | | +-------------------+----------------------------------+-----------------------+ .. note:: Dead letter queues are only applicable for sink connectors. When ``errors.tolerance`` is set to ``none`` an invalid record causes the connector task to immediately fail and the connector goes into a failed state. To resolve this issue, you would need to review the |kconnect-long| Worker log to find out what caused the failure, correct it, and restart the connector. When ``errors.tolerance`` is set to ``all``, all errors or invalid records are ignored and processing continues. No errors are written to the |kconnect| Worker log. To determine if records are failing you must use :ref:`internal metrics ` or count the number of records at the source and compare that with the number of records processed. An error-handling feature is available that will route all invalid records to a special topic. This topic contains a **dead letter queue** of records that could not be processed by the sink connector. Creating a Dead Letter Queue Topic ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ To create a dead letter queue, add the following configuration properties to the sink connector configuration: :: errors.tolerance = all errors.deadletterqueue.topic.name = An example GCS sink connector configuration with dead letter queueing enabled is shown below: .. highlight:: none :: { "name": "gcs-sink-01", "config": { "connector.class": "io.confluent.connect.gcs.GcsSinkConnector", "tasks.max": "1", "topics": "gcs_topic", "gcs.bucket.name": "", "gcs.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.gcs.storage.GcsStorage", "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "schema.compatibility": "NONE", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "dlq-gcs-sink-01", } } Even if the dead letter topic contains the records that failed, it does not show why. You can add the following additional configuration property to include failed record header information. :: errors.deadletterqueue.context.headers.enable = true Record headers are added to the dead letter queue when this parameter is set to ``true`` (the default is false). You can then use the :ref:`kafkacat-usage` to view the record header and determine why the record failed. .. note:: To avoid conflicts with the original record header, the dead letter queue context header keys start with ``_connect.errors``. Here is the same example configuration with headers enabled: :: { "name": "gcs-sink-01", "config": { "connector.class": "io.confluent.connect.gcs.GcsSinkConnector", "tasks.max": "1", "topics": "gcs_topic", "gcs.bucket.name": "", "gcs.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.gcs.storage.GcsStorage", "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "schema.compatibility": "NONE", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "dlq-gcs-sink-01", "errors.deadletterqueue.context.headers.enable":true } } .. seealso:: See `Kafka Connect Deep Dive – Error Handling and Dead Letter Queues `__ for a deep dive into this topic. Using a Dead Letter Queue with Security ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When you use |cp| with security enabled, the |cp| :ref:`Admin Client ` creates the dead letter queue topic. Invalid records are first passed to an internal Producer constructed to send these records. Then, the Admin Client creates the dead letter queue topic. For the dead letter queue to work in a secure |cp| environment, additional Admin Client configuration properties (prefixed with ``.admin``) must be added to the |kconnect| Worker configuration. A :ref:`SASL/PLAIN ` example showing the additional |kconnect| Worker configuration properties is provided below: :: admin.ssl.endpoint.identification.algorithm=https admin.sasl.mechanism=PLAIN admin.security.protocol=SASL_SSL admin.request.timeout.ms=20000 admin.retry.backoff.ms=500 admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="" \ password=""; .. seealso:: See :ref:`connect-rbac-index` for details about configuring your |kconnect| worker, sink connector, and dead letter queue topic in a Role-Based Access Control (RBAC) environment. Suggested Reading ~~~~~~~~~~~~~~~~~ Blog post: `Kafka Connect Deep Dive – Error Handling and Dead Letter Queues `__