Worker Configuration Properties

This page lists many of the configuration properties for Connect workers. The first section covers common properties that can be set in either standalone or distributed mode. These control basic functionality, such as which Apache Kafka® cluster to communicate with and which data format to use. The next two sections list properties specific to standalone or distributed mode.

For information about how the Connect worker functions, see Configuring and Running Workers.

Common Worker Configuration

bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping - this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: list
  • Default: [localhost:9092]
  • Importance: high
key.converter

Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high
value.converter

Converter class for value Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high
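
As a rough illustration of how the connection and converter properties above fit together, the following worker fragment pairs JSON keys with Avro values. The broker host names and the Schema Registry URL are assumptions for the sake of the example; substitute the values for your environment.

```properties
# Illustrative worker settings; broker host names and the Schema Registry URL are assumed.
bootstrap.servers=broker1:9092,broker2:9092

# Keys as JSON without an embedded schema (JsonConverter ships with Apache Kafka).
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false

# Values as Avro; this converter needs to know where Schema Registry is.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

Listing more than one broker in bootstrap.servers keeps the initial connection working when a single broker is down.
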
internal.key.converter

Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs.

  • Type: class
  • Default:
  • Importance: low
internal.value.converter

Converter class for offset value Connect data that implements the Converter interface. Used for converting data like offsets and configs.

  • Type: class
  • Default:
  • Importance: low
offset.flush.interval.ms

Interval at which to try committing offsets for tasks.

  • Type: long
  • Default: 60000
  • Importance: low
offset.flush.timeout.ms

Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.

  • Type: long
  • Default: 5000
  • Importance: low
plugin.path

The comma-separated list of paths to directories that contain Kafka Connect plugins.

  • Type: string
  • Default:
  • Importance: low
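
A minimal sketch of a plugin.path setting is shown below. Both directories are hypothetical and only illustrate the comma-separated form.

```properties
# Hypothetical directories that contain Kafka Connect plugins.
plugin.path=/usr/share/java,/opt/connectors
```
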
rest.advertised.host.name

If this is set, this is the hostname that will be given out to other Workers to connect to.

  • Type: string
  • Importance: low
rest.advertised.listener

Configures the listener used for communication between Workers. Valid values are either http or https. If the listeners property is not defined or if it contains an HTTP listener, the default value for this field is http. When the listeners property is defined and contains only HTTPS listeners, the default value is https.

  • Type: string
  • Importance: low
rest.advertised.port

If this is set, this is the port that will be given out to other Workers to connect to.

  • Type: int
  • Importance: low
rest.host.name

Hostname for the REST API. If this is set, it will only bind to this interface.

  • Type: string
  • Importance: low
rest.port

Port for the REST API to listen on.

  • Type: int
  • Default: 8083
  • Importance: low
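
The REST properties above might be combined as in the following sketch. The IP address and the advertised host name are placeholders, not recommended values.

```properties
# Bind the REST API to a single interface and port (the address is a placeholder).
rest.host.name=10.0.0.12
rest.port=8083

# Host name and port given out to other Workers (assumed values).
rest.advertised.host.name=connect-1.example.internal
rest.advertised.port=8083
```

Setting the advertised values is mainly useful when the address a Worker binds to differs from the address other Workers must use to reach it, for example behind NAT or in containers.
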
response.http.headers.config

Used to select which HTTP headers are returned in the HTTP response for Confluent Platform components. Specify multiple values in a comma-separated string using the format [action][header name]:[header value] where [action] is one of the following: set, add, setDate, or addDate. You must use quotation marks around the header value when the header value contains commas. For example:

response.http.headers.config="add Cache-Control: no-cache, no-store, must-revalidate", add X-XSS-Protection: 1; mode=block, add Strict-Transport-Security: max-age=31536000; includeSubDomains, add X-Content-Type-Options: nosniff
  • Type: string
  • Default: “”
  • Importance: low
task.shutdown.graceful.timeout.ms

Amount of time to wait for tasks to shut down gracefully. This is the total amount of time, not a per-task limit. Shutdown is triggered for all tasks first, and then the tasks are waited on sequentially.

  • Type: long
  • Default: 5000
  • Importance: low

Standalone Worker Configuration

In addition to the common Worker configuration options, the following is available in standalone mode.

offset.storage.file.filename

The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.

  • Type: string
  • Default: “”
  • Importance: high
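
For example, a standalone worker could persist its offsets as follows. The path is only an example; outside of development, a location that survives restarts and cleanup jobs is probably a better choice than /tmp.

```properties
# Standalone mode only: file used to persist connector offsets (example path).
offset.storage.file.filename=/tmp/connect.offsets
```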

Distributed Worker Configuration

In addition to the common Worker configuration options, the following are available in distributed mode. For information about how the Connect worker functions, see Configuring and Running Workers.

group.id

A unique string that identifies the Connect cluster group this Worker belongs to.

Important

For production environments, you must explicitly set this configuration. When using the Confluent CLI, this configuration property is set to connect-cluster by default. All Workers with the same group.id will be in the same Connect cluster. For example, if Worker-a has group.id=connect-cluster-a and Worker-b has the same group.id, Worker-a and Worker-b form a cluster called connect-cluster-a.

Note

The group.id configuration property does not apply to sink connectors. For sink connectors, the group.id is created programmatically using the prefix connect- and the connector name.

  • Type: string
  • Default: “”
  • Importance: high
config.storage.topic

The name of the topic where connector and task configuration data are stored. This must be the same for all Workers with the same group.id. On startup, Kafka Connect attempts to create this topic automatically with a single partition and a compacted cleanup policy to avoid losing data; if the topic already exists, Kafka Connect uses it as is. If you choose to create this topic manually, always create it as a compacted topic with a single partition and a high replication factor (3x or more).

  • Type: string
  • Default: “”
  • Importance: high
config.storage.replication.factor

The replication factor used when Kafka Connect creates the topic used to store connector and task configuration data. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster. Enter -1 to use the Kafka broker default replication factor.

  • Type: short
  • Default: 3
  • Importance: low
offset.storage.topic

The name of the topic where connector offsets are stored. This must be the same for all Workers with the same group.id. On startup, Kafka Connect attempts to create this topic automatically with multiple partitions and a compacted cleanup policy to avoid losing data; if the topic already exists, Kafka Connect uses it as is. If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of partitions (e.g., 25 or 50, just like Kafka’s built-in __consumer_offsets topic) to support large Kafka Connect clusters.

  • Type: string
  • Default: “”
  • Importance: high
offset.storage.replication.factor

The replication factor used when Connect creates the topic used to store connector offsets. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster. Enter -1 to use the Kafka broker default replication factor.

  • Type: short
  • Default: 3
  • Importance: low
offset.storage.partitions

The number of partitions used when Connect creates the topic used to store connector offsets. A large value (e.g., 25 or 50, just like Kafka’s built-in __consumer_offsets topic) is necessary to support large Kafka Connect clusters. Enter -1 to use the default number of partitions configured in the Kafka broker.

  • Type: int
  • Default: 25
  • Importance: low
status.storage.topic

The name of the topic where connector and task status updates are stored. This must be the same for all Workers with the same group.id. On startup, Kafka Connect attempts to create this topic automatically with multiple partitions and a compacted cleanup policy to avoid losing data; if the topic already exists, Kafka Connect uses it as is. If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with multiple partitions.

  • Type: string
  • Default: “”
  • Importance: high
status.storage.replication.factor

The replication factor used when Connect creates the topic used to store connector and task status updates. This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster. Enter -1 to use the Kafka broker default replication factor.

  • Type: short
  • Default: 3
  • Importance: low
status.storage.partitions

The number of partitions used when Connect creates the topic used to store connector and task status updates. Enter -1 to use the default number of partitions configured in the Kafka broker.

  • Type: int
  • Default: 5
  • Importance: low
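
The group and storage topic properties above could be set together as in this sketch of a distributed worker file. The group name reuses the connect-cluster-a example from this section; the three topic names are assumptions and can be anything, as long as every Worker in the same Connect cluster uses the same values.

```properties
# All Workers that share this group.id form one Connect cluster.
group.id=connect-cluster-a

# Connector and task configurations (always a single partition, so there is no partitions setting).
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Connector offsets.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

# Connector and task status updates.
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
```

On a development cluster with fewer than three brokers, the replication factors would need to be lowered, because they cannot exceed the number of brokers.
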
heartbeat.interval.ms

The expected time between heartbeats to the group coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the Worker’s session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

  • Type: int
  • Default: 3000
  • Importance: high
session.timeout.ms

The timeout used to detect failures when using Kafka’s group management facilities.

  • Type: int
  • Default: 30000
  • Importance: high
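
To illustrate the relationship between the two group management timeouts, the sketch below picks a session timeout and then sets the heartbeat interval to one third of it. The specific numbers are illustrative, not tuning advice.

```properties
# Time without heartbeats after which the Worker is considered failed.
session.timeout.ms=15000

# No higher than one third of session.timeout.ms (15000 / 3 = 5000).
heartbeat.interval.ms=5000
```
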
ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Importance: high
ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way authentication for client.

  • Type: string
  • Importance: high
ssl.keystore.password

The store password for the key store file. This is optional for the client and is only needed if ssl.keystore.location is configured.

  • Type: password
  • Importance: high
ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Importance: high
ssl.truststore.password

The password for the trust store file.

  • Type: password
  • Importance: high
connections.max.idle.ms

Close idle connections after the number of milliseconds specified by this config.

  • Type: long
  • Default: 540000
  • Importance: medium
receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

  • Type: int
  • Default: 32768
  • Importance: medium
request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

  • Type: int
  • Default: 40000
  • Importance: medium
sasl.kerberos.service.name

The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.

  • Type: string
  • Importance: medium
security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  • Type: string
  • Default: “PLAINTEXT”
  • Importance: medium
send.buffer.bytes

The size of the TCP send buffer (SO_SNDBUF) to use when sending data.

  • Type: int
  • Default: 131072
  • Importance: medium
ssl.enabled.protocols

The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.

  • Type: list
  • Default: [TLSv1.2, TLSv1.1, TLSv1]
  • Importance: medium
ssl.keystore.type

The file format of the key store file. This is optional for the client. The default value is JKS.

  • Type: string
  • Default: “JKS”
  • Importance: medium
ssl.protocol

The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.

  • Type: string
  • Default: “TLS”
  • Importance: medium
ssl.provider

The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

  • Type: string
  • Importance: medium
ssl.truststore.type

The file format of the trust store file. Default value is JKS.

  • Type: string
  • Default: “JKS”
  • Importance: medium
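
As a sketch of how the security properties in this section combine, the following fragment configures a worker to reach the brokers over SSL. The file paths are hypothetical and the passwords are placeholders; the key store entries are only needed when the brokers require two-way authentication.

```properties
security.protocol=SSL

# Trust store used to verify the brokers' certificates (hypothetical path).
ssl.truststore.location=/etc/kafka/secrets/connect.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.truststore.type=JKS

# Key store for two-way authentication (optional for the client; hypothetical path).
ssl.keystore.location=/etc/kafka/secrets/connect.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
```
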
worker.sync.timeout.ms

When the Worker is out of sync with other Workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining.

  • Type: int
  • Default: 3000
  • Importance: medium
worker.unsync.backoff.ms

When the Worker is out of sync with other Workers and fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.

  • Type: int
  • Default: 300000
  • Importance: medium
client.id

An ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

  • Type: string
  • Default: “”
  • Importance: low
metadata.max.age.ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

  • Type: long
  • Default: 300000
  • Importance: low
metric.reporters

A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

  • Type: list
  • Default: []
  • Importance: low
metrics.num.samples

The number of samples maintained to compute metrics.

  • Type: int
  • Default: 2
  • Importance: low
metrics.sample.window.ms

The window of time a metrics sample is computed over.

  • Type: long
  • Default: 30000
  • Importance: low
reconnect.backoff.ms

The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.

  • Type: long
  • Default: 50
  • Importance: low
retry.backoff.ms

The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.

  • Type: long
  • Default: 100
  • Importance: low
sasl.kerberos.kinit.cmd

Kerberos kinit command path. The default is /usr/bin/kinit.

  • Type: string
  • Default: “/usr/bin/kinit”
  • Importance: low
sasl.kerberos.min.time.before.relogin

Login thread sleep time between refresh attempts.

  • Type: long
  • Default: 60000
  • Importance: low
sasl.kerberos.ticket.renew.jitter

Percentage of random jitter added to the renewal time.

  • Type: double
  • Default: 0.05
  • Importance: low
sasl.kerberos.ticket.renew.window.factor

Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.

  • Type: double
  • Default: 0.8
  • Importance: low
ssl.cipher.suites

A list of cipher suites. Each cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all available cipher suites are supported.

  • Type: list
  • Importance: low
ssl.endpoint.identification.algorithm

The endpoint identification algorithm to validate server hostname using server certificate.

  • Type: string
  • Importance: low
ssl.keymanager.algorithm

The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: “SunX509”
  • Importance: low
ssl.trustmanager.algorithm

The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: “PKIX”
  • Importance: low

Confluent Platform License Configuration

Starting in Confluent Platform 6.0, you can put the Confluent license-related properties into the standalone or distributed worker configuration. The Connect worker then automatically injects these properties into the configurations of all Confluent commercial connectors, so the license-related properties can be left out of, or removed from, the individual connector configurations.

confluent.license

Confluent issues enterprise license keys to each subscriber. The license key is text that you can copy and paste as the value for confluent.license. A trial license allows using the connector for a 30-day trial period. A developer license allows using the connector indefinitely for single-broker development environments.

If you are a subscriber, contact Confluent Support for more information.

  • Type: string
  • Default: “”
  • Valid Values: Confluent Platform license
  • Importance: high
confluent.topic.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way authentication for client.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.password

The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  • Type: string
  • Default: “PLAINTEXT”
  • Importance: medium
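
A worker-level license configuration might look like the following sketch. The license key and password are placeholders, and the broker address and trust store path are assumptions; the SSL lines apply only when the brokers that host the license topic require SSL.

```properties
# License key issued by Confluent (placeholder).
confluent.license=<valid-license-key>

# Kafka cluster that hosts the license topic (assumed address).
confluent.topic.bootstrap.servers=broker1:9093

# Only needed when those brokers require SSL (hypothetical path, placeholder password).
confluent.topic.security.protocol=SSL
confluent.topic.ssl.truststore.location=/etc/kafka/secrets/connect.truststore.jks
confluent.topic.ssl.truststore.password=<truststore-password>
```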

License topic configuration

A Confluent enterprise license is stored in the _confluent-command topic. This topic is created by default and contains the license that corresponds to the license key supplied through the confluent.license property. No public keys are stored in Kafka topics.

The following describes how the default _confluent-command topic is generated under different scenarios:

  • A 30-day trial license is automatically generated for the _confluent-command topic if you do not add the confluent.license property or leave this property empty (for example, confluent.license=).
  • Adding a valid license key (for example, confluent.license=<valid-license-key>) adds a valid license in the _confluent-command topic.

You can change the name of the _confluent-command topic using the confluent.topic property (for instance, if your environment has strict naming conventions). The following example, which uses minimal properties suitable for development and testing, shows this change and the configured Kafka bootstrap server.

confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092

The example above shows the minimum required bootstrap server property, which is sufficient for development and testing. For a production environment, add the normal producer, consumer, and topic configuration properties to the connector properties, each with the confluent.topic. prefix.

License topic ACLs

The _confluent-command topic contains the license that corresponds to the license key supplied through the confluent.license property. It is created by default. Connectors that access this topic require the following ACLs configured:

  • CREATE and DESCRIBE on the resource cluster, if the connector needs to create the topic.

  • DESCRIBE, READ, and WRITE on the _confluent-command topic.

    Important

    You can also use DESCRIBE and READ without WRITE to restrict access to read-only for license topic ACLs. If a topic exists, the LicenseManager will not try to create the topic.

You can provide access either individually for each principal that will use the license or use a wildcard entry to allow all clients. The following examples show commands that you can use to configure ACLs for the resource cluster and _confluent-command topic.

  1. Set a CREATE and DESCRIBE ACL on the resource cluster:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation CREATE --operation DESCRIBE --cluster
    
  2. Set a DESCRIBE, READ, and WRITE ACL on the _confluent-command topic:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
    

Overriding Default Configuration Properties

You can override the replication factor using confluent.topic.replication.factor. For example, when using a Kafka cluster with fewer than three brokers as a destination (for development and testing), set the confluent.topic.replication.factor property to 1.

You can override producer-specific properties by using the confluent.topic.producer. prefix and consumer-specific properties by using the confluent.topic.consumer. prefix.

You can use the defaults or customize the other properties as well. For example, the confluent.topic.client.id property defaults to the name of the connector with a -licensing suffix. You can also use the confluent.topic. prefix to specify the configuration settings for brokers that require SSL or SASL for client connections.

You cannot override the cleanup policy of a topic because the topic always has a single partition and is compacted. Also, do not specify serializers and deserializers using this prefix; they are ignored if added.
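
The overrides described above might look like the following for a small development cluster. The client ID and the timeout values are illustrative assumptions, not recommendations.

```properties
# Single-broker or two-broker development cluster.
confluent.topic.replication.factor=1

# Replaces the default of <connector-name>-licensing (hypothetical value).
confluent.topic.client.id=my-connector-licensing

# Producer-specific and consumer-specific overrides for the license topic clients.
confluent.topic.producer.request.timeout.ms=30000
confluent.topic.consumer.request.timeout.ms=30000
```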

Override the Worker Configuration

By default, source and sink connectors inherit their client configurations from the worker configuration. Within the worker configuration, properties that have a prefix of producer. or consumer. are used to create clients for all source and sink connectors, respectively.
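
As a sketch of the worker-level prefixes, the fragment below sets one producer property and one consumer property in the worker configuration. The chosen properties and values are only illustrative.

```properties
# Applies to the producers created for all source connectors on this worker.
producer.compression.type=lz4

# Applies to the consumers created for all sink connectors on this worker.
consumer.max.poll.records=500
```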

If you want to override producer or consumer properties for a specific connector, enable client overrides in the worker configuration and then use producer.override.* for a source connector config and consumer.override.* for a sink connector config.

To enable per-connector configuration properties and override the default worker properties, add the following connector.client.config.override.policy configuration parameter to the worker properties file.

connector.client.config.override.policy

Class name or alias of implementation of ConnectorClientConfigOverridePolicy. This defines configurations that can be overridden by the connector. The default implementation is None. The other possible policies are All and Principal.

  • Type: string
  • Default: None
  • Valid Values: [None, All, Principal]
  • Importance: medium

When this configuration property is set to connector.client.config.override.policy=All, each connector that belongs to the worker is allowed to override the worker configuration. This is implemented by adding one of the following override prefixes to the source and sink connector configurations:

  • producer.override.<source-configuration-property>
  • consumer.override.<sink-configuration-property>
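
Put together, enabling and using overrides could look like the sketch below. The lines belong to different files: the first setting goes in the worker properties file, and the override lines go in the individual connector configurations. The overridden properties mirror the ones used elsewhere in this section.

```properties
# --- Worker properties file ---
# Allow every connector on this worker to override client settings.
connector.client.config.override.policy=All

# --- Source connector configuration ---
producer.override.compression.type=gzip

# --- Sink connector configuration ---
consumer.override.auto.offset.reset=latest
```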

Examples

The following example shows a line added that overrides the default worker compression.type property. After the connector configuration is updated, the Replicator connector will use gzip compression.

{
  "name": "Replicator",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "topic.whitelist": "_schemas",
    "topic.rename.format": "${topic}.replica",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "srcKafka1:10091",
    "dest.kafka.bootstrap.servers": "destKafka1:11091",
    "tasks.max": "1",
    "producer.override.compression.type": "gzip",
    "confluent.topic.replication.factor": "1",
    "schema.subject.translator.class": "io.confluent.connect.replicator.schemas.DefaultSubjectTranslator",
    "schema.registry.topic": "_schemas",
    "schema.registry.url": "http://destSchemaregistry:8086"
  }
}

The following example shows a line added that overrides the default worker auto.offset.reset property. After the connector configuration is updated, the Elasticsearch connector will use latest instead of the default Connect worker property value earliest.

{
  "name": "Elasticsearch",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",
    "consumer.override.auto.offset.reset": "latest",
    "tasks.max": "1",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "kafkaconnect",
    "key.ignore": "true",
    "schema.ignore": "false",
    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "orders",
    "transforms.renameTopic.replacement": "orders-latest"
  }
}

When the worker override configuration property is set to connector.client.config.override.policy=Principal, each of the connectors can use a different service principal. The following example shows a sink connector service principal override when implementing Role-Based Access Control (RBAC):

consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   username="<username>" \
   password="<password>" \
   metadataServerUrls="<metadata_server_urls>";

Note

When set to All, per-connector override capability includes overriding the worker service principal in addition to other worker configuration properties. When set to Principal, per-connector override capability is restricted to service principal overrides only.

Customizing an Override

You may not want to use the All policy, because it allows connectors to override every client configuration property defined in the worker. Although it is not typical, you can create a custom override policy that limits which connector configurations can be overridden and which values they can take.

For example, to create a custom policy for batch.size that restricts the batch size to 1 MB, implement the ConnectorClientConfigOverridePolicy interface for this configuration property. The implementing class contains all the logic required to limit the list of configuration properties and their values.
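
Once such a class exists, wiring it in might look like the sketch below. The class name com.example.connect.BatchSizeLimitPolicy is hypothetical, and the sketch assumes the class is installed where the worker can load it. The connector-side line shows an override that such a policy would be expected to accept, since 524288 bytes is below the 1 MB limit.

```properties
# --- Worker properties file ---
# Hypothetical custom implementation of ConnectorClientConfigOverridePolicy.
connector.client.config.override.policy=com.example.connect.BatchSizeLimitPolicy

# --- Source connector configuration ---
# 512 KB, within the 1 MB limit that the hypothetical policy enforces.
producer.override.batch.size=524288
```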