Getting Started with Kafka Connect

This document explains how to get started with Kafka Connect. Read and understand Kafka Connect Concepts before you begin.

Deployment Considerations

Kafka Connect has only one prerequisite: a set of Kafka brokers. These brokers can run the latest version or an earlier one. See Cross-Component Compatibility for details.

Even though there is only one prerequisite, there are a few deployment options to consider beforehand. Understanding and acting on these deployment options ensures your Kafka Connect deployment will scale and support the long-term needs of your data pipeline.

Confluent Schema Registry

Although Schema Registry is not a required service for Kafka Connect, it enables you to easily use Avro, Protobuf, and JSON Schema as common data formats for the Kafka records that connectors read from and write to. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. For additional information, see Using Kafka Connect with Schema Registry and Configuring Key and Value Converters.

Standalone vs. Distributed Mode

Connectors and tasks are logical units of work and run as a process. This process is called a worker in Kafka Connect. There are two modes for running workers: standalone mode and distributed mode. You should identify which mode works best for your environment before getting started.

Standalone mode is useful for development and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to Kafka).

Distributed mode runs Connect workers on multiple machines (nodes). These form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add more nodes or remove nodes as your needs evolve.

Distributed mode is also more fault tolerant. If a node unexpectedly leaves the cluster, Kafka Connect automatically distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any lost data.

Important

Distributed mode is recommended for production environments because of scalability, high availability, and management benefits.

Operating Environment

Connect workers operate well in containers and in managed environments such as Kubernetes, Apache Mesos, Docker Swarm, or YARN. The distributed worker stores all state in Kafka, which makes a cluster easier to manage. By design, Kafka Connect does not handle restarting or scaling workers itself, so your existing cluster management solution can continue to be used transparently. Note that the standalone worker stores its state on the local file system.

Kafka Connect workers are JVM processes that can run on shared machines with sufficient resources. Hardware requirements for Connect workers are similar to those of standard Java producers and consumers. Resource requirements depend mainly on the types of connectors the workers run. More memory is required where large messages are sent, and also where large numbers of messages are buffered before being written in aggregate form to an external system. Continuous use of compression requires a more powerful CPU.

Tip

If you have multiple workers running concurrently on a single machine, make sure you know the resource limits (CPU and memory). Start with the default heap size setting and monitor internal metrics and the system. Verify that the CPU, memory, and network (10GbE or greater) are sufficient for the load.

Installing Connect Plugins

Kafka Connect is designed to be extensible so developers can create custom connectors, transforms, or converters, and users can install and run them.

A Kafka Connect plugin is a set of JAR files containing the implementation of one or more connectors, transforms, or converters. Connect isolates plugins from one another so that the libraries in one plugin are not affected by the libraries in any other plugin. This is very important when mixing and matching connectors from multiple providers.

Caution

It is common to have many plugins installed in a Connect deployment. Make sure to only have one version of each plugin installed.

Confluent Platform comes bundled with several commonly used connectors, transforms, and converters. All of these can be used without first installing the corresponding plugins.

For a full list of supported connectors, see Supported Connectors.

Note

Make sure to check out Confluent Hub. You can browse the large ecosystem of connectors, transforms, and converters to find the components that suit your needs and easily install them into your local Confluent Platform environment. See Confluent Hub Client for Confluent Hub Client installation instructions.

A Kafka Connect plugin can be:

  • a directory on the file system that contains all required JAR files and third-party dependencies for the plugin. This is most common and is preferred.
  • a single uber JAR containing all of the class files for the plugin and its third-party dependencies.

Important

A plugin should never contain any libraries provided by the Kafka Connect runtime.

Kafka Connect finds the plugins using a plugin path defined as a comma-separated list of directory paths in the plugin.path worker configuration property. The following shows an example plugin.path worker configuration property:

plugin.path=/usr/local/share/kafka/plugins

To install a plugin, place the plugin directory or uber JAR (or a symbolic link that resolves to one of these) in a directory already listed in the plugin path. Or, you can update the plugin path by adding the absolute path of the directory containing the plugin. Using the plugin path example above, you would create a /usr/local/share/kafka/plugins directory on each machine running Connect and then place the plugin directories (or uber JARs) there.
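
The installation steps above can be sketched as a short shell session. This is a minimal sketch, not an official procedure: a scratch directory stands in for /usr/local/share/kafka/plugins so the commands can be tried safely, and the plugin name my-connector and its JAR file names are hypothetical placeholders.

```shell
#!/bin/sh
set -eu

# Assumption: a scratch directory stands in for /usr/local/share/kafka/plugins.
WORK_DIR="$(mktemp -d)"
PLUGIN_ROOT="$WORK_DIR/plugins"
WORKER_PROPS="$WORK_DIR/worker.properties"

# One directory per plugin, holding the plugin JAR and its dependencies.
# The JAR names are hypothetical; empty files act as placeholders here.
mkdir -p "$PLUGIN_ROOT/my-connector"
touch "$PLUGIN_ROOT/my-connector/my-connector-1.0.0.jar" \
      "$PLUGIN_ROOT/my-connector/some-dependency-2.3.jar"

# plugin.path lists the parent directory that contains the plugin directories.
printf 'plugin.path=%s\n' "$PLUGIN_ROOT" > "$WORKER_PROPS"

# Show the resulting layout and worker setting.
find "$PLUGIN_ROOT" -type f | sort
cat "$WORKER_PROPS"
```

On a real deployment the same layout would be repeated on each machine that runs a Connect worker.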

When you start your Connect workers, each worker discovers all connectors, transforms, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the Connect worker loads the classes from the respective plugin first, followed by the Kafka Connect runtime and Java libraries. Connect explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers.

Earlier versions of Kafka Connect required a different approach to installing connectors, transforms, and converters. All the scripts for running Connect recognized the CLASSPATH environment variable. You would export this variable to define the list of paths to the connector JAR files. An example of the older CLASSPATH export variable mechanism is shown below:

export CLASSPATH=/path/to/my/connectors/*
bin/connect-standalone standalone.properties new-custom-connector.properties

Caution

Exporting CLASSPATH is not recommended. Using this mechanism to create a path to plugins can result in library conflicts that can cause Kafka Connect and connectors to fail. Use the plugin.path configuration property which properly isolates each plugin from other plugins and libraries.

Note

As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the package javax). An example error message showing this issue is provided below. If you see an error that resembles the example below, in addition to adding the plugin path, you must also export CLASSPATH=<path-to-jar-files> when starting the connector.

Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]

Running Workers

The following sections provide information about running workers in standalone mode or distributed mode.

Standalone Mode

Standalone mode is typically used for development and testing, or for lightweight, single-agent environments (for example, sending web server logs to Kafka). The following shows an example command that launches a worker in standalone mode:

bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

The first parameter (worker.properties) is the worker configuration properties file. Note that worker.properties is an example file name; you can use any valid file name for your worker configuration file. This file gives you control over settings such as the Kafka cluster to use and the serialization format. For an example configuration file that uses Avro and Schema Registry in standalone mode, open the file located at etc/schema-registry/connect-avro-standalone.properties. You can copy and modify this file for use as your standalone worker properties file.

The second parameter (connector1.properties) is the connector configuration properties file. All connectors have configuration properties that are loaded with the worker. As shown in the example, you can launch multiple connectors using this command.

If you run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker:

  • offset.storage.file.filename: the storage file name for connector offsets. This file is stored on the local filesystem in standalone mode. Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.
  • rest.port: the port the REST interface listens on for HTTP requests. This must be unique for each worker running on one host machine.
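
As an illustration of the two properties above, the following sketch generates worker files for two standalone workers on one host. The helper write_worker_props, the file names, the port numbers, and the choice of StringConverter are assumptions made for this example only.

```shell
#!/bin/sh
set -eu

WORK_DIR="$(mktemp -d)"   # scratch directory for the generated files

# write_worker_props N PORT: hypothetical helper that writes worker-N.properties
# with its own offsets file and its own REST port.
write_worker_props() {
  cat > "$WORK_DIR/worker-$1.properties" <<EOF
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=$WORK_DIR/connect-$1.offsets
rest.port=$2
EOF
}

write_worker_props 1 8083
write_worker_props 2 8084

# Print the two settings that must differ between the workers.
grep -E '^(offset\.storage\.file\.filename|rest\.port)=' "$WORK_DIR"/worker-*.properties
```

Each generated file would then be passed as the first parameter to its own bin/connect-standalone process.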

Distributed Mode

Connect stores connector and task configurations, offsets, and status in several Kafka topics. These are referred to as Kafka Connect internal topics. It is important that these internal topics have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions.

Kafka Connect can automatically create the internal topics when it starts up, using the Connect distributed worker configuration properties to specify the topic names, replication factor, and number of partitions for these topics. Connect verifies that the properties meet the requirements and creates all topics with a compaction cleanup policy.

Allowing Connect to automatically create these internal topics is recommended. However, you may want to manually create the topics. Two examples of when you would manually create these topics are provided below:

  • For security purposes, the broker may be configured to not allow clients like Connect to create Kafka topics.
  • You may require other advanced topic-specific settings that are not automatically set by Connect or that are different than the auto-created settings.

The following example commands show how to manually create compacted and replicated Kafka topics before starting Connect. Make sure to adhere to the distributed worker guidelines when entering parameters.

# config.storage.topic=connect-configs
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact

Note

All workers in a Connect cluster use the same internal topics. Workers in a different cluster must use different internal topics. See Distributed Configuration Properties for details.

Distributed mode does not have any additional command-line parameters other than loading the worker configuration file. New workers will either start a new group or join an existing one with a matching group.id. Workers then coordinate with the consumer groups to distribute the work to be done. See Distributed Configuration Properties for details about how new workers get added.

The following shows an example command that launches a worker in distributed mode:

bin/connect-distributed worker.properties

For an example distributed mode configuration file that uses Avro and Schema Registry, open etc/schema-registry/connect-avro-distributed.properties. You can make a copy of this file, modify it, and use it as the new worker.properties file. Note that worker.properties is an example file name; you can use any valid file name for your properties file.
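
For orientation, a distributed worker file could look roughly like the one this sketch writes. It is not a copy of the bundled sample file: the values assume Kafka and Schema Registry running locally on a single broker, and the replication factor of 1 is only suitable for that kind of test setup.

```shell
#!/bin/sh
set -eu

WORK_DIR="$(mktemp -d)"
WORKER_PROPS="$WORK_DIR/worker.properties"

cat > "$WORKER_PROPS" <<'EOF'
# Kafka cluster and Connect cluster membership
bootstrap.servers=localhost:9092
group.id=connect-cluster

# Avro converters backed by Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Internal topics (identical on every worker in this Connect cluster)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Assumption: single-broker test setup; use 3 or more in production
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

plugin.path=/usr/local/share/kafka/plugins
EOF

echo "Wrote $WORKER_PROPS; a worker would be started with:"
echo "  bin/connect-distributed $WORKER_PROPS"
```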

In standalone mode, connector configuration property files are passed as command-line parameters. In distributed mode, however, connectors are deployed and managed through REST API requests: you start the worker and then make a REST request to create the connector. REST request examples are provided in many supported connector documents; for instance, see the REST-based example for the Azure Blob Storage Source connector.
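
A request of this kind can be sketched as follows. The connector name, file path, and topic are hypothetical, and the sketch assumes the FileStream source connector is available on the worker's plugin path. The script only submits the request when CONNECT_URL is set (for example, to http://localhost:8083); otherwise it performs a dry run.

```shell
#!/bin/sh
set -eu

# Assumption: leave CONNECT_URL unset for a dry run, or set it to the REST
# endpoint of a running worker to actually submit the request.
CONNECT_URL="${CONNECT_URL:-}"
PAYLOAD="$(mktemp)"

# Hypothetical connector definition: a name plus its configuration map.
cat > "$PAYLOAD" <<'EOF'
{
  "name": "local-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/test.txt",
    "topic": "connect-test"
  }
}
EOF

if [ -n "$CONNECT_URL" ]; then
  # Create the connector by POSTing the definition to /connectors.
  curl -sS -X POST -H "Content-Type: application/json" \
       --data @"$PAYLOAD" "$CONNECT_URL/connectors"
else
  echo "Dry run: would POST $PAYLOAD to <worker>/connectors"
fi
```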

Note

If you run multiple distributed workers on one host machine for development and testing, the rest.port configuration property must be unique for each worker. This is the port the REST interface listens on for HTTP requests.

Worker Configuration Properties

Regardless of the mode used, Kafka Connect workers are configured by passing a worker configuration properties file as the first parameter. For example:

bin/connect-distributed worker.properties

Sample worker configuration properties files are included with Confluent Platform to help you get started. The locations of the Avro sample files are listed below:

  • etc/schema-registry/connect-avro-distributed.properties
  • etc/schema-registry/connect-avro-standalone.properties

Use one of these files as a starting point. These files contain the necessary configuration properties to use the Avro converters that integrate with Schema Registry. They are configured to work well with Kafka and Schema Registry services running locally. They do not require running more than a single broker, making it easy for you to test Kafka Connect locally.

The example configuration files can also be modified for production deployments by using the correct hostnames for Kafka and Schema Registry and acceptable (or default) values for the internal topic replication factor.

Common Configuration Properties

The following are several common worker configuration properties you need to get started. Many more configuration options are provided in Kafka Connect Worker Configs.

bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client uses all servers regardless of which servers are specified for bootstrapping. The list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one though, in case a server is down).

  • Type: list
  • Default: [localhost:9092]
  • Importance: high

key.converter

Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high

value.converter

Converter class for value Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high

rest.host.name

Hostname for the REST API. If this is set, it will only bind to this interface.

  • Type: string
  • Importance: low

rest.port

Port for the REST API to listen on.

  • Type: int
  • Default: 8083
  • Importance: low

plugin.path

The comma-separated list of paths to directories that contain Kafka Connect plugins.

  • Type: string
  • Default:
  • Importance: low

Distributed Configuration Properties

Distributed workers that are configured with matching group.id values automatically discover each other and form a Kafka Connect cluster. All workers in the cluster use the same three internal Kafka topics to share connector configurations, offset data, and status updates. For this reason, all distributed worker configurations in the same Connect cluster must have matching config.storage.topic, offset.storage.topic, and status.storage.topic properties.

In addition to the three required topic names, the distributed worker configuration should have identical values for the properties listed below. This ensures that any worker in the cluster will create missing topics with the desired property values. Note that these configuration properties have practical default values.

  • config.storage.replication.factor
  • offset.storage.replication.factor
  • offset.storage.partitions
  • status.storage.replication.factor
  • status.storage.partitions

As each distributed worker starts up, it uses the internal Kafka topics if they already exist. If not, the worker attempts to create the topics using the worker configuration properties. This gives you the option of manually creating these topics before starting Kafka Connect, if you require topic-specific settings or when Kafka Connect does not have the necessary privileges to create the topics. If you do create the topics manually, make sure to follow the guidelines provided in the list of configuration properties.

If you need to create a distributed worker that is independent of an existing Connect cluster, you must create new worker configuration properties. The following configuration properties must be different from the worker configurations used in an existing cluster:

  • group.id
  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

Important

  • Connect clusters cannot share Group IDs or internal topics. Simply changing a group.id will not create a new worker separate from an existing Connect cluster. The new group.id must also have unique internal topics associated with it. This requires setting unique config.storage.topic, offset.storage.topic, and status.storage.topic configuration properties for the new group.id.
  • You must also use connector names that differ from those used in the existing Connect cluster, because a consumer group is created based on the connector name. All tasks of a connector share that consumer group, so two connectors with the same name would end up in the same group.
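
One way to keep these four properties unique is to derive all of them from a single cluster name. The sketch below does that with a hypothetical helper, cluster_props; the naming scheme and the cluster names orders and billing are assumptions for illustration, not a Kafka Connect convention.

```shell
#!/bin/sh
set -eu

# cluster_props NAME: hypothetical helper that prints the four properties
# that must be unique per Connect cluster, all derived from one name.
cluster_props() {
  cat <<EOF
group.id=connect-$1
config.storage.topic=connect-$1-configs
offset.storage.topic=connect-$1-offsets
status.storage.topic=connect-$1-status
EOF
}

ORDERS_PROPS="$(cluster_props orders)"
BILLING_PROPS="$(cluster_props billing)"

# Print both fragments; no line is shared between the two clusters.
printf '%s\n\n%s\n' "$ORDERS_PROPS" "$BILLING_PROPS"
```

Each fragment would be merged into the worker configuration file used by every worker of the corresponding cluster.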

The following lists and defines the distributed worker properties:

group.id

A unique string that identifies the Connect cluster group this worker belongs to.

  • Type: string
  • Default: connect-cluster
  • Importance: high

config.storage.topic

The name of the topic where connector and task configuration data are stored. This must be the same for all workers with the same group.id. At startup, Kafka Connect attempts to automatically create this topic with a single partition and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, always create it as a compacted topic with a single partition and a high replication factor (3x or more).

  • Type: string
  • Default: “”
  • Importance: high

config.storage.replication.factor

The replication factor used when Kafka Connect creates the topic used to store connector and task configuration data. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.

  • Type: short
  • Default: 3
  • Importance: low

offset.storage.topic

The name of the topic where connector offsets are stored. This must be the same for all workers with the same group.id. At startup, Kafka Connect attempts to automatically create this topic with multiple partitions and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of partitions to support large Kafka Connect clusters (for example, 25 or 50 partitions, like the Kafka built-in __consumer_offsets topic).

  • Type: string
  • Default: “”
  • Importance: high

offset.storage.replication.factor

The replication factor used when Connect creates the topic used to store connector offsets. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.

  • Type: short
  • Default: 3
  • Importance: low

offset.storage.partitions

The number of partitions used when Connect creates the topic used to store connector offsets. A large value is necessary to support large Kafka Connect clusters (that is, 25 or 50 partitions like the Kafka built-in __consumer_offsets topic).

  • Type: int
  • Default: 25
  • Importance: low

status.storage.topic

The name of the topic where connector and task status updates are stored. This must be the same for all workers with the same group.id. At startup, Kafka Connect attempts to automatically create this topic with multiple partitions and a compacted cleanup policy to avoid losing data. It uses the existing topic if present. If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with multiple partitions.

  • Type: string
  • Default: “”
  • Importance: high

status.storage.replication.factor

The replication factor used when Connect creates the topic used to store connector and task status updates. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.

  • Type: short
  • Default: 3
  • Importance: low

status.storage.partitions

The number of partitions used when Connect creates the topic used to store connector and task status updates.

  • Type: int
  • Default: 5
  • Importance: low

Standalone Configuration Properties

In addition to the common worker configuration options, the following property is available in standalone mode.

offset.storage.file.filename

The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.

  • Type: string
  • Default: “”
  • Importance: high

Configuring Key and Value Converters

The converters packaged with the Confluent Platform are listed below:

  • AvroConverter io.confluent.connect.avro.AvroConverter: use with Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: use with Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: use with Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (without Schema Registry): use with structured data
  • StringConverter org.apache.kafka.connect.storage.StringConverter: simple string format
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: provides a “pass-through” option that does no conversion

The key.converter and value.converter properties are where you specify the type of converter to use. Default converters for all connectors are specified in the worker configuration. However, any connector can override the default converters by completely defining a key, value, and header converter. We recommend you define the default key, value, and header converters that most connectors can use in the worker, and then define them in a connector’s configuration if that connector requires different converters. The default header.converter defined in the worker serializes header values as strings using the StringConverter and deserializes header values to the most appropriate numeric, boolean, array, or map representation. Schemas are not serialized but are inferred upon deserialization when possible.

Important

Converter configuration properties in the worker configuration are used by all connectors running on the worker, unless a converter is added to a connector configuration.

If a converter is added to a connector configuration, all converter properties in the worker configuration prefixed with the converter type added (key.converter.* and/or value.converter.*) are not used. Be careful when adding converters to a connector configuration. For example, if the following value converter properties are present in the worker configuration:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

and you add the following properties to your connector configuration:

{
 "value.converter": "AvroConverter",
 "value.converter.basic.auth.credentials.source": "USER_INFO",
 "value.converter.basic.auth.user.info": "<username>:<password>"
}

an error will occur when the connector is started because the required Schema Registry URL property value.converter.schema.registry.url=http://localhost:8081 is not provided to the converter.
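
One way to avoid this error is to repeat every converter property the connector needs, including the Schema Registry URL, next to the overridden converter. The sketch below writes such a fragment to a scratch file. It is illustrative only: the URL matches the worker example above, the fully qualified converter class name is used in place of the short name, and the credentials remain placeholders.

```shell
#!/bin/sh
set -eu

CONNECTOR_CONVERTERS="$(mktemp)"

# Fragment of a connector configuration. Because it sets value.converter,
# it carries every value.converter.* property it needs, including the URL.
cat > "$CONNECTOR_CONVERTERS" <<'EOF'
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "<username>:<password>"
}
EOF

cat "$CONNECTOR_CONVERTERS"
```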

The following sections provide converter descriptions and examples. For details about how these converters work in Schema Registry, see Using Kafka Connect with Schema Registry.

Avro

To use the AvroConverter with Schema Registry, you specify the key.converter and value.converter properties in the worker configuration. An additional converter property must also be added that provides the Schema Registry URL. The example below shows the AvroConverter key and value properties that are added to the configuration:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The Avro key and value converters can be used independently from each other. For example, you may want to use a StringConverter for keys and the AvroConverter or JsonConverter for values. Independent key and value properties are shown below:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

JSON Schema and Protobuf

Both JSON Schema and Protobuf converters are implemented in the same way as the Avro converter. The following two examples use the ProtobufConverter or the JsonSchemaConverter for the value converter and the StringConverter for the key:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081

Both Avro and JSON Schema express their schemas as JSON and are lenient if unrecognized properties are encountered. This allows the converter to use custom JSON properties to capture any Kafka Connect schema objects with no equivalent in Avro or JSON Schema.

However, Protobuf has its own Interface Definition Language (IDL) which differs from JSON and does not allow for custom ad-hoc properties. For this reason, the conversion from the Kafka Connect schema to Protobuf may cause data loss or inconsistencies if there is no direct equivalent in Protobuf.

For example, the Kafka Connect schema supports int8, int16, and int32 data types. Protobuf supports int32 and int64. When Connect data is converted to Protobuf, int8 and int16 fields are mapped to int32 or int64 with no indication that the source was int8 or int16.

With JSON Schema, only number and integer type fields are supported. However, the JSON Schema Converter (JsonSchemaConverter) will store data with no JSON Schema equivalent in a property named connect.type. This property is ignored by the JSON Schema parser, so fields can be restored to the proper type by downstream components.

For full encoding details, see JSON encoding for Avro and JSON encoding for Protobuf. Additionally, JSON Schema supports three means of combining schemas: allOf, anyOf, and oneOf. However, the JSON Schema converter only supports oneOf, treating it similarly to how the Avro converter handles unions and how the Protobuf converter handles oneof.

Note

If you’re configuring Avro, Protobuf, or JSON Schema converters in an environment configured for Role-Based Access Control (RBAC), see key and value converters with RBAC.

For details about how converters work with Schema Registry, see Using Kafka Connect with Schema Registry.

The following converters are not used with Schema Registry.

JSON (without Schema Registry)

If you need to use JSON without Schema Registry for Connect data, you can use the JsonConverter supported with Kafka. The example below shows the JsonConverter key and value properties that are added to the configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to true, the key or value is not treated as plain JSON, but rather as a composite JSON object containing both an internal schema and the data. When these are enabled for a source connector, both the schema and data are in the composite JSON object. When these are enabled for a sink connector, the schema and data are extracted from the composite JSON object. Note that this implementation never uses Schema Registry.

When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to false, only the data is passed along, without the schema. This reduces the payload overhead for applications that do not need a schema. The JsonConverter enables schemas unless told otherwise, so set both properties to false explicitly, as in the example above.
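
To make the difference concrete, the sketch below prints the same string value in both forms. The record content is made up for illustration. The "schema"/"payload" envelope is the composite JSON object described above.

```shell
#!/bin/sh
set -eu

# Illustrative record value with value.converter.schemas.enable=true:
# the schema travels with the data in a "schema"/"payload" envelope.
WITH_SCHEMA='{"schema":{"type":"string","optional":false},"payload":"hello"}'

# The same value with value.converter.schemas.enable=false: data only.
WITHOUT_SCHEMA='"hello"'

# Print each form with its length to show the per-record overhead.
printf 'schemas.enable=true  : %s (%s characters)\n' "$WITH_SCHEMA" "${#WITH_SCHEMA}"
printf 'schemas.enable=false : %s (%s characters)\n' "$WITHOUT_SCHEMA" "${#WITHOUT_SCHEMA}"
```

A sink connector that reads topics written in one form generally needs the matching schemas.enable setting to interpret the records.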

String format and raw bytes

org.apache.kafka.connect.storage.StringConverter is used to convert the internal Connect format to simple string format. When converting Connect data to bytes, the schema is ignored and data is converted to a simple string. When converting from bytes to Connect data format, the converter returns an optional string schema and a string (or null).

org.apache.kafka.connect.converters.ByteArrayConverter does not convert data. Bytes are passed through the connector directly with no conversion.

Tip

For a deep dive into converters, see: Converters and Serialization Explained.

Connect Producers and Consumers

Internally, Kafka Connect uses standard Java producers and consumers to communicate with Kafka. Connect configures default settings for these producer and consumer instances. These settings include properties that ensure data is delivered to Kafka in order and without any data loss.

Default Connect Producer properties

By default, Connect configures the Kafka producers for source connectors with the following important properties:

  • Points the producer’s bootstrap servers to the same Kafka cluster used by the Connect cluster.
  • Configures key and value serializers that work with the connector’s key and value converters.
  • Generates a producer client.id based on the connector and task, using the pattern connector-producer-<connectorName>-<taskId>.
  • Sets acks=all to ensure each message produced is properly written to all in-sync replicas (ISRs).
  • For retriable exceptions, Connect configures the producer with the following properties to reduce the potential for data duplication during infinite retries:
    • request.timeout.ms=<max>
    • max.block.ms=<max>
    • max.in.flight.requests.per.connection=1
    • delivery.timeout.ms=<max>

You can override these defaults by using the producer.* properties in the worker configuration or the producer.override.* properties in connector configurations. However, changing these default properties may compromise the delivery guarantees of Connect.
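
The layering described above can be sketched in Python as a conceptual model. This is not Connect's implementation: the helper names are hypothetical, the "<max>" strings are kept as placeholders exactly as in the list above, and the precedence (defaults first, then worker-level producer.* properties, then connector-level producer.override.* properties) is an assumption of this sketch.

```python
def producer_client_id(connector_name: str, task_id: int) -> str:
    """Build the default client.id using the pattern given above."""
    return f"connector-producer-{connector_name}-{task_id}"


def with_prefix(props: dict, prefix: str) -> dict:
    """Return the entries whose key starts with prefix, with the prefix removed."""
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


def effective_producer_config(worker: dict, connector: dict, task_id: int) -> dict:
    # Defaults from the list above; "<max>" stays a placeholder.
    config = {
        "bootstrap.servers": worker["bootstrap.servers"],
        "client.id": producer_client_id(connector["name"], task_id),
        "acks": "all",
        "request.timeout.ms": "<max>",
        "max.block.ms": "<max>",
        "max.in.flight.requests.per.connection": "1",
        "delivery.timeout.ms": "<max>",
    }
    # Assumed precedence: worker-level producer.* settings are applied first...
    config.update(with_prefix(worker, "producer."))
    # ...and connector-level producer.override.* settings are applied last.
    config.update(with_prefix(connector, "producer.override."))
    return config


# Hypothetical worker and connector settings.
worker = {"bootstrap.servers": "localhost:9092", "producer.retries": "1"}
connector = {"name": "local-file-source", "producer.override.acks": "1"}

cfg = effective_producer_config(worker, connector, task_id=0)
for key in sorted(cfg):
    print(f"{key}={cfg[key]}")
```

In this example the connector replaces acks=all with acks=1, which is the kind of change that weakens the delivery guarantees.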

Producer and Consumer overrides

You may need to override default settings other than those described in the previous section. The following two examples show when this might be required.

Worker override example

Consider a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery. That is, when there are connectivity issues, minimal data loss may be acceptable for your application in order to avoid data buffering on the client. This keeps log collection as lightweight as possible.

To override producer configuration properties and consumer configuration properties for all connectors controlled by the worker, you prefix worker configuration properties with producer. or consumer. as shown in the example below:

producer.retries=1
consumer.max.partition.fetch.bytes=10485760

The example above overrides the default producer retries property to retry sending messages only one time. The consumer override increases the default amount of data fetched from a partition per request to 10 MB.

These configuration changes are applied to all connectors controlled by the worker. Be careful when changing these settings on distributed mode workers.

Per-connector override example

By default, the producers and consumers used for connectors are created using the same properties that Connect uses for its own internal topics. That means that the same Kafka principal needs to be able to read and write to all the internal topics and all of the topics used by the connectors.

You may want the producers and consumers used for connectors to use a different Kafka principal. Connector configurations can override the worker properties used to create producers and consumers. Prefix these overrides with producer.override. or consumer.override. in the connector configuration. For additional information about per-connector overrides, see Override the Worker Configuration.
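
As an illustration, the Python sketch below assembles a connector configuration whose producer authenticates as a different principal. The connector name, file path, topic, and credential placeholders are hypothetical, and the override_props helper is not part of any Kafka API. Whether a worker accepts such overrides depends on its connector.client.config.override.policy setting.

```python
import json


def override_props(client: str, props: dict) -> dict:
    """Prefix client properties for a per-connector override.

    client is "producer" (source connectors) or "consumer" (sink connectors).
    """
    if client not in ("producer", "consumer"):
        raise ValueError("client must be 'producer' or 'consumer'")
    return {f"{client}.override.{key}": value for key, value in props.items()}


# Hypothetical source connector configuration.
base = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/var/log/app.log",
    "topic": "app-logs",
}

# Credentials for the principal this connector's producer should use.
principal = {
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="<connector-username>" password="<connector-password>";'
    ),
}

payload = {
    "name": "app-log-source",
    "config": {**base, **override_props("producer", principal)},
}
print(json.dumps(payload, indent=2))
```

The printed JSON has the name and config shape used by the connector configuration examples in this document.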

Note

For detailed information about producers and consumers, see Kafka Producer and Kafka Consumer. For a list of configuration properties, see Producer Configurations and Consumer Configurations.

Connect Reporter

The Kafka Connect Reporter submits the result of a sink operation to a reporter topic. After successfully sinking a record or following an error condition, the Connect Reporter is called to submit the result report. The report is constructed to include details about how the original record was handled along with additional information about the sink event. These records are written to configurable success and error topics for further consumption. The following is an example of the basic Connect Reporter configuration properties added to a sink connector configuration:

reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

In a secure environment, add configuration blocks for both an Admin Client and a Producer. The Admin Client creates the reporter topics and the Producer sends records to them, so both need credentials. Example Admin and Producer properties are shown below:

reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN

reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
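
Because the Admin and Producer blocks above repeat the same client settings, one way to keep them in sync is to generate both from a single dictionary. The following Python sketch is a convenience under that assumption; the reporter_client_props helper is hypothetical and not part of Connect.

```python
def reporter_client_props(bootstrap_servers: str, security: dict) -> dict:
    """Expand one set of client settings into reporter.admin. and reporter.producer. blocks."""
    props = {}
    for client in ("admin", "producer"):
        props[f"reporter.{client}.bootstrap.servers"] = bootstrap_servers
        for key, value in security.items():
            props[f"reporter.{client}.{key}"] = value
    return props


# Same placeholder values as the properties shown above.
security = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule "
        'required username="<username>" password="<password>";'
    ),
}

props = reporter_client_props("localhost:9092", security)
for key in sorted(props):
    print(f"{key}={props[key]}")
```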

Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connector.

Reporter and Kerberos security

The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example uses the Prometheus Metrics Sink connector, but it can be modified for any applicable sink connector.

{

  "name" : "prometheus-connector",
  "config" : {
    "topics":"prediction-metrics",
    "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "tasks.max" : "1",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.truststore.password":"xxxx",
    "confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.keystore.password":"xxxx",
    "confluent.topic.ssl.key.password":"xxxx",
    "confluent.topic.security.protocol":"SASL_SSL",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.sasl.kerberos.service.name":"kafka",
    "confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "prometheus.scrape.url": "http://localhost:8889/metrics",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "behavior.on.error": "LOG",
    "reporter.result.topic.replication.factor": "3",
    "reporter.error.topic.replication.factor": "3",
    "reporter.bootstrap.servers":"localhost:9092",
    "reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.truststore.password":"xxxx",
    "reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.keystore.password":"xxxx",
    "reporter.producer.ssl.key.password":"xxxx",
    "reporter.producer.security.protocol":"SASL_SSL",
    "reporter.producer.sasl.kerberos.service.name":"kafka",
    "reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.truststore.password":"xxxx",
    "reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.keystore.password":"xxxx",
    "reporter.admin.ssl.key.password":"xxxx",
    "reporter.admin.security.protocol":"SASL_SSL",
    "reporter.admin.sasl.kerberos.service.name":"kafka",
    "reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "confluent.license":"eyJ0eXAiOiJK ...omitted"

  }
}

ConfigProvider interface

The ConfigProvider interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.

Note

Connector configurations are persisted and shared over the Connect REST API with the variables. Only when the connector starts does it transiently resolve and replace variables in-memory. Secrets are never persisted in connector configs, logs, or in REST API requests and responses.

The Connect worker relies upon the named ConfigProviders defined in the worker configuration to resolve the variables. Each variable specifies the name of the ConfigProvider that should be used, and the information the ConfigProvider uses to resolve the variable into a replacement string.

All ConfigProvider implementations are discovered using the standard Java ServiceLoader mechanism. To create a custom implementation of ConfigProvider, implement the ConfigProvider interface. Package the implementation class(es) and a file named META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider containing the fully qualified name of the ConfigProvider implementation class into a JAR file. Note that the JAR file can use third-party libraries other than those provided by the Connect framework, but they must be installed with the JAR file as described below.

To install the custom ConfigProvider implementation, add a new subdirectory containing the JAR files to a directory listed in Connect’s plugin.path and (re)start the Connect workers. When the Connect worker starts up, it instantiates all ConfigProvider implementations specified in the worker configuration. All properties prefixed with config.providers.[provider].param. are passed to the configure() method of the ConfigProvider. When the Connect worker shuts down, it calls the close() method of the ConfigProvider.

Important

Any worker in a Connect cluster must be able to resolve every variable in the worker configuration, and must be able to resolve all variables used in every connector configuration. This requires the following:

  • All of the workers in a Connect cluster must have the same set of named config providers.
  • Each provider on every worker must have access to any resources required to resolve variables used in the worker config or in the connector configs.

The following configuration properties are added to the distributed worker configuration to make this work:

  • config.providers: A comma-separated list of names for providers.
  • config.providers.{name}.class: The Java class name for a provider.
  • config.providers.{name}.param.{param-name}: A parameter (or parameters) to be passed to the above Java class on initialization.
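
The relationship between these three properties can be sketched in Python as a conceptual model of how a worker could group them by provider name. The parse_config_providers helper is hypothetical, and the class names are the ones used in the examples later in this document.

```python
def parse_config_providers(worker_props: dict) -> dict:
    """Map each provider name to its class name and its param.* settings."""
    raw_names = worker_props.get("config.providers", "")
    names = [name.strip() for name in raw_names.split(",") if name.strip()]

    providers = {}
    for name in names:
        base = f"config.providers.{name}."
        param_prefix = base + "param."
        providers[name] = {
            "class": worker_props[base + "class"],
            # Everything after "param." would be handed to the provider's
            # configure() method.
            "params": {
                key[len(param_prefix):]: value
                for key, value in worker_props.items()
                if key.startswith(param_prefix)
            },
        }
    return providers


worker_props = {
    "config.providers": "file,secret",
    "config.providers.file.class":
        "org.apache.kafka.common.config.provider.FileConfigProvider",
    "config.providers.secret.class":
        "io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider",
    "config.providers.secret.param.master.encryption.key": "<encryption key>",
}

providers = parse_config_providers(worker_props)
for name, spec in providers.items():
    print(name, spec["class"], spec["params"])
```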

FileConfigProvider

Kafka provides an implementation of ConfigProvider called FileConfigProvider that allows variable references to be replaced with values from local files on each worker. For example, rather than having a secret in a configuration property, you can put the secret in a local file and use a variable in connector configurations. When the connector is started, Connect will use the file config provider to resolve and replace the variable with the actual secret, ensuring that the connector configuration does not include the secret when it is persisted and shared over the Connect REST API.

Important

Every worker in the Connect cluster must have access to the files referenced by all variables referencing the config provider.

Variables that refer by name to a FileConfigProvider should be in the form ${provider:[path:]key}. The path is the fully-qualified path of the property file on each Connect worker; the key is the name of the key within that property file. Note that the FileConfigProvider supports reading any file, where the path (and property key in that file) is specified in each variable. When Connect resolves one of these variables, it will read the properties file, extract the value for the corresponding key, and replace the whole variable with that value.

The following shows a JDBC connector configuration that includes the database URL, username, and password:

connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>

Instead of having these details exposed in the connector configuration, you can use FileConfigProvider and store them in a file accessible to each Connect worker and protect them from other OS users. In the following examples, the separate file is named /opt/connect-secrets.properties. The properties added to /opt/connect-secrets.properties are listed below:

productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl
productsdb-username=scott
productsdb-password=my-secret-password
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=customers
other-connector-password=superSecret!

Then, you can configure each Connect worker to use FileConfigProvider. The worker configuration would include the following properties:

# Additional properties added to the worker configuration

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

The JDBC connector configuration can now use variables in place of the secrets:

# Additional properties added to the connector configuration

connection.url=${file:/opt/connect-secrets.properties:productsdb-url}
connection.user=${file:/opt/connect-secrets.properties:productsdb-username}
connection.password=${file:/opt/connect-secrets.properties:productsdb-password}

Another connector configuration could use variables with a different file, or variables that use different properties in the same file:

# Additional properties added to another connector configuration

connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}
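
The resolution step described in this section can be approximated in Python. This is a simplified sketch, not Kafka's implementation: the properties parser handles only key=value lines and # comments, the resolve and load_properties helpers are hypothetical, and a temporary file stands in for /opt/connect-secrets.properties.

```python
import re
import tempfile
from pathlib import Path

# Matches variables of the form ${provider:[path:]key}.
VARIABLE = re.compile(r"\$\{([^}]*?):(([^}]*?):)?([^}]*?)\}")


def load_properties(path: str) -> dict:
    """Simplified parser: key=value lines and '#' comments, no escape handling."""
    props = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve(config: dict, provider_name: str = "file") -> dict:
    """Return a copy of config with ${file:path:key} variables replaced."""
    def substitute(match):
        provider, path, key = match.group(1), match.group(3), match.group(4)
        if provider != provider_name or path is None:
            return match.group(0)  # leave variables this sketch cannot resolve
        return load_properties(path).get(key, match.group(0))

    return {name: VARIABLE.sub(substitute, value) for name, value in config.items()}


with tempfile.TemporaryDirectory() as tmp:
    secrets = Path(tmp) / "connect-secrets.properties"
    secrets.write_text(
        "productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl\n"
        "productsdb-username=scott\n"
        "productsdb-password=my-secret-password\n",
        encoding="utf-8",
    )
    # What is persisted and shared: variables only, no secrets.
    stored = {
        "connection.url": f"${{file:{secrets}:productsdb-url}}",
        "connection.user": f"${{file:{secrets}:productsdb-username}}",
        "connection.password": f"${{file:{secrets}:productsdb-password}}",
    }
    # What the connector sees in memory once it starts.
    resolved = resolve(stored)

print(resolved)
```

The stored dictionary never contains the secret values; only the resolved copy does, which mirrors how the persisted connector configuration keeps the variables.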

InternalSecretConfigProvider

Confluent Platform provides another implementation of ConfigProvider named InternalSecretConfigProvider, which is used with the Connect Secret Registry. The Secret Registry is a secret serving layer that enables Connect to store encrypted Connect credentials in a topic exposed through a REST API. This keeps unencrypted credentials out of the actual connector configuration. The following example shows how InternalSecretConfigProvider is configured in the worker configuration file:

### Secret Provider

config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider

config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="<service-principal-username>" \
  password="<service-principal-password>" \
  metadataServerUrls="<metadata server URLs>";

Next Steps

After getting started with your deployment, you may want to check out the following additional Kafka Connect documentation:

Tip

Try out our end-to-end demos for Kafka Connect on-premises, Confluent Cloud, and Confluent Operator.