.. _connect_replicator_config_options:

Configuration Properties
------------------------

To use this connector, specify the name of the connector class in the ``connector.class`` configuration property.

.. codewithvars:: properties

   connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

Connector-specific configuration properties are described below.

.. _rep-source-topics:

Source Topics
^^^^^^^^^^^^^

``topic.regex``
  Regex of topics to replicate to the destination cluster.

  * Type: string
  * Default: null
  * Importance: high

``topic.whitelist``
  Whitelist of topics to be replicated.

  * Type: list
  * Default: ""
  * Importance: high

``topic.blacklist``
  Topics to exclude from replication.

  * Type: list
  * Default: ""
  * Importance: high

``topic.poll.interval.ms``
  How often to poll the source cluster for new topics matching ``topic.whitelist`` or ``topic.regex``.

  * Type: int
  * Default: 120000
  * Valid Values: [0,...]
  * Importance: low

``offset.topic.commit``
  Whether to commit Replicator's consumer offsets to the source |ak| cluster after the messages have been written to the destination cluster. These consumer offsets can be used to easily track the lag of Replicator.

  * Type: boolean
  * Default: true
  * Importance: medium

Source Data Conversion
^^^^^^^^^^^^^^^^^^^^^^

``src.key.converter``
  Converter for the key field of messages retrieved from the source cluster.

  * Type: class
  * Default: io.confluent.connect.replicator.util.ByteArrayConverter
  * Importance: low

``src.value.converter``
  Converter for the value field of messages retrieved from the source cluster.

  * Type: class
  * Default: io.confluent.connect.replicator.util.ByteArrayConverter
  * Importance: low

``src.header.converter``
  HeaderConverter class used to convert serialized |ak| headers to |kconnect-long| headers. The default value is ByteArrayConverter, which simply passes the input bytes into the destination record.

  * Type: class
  * Default: io.confluent.connect.replicator.util.ByteArrayConverter
  * Importance: low
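For example, a sketch of the topic selection and source conversion properties above (the topic regex and blacklist values are illustrative, not defaults):

.. codewithvars:: properties

   # Replicate every topic whose name starts with "orders." ...
   topic.regex=orders\..*
   # ... except this one (illustrative name)
   topic.blacklist=orders.internal
   # Check the source cluster for newly matching topics every two minutes
   topic.poll.interval.ms=120000
   # Pass source bytes through unchanged (the defaults)
   src.key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
   src.value.converter=io.confluent.connect.replicator.util.ByteArrayConverter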
.. _source-zk:

Source |zk|
^^^^^^^^^^^

.. include:: includes/dest-zk-connect.rst

``src.zookeeper.connect``
  |zk| connection string for the source cluster.

  * Type: string
  * Importance: high

``src.zookeeper.connection.timeout.ms``
  Connection timeout in milliseconds for the source |zk| cluster.

  * Type: int
  * Default: 6000
  * Valid Values: [0,...]
  * Importance: low

``src.zookeeper.session.timeout.ms``
  Session timeout in milliseconds for the source |zk| cluster.

  * Type: int
  * Default: 6000
  * Valid Values: [0,...]
  * Importance: low

Source |ak|
^^^^^^^^^^^

The following configuration options are common properties that are used across |ak| clients. Replicator uses these options to connect to the source cluster (consumer, admin client). Valid client properties that use the ``src.kafka`` prefix are forwarded to clients that connect to the source cluster.

``src.kafka.bootstrap.servers``
  A list of host and port pairs to use for establishing the initial connection to the source |ak| cluster, in the form ``host1:port1,host2:port2,...``. The client will use all servers, irrespective of which servers are designated here for bootstrapping; this list only controls the initial hosts used to discover the full cluster membership. You don't need to specify the full set of servers, but you may want to specify more than one in case of failover.

  * Type: list
  * Importance: high

``src.kafka.client.id``
  An ID string to pass to the server when making requests. This string is used to track the source of requests. It allows a logical application name to be included in server-side request logging.

  * Type: string
  * Default: ""
  * Importance: low

``src.kafka.request.timeout.ms``
  Specifies the maximum amount of time the client will wait for the response of a request.
  If the response is not received before the timeout elapses, the client will resend the request, or fail the request if retries are exhausted.

  * Type: int
  * Default: 305000
  * Valid Values: [0,...]
  * Importance: medium

``src.kafka.retry.backoff.ms``
  Specifies the amount of time to wait before attempting to retry a failed request to a topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

  * Type: long
  * Default: 100
  * Valid Values: [0,...]
  * Importance: low

``src.kafka.connections.max.idle.ms``
  Specifies the amount of time (in milliseconds) before idle connections are closed.

  * Type: long
  * Default: 540000
  * Importance: medium

``src.kafka.reconnect.backoff.ms``
  Specifies the amount of time to wait before attempting to reconnect to a host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.

  * Type: long
  * Default: 50
  * Valid Values: [0,...]
  * Importance: low

``src.kafka.metric.reporters``
  A list of classes to use as metrics reporters. Implementing the ``MetricReporter`` interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

  * Type: list
  * Default: ""
  * Importance: low

``src.kafka.metrics.num.samples``
  The number of samples maintained to compute metrics.

  * Type: int
  * Default: 2
  * Valid Values: [1,...]
  * Importance: low

``src.kafka.metrics.sample.window.ms``
  The window of time a metrics sample is computed over.

  * Type: long
  * Default: 30000
  * Valid Values: [0,...]
  * Importance: low

``src.kafka.send.buffer.bytes``
  The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the operating system default will be used.

  * Type: int
  * Default: 131072
  * Valid Values: [-1,...]
  * Importance: medium

``src.kafka.receive.buffer.bytes``
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
  If the value is -1, the operating system default will be used.

  * Type: int
  * Default: 65536
  * Valid Values: [-1,...]
  * Importance: medium

.. _source_security_config:

Source |ak|: Security
^^^^^^^^^^^^^^^^^^^^^

``src.kafka.security.protocol``
  Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  * Type: string
  * Default: PLAINTEXT
  * Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
  * Importance: medium

``src.kafka.sasl.mechanism``
  SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.

  * Type: string
  * Default: GSSAPI
  * Importance: medium

``src.kafka.sasl.kerberos.ticket.renew.window.factor``
  The login thread will sleep until the specified window factor of time from last refresh to the ticket's expiry has been reached, at which time it will try to renew the ticket.

  * Type: double
  * Default: 0.8
  * Importance: low

``src.kafka.sasl.kerberos.min.time.before.relogin``
  Login thread sleep time between refresh attempts.

  * Type: long
  * Default: 60000
  * Importance: low

``src.kafka.sasl.kerberos.kinit.cmd``
  Kerberos kinit command path.

  * Type: string
  * Default: /usr/bin/kinit
  * Importance: low

``src.kafka.sasl.kerberos.service.name``
  The Kerberos principal name that |ak| runs as. This can be defined either in |ak|'s JAAS config or in |ak|'s config.

  * Type: string
  * Default: null
  * Importance: medium

``src.kafka.sasl.kerberos.ticket.renew.jitter``
  Percentage of random jitter added to the renewal time.

  * Type: double
  * Default: 0.05
  * Importance: low

``src.kafka.ssl.protocol``
  The SSL protocol used to generate the SSLContext. The default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1, and TLSv1.2. SSL, SSLv2, and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
  * Type: string
  * Default: TLS
  * Importance: medium

``src.kafka.ssl.provider``
  The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.

  * Type: string
  * Default: null
  * Importance: medium

``src.kafka.ssl.enabled.protocols``
  The list of protocols enabled for SSL connections.

  * Type: list
  * Default: TLSv1.2,TLSv1.1,TLSv1
  * Importance: medium

``src.kafka.ssl.keystore.location``
  The location of the key store file. This is optional for the client and can be used for two-way client authentication.

  * Type: string
  * Default: null
  * Importance: high

``src.kafka.ssl.cipher.suites``
  A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all the available cipher suites are supported.

  * Type: list
  * Default: null
  * Importance: low

``src.kafka.ssl.secure.random.implementation``
  The SecureRandom PRNG implementation to use for SSL cryptography operations.

  * Type: string
  * Default: null
  * Importance: low

``src.kafka.ssl.truststore.type``
  The file format of the trust store file.

  * Type: string
  * Default: JKS
  * Importance: medium

``src.kafka.ssl.keystore.type``
  The file format of the key store file. This is optional for the client.

  * Type: string
  * Default: JKS
  * Importance: medium

``src.kafka.ssl.trustmanager.algorithm``
  The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  * Type: string
  * Default: PKIX
  * Importance: low

``src.kafka.ssl.truststore.location``
  The location of the trust store file.

  * Type: string
  * Default: null
  * Importance: high

``src.kafka.ssl.keystore.password``
  The store password for the key store file. This is optional for the client and only needed if ``ssl.keystore.location`` is configured.
  * Type: password
  * Default: null
  * Importance: high

``src.kafka.ssl.keymanager.algorithm``
  The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine.

  * Type: string
  * Default: SunX509
  * Importance: low

``src.kafka.ssl.key.password``
  The password of the private key in the key store file. This is optional for the client.

  * Type: password
  * Default: null
  * Importance: high

``src.kafka.ssl.truststore.password``
  The password for the trust store file.

  * Type: password
  * Default: null
  * Importance: high

``src.kafka.ssl.endpoint.identification.algorithm``
  The endpoint identification algorithm used to validate the server hostname using the server certificate.

  * Type: string
  * Default: null
  * Importance: low

Source |ak|: Consumer
^^^^^^^^^^^^^^^^^^^^^

The following configuration options are properties that are specific to the |ak| consumer. These options are combined with the ``src.kafka`` properties and forwarded to consumers that connect to the source cluster.

``src.consumer.interceptor.classes``
  A list of classes to use as interceptors. Implementing the ``ConsumerInterceptor`` interface allows you to intercept (and possibly mutate) records received by the consumer. By default, there are no interceptors.

  * Type: list
  * Default: null
  * Importance: low

``src.consumer.fetch.max.wait.ms``
  The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by ``fetch.min.bytes``.

  * Type: int
  * Default: 500
  * Valid Values: [0,...]
  * Importance: low

``src.consumer.fetch.min.bytes``
  The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request.
  The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.

  * Type: int
  * Default: 1
  * Valid Values: [0,...]
  * Importance: high

``src.consumer.fetch.max.bytes``
  The maximum amount of data the server should return for a fetch request. This is not an absolute maximum: if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned to ensure that the consumer can make progress. The maximum message size accepted by the broker is defined via ``message.max.bytes`` (broker config) or ``max.message.bytes`` (topic config). Note that the consumer performs multiple fetches in parallel.

  * Type: int
  * Default: 52428800
  * Valid Values: [0,...]
  * Importance: medium

``src.consumer.max.partition.fetch.bytes``
  The maximum amount of data per partition the server will return. If the first message in the first non-empty partition of the fetch is larger than this limit, the message will still be returned to ensure that the consumer can make progress. The maximum message size accepted by the broker is defined via ``message.max.bytes`` (broker config) or ``max.message.bytes`` (topic config). See ``fetch.max.bytes`` for limiting the consumer request size.

  * Type: int
  * Default: 1048576
  * Valid Values: [0,...]
  * Importance: high

``src.consumer.max.poll.records``
  The maximum number of records returned in a single call to poll().

  * Type: int
  * Default: 500
  * Valid Values: [1,...]
  * Importance: medium

``src.consumer.check.crcs``
  Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption of the messages has occurred.
  This check adds some overhead, so it may be disabled in cases seeking extreme performance.

  * Type: boolean
  * Default: true
  * Importance: low

Destination Topics
^^^^^^^^^^^^^^^^^^

``topic.rename.format``
  A format string for the topic name in the destination cluster, which may contain ``${topic}`` as a placeholder for the originating topic name. For example, ``dc_${topic}`` for the topic 'orders' will map to the destination topic name 'dc_orders'. Be careful of the potential for topic name collisions when configuring replicators from multiple source clusters. We typically recommend that each cluster be given a distinct prefix or suffix (as in the example above).

  * Type: string
  * Default: ${topic}
  * Importance: high

``topic.auto.create``
  Whether to automatically create topics in the destination cluster if required.

  * Type: boolean
  * Default: true
  * Importance: low

``topic.preserve.partitions``
  Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.

  .. tip::

     .. include:: ../../includes/replicator-topic-preserve.rst

  * Type: boolean
  * Default: true
  * Importance: low

``topic.create.backoff.ms``
  Time to wait before retrying auto topic creation or expansion.

  * Type: int
  * Default: 120000
  * Valid Values: [0,...]
  * Importance: low

``topic.config.sync``
  Whether to periodically sync topic configuration to the destination cluster.

  * Type: boolean
  * Default: true
  * Importance: low

``topic.config.sync.interval.ms``
  How often to check for configuration changes when ``topic.config.sync`` is enabled.

  * Type: int
  * Default: 120000
  * Valid Values: [0,...]
  * Importance: low

``topic.timestamp.type``
  The timestamp type for the topics in the destination cluster.
  * Type: string
  * Default: CreateTime
  * Valid Values: [CreateTime, LogAppendTime]
  * Importance: low

Destination |zk|
^^^^^^^^^^^^^^^^

``dest.zookeeper.connect``
  |zk| connection string for the destination cluster.

  .. important::

     If ``dest.zookeeper.connect`` is set, the :ref:`destination Kafka cluster properties <dest-kafka>` are only required if you are using consumer offset translation.

  * Type: string
  * Importance: high

``dest.zookeeper.connection.timeout.ms``
  Connection timeout in milliseconds for the destination |zk| cluster.

  * Type: int
  * Default: 6000
  * Valid Values: [0,...]
  * Importance: low

``dest.zookeeper.session.timeout.ms``
  Session timeout in milliseconds for the destination |zk| cluster.

  * Type: int
  * Default: 6000
  * Valid Values: [0,...]
  * Importance: low

.. _dest-kafka:

Destination |ak|
^^^^^^^^^^^^^^^^

.. include:: includes/dest-zk-connect.rst

``dest.kafka.bootstrap.servers``
  A list of host and port pairs to use for establishing the initial connection to the destination |ak| cluster, in the form ``host1:port1,host2:port2,...``. The client will use all servers, irrespective of which servers are designated here for bootstrapping; this list only controls the initial hosts used to discover the full cluster membership. You don't need to specify the full set of servers, but you may want to specify more than one in case of failover.

  * Type: list
  * Importance: high

``dest.kafka.client.id``
  An ID string to pass to the server when making requests. This string is used to track the source of requests. It allows a logical application name to be included in server-side request logging.

  * Type: string
  * Default: ""
  * Importance: low

``dest.kafka.request.timeout.ms``
  Specifies the maximum amount of time the client will wait for the response of a request.
  If the response is not received before the timeout elapses, the client will resend the request, or fail the request if retries are exhausted.

  * Type: int
  * Default: 305000
  * Valid Values: [0,...]
  * Importance: medium

``dest.kafka.retry.backoff.ms``
  Specifies the amount of time to wait before attempting to retry a failed request to a topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

  * Type: long
  * Default: 100
  * Valid Values: [0,...]
  * Importance: low

``dest.kafka.connections.max.idle.ms``
  Specifies the amount of time (in milliseconds) before idle connections are closed.

  * Type: long
  * Default: 540000
  * Importance: medium

``dest.kafka.reconnect.backoff.ms``
  Specifies the amount of time to wait before attempting to reconnect to a host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.

  * Type: long
  * Default: 50
  * Valid Values: [0,...]
  * Importance: low

``dest.kafka.metric.reporters``
  A list of classes to use as metrics reporters. Implementing the ``MetricReporter`` interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

  * Type: list
  * Default: ""
  * Importance: low

``dest.kafka.metrics.num.samples``
  The number of samples maintained to compute metrics.

  * Type: int
  * Default: 2
  * Valid Values: [1,...]
  * Importance: low

``dest.kafka.metrics.sample.window.ms``
  The window of time a metrics sample is computed over.

  * Type: long
  * Default: 30000
  * Valid Values: [0,...]
  * Importance: low

``dest.kafka.send.buffer.bytes``
  The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the operating system default will be used.

  * Type: int
  * Default: 131072
  * Valid Values: [-1,...]
  * Importance: medium

``dest.kafka.receive.buffer.bytes``
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
  If the value is -1, the operating system default will be used.

  * Type: int
  * Default: 65536
  * Valid Values: [-1,...]
  * Importance: medium

Destination |ak|: Security
^^^^^^^^^^^^^^^^^^^^^^^^^^

``dest.kafka.security.protocol``
  Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  * Type: string
  * Default: PLAINTEXT
  * Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
  * Importance: medium

``dest.kafka.sasl.mechanism``
  SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.

  * Type: string
  * Default: GSSAPI
  * Importance: medium

``dest.kafka.sasl.kerberos.ticket.renew.window.factor``
  Specifies the window factor of time that the login thread will sleep until the ticket is renewed. The window factor of time is computed from last refresh to the ticket's expiry.

  * Type: double
  * Default: 0.8
  * Importance: low

``dest.kafka.sasl.kerberos.min.time.before.relogin``
  Login thread sleep time between refresh attempts.

  * Type: long
  * Default: 60000
  * Importance: low

``dest.kafka.sasl.kerberos.kinit.cmd``
  Kerberos kinit command path.

  * Type: string
  * Default: /usr/bin/kinit
  * Importance: low

``dest.kafka.sasl.kerberos.service.name``
  The Kerberos principal name that |ak| runs as. This can be defined either in |ak|'s JAAS config or in |ak|'s config.

  * Type: string
  * Default: null
  * Importance: medium

``dest.kafka.sasl.kerberos.ticket.renew.jitter``
  Percentage of random jitter added to the renewal time.

  * Type: double
  * Default: 0.05
  * Importance: low

``dest.kafka.ssl.protocol``
  The SSL protocol used to generate the SSLContext. The default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1, and TLSv1.2. SSL, SSLv2, and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
  * Type: string
  * Default: TLS
  * Importance: medium

``dest.kafka.ssl.provider``
  The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.

  * Type: string
  * Default: null
  * Importance: medium

``dest.kafka.ssl.enabled.protocols``
  The list of protocols enabled for SSL connections.

  * Type: list
  * Default: TLSv1.2,TLSv1.1,TLSv1
  * Importance: medium

``dest.kafka.ssl.keystore.location``
  The location of the key store file. This is optional for the client and can be used for two-way client authentication.

  * Type: string
  * Default: null
  * Importance: high

``dest.kafka.ssl.cipher.suites``
  A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all the available cipher suites are supported.

  * Type: list
  * Default: null
  * Importance: low

``dest.kafka.ssl.secure.random.implementation``
  The SecureRandom PRNG implementation to use for SSL cryptography operations.

  * Type: string
  * Default: null
  * Importance: low

``dest.kafka.ssl.truststore.type``
  The file format of the trust store file.

  * Type: string
  * Default: JKS
  * Importance: medium

``dest.kafka.ssl.keystore.type``
  The file format of the key store file. This is optional for the client.

  * Type: string
  * Default: JKS
  * Importance: medium

``dest.kafka.ssl.trustmanager.algorithm``
  The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  * Type: string
  * Default: PKIX
  * Importance: low

``dest.kafka.ssl.truststore.location``
  The location of the trust store file.

  * Type: string
  * Default: null
  * Importance: high

``dest.kafka.ssl.keystore.password``
  The store password for the key store file. This is optional for the client and only needed if ``ssl.keystore.location`` is configured.
  * Type: password
  * Default: null
  * Importance: high

``dest.kafka.ssl.keymanager.algorithm``
  The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine.

  * Type: string
  * Default: SunX509
  * Importance: low

``dest.kafka.ssl.key.password``
  The password of the private key in the key store file. This is optional for the client.

  * Type: password
  * Default: null
  * Importance: high

``dest.kafka.ssl.truststore.password``
  The password for the trust store file.

  * Type: password
  * Default: null
  * Importance: high

``dest.kafka.ssl.endpoint.identification.algorithm``
  The endpoint identification algorithm used to validate the server hostname using the server certificate.

  * Type: string
  * Default: null
  * Importance: low

Destination Data Conversion
^^^^^^^^^^^^^^^^^^^^^^^^^^^

``key.converter``
  Converter for the key field of messages written to the destination cluster. Use ``io.confluent.connect.replicator.util.ByteArrayConverter`` if you don't need data conversion.

  * Type: class
  * Default: inherited from worker configuration
  * Importance: low

``value.converter``
  Converter for the value field of messages written to the destination cluster. Use ``io.confluent.connect.replicator.util.ByteArrayConverter`` if you don't need data conversion.

  * Type: class
  * Default: inherited from worker configuration
  * Importance: low

``header.converter``
  HeaderConverter class used to convert serialized |ak| headers to |kconnect-long| headers. The default value is ByteArrayConverter, which simply passes the input bytes into the destination record.

  * Type: class
  * Default: io.confluent.connect.replicator.util.ByteArrayConverter
  * Importance: low

Consumer Offset Translation
^^^^^^^^^^^^^^^^^^^^^^^^^^^

``offset.translator.tasks.max``
  The maximum number of Replicator tasks that will perform offset translation. If -1 (the default), all tasks will perform offset translation.
  * Type: int
  * Default: -1
  * Importance: medium

``offset.translator.tasks.separate``
  Whether to translate offsets in separate tasks from those performing topic replication.

  * Type: boolean
  * Default: false
  * Importance: medium

``offset.timestamps.commit``
  Whether to commit internal offset timestamps for Replicator, so that it can resume properly when switching to the secondary cluster.

  * Type: boolean
  * Default: true
  * Importance: medium

``timestamps.topic.replication.factor``
  Replication factor for the consumer timestamps topic.

  * Type: int
  * Default: 3
  * Importance: high

``timestamps.topic.num.partitions``
  Number of partitions for the consumer timestamps topic.

  * Type: int
  * Default: 50
  * Importance: high

``provenance.header.enable``
  Whether to enable the use of provenance headers during replication.

  * Type: boolean
  * Default: false
  * Importance: medium

``fetch.offset.expiry.ms``
  The amount of time in milliseconds after which a fetch offset request for offset translation expires and is discarded.

  * Type: long
  * Default: 600000
  * Importance: low

``fetch.offset.retry.backoff.ms``
  The amount of time in milliseconds to wait before attempting to retry a failed fetch offset request during offset translation.

  * Type: long
  * Default: 100
  * Importance: low

.. _replicator-connector-license-config:

|cp| license
^^^^^^^^^^^^

``confluent.topic.bootstrap.servers``
  A list of host/port pairs to use for establishing the initial connection to the |ak| cluster used for licensing, in the form ``host1:port1,host2:port2,...``. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
  * Type: list
  * Importance: high

``confluent.topic``
  Name of the |ak| topic used for |cp| configuration, including licensing information.

  * Type: string
  * Default: _confluent-command
  * Importance: low

``confluent.topic.replication.factor``
  The replication factor for the |ak| topic used for |cp| configuration, including licensing information. This is used only if the topic does not already exist, and the default of 3 is appropriate for production use. If you are using a development environment with fewer than 3 brokers, you must set this to the number of brokers (often 1).

  * Type: int
  * Default: 3
  * Importance: low

----------------------------
Confluent license properties
----------------------------

.. include:: ../includes/security-info.rst

.. include:: ../includes/platform-license.rst

.. include:: ../includes/security-configs.rst

.. _connect_replicator_license-topic-configuration:

.. include:: ../includes/platform-license-detail.rst

.. include:: ../includes/overriding-default-config-properties.rst
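Tying the sections above together, a minimal sketch of a Replicator configuration built from the properties described in this document (the connector name, host names, topic regex, and prefix are illustrative, not defaults):

.. codewithvars:: properties

   name=replicator-dc1-to-dc2
   connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

   # Source and destination clusters (host names are illustrative)
   src.kafka.bootstrap.servers=src-broker1:9092,src-broker2:9092
   dest.kafka.bootstrap.servers=dest-broker1:9092,dest-broker2:9092

   # Replicate topics starting with "orders." and prefix them in the
   # destination cluster to avoid topic name collisions
   topic.regex=orders\..*
   topic.rename.format=dc1_${topic}

   # Pass message bytes through unchanged; no data conversion
   key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
   value.converter=io.confluent.connect.replicator.util.ByteArrayConverter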