Elasticsearch Service Sink Connector for Confluent Platform

Note

If you are using Confluent Cloud, see Elasticsearch Service Sink Connector for Confluent Cloud for the cloud Quick Start.

The Kafka Connect Elasticsearch Service sink connector moves data from Apache Kafka® to Elasticsearch. It writes data from a topic in Kafka to an index in Elasticsearch. All data for a topic has the same type in Elasticsearch, which allows the schemas for data from different topics to evolve independently. This simplifies schema evolution because Elasticsearch enforces one constraint on mappings: all fields with the same name in the same index must have the same mapping type.

Elasticsearch is often used for text queries, analytics, and as a key-value store. The connector covers both the analytics and key-value store use cases. For the analytics use case, each message in Kafka is treated as an event, and the connector uses topic+partition+offset as a unique identifier for each event. Each event is then converted to a unique document in Elasticsearch.

For the key-value store use case, the connector supports using keys from Kafka messages as document IDs in Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantee exactly once delivery.
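The two ID strategies described above can be sketched in a few lines of Python. This is an illustration only, not the connector's code: the function name `document_id` and its parameters are hypothetical, and `key_ignore` merely mirrors the `key.ignore` property that appears in the quick start configuration later in this document.

```python
from typing import Optional


def document_id(topic: str, partition: int, offset: int,
                key: Optional[str] = None, key_ignore: bool = False) -> str:
    """Return the Elasticsearch document ID for one Kafka record (illustrative only)."""
    # Key-value use case: use the record key when it exists and is not ignored.
    if key is not None and not key_ignore:
        return key
    # Analytics use case: topic+partition+offset is unique for every record.
    return f"{topic}+{partition}+{offset}"


# Redelivering the same record produces the same ID, so the second write
# replaces the first document instead of creating a duplicate.
first = document_id("test-elasticsearch-sink", 0, 2)
again = document_id("test-elasticsearch-sink", 0, 2)
print(first, first == again)
```

Because the ID depends only on the record's coordinates or key, a retried write targets the same document, which is what makes the write idempotent.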

Mapping is the process of defining how a document and the fields it contains are stored and indexed. Users can explicitly define mappings for types in indices. When mapping is not explicitly defined, Elasticsearch can determine field names and types from data. However, types such as timestamp and decimal may not be correctly inferred. To ensure that these types are correctly inferred, the connector provides a feature to infer mapping from the schemas of Kafka messages.

Features

The Elasticsearch connector offers the following features:

  • Exactly Once Delivery: The connector relies on Elasticsearch’s idempotent write semantics and sets an ID on every document to ensure exactly once delivery to Elasticsearch. If keys are included in Kafka messages, they are translated to Elasticsearch document IDs automatically. When the keys are not included, or are explicitly ignored, the connector uses topic+partition+offset as the key, ensuring that each message in Kafka has exactly one corresponding document in Elasticsearch.
  • Mapping Inference: The connector can infer mappings from Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages. If a field is missing, the inference is limited to field types and default values. You should manually create mappings if more customization is needed (for example, user-defined analyzers).
  • Schema Evolution: The connector supports schema evolution and can handle backward, forward, and fully compatible schema changes in Connect. It can also handle some incompatible schema changes such as changing a field from an integer to a string.

Prerequisites

The following are required to run the Kafka Connect Elasticsearch Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • Elasticsearch 2.x, 5.x, 6.x, or 7.x

Install the Elasticsearch Connector

Important

This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, there are no additional steps required to install.

If you do not have Confluent Platform installed and running, you can install the connector using the Confluent Hub client (recommended) or manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-elasticsearch:5.5.1

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

This connector is available under the Confluent Community License.

The source code is available at https://github.com/confluentinc/kafka-connect-elasticsearch.

Configuration Properties

For a complete list of configuration properties for this connector, see Elasticsearch Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Quick Start

This quick start uses the Elasticsearch connector to export data produced by the Avro console producer to Elasticsearch.

Prerequisites

See also

For a more detailed Docker-based example of the Confluent Elasticsearch Connector, refer to Confluent Platform demo. You can deploy a Kafka streaming ETL, including Elasticsearch, using ksqlDB for stream processing.

The quick start procedure assumes that you are using the Confluent CLI, but standalone installations are also supported. By default ZooKeeper, Kafka, Schema Registry, Connect REST API, and Connect are started with the confluent local start command. For more information, refer to On-Premises Deployments.

Add a Record to the Consumer

  1. Start the Avro console producer to import a few records to Kafka:

    <path-to-confluent>/bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test-elasticsearch-sink \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  2. Enter the following in the console producer:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    

    The three records entered are published to the Kafka topic test-elasticsearch-sink in Avro format.

Load the Elasticsearch Connector

Complete the following steps to load the predefined Elasticsearch connector bundled with Confluent Platform.

Note

Default connector properties are already set for this quick start. To view the connector properties, refer to etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties.

  1. List the available predefined connectors using the confluent local list command:

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

    confluent local list connectors
    

    Example output:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      hdfs-sink
      s3-sink
    
  2. Load the elasticsearch-sink connector:

    confluent local load elasticsearch-sink
    

    Example output:

    {
      "name": "elasticsearch-sink",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "test-elasticsearch-sink",
        "key.ignore": "true",
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
      },
      "tasks": [],
      "type": null
    }
    

    Tip

    For non-CLI users, you can load the Elasticsearch connector by running Kafka Connect in standalone mode with this command:

    ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
    etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
    
  3. After the connector finishes ingesting data to Elasticsearch, enter the following command to check that data is available in Elasticsearch:

    curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
    

    Example output:

    {
      "took" : 39,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 3,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+0",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value1"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+2",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value3"
            }
          },
          {
            "_index" : "test-elasticsearch-sink",
            "_type" : "kafka-connect",
            "_id" : "test-elasticsearch-sink+0+1",
            "_score" : 1.0,
            "_source" : {
              "f1" : "value2"
            }
          }
        ]
      }
    }
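
The connector configuration loaded in step 2 could also be submitted through the Connect REST API mentioned earlier. The following Python sketch only builds the request and does not send it. The Connect address `http://localhost:8083` and the helper name `build_create_request` are assumptions made for this illustration; the configuration values are copied from the example output above.

```python
import json
import urllib.request

# Connector definition copied from the quick start example output.
connector = {
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "test-elasticsearch-sink",
        "key.ignore": "true",
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect",
    },
}


def build_create_request(connect_url: str) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for the Connect REST API."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# The address below is an assumed local Connect worker; adjust it as needed.
request = build_create_request("http://localhost:8083")
print(request.get_method(), request.full_url)
# To actually submit the connector: urllib.request.urlopen(request)
```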
    

Delivery Semantics

The connector supports batching and pipelined writes to Elasticsearch to boost throughput. It accumulates messages in batches and allows concurrent processing of multiple batches.

Document-level update ordering is ensured by using the partition-level Kafka offset as the document version and by indexing with external versioning (version_type=external in Elasticsearch).
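To see why external versioning preserves ordering when batches are written concurrently, consider this toy in-memory model. It is a sketch of the versioning rule only (a write is accepted only when its version is higher than the stored one), not the Elasticsearch client or the connector's implementation; the class and method names are hypothetical.

```python
class VersionedStore:
    """Toy in-memory model of external versioning (not the Elasticsearch client)."""

    def __init__(self) -> None:
        self.docs = {}  # doc_id -> (version, source)

    def index(self, doc_id: str, version: int, source: dict) -> bool:
        """Apply the write only if `version` is newer than the stored version."""
        current = self.docs.get(doc_id)
        if current is not None and version <= current[0]:
            return False  # stale or duplicate write: rejected as a version conflict
        self.docs[doc_id] = (version, source)
        return True


store = VersionedStore()
# Two updates to the same key arrive out of order from concurrent batches.
# The Kafka offset of each record is used as its version.
results = [
    store.index("user-42", version=11, source={"f1": "newer"}),
    store.index("user-42", version=10, source={"f1": "older"}),
]
print(results, store.docs["user-42"])
```

The late-arriving write with the lower offset is rejected, so the document always reflects the record with the highest offset for that key.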

Mapping Management

Before using the connector, carefully consider how the data should be tokenized, analyzed, and indexed. These are determined by the mapping. Some changes are not allowed after the mapping is defined. Although you can add new types to an index or add new fields to a type, you can’t add new analyzers or change existing fields. Such changes would make the data that was already indexed incorrect, and your searches would no longer work as expected. For this reason, define mappings before writing data to Elasticsearch.

Index templates can be helpful when manually defining mappings, because they are applied automatically when new indices are created. A template includes both settings and mappings, along with a simple index name pattern that controls whether the template is applied to a new index.
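As a rough illustration, the following Python sketch prints a template request body with the three parts named above: a pattern, settings, and mappings. Everything specific in it is an assumption: the template name, the index pattern, the shard count, and the analyzer are hypothetical, and the body layout follows the Elasticsearch 7.x `PUT _template/<name>` API. Other Elasticsearch versions expect a different layout, so adapt it to the version you run.

```python
import json

# Hypothetical template name and contents, for illustration only.
template_name = "kafka-connect-template"
template_body = {
    # Pattern that decides which new indices receive this template.
    "index_patterns": ["test-elasticsearch-*"],
    # Index-level settings applied at creation time.
    "settings": {"number_of_shards": 1},
    # Explicit mapping for the f1 field used in the quick start records.
    "mappings": {
        "properties": {
            "f1": {"type": "text", "analyzer": "standard"},
        }
    },
}

print(f"PUT /_template/{template_name}")
print(json.dumps(template_body, indent=2))
```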

Schema Evolution

The Elasticsearch connector writes data from different topics in Kafka to different indices. All data for a topic will have the same type in Elasticsearch. This allows an independent evolution of schemas for data from different topics. This simplifies the schema evolution because Elasticsearch has one enforcement on mappings; that is, all fields with the same name in the same index must have the same mapping.

Elasticsearch supports dynamic mapping: when it encounters a previously unknown field in a document, it uses dynamic mapping to determine the datatype for the field and automatically adds the new field to the type mapping.

When dynamic mapping is enabled, the Elasticsearch connector supports schema evolution. This is because mappings in Elasticsearch are more flexible than the schema evolution allowed in Connect when different converters are used. For example, when the Avro converter is used, backward, forward, and fully compatible schema evolutions are allowed.

When dynamic mapping is enabled, the Elasticsearch connector allows the following schema changes:

  • Adding Fields: Adding one or more fields to Kafka messages. Elasticsearch adds the new fields to the mapping when dynamic mapping is enabled.
  • Removing Fields: Removing one or more fields from Kafka messages. Missing fields are treated as the null value defined for those fields in the mapping.
  • Changing types that can be merged: Changing a field from integer type to string type. Elasticsearch can convert integers to strings.

The following change is not allowed:

  • Changing types that cannot be merged: Changing a field from a string type to an integer type.

Because mappings are more flexible, schema compatibility should be enforced when writing data to Kafka.

Automatic Retries

The Elasticsearch connector may not be able to write to the Elasticsearch endpoint if the Elasticsearch service is temporarily overloaded. In many cases, the connector retries the request a number of times before failing. To prevent further overloading, the connector uses an exponential backoff technique to give the Elasticsearch service time to recover. This technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, wherein large numbers of requests from many tasks are submitted concurrently and overwhelm the service.

Randomness spreads out the retries from many tasks. This should reduce the overall time required to complete all outstanding requests when compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as possible.

The number of retries is dictated by the max.retries connector configuration property. The max.retries property defaults to five attempts. The maximum backoff time (the amount of time to wait before retrying) is a function of the retry attempt number and the initial backoff time specified in the retry.backoff.ms connector configuration property. The retry.backoff.ms property defaults to 100 milliseconds.

The jitter strategy used is “Full Jitter” where the actual backoff time is a uniform random value selected between the minimum backoff (0.0) and maximum backoff at the current attempt. Since the actual backoff value is selected randomly, it is not guaranteed to increase with each consecutive retry attempt.

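For example, the following table shows the possible wait times for four retries. The maximum backoff values shown correspond to an initial backoff of 0.5 seconds (retry.backoff.ms set to 500); with the 100-millisecond default, each maximum would be one fifth of the value shown: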

Range of backoff times
Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Actual Backoff with Jitter (sec) | Total Potential Delay from First Attempt (sec)
1     | 0.0                   | 0.5                   | 0.4                              | 0.5
2     | 0.0                   | 1.0                   | 0.7                              | 1.5
3     | 0.0                   | 2.0                   | 1.9                              | 3.5
4     | 0.0                   | 4.0                   | 1.5                              | 7.5

Note how the maximum wait time is simply the normal exponential backoff which is calculated as ${retry.backoff.ms} * 2 ^ (retry-1). Also note how the actual backoff decreased between retry attempt #3 and #4 despite the maximum backoff increasing exponentially.
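The formula and the Full Jitter strategy can be sketched as follows. This is a minimal illustration, not the connector's implementation; the function names are hypothetical. The example passes 500 ms as the initial backoff because that value reproduces the maximum backoff column of the table above.

```python
import random


def max_backoff_ms(retry: int, retry_backoff_ms: int) -> int:
    """Upper bound for the given retry: retry.backoff.ms * 2 ^ (retry - 1)."""
    return retry_backoff_ms * 2 ** (retry - 1)


def full_jitter_backoff_ms(retry: int, retry_backoff_ms: int,
                           rng: random.Random) -> float:
    """Pick the actual wait uniformly between 0 and the upper bound."""
    return rng.uniform(0.0, max_backoff_ms(retry, retry_backoff_ms))


rng = random.Random()  # unseeded, so the sampled waits differ on every run
for retry in range(1, 5):
    upper = max_backoff_ms(retry, 500)
    actual = full_jitter_backoff_ms(retry, 500, rng)
    print(f"retry {retry}: max {upper / 1000:.1f} s, actual {actual / 1000:.2f} s")
```

Running the loop several times shows the same effect as the table: the upper bound doubles with every retry, while the sampled wait can go down from one retry to the next.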

As shown in the following table, increasing the maximum number of retries adds more backoff:

Range of backoff times for additional retries
Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Total Potential Delay from First Attempt (sec)
5     | 0.0                   | 8.0                   | 15.5
6     | 0.0                   | 16.0                  | 31.5
7     | 0.0                   | 32.0                  | 63.5
8     | 0.0                   | 64.0                  | 127.5
9     | 0.0                   | 128.0                 | 255.5
10    | 0.0                   | 256.0                 | 511.5
11    | 0.0                   | 512.0                 | 1023.5
12    | 0.0                   | 1024.0                | 2047.5
13    | 0.0                   | 2048.0                | 4095.5

By increasing max.retries to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes, to successfully send a batch of records when the Elasticsearch service is overloaded. Increasing the value to 13 raises the maximum potential time to submit a batch of records to 4095.5 seconds, which is more than one hour (about 68 minutes).

You can adjust both the max.retries and retry.backoff.ms connector configuration properties to optimize retry timing.
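To estimate the effect of a given pair of settings before deploying, you can sum the maximum backoffs. The helper below is a hypothetical sketch based only on the formula given earlier in this section. It reproduces the totals in the tables above when the initial backoff is 500 ms, and shows the same retry counts with the 100 ms default for comparison.

```python
def total_potential_delay_ms(max_retries: int, retry_backoff_ms: int) -> int:
    """Sum of the maximum backoffs for retries 1..max_retries."""
    return sum(retry_backoff_ms * 2 ** (retry - 1)
               for retry in range(1, max_retries + 1))


for backoff_ms in (500, 100):
    for max_retries in (5, 10, 13):
        seconds = total_potential_delay_ms(max_retries, backoff_ms) / 1000
        print(f"retry.backoff.ms={backoff_ms}, max.retries={max_retries}: "
              f"up to {seconds} s ({seconds / 60:.1f} min)")
```

Because the sum roughly doubles with every additional retry, a small change to max.retries has a much larger effect on the worst case than a small change to retry.backoff.ms.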

Reindexing

In some cases, the way a set of documents is indexed may need to change. For example, the analyzer, tokenizer, or indexed fields may need to be changed. Because these cannot be changed after the mapping is defined, you must reindex the data. You can use index aliases to reindex with zero downtime.

To reindex the data, complete the following steps in Elasticsearch:

  1. Create an alias for the index with the original mapping.
  2. Point the applications using the index to the alias.
  3. Create a new index with the updated mapping.
  4. Move data from the original index to the new index.
  5. Atomically move the alias to the new index.
  6. Delete the original index.
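Steps 4 and 5 map onto two Elasticsearch requests, sketched below as request bodies built in Python. The alias and index names are hypothetical, and the sketch only prints the bodies rather than sending them. It assumes an Elasticsearch version that provides the `_reindex` API; on versions without it, the copy in step 4 has to be done another way.

```python
import json

# Hypothetical names used only for this illustration.
ALIAS = "orders"
OLD_INDEX = "orders-v1"
NEW_INDEX = "orders-v2"

# Step 4: copy documents from the original index to the new one (POST /_reindex).
reindex_body = {"source": {"index": OLD_INDEX}, "dest": {"index": NEW_INDEX}}

# Step 5: both actions go in a single POST /_aliases request, so the alias
# never points at zero indices or at both indices.
alias_swap_body = {
    "actions": [
        {"remove": {"index": OLD_INDEX, "alias": ALIAS}},
        {"add": {"index": NEW_INDEX, "alias": ALIAS}},
    ]
}

print("POST /_reindex", json.dumps(reindex_body))
print("POST /_aliases", json.dumps(alias_swap_body))
```

Sending the remove and add actions together is what makes the switch atomic from the point of view of applications that read through the alias.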

When reindexing is done with no downtime, write requests continue to arrive during the reindex period. Aliases do not allow writing to both the original and the new index at the same time. To solve this, you can use two Elasticsearch connector jobs to achieve double writes, one to the original index and a second one to the new index. The following steps explain how to do this:

  1. Keep the original connector job that ingests data to the original indices running.
  2. Create a new connector job that writes to new indices. As long as the data is in Kafka, some of the old data and all new data are written to the new indices.
  3. After the reindexing process is complete and the data in the original indices are moved to the new indices, stop the original connector job.

Security

The Elasticsearch connector can read data from secure Kafka by following the instructions in the Kafka Connect security documentation.

The Elasticsearch connector can write data to a secure Elasticsearch cluster that supports basic authentication by setting the connection.username and connection.password configuration properties.
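As a small illustration, the sketch below adds the two authentication properties to the quick start configuration. The helper name `with_basic_auth` and the credentials are placeholders invented for this example; only the property names come from this document.

```python
from typing import Dict

# Base settings copied from the quick start configuration in this document.
BASE_CONFIG: Dict[str, str] = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-elasticsearch-sink",
    "key.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
}


def with_basic_auth(config: Dict[str, str], username: str, password: str) -> Dict[str, str]:
    """Return a copy of `config` with the basic authentication properties set."""
    secured = dict(config)
    secured["connection.username"] = username
    secured["connection.password"] = password
    return secured


# Placeholder credentials for illustration; do not commit real secrets.
secured = with_basic_auth(BASE_CONFIG, "es-user", "es-password")
for key in sorted(secured):
    # Mask the password so it does not end up in logs or terminals.
    value = "********" if key == "connection.password" else secured[key]
    print(f"{key}={value}")
```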

See also: Elasticsearch Connector with Security.