Elasticsearch Service Sink Connector for Confluent Platform

Note

  • The Elasticsearch Sink connector for Confluent Platform provides support for Elasticsearch version 7.x and later–including version 8.x.
  • This version of the Elasticsearch Sink connector includes compatibility mode support for Elasticsearch version 8.x. The connector configures the compatibility mode automatically during startup, so nothing needs to be enabled in the connector configuration. For more details about the compatibility mode, see the Elasticsearch Compatibility documentation.

The Kafka Connect Elasticsearch Service Sink connector moves data from Apache Kafka® to Elasticsearch. It writes data from a topic in Kafka to an index in Elasticsearch. All data for a topic have the same type in Elasticsearch. This allows an independent evolution of schemas for data from different topics. This simplifies the schema evolution because Elasticsearch has one enforcement on mappings; that is, all fields with the same name in the same index must have the same mapping type.

Elasticsearch is often used for text queries, analytics and as a key-value store (use cases). The connector covers both the analytics and key-value store use cases. For the analytics use case, each message in Kafka is treated as an event and the connector uses topic+partition+offset as a unique identifier for events, which are then converted to unique documents in Elasticsearch.

For the key-value store use case, it supports using keys from Kafka messages as document IDs in Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantees exactly once delivery.

Mapping is the process of defining how a document and the fields it contains are stored and indexed. Users can explicitly define mappings for types in indices. When mapping is not explicitly defined, Elasticsearch can determine field names and types from data. However, types such as timestamp and decimal may not be correctly inferred. To ensure that these types are correctly inferred, the connector provides a feature to infer mapping from the schemas of Kafka messages.

Note

The Kafka source topic name is used to create the destination index name in Elasticsearch. You can change this name prior to it being used as the index name with a Single Message Transformation (SMT)–RegexRouter or TimeStampRouter–only when the flush.synchronously configuration property is set to true. For more details, see Limitations.

Features

The Elasticsearch Service Sink connector offers the following features:

Exactly once delivery

The connector relies on Elasticsearch’s idempotent write semantics to ensure exactly once delivery to Elasticsearch. By setting IDs in Elasticsearch documents, the connector can ensure exactly once delivery. If keys are included in Kafka messages, then they are translated to Elasticsearch document IDs automatically. When the keys are not included, or are explicitly ignored, the connector will use topic+partition+offset as the key, ensuring each message in Kafka has exactly one document corresponding to it in Elasticsearch.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Elasticsearch Service Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Mapping inference

The connector can infer mappings from Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages. If a field is missing, the inference is limited to field types and default values. You should manually create mappings if more customizations are needed (for example, user-defined analyzers).

Schema evolution

The connector supports schema evolution and can handle backward, forward, and fully compatible schema changes in Connect. It can also handle some incompatible schema changes such as changing a field from an integer to a string.

License

The following are required to run the Kafka Connect Elasticsearch Sink connector:

The source code is available at https://github.com/confluentinc/kafka-connect-elasticsearch.

Configuration Properties

For a complete list of configuration properties for this connector, see Elasticsearch Sink Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Limitations

  • The Elasticsearch Service Sink connector doesn’t support Amazon Elasticsearch Service.

  • The connector only supports the following data stream types: logs and metrics. For more details, see the data.stream.type configuration property.

  • The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

    • io.debezium.transforms.ByLogicalTableRouter
    • io.debezium.transforms.outbox.EventRouter
    • org.apache.kafka.connect.transforms.RegexRouter
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value

    Note

    These SMT limitations are inapplicable to the Elasticsearch Sink connector when the flush.synchronously configuration property is set to true. For more information about the flush.synchronously configuration property see the Elasticsearch Sink Connector Configuration Properties documentation page.

Install the Elasticsearch Service Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Java 1.8.

  • Elasticsearch 7.x.

  • Elasticsearch assigned privileges: create_index, read, write, and view_index_metadata.

    Example:

    curl -XPOST "localhost:9200/_security/role/es_sink_connector_role?pretty" -H 'Content-Type: application/json' -d'
    {
    "indices": [
       {
          "names": [ "-" ],
          "privileges": ["create_index", "read", "write", "view_index_metadata"]
       }
    ]
    }'
    
    curl -XPOST "localhost:9200/_security/user/es_sink_connector_user?pretty" -H 'Content-Type: application/json' -d'
    {
    "password" : "seCret-secUre-PaSsW0rD",
    "roles" : [ "es_sink_connector_role" ]
    }'
    
  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-elasticsearch:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-elasticsearch:14.0.5
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

