How to Use Kafka Connect - Get Started¶
Kafka Connect is a framework for integrating Kafka with popular systems, such as databases and cloud services. This guide helps you get started with deploying Connect and using connectors.
Deployment considerations¶
To get started with Kafka Connect, you must have a set of Kafka brokers. The brokers can run either an earlier version or the latest version. For more details, see Cross-Component Compatibility.
In addition to Kafka brokers, there are a few deployment options to consider as well. Understanding and acting on these deployment options ensures your Kafka Connect deployment will scale and support the long-term needs of your data pipeline.
Confluent Schema Registry¶
Although Schema Registry is an optional service for Kafka Connect, it enables you to easily use Avro, Protobuf, and JSON Schema as common data formats for the Kafka records that connectors read from and write to. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. For more information, see Using Kafka Connect with Schema Registry and Configuring key and value converters.
Standalone and distributed mode¶
Connectors and tasks are logical units of work that run as a process. The process is called a worker in Kafka Connect. There are two modes for running workers:
Standalone mode: Useful for development and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to Kafka).
Distributed mode: Distributed mode is recommended for production environments because of scalability, high availability, and management benefits. It runs Connect workers on multiple machines (nodes), which form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add or remove nodes as your needs evolve.
This mode is also more fault tolerant. For example, if a node unexpectedly leaves the cluster, Kafka Connect distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any loss of data.
Try to identify which mode works best for your environment before getting started. For more information about the different deployment modes for Kafka Connect workers, check out this video.
Operating environment¶
Connect workers operate well in containers and managed environments, such as Kubernetes, Apache Mesos, Docker Swarm, or YARN. The distributed worker stores all state in Kafka, which makes it easier to manage a cluster. By design, Kafka Connect does not handle restarting or scaling workers, so you can continue to use your existing cluster management solution transparently. Note that the standalone worker stores its state on the local file system.
For more information about using Docker, see Install using Docker. For details about deploying and managing Confluent Platform in a Kubernetes environment, see Confluent for Kubernetes.
Kafka Connect workers are JVM processes that can run on shared machines with sufficient resources. Hardware requirements for Connect workers are similar to those of standard Java producers and consumers. Resource requirements mainly depend on the types of connectors operated by the workers. More memory is required for environments where large messages are sent and where a large number of messages are buffered before being written in aggregate form to an external system. Using compression continuously requires a more powerful CPU.
If you have multiple workers running concurrently on a single machine, ensure you know the resource limits (CPU and memory). Start with the default heap size setting and monitor internal metrics and the system. Verify the CPU, memory, and network (10 GbE or greater) are sufficient for the load.
Install a Connect plugin¶
Kafka Connect is designed to be extensible so developers can create custom connectors, transforms, and converters, and users can install and run them. This section will help you with installing Connect plugins.
Define a plugin¶
A Kafka Connect plugin is a set of JAR files containing the implementation of one or more connectors, transforms, or converters. Connect isolates plugins from one another so that the libraries in one plugin are not affected by the libraries in any other plugin. This is very important when mixing and matching connectors from multiple providers.
Caution
It’s common to have many plugins installed in a Connect deployment. Ensure you have only one version of each plugin installed.
A Kafka Connect plugin can be any one of the following:
- A directory on the file system that contains all required JAR files and third-party dependencies for the plugin. This is most common and is preferred.
- A single uber JAR containing all the class files for a plugin and its third-party dependencies.
A Kafka Connect plugin should never contain any libraries provided by the Kafka Connect runtime. Kafka Connect finds the plugins using a plugin path defined as a comma-separated list of directory paths in the plugin.path worker configuration property. The following shows an example plugin.path worker configuration property:
plugin.path=/usr/local/share/kafka/plugins
Install a plugin¶
To install a plugin, you must place the plugin directory or uber JAR (or a symbolic link that resolves to one of these) in a directory already listed in the plugin path. Alternatively, you can update the plugin path by adding the absolute path of the directory containing the plugin. Using the previous plugin path example, you would create a /usr/local/share/kafka/plugins directory on each machine running Connect and then place the plugin directories (or uber JARs) there.
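The following is a minimal sketch of those steps, not a definitive procedure. The plugin name my-connector, the commented-out source path, and the PLUGIN_PATH and WORKER_PROPS variables are hypothetical; a temporary directory stands in for /usr/local/share/kafka/plugins by default so that the commands run without root access.

```shell
# Sketch only: the plugin name and paths below are hypothetical.
# Set PLUGIN_PATH=/usr/local/share/kafka/plugins to match the earlier example.
PLUGIN_PATH="${PLUGIN_PATH:-$(mktemp -d)/plugins}"
WORKER_PROPS="${WORKER_PROPS:-$(mktemp)}"

# One subdirectory per plugin, holding its JARs and third-party dependencies.
mkdir -p "$PLUGIN_PATH/my-connector"
# cp /path/to/my-connector/*.jar "$PLUGIN_PATH/my-connector/"

# plugin.path lists the parent directory that contains the plugin directories.
printf 'plugin.path=%s\n' "$PLUGIN_PATH" >> "$WORKER_PROPS"

# Show the plugin directories the worker would scan at startup.
ls "$PLUGIN_PATH"
```

Repeat the same layout on every machine that runs a Connect worker, and restart the workers so they rescan the plugin path.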
When you start your Connect workers, each worker discovers all connector, transform, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the Connect worker loads the classes from the respective plugin first, followed by the Kafka Connect runtime and Java libraries. Connect explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers.
To find the components that fit your needs, check out the Confluent Hub page, which has an ecosystem of connectors, transforms, and converters. To install a connector using the Confluent CLI, see the confluent connect plugin install command. For a full list of supported connectors, see Supported Connectors.
Earlier versions of Kafka Connect required a different approach to installing connectors, transforms, and converters. All the scripts for running Connect recognized the CLASSPATH environment variable. You would export this variable to define the list of paths to the connector JAR files. The following example shows the older mechanism of exporting CLASSPATH:
export CLASSPATH=/path/to/my/connectors/*
bin/connect-standalone standalone.properties new-custom-connector.properties
Confluent does not recommend exporting the CLASSPATH environment variable, because using this method to create a path to plugins can result in library conflicts that can cause Kafka Connect and connectors to fail. Use the plugin.path configuration property, which properly isolates each plugin from other plugins and libraries.
Note
As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the package javax). An example error message showing this issue is provided below. If you see an error that resembles the example, you must export CLASSPATH=<path-to-jar-files> when starting the connector, in addition to adding the plugin path.
Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]
Tip
By default, connectors inherit the partitioner used for the Kafka topic. You can create a custom partitioner for a connector, which you must place in the connector’s /lib folder.
You can also put partitioners in a common location of your choice. If you choose this option, you must add a symlink to the location from each connector’s /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.
Configure and run workers¶
The following sections provide information about running workers in standalone mode and distributed mode. For a list of worker configuration properties, see Kafka Connect Worker Configuration Properties.
Standalone mode¶
Standalone mode is typically used for development and testing, or for lightweight, single-agent environments (for example, sending web server logs to Kafka). The following example shows a command that launches a worker in standalone mode:
bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
The first parameter (worker.properties) is the worker configuration properties file. Note that worker.properties is an example file name. You can use any valid file name for your worker configuration file. This file gives you control over settings such as the Kafka cluster to use and the serialization format. For an example configuration file that uses Avro and Schema Registry in standalone mode, open the file located at etc/schema-registry/connect-avro-standalone.properties. You can copy and modify this file for use as your standalone worker properties file.
The second parameter (connector1.properties) is the connector configuration properties file. All connectors have configuration properties that are loaded with the worker. As shown in the example, you can launch multiple connectors using this command.
If you run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker:
- offset.storage.file.filename: The storage file name for connector offsets. This file is stored on the local filesystem in standalone mode. Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.
- listeners: A list of URIs the REST API will listen on, in the format protocol://host:port,protocol2://host2:port. The protocol is either HTTP or HTTPS. You can specify the hostname as 0.0.0.0 to bind to all interfaces, or leave the hostname empty to bind to the default interface.
Note
Update the etc/schema-registry/connect-avro-standalone.properties file if you need to apply a change to Connect when starting Confluent Platform services using the Confluent CLI.
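As an illustration of two standalone workers sharing one host, the sketch below writes two worker files that differ only in the properties that must be unique. The file names, ports, offset file locations, and the CONF_DIR variable are assumptions chosen for the example, and the converter choice is arbitrary.

```shell
# Sketch only: file names, ports, and offset file paths are hypothetical.
CONF_DIR="${CONF_DIR:-$(mktemp -d)}"

cat > "$CONF_DIR/worker-a.properties" <<'EOF'
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Must be unique per standalone worker on this host
offset.storage.file.filename=/tmp/connect-a.offsets
listeners=http://localhost:8083
EOF

cat > "$CONF_DIR/worker-b.properties" <<'EOF'
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Must be unique per standalone worker on this host
offset.storage.file.filename=/tmp/connect-b.offsets
listeners=http://localhost:8084
EOF

# Print the two properties that must differ between the workers.
grep -E '^(offset\.storage\.file\.filename|listeners)=' "$CONF_DIR"/worker-*.properties
```

Each file would then be passed as the first parameter of its own bin/connect-standalone command, followed by that worker's connector properties files.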
Distributed mode¶
Distributed mode does not have any additional command-line parameters other than loading the worker configuration file. New workers will either start a new group or join an existing one with a matching group.id. Workers then coordinate with the consumer groups to distribute the work to be done.
The following shows an example command that launches a worker in distributed mode:
bin/connect-distributed worker.properties
For an example distributed mode configuration file that uses Avro and Schema Registry, open etc/schema-registry/connect-avro-distributed.properties. You can make a copy of this file, modify it, and use it as the new worker.properties file. Note that worker.properties is an example file name. You can use any valid file name for your properties file.
In standalone mode, connector configuration property files are added as command-line parameters. However, in distributed mode, connectors are deployed and managed using REST API requests. To create connectors, you start the worker and then make a REST request to create the connector. REST request examples are provided in many supported connector documents. For instance, see the Azure Blob Storage Source connector REST-based example.
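A REST request to create a connector might look like the following sketch. It assumes a distributed worker whose REST interface listens on http://localhost:8083 and that the FileStream source connector is installed on the worker; the connector name, file path, topic name, and the CONNECT_URL and PAYLOAD variables are hypothetical. The request is only sent when a worker answers at that address.

```shell
# Sketch only: connector name, file, topic, and URL are assumptions.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
PAYLOAD="${PAYLOAD:-$(mktemp)}"

# The request body carries the connector name and its configuration.
cat > "$PAYLOAD" <<'EOF'
{
  "name": "local-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/test.txt",
    "topic": "connect-test"
  }
}
EOF

# POST the configuration to the worker if one is reachable.
if curl -s -f --max-time 5 -o /dev/null "$CONNECT_URL/connectors"; then
  curl -s --max-time 5 -X POST -H "Content-Type: application/json" \
    --data @"$PAYLOAD" "$CONNECT_URL/connectors"
else
  echo "No Connect worker reachable at $CONNECT_URL; payload left in $PAYLOAD"
fi
```

Because connector configurations are stored in the Connect cluster, the request can be sent to any worker in the cluster.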
Note that if you run many distributed workers on one host machine for development and testing, the listeners configuration property must be unique for each worker. This property determines the port the REST interface listens on for HTTP requests.
Note
Update the etc/schema-registry/connect-avro-distributed.properties file if you need to apply a change to Connect when starting Confluent Platform services using the Confluent CLI.
Connect internal topics¶
Connect stores connector and task configurations, offsets, and status in several Kafka topics. These are referred to as Kafka Connect internal topics. It is important that these internal topics have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions.
Kafka Connect can create the internal topics when it starts up, using the Connect worker configuration properties to specify the topic names, replication factor, and number of partitions for these topics. Connect verifies that the properties meet the requirements and creates all topics with a compaction cleanup policy.
Distributed workers that are configured with matching group.id values discover each other and form a Kafka Connect cluster. All workers in the cluster use the same three internal topics to share connector configurations, offset data, and status updates. For this reason, all distributed worker configurations in the same Connect cluster must have matching config.storage.topic, offset.storage.topic, and status.storage.topic properties.
In addition to the three required internal topic names, the distributed worker configurations should have identical values for the following properties. This ensures that any worker in the cluster will create missing internal topics with the desired property values. Note that these configuration properties have practical default values.
config.storage.replication.factor
offset.storage.replication.factor
offset.storage.partitions
status.storage.replication.factor
status.storage.partitions
Note
Starting with Confluent Platform version 6.0, Kafka Connect can create internal topics using the Kafka broker’s default.replication.factor and num.partitions values. For more information, see Use Kafka broker default topic settings.
As each distributed worker starts up, it uses the internal Kafka topics if they already exist. If not, the worker tries to create the topics using the worker configuration properties. This gives you the option of manually creating the topics before starting Kafka Connect, if you require topic-specific settings or when Kafka Connect does not have the necessary privileges to create the topics. If you do create the topics manually, follow the guidelines provided in the list of configuration properties.
If you want to create a distributed worker that is independent of an existing Connect cluster, you must create new worker configuration properties. The following configuration properties must be different from the worker configurations used in an existing cluster:
group.id
config.storage.topic
offset.storage.topic
status.storage.topic
It is important to note that Connect clusters cannot share group IDs or internal topics. Simply changing a group.id will not create a new worker separate from an existing Connect cluster. The new group.id must also have unique internal topics associated with it. This requires setting unique config.storage.topic, offset.storage.topic, and status.storage.topic configuration properties for the new group.id.
You must also use connector names that differ from those used in the existing Connect cluster, because the consumer group for a sink connector is created based on the connector name. All tasks of a connector share the same consumer group.
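To make the separation concrete, the sketch below writes the four properties for two independent Connect clusters and checks that none of them collide. The cluster and topic names and the CONF_DIR variable are hypothetical, and the files are fragments: a real worker configuration also needs settings such as bootstrap.servers and the converters.

```shell
# Sketch only: group IDs and topic names are hypothetical.
CONF_DIR="${CONF_DIR:-$(mktemp -d)}"

cat > "$CONF_DIR/cluster-a.properties" <<'EOF'
group.id=connect-cluster-a
config.storage.topic=connect-configs-a
offset.storage.topic=connect-offsets-a
status.storage.topic=connect-status-a
EOF

cat > "$CONF_DIR/cluster-b.properties" <<'EOF'
group.id=connect-cluster-b
config.storage.topic=connect-configs-b
offset.storage.topic=connect-offsets-b
status.storage.topic=connect-status-b
EOF

# Compare the four properties that must differ between independent clusters.
CONFLICTS=0
for key in group.id config.storage.topic offset.storage.topic status.storage.topic; do
  a=$(grep "^$key=" "$CONF_DIR/cluster-a.properties" | cut -d= -f2)
  b=$(grep "^$key=" "$CONF_DIR/cluster-b.properties" | cut -d= -f2)
  if [ "$a" = "$b" ]; then
    echo "conflict: $key=$a is used by both clusters"
    CONFLICTS=$((CONFLICTS + 1))
  fi
done
echo "conflicting properties: $CONFLICTS"
```

A check like this can be run against real worker files before starting a second cluster, since a shared topic or group ID would merge the workers into the existing cluster.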
Use Kafka broker default topic settings¶
The Connect worker can create internal topics using Kafka broker defaults for the replication factor and number of partitions. To use the Kafka broker defaults for the replication factor and number of partitions, use -1 in the worker configuration properties for the internal topics. The following is an example snippet:
# Use the broker default properties
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.storage.partitions=-1
status.storage.partitions=-1
The config.storage internal topic must always have exactly one partition. This is why there is no option to use the Kafka broker default for the number of config.storage partitions.
Override topic-specific properties¶
You can override topic properties that are valid for the version of the Kafka broker where the internal topics will be created. The following example shows how you should enter these properties in the worker configuration:
config.storage.<topic-specific-property>
offset.storage.<topic-specific-property>
status.storage.<topic-specific-property>
The following example uses the Kafka broker default for the replication factor and overrides the default minimum in-sync replicas (min.insync.replicas) property (1 is the Kafka broker default):
# Use the broker default properties
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
# Override the broker default properties
config.storage.min.insync.replicas=3
offset.storage.min.insync.replicas=3
status.storage.min.insync.replicas=3
If the topic property is not valid for the Kafka broker version, the Connect worker will fail upon startup.
Manually create internal topics¶
Allowing Kafka Connect to create these internal topics is recommended. However, you may want to manually create the topics. The following examples illustrate when you would manually create these topics:
- For security purposes, the broker may be configured to not allow clients like Connect to create Kafka topics.
- You may require other advanced topic-specific settings that are not set by Connect or that are different than the auto-created settings.
The following example commands show how to manually create compacted and replicated Kafka topics before starting Connect. Ensure you adhere to the distributed worker guidelines when entering parameters.
# config.storage.topic=connect-configs
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
All workers in a Connect cluster use the same internal topics. Workers in a different cluster must use different internal topics. For more details, see Distributed Worker Configuration.
Worker configuration properties file¶
Regardless of the mode used, Kafka Connect workers are configured by passing a worker configuration properties file as the first parameter. For example:
bin/connect-distributed worker.properties
Sample worker configuration properties files are included with Confluent Platform to help you get started. The following list shows the location for Avro sample files:
etc/schema-registry/connect-avro-distributed.properties
etc/schema-registry/connect-avro-standalone.properties
Use one of these files as a starting point. These files contain the necessary configuration properties to use the Avro converters that integrate with Schema Registry. They are configured to work well with Kafka and Schema Registry services running locally. They do not require running more than a single broker, making it easy for you to test Kafka Connect locally.
The example configuration files can also be modified for production deployments by using the correct hostnames for Kafka and Schema Registry and acceptable (or default) values for the internal topic replication factor.
For a list of worker configuration properties, see Kafka Connect Worker Configuration Properties.
Configuring key and value converters¶
The following list shows the converters packaged with the Confluent Platform:
- AvroConverter (io.confluent.connect.avro.AvroConverter): use with Schema Registry
- ProtobufConverter (io.confluent.connect.protobuf.ProtobufConverter): use with Schema Registry
- JsonSchemaConverter (io.confluent.connect.json.JsonSchemaConverter): use with Schema Registry
- JsonConverter (org.apache.kafka.connect.json.JsonConverter), without Schema Registry: use with structured data
- StringConverter (org.apache.kafka.connect.storage.StringConverter): simple string format
- ByteArrayConverter (org.apache.kafka.connect.converters.ByteArrayConverter): provides a “pass-through” option that does no conversion
The Kafka Connect 101 course explains converters in detail.
The key.converter and value.converter properties are where you specify the type of converter to use.
Default converters for all connectors are specified in the worker configuration. However, any connector can override the default converters by completely defining a key, value, and header converter. Confluent recommends you define the default key, value, and header converters that most connectors can use in the worker, and then define them in a connector’s configuration if that connector requires different converters. The default header.converter defined in the worker serializes header values as strings using the StringConverter and deserializes header values to the most appropriate numeric, boolean, array, or map representation. Schemas are not serialized but are inferred upon deserialization when possible.
It is important to note that converter configuration properties in the worker configuration are used by all connectors running on the worker, unless a converter is added to a connector configuration.
If a converter is added to a connector configuration, all converter properties in the worker configuration prefixed with the converter type added (key.converter.* and/or value.converter.*) are not used. Be careful when adding converters to a connector configuration. For example, if the following value converter properties are present in the worker configuration:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
and you add the following properties to your connector configuration:
{
"value.converter": "AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<username>:<password>"
}
An error will occur when the connector is started because the required Schema Registry URL property (value.converter.schema.registry.url=http://localhost:8081) is not provided to the converter.
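One way to avoid this error is to restate every value.converter.* property the converter needs in the connector configuration itself. The sketch below writes such a configuration to a file; the CONNECTOR_CONFIG variable is hypothetical, the placeholders are left unfilled, and the Schema Registry URL is the local address used in the worker example.

```shell
# Sketch only: the output file is hypothetical and the credentials are placeholders.
CONNECTOR_CONFIG="${CONNECTOR_CONFIG:-$(mktemp)}"

# Once value.converter is set on the connector, worker-level value.converter.*
# properties are no longer applied, so the Schema Registry URL is repeated here.
cat > "$CONNECTOR_CONFIG" <<'EOF'
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "<username>:<password>"
}
EOF

# Confirm the URL property is part of the connector-level converter settings.
grep '"value.converter.schema.registry.url"' "$CONNECTOR_CONFIG"
```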
The following sections provide converter descriptions and examples. For details about how these converters work in Schema Registry, see Using Kafka Connect with Schema Registry.
Avro¶
To use the AvroConverter with Schema Registry, you specify the key.converter and value.converter properties in the worker configuration. You must also add a converter property that provides the Schema Registry URL. The following example shows the AvroConverter key and value properties that are added to the configuration:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
The Avro key and value converters can be used independently from each other. For example, you may want to use a StringConverter for keys and the AvroConverter or JsonConverter for values. Independent key and value properties are shown in the following example:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
JSON Schema and Protobuf¶
Both JSON Schema and Protobuf converters are implemented in the same way as the Avro converter. The following two configuration examples use the ProtobufConverter or JsonSchemaConverter for the value converter and the StringConverter for the key:
# Protobuf value converter with a String key converter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
# JSON Schema value converter with a String key converter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
Both Avro and JSON Schema express their schemas as JSON and are lenient if unrecognized properties are encountered. This allows the converter to use custom JSON properties to capture any Kafka Connect schema objects with no equivalent in Avro or JSON Schema.
However, Protobuf has its own Interface Definition Language (IDL) which differs from JSON and does not allow for custom ad-hoc properties. For this reason, the conversion from the Kafka Connect schema to Protobuf may cause data loss or inconsistencies if there is no direct equivalent in Protobuf.
For example, the Kafka Connect schema supports int8, int16, and int32 data types. Protobuf supports int32 and int64. When Connect data is converted to Protobuf, int8 and int16 fields are mapped to int32 or int64 with no indication that the source was int8 or int16.
With JSON Schema, only number and integer type fields are supported. However, the JSON Schema converter (JsonSchemaConverter) will store data with no JSON Schema equivalent in a property named connect.type. This property is ignored by the JSON Schema parser, so fields can be restored to the proper type by downstream components.
For full encoding details, see JSON encoding for Avro and JSON encoding for Protobuf.
Additionally, JSON Schema supports three means of combining schemas: allOf, anyOf, and oneOf. However, the JSON Schema converter only supports oneOf, treating it similarly to how the Avro converter handles unions and how the Protobuf converter handles oneOf.
If you’re configuring Avro, Protobuf, or JSON Schema converters in an environment configured for Role-Based Access Control (RBAC), see key and value converters with RBAC. For details about how converters work with Schema Registry, see Using Kafka Connect with Schema Registry.
The following converters are not used with Schema Registry.
JSON (without Schema Registry)¶
If you need to use JSON without Schema Registry for Connect data, you can use the JsonConverter included with Kafka. The following example shows the JsonConverter key and value properties that are added to the configuration:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to true (the JsonConverter default), the key or value is not treated as plain JSON, but rather as a composite JSON object containing both an internal schema and the data. When these are enabled for a source connector, both the schema and data are in the composite JSON object. When these are enabled for a sink connector, the schema and data are extracted from the composite JSON object. Note that this implementation never uses Schema Registry.
When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to false, as in the preceding example, only the data is passed along, without the schema. This reduces the payload overhead for applications that do not need a schema.
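To show the difference in payload, the sketch below writes the same record in both forms. The record itself (a struct named user with id and name fields) and the OUT_DIR variable are invented for illustration; the schema and payload envelope keys are the ones JsonConverter uses when schemas are enabled.

```shell
# Sketch only: the "user" record and the output directory are hypothetical.
OUT_DIR="${OUT_DIR:-$(mktemp -d)}"

# schemas.enable=true: the schema and the data travel together in one object.
cat > "$OUT_DIR/with-schema.json" <<'EOF'
{
  "schema": {
    "type": "struct",
    "name": "user",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "alice" }
}
EOF

# schemas.enable=false: only the data is written.
cat > "$OUT_DIR/without-schema.json" <<'EOF'
{ "id": 1, "name": "alice" }
EOF

# Compare the sizes to see the per-message overhead of the embedded schema.
wc -c "$OUT_DIR/with-schema.json" "$OUT_DIR/without-schema.json"
```

A sink connector that needs a schema (for example, to create table columns) can only recover it from the first form, which is why the setting has to match how the topic data was produced.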
String format and raw bytes¶
org.apache.kafka.connect.storage.StringConverter is used to convert the internal Connect format to simple string format. When converting Connect data to bytes, the schema is ignored and data is converted to a simple string. When converting from bytes to Connect data format, the converter returns an optional string schema and a string (or null).
org.apache.kafka.connect.converters.ByteArrayConverter does not convert data. Bytes are passed through the connector directly with no conversion.
For a deep dive into converters, see: Converters and Serialization Explained.
Connect producers and consumers¶
Internally, Kafka Connect uses standard Java producers and consumers to communicate with Kafka. Connect configures default settings for these producer and consumer instances. These settings include properties that ensure data is delivered to Kafka in order and without any data loss.
Default Connect producer properties¶
By default, Connect configures the Kafka producers for source connectors with the following properties:
- Points the producer’s bootstrap servers to the same Kafka cluster used by the Connect cluster.
- Configures key and value serializers that work with the connector’s key and value converters.
- Generates a producer client.id based on the connector and task, using the pattern connector-producer-<connectorName>-<taskId>.
- Sets acks=all to ensure each message produced is properly written to all in-sync replicas (ISRs).
- For retryable exceptions, Connect configures the producer with the following properties to reduce the potential for data duplication during infinite retries:
request.timeout.ms=<max>
max.block.ms=<max>
max.in.flight.requests.per.connection=1
delivery.timeout.ms=<max>
You can override the defaults by using the producer.* properties in the worker configuration, or by using the producer.override.* properties in connector configurations, but changing these default properties may compromise the delivery guarantees of Connect.
Producer and consumer overrides¶
You may need to override default settings, other than those described in the previous section. The following two examples show when this might be required.
Worker override example
Consider a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery. That is, when there are connectivity issues, minimal data loss may be acceptable for your application in order to avoid data buffering on the client. This keeps log collection as lightweight as possible.
To override producer configuration properties and consumer configuration properties for all connectors controlled by the worker, you prefix worker configuration properties with producer. or consumer. as shown in the following example:
producer.retries=1
consumer.max.partition.fetch.bytes=10485760
The previous example overrides the default producer retries property to retry sending messages only one time. The consumer override increases the default amount of data fetched from a partition per request to 10 MB. These configuration changes are applied to all connectors controlled by the worker. Be careful making any changes to these settings when running distributed mode workers.
Per-connector override example
By default, the producers and consumers used for connectors are created using the same properties that Connect uses for its own internal topics. This means that the same Kafka principal must be able to read and write to all the internal topics and all of the topics used by the connectors.
You may want the producers and consumers used for connectors to use a different Kafka principal. Connector configurations can override the worker properties used to create producers and consumers. These overrides use the producer.override. and consumer.override. prefixes. For more information about per-connector overrides, see Override the Worker Configuration.
For detailed information about producers and consumers, see Kafka Producer and Kafka Consumer. For a list of configuration properties, see producer configuration properties and consumer configuration properties.
Configuring auto topic creation for source connectors¶
Beginning with Confluent Platform version 6.0, Kafka Connect can create topics for source connectors if the topics do not exist on the Apache Kafka® broker. To use auto topic creation for source connectors, you must set the Connect worker property topic.creation.enable to true for all workers in the Connect cluster. In addition, you must create supporting properties in each source connector configuration.
Important
- This feature does not affect sink connectors or their configuration. Any topic creation properties you add to sink connectors will be ignored and will produce a warning in the log.
- If you do not want the source connector to create topics that are missing, you must disable the feature by setting topic.creation.enable=false in the Connect worker.
Worker property¶
The following worker property enables or disables auto topic creation for source connectors.
topic.creation.enable
Defaults to true. This feature is enabled only for source connector configurations that have the supporting Source connector properties.
- Type: boolean
- Default: true
- Importance: low
Source connector properties¶
Several source connector properties are associated with the worker property topic.creation.enable. These properties set the default replication factor, number of partitions, and other topic-specific settings to be used by Kafka Connect to create a topic if it does not exist. None of the properties have default values.
The following source connector configuration properties are required:
topic.creation.$alias.replication.factor
topic.creation.$alias.partitions
The auto topic creation feature is enabled for the source connector only when the feature is enabled in the worker configuration and when the source connector configuration specifies the required replication factor and number of partitions for one group. You can use the default values specified in the Kafka broker by setting topic.creation.$alias.replication.factor or topic.creation.$alias.partitions to -1.
You can define more connector properties using configuration property groups. Configuration property groups are added using the property topic.creation.groups. The hierarchy of groups is built on top of a single foundational group called the default configuration property group. The default group always exists and does not need to be listed in the topic.creation.groups property in the connector configuration. Including default in topic.creation.groups results in a warning.
The following source connector configuration properties are used in association with the topic.creation.enable=true worker property. For example properties, see Configuration examples.
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.
- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the topic-level configurations described in Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.
- Type: property values
- Default: Kafka broker value
Configuration examples¶
The following example configuration snippets show how the source connector configuration properties are entered when topic.creation.enable is enabled in the Connect worker configuration.
Example 1
All new topics created by Connect have a replication factor of 3 and 5 partitions. default is the only group, so the topic.creation.groups property is not used.
...omitted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
...omitted
Example 2
New topics created by Connect have a replication factor of 3 and 5 partitions. The exceptions are topics that match the inclusion list of the inorder group, which have one partition.
...omitted
topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1
...omitted
Example 3
New topics created by Connect have a replication factor of 3 and 5 partitions. The key_value_topic and another.compacted.topic topics, and topics that begin with the prefix configurations, are compacted and have a replication factor of 5 and one partition.
...omitted
topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.compacted.include=key_value_topic, another.compacted.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact
...omitted
Example 4
New topics created by Connect have a replication factor of 3 and 5 partitions. Topics that begin with the prefix configurations are compacted. Topics that match the inclusion list of highly_parallel and don’t match its exclusion list have a replication factor of 1 and 100 partitions.
...omitted
topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=100
topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact
...omitted
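To make the include and exclude rules concrete, the following Python sketch picks the group that applies to a topic name, using the group definitions from Example 4. It is a simplified model under stated assumptions, not Connect's code: select_group and its helpers are hypothetical names, groups are assumed to be evaluated in the order listed in topic.creation.groups with the first match winning, and Python's re module stands in for Java regex (the two agree for simple patterns like these).

```python
import re


def parse_list(value):
    """Split a comma-separated property value and trim whitespace."""
    return [item.strip() for item in value.split(",") if item.strip()]


def matches_any(patterns, topic):
    # Full-string match, so "hpc.*" matches "hpc.jobs" but not "my.hpc.jobs".
    return any(re.fullmatch(p, topic) for p in patterns)


def select_group(props, topic):
    """Return the alias of the first listed group that applies to topic."""
    for alias in parse_list(props.get("topic.creation.groups", "")):
        include = parse_list(props.get(f"topic.creation.{alias}.include", ""))
        exclude = parse_list(props.get(f"topic.creation.{alias}.exclude", ""))
        # Exclusion rules take priority over inclusion rules.
        if matches_any(include, topic) and not matches_any(exclude, topic):
            return alias
    # The default group always exists and matches every topic.
    return "default"


example_4 = {
    "topic.creation.groups": "compacted, highly_parallel",
    "topic.creation.highly_parallel.include": "hpc.*,parallel.*",
    "topic.creation.highly_parallel.exclude": ".*internal, .*metadata, .*config.*",
    "topic.creation.compacted.include": "configurations.*",
}

for name in ["configurations.app", "hpc.jobs", "hpc.internal", "orders"]:
    print(name, "->", select_group(example_4, name))
```

With these inputs, configurations.app resolves to compacted, hpc.jobs to highly_parallel, and both hpc.internal (excluded by .*internal) and orders (no match) fall back to default.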
Security configuration examples¶
When security is configured, the Connect worker should have a principal configured to DESCRIBE and CREATE topics, which is inherited by all connectors in the worker. If different security settings are required from what the Connect worker configuration provides, you can add producer.override properties to the source connector configuration to provide the security credentials, as shown in the following examples:
...omitted
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
...omitted
...omitted
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="bob" \
password="bob-secret";
...omitted
An error is logged and the task fails if topic.creation.enable=true is configured, and neither the worker properties nor the connector overrides allow for creation of new topics.
Connect reporter¶
The Kafka Connect Reporter submits the result of a sink operation to a reporter topic. After successfully sinking a record or following an error condition, the Connect Reporter is called to submit the result report. The report is constructed to include details about how the original record was handled along with additional information about the sink event. These records are written to configurable success and error topics for further consumption. The following is an example of the basic Connect Reporter configuration properties added to a sink connector configuration:
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1
To completely disable Connect Reporter, see Disabling Connect reporter.
If you have a secure environment, you use configuration blocks for both an Admin Client and Producer. A Producer is constructed to send records to the reporter topic. The Admin Client creates the topic. Credentials need to be added in a secure environment. Example Admin and Producer properties are shown below:
reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connector.
Reporter and Kerberos security¶
The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example shows the Prometheus Metrics Sink Connector for Confluent Platform, but can be modified for any applicable sink connector.
{
"name" : "prometheus-connector",
"config" : {
"topics":"prediction-metrics",
"connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.truststore.password":"xxxx",
"confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.keystore.password":"xxxx",
"confluent.topic.ssl.key.password":"xxxx",
"confluent.topic.security.protocol":"SASL_SSL",
"confluent.topic.replication.factor": "3",
"confluent.topic.sasl.kerberos.service.name":"kafka",
"confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"prometheus.scrape.url": "http://localhost:8889/metrics",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"behavior.on.error": "LOG",
"reporter.result.topic.replication.factor": "3",
"reporter.error.topic.replication.factor": "3",
"reporter.bootstrap.servers":"localhost:9092",
"reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.truststore.password":"xxxx",
"reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.keystore.password":"xxxx",
"reporter.producer.ssl.key.password":"xxxx",
"reporter.producer.security.protocol":"SASL_SSL",
"reporter.producer.sasl.kerberos.service.name":"kafka",
"reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.truststore.password":"xxxx",
"reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.keystore.password":"xxxx",
"reporter.admin.ssl.key.password":"xxxx",
"reporter.admin.security.protocol":"SASL_SSL",
"reporter.admin.sasl.kerberos.service.name":"kafka",
"reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"confluent.license":"eyJ0eXAiOiJK ...omitted"
}
}
Disabling Connect reporter¶
To disable Connect Reporter, set the reporter.error.topic.name and reporter.result.topic.name configuration properties to empty strings.
Metrics reporter metadata¶
With the release of Confluent Platform 6.0, a configuration prefix is available for Kafka Connect that allows connectors to pass metrics context metadata to embedded metrics client libraries. The Connect worker passes the context configuration to all instantiated connectors. The following provides information on these worker configuration properties:
metrics.context.<key>=<value>
The configured key/value pairs are passed by the connectors in the Connect worker to the configured Confluent Metrics Reporter using the MetricsContext interface.
For example, configuring a Connect worker with metrics.context.foo.bar=baz adds the field foo.bar mapped to the value baz in the MetricsContext metadata passed to the metrics reporter.
Connect workers can pass the following fields to MetricsContext:
- connect.kafka.cluster.id to indicate the backing Apache Kafka® cluster.
- connect.group.id to indicate the group ID used for Connect worker coordination. connect.group.id is only valid for distributed mode.
For more information, see Connect internal topics.
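The prefix handling can be pictured with a small Python sketch. It only models the mapping described above and does not use any Connect API: metrics_context_fields is a hypothetical helper, and the cluster and group IDs are made-up example values.

```python
PREFIX = "metrics.context."


def metrics_context_fields(worker_props, cluster_id, group_id=None):
    """Build the key/value metadata a worker would hand to MetricsContext."""
    # Strip the prefix from every user-supplied metrics.context.* property.
    fields = {k[len(PREFIX):]: v for k, v in worker_props.items() if k.startswith(PREFIX)}
    fields["connect.kafka.cluster.id"] = cluster_id
    # connect.group.id only applies to distributed mode.
    if group_id is not None:
        fields["connect.group.id"] = group_id
    return fields


fields = metrics_context_fields(
    {"metrics.context.foo.bar": "baz", "bootstrap.servers": "localhost:9092"},
    cluster_id="example-cluster-id",  # hypothetical value
    group_id="connect-cluster",       # hypothetical value
)
print(fields)
```

Properties without the metrics.context. prefix, such as bootstrap.servers here, are not part of the resulting metadata.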
ConfigProvider interface¶
The ConfigProvider class interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.
Note
Connector configurations are persisted and shared over the Connect REST API with the variables. Only when the connector starts does it transiently resolve and replace variables in-memory. Secrets are never persisted in connector configs, logs, or in REST API requests and responses.
The Connect worker relies upon the named ConfigProviders defined in the worker configuration to resolve the variables. Each variable specifies the name of the ConfigProvider that should be used, and the information the ConfigProvider uses to resolve the variable into a replacement string.
All ConfigProvider implementations are discovered using the standard Java ServiceLoader mechanism. To create a custom implementation of ConfigProvider, implement the ConfigProvider interface. Package the implementation class(es) and a file named META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider containing the fully qualified name of the ConfigProvider implementation class into a JAR file. Note that the JAR file can use third-party libraries other than those provided by the Connect framework, but they must be installed with the JAR file as described below.
To install the custom implementation, add a new subdirectory containing the JAR files to the directory that is in Connect’s plugin.path and (re)start the Connect workers.
To implement a custom ConfigProvider, add a new subdirectory containing the JAR files under the default CLASSPATH location (for example, /usr/local/share/java) and restart the Connect workers. Note that the JAR files should not be placed under a subdirectory in Connect’s plugin.path. The plugin.path locations are for connectors and their specific dependent JARs and not for config.providers.
When the Connect worker starts, it instantiates all ConfigProvider implementations specified in the worker configuration. All properties prefixed with config.providers.[provider].param.* are passed to the configure() method of the ConfigProvider. When the Connect worker shuts down, it calls the close() method of the ConfigProvider.
Important
Any worker in a Connect cluster must be able to resolve every variable in the worker configuration, and must be able to resolve all variables used in every connector configuration. This requires the following:
- All of the workers in a Connect cluster must have the same set of named config providers.
- Each provider on every worker must have access to any resources required to resolve variables used in the worker config or in the connector configs.
The following configuration properties are added to the distributed worker configuration to make this work:
- config.providers: A comma-separated list of names for providers.
- config.providers.{name}.class: The Java class name for a provider.
- config.providers.{name}.param.{param-name}: A parameter (or parameters) to be passed to the above Java class on initialization.
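As a rough illustration of how these three property patterns fit together, the following Python sketch groups them by provider name. It is not how Connect itself is implemented: parse_config_providers is a hypothetical helper, and the vault provider, its class name, and its url parameter are invented for the example.

```python
def parse_config_providers(worker_props):
    """Group config.providers.* properties by provider name."""
    listed = worker_props.get("config.providers", "")
    names = [n.strip() for n in listed.split(",") if n.strip()]
    providers = {}
    for name in names:
        base = f"config.providers.{name}."
        param_prefix = base + "param."
        providers[name] = {
            "class": worker_props[base + "class"],
            # These entries model what would be handed to configure().
            "params": {
                k[len(param_prefix):]: v
                for k, v in worker_props.items()
                if k.startswith(param_prefix)
            },
        }
    return providers


worker_props = {
    "config.providers": "file, vault",
    "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
    # Hypothetical custom provider and parameter, for illustration only.
    "config.providers.vault.class": "com.example.VaultConfigProvider",
    "config.providers.vault.param.url": "https://vault.example.com",
}
providers = parse_config_providers(worker_props)
print(providers["vault"]["params"])  # {'url': 'https://vault.example.com'}
```

Because every worker must resolve the same variables, the same set of provider names and parameters would need to appear in each worker's configuration.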
FileConfigProvider¶
Kafka provides an implementation of ConfigProvider called FileConfigProvider that allows variable references to be replaced with values from local files on each worker. For example, rather than having a secret in a configuration property, you can put the secret in a local file and use a variable in connector configurations. When the connector is started, Connect will use the file config provider to resolve and replace the variable with the actual secret, ensuring that the connector configuration does not include the secret when it is persisted and shared over the Connect REST API.
Important
Every worker in the Connect cluster must have access to the files referenced by all variables referencing the config provider.
Variables that refer by name to a FileConfigProvider should be in the form ${provider:[path:]key}. The path is the fully-qualified path of the property file on each Connect worker; the key is the name of the key within that property file. Note that the FileConfigProvider supports reading any file, where the path (and property key in that file) is specified in each variable. When Connect resolves one of these variables, it will read the properties file, extract the value for the corresponding key, and replace the whole variable with that value.
The following shows a JDBC connector configuration that includes the database URL, username, and password:
connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>
Instead of having these details exposed in the connector configuration, you can use FileConfigProvider and store them in a file accessible to each Connect worker and protect them from other OS users. In the following examples, the separate file is named /opt/connect-secrets.properties. The properties added to /opt/connect-secrets.properties are listed below:
productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl
productsdb-username=scott
productsdb-password=my-secret-password
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=customers
other-connector-password=superSecret!
Then, you can configure each Connect worker to use FileConfigProvider. The worker configuration would include the following properties:
# Additional properties added to the worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
The JDBC connector configuration can now use variables in place of the secrets:
# Additional properties added to the connector configuration
connection.url=${file:/opt/connect-secrets.properties:productsdb-url}
connection.user=${file:/opt/connect-secrets.properties:productsdb-username}
connection.password=${file:/opt/connect-secrets.properties:productsdb-password}
Another connector configuration could use variables with a different file, or variables that use different properties in the same file:
# Additional properties added to another connector configuration
connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}
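The substitution step can be approximated in a few lines of Python. This is a simplified model rather than the FileConfigProvider implementation: resolve and load_properties are hypothetical helpers, the properties parser handles only plain key=value lines, and the sketch requires the path part of the variable even though the general form marks it optional.

```python
import re
import tempfile
from pathlib import Path

# ${provider:path:key}; the path is matched greedily so it may itself contain ':'.
VARIABLE = re.compile(r"\$\{([^}:]+):([^}]+):([^}:]+)\}")


def load_properties(path):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve(value, provider="file"):
    """Replace each ${provider:path:key} variable with the value from the file."""
    def substitute(match):
        name, path, key = match.groups()
        if name != provider:
            return match.group(0)  # leave variables for other providers untouched
        return load_properties(path)[key]  # raises KeyError for an unknown key
    return VARIABLE.sub(substitute, value)


with tempfile.TemporaryDirectory() as directory:
    secrets = Path(directory) / "connect-secrets.properties"
    secrets.write_text("productsdb-username=scott\nproductsdb-password=my-secret-password\n")
    print(resolve(f"${{file:{secrets}:productsdb-username}}"))  # scott
```

The stored configuration keeps the ${file:...} text; only the resolved copy held in memory contains the secret, which mirrors the behavior described for connector configurations above.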
InternalSecretConfigProvider¶
Confluent Platform provides another implementation of ConfigProvider named InternalSecretConfigProvider, which is used with the Connect Secret Registry. The InternalSecretConfigProvider requires Role-based access control (RBAC) with Secret Registry. The Secret Registry is a secret serving layer that enables Connect to store encrypted Connect credentials in a topic exposed through a REST API. This eliminates any unencrypted credentials being located in the actual connector configuration. The following example shows how InternalSecretConfigProvider is configured in the worker configuration file:
### Secret Provider
config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider
config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<service-principal-username>" \
password="<service-principal-password>" \
metadataServerUrls="<metadata server URLs>";
Shut down Kafka Connect¶
There are a few ways to shut down Kafka Connect. If you are running Connect in Confluent Platform standalone mode, you can use the following local Confluent CLI command:
confluent local services connect stop
If you’re not using the Confluent CLI, or if you’re running Connect in distributed mode, perform the following steps to shut down Kafka Connect:
Open a terminal session on one of the Connect nodes.
Search for the running Connect process:
ps aux | grep ConnectDistributed
Identify the PID in the output:
<node-hostname> <worker-pid> 0.2 2.1 8414400 351944 s003 S 12:42PM 2:52.62 /path/to/your/bin/java ...omitted... org.apache.kafka.connect.cli.ConnectDistributed /path/to/your/connect.properties
Stop the process:
kill <worker-pid>
Stop this running process on all remaining Connect nodes.
Important
Do not use kill -9 to stop the process.