Elasticsearch Sink Connector Configuration Properties¶
To use this connector, specify the name of the connector class in the connector.class
configuration property.
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Connector-specific configuration properties are described below.
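Putting the pieces together, a minimal sink configuration might be sketched as follows (the connector name, topic, and index-related values are illustrative placeholders, not required values):

```properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# "orders" is a hypothetical topic name used only for illustration
topics=orders
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true
```

Each property described below can be added to a file like this and submitted to a Connect worker.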
Connector¶
connection.url
List of Elasticsearch HTTP connection URLs, e.g. http://eshost1:9200,http://eshost2:9200 or https://eshost3:9200. If any of the URLs specifies the https: protocol, HTTPS will be used for all connections. A URL with no protocol is treated as http.
- Type: list
- Importance: high
connection.username
The username used to authenticate with Elasticsearch. The default is null; authentication is performed only if both the username and password are non-null.
- Type: string
- Default: null
- Importance: medium
connection.password
The password used to authenticate with Elasticsearch. The default is null; authentication is performed only if both the username and password are non-null.
- Type: password
- Default: null
- Importance: medium
batch.size
The number of records to process as a batch when writing to Elasticsearch.
- Type: int
- Default: 2000
- Importance: medium
max.in.flight.requests
The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.
- Type: int
- Default: 5
- Importance: medium
max.buffered.records
The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.
- Type: int
- Default: 20000
- Importance: low
linger.ms
Linger time in milliseconds for batching. Records that arrive between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this only occurs under load, when records arrive faster than they can be sent out. However, it may be desirable to reduce the number of requests even under light load and benefit from bulk indexing. This setting helps accomplish that: when a pending batch is not full, rather than immediately sending it out, the task waits up to the given delay to allow other records to be added so that they can be batched into a single request.
- Type: long
- Default: 1
- Importance: low
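As a sketch of how the batching properties interact, the following values (illustrative, not recommended defaults) trade a little latency for larger bulk requests:

```properties
# Accumulate up to 5000 records per bulk request...
batch.size=5000
# ...waiting up to 100 ms for a partial batch to fill
linger.ms=100
# Allow more concurrent bulk requests in flight
max.in.flight.requests=10
# Raise the per-task buffer to match the larger batches
max.buffered.records=50000
```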
flush.timeout.ms
The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.
- Type: long
- Default: 10000
- Importance: low
max.retries
The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted the task will fail.
- Type: int
- Default: 5
- Importance: low
retry.backoff.ms
How long to wait in milliseconds before attempting the first retry of a failed indexing request. Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. This avoids retrying in a tight loop under failure scenarios.
- Type: long
- Default: 100
- Importance: low
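To illustrate the doubling backoff described above, with the default values below a repeatedly failing request may wait roughly 100 ms, 200 ms, 400 ms, 800 ms, and 1600 ms across its five retries. These are upper bounds; the exact waits are implementation-dependent:

```properties
max.retries=5
retry.backoff.ms=100
# Worst-case cumulative retry delay is roughly 100 + 200 + 400 + 800 + 1600 ms,
# i.e. about 3.1 seconds, before the task gives up and fails
```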
connection.compression
Whether to use GZip compression on HTTP connection to ElasticSearch. To make this setting to work the
http.compression
setting also needs to be enabled at the Elasticsearch nodes before using it.- Type: boolean
- Default: false
- Importance: low
connection.timeout.ms
How long to wait in milliseconds when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.
- Type: int
- Default: 1000
- Importance: low
read.timeout.ms
How long to wait in milliseconds for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.
- Type: int
- Default: 3000
- Importance: low
Writes¶
write.method
The method used to write Kafka records to Elasticsearch.
The default method is INSERT. When INSERT is used, the connector constructs a document from the record value and inserts that document into Elasticsearch, completely replacing any existing document with the same ID. The UPSERT method creates a new document if one with the specified ID does not exist, or updates an existing document with the same ID by adding or replacing only those fields present in the record value. The UPSERT method may require additional time and Elasticsearch resources, so consider increasing the flush.timeout.ms and read.timeout.ms configuration properties, and decreasing the batch.size property.
- Type: string
- Default: INSERT
- Valid Values: one of [INSERT, UPSERT]
- Importance: high
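For example, an upsert-oriented configuration following the advice above might be sketched like this (the specific values are illustrative):

```properties
write.method=UPSERT
# UPSERT is more expensive, so allow more time per request...
flush.timeout.ms=30000
read.timeout.ms=15000
# ...and send smaller batches
batch.size=500
```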
Security¶
elastic.security.protocol
The security protocol to use when connecting to Elasticsearch. Values can be PLAINTEXT or SSL. If PLAINTEXT is passed, all configs prefixed with elastic.https. are ignored. This is optional for the client.
- Type: string
- Default: PLAINTEXT
- Importance: high
elastic.https.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
- Type: password
- Default: null
- Importance: high
elastic.https.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication of the client.
- Type: string
- Default: null
- Importance: high
elastic.https.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: null
- Importance: high
elastic.https.ssl.truststore.location
The location of the trust store file.
- Type: string
- Default: null
- Importance: high
elastic.https.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the trust store is still available, but integrity checking is disabled.
- Type: password
- Default: null
- Importance: high
elastic.https.ssl.enabled.protocols
The list of protocols enabled for SSL connections.
- Type: list
- Default: TLSv1.2,TLSv1.1,TLSv1
- Importance: medium
elastic.https.ssl.keystore.type
The file format of the key store file. This is optional for the client.
- Type: string
- Default: JKS
- Importance: medium
elastic.https.ssl.protocol
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
- Type: string
- Default: TLS
- Importance: medium
elastic.https.ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
- Type: string
- Default: null
- Importance: medium
elastic.https.ssl.truststore.type
The file format of the trust store file.
- Type: string
- Default: JKS
- Importance: medium
elastic.https.ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default, all the available cipher suites are supported.
- Type: list
- Default: null
- Importance: low
elastic.https.ssl.endpoint.identification.algorithm
The endpoint identification algorithm used to validate the server hostname against the server certificate. Disable server hostname verification by setting elastic.https.ssl.endpoint.identification.algorithm to an empty string.
- Type: string
- Default: https
- Importance: low
elastic.https.ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
- Type: string
- Default: SunX509
- Importance: low
elastic.https.ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
- Type: string
- Default: null
- Importance: low
elastic.https.ssl.trustmanager.algorithm
The algorithm used by the trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
- Type: string
- Default: PKIX
- Importance: low
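Taken together, a TLS-enabled connection might be sketched as follows (the paths and passwords are placeholders; the keystore lines are only needed for two-way authentication):

```properties
elastic.security.protocol=SSL
connection.url=https://eshost1:9200
elastic.https.ssl.truststore.location=/etc/security/elastic.truststore.jks
elastic.https.ssl.truststore.password=changeit
# Optional: client certificate for two-way authentication
elastic.https.ssl.keystore.location=/etc/security/elastic.keystore.jks
elastic.https.ssl.keystore.password=changeit
elastic.https.ssl.key.password=changeit
```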
auto.create.indices.at.start
Auto create the Elasticsearch indices at startup. This is useful when the indices are a direct mapping of the Kafka topics.
- Type: boolean
- Default: true
- Importance: low
Data Conversion¶
type.name
The Elasticsearch type name to use when indexing.
- Type: string
- Importance: high
key.ignore
Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs are generated as the record's topic+partition+offset. Note that this is a global config that applies to all topics. Use topic.key.ignore to override as true for specific topics.
- Type: boolean
- Default: false
- Importance: high
schema.ignore
Whether to ignore schemas during indexing. When this is set to true, the record schema is ignored for the purpose of registering an Elasticsearch mapping. Elasticsearch infers the mapping from the data (dynamic mapping must be enabled by the user). Note that this is a global config that applies to all topics. Use topic.schema.ignore to override as true for specific topics.
- Type: boolean
- Default: false
- Importance: low
compact.map.entries
Defines how map entries with string keys within record values should be written to JSON. When this is set to true, these entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document {"key": "entryKey", "value": "entryValue"}. All map entries with non-string keys are always written as nested documents. Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to false to use that older behavior.
- Type: boolean
- Default: true
- Importance: low
topic.index.map
This option is now deprecated. A future version may remove it completely. Please use single message transforms, such as RegexRouter, to map topic names to index names.
A map from Apache Kafka® topic name to the destination Elasticsearch index, represented as a list of topic:index pairs.
- Type: list
- Default: “”
- Importance: low
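As a sketch of the recommended replacement, the RegexRouter single message transform can rewrite topic names before they are used as index names (the transform alias "route" and the regex are illustrative):

```properties
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Strip a hypothetical "prod-" prefix so topic "prod-orders" writes to index "orders"
transforms.route.regex=prod-(.*)
transforms.route.replacement=$1
```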
topic.key.ignore
List of topics for which key.ignore should be true.
- Type: list
- Default: “”
- Importance: low
topic.schema.ignore
List of topics for which schema.ignore should be true.
- Type: list
- Default: “”
- Importance: low
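For example, the global flags can stay at their defaults while individual topics opt in (the topic names are illustrative):

```properties
# Globally, keys and schemas are honored...
key.ignore=false
schema.ignore=false
# ...but these topics use generated IDs and dynamic mappings
topic.key.ignore=logs,metrics
topic.schema.ignore=logs,metrics
```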
drop.invalid.message
Whether to drop a Kafka message when it cannot be converted to an output message.
- Type: boolean
- Default: false
- Importance: low
behavior.on.null.values
How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are ignore, delete, and fail.
- Type: string
- Default: ignore
- Valid Values: [ignore, delete, fail]
- Importance: low
behavior.on.malformed.documents
How to handle records that Elasticsearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters. Valid options are ignore, warn, and fail.
- Type: string
- Default: fail
- Valid Values: [ignore, warn, fail]
- Importance: low
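A lenient error-handling setup combining the three properties above might be sketched as follows (the values are illustrative, not recommended for every deployment):

```properties
# Treat tombstones as deletions rather than ignoring them
behavior.on.null.values=delete
# Log and skip documents Elasticsearch rejects instead of failing the task
behavior.on.malformed.documents=warn
# Drop records that cannot be converted at all
drop.invalid.message=true
```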