.. _connect_userguide:

Getting Started with |kconnect-long|
====================================

.. meta::
   :title: Getting Started with Kafka Connect
   :keywords: Connect, Kafka Connect, Kafka connectors, Connect worker, connectors
   :description: This document provides instructions for getting started with Kafka Connect.

This document provides information about how to get started with |kconnect-long|. You should read and understand :ref:`Kafka Connect Concepts ` before getting started.

The following topics are covered in this document:

* :ref:`connect_userguide-planning-install`
* :ref:`connect_installing_plugins`
* :ref:`connect_userguide_standalone_config` and :ref:`connect_configuring_workers`
* :ref:`connect_configuring_converters`
* :ref:`connect_override_client_configs`
* :ref:`Next Steps (additional references and demo links) `

.. _connect_userguide-planning-install:

Deployment Considerations
-------------------------

|kconnect-long| has only one required prerequisite to get started: a set of |ak| brokers. These brokers can be running an earlier broker version or the latest version. See :ref:`cross-component-compatibility` for details.

Even though there is only one prerequisite, there are a few deployment options to consider beforehand. Understanding and acting on these deployment options ensures your |kconnect-long| deployment will scale and support the long-term needs of your data pipeline.

|sr-long|
~~~~~~~~~

Although :ref:`Schema Registry ` is not a required service for |kconnect-long|, it enables you to easily use Avro as the common data format for the |ak| records that connectors read from and write to. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. For additional information, see :ref:`schemaregistry_kafka_connect` and :ref:`connect_configuring_converters`.

.. _connect_standalone_v_distributed:

Standalone vs. Distributed Mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Connectors and tasks are logical units of work and run as a process. This process is called a **worker** in |kconnect-long|. There are two modes for running workers: *standalone mode* and *distributed mode*. You should identify which mode works best for your environment before getting started.

**Standalone mode** is useful for development and testing |kconnect-long| on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to |ak|).

**Distributed mode** runs |kconnect| workers on multiple machines (nodes). These form a |kconnect| cluster. |kconnect-long| distributes running connectors across the cluster. You can add more nodes or remove nodes as your needs evolve.

Distributed mode is also more fault tolerant. If a node unexpectedly leaves the cluster, |kconnect-long| automatically distributes the work of that node to other nodes in the cluster. And, because |kconnect-long| stores connector configurations, status, and offset information inside the |ak| cluster where it is safely replicated, losing the node where a |kconnect| worker runs does not result in any lost data.

.. important:: Distributed mode is recommended for production environments because of scalability, high availability, and management benefits.

Operating Environment
~~~~~~~~~~~~~~~~~~~~~

|kconnect| workers operate well in containers and in managed environments, such as Kubernetes, Apache Mesos, Docker Swarm, or Yarn.
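For example, a distributed worker can run as a container. The following is a minimal sketch that uses the ``confluentinc/cp-kafka-connect`` Docker image, where worker properties are passed as ``CONNECT_``-prefixed environment variables. The image tag, hostnames, topic names, and converter choices shown are illustrative assumptions; adjust them for your environment, and see the Docker documentation referenced below for the full set of required settings.

.. codewithvars:: bash

   # Sketch only: run a distributed worker in a container. Worker properties
   # are passed as CONNECT_-prefixed environment variables (dots become underscores).
   docker run -d --name=kafka-connect -p 8083:8083 \
     -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
     -e CONNECT_GROUP_ID=connect-cluster \
     -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
     -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
     -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
     -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
     -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
     -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
     confluentinc/cp-kafka-connect:latest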
The distributed worker stores all state in |ak|, so it's easier to manage a cluster. Additionally, by design, |kconnect-long| does not automatically handle restarting or scaling workers. This means your existing cluster management solution can continue to be used transparently. Note that the standalone worker state is stored on the local file system.

.. seealso::

   * See :ref:`cpdocker_intro` for more information about using Docker.
   * See :ref:`operator-about-intro` for information about deploying and managing |cp| in a Kubernetes environment.

|kconnect-long| workers are JVM processes that can run on shared machines with sufficient resources. Hardware requirements for |kconnect| workers are similar to those of standard Java producers and consumers. Resource requirements mainly depend on the types of connectors operated by the workers. More memory is required for environments where large messages are sent. More memory is also required for environments where large numbers of messages get buffered before being written in aggregate form to an external system. Using compression continuously requires a more powerful CPU.

.. tip:: If you have multiple workers running concurrently on a single machine, make sure you know the resource limits (CPU and memory). Start with the default heap size setting and :ref:`monitor internal metrics ` and the system. Verify that the CPU, memory, and network (10GbE or greater) are sufficient for the load.

.. _connect_installing_plugins:

Installing |kconnect| Plugins
-----------------------------

|kconnect-long| is designed to be extensible so developers can create custom connectors, transforms, or converters, and users can install and run them. A |kconnect-long| plugin is a set of JAR files containing the implementation of one or more connectors, transforms, or converters. |kconnect| isolates each plugin from the others so that libraries in one plugin are not affected by the libraries in any other plugins. This is very important when mixing and matching connectors from multiple providers.

.. caution:: It is common to have many plugins installed in a |kconnect| deployment. Make sure to only have **one version** of each plugin installed.

The |cp| comes bundled with several commonly used connectors, transforms, and converters. All of these can be used without having to first install the corresponding plugins. Bundled connectors include the following:

* :ref:`JDBC Source Connector `: reads tables from common DBMSes and writes them as records to |ak| topics.
* :ref:`JDBC Sink Connector `: consumes records from |ak| topics and inserts, updates, and deletes rows in DBMS tables.
* :ref:`Elasticsearch Sink Connector `: consumes records from |ak| topics and writes them as documents to Elasticsearch.
* :ref:`Amazon S3 Sink Connector `: consumes records from |ak| topics and writes them as aggregate container files to an S3 bucket.

For a full list of supported connectors, see :ref:`connect_bundled_connectors`.

.. note:: Make sure to check out `Confluent Hub `__. You can browse the large ecosystem of connectors, transforms, and converters to find the components that suit your needs and easily install them into your local |cp| environment. See :ref:`confluent_hub_client` for |c-hub| Client installation instructions.
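For example, the following sketch shows how you might use the |c-hub| client to install a connector plugin from |c-hub|. The connector name and version tag shown are illustrative assumptions; substitute the component you want to install.

.. codewithvars:: bash

   # Sketch only: install a plugin from Confluent Hub (component name is an example)
   confluent-hub install confluentinc/kafka-connect-datagen:latest

Depending on the options you choose, the client places the plugin in a component directory and may offer to update the ``plugin.path`` in your worker configuration files.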
.. _connect_plugins:

A |kconnect-long| plugin can be:

* a **directory** on the file system that contains all required JAR files and third-party dependencies for the plugin. This is most common and is preferred.
* a single **uber JAR** containing all of the class files for the plugin and its third-party dependencies.

.. important:: A plugin should never contain any libraries provided by the |kconnect-long| runtime.

|kconnect-long| finds the plugins using a *plugin path* defined as a comma-separated list of directory paths in the ``plugin.path`` :ref:`worker configuration property `. The following shows an example ``plugin.path`` worker configuration property:

::

   plugin.path=/usr/local/share/kafka/plugins

To install a plugin, place the plugin directory or uber JAR (or a symbolic link that resolves to one of these) in a directory already listed in the plugin path. Or, you can update the plugin path by adding the absolute path of the directory containing the plugin. Using the plugin path example above, you would create a ``/usr/local/share/kafka/plugins`` directory **on each machine** running |kconnect| and then place the plugin directories (or uber JARs) there.

When you start your |kconnect| workers, each worker discovers all connector, transform, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the |kconnect| worker loads the classes from the respective plugin first, followed by the |kconnect-long| runtime and Java libraries. |kconnect| explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers.

Earlier versions of |kconnect-long| required a different approach to installing connectors, transforms, and converters. All the scripts for running |kconnect| recognized the ``CLASSPATH`` environment variable. You would export this variable to define the list of paths to the connector JAR files. An example of the older ``CLASSPATH`` export variable mechanism is shown below:

.. codewithvars:: bash

   export CLASSPATH=/path/to/my/connectors/*
   bin/connect-standalone standalone.properties new-custom-connector.properties

.. caution:: Exporting ``CLASSPATH`` is not recommended. Using this mechanism to create a path to plugins can result in library conflicts that can cause |kconnect-long| and connectors to fail. Use the ``plugin.path`` configuration property, which properly isolates each plugin from other plugins and libraries.

.. include:: includes/classpath-v-pluginpath.rst

.. _connect_userguide_standalone_config:

Running Workers
---------------

The following sections provide information about running workers in standalone mode or distributed mode.

Standalone Mode
~~~~~~~~~~~~~~~

Standalone mode is typically used for development and testing, or for lightweight, single-agent environments (for example, sending web server logs to |ak|). The following shows an example command that launches a worker in standalone mode:

.. codewithvars:: bash

   bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

The first parameter (``worker.properties``) is the :ref:`worker configuration properties ` file. Note that ``worker.properties`` is an example file name. You can use any valid file name for your worker configuration file. This file gives you control over settings such as the |ak| cluster to use and serialization format. For an example configuration file that uses `Avro `__ and :ref:`Schema Registry ` in standalone mode, open the file located at ``etc/schema-registry/connect-avro-standalone.properties``. You can copy and modify this file for use as your standalone worker properties file.
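The following is a condensed sketch of the kind of standalone worker configuration this produces, assuming a local |ak| broker and |sr|. The values shown are examples; the bundled file contains similar properties with explanatory comments.

::

   # Sketch of a standalone worker configuration (values are examples)
   bootstrap.servers=localhost:9092
   # Avro converters that integrate with Schema Registry
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   # Local file where standalone connector offsets are stored
   offset.storage.file.filename=/tmp/connect.offsets
   rest.port=8083
   plugin.path=/usr/local/share/kafka/plugins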
The second parameter (``connector1.properties``) is the :ref:`connector configuration properties ` file. All connectors have configuration properties that are loaded with the worker. As shown in the example, you can launch multiple connectors using this command.

If you run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker:

* ``offset.storage.file.filename``: the storage file name for connector offsets. This file is stored on the local filesystem in standalone mode. Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.
* ``rest.port``: the port the REST interface listens on for HTTP requests. This must be unique for each worker running on one host machine.

.. _connect_userguide_distributed_config:

Distributed Mode
~~~~~~~~~~~~~~~~

|kconnect| stores connector and task configurations, offsets, and status in several |ak| topics. These are referred to as |kconnect-long| internal topics. It is important that these internal topics have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions.

|kconnect-long| can automatically create the internal topics when it starts up, using the |kconnect| distributed worker configuration properties to specify the topic names, replication factor, and number of partitions for these topics. |kconnect| verifies that the properties meet the requirements and creates all topics with a compaction cleanup policy.

Allowing |kconnect| to automatically create these internal topics is recommended. However, you may want to manually create the topics. Two examples of when you would manually create these topics are provided below:

* For security purposes, the broker may be configured to not allow clients like |kconnect| to create |ak| topics.
* You may require other advanced topic-specific settings that are not automatically set by |kconnect| or that are different than the auto-created settings.

The following example commands show how to manually create compacted and replicated |ak| topics before starting |kconnect|. Make sure to adhere to the :ref:`distributed worker ` guidelines when entering parameters.

::

   # config.storage.topic=connect-configs
   bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact

::

   # offset.storage.topic=connect-offsets
   bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact

::

   # status.storage.topic=connect-status
   bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact

.. note:: All workers in a |kconnect| cluster use the same internal topics. Workers in a different cluster must use different internal topics. See :ref:`connect_userguide_dist_worker_config` for details.

Distributed mode does not have any additional command-line parameters other than loading the worker configuration file. New workers will either start a new group or join an existing one with a matching ``group.id``. Workers then coordinate with the consumer groups to distribute the work to be done. See :ref:`connect_userguide_dist_worker_config` for details about how new workers get added.
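As a reference point, the following is a minimal sketch of a distributed worker configuration that matches the internal topic names used in the commands above. Hostnames, replication factors, partition counts, and converter choices are example values to adapt for your environment.

::

   # Sketch of a distributed worker configuration (values are examples)
   bootstrap.servers=localhost:9092
   group.id=connect-cluster
   # Internal topics (must match across all workers in the same cluster)
   config.storage.topic=connect-configs
   config.storage.replication.factor=3
   offset.storage.topic=connect-offsets
   offset.storage.replication.factor=3
   offset.storage.partitions=50
   status.storage.topic=connect-status
   status.storage.replication.factor=3
   status.storage.partitions=10
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   plugin.path=/usr/local/share/kafka/plugins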
The following shows an example command that launches a worker in distributed mode:

.. codewithvars:: bash

   bin/connect-distributed worker.properties

For an example distributed mode configuration file that uses `Avro `__ and :ref:`Schema Registry `, open ``etc/schema-registry/connect-avro-distributed.properties``. You can make a copy of this file, modify it, and use it as the new ``worker.properties`` file. Note that ``worker.properties`` is an example file name. You can use any valid file name for your properties file.

In standalone mode, connector configuration property files are added as command-line parameters. However, in distributed mode, connectors are deployed and managed using a REST API request. To create connectors, you start the worker and then make a REST request to create the connector. REST request examples are provided in many :ref:`supported connector ` documents. For instance, see the :ref:`Azure Blob Storage Source connector REST-based example `.

.. note:: If you run multiple distributed workers on one host machine for development and testing, the ``rest.port`` configuration property must be unique for each worker. This is the port the REST interface listens on for HTTP requests.

.. _connect_configuring_workers:

Worker Configuration Properties
-------------------------------

Regardless of the mode used, |kconnect-long| workers are configured by passing a worker configuration properties file as the first parameter. For example:

.. codewithvars:: bash

   bin/connect-distributed worker.properties

Sample worker configuration properties files are included with |cp| to help you get started. The locations of the Avro sample files are listed below:

* ``etc/schema-registry/connect-avro-distributed.properties``
* ``etc/schema-registry/connect-avro-standalone.properties``

Use one of these files as a starting point. These files contain the necessary configuration properties to use the Avro converters that integrate with |sr|. They are configured to work well with |ak| and |sr| services running locally. They do not require running more than a single broker, making it easy for you to test |kconnect-long| locally. The example configuration files can also be modified for production deployments by using the correct hostnames for |ak| and |sr| and acceptable (or default) values for the internal topic replication factor.

.. _common-worker-configs-ug:

Common Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following are several common worker configuration properties you need to get started. Many more configuration options are provided in :ref:`Kafka Connect Worker Configs `.

``bootstrap.servers``
  A list of host/port pairs to use for establishing the initial connection to the |ak| cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping. The list only impacts the initial hosts used to discover the full set of servers. This list should be in the form ``host1:port1,host2:port2,...``. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one though, in case a server is down).

  * Type: list
  * Default: [localhost:9092]
  * Importance: high
``key.converter``
  Converter class for key |kconnect| data. This controls the format of the data that will be written to |ak| for source connectors or read from |ak| for sink connectors. Popular formats include Avro and JSON.

  * Type: class
  * Default:
  * Importance: high

``value.converter``
  Converter class for value |kconnect| data. This controls the format of the data that will be written to |ak| for source connectors or read from |ak| for sink connectors. Popular formats include Avro and JSON.

  * Type: class
  * Default:
  * Importance: high

``rest.host.name``
  Hostname for the REST API. If this is set, it will only bind to this interface.

  * Type: string
  * Importance: low

``rest.port``
  Port for the REST API to listen on.

  * Type: int
  * Default: 8083
  * Importance: low

``plugin.path``
  The comma-separated list of paths to directories that contain :ref:`Kafka Connect plugins `.

  * Type: string
  * Default:
  * Importance: low

.. _connect_userguide_dist_worker_config:

Distributed Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Distributed workers that are configured with matching ``group.id`` values automatically discover each other and form a |kconnect-long| cluster. All workers in the cluster must also have access to and use the same three |ak| topics to share connector configurations, offset data, and status updates. For this reason, all distributed worker configurations *in the same cluster* must have matching ``config.storage.topic``, ``offset.storage.topic``, and ``status.storage.topic`` properties.

.. important:: Changing ``group.id`` alone does not create a new worker group separate from an existing |kconnect| cluster. The new ``group.id`` must also have its own ``config.storage.topic``, ``offset.storage.topic``, and ``status.storage.topic`` configuration properties.

As each distributed worker starts up, it uses these internal topics if they already exist. If not, the worker attempts to create the topics using the worker configuration properties. This allows you to manually create these topics before starting |kconnect-long|, if you require topic-specific settings or when |kconnect-long| does not have the necessary :ref:`privileges to create the topics `. If you do create the topics manually, make sure to follow the guidelines provided in the list of :ref:`configuration properties `.

If you need to create a distributed worker that is independent of an existing |kconnect| cluster, you must create new worker configuration properties. The following configuration properties must be different from the worker configurations used in an existing cluster:

* ``group.id``
* ``config.storage.topic``
* ``offset.storage.topic``
* ``status.storage.topic``

You also must use different connector names than those used in the existing cluster, since a consumer group is created based on the connector name and shared by all of that connector's tasks.

The following lists and defines the distributed worker properties:

``group.id``
  A unique string that identifies the |kconnect| cluster group this worker belongs to.

  * Type: string
  * Default: connect-cluster
  * Importance: high
``config.storage.topic``
  The name of the topic where connector and task configuration data are stored. This *must* be the same for all workers with the same ``group.id``. At startup, |kconnect-long| attempts to automatically create this topic with a single partition and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, **always** create it as a compacted topic with a single partition and a high replication factor (3x or more).

  * Type: string
  * Default: ""
  * Importance: high

``config.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store connector and task configuration data. This should **always** be at least 3 for a production system, but cannot be larger than the number of |ak| brokers in the cluster.

  * Type: short
  * Default: 3
  * Importance: low

``offset.storage.topic``
  The name of the topic where connector and task offsets are stored. This *must* be the same for all workers with the same ``group.id``. At startup, |kconnect-long| attempts to automatically create this topic with multiple partitions and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, **always** create it as a compacted, highly replicated (3x or more) topic with a large number of partitions to support large |kconnect-long| clusters (that is, 25 or 50 partitions like the |ak| built-in ``__consumer_offsets`` topic).

  * Type: string
  * Default: ""
  * Importance: high

``offset.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store connector offsets. This should **always** be at least 3 for a production system, but cannot be larger than the number of |ak| brokers in the cluster.

  * Type: short
  * Default: 3
  * Importance: low

``offset.storage.partitions``
  The number of partitions used when |kconnect| creates the topic used to store connector offsets. A large value is necessary to support large |kconnect-long| clusters (that is, 25 or 50 partitions like the |ak| built-in ``__consumer_offsets`` topic).

  * Type: int
  * Default: 25
  * Importance: low

``status.storage.topic``
  The name of the topic where connector and task status updates are stored. This *must* be the same for all workers with the same ``group.id``. At startup, |kconnect-long| attempts to automatically create this topic with multiple partitions and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, **always** create it as a compacted, highly replicated (3x or more) topic with multiple partitions.

  * Type: string
  * Default: ""
  * Importance: high

``status.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store connector and task status updates. This should **always** be at least 3 for a production system, but cannot be larger than the number of |ak| brokers in the cluster.

  * Type: short
  * Default: 3
  * Importance: low

``status.storage.partitions``
  The number of partitions used when |kconnect| creates the topic used to store connector and task status updates.

  * Type: int
  * Default: 5
  * Importance: low

Standalone Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common worker configuration options, the following property is available in standalone mode.

``offset.storage.file.filename``
  The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.

  * Type: string
  * Default: ""
  * Importance: high

.. _connect_configuring_converters:

Configuring Key and Value Converters
------------------------------------

The ``key.converter`` and ``value.converter`` properties in the :ref:`common worker configurations ` are where you specify a :ref:`converter ` to use. The converters you can specify are listed below:
.. include:: includes/converter-list.rst

Each converter has its own associated configuration requirements. To configure a converter-specific property, you prefix the converter property with the name of the worker property where the converter is specified (for example, ``key.converter.`` or ``value.converter.``).

The ``AvroConverter`` is recommended for |kconnect| data. To use the ``AvroConverter`` with :ref:`Schema Registry `, you specify the ``key.converter`` and ``value.converter`` properties in the worker configuration. You must also add a converter property that provides the |sr| URL. The example below shows the ``AvroConverter`` key and value properties that are added to the configuration:

::

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

The Avro key and value converters can be used independently from each other. For example, you may want to use a ``StringConverter`` for keys and the ``AvroConverter`` or ``JsonConverter`` for values. The example properties for this use case are shown below:

::

   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

If you need to use JSON for |kconnect| data, you can use the ``JsonConverter`` supported with |ak|. Generally, you use the JSON converter without schemas. The example below shows the ``JsonConverter`` key and value properties that are added to the configuration:

::

   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter
   key.converter.schemas.enable=false
   value.converter.schemas.enable=false

.. important:: These converters are used by **all** connectors running on the worker, except for any connectors whose configurations override these configurations.

.. tip:: For a deep dive into converters, see: `Converters and Serialization Explained `__.

.. _connect_override_client_configs:

Overriding Default Producer and Consumer Settings
-------------------------------------------------

Internally, |kconnect-long| uses standard Java producers and consumers to communicate with |ak|. |kconnect| configures default settings for these producer and consumer instances. The default settings include properties that ensure data from sources is delivered to |ak| in order and without any data loss. You may need to override a default setting. The following two examples show when this might be required.

Worker override
~~~~~~~~~~~~~~~

Consider a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery. That is, when there are connectivity issues, minimal data loss may be acceptable for your application in order to avoid data buffering on the client. This keeps log collection as lightweight as possible.

To override :ref:`producer configuration properties ` and :ref:`consumer configuration properties ` for all connectors controlled by the worker, you prefix worker configuration properties with ``producer.`` or ``consumer.`` as shown in the example below:

::

   producer.retries=1
   consumer.max.partition.fetch.bytes=10485760

The example above overrides the default producer ``retries`` property to retry sending messages only one time. The consumer override increases the default amount of data fetched from a partition per request to 10 MB.

These configuration changes are applied to **all connectors** controlled by the worker. Be careful making any changes to these settings when running distributed mode workers.

Per-connector override
~~~~~~~~~~~~~~~~~~~~~~

By default, the producers and consumers used for connectors are created using the same properties that |kconnect| uses for its own internal topics. That means that the same |ak| principal needs to be able to read and write to all the internal topics and all of the topics used by the connectors. You may want the producers and consumers used for connectors to use a different |ak| principal. It is possible for connector configurations to override worker properties used to create producers and consumers. These are prefixed with ``producer.override.`` and ``consumer.override.``. For additional information about per-connector overrides, see :ref:`connect-override-config`.
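For example, the following sketch shows per-connector override properties as they might appear in a connector configuration; in distributed mode, the same keys go in the JSON ``config`` of the REST request that creates the connector. The values are illustrative, and the worker's ``connector.client.config.override.policy`` must permit the overrides.

::

   # Sketch: per-connector client overrides (example values)
   # For a sink connector, override consumer settings:
   consumer.override.auto.offset.reset=latest
   consumer.override.max.poll.records=500

   # For a source connector, override producer settings:
   producer.override.compression.type=lz4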
.. _connect_next-steps:

Next Steps
----------

After getting started with your deployment, you may want to check out the following additional |kconnect-long| documentation:

* :ref:`connect_quickstart`
* :ref:`Upgrade Kafka Connect `
* :ref:`connect_security`
* :ref:`connect_userguide_rest`
* :ref:`schemaregistry_kafka_connect`
* :ref:`Upgrading a Connector Plugin `
* :ref:`connect-override-config`
* :ref:`Adding Connectors or Software (Docker) `

.. tip:: Try out our :devx-examples:`end-to-end demos|README.md` for |kconnect-long| on-premises, |ccloud|, and |co-long|.