Configuration Reference for Elasticsearch Service Sink Connector for Confluent Platform

To use this connector, specify the name of the connector class in the connector.class configuration property:

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

Connector-specific configuration properties are described below.

Connector

connection.url

List of Elasticsearch HTTP connection URLs, for example http://eshost1:9200,http://eshost2:9200, or https://eshost3:9200. If any of the URLs specifies https: protocol, HTTPS will be used for all connections. A URL with no protocol will be treated as http.

  • Type: list
  • Valid Values: List of valid URIs.
  • Importance: high
connection.username

The username used to authenticate with Elasticsearch. The default is null, and authentication is performed only if both the username and password are non-null.

  • Type: string
  • Default: null
  • Importance: medium
connection.password

The password used to authenticate with Elasticsearch. The default is null, and authentication is performed only if both the username and password are non-null.

  • Type: password
  • Default: null
  • Importance: medium
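
As an illustration, a minimal connection sketch with basic authentication might look like the following. The host names reuse the example given for connection.url; the username and password values are placeholders chosen for this sketch, not defaults.

```properties
# Two Elasticsearch nodes, both reached over HTTPS.
connection.url=https://eshost1:9200,https://eshost2:9200
# Authentication is performed only when both values are non-null
# (placeholder credentials, replace with your own).
connection.username=connect_user
connection.password=changeme
```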
batch.size

The number of records to process as a batch when writing to Elasticsearch.

  • Type: int
  • Default: 2000
  • Valid Values: [1,…,1000000]
  • Importance: medium
bulk.size.bytes

The maximum size (in bytes) to be processed as a batch when writing records to Elasticsearch. Setting this to -1 disables this configuration. If the condition set by batch.size is met first, it is used instead.

  • Type: long
  • Default: 5242880 (5 megabytes)
  • Valid Values: [-1,…,2147483647]
  • Importance: low
max.in.flight.requests

The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.

  • Type: int
  • Default: 5
  • Valid Values: [1,…,1000]
  • Importance: medium
max.buffered.records

The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.

  • Type: int
  • Default: 20000
  • Valid Values: [1,…,2147483647]
  • Importance: low
linger.ms

Linger time in milliseconds for batching. Records that arrive between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this occurs only under load, when records arrive faster than they can be sent out. However, it may be desirable to reduce the number of requests even under light load and benefit from bulk indexing. This setting accomplishes that: when a pending batch is not full, rather than sending it immediately, the task waits up to the given delay so that other records can be added and batched into a single request.

  • Type: long
  • Default: 1
  • Valid Values: [0,…,604800000]
  • Importance: low
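
The batching properties above interact, so a combined sketch may help. The values below are illustrative assumptions within the documented valid ranges, not tuning recommendations.

```properties
# Send a bulk request once 500 records or 1 MB (1048576 bytes) are pending,
# whichever condition is met first.
batch.size=500
bulk.size.bytes=1048576
# When a pending batch is not full, wait up to 50 ms for more records.
linger.ms=50
# Allow at most 2 indexing requests in flight before blocking further requests.
max.in.flight.requests=2
# Each task buffers at most 5000 records before blocking acceptance of more.
max.buffered.records=5000
```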
flush.timeout.ms

The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.

  • Type: long
  • Default: 180000 (3 minutes)
  • Valid Values: [1000,…,604800000]
  • Importance: low
flush.synchronously

True if flushes should wait for background processing to finish. This has a throughput penalty and makes the connector less responsive but allows for topic-mutating SMTs (for example, RegexRouter or TimestampRouter).

  • Type: boolean
  • Default: false
  • Importance: low
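
A hedged sketch of pairing flush.synchronously with a topic-mutating SMT follows. The transform alias route and the es_ prefix are hypothetical names chosen for this example; the transform class and its regex and replacement settings are standard Kafka Connect RegexRouter options.

```properties
# Wait for background processing to finish on each flush, which allows
# topic-mutating SMTs at the cost of throughput.
flush.synchronously=true
# Rename every topic to es_<topic> before the record reaches the connector.
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=(.*)
transforms.route.replacement=es_$1
```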
max.retries

The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted the task will fail.

  • Type: int
  • Default: 5
  • Valid Values: [0,…,2147483647]
  • Importance: low
retry.backoff.ms

The amount of time in milliseconds to wait before attempting the first retry of a failed indexing request. Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. This avoids retrying in a tight loop under failure scenarios.

  • Type: long
  • Default: 100
  • Valid Values: [0,…,86400000]
  • Importance: low
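
To make the retry behavior concrete, the sketch below uses the default values. The wait times in the comments are upper bounds inferred from the description above (each wait at most twice the previous one); actual waits may be shorter.

```properties
# Retry a failed indexing request up to 5 times; the task fails once
# the retry attempts are exhausted.
max.retries=5
# The first retry waits 100 ms. If every later wait doubles, the waits
# are at most 100, 200, 400, 800, and 1600 ms.
retry.backoff.ms=100
```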
connection.compression

Whether to use GZip compression on the HTTP connection to Elasticsearch. For this setting to work, the http.compression setting must also be enabled on the Elasticsearch nodes.

  • Type: boolean
  • Default: false
  • Importance: low
max.connection.idle.time.ms

The amount of time in milliseconds to wait before dropping an idle connection to prevent a read timeout.

  • Type: int
  • Default: 60000 (1 minute)
  • Valid Values: [-1,…,86400000]
  • Importance: low
connection.timeout.ms

The amount of time in milliseconds to wait when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.

  • Type: int
  • Default: 1000 (1 second)
  • Valid Values: [0,…,43200000]
  • Importance: low
read.timeout.ms

The amount of time in milliseconds to wait for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.

  • Type: int
  • Default: 3000 (3 seconds)
  • Valid Values: [0,…,604800000]
  • Importance: low
log.sensitive.data

If set to true, the connector logs sensitive data, such as exception traces containing sensitive data. Note that this parameter is optional. Confluent recommends you configure log.sensitive.data if you experience unusual logging behavior after upgrading to a later version of the connector.

  • Type: boolean
  • Default: false
  • Importance: low

Data Conversion

key.ignore

Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs are generated as the record’s topic+partition+offset. Note that this is a global config that applies to all topics. Use topic.key.ignore to override it as true for specific topics.

  • Type: boolean
  • Default: false
  • Importance: high
schema.ignore

Whether to ignore schemas during indexing. When this is set to true, the record schema is ignored for the purpose of registering an Elasticsearch mapping. Elasticsearch infers the mapping from the data (dynamic mapping needs to be enabled by the user). Note that this is a global config that applies to all topics. Use topic.schema.ignore to override it as true for specific topics.

  • Type: boolean
  • Default: false
  • Importance: low
compact.map.entries

Defines how map entries with string keys within record values should be written to JSON. When this is set to true, these entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document {"key": "entryKey", "value": "entryValue"}. All map entries with non-string keys are always written as nested documents. Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to false to use that older behavior.

  • Type: boolean
  • Default: true
  • Importance: low
topic.key.ignore

List of topics for which key.ignore should be true.

  • Type: list
  • Default: “”
  • Importance: low
topic.schema.ignore

List of topics for which schema.ignore should be true.

  • Type: list
  • Default: “”
  • Importance: low
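
The global and per-topic ignore settings can be combined. In the sketch below, the topic names clicks and audit_events are hypothetical and stand in for your own topics.

```properties
# Global settings: use the record key as the document ID and register
# mappings from the record schema.
key.ignore=false
schema.ignore=false
# For this topic only, document IDs are generated as topic+partition+offset.
topic.key.ignore=clicks
# For these topics only, the record schema is ignored and Elasticsearch
# infers the mapping (dynamic mapping must be enabled).
topic.schema.ignore=clicks,audit_events
```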
drop.invalid.message

Whether to drop the Kafka message when it cannot be converted to an output message.

  • Type: boolean
  • Default: false
  • Importance: low
behavior.on.null.values

How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.

  • Type: string
  • Default: FAIL
  • Valid Values: [IGNORE, DELETE, FAIL]
  • Importance: low
behavior.on.malformed.documents

How to handle records that Elasticsearch rejects due to the following malformed document exception errors:

  • strict_dynamic_mapping_exception
  • mapper_parsing_exception
  • illegal_argument_exception
  • action_request_validation_exception

ignore will skip records with these errors. fail will fail the connector.

Note

In case of other malformed document errors, the connector sends the records to the DLQ along with the exception and the connector fails, even if this property is set to ignore.

  • Type: string
  • Default: FAIL
  • Valid Values: [ignore, warn, fail]
  • Importance: low
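
As a short illustration of the two behavior properties, the fragment below picks non-default options from the documented valid values. Whether these choices suit a given pipeline is an assumption of this sketch.

```properties
# Handle records that have a non-null key and a null value (tombstones)
# with the DELETE option instead of failing.
behavior.on.null.values=DELETE
# Skip records that Elasticsearch rejects with one of the listed
# malformed document errors.
behavior.on.malformed.documents=ignore
```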
write.method

Method used for writing data to Elasticsearch, either INSERT or UPSERT. The default method is INSERT, in which the connector constructs a document from the record value and inserts that document into Elasticsearch, completely replacing any existing document with the same ID; this matches previous behavior. The UPSERT method creates a new document if one with the specified ID does not yet exist, or updates an existing document with the same ID by adding or replacing only those fields present in the record value. The UPSERT method may require additional time and resources from Elasticsearch, so consider increasing the read.timeout.ms and decreasing the batch.size configuration properties.

  • Type: string
  • Default: INSERT
  • Valid Values: [INSERT, UPSERT]
  • Importance: low
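
A sketch of an UPSERT configuration that follows the advice above is shown here. The specific timeout and batch values are illustrative assumptions, chosen only to be higher and lower than the respective defaults (3000 and 2000).

```properties
# Update existing documents field by field instead of replacing them.
write.method=UPSERT
# Give Elasticsearch more time to respond than the 3000 ms default.
read.timeout.ms=10000
# Send smaller batches than the default of 2000 records.
batch.size=500
```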

Proxy

proxy.host

The address of the proxy host to connect through. Supports the basic authentication scheme only.

  • Type: string
  • Default: “”
  • Importance: low
proxy.port

The port of the proxy host to connect through.

  • Type: int
  • Default: 8080
  • Valid Values: [1,…,65535]
  • Importance: low
proxy.username

The username for the proxy host.

  • Type: string
  • Default: “”
  • Importance: low
proxy.password

The password for the proxy host.

  • Type: password
  • Default: null
  • Importance: low
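
For reference, a proxy configuration might be sketched as follows. The host name and credentials are placeholders invented for this example.

```properties
# Route connections through a proxy that uses basic authentication
# (placeholder host and credentials).
proxy.host=proxy.example.com
proxy.port=8080
proxy.username=proxy_user
proxy.password=changeme
```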

Security

elastic.security.protocol

The security protocol to use when connecting to Elasticsearch. Values can be PLAINTEXT or SSL. If PLAINTEXT is passed, all configs prefixed by elastic.https. will be ignored. This is optional for client.

  • Type: string
  • Default: PLAINTEXT
  • Valid Values: [PLAINTEXT, SSL]
  • Importance: medium
elastic.https.ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Default: null
  • Importance: high
elastic.https.ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way authentication for client.

  • Type: string
  • Default: null
  • Importance: high
elastic.https.ssl.keystore.password

The store password for the key store file. This is optional for client and only needed if elastic.https.ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
elastic.https.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
elastic.https.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the trust store is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
elastic.https.ssl.enabled.protocols

The list of protocols enabled for SSL connections. The default is TLSv1.2,TLSv1.3 when running with Java 11 or later, and TLSv1.2 otherwise. With the default value for Java 11, clients and servers prefer TLSv1.3 if both support it and fall back to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the configuration documentation for elastic.https.ssl.protocol.

  • Type: list
  • Default: TLSv1.2,TLSv1.3
  • Importance: medium
elastic.https.ssl.keystore.type

The file format of the key store file. This is optional for client.

  • Type: string
  • Default: JKS
  • Importance: medium
elastic.https.ssl.protocol

The SSL protocol used to generate the SSLContext. The default is TLSv1.3 when running with Java 11 or later, TLSv1.2 otherwise. This value should be fine for most use cases. Most recent JVMs allow TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2, and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and elastic.https.ssl.enabled.protocols, clients downgrade to TLSv1.2 if the server does not support TLSv1.3. If this config is set to TLSv1.2, clients do not use TLSv1.3 even if it is one of the values in elastic.https.ssl.enabled.protocols and the server only supports TLSv1.3.

  • Type: string
  • Default: TLSv1.2
  • Importance: medium
elastic.https.ssl.provider

The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

  • Type: string
  • Default: null
  • Importance: medium
elastic.https.ssl.truststore.type

The file format of the trust store file.

  • Type: string
  • Default: JKS
  • Importance: medium
elastic.https.ssl.cipher.suites

A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all the available cipher suites are supported.

  • Type: list
  • Default: null
  • Importance: low
elastic.https.ssl.endpoint.identification.algorithm

The endpoint identification algorithm to validate server hostname using server certificate. Disable server host name verification by setting elastic.https.ssl.endpoint.identification.algorithm to an empty string.

  • Type: string
  • Default: https
  • Importance: low
elastic.https.ssl.engine.factory.class

The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.

  • Type: class
  • Default: null
  • Importance: low
elastic.https.ssl.keymanager.algorithm

The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: SunX509
  • Importance: low
elastic.https.ssl.secure.random.implementation

The SecureRandom PRNG implementation to use for SSL cryptography operations.

  • Type: string
  • Default: null
  • Importance: low
elastic.https.ssl.trustmanager.algorithm

The algorithm used by the trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the JVM.

  • Type: string
  • Default: PKIX
  • Importance: low
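
The security properties above can be combined into a configuration along these lines. This is a sketch only: the file paths and passwords are placeholders, and the key store entries apply only if two-way authentication is wanted.

```properties
# Connect over HTTPS and enable the elastic.https.* settings.
connection.url=https://eshost3:9200
elastic.security.protocol=SSL
# Trust store used to verify the Elasticsearch server certificate
# (placeholder path and password).
elastic.https.ssl.truststore.location=/path/to/truststore.jks
elastic.https.ssl.truststore.password=changeme
elastic.https.ssl.truststore.type=JKS
# Optional: key store for two-way authentication (placeholder values).
elastic.https.ssl.keystore.location=/path/to/keystore.jks
elastic.https.ssl.keystore.password=changeme
elastic.https.ssl.key.password=changeme
```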

Kerberos

kerberos.user.principal

The Kerberos user principal the connector may use to authenticate with Kerberos.

  • Type: string
  • Default: null
  • Importance: low
kerberos.keytab.path

The path to the keytab file to use for authentication with Kerberos.

  • Type: string
  • Default: null
  • Valid Values: Existing file with keytab extension.
  • Importance: low
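
A minimal Kerberos sketch might look like this. The principal and keytab path are hypothetical; the path must point to an existing file with the keytab extension.

```properties
# Placeholder principal and keytab location for this example.
kerberos.user.principal=connect@EXAMPLE.COM
kerberos.keytab.path=/etc/security/keytabs/connect.keytab
```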

Data Stream

data.stream.dataset

Generic name describing the ingested data and its structure, to be written to a data stream. Can be any string that is no longer than 100 characters, is all lowercase, and does not contain spaces or any of these special characters: /\*"<>|,#:-. If no value is set, the connector writes to regular indices instead. If set, this configuration is used alongside data.stream.type to construct the data stream name in the form {data.stream.type}-{data.stream.dataset}-{topic}.

  • Type: string
  • Default: “”
  • Valid Values: A valid dataset name that is all lowercase, less than 100 characters, and does not contain any spaces, or invalid characters \/*?"<>|,#-:.
  • Importance: low
data.stream.type

Generic type describing the data to be written to a data stream. The default NONE indicates the connector will write to regular indices instead. If set, this configuration is used alongside data.stream.dataset to construct the data stream name in the form {data.stream.type}-{data.stream.dataset}-{topic}.

  • Type: string
  • Default: NONE
  • Valid Values: [LOGS, METRICS, NONE]
  • Importance: low
data.stream.timestamp.field

The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream.

All documents sent to a data stream need an @timestamp field with values of type date or date_nanos. Otherwise, the document will not be sent. If multiple fields are provided, the first field listed that also appears in the record is used. If this configuration is left empty, all of the documents use the Kafka record timestamp as the @timestamp field value. Note that @timestamp still needs to be explicitly listed if records already contain this field. This configuration can only be set if data.stream.type and data.stream.dataset are set.

  • Type: list
  • Default: “”
  • Importance: low
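
To show how the three data stream properties fit together, here is a hedged sketch. The dataset name nginx, the topic name access, and the field name created_at are hypothetical values chosen for this example.

```properties
# With a topic named access, the data stream name takes the form
# {data.stream.type}-{data.stream.dataset}-{topic}.
data.stream.type=LOGS
data.stream.dataset=nginx
# Use created_at if the record contains it; otherwise fall back to an
# existing @timestamp field, which must be listed explicitly.
data.stream.timestamp.field=created_at,@timestamp
```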