Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Kafka Connect Elasticsearch Sink Connector

The Elasticsearch connector allows moving data from Kafka to Elasticsearch. It writes data from a topic in Kafka to an index in Elasticsearch and all data for a topic have the same type.

Elasticsearch is often used for text queries, analytics and as an key-value store (use cases). The connector covers both the analytics and key-value store use cases. For the analytics use case, each message is in Kafka is treated as an event and the connector uses topic+partition+offset as a unique identifier for events, which then converted to unique documents in Elasticsearch. For the key-value store use case, it supports using keys from Kafka messages as document IDs in Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantees exactly once delivery.

Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. Users can explicitly define mappings for types in indices. When mapping is not explicitly defined, Elasticsearch can determine field names and types from data, however, some types such as timestamp and decimal, may not be correctly inferred. To ensure that the types are correctly inferred, the connector provides a feature to infer mapping from the schemas of Kafka messages.

Features

The Elasticsearch connector offers the following features:

  • Exactly Once Delivery: The connector relies on Elasticsearch’s idempotent write semantics to ensure exactly once delivery to Elasticsearch. By setting IDs in Elasticsearch documents, the connector can ensure exactly once delivery. If keys are included in Kafka messages, these keys are translated to Elasticsearch document IDs automatically. When the keys are not included, or are explicitly ignored, the connector will use topic+partition+offset as the key, ensuring each message in Kafka has exactly one document corresponding to it in Elasticsearch.
  • Mapping Inference: The connector can infer mappings from the Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages. However, the inference is limited to field types and default values when a field is missing. If more customizations are needed (e.g. user defined analyzers), we highly recommend to manually create mappings.
  • Schema Evolution: The connector supports schema evolution and can handle backward, forward and fully compatible changes of schemas in Connect. It can also handle some incompatible schema changes such as changing a field from integer to string.

Prerequisites

The following are required to run the Kafka Connect Elasticsearch Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • Elasticsearch 2.x, 5.x, or 6.x. Currently 7.x is not supported

Install Elasticsearch Connector

Important

This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, there are no additional steps required to install.

If you do not have Confluent Platform installed and running, you can install the connector using the Confluent Hub client (recommended) or manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-elasticsearch:5.1.4

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

This connector is available under the Confluent Community License.

The source code is available at https://github.com/confluentinc/kafka-connect-elasticsearch.

Quick Start

This quick start uses the Elasticsearch connector to export data produced by the Avro console producer to Elasticsearch.

For a more detailed Docker-based example of the Confluent Elasticsearch Connector, check out Confluent Platform demo. With this demo you can deploy a Kafka streaming ETL, including Elasticsearch, using KSQL for stream processing.

Prerequisites:

  • Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default ZooKeeper, Kafka, Schema Registry, Connect REST API, and Connect are started with the confluent start command. For more information, see Install Using the Confluent CLI.

Add a Record to the Consumer

Start the Avro console producer to import a few records to Kafka:

<path-to-confluent>/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then in the console producer, enter:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

The three records entered are published to the Kafka topic test-elasticsearch in Avro format.

Load the Elasticsearch Connector

Load the predefined Elasticsearch connector.

Tip

Before starting the connector, you can verify that the configurations in etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties are properly set (e.g. connection.url points to the correct HTTP address).

  1. Optional: View the available predefined connectors with this command:

    confluent list connectors
    

    Your output should resemble:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      hdfs-sink
      s3-sink
    
  2. Load the elasticsearch-sink connector:

    confluent load elasticsearch-sink
    

    Your output should resemble:

    {
      "name": "elasticsearch-sink",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "test-elasticsearch-sink",
        "key.ignore": "true",
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
      },
      "tasks": [],
      "type": null
    }
    

    Tip

    For non-CLI users, you can load the Elasticsearch connector by running Connect in standalone mode with this command:

      ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
    etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
    
  3. After the connector finishes ingesting data to Elasticsearch, check that the data is available in Elasticsearch:

    curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
    

    Your output should resemble:

    {
      "took" : 39,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 3,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+0",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value1"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+2",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value3"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+1",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value2"
            }
          }
        ]
      }
    }
    

Delivery Semantics

The connector supports batching and pipelined writes to Elasticsearch to boost throughput. It accumulates messages in batches and allows concurrent processing of multiple batches.

Document-level update ordering is ensured by using the partition-level Kafka offset as the document version, and using version_mode=external.

Mapping Management

Before using the connector, you need to think carefully on how the data should be tokenized, analyzed and indexed, which are determined by mapping. Some changes are not allowed after a mapping is already defined. Although you can add new types to an index, or add new fields to a type, you can’t add new analyzers or make changes to existing fields. If you were to do so, the data that had already been indexed would be incorrect and your searches would no longer work as expected. It is highly recommended that to manually define mappings before writing data to Elasticsearch.

Index templates can be helpful when manually define mappings. It allows you to define templates that will automatically be applied when new indices are created. The templates include both settings and mappings, and a simple pattern template that controls whether the template should be applied to the new index.

Warning