This quick start uses the Elasticsearch connector to export data produced by the Avro console producer to Elasticsearch.

Prerequisites

See also

For a more detailed Docker-based example of the Confluent Elasticsearch Connector, refer to Confluent Platform Demo (cp-demo). You can deploy a Kafka streaming ETL, including Elasticsearch, using ksqlDB for stream processing.

This quick start assumes that you are using the Confluent CLI commands, but standalone installations are also supported. By default ZooKeeper, Apache Kafka®, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.

Add a record to the consumer

  1. Start the Avro console producer to import a few records to Kafka:

    ${CONFLUENT_HOME}/bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test-elasticsearch-sink \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  2. Enter the following in the console producer:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    

    The three records entered are published to the Kafka topic test-elasticsearch in Avro format.

Load the connector

Complete the following steps to load the predefined Elasticsearch connector bundled with Confluent Platform.

Note

Default connector properties are already set for this quick start. To view the connector properties, refer to etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties.

  1. List the available predefined connectors using the following command:

    confluent local services connect connector list
    

    Example output:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      hdfs-sink
      s3-sink
    
  2. Load the elasticsearch-sink connector:

    confluent local services connect connector load elasticsearch-sink
    

    Example output:

    {
      "name": "elasticsearch-sink",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "test-elasticsearch-sink",
        "key.ignore": "true",
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
      },
      "tasks": [],
      "type": null
    }
    

    Tip

    For non-CLI users, you can load the Elasticsearch connector by running Kafka Connect in standalone mode with this command:

    ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
    etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
    
  3. After the connector finishes ingesting data to Elasticsearch, enter the following command to check that data is available in Elasticsearch:

    curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
    

    Example output:

    {
      "took" : 39,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 3,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+0",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value1"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+2",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value3"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+1",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value2"
            }
          }
        ]
      }
    }
    

Delivery semantics

The connector supports batching and pipelined writes to Elasticsearch to boost throughput. It accumulates messages in batches and allows concurrent processing of multiple batches.

Mapping management

Before using the connector, carefully consider how the data should be tokenized, analyzed, and indexed. These are determined by mapping. Some changes are not allowed after the mapping is already defined. Although you can add new types to an index or add new fields to a type, you can’t add new analyzers or make changes to existing fields. If you do this, the data that was already indexed would be incorrect and your searches would no longer work as expected. You should define mappings before writing data to Elasticsearch.

Index templates can be helpful when manually defining mappings, and allow you to define templates that are automatically applied when new indices are created. The templates include both settings and mappings, along with a simple pattern template that controls whether the template should be applied to the new index.

Schema evolution

The Elasticsearch connector writes data from different topics in Kafka to different indices. All data for a topic will have the same type in Elasticsearch. This allows an independent evolution of schemas for data from different topics. This simplifies the schema evolution because Elasticsearch has one enforcement on mappings; that is, all fields with the same name in the same index must have the same mapping.

Elasticsearch supports dynamic mapping: when it encounters previously unknown field in a document, it uses dynamic mapping to determine the datatype for the field and automatically adds the new field to the type mapping.

When dynamic mapping is enabled, the Elasticsearch connector supports schema evolution. This is because mappings in Elasticsearch are more flexible than the schema evolution allowed in Connect when different converters are used. For example, when the Avro converter is used, backward, forward, and fully compatible schema evolutions are allowed.

When dynamic mapping is enabled, the Elasticsearch connector allows the following schema changes:

  • Adding Fields: Adding one or more fields to Kafka messages. Elasticsearch adds the new fields to the mapping when dynamic mapping is enabled.
  • Removing Fields: Removing one or more fields from Kafka messages. Missing fields are treated as the null value defined for those fields in the mapping.
  • Changing types that can be merged: Changing a field from integer type to string type. Elasticsearch can convert integers to strings.

