.. _connect_allconfigs:

|kconnect-long| Worker Configuration Properties for |cp|
---------------------------------------------------------

The following lists many of the configuration properties related to |kconnect|
workers. The first section lists common properties that can be set in either
standalone or distributed mode. These properties control basic functionality,
such as which |ak-tm| cluster to communicate with and what format to use when
reading and writing data. The next two sections list properties specific to
standalone or distributed mode.

For additional configuration properties, see the following sections:

* |kconnect| and |sr|: See :ref:`schemaregistry_kafka_connect`.
* Producer configuration properties: See :ref:`kafka_producer`.
* Consumer configuration properties: See :ref:`kafka_consumer`.
* TLS/SSL encryption properties: See :ref:`kafka_ssl_encryption`.
* All |ak| configuration properties: See :ref:`cp-config-reference`.

For information about how the |kconnect| worker functions, see
:connect-common:`Configuring and Running Workers|userguide.html#configuring-and-running-workers`.

.. _connect-common-work-config:

Common Worker Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~

``bootstrap.servers``
  A list of host/port pairs to use for establishing the initial connection to
  the |ak| cluster. The client will make use of all servers irrespective of
  which servers are specified here for bootstrapping; this list only impacts
  the initial hosts used to discover the full set of servers. This list should
  be in the form ``host1:port1,host2:port2,...``. Since these servers are just
  used for the initial connection to discover the full cluster membership
  (which may change dynamically), this list need not contain the full set of
  servers (you may want more than one, though, in case a server is down).

  * Type: list
  * Default: [localhost:9092]
  * Importance: high

``key.converter``
  Converter class for key |kconnect| data. This controls the format of the
  data that will be written to |ak| for source connectors or read from |ak|
  for sink connectors. Popular formats include Avro and JSON.

  * Type: class
  * Default:
  * Importance: high

``value.converter``
  Converter class for value |kconnect| data. This controls the format of the
  data that will be written to |ak| for source connectors or read from |ak|
  for sink connectors. Popular formats include Avro and JSON.

  * Type: class
  * Default:
  * Importance: high

``internal.key.converter``
  Converter class for internal key |kconnect| data that implements the
  ``Converter`` interface. Used for converting data like offsets and configs.

  * Type: class
  * Default:
  * Importance: low

``internal.value.converter``
  Converter class for internal value |kconnect| data that implements the
  ``Converter`` interface. Used for converting data like offsets and configs.

  * Type: class
  * Default:
  * Importance: low

``offset.flush.interval.ms``
  Interval at which to try committing offsets for tasks.

  * Type: long
  * Default: 60000
  * Importance: low

``offset.flush.timeout.ms``
  Maximum number of milliseconds to wait for records to flush and partition
  offset data to be committed to offset storage before cancelling the process
  and restoring the offset data to be committed in a future attempt.

  * Type: long
  * Default: 5000
  * Importance: low

``plugin.path``
  The comma-separated list of paths to directories that contain
  :connect-common:`Kafka Connect plugins|userguide.html#installing-kconnect-plugins`.

  * Type: string
  * Default:
  * Importance: low

``rest.advertised.host.name``
  If this is set, this is the hostname that will be given out to other workers
  to connect to.

  * Type: string
  * Importance: low

``rest.advertised.listener``
  Configures the listener used for communication between workers. Valid values
  are either ``http`` or ``https``. If the ``listeners`` property is not
  defined or if it contains an HTTP listener, the default value for this field
  is ``http``. When the ``listeners`` property is defined and contains only
  HTTPS listeners, the default value is ``https``.

  * Type: string
  * Importance: low

``rest.advertised.port``
  If this is set, this is the port that will be given out to other workers to
  connect to.

  * Type: int
  * Importance: low

``listeners``
  A list of REST listeners in the format
  ``protocol://host:port,protocol2://host2:port2`` that determines the
  protocol used by |kconnect-long|; the protocol is either HTTP or HTTPS. For
  example:

  .. codewithvars:: bash

     listeners=http://localhost:8080,https://localhost:8443

  By default, if no listeners are specified, the REST server runs on port 8083
  using the HTTP protocol. When using HTTPS, the configuration must include
  the TLS/SSL configuration. For more details, see
  :ref:`connect-rest-api-http`.

  * Type: list
  * Importance: low

``response.http.headers.config``
  Used to select which HTTP headers are returned in the HTTP response for
  |cp| components. Specify multiple values in a comma-separated string using
  the format ``[action] [header name]:[header value]``, where ``[action]`` is
  one of the following: ``set``, ``add``, ``setDate``, or ``addDate``. You
  must use quotation marks around the header value when the header value
  contains commas. For example:

  ::

     response.http.headers.config="add Cache-Control: no-cache, no-store, must-revalidate", add X-XSS-Protection: 1; mode=block, add Strict-Transport-Security: max-age=31536000; includeSubDomains, add X-Content-Type-Options: nosniff

  * Type: string
  * Default: ""
  * Importance: low

``task.shutdown.graceful.timeout.ms``
  Amount of time to wait for tasks to shut down gracefully. This is the total
  amount of time, not per task. All tasks have shutdown triggered, and then
  they are waited on sequentially.

  * Type: long
  * Default: 5000
  * Importance: low
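Taken together, these common settings form the top of a worker properties
file. The following is a minimal sketch; the converter classes, flush
interval, and plugin path are illustrative placeholders rather than
recommendations:

.. code-block:: properties

   # Kafka cluster used to bootstrap the worker's clients
   bootstrap.servers=localhost:9092

   # Format of the data written to or read from Kafka
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter

   # How often to try committing offsets for tasks (milliseconds)
   offset.flush.interval.ms=60000

   # Directories to scan for connector plugins
   plugin.path=/usr/local/share/kafka/plugins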
.. _connect-standalone-work-config:

Standalone Worker Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common worker configuration options, the following property
is available in standalone mode.

``offset.storage.file.filename``
  The file to store connector offsets in. By storing offsets on disk, a
  standalone process can be stopped and started on a single node and resume
  where it previously left off.

  * Type: string
  * Default: ""
  * Importance: high
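For example, a standalone worker file might combine the common settings shown
earlier with a local offset file. This is a sketch with illustrative paths:

.. code-block:: properties

   bootstrap.servers=localhost:9092
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter

   # Standalone mode persists source connector offsets to a local file so a
   # restarted process can resume where it left off
   offset.storage.file.filename=/tmp/connect.offsets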
.. _connect-dist-work-config:

Distributed Worker Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common worker configuration options, the following
properties are available in distributed mode. For information about how the
|kconnect| worker functions, see
:connect-common:`Configuring and Running Workers|userguide.html#configuring-and-running-workers`.

``group.id``
  A unique string that identifies the |kconnect| cluster group this worker
  belongs to.

  .. important::

     - For production environments, you must explicitly set this
       configuration. When using the :confluent-cli:`Confluent CLI|index.html`,
       this configuration property is set to ``connect-cluster`` by default.
       All workers with the same ``group.id`` will be in the same |kconnect|
       cluster. For example, if worker A has ``group.id=connect-cluster-a``
       and worker B has the same ``group.id``, worker A and worker B form a
       cluster called ``connect-cluster-a``.

     - The ``group.id`` for sink connectors is derived from the
       ``consumer.group.id`` in the worker properties. The ``group.id`` is
       created using the prefix ``connect-`` and the connector name. To
       override this value for a sink connector, add the following line in the
       worker properties file:

       .. code-block:: bash

          connector.client.config.override.policy=All

  * Type: string
  * Default: ""
  * Importance: high

``config.storage.topic``
  The name of the topic where connector and task configuration data are
  stored. This *must* be the same for all workers with the same ``group.id``.
  On startup, |kconnect-long| attempts to create this topic automatically with
  a single partition and a compacted cleanup policy to avoid losing data; it
  will simply use the topic if it already exists. If you choose to create this
  topic manually, **always** create it as a compacted topic with a single
  partition and a high replication factor (3x or more).

  * Type: string
  * Default: ""
  * Importance: high

``config.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store
  connector and task configuration data. This should **always** be at least
  ``3`` for a production system, but cannot be larger than the number of |ak|
  brokers in the cluster. Enter ``-1`` to use the |ak| broker default
  replication factor.

  * Type: short
  * Default: 3
  * Importance: low

``offset.storage.topic``
  The name of the topic where connector and task configuration offsets are
  stored. This *must* be the same for all workers with the same ``group.id``.
  On startup, |kconnect-long| attempts to create this topic automatically with
  multiple partitions and a compacted cleanup policy to avoid losing data; it
  will simply use the topic if it already exists. If you choose to create this
  topic manually, **always** create it as a compacted, highly replicated (3x
  or more) topic with a large number of partitions (e.g., 25 or 50, just like
  |ak|'s built-in ``__consumer_offsets`` topic) to support large
  |kconnect-long| clusters.

  * Type: string
  * Default: ""
  * Importance: high

``offset.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store
  connector offsets. This should **always** be at least ``3`` for a production
  system, but cannot be larger than the number of |ak| brokers in the cluster.
  Enter ``-1`` to use the |ak| broker default replication factor.

  * Type: short
  * Default: 3
  * Importance: low

``offset.storage.partitions``
  The number of partitions used when |kconnect| creates the topic used to
  store connector offsets. A large value (e.g., ``25`` or ``50``, just like
  |ak|'s built-in ``__consumer_offsets`` topic) is necessary to support large
  |kconnect-long| clusters. Enter ``-1`` to use the default number of
  partitions configured in the |ak| broker.

  * Type: int
  * Default: 25
  * Importance: low

``status.storage.topic``
  The name of the topic where connector and task status updates are stored.
  This *must* be the same for all workers with the same ``group.id``. On
  startup, |kconnect-long| attempts to create this topic automatically with
  multiple partitions and a compacted cleanup policy to avoid losing data; it
  will simply use the topic if it already exists. If you choose to create this
  topic manually, **always** create it as a compacted, highly replicated (3x
  or more) topic with multiple partitions.
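  For example, if you choose to manage all three internal topics yourself
  rather than letting the worker create them, the commands might look like the
  following sketch. The topic names are placeholders and must match the
  ``config.storage.topic``, ``offset.storage.topic``, and
  ``status.storage.topic`` values in the worker file:

  .. code-block:: bash

     # Connector and task configurations: compacted, single partition
     kafka-topics --bootstrap-server localhost:9092 --create --topic connect-configs \
       --partitions 1 --replication-factor 3 --config cleanup.policy=compact

     # Connector offsets: compacted, with many partitions
     kafka-topics --bootstrap-server localhost:9092 --create --topic connect-offsets \
       --partitions 25 --replication-factor 3 --config cleanup.policy=compact

     # Connector and task status updates: compacted, multiple partitions
     kafka-topics --bootstrap-server localhost:9092 --create --topic connect-status \
       --partitions 5 --replication-factor 3 --config cleanup.policy=compact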
  * Type: string
  * Default: ""
  * Importance: high

``status.storage.replication.factor``
  The replication factor used when |kconnect| creates the topic used to store
  connector and task status updates. This should **always** be at least ``3``
  for a production system, but cannot be larger than the number of |ak|
  brokers in the cluster. Enter ``-1`` to use the |ak| broker default
  replication factor.

  * Type: short
  * Default: 3
  * Importance: low

``status.storage.partitions``
  The number of partitions used when |kconnect| creates the topic used to
  store connector and task status updates. Enter ``-1`` to use the default
  number of partitions configured in the |ak| broker.

  * Type: int
  * Default: 5
  * Importance: low

``heartbeat.interval.ms``
  The expected time between heartbeats to the group coordinator when using
  |ak|'s group management facilities. Heartbeats are used to ensure that the
  worker's session stays active and to facilitate rebalancing when new members
  join or leave the group. The value must be set lower than
  ``session.timeout.ms``, but typically should be set no higher than 1/3 of
  that value. It can be adjusted even lower to control the expected time for
  normal rebalances.

  * Type: int
  * Default: 3000
  * Importance: high

``session.timeout.ms``
  The timeout used to detect failures when using |ak|'s group management
  facilities.

  * Type: int
  * Default: 30000
  * Importance: high

``ssl.key.password``
  The password of the private key in the key store file. This is optional for
  clients.

  * Type: password
  * Importance: high

``ssl.keystore.location``
  The location of the key store file. This is optional for clients and can be
  used for two-way client authentication.

  * Type: string
  * Importance: high

``ssl.keystore.password``
  The store password for the key store file. This is optional for clients and
  only needed if ``ssl.keystore.location`` is configured.

  * Type: password
  * Importance: high

``ssl.truststore.location``
  The location of the trust store file.

  * Type: string
  * Importance: high

``ssl.truststore.password``
  The password for the trust store file.
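  Taken together, the TLS-related properties might be combined as in the
  following sketch for a worker that connects to a TLS-enabled cluster
  (``security.protocol`` is described later in this list). The paths and
  passwords are placeholders, and the key store settings apply only when you
  use two-way client authentication:

  .. code-block:: properties

     security.protocol=SSL
     ssl.truststore.location=/var/private/ssl/kafka.truststore.jks
     ssl.truststore.password=<truststore-password>

     # Needed only for two-way (mutual) TLS authentication
     ssl.keystore.location=/var/private/ssl/kafka.keystore.jks
     ssl.keystore.password=<keystore-password>
     ssl.key.password=<key-password>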
  * Type: password
  * Importance: high

``connections.max.idle.ms``
  Close idle connections after the number of milliseconds specified by this
  configuration.

  * Type: long
  * Default: 540000
  * Importance: medium

``receive.buffer.bytes``
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

  * Type: int
  * Default: 32768
  * Importance: medium

``request.timeout.ms``
  The configuration controls the maximum amount of time the client will wait
  for the response of a request. If the response is not received before the
  timeout elapses, the client will resend the request if necessary, or fail
  the request if retries are exhausted.

  * Type: int
  * Default: 40000
  * Importance: medium

``sasl.kerberos.service.name``
  The Kerberos principal name that |ak| runs as. This can be defined either in
  |ak|'s JAAS config or in |ak|'s config.

  * Type: string
  * Importance: medium

``security.protocol``
  Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL,
  SASL_PLAINTEXT, SASL_SSL.

  * Type: string
  * Default: "PLAINTEXT"
  * Importance: medium

``send.buffer.bytes``
  The size of the TCP send buffer (SO_SNDBUF) to use when sending data.

  * Type: int
  * Default: 131072
  * Importance: medium

``ssl.enabled.protocols``
  The comma-separated list of protocols enabled for TLS connections. The
  default value is ``TLSv1.2,TLSv1.3`` when running with Java 11 or later, and
  ``TLSv1.2`` otherwise. With the Java 11 default (``TLSv1.2,TLSv1.3``), |ak|
  clients and brokers prefer TLSv1.3 if both support it, and fall back to
  TLSv1.2 otherwise (assuming both support at least TLSv1.2).

  * Type: list
  * Default: ``TLSv1.2,TLSv1.3``
  * Importance: medium

``ssl.keystore.type``
  The file format of the key store file. This is optional for clients. The
  default value is JKS.

  * Type: string
  * Default: "JKS"
  * Importance: medium

``ssl.protocol``
  The TLS protocol used to generate the SSLContext. The default is ``TLSv1.3``
  when running with Java 11 or newer, and ``TLSv1.2`` otherwise. This value
  should be fine for most use cases. Allowed values in recent JVMs are
  ``TLSv1.2`` and ``TLSv1.3``. ``TLS``, ``TLSv1.1``, ``SSL``, ``SSLv2``, and
  ``SSLv3`` might be supported in older JVMs, but their usage is discouraged
  due to known security vulnerabilities. With the default value for this
  configuration and ``ssl.enabled.protocols``, clients downgrade to
  ``TLSv1.2`` if the server does not support ``TLSv1.3``. If this
  configuration is set to ``TLSv1.2``, clients do not use ``TLSv1.3``, even if
  it is one of the values in ``ssl.enabled.protocols`` and the server only
  supports ``TLSv1.3``.

  * Type: string
  * Default: ``TLSv1.3``
  * Importance: medium

``ssl.provider``
  The name of the security provider used for TLS/SSL connections. The default
  value is the default security provider of the JVM.

  * Type: string
  * Importance: medium

``ssl.truststore.type``
  The file format of the trust store file. The default value is JKS.

  * Type: string
  * Default: "JKS"
  * Importance: medium

``worker.sync.timeout.ms``
  When the worker is out of sync with other workers and needs to resynchronize
  configurations, wait up to this amount of time before giving up, leaving the
  group, and waiting a backoff period before rejoining.

  * Type: int
  * Default: 3000
  * Importance: medium

``worker.unsync.backoff.ms``
  When the worker is out of sync with other workers and fails to catch up
  within ``worker.sync.timeout.ms``, leave the |kconnect| cluster for this
  long before rejoining.

  * Type: int
  * Default: 300000
  * Importance: medium

``client.id``
  An ID string to pass to the server when making requests. The purpose of this
  is to be able to track the source of requests beyond just IP and port by
  allowing a logical application name to be included in server-side request
  logging.

  * Type: string
  * Default: ""
  * Importance: low

``metadata.max.age.ms``
  The period of time in milliseconds after which a refresh of metadata is
  forced, even if no partition leadership changes have been seen, to
  proactively discover any new brokers or partitions.

  * Type: long
  * Default: 300000
  * Importance: low

``metric.reporters``
  A list of classes to use as metrics reporters. Implementing the
  ``MetricReporter`` interface allows plugging in classes that will be
  notified of new metric creation. The ``JmxReporter`` is always included to
  register JMX statistics.

  * Type: list
  * Default: []
  * Importance: low

``metrics.num.samples``
  The number of samples maintained to compute metrics.

  * Type: int
  * Default: 2
  * Importance: low

``metrics.sample.window.ms``
  The window of time over which each metrics sample is computed.

  * Type: long
  * Default: 30000
  * Importance: low

``reconnect.backoff.ms``
  The amount of time to wait before attempting to reconnect to a given host.
  This avoids repeatedly connecting to a host in a tight loop. This backoff
  applies to all requests sent by the consumer to the broker.
  * Type: long
  * Default: 50
  * Importance: low

``retry.backoff.ms``
  The amount of time to wait before attempting to retry a failed fetch request
  to a given topic partition. This avoids repeated fetching-and-failing in a
  tight loop.

  * Type: long
  * Default: 100
  * Importance: low

``sasl.kerberos.kinit.cmd``
  Kerberos kinit command path. The default is ``/usr/bin/kinit``.

  * Type: string
  * Default: "/usr/bin/kinit"
  * Importance: low

``sasl.kerberos.min.time.before.relogin``
  Login thread sleep time between refresh attempts.

  * Type: long
  * Default: 60000
  * Importance: low

``sasl.kerberos.ticket.renew.jitter``
  Percentage of random jitter added to the renewal time.

  * Type: double
  * Default: 0.05
  * Importance: low

``sasl.kerberos.ticket.renew.window.factor``
  The login thread will sleep until the specified window factor of time from
  the last refresh to the ticket's expiry has been reached, at which time it
  will try to renew the ticket.

  * Type: double
  * Default: 0.8
  * Importance: low

``ssl.cipher.suites``
  A list of cipher suites. This is a named combination of authentication,
  encryption, MAC, and key exchange algorithms used to negotiate the security
  settings for a network connection using TLS. By default, all the available
  cipher suites are supported.

  * Type: list
  * Importance: low

``ssl.endpoint.identification.algorithm``
  The endpoint identification algorithm used to validate the server hostname
  using the server certificate.

  * Type: string
  * Importance: low

``ssl.keymanager.algorithm``
  The algorithm used by the key manager factory for TLS/SSL connections. The
  default value is the key manager factory algorithm configured for the Java
  Virtual Machine.

  * Type: string
  * Default: "SunX509"
  * Importance: low

``ssl.trustmanager.algorithm``
  The algorithm used by the trust manager factory for TLS/SSL connections. The
  default value is the trust manager factory algorithm configured for the Java
  Virtual Machine.

  * Type: string
  * Default: "PKIX"
  * Importance: low

.. _connect-worker-license-config:

|cp| License Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting in |cp| 6.0, you can put the Confluent license-related properties
into the standalone and distributed worker configuration. The |kconnect|
worker then automatically injects these license-related properties into the
configuration of each of Confluent's commercial connectors, so the
license-related properties can be left out of, or removed from, the individual
connector configurations.

.. include:: ../../.hidden/docs-common/kafka-connectors/self-managed/includes/platform-license.rst

.. include:: ../../.hidden/docs-common/kafka-connectors/self-managed/includes/security-configs.rst

.. include:: ../../.hidden/docs-common/kafka-connectors/self-managed/includes/platform-license-detail.rst

.. include:: ../../.hidden/docs-common/kafka-connectors/self-managed/includes/platform-license-acl.rst

.. include:: ../includes/overriding-default-config-properties.rst

.. _connect-override-config:

Override the Worker Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default, source and sink connectors inherit their client configurations
from the worker configuration. Within the worker configuration, properties
that have a prefix of ``producer.`` or ``consumer.`` are used to create
clients for all source and sink connectors, respectively. If you want to
override producer or consumer properties for a specific connector, enable
client overrides in the worker configuration and then use
``producer.override.*`` in a source connector configuration and
``consumer.override.*`` in a sink connector configuration.
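For example, a worker file might set client defaults that the clients of every
connector inherit, as in the following sketch (the property values are
illustrative):

.. code-block:: properties

   # Worker-level default inherited by every source connector's producer
   producer.compression.type=gzip

   # Worker-level default inherited by every sink connector's consumer
   consumer.auto.offset.reset=earliest

An individual connector can then replace one of these values with the matching
``producer.override.*`` or ``consumer.override.*`` key, as shown in the
examples later in this section.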
If you want to override admin properties for a specific connector, the process
is the same: you can include properties prefixed with ``admin.override.*`` in
your connector configuration, and these properties will override any
properties derived from the worker configuration when creating admin clients
for connectors. Source connectors use admin clients when you enable automatic
topic creation. Sink connectors, on the other hand, use admin clients when you
enable a dead letter queue (DLQ) topic.
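For example, a sink connector that writes failed records to a DLQ topic could
adjust the admin client it uses. This is a sketch; the property and value are
illustrative, and any admin client property can be overridden the same way:

.. code-block:: properties

   # In the connector configuration, not the worker file
   admin.override.request.timeout.ms=60000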
.. important::

   - The |kconnect-long| framework does not allow you to unset or set ``null``
     for producer or consumer configuration properties. Instead, try to set
     the default callback handler at the connector level using the following
     configuration property:

     .. code-block:: properties

        producer.override.sasl.login.callback.handler.class=org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler

   - For source connectors whose destination cluster uses the SCRAM SASL
     mechanism, the default callback handler should not be set at the
     connector level. Instead, set the producer configurations on the
     |kconnect-long| framework. In such cases, change the distributed worker
     configuration to point to the appropriate producer settings, for example:

     .. code:: bash

        producer.bootstrap.servers=x:9096,y:9096,z:9096
        producer.retry.backoff.ms=500
        producer.security.protocol=SASL_SSL
        producer.sasl.mechanism=SCRAM-SHA-512
        producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"######\" password=\"#####\";
        producer.ssl.truststore.location=/var/ssl/private/kafka_connect.truststore.jks
        producer.ssl.truststore.password=${securepass:/var/ssl/private/kafka-connect-security.properties:connect-distributed.properties/producer.ssl.truststore.password}

To enable per-connector configuration properties and override the default
worker properties, add the ``connector.client.config.override.policy``
configuration parameter to the worker properties file.

``connector.client.config.override.policy``
  The class name or alias implementation of
  ``ConnectorClientConfigOverridePolicy``. This defines the configurations
  that can be overridden by the connector. The default implementation is
  ``All``. The other possible policies are ``None`` and ``Principal``.

  * Type: string
  * Default: All
  * Valid Values: [All, None, Principal]
  * Importance: medium

When ``connector.client.config.override.policy=All``, each connector that
belongs to the worker is allowed to override the worker configuration. This is
implemented by adding one of the following override prefixes to the source and
sink connector configurations:

* ``producer.override.``
* ``consumer.override.``

--------
Examples
--------

The following example shows a line added that overrides the default worker
``compression.type`` property. After the connector configuration is updated,
the Replicator connector will use gzip compression.

.. sourcecode:: json
   :emphasize-lines: 12

   {
     "name": "Replicator",
     "config": {
       "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
       "topic.whitelist": "_schemas",
       "topic.rename.format": "\${topic}.replica",
       "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
       "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
       "src.kafka.bootstrap.servers": "srcKafka1:10091",
       "dest.kafka.bootstrap.servers": "destKafka1:11091",
       "tasks.max": "1",
       "producer.override.compression.type": "gzip",
       "confluent.topic.replication.factor": "1",
       "schema.subject.translator.class": "io.confluent.connect.replicator.schemas.DefaultSubjectTranslator",
       "schema.registry.topic": "_schemas",
       "schema.registry.url": "http://destSchemaregistry:8086"
     }
   }

The following example shows a line added that overrides the default worker
``auto.offset.reset`` property. After the connector configuration is updated,
the Elasticsearch connector will use ``latest`` instead of the default
|kconnect| worker property value ``earliest``.

.. sourcecode:: json
   :emphasize-lines: 6

   {
     "name": "Elasticsearch",
     "config": {
       "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
       "topics": "orders",
       "consumer.override.auto.offset.reset": "latest",
       "tasks.max": 1,
       "connection.url": "http://elasticsearch:9200",
       "type.name": "type.name=kafkaconnect",
       "key.ignore": "true",
       "schema.ignore": "false",
       "transforms": "renameTopic",
       "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.renameTopic.regex": "orders",
       "transforms.renameTopic.replacement": "orders-latest"
     }
   }

When the worker override configuration property is set to
``connector.client.config.override.policy=Principal``, each of the connectors
can use a different service principal. The following example shows a sink
connector service principal override when implementing Role-Based Access
Control (RBAC):

::

   consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="<username>" \
     password="<password>" \
     metadataServerUrls="<metadata-server-urls>";

.. note::

   When set to ``All``, per-connector override capability includes overriding
   the worker service principal in addition to other worker configuration
   properties. When set to ``Principal``, per-connector override capability is
   restricted to service principal overrides only.

---------------------
Customize an Override
---------------------

You may not want to use the worker override ``All`` to allow overriding all
connector configuration properties defined in the worker. While this is not
typical, you can create a custom override policy that limits which connector
configuration properties can be overridden and what values they can take. For
example, if you need a custom policy for ``batch.size`` that restricts the
batch size to 1 MB, you would implement the
:platform:`ConnectorClientConfigOverridePolicy|connect/javadocs/javadoc/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.html`
interface for this configuration property. This class implementation contains
all the logic required to limit the list of configuration properties and their
values.