Elasticsearch 7.x has deprecated mappings types, and plans to remove them altogether in 8.x. Because the Elasticsearch sink connector relies on mapping types, using the connector with Elasticsearch 7.x will result in a failure of the task when the connector attempts to write to the Elasticsearch 7 index. The task will fail with an error Exiting WorkerSinkTask due to unrecoverable exception caused by Cannot create mapping with Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true as the reason.

Support for Elasticsearch 7 will be added in a future release.

Schema Evolution

The Elasticsearch connector writes data from different topics in Kafka to different indices. All data for a topic will have the same type in Elasticseearch. This allows independent evolution of schemas for data from different topics. This simplifies the schema evolution as Elasticsearch has one enforcement on mappings: all fields with the same name in the same index must have the same mapping.

Elasticsearch supports dynamic mapping: when it encounters previously unknown field in a document, it uses dynamic mapping to determine the datatype for the field and automatically adds the new field to the type mapping.

When dynamic mapping is enabled, the Elasticsearch connector supports schema evolution as mappings in Elasticsearch are more flexible than the schema evolution allowed in Connect when different converters are used. For example, when the Avro converter is used, backward, forward and fully compatible schema evolutions are allowed.

When dynamic mapping is enabled, the Elasticsearch connector allows the following schema changes:

  • Adding Fields: Adding one or more fields to Kafka messages. Elasticsearch will add the new fields to the mapping when dynamic mapping is enabled.
  • Removing Fields: Removing one or more fields to Kafka messages. Missing fields will be treated as the null value defined for those fields in the mapping.
  • Changing types that can be merged: Changing a field from string type to integer type. For example, Elasticsearch can convert integers to strings.

The following change is not allowed:

  • Changing types that can not be merged: Changing a field from integer type to string type.

As mappings are more flexible, schema compatibility should be enforced when writing data to Kafka.

Automatic Retries

The Elasticsearch connector may experience problems writing to the Elasticsearch endpoint, such as when the Elasticsearch service is temporarily overloaded. In many cases, the connector will retry the request a number of times before failing. To prevent from further overloading the Elasticsearch service, the connector uses an exponential backoff technique to give the Elasticsearch service time to recover. The technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as possible.

The number of retries is dictated by the max.retries connector configuration property, which defaults to 5 attempts. The backoff time, which is the amount of time to wait before retrying, is a function of the retry attempt number and the initial backoff time specified in the retry.backoff.ms connector configuration property, which defaults to 500 milliseconds. For example, the following table shows the possible wait times before submitting each of the 5 retry attempts:

Range of backoff times for each retry using the default configuration
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
1 0.0 0.5 0.5
2 0.0 1.0 1.5
3 0.0 2.0 3.5
4 0.0 4.0 7.5
5 0.0 8.0 15.5

Note how the maximum wait time is simply the normal exponential backoff, calculated as ${retry.backoff.ms} * 2 ^ (retry-1). Increasing the maximum number of retries adds more backoff:

Range of backoff times for additional retries
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
6 0.0 16.0 31.5
7 0.0 32.0 63.5
8 0.0 64.0 127.5
9 0.0 128.0 255.5
10 0.0 256.0 511.5
11 0.0 512.0 1023.5
12 0.0 1024.0 2047.5
13 0.0 2048.0 4095.5

By increasing max.retries to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes, to successfully send a batch of records when experiencing an overloaded Elasticsearch service. Increasing the value to 13 quickly increases the maximum potential time to submit a batch of records to well over 1 hour 8 minutes.

You can adjust both the max.retries and retry.backoff.ms connector configuration properties to achieve the desired backoff and retry characteristics.

Reindexing

In some cases, the way to index a set of documents may need to be changed. For example, the analyzer, tokenizer and which fields are indexed may need to be changed. As those cannot be changed once a mapping is defined, we have to reindex the data. Index aliases can be used to achieve reindexing with zero downtime. Here are the steps at needs to be performed in Elasticsearch:

  1. Create an alias for the index with the old mapping.
  2. The applications that uses the index are pointed to the alias.
  3. Create a new index with the updated mapping.
  4. Move data from old to the new index.
  5. Atomically move the alias to the new index.
  6. Delete the old index.

For zero downtime reindexing, there are still write requests coming during the reindex period. As aliases do not allow writing to both the old and the new index at the same time. To solve this, the same data needs to be written both to the old and the new index.

When the Elasticsearch connector is used to write data to Elasticsearch, we can use two connector jobs to achieve double writes:

  1. The connector job that ingest data to the old indices continue writing to the old indices.
  2. Create a new connector job that writes to new indices. This will copy both some old data and new data to the new indices as long as the data is in Kafka.
  3. Once the data in the old indices are moved to the new indices by the reindexing process, we can stop the old connector job.

Security

The Elasticsearch connector can read data from secure Kafka by following the instructions in the Connect security documentation.

The Elasticsearch connector can write data to a secure Elasticsearch cluster that supports basic authentication by setting the connection.username and connection.password configuration properties.

Additional Documentation