Configuration Reference for Elasticsearch Service Sink Connector for Confluent Platform
To use this connector, specify the name of the connector class in the
connector.class configuration property:
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Connector-specific configuration properties are described below.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Elasticsearch Service Sink connector for Confluent Cloud.
For configuration properties that have been removed or deprecated in this version, see the Elasticsearch Sink connector changelog.
Connector
connection.url
List of Elasticsearch HTTP connection URLs, for example http://eshost1:9200,http://eshost2:9200, or https://eshost3:9200. If any of the URLs specifies the https: protocol, HTTPS will be used for all connections. A URL with no protocol will be treated as http.
Type: list
Valid Values: List of valid URIs.
Importance: high
connection.username
The username used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.
Type: string
Default: null
Importance: medium
connection.password
The password used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.
Type: password
Default: null
Importance: medium
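As an illustration, the connection and authentication properties above might be combined as follows; the hostnames and credentials are placeholders, not values taken from this reference:

# Both URLs use https, so HTTPS is used for all connections.
connection.url=https://eshost1:9200,https://eshost2:9200
# Authentication is only performed when both username and password are non-null.
connection.username=elastic-user
connection.password=elastic-secret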
batch.size
The number of records to process as a batch when writing to Elasticsearch.
Type: int
Default: 2000
Valid Values: [1,…,1000000]
Importance: medium
bulk.size.bytes
The maximum size (in bytes) to process as a batch when writing records to Elasticsearch. Setting this to -1 disables the limit. If the condition set by batch.size is met first, batch.size is used instead.
Type: long
Default: 5242880 (5 megabytes)
Valid Values: [-1,…,2147483647]
Importance: low
max.in.flight.requests
The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.
Type: int
Default: 5
Valid Values: [1,…,1000]
Importance: medium
max.buffered.records
The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.
Type: int
Default: 20000
Valid Values: [1,…,2147483647]
Importance: low
linger.ms
Linger time in milliseconds for batching. Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this only occurs under load, when records arrive faster than they can be sent out. However, it may be desirable to reduce the number of requests even under light load and benefit from bulk indexing. This setting helps accomplish that: when a pending batch is not full, rather than sending it out immediately, the task will wait up to the given delay to allow other records to be added so that they can be batched into a single request.
Type: long
Default: 1
Valid Values: [0,…,604800000]
Importance: low
flush.timeout.ms
The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded, the task will fail.
Type: long
Default: 180000 (3 minutes)
Valid Values: [1000,…,604800000]
Importance: low
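These batching properties interact: a bulk request is sent once either batch.size records or bulk.size.bytes bytes have accumulated, and linger.ms lets a partially full batch wait briefly for more records. A purely illustrative tuning that favors slightly larger bulk requests might look like this (the values are assumptions, not recommendations from this reference):

batch.size=2000
bulk.size.bytes=5242880
linger.ms=50
max.in.flight.requests=5
flush.timeout.ms=180000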
flush.synchronously
True if flushes should wait for background processing to finish. This has a throughput penalty and makes the connector less responsive, but allows for topic-mutating SMTs (for example, RegexRouter or TimestampRouter).
Type: boolean
Default: false
Importance: low
max.retries
The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted, the task will fail.
Type: int
Default: 5
Valid Values: [0,…,2147483647]
Importance: low
retry.backoff.ms
The amount of time in milliseconds to wait before attempting the first retry of a failed indexing request. Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. This avoids retrying in a tight loop under failure scenarios.
Type: long
Default: 100
Valid Values: [0,…,86400000]
Importance: low
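Because each wait may be up to twice the previous one, the defaults above (max.retries=5, retry.backoff.ms=100) allow waits of up to roughly 100, 200, 400, 800, and 1600 ms before the task fails. A more patient, purely illustrative setting for flaky networks could be:

max.retries=10
retry.backoff.ms=500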
connection.compression
Whether to use gzip compression on the HTTP connection to Elasticsearch. For this setting to work, the http.compression setting also needs to be enabled on the Elasticsearch nodes before using it.
Type: boolean
Default: false
Importance: low
max.connection.idle.time.ms
The amount of time in milliseconds to wait before dropping an idle connection to prevent a read timeout.
Type: int
Default: 60000 (1 minute)
Valid Values: [-1,…,86400000]
Importance: low
connection.timeout.ms
The amount of time in milliseconds to wait when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.
Type: int
Default: 1000 (1 second)
Valid Values: [0,…,43200000]
Importance: low
read.timeout.ms
The amount of time in milliseconds to wait for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.
Type: int
Default: 3000 (3 seconds)
Valid Values: [0,…,604800000]
Importance: low
log.sensitive.data
If set to true, the connector logs sensitive data, such as exception traces containing sensitive data. Note that this parameter is optional. Confluent recommends you configure log.sensitive.data if you experience unusual logging behavior after upgrading to a later version of the connector.
Type: boolean
Default: false
Importance: low
Data Conversion
key.ignore
Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs will be generated as the record's topic+partition+offset. Note that this is a global config that applies to all topics; use topic.key.ignore to override it as true for specific topics.
Type: boolean
Default: false
Importance: high
schema.ignore
Whether to ignore schemas during indexing. When this is set to true, the record schema will be ignored for the purpose of registering an Elasticsearch mapping. Elasticsearch will infer the mapping from the data (dynamic mapping needs to be enabled by the user). Note that this is a global config that applies to all topics; use topic.schema.ignore to override it as true for specific topics.
Type: boolean
Default: false
Importance: low
compact.map.entries
Defines how map entries with string keys within record values should be written to JSON. When this is set to true, these entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document {"key": "entryKey", "value": "entryValue"}. All map entries with non-string keys are always written as nested documents. Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to false to use that older behavior.
Type: boolean
Default: true
Importance: low
topic.key.ignore
List of topics for which key.ignore should be true.
Type: list
Default: “”
Importance: low
topic.schema.ignore
List of topics for which schema.ignore should be true.
Type: list
Default: “”
Importance: low
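For example, the global flags and their per-topic overrides can be combined as below; the topic names logs and metrics are hypothetical:

key.ignore=false
schema.ignore=false
# Document IDs for these topics are generated as topic+partition+offset.
topic.key.ignore=logs,metrics
# Mappings for this topic are inferred by Elasticsearch (dynamic mapping must be enabled).
topic.schema.ignore=logs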
drop.invalid.message
Whether to drop the Kafka message when it cannot be converted to an output message.
Type: boolean
Default: false
Importance: low
behavior.on.null.values
How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.
Type: string
Default: FAIL
Valid Values: [IGNORE, DELETE, FAIL]
Importance: low
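For instance, to turn Kafka tombstone records into Elasticsearch deletes instead of failing the task, a configuration might set:

behavior.on.null.values=DELETE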
behavior.on.malformed.documents
How to handle records that Elasticsearch rejects due to the following malformed document exception errors:
strict_dynamic_mapping_exception
mapper_parsing_exception
illegal_argument_exception
action_request_validation_exception
ignore will skip records with these errors. fail will fail the connector.
Note
In the case of other malformed document errors, the connector sends the records to the DLQ along with the exception and the connector fails, even if this property is set to ignore.
Type: string
Default: FAIL
Valid Values: [ignore, warn, fail]
Importance: low
write.method
Method used for writing data to Elasticsearch, one of INSERT or UPSERT. The default method is INSERT, in which the connector constructs a document from the record value and inserts that document into Elasticsearch, completely replacing any existing document with the same ID; this matches previous behavior. The UPSERT method will create a new document if one with the specified ID does not yet exist, or will update an existing document with the same ID by adding or replacing only those fields present in the record value. The UPSERT method may require additional time and resources from Elasticsearch, so consider increasing the read.timeout.ms and decreasing the batch.size configuration properties.
Type: string
Default: INSERT
Valid Values: [INSERT, UPSERT]
Importance: low
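Following the guidance above, an upsert-based configuration might pair the write method with a longer read timeout and a smaller batch; the numbers are illustrative only:

write.method=UPSERT
read.timeout.ms=15000
batch.size=500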
use.autogenerated.ids
Specifies whether to use auto-generated Elasticsearch document IDs for insertion requests. Note that this setting removes exactly-once guarantees, and message delivery will be at least once. This only applies if the write method is set to INSERT. When set to true, the Ignore Key mode option will also be ignored when sending data to Elasticsearch.
Type: string
Default: false
Importance: low
Proxy
proxy.host
The address of the proxy host to connect through. Supports the basic authentication scheme only.
Type: string
Default: “”
Importance: low
proxy.port
The port of the proxy host to connect through.
Type: int
Default: 8080
Valid Values: [1,…,65535]
Importance: low
proxy.username
The username for the proxy host.
Type: string
Default: “”
Importance: low
proxy.password
The password for the proxy host.
Type: password
Default: null
Importance: low
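A proxy setup using these four properties could look like the following; the host, port, and credentials are placeholders:

proxy.host=proxy.internal.example.com
proxy.port=3128
proxy.username=proxy-user
proxy.password=proxy-secret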
Security
elastic.security.protocol
The security protocol to use when connecting to Elasticsearch. Values can be PLAINTEXT or SSL. If PLAINTEXT is passed, all configs prefixed by elastic.https. will be ignored. This is optional for the client.
Type: string
Default: PLAINTEXT
Valid Values: [PLAINTEXT, SSL]
Importance: medium
elastic.https.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
Type: password
Default: null
Importance: high
elastic.https.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication for the client.
Type: string
Default: null
Importance: high
elastic.https.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
Type: password
Default: null
Importance: high
elastic.https.ssl.truststore.location
The location of the trust store file.
Type: string
Default: null
Importance: high
elastic.https.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the trust store is still available, but integrity checking is disabled.
Type: password
Default: null
Importance: high
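A TLS sketch using the core security properties above might look like this; the paths and passwords are placeholders, and the key store lines are only needed for two-way (client) authentication:

elastic.security.protocol=SSL
elastic.https.ssl.truststore.location=/etc/security/es-truststore.jks
elastic.https.ssl.truststore.password=truststore-secret
# Key store settings are optional and enable two-way authentication.
elastic.https.ssl.keystore.location=/etc/security/es-keystore.jks
elastic.https.ssl.keystore.password=keystore-secret
elastic.https.ssl.key.password=key-secret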
elastic.https.ssl.enabled.protocols
The list of protocols enabled for SSL connections. The default is TLSv1.2,TLSv1.3 when running with Java 11 or later, and TLSv1.2 otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fall back to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the configuration documentation for ssl.protocol.
Type: list
Default: TLSv1.2,TLSv1.3
Importance: medium
elastic.https.ssl.keystore.type
The file format of the key store file. This is optional for the client.
Type: string
Default: JKS
Importance: medium
elastic.https.ssl.protocol
The SSL protocol used to generate the SSLContext. The default is TLSv1.3 when running with Java 11 or later, and TLSv1.2 otherwise. This value should be fine for most use cases. Most recent JVMs allow TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2, and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and ssl.enabled.protocols, clients will downgrade to TLSv1.2 if the server does not support TLSv1.3. If this config is set to TLSv1.2, clients will not use TLSv1.3 even if it is one of the values in ssl.enabled.protocols and the server only supports TLSv1.3.
Type: string
Default: TLSv1.2
Importance: medium
elastic.https.ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
Type: string
Default: null
Importance: medium
elastic.https.ssl.truststore.type
The file format of the trust store file.
Type: string
Default: JKS
Importance: medium
elastic.https.ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all the available cipher suites are supported.
Type: list
Default: null
Importance: low
elastic.https.ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate the server hostname using the server certificate. Disable server hostname verification by setting elastic.https.ssl.endpoint.identification.algorithm to an empty string.
Type: string
Default: https
Importance: low
elastic.https.ssl.engine.factory.class
The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.
Type: class
Default: null
Importance: low
elastic.https.ssl.keymanager.algorithm
The algorithm used by the key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
Type: string
Default: SunX509
Importance: low
elastic.https.ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
Type: string
Default: null
Importance: low
elastic.https.ssl.trustmanager.algorithm
The algorithm used by the trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the JVM.
Type: string
Default: PKIX
Importance: low
Kerberos
kerberos.user.principal
The Kerberos user principal the connector may use to authenticate with Kerberos.
Type: string
Default: null
Importance: low
kerberos.keytab.path
The path to the keytab file to use for authentication with Kerberos.
Type: string
Default: null
Valid Values: Existing file with keytab extension.
Importance: low
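A Kerberos sketch with a hypothetical principal and keytab path (the keytab file must exist and use the keytab extension):

kerberos.user.principal=connect/worker1@EXAMPLE.COM
kerberos.keytab.path=/etc/security/keytabs/connect.keytab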
Data Stream
data.stream.dataset
Describes the data ingested and its structure to be written to a data stream. This can be any arbitrary string, provided it is no longer than 100 characters, is all lowercase, and does not contain spaces or any special characters (/\\*\"<>|,#:-). If no value is set, the connector writes to regular indices. If set, this configuration will be used alongside data.stream.type to construct the data stream name in the form of {data.stream.type}-{data.stream.dataset}-{data.stream.namespace}.
Type: string
Default: “”
Valid Values: A valid dataset name that is all lowercase, less than 100 characters, and does not contain any spaces or the invalid characters \/*?"<>|,#-:.
Importance: low
data.stream.type
Describes the generic type of data to be written to a data stream. The default value is none, indicating that the connector will write to regular indices. If set, this configuration will be used alongside data.stream.dataset to construct the data stream name in the form of {data.stream.type}-{data.stream.dataset}-{data.stream.namespace}. Possible values are logs, metrics, and none; custom index templates defined in the destination cluster are also supported.
Type: string
Default: NONE
Valid Values: logs, metrics, and none; custom index templates defined in the destination cluster are supported as well.
Importance: low
data.stream.timestamp.field
The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream.
All documents sent to a data stream need an @timestamp field with values of type date or date_nanos. Otherwise, the document will not be sent. If multiple fields are provided, the first field listed that also appears in the record will be used. If this configuration is left empty, all of the documents will use the Kafka record timestamp as the @timestamp field value. Note that @timestamp still needs to be explicitly listed if records already contain this field. This configuration can only be set if data.stream.type and data.stream.dataset are set.
Type: list
Default: “”
Importance: low
data.stream.namespace
Describes a user-configurable arbitrary grouping for writing to a data stream. It can be any string up to 100 characters, in lowercase, without spaces or special characters (/\\*\"<>|,#:-). If unset, the connector writes to regular indices. When set, it is used with data.stream.type and data.stream.dataset to form the data stream name in the format {data.stream.type}-{data.stream.dataset}-{data.stream.namespace}. The default is ${topic}, which means the topic name.
Type: string
Default: ${topic}
Importance: low
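Putting the data stream properties together, a hypothetical configuration such as the following constructs the data stream name as logs-orders-<topic name> for each topic and takes the @timestamp value from an assumed record field named event_time:

data.stream.type=logs
data.stream.dataset=orders
data.stream.namespace=${topic}
data.stream.timestamp.field=event_time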
CSFLE configuration
csfle.enabled
Accepts a boolean value. CSFLE is enabled for the connector if csfle.enabled is set to true.
Type: boolean
Default: false
auto.register.schemas
Specifies if the Serializer should attempt to register the Schema with Schema Registry.
Type: boolean
Default: true
Importance: medium
use.latest.version
Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization.
Type: boolean
Default: true
Importance: medium
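When CSFLE is used, these properties are typically set together; a minimal sketch, assuming the schemas are already registered in Schema Registry:

csfle.enabled=true
auto.register.schemas=false
use.latest.version=true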
External Topic to Resource Mappings
external.resource.usage
Controls the type of Elasticsearch resource to write to.
Type: string
Default: DISABLED
Valid Values: INDEX, DATASTREAM, ALIAS_INDEX, ALIAS_DATASTREAM, DISABLED
Importance: high
topic.to.external.resource.mapping
Defines one-to-one mappings between Kafka topics and Elasticsearch resources (for example, orders:index-orders,logs:logs-ds).
Type: list (comma-separated topic:resource mappings)
Default: "" (empty)
Importance: high