The following change is not allowed:

  • Changing types that can not be merged: Changing a field from a string type to an integer type.

Because mappings are more flexible, schema compatibility should be enforced when writing data to Kafka.

Automatic retries

The Elasticsearch connector may not be able to write to the Elasticsearch endpoint if the Elasticsearch service is temporarily overloaded. In many cases, the connector retries the request a number of times before failing. To prevent further overloading, the connector uses an exponential backoff technique to give the Elasticsearch service time to recover. This technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, wherein large numbers of requests from many tasks are submitted concurrently and overwhelm the service.

Randomness spreads out the retries from many tasks. This should reduce the overall time required to complete all outstanding requests when compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as possible.

The number of retries is dictated by the max.retries connector configuration property. The max.retries property defaults to five attempts. The maximum backoff time (the amount of time to wait before retrying) is a function of the retry attempt number and the initial backoff time specified in the retry.backoff.ms connector configuration property. The retry.backoff.ms property defaults to 100 milliseconds.

The jitter strategy used is “Full Jitter” where the actual backoff time is a uniform random value selected between the minimum backoff (0.0) and maximum backoff at the current attempt. Since the actual backoff value is selected randomly, it is not guaranteed to increase with each consecutive retry attempt.

For example, the following table shows the possible wait times for four subsequent retries with retry.backoff.ms set to 500 milliseconds (0.5 second):

Range of backoff times
Retry Minimum Backoff (sec) Maximum Backoff (sec) Actual Backoff with Jitter (sec) Total Potential Delay from First Attempt (sec)
1 0.0 0.5 0.4 0.5
2 0.0 1.0 0.7 1.5
3 0.0 2.0 1.9 3.5
4 0.0 4.0 1.5 7.5

Note how the maximum wait time is simply the normal exponential backoff which is calculated as ${retry.backoff.ms} * 2 ^ (retry-1). Also note how the actual backoff decreased between retry attempt #3 and #4 despite the maximum backoff increasing exponentially.

As shown in the following table, increasing the maximum number of retries adds more backoff:

Range of backoff times for additional retries
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
5 0.0 8.0 15.5
6 0.0 16.0 31.5
7 0.0 32.0 63.5
8 0.0 64.0 127.5
9 0.0 128.0 255.5
10 0.0 256.0 511.5
11 0.0 512.0 1023.5
12 0.0 1024.0 2047.5
13 0.0 2048.0 4095.5

By increasing max.retries to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes to successfully send a batch of records when the Elasticsearch service is overloaded. Increasing the value to 13 quickly increases the maximum potential time to submit a batch of records to well over one hour.

You can adjust both the max.retries and retry.backoff.ms connector configuration properties to optimize retry timing.

Reindexing

In some cases, the way to index a set of documents may need to be changed. For example, the analyzer, tokenizer, and indexed fields may need to be changed. Because these must not be changed after your mapping is defined, you must reindex the data. You can use Index aliases to achieve reindexing with zero downtime.

To reindex the data, complete the following steps in Elasticsearch:

  1. Create an alias for the index with the original mapping.
  2. Point the applications using the index to the alias.
  3. Create a new index with the updated mapping.
  4. Move data from the original index to the new index.
  5. Atomically move the alias to the new index.
  6. Delete the original index.

Write requests continue to come during the reindex period (if reindexing is done with no downtime). Aliases do not allow writing to both the original and the new index at the same time. To solve this, you can use two Elasticsearch connector jobs to achieve double writes, one to the original index and a second one to the new index. The following steps explain how to do this:

  1. Keep the original connector job that ingests data to the original indices running.
  2. Create a new connector job that writes to new indices. As long as the data is in Kafka, some of the old data and all new data are written to the new indices.
  3. After the reindexing process is complete and the data in the original indices are moved to the new indices, stop the original connector job.

Security

The Elasticsearch connector can read data from secure Kafka by following the instructions in the Kafka Connect security documentation. The connector can write data to a secure Elasticsearch cluster that supports either of the following authentication methods:

  • Basic authentication: By setting the connection.username and connection.password configuration properties.
  • Kerberos authentication: By setting the Kerberos configuration properties.

For more information, see Elasticsearch Connector with Security.