.. meta:: :title: How to use Kafka Connect :description: This document provides concepts and instructions for getting started with Kafka Connect. :keywords: Connect quickstart, Connect, Kafka Connect, Kafka connectors, Connect worker, connectors .. _connect_userguide: How to Use |kconnect-long| - Get Started ======================================== :term:`Kafka Connect` is the framework for integrating popular systems, such as databases and cloud services, with |ak|. This guide helps you get started deploying |kconnect| and using connectors. .. include:: ../.hidden/docs-common/home/includes/cloud-platform-cta.rst .. _connect_userguide-planning-install: Deployment considerations ------------------------- To get started with |kconnect-long|, you must have a set of :term:`Kafka brokers `. The |ak| brokers can be an earlier broker version or the latest version. For more details, see :platform:`Cross-Component Compatibility|installation/versions-interoperability.html#cross-component-compatibility`. In addition to |ak| brokers, there are a few deployment options to consider. Understanding and acting on these options ensures your |kconnect-long| deployment will scale and support the long-term needs of your data pipeline. |sr-long| ~~~~~~~~~ Although :platform:`Schema Registry|schema-registry/connect.html` is an optional service for |kconnect-long|, it enables you to easily use :term:`Avro `, :term:`Protobuf `, and :term:`JSON Schema ` as common data formats for the |ak| records that connectors read from and write to. This keeps the need for custom code to a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. For more information, see :platform:`Using Kafka Connect with Schema Registry|schema-registry/connect.html` and :ref:`connect_configuring_converters`. .. _connect-standalone-v-distributed: Standalone and distributed mode ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Connectors and tasks are logical units of work that run as a process. The process is called a worker in |kconnect-long|. There are two modes for running workers: - **Standalone mode**: Useful for development and for testing |kconnect-long| on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to |ak|). - **Distributed mode**: Recommended for production environments because of its scalability, high availability, and management benefits. It runs |kconnect| workers on multiple machines (nodes), which form a |kconnect| cluster. |kconnect-long| distributes running connectors across the cluster. You can add or remove nodes as your needs evolve. This mode is also more fault tolerant. For example, if a node unexpectedly leaves the cluster, |kconnect-long| distributes the work of that node to other nodes in the cluster. And, because |kconnect-long| stores connector configurations, status, and offset information inside the |ak| cluster, where it is safely replicated, losing the node where a |kconnect| worker runs does not result in any loss of data. Try to identify which mode works best for your environment before getting started. For more information about the different deployment modes for |kconnect-long| workers, check out `this video `__. Operating environment ~~~~~~~~~~~~~~~~~~~~~ |kconnect| workers operate well in containers and in managed environments, such as Kubernetes, Apache Mesos, Docker Swarm, or Yarn.
The distributed worker stores all of its state in |ak|, making it easier to manage a cluster. And, by design, |kconnect-long| does not handle restarting or scaling workers. This means your existing cluster management solution can continue to be used transparently. Note that the standalone worker state is stored on the local file system. For more information about using Docker, see :platform:`Install using Docker|installation/docker/installation.html`. For details about deploying and managing |cp| in a Kubernetes environment, see `Confluent for Kubernetes `__. |kconnect-long| workers are JVM processes that can run on shared machines with sufficient resources. Hardware requirements for |kconnect| workers are similar to those of standard Java :term:`producers ` and consumers. Resource requirements mainly depend on the types of connectors operated by the workers. More memory is required for environments where large messages are sent and where a large number of messages are buffered before being written in aggregate form to an external system. Using compression also requires a more powerful CPU. If you have multiple workers running concurrently on a single machine, ensure you know the resource limits (CPU and memory). Start with the default heap size setting, then :platform:`monitor internal metrics|kafka/monitoring.html` and the system. Verify that the CPU, memory, and network (10 GbE or greater) are sufficient for the load. .. _connect_installing_plugins: Install a |kconnect| plugin ----------------------------- |kconnect-long| is designed to be extensible so developers can create custom connectors, transforms, and converters, and users can install and run them. This section helps you install |kconnect| plugins. .. _connect_plugins: Define a plugin ~~~~~~~~~~~~~~~ A |kconnect-long| plugin is a set of JAR files containing the implementation of one or more connectors, transforms, or converters. |kconnect| isolates each plugin from the others so that libraries in one plugin are not affected by the libraries in any other plugin. This is very important when mixing and matching connectors from multiple providers. .. caution:: It's common to have many plugins installed in a |kconnect| deployment. Ensure you have only one version of each plugin installed. A |kconnect-long| plugin can be any one of the following: * A directory on the file system that contains all required JAR files and third-party dependencies for the plugin. This is most common and is preferred. * A single uber JAR containing all the class files for a plugin and its third-party dependencies. A |kconnect-long| plugin should never contain any libraries provided by the |kconnect-long| runtime. |kconnect-long| finds the plugins using a plugin path defined as a comma-separated list of directory paths in the ``plugin.path`` :ref:`worker configuration property `. The following shows an example ``plugin.path`` worker configuration property: .. sourcecode:: properties plugin.path=/usr/local/share/kafka/plugins Install a plugin ~~~~~~~~~~~~~~~~ To install a plugin, you must place the plugin directory or uber JAR (or a symbolic link that resolves to one of these) in a directory already listed in the plugin path. Alternatively, you can update the plugin path by adding the absolute path of the directory containing the plugin. Using the previous plugin path example, you would create a ``/usr/local/share/kafka/plugins`` directory on each machine running |kconnect| and then place the plugin directories (or uber JARs) there.
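For example, the following sketch shows one way to stage a plugin on a worker host. The connector archive name is a hypothetical placeholder, and you would repeat these steps on every machine that runs a |kconnect| worker:

.. code-block:: bash

   # Create the plugin directory listed in plugin.path
   mkdir -p /usr/local/share/kafka/plugins

   # Unpack a downloaded connector archive (hypothetical file name) into its
   # own subdirectory so that its JARs stay isolated from other plugins
   unzip my-custom-connector-1.0.0.zip -d /usr/local/share/kafka/plugins/

   # Confirm that the worker configuration points at the plugin directory
   grep plugin.path worker.properties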
When you start your |kconnect| workers, each worker discovers all connector, transform, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the |kconnect| worker loads the classes from the respective plugin first, followed by the |kconnect-long| runtime and Java libraries. |kconnect| explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers. To find the components that fit your needs, check out the `Confluent Hub `__ page, which has an ecosystem of connectors, transforms, and converters. To install a connector using the |confluent-cli|, see the :confluent-cli:`confluent connect plugin install|command-reference/connect/plugin/confluent_connect_plugin_install.html` command. For a full list of supported connectors, see `Supported Connectors `__. Earlier versions of |kconnect-long| required a different approach to installing connectors, transforms, and converters. All the scripts for running |kconnect| recognized the ``CLASSPATH`` environment variable. You would export this variable to define the list of paths to the connector JAR files. The following example shows the older ``CLASSPATH`` export mechanism: .. codewithvars:: bash export CLASSPATH=/path/to/my/connectors/* bin/connect-standalone standalone.properties new-custom-connector.properties Confluent does not recommend exporting the ``CLASSPATH`` environment variable, because using this method to create a path to plugins can result in library conflicts that can cause |kconnect-long| and connectors to fail. Use the ``plugin.path`` configuration property, which properly isolates each plugin from other plugins and libraries. .. include:: includes/classpath-v-pluginpath.rst .. include:: includes/custom-partitioner.rst .. _connect-userguide-standalone-config: Configure and run workers ------------------------- The following sections provide information about running workers in standalone mode and distributed mode. For a list of worker configuration properties, see :platform:`Kafka Connect Worker Configuration Properties|connect/references/allconfigs.html`. `This video `__ explains the different deployment modes for |kconnect-long| workers. Standalone mode ~~~~~~~~~~~~~~~ Standalone mode is typically used for development and testing, or for lightweight, single-agent environments (for example, sending web server logs to |ak|). The following example shows a command that launches a worker in standalone mode: .. codewithvars:: bash bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...] The first parameter (``worker.properties``) is the :ref:`worker configuration properties file `. Note that ``worker.properties`` is an example file name. You can use any valid file name for your worker configuration file. This file gives you control over settings such as the |ak| cluster to use and the serialization format. For an example configuration file that uses `Avro `__ and :platform:`Schema Registry|schema-registry/connect.html` in standalone mode, open the file located at ``etc/schema-registry/connect-avro-standalone.properties``. You can copy and modify this file for use as your standalone worker properties file. The second parameter (``connector1.properties``) is the connector configuration properties file. All connectors have configuration properties that are loaded with the worker.
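As an illustration, the following is a minimal sketch of a standalone connector configuration file. It uses the ``FileStreamSource`` connector that ships with |ak|; the file name, source path, and topic name are hypothetical placeholders:

.. sourcecode:: properties

   # connector1.properties (hypothetical example)
   name=local-file-source
   connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
   tasks.max=1
   # Hypothetical source file to read and Kafka topic to write to
   file=/var/log/example.log
   topic=example-topic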
As shown in the startup command earlier, you can launch multiple connectors at once by listing additional connector configuration files. If you run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker: * ``offset.storage.file.filename``: The storage file name for connector offsets. This file is stored on the local filesystem in standalone mode. Using the same file name for two workers will cause offset data to be deleted or overwritten with different values. * ``listeners``: A list of URIs the REST API will listen on, in the format ``protocol://host:port,protocol2://host2:port``, where the protocol is either HTTP or HTTPS. You can specify the hostname as ``0.0.0.0`` to bind to all interfaces, or leave the hostname empty to bind to the default interface. .. note:: Update the ``etc/schema-registry/connect-avro-standalone.properties`` file if you need to apply a change to |kconnect| when starting |cp| services using the `Confluent CLI `__. .. _connect-userguide-distributed-config: Distributed mode ~~~~~~~~~~~~~~~~ Distributed mode does not take any additional command-line parameters beyond the worker configuration file. New workers either start a new group or join an existing one with a matching ``group.id``. The workers in a group then coordinate to distribute the work to be done. The following shows an example command that launches a worker in distributed mode: .. codewithvars:: bash bin/connect-distributed worker.properties For an example distributed mode configuration file that uses Avro and :platform:`Schema Registry|schema-registry/connect.html`, open ``etc/schema-registry/connect-avro-distributed.properties``. You can make a copy of this file, modify it, and use it as the new ``worker.properties`` file. Note that ``worker.properties`` is an example file name. You can use any valid file name for your properties file. In standalone mode, connector configuration property files are added as command-line parameters. However, in distributed mode, connectors are deployed and managed using a REST API request. To create connectors, you start the worker and then make a REST request to create the connector. REST request examples are provided in many `supported connector `__ documents. For instance, see the `Azure Blob Storage Source connector REST-based example `__. Note that if you run many distributed workers on one host machine for development and testing, the ``listeners`` configuration property must be unique for each worker. This is the port the REST interface listens on for HTTP requests. .. note:: Update the ``etc/schema-registry/connect-avro-distributed.properties`` file if you need to apply a change to |kconnect| when starting |cp| services using the `Confluent CLI `__. .. _connect-internal-topics: |kconnect| internal topics ~~~~~~~~~~~~~~~~~~~~~~~~~~ |kconnect| stores connector and task configurations, :term:`offsets `, and status in several :term:`Kafka topics `. These are referred to as |kconnect-long| internal topics. It is important that these internal topics have a high replication factor, a compaction cleanup policy, and an appropriate number of :term:`partitions `. |kconnect-long| can create the internal topics when it starts up, using the |kconnect| :platform:`worker configuration properties|connect/references/allconfigs.html#distributed-worker-configuration` to specify the topic names, replication factor, and number of partitions for these topics.
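For reference, the following is a minimal sketch of the internal-topic settings in a distributed worker configuration. The topic names match those used in the manual-creation examples later in this section; the replication factor and partition counts shown are illustrative values, not requirements:

.. sourcecode:: properties

   # Names of the three internal topics shared by every worker in the cluster
   config.storage.topic=connect-configs
   offset.storage.topic=connect-offsets
   status.storage.topic=connect-status
   # Illustrative replication factor and partition counts
   config.storage.replication.factor=3
   offset.storage.replication.factor=3
   offset.storage.partitions=25
   status.storage.replication.factor=3
   status.storage.partitions=5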
|kconnect| verifies that the properties meet the requirements and creates all topics with compaction cleanup policy. Distributed workers that are configured with matching ``group.id`` values discover each other and form a |kconnect-long| cluster. All workers in the cluster use the same three internal topics to share connector configurations, offset data, and status updates. For this reason, all distributed worker configurations in the same |kconnect| cluster must have matching ``config.storage.topic``, ``offset.storage.topic``, and ``status.storage.topic`` properties. In addition to the three required internal topic names, the distributed worker configuration should have identical values for the following listed properties. This ensures that any worker in the cluster will create missing internal topics with the desired property values. Note that these configuration properties have :platform:`practical default values|connect/references/allconfigs.html#distributed-worker-configuration`. * ``config.storage.replication.factor`` * ``offset.storage.replication.factor`` * ``offset.storage.partitions`` * ``status.storage.replication.factor`` * ``status.storage.partitions`` .. note:: Starting with |cp| version 6.0, |kconnect-long| can create internal topics using the |ak| broker's ``default.replication.factor`` and ``num.partitions`` values. For more information, see :ref:`connect-internal-topics-broker-defaults`. As each distributed worker starts up, it uses the internal |ak| topics if they already exist. If not, the worker tries to create the topics using the worker configuration properties. This gives you the option of manually creating the topics before starting |kconnect-long|, if you require topic-specific settings or when |kconnect-long| does not have the necessary privileges to create the topics. If you do create the topics manually, follow the guidelines provided in the list of :platform:`configuration properties|connect/references/allconfigs.html#distributed-worker-configuration`. If you want to create a distributed worker that is independent of an existing |kconnect| cluster, you must create new worker configuration properties. The following configuration properties must be different from the worker configurations used in an existing cluster: * ``group.id`` * ``config.storage.topic`` * ``offset.storage.topic`` * ``status.storage.topic`` It is important to note that |kconnect| clusters cannot share group IDs or internal topics. Simply changing a ``group.id`` will not create a new worker separate from an existing |kconnect| cluster. The new ``group.id`` must also have unique internal topics associated with it. This requires setting unique ``config.storage.topic``, ``offset.storage.topic``, and ``status.storage.topic`` configuration properties for the new ``group.id``. You also must use different connector names than those used in the existing |kconnect| cluster since a consumer group is created based on the connector name. Each connector in a |kconnect| cluster shares the same consumer group. .. _connect-internal-topics-broker-defaults: Use |ak| broker default topic settings ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The |kconnect| worker can create internal topics using |ak| broker defaults for the replication factor and number of partitions. To use the |ak| broker defaults for the replication factor and number of partitions, use ``-1`` in the worker configuration properties for the internal topics. The following is an example snippet: .. 
sourcecode:: properties # Use the broker default properties config.storage.replication.factor=-1 offset.storage.replication.factor=-1 status.storage.replication.factor=-1 offset.storage.partitions=-1 status.storage.partitions=-1 The ``config.storage`` internal topic must always have exactly one partition. This is why there is no option to use the |ak| broker default for ``config.storage``. Override topic-specific properties ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ You can override :platform:`topic properties|installation/configuration/topic-configs.html` that are valid for the version of the |ak| broker where the internal topics will be created. To enter these properties in the worker configuration, prefix the topic property with the internal topic name, as shown in the following example: .. sourcecode:: properties config.storage.<topic-specific-property> offset.storage.<topic-specific-property> status.storage.<topic-specific-property> The following example shows how to use the |ak| broker default for the replication factor while overriding the default minimum in-sync replicas (``min.insync.replicas``) property (the |ak| broker default is ``1``): .. sourcecode:: properties # Use the broker default properties config.storage.replication.factor=-1 offset.storage.replication.factor=-1 status.storage.replication.factor=-1 # Override the broker default properties config.storage.min.insync.replicas=3 offset.storage.min.insync.replicas=3 status.storage.min.insync.replicas=3 If the topic property is not valid for the |ak| broker version, the |kconnect| worker will fail upon startup. .. _connect-internal-topics-manual: Manually create internal topics ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Allowing |kconnect-long| to create these internal topics is recommended. However, you may want to manually create the topics. The following examples illustrate when you would manually create these topics: * For security purposes, the broker may be configured to not allow clients like |kconnect| to create |ak| topics. * You may require other advanced topic-specific settings that are not set by |kconnect| or that are different from the auto-created settings. The following example commands show how to manually create compacted and replicated |ak| topics before starting |kconnect|. Ensure you adhere to the :platform:`distributed worker|connect/references/allconfigs.html#distributed-worker-configuration` guidelines when entering parameters. .. code-block:: text # config.storage.topic=connect-configs bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact .. code-block:: text # offset.storage.topic=connect-offsets bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact .. code-block:: text # status.storage.topic=connect-status bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact All workers in a |kconnect| cluster use the same internal topics. Workers in a different cluster must use different internal topics. For more details, see :platform:`Distributed Worker Configuration|connect/references/allconfigs.html#distributed-worker-configuration`. .. _connect_configuring_workers: Worker configuration properties file ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Regardless of the mode used, |kconnect-long| workers are configured by passing a worker configuration properties file as the first parameter. For example: ..
codewithvars:: bash bin/connect-distributed worker.properties Sample worker configuration properties files are included with |cp| to help you get started. The following list shows the location for Avro sample files: * ``etc/schema-registry/connect-avro-distributed.properties`` * ``etc/schema-registry/connect-avro-standalone.properties`` Use one of these files as a starting point. These files contain the necessary configuration properties to use the Avro converters that integrate with |sr|. They are configured to work well with |ak| and |sr| services running locally. They do not require running more than a single broker, making it easy for you to test |kconnect-long| locally. The example configuration files can also be modified for production deployments by using the correct hostnames for |ak| and |sr| and acceptable (or default) values for the internal topic replication factor. For a list of worker configuration properties, see :platform:`Kafka Connect Worker Configuration Properties|connect/references/allconfigs.html`. .. _connect_configuring_converters: Configuring key and value converters ------------------------------------ The following list shows the converters packaged with the |cp|: .. include:: includes/converter-list.rst The `Kafka Connect 101 course `__ explains converters in detail. The ``key.converter`` and ``value.converter`` properties are where you specify the type of :platform:`converter|connect/concepts.html#converters` to use. Default converters for all connectors are specified in the :platform:`worker configuration|connect/references/allconfigs.html#common-worker-configuration`. However, any connector can override the default converters by completely defining a key, value, and header converter. Confluent recommends you define the default key, value, and header converters that most connectors can use in the worker, and then define them in a connector's configuration if that connector requires different converters. The default ``header.converter`` defined in the worker serializes header values as strings using the ``StringConverter`` and deserializes header values to the most appropriate numeric, boolean, array, or map representation. Schemas are not serialized but are inferred upon deserialization when possible. It is important to note that converter configuration properties in the :platform:`worker configuration|connect/references/allconfigs.html#common-worker-configuration` are used by all connectors running on the worker, unless a converter is added to a connector configuration. If a converter is added to a connector configuration, all converter properties in the worker configuration prefixed with the converter type added (``key.converter.*`` and/or ``value.converter.*``) are not used. Be careful when adding converters to a connector configuration. For example, if the following value converter properties are present in the worker configuration: .. sourcecode:: properties value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081 And, you add the following properties to your connector configuration: .. sourcecode:: json { "value.converter": "AvroConverter", "value.converter.basic.auth.credentials.source": "USER_INFO", "value.converter.basic.auth.user.info": ":" } An error will occur when the connector is started because the required |sr| URL property ``value.converter.schema.registry.url=http://localhost:8081`` is not provided to the converter. The following sections provide converter descriptions and examples. 
For details about how these converters work in |sr|, see :platform:`Using Kafka Connect with Schema Registry|schema-registry/connect.html`. Avro ~~~~ To use the ``AvroConverter`` with :platform:`Schema Registry|schema-registry/connect.html`, you specify the ``key.converter`` and ``value.converter`` properties in the worker configuration. You must also add a converter property that provides the |sr| URL. The following example shows the ``AvroConverter`` key and value properties that are added to the configuration: .. sourcecode:: properties key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081 The Avro key and value converters can be used independently from each other. For example, you may want to use a ``StringConverter`` for keys and the ``AvroConverter`` or ``JsonConverter`` for values. Independent key and value properties are shown in the following example: .. sourcecode:: properties key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081 .. _connect-json-protobuf: JSON Schema and Protobuf ~~~~~~~~~~~~~~~~~~~~~~~~ Both JSON Schema and Protobuf converters are implemented in the same way as the Avro converter. The following examples show a couple of configuration examples using the ``ProtobufConverter`` or ``JsonSchemaConverter`` for the value converter and using ``StringConverter`` for the key: .. sourcecode:: properties key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=http://localhost:8081 .. sourcecode:: properties key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=http://localhost:8081 Both Avro and JSON Schema express their schemas as JSON and are lenient if unrecognized properties are encountered. This allows the converter to use custom JSON properties to capture any |kconnect-long| schema objects with no equivalent in Avro or JSON Schema. However, Protobuf has its own Interface Definition Language (IDL) which differs from JSON and does not allow for custom ad-hoc properties. For this reason, the conversion from the |kconnect-long| schema to Protobuf may cause data loss or inconsistencies if there is no direct equivalent in Protobuf. For example, the |kconnect-long| schema supports ``int8``, ``int16``, and ``int32`` data types. Protobuf supports ``int32`` and ``int64``. When |kconnect| data is converted to Protobuf, ``int8`` and ``int16`` fields are mapped to int32 or int64 with no indication that the source was ``int8`` or ``int16``. With JSON Schema, only number and integer type fields are supported. However, the JSON Schema converter (``JsonSchemaConverter``) will store data with no JSON Schema equivalent in a property named ``connect.type``. This property is ignored by the JSON Schema parser, so fields can be restored to the proper type by downstream components. For full encoding details, see `JSON encoding for Avro `__ and `JSON encoding for Protobuf `__. Additionally, JSON Schema supports three means of combining schemas: `allOf, anyOf, and oneOf `__. 
However, the JSON Schema converter only supports ``oneOf``, treating it similarly to how the Avro converter handles unions and how the Protobuf converter handles ``oneOf``. Note that if you're configuring Avro, Protobuf, or JSON Schema converters in an environment configured for Role-Based Access Control (RBAC), see :platform:`key and value converters with RBAC|connect/rbac/connect-rbac-connectors.html`. For details about how converters work with |sr|, see :platform:`Using Kafka Connect with Schema Registry|schema-registry/connect.html`. The following converters are not used with |sr|. JSON (without |sr|) ~~~~~~~~~~~~~~~~~~~ If you need to use JSON without |sr| for |kconnect| data, you can use the ``JsonConverter`` supported with |ak|. The following example shows the ``JsonConverter`` key and value properties that are added to the configuration: .. sourcecode:: properties key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false When the properties ``key.converter.schemas.enable`` and ``value.converter.schemas.enable`` are set to ``true``, the key or value is not treated as plain JSON, but rather as a composite JSON object containing both an internal schema and the data. When these are enabled for a source connector, both the schema and data are in the composite JSON object. When these are enabled for a sink connector, the schema and data are extracted from the composite JSON object. Note that this implementation never uses |sr|. When the properties ``key.converter.schemas.enable`` and ``value.converter.schemas.enable`` are set to ``false`` (the default), only the data is passed along, without the schema. This reduces the payload overhead for applications that do not need a schema. String format and raw bytes ~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``org.apache.kafka.connect.storage.StringConverter`` is used to convert the internal |kconnect| format to simple string format. When converting |kconnect| data to bytes, the schema is ignored and data is converted to a simple string. When converting from bytes to |kconnect| data format, the converter returns an optional string schema and a string (or null). ``org.apache.kafka.connect.converters.ByteArrayConverter`` does not convert data. Bytes are passed through the connector directly with no conversion. For a deep dive into converters, see: `Converters and Serialization Explained `__. .. _connect-override-producer-consumer: |kconnect| producers and consumers ---------------------------------- Internally, |kconnect-long| uses standard Java producers and consumers to communicate with |ak|. |kconnect| configures default settings for these producer and consumer instances. These settings include properties that ensure data is delivered to |ak| in order and without any data loss. Default |kconnect| producer properties ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ By default, |kconnect| configures the |ak| producers for source connectors with the following properties: * Points the producer's bootstrap servers to the same |ak| cluster used by the |kconnect| cluster. * Configures key and value serializers that work with the connector's key and value converters. * Generates a producer ``client.id`` based on the connector and task, using the pattern ``connector-producer--``. * Sets ``acks=all`` to ensure each message produced is properly written to all in-sync replicas (ISRs). 
* For retryable exceptions, |kconnect| configures the producer with the following properties to reduce the potential for data duplication during infinite retries: - ``request.timeout.ms=`` - ``max.block.ms=`` - ``max.in.flight.requests.per.connection=1`` - ``delivery.timeout.ms=`` You can override the defaults by using the ``producer.*`` properties in the worker configuration, or by using the ``producer.override.*`` properties in connector configurations, but changing these default properties may compromise the delivery guarantees of |kconnect|. Producer and consumer overrides ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You may need to override default settings, other than those described in the previous section. The following two examples show when this might be required. **Worker override example** Consider a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery. That is, when there are connectivity issues, minimal data loss may be acceptable for your application in order to avoid data buffering on the client. This keeps log collection as lightweight as possible. To override :platform:`producer configuration properties|installation/configuration/producer-configs.html` and :platform:`consumer configuration properties|installation/configuration/consumer-configs.html` for all connectors controlled by the worker, you prefix worker configuration properties with ``producer.`` or ``consumer.`` as shown in the following example: .. sourcecode:: json producer.retries=1 consumer.max.partition.fetch.bytes=10485760 The previous example overrides the default producer ``retries`` property to retry sending messages only one time. The consumer override increases the default amount of data fetched from a partition per request to 10 MB. These configuration changes are applied to all connectors controlled by the worker. Be careful making any changes to these settings when running distributed mode workers. **Per-connector override example** By default, the producers and consumers used for connectors are created using the same properties that |kconnect| uses for its own internal topics. This means that the same |ak| principal must be able to read and write to all the internal topics and all of the topics used by the connectors. You may want the producers and consumers used for connectors to use a different |ak| principal. It is possible for connector configurations to override worker properties used to create producers and consumers. These are prefixed with ``producer.override.`` and ``consumer.override.``. For more information about per-connector overrides, see :platform:`Override the Worker Configuration|connect/references/allconfigs.html#override-the-worker-configuration`. For detailed information about producers and consumers, see :platform:`Kafka Producer|clients/producer.html` and :platform:`Kafka Consumer|clients/consumer.html`. For a list of configuration properties, see :platform:`producer configuration properties|installation/configuration/producer-configs.html` and :platform:`consumer configuration properties|installation/configuration/consumer-configs.html`. .. _connect_source-auto-topic-creation: Configuring auto topic creation for source connectors ----------------------------------------------------- Beginning with |cp| version 6.0, |kconnect-long| can create topics for source connectors if the topics do not exist on the |ak-tm| broker. 
To use auto topic creation for source connectors, you must set the ``topic.creation.enable`` worker property to ``true`` for all workers in the |kconnect| cluster. In addition, you must add supporting properties to each source connector configuration. .. important:: * This feature does not affect sink connectors or their configuration. Any topic creation properties you add to sink connectors will be ignored and will produce a warning in the log. * If you do not want the source connector to create topics that are missing, you must disable the feature by setting ``topic.creation.enable=false`` in the |kconnect| worker. Worker property ~~~~~~~~~~~~~~~ The following worker property enables or disables auto topic creation for source connectors. ``topic.creation.enable`` Defaults to ``true``. This feature is enabled only for source connector configurations that have the supporting :ref:`connect_source-auto-topic-creation-source-config`. * Type: boolean * Default: true * Importance: low .. _connect_source-auto-topic-creation-source-config: Source connector properties ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Several source connector properties are associated with the worker property ``topic.creation.enable``. These properties set the default replication factor, number of partitions, and other topic-specific settings to be used by |kconnect-long| to create a topic if it does not exist. None of the properties have default values. The following source connector configuration properties are required: - ``topic.creation.$alias.replication.factor`` - ``topic.creation.$alias.partitions`` The auto topic creation feature is enabled for the source connector only when the feature is enabled in the worker configuration and when the source connector configuration specifies the required replication factor and number of partitions for one group. Users may choose to use the default values specified in the |ak| broker by setting ``topic.creation.$alias.replication.factor`` or ``topic.creation.$alias.partitions`` to ``-1``. You can define more connector properties using *configuration property groups*. Configuration property groups are added using the property ``topic.creation.groups``. The hierarchy of groups is built on top of a single foundational group called the ``default`` configuration property group. The ``default`` group always exists and does not need to be listed in the ``topic.creation.groups`` property in the connector configuration. Including ``default`` in ``topic.creation.groups`` results in a warning. The following source connector configuration properties are used in association with the ``topic.creation.enable=true`` worker property. For example properties, see :ref:`connect_source-auto-topic-creation-config-examples`. .. include:: includes/auto-topic-connector-configs.rst .. _connect_source-auto-topic-creation-config-examples: Configuration examples ~~~~~~~~~~~~~~~~~~~~~~ The following example configuration snippets show how the source connector configuration properties are entered when ``topic.creation.enable`` is enabled in the |kconnect| worker configuration. **Example 1** All new topics created by |kconnect| have a replication factor of 3 and 5 partitions. ``default`` is the only group, so the ``topic.creation.groups`` property is not used. .. sourcecode:: properties ...omitted topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 ...omitted **Example 2** New topics created by |kconnect| have a replication factor of 3 and 5 partitions.
The exceptions to this configuration are topics that match the inclusion list of the ``inorder`` group, which have one partition. .. sourcecode:: properties ...omitted topic.creation.groups=inorder topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 topic.creation.inorder.include=status, orders.* topic.creation.inorder.partitions=1 ...omitted **Example 3** New topics created by |kconnect| have a replication factor of 3 and 5 partitions. The ``key_value_topic`` and ``another.compacted.topic`` topics, along with topics that begin with the prefix ``configurations``, are compacted and have a replication factor of 5 and one partition. .. sourcecode:: properties ...omitted topic.creation.groups=compacted topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 topic.creation.compacted.include=key_value_topic, another.compacted.topic, configurations.* topic.creation.compacted.replication.factor=5 topic.creation.compacted.partitions=1 topic.creation.compacted.cleanup.policy=compact ...omitted **Example 4** New topics created by |kconnect| have a replication factor of 3 and 5 partitions. Topics that begin with the prefix ``configurations`` are compacted. Topics that match the inclusion list of the ``highly_parallel`` group and do not match its exclusion list have a replication factor of 1 and 100 partitions. .. sourcecode:: properties ...omitted topic.creation.groups=compacted, highly_parallel topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 topic.creation.highly_parallel.include=hpc.*,parallel.* topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.* topic.creation.highly_parallel.replication.factor=1 topic.creation.highly_parallel.partitions=100 topic.creation.compacted.include=configurations.* topic.creation.compacted.cleanup.policy=compact ...omitted Security configuration examples ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When security is configured, the |kconnect| worker should be configured with a principal that can DESCRIBE and CREATE topics; this principal is inherited by all connectors in the worker. If :platform:`different security settings|connect/security.html#separate-principals` are required from what the |kconnect| worker configuration provides, you can add ``producer.override`` properties to the source connector configuration to provide the security credentials, as shown in the following examples: .. sourcecode:: properties ...omitted producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="alice" \ password="alice-secret"; ...omitted .. sourcecode:: properties ...omitted producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="alice" \ password="alice-secret"; admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="bob" \ password="bob-secret"; ...omitted An error is logged and the task fails if ``topic.creation.enable=true`` is configured but neither the worker properties nor the connector overrides allow for the creation of new topics. .. _userguide-connect-reporter: |kconnect| reporter ------------------- .. include:: includes/connect-reporter.rst Reporter and Kerberos security ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. include:: includes/reporter-security-properties.rst .. _disable-connect-reporter: Disabling |kconnect| reporter ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. include:: includes/disable-connect-reporter.rst ..
_connect-metrics-reporter: Metrics reporter metadata ~~~~~~~~~~~~~~~~~~~~~~~~~ With the release of |cp| 6.0, a configuration prefix is available for |kconnect-long| that allows connectors to pass metrics context metadata to embedded metrics client libraries. The |kconnect| worker passes the context configuration to all instantiated connectors. The following provides information on these :platform:`worker configuration properties|connect/references/allconfigs.html`: .. sourcecode:: properties metrics.context.= The configured key/value pairs are passed by the connectors in the |kconnect| worker to the configured :platform:`Confluent Metrics Reporter|kafka/metrics-reporter.html` using the ``MetricsContext`` interface. For example, configuring a |kconnect| worker with ``metrics.context.foo.bar=baz`` adds the field ``foo.bar``, mapped to the value ``baz``, to the ``MetricsContext`` metadata passed to the metrics reporter. |kconnect| workers can pass the following fields to ``MetricsContext``: * ``connect.kafka.cluster.id`` to indicate the backing |ak-tm| cluster. * ``connect.group.id`` to indicate the group ID used for |kconnect| worker coordination. ``connect.group.id`` is only valid for :ref:`distributed mode `. For more information, see :ref:`connect-internal-topics`. .. _connect-ops-config-provider: ConfigProvider interface ------------------------ .. include:: includes/config-provider.rst .. _shut-down-connect: Shut down |kconnect-long| -------------------------- There are a few ways to shut down |kconnect-long|. If you are running |kconnect| in |cp| standalone mode, you can use the following local |confluent-cli| command: .. code-block:: bash confluent local services connect stop If you're not using the |confluent-cli|, or if you're running |kconnect| in distributed mode, perform the following steps to shut down |kconnect-long|: #. Open a terminal session on one of the |kconnect| nodes. #. Search for the running |kconnect| process: .. code-block:: bash ps aux | grep ConnectDistributed #. Identify the PID in the output: .. code-block:: bash 0.2 2.1 8414400 351944 s003 S 12:42PM 2:52.62 /path/to/your/bin/java ...omitted... org.apache.kafka.connect.cli.ConnectDistributed /path/to/your/connect.properties #. Stop the process: .. code-block:: bash kill #. Stop this process on all remaining |kconnect| nodes. .. important:: Do not use ``kill -9`` to stop the process. .. _connect_next-steps: Related content --------------- After getting started with your deployment, you may want to check out the following |kconnect-long| documentation: * Course: `Kafka Connect 101 `__ * Course: `Building data pipelines with Apache Kafka `__ * Tutorial: :platform:`Moving Data In and Out of Kafka|connect/quickstart.html` * :platform:`Kafka Connect Logging|connect/logging.html` * :platform:`Upgrade Kafka Connect|installation/upgrade.html` * :platform:`Kafka Connect Security|connect/security.html` * :platform:`Kafka Connect REST Interface|connect/references/restapi.html` * :platform:`Using Kafka Connect with Schema Registry|schema-registry/connect.html` * :ref:`Upgrading a Connector Plugin ` * :platform:`Override the Worker Configuration|connect/references/allconfigs.html#override-the-worker-configuration` * :ref:`Adding Connectors or Software (Docker) ` Also, check out Confluent's `end-to-end demos `__ for |kconnect-long| on-premises, |ccloud|, and |co-long|.