.. _elasticsearch-overview:

Elasticsearch Sink Connector for |cp|
=====================================

The |kconnect-long| Elasticsearch connector allows moving data from |ak-tm| to Elasticsearch. It
writes data from a topic in |ak-tm| to an `index `__ in Elasticsearch and all data for a topic have
the same `type `__.

Elasticsearch is often used for text queries, analytics and as a key-value store (`use cases `__).
The connector covers both the analytics and key-value store use cases. For the **analytics** use
case, each message in |ak| is treated as an event and the connector uses
``topic+partition+offset`` as a unique identifier for events, which are then converted to unique
documents in Elasticsearch. For the **key-value store** use case, it supports using keys from |ak|
messages as document IDs in Elasticsearch and provides configurations ensuring that updates to a
key are written to Elasticsearch in order. For both use cases, Elasticsearch's idempotent write
semantics guarantee exactly once delivery.

`Mapping `__ is the process of defining how a document and the fields it contains are stored and
indexed. Users can explicitly define mappings for types in indices. When mapping is not explicitly
defined, Elasticsearch can determine field names and types from data. However, types such as
timestamp and decimal may not be correctly inferred. To ensure that these types are correctly
inferred, the connector provides a feature to infer mapping from the schemas of |ak| messages.

--------
Features
--------

The Elasticsearch connector offers the following features:

* **Exactly Once Delivery**: The connector relies on Elasticsearch's idempotent write semantics to
  ensure exactly once delivery to Elasticsearch. By setting IDs in Elasticsearch documents, the
  connector can ensure exactly once delivery. If keys are included in |ak| messages, then they are
  translated to Elasticsearch document IDs automatically. When the keys are not included, or are
  explicitly ignored, the connector will use ``topic+partition+offset`` as the key, ensuring each
  message in |ak| has exactly one document corresponding to it in Elasticsearch.

* **Mapping Inference**: The connector can infer mappings from |kconnect| schemas. When enabled,
  the connector creates mappings based on schemas of |ak| messages. If a field is missing, the
  inference is limited to field types and default values. You should manually create mappings if
  more customizations are needed (for example, user-defined analyzers).

* **Schema Evolution**: The connector supports schema evolution and can handle backward, forward,
  and fully compatible schema changes in |kconnect|. It can also handle some incompatible schema
  changes such as changing a field from an integer to a string.

-------------
Prerequisites
-------------

The following are required to run the |kconnect-long| Elasticsearch Sink Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* Elasticsearch 2.x, 5.x, 6.x, or 7.x

-----------------------------------
Install the Elasticsearch Connector
-----------------------------------

.. include:: ../includes/connector-native-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-elasticsearch:|release|

------------------------------
Install the Connector Manually
------------------------------

`Download and extract the ZIP file `_ for your connector and then follow the manual connector
installation :ref:`instructions `.

-------
License
-------

.. include:: ../includes/community-license.rst

The source code is available at `https://github.com/confluentinc/kafka-connect-elasticsearch `_.

.. _elasticsearch-quickstart:

-----------
Quick Start
-----------

This quick start uses the Elasticsearch connector to export data produced by the Avro console
producer to Elasticsearch.

.. include:: ../../includes/install-cli-prereqs.rst

.. seealso::

   For a more detailed Docker-based example of the Confluent Elasticsearch Connector, refer to
   :ref:`Confluent Platform demo`. You can deploy a |ak| streaming ETL, including Elasticsearch,
   using KSQL for stream processing.

The quick start procedure assumes that you are using the Confluent CLI, but standalone
installations are also supported. By default |zk|, |ak|, |sr|, |kconnect| REST API, and |kconnect|
are started with the :litwithvars:`|confluent_start|` command. For more information, refer to
:ref:`installation`.

Add a Record to the Consumer
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Start the Avro console producer to import a few records to |ak|:

   .. codewithvars:: bash

      /bin/kafka-avro-console-producer \
      --broker-list localhost:9092 --topic test-elasticsearch-sink \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

#. Enter the following in the console producer:

   .. codewithvars:: bash

      {"f1": "value1"}
      {"f1": "value2"}
      {"f1": "value3"}

The three records entered are published to the |ak| topic ``test-elasticsearch-sink`` in Avro
format.

Load the Elasticsearch Connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Complete the following steps to load the predefined Elasticsearch connector bundled with |cp|.

.. note::

   Default connector properties are already set for this quick start. To view the connector
   properties, refer to ``etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties``.

#. List the available predefined connectors using the :ref:`confluent_local_list` command:

   .. include:: ../../includes/cli-new.rst

   .. codewithvars:: bash

      |confluent_list| connectors

   Example output:

   .. codewithvars:: bash

      Bundled Predefined Connectors (edit configuration under etc/):
        elasticsearch-sink
        file-source
        file-sink
        jdbc-source
        jdbc-sink
        hdfs-sink
        s3-sink

#. Load the ``elasticsearch-sink`` connector:

   .. codewithvars:: bash

      |confluent_load| elasticsearch-sink

   Example output:

   .. codewithvars:: bash

      {
        "name": "elasticsearch-sink",
        "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "tasks.max": "1",
          "topics": "test-elasticsearch-sink",
          "key.ignore": "true",
          "connection.url": "http://localhost:9200",
          "type.name": "kafka-connect",
          "name": "elasticsearch-sink"
        },
        "tasks": [],
        "type": null
      }

   .. tip::

      For non-CLI users, you can load the Elasticsearch connector by running |kconnect-long| in
      standalone mode with this command:

      .. codewithvars:: bash

         ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
         etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

#. After the connector finishes ingesting data to Elasticsearch, enter the following command to
   check that data is available in Elasticsearch:

   .. codewithvars:: bash

      curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'

   Example output:

   .. codewithvars:: bash

      {
        "took" : 39,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : 3,
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "test-elasticsearch-sink",
              "_type" : "kafka-connect",
              "_id" : "test-elasticsearch-sink+0+0",
              "_score" : 1.0,
              "_source" : {
                "f1" : "value1"
              }
            },
            {
              "_index" : "test-elasticsearch-sink",
              "_type" : "kafka-connect",
              "_id" : "test-elasticsearch-sink+0+2",
              "_score" : 1.0,
              "_source" : {
                "f1" : "value3"
              }
            },
            {
              "_index" : "test-elasticsearch-sink",
              "_type" : "kafka-connect",
              "_id" : "test-elasticsearch-sink+0+1",
              "_score" : 1.0,
              "_source" : {
                "f1" : "value2"
              }
            }
          ]
        }
      }

------------------
Delivery Semantics
------------------

The connector supports batching and pipelined writes to Elasticsearch to boost throughput. It
accumulates messages in batches and allows concurrent processing of multiple batches.

Document-level update ordering is ensured by using the partition-level |ak| offset as the
`document version `__ and by using ``version_type=external``.

------------------
Mapping Management
------------------

Before using the connector, carefully consider how the data should be tokenized, analyzed, and
indexed. These are determined by mapping. Some changes are not allowed after the mapping is already
defined. Although you can add new types to an index or add new fields to a type, you can’t add new
analyzers or make changes to existing fields. If you do this, the data that was already indexed
would be incorrect and your searches would no longer work as expected. You should define mappings
before writing data to Elasticsearch.

`Index templates `__ can be helpful when manually defining mappings, and allow you to define
templates that are automatically applied when new indices are created. The templates include both
settings and mappings, along with a simple pattern template that controls whether the template
should be applied to the new index.

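As an illustration of the template structure described above, the following Python sketch builds an
index template body and prints the request that would register it. The template name
``kafka-connect-template``, the ``test-elasticsearch-sink*`` pattern, the shard count, and the
``keyword`` mapping for ``f1`` are all assumptions chosen to match the quick start. The body layout
assumes the legacy ``_template`` API with a mapping type, as in Elasticsearch 6.x, and would need
adjusting for other versions.

```python
import json


def build_index_template(pattern, type_name, properties, shards=1):
    """Return a legacy index template body (assumed Elasticsearch 6.x layout)."""
    return {
        # Pattern that decides which new indices the template applies to.
        "index_patterns": [pattern],
        # Index-level settings applied when a matching index is created.
        "settings": {"number_of_shards": shards},
        # Explicit mappings, keyed by mapping type name.
        "mappings": {type_name: {"properties": properties}},
    }


# Hypothetical values matching the quick start topic and type name.
template = build_index_template(
    pattern="test-elasticsearch-sink*",
    type_name="kafka-connect",
    properties={"f1": {"type": "keyword"}},
)

if __name__ == "__main__":
    # Print the request instead of sending it; no cluster is contacted.
    print("PUT /_template/kafka-connect-template")
    print(json.dumps(template, indent=2))
```

Registering a template like this before the connector creates the index is one way to make sure the
explicit mapping is in place when the first record arrives.
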
----------------
Schema Evolution
----------------

The Elasticsearch connector writes data from different topics in |ak| to different indices. All
data for a topic will have the same type in Elasticsearch. This allows an independent evolution of
schemas for data from different topics. This simplifies schema evolution because Elasticsearch
enforces one constraint on mappings; that is, *all fields with the same name in the same index must
have the same mapping*.

Elasticsearch supports dynamic mapping: when it encounters a previously unknown field in a
document, it uses `dynamic mapping `__ to determine the datatype for the field and automatically
adds the new field to the type mapping.

When dynamic mapping is enabled, the Elasticsearch connector supports schema evolution. This is
because mappings in Elasticsearch are more flexible than the schema evolution allowed in
|kconnect| when different converters are used. For example, when the Avro converter is used,
backward, forward, and fully compatible schema evolutions are allowed.

When dynamic mapping is enabled, the Elasticsearch connector allows the following schema changes:

* **Adding Fields**: Adding one or more fields to |ak| messages. Elasticsearch adds the new fields
  to the mapping when dynamic mapping is enabled.
* **Removing Fields**: Removing one or more fields from |ak| messages. Missing fields are treated
  as the null value defined for those fields in the mapping.
* **Changing types that can be merged**: Changing a field from integer type to string type.
  Elasticsearch can convert integers to strings.

The following change is not allowed:

* **Changing types that cannot be merged**: Changing a field from a string type to an integer type.

Because mappings are more flexible, schema compatibility should be enforced when writing data to
|ak|.

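The rules above can be summarized in a small checker. This Python sketch is not part of the
connector. The function name, the flat ``field -> type`` representation, and the single mergeable
pair (integer to string, taken from the example above) are assumptions for illustration only.

```python
# Type changes that Elasticsearch can merge, per the example above.
# Only integer -> string is listed; treat this set as an assumption.
MERGEABLE_CHANGES = {("integer", "string")}


def check_schema_change(old_fields, new_fields):
    """Classify each field-level change between two flat schemas.

    Both arguments map field name -> type name (a simplified,
    hypothetical representation of a message schema).
    Returns a dict mapping field name -> (change, allowed).
    """
    report = {}
    for name in sorted(set(old_fields) | set(new_fields)):
        old, new = old_fields.get(name), new_fields.get(name)
        if old == new:
            continue  # Unchanged field, nothing to report.
        if old is None:
            report[name] = ("added", True)
        elif new is None:
            report[name] = ("removed", True)
        else:
            allowed = (old, new) in MERGEABLE_CHANGES
            report[name] = (f"{old} -> {new}", allowed)
    return report


if __name__ == "__main__":
    old = {"id": "integer", "name": "string", "zip": "string"}
    new = {"id": "string", "zip": "integer", "email": "string"}
    for field, (change, allowed) in check_schema_change(old, new).items():
        print(f"{field}: {change} ({'allowed' if allowed else 'not allowed'})")
```

A check of this kind could run before a new schema is registered, so that a change that cannot be
merged is rejected before any data reaches |ak|.
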
-----------------
Automatic Retries
-----------------

The Elasticsearch connector may not be able to write to the Elasticsearch endpoint if the
Elasticsearch service is temporarily overloaded. In many cases, the connector retries the request a
number of times before failing. To prevent further overloading, the connector uses an exponential
backoff technique to give the Elasticsearch service time to recover. This technique adds
randomness, called jitter, to the calculated backoff times to prevent a *thundering herd*, wherein
large numbers of requests from many tasks are submitted concurrently and overwhelm the service.
Randomness spreads out the retries from many tasks. This should reduce the overall time required to
complete all outstanding requests when compared to simple exponential backoff. The goal is to
spread out the requests to Elasticsearch as much as possible.

The number of retries is dictated by the ``max.retries`` connector configuration property. The
``max.retries`` property defaults to five attempts. The backoff time (the amount of time to wait
before retrying) is a function of the retry attempt number and the initial backoff time specified
in the ``retry.backoff.ms`` connector configuration property. The ``retry.backoff.ms`` property
defaults to 100 milliseconds. For example, the following table shows the possible wait times for
the first four retries. The values shown correspond to an initial backoff of 500 milliseconds
(0.5 second), not the 100-millisecond default:

.. table:: Range of backoff times
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   1     0.0                   0.5                   0.5
   2     0.0                   1.0                   1.5
   3     0.0                   2.0                   3.5
   4     0.0                   4.0                   7.5
   ===== ===================== ===================== ==============================================

Note how the maximum wait time is simply the normal exponential backoff which is calculated as
``${retry.backoff.ms} * 2 ^ (retry-1)``.

As shown in the following table, increasing the maximum number of retries adds more backoff:

.. table:: Range of backoff times for additional retries
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   5     0.0                   8.0                   15.5
   6     0.0                   16.0                  31.5
   7     0.0                   32.0                  63.5
   8     0.0                   64.0                  127.5
   9     0.0                   128.0                 255.5
   10    0.0                   256.0                 511.5
   11    0.0                   512.0                 1023.5
   12    0.0                   1024.0                2047.5
   13    0.0                   2048.0                4095.5
   ===== ===================== ===================== ==============================================

By increasing ``max.retries`` to 10, the connector may take up to 511.5 seconds, or a little over
8.5 minutes to successfully send a batch of records when the Elasticsearch service is overloaded.
Increasing the value to 13 quickly increases the maximum potential time to submit a batch of
records to well over one hour.

You can adjust both the ``max.retries`` and ``retry.backoff.ms`` connector configuration properties
to optimize retry timing.

----------
Reindexing
----------

In some cases, the way to index a set of documents may need to be changed. For example, the
analyzer, tokenizer, and indexed fields may need to be changed.
Because these must not be changed after your mapping is defined, you must reindex the data. You can
use `Index aliases `__ to achieve reindexing with zero downtime.

To reindex the data, complete the following steps in Elasticsearch:

#. Create an alias for the index with the original mapping.
#. Point the applications using the index to the alias.
#. Create a new index with the updated mapping.
#. Move data from the original index to the new index.
#. Atomically move the alias to the new index.
#. Delete the original index.

Write requests continue to arrive during the reindex period (if reindexing is done with no
downtime). Aliases do not allow writing to both the original and the new index at the same time. To
solve this, you can use two Elasticsearch connector jobs to achieve double writes, one to the
original index and a second one to the new index. The following steps explain how to do this:

#. Keep the original connector job that ingests data to the original indices running.
#. Create a new connector job that writes to new indices. As long as the data is in |ak|, some of
   the old data and all new data are written to the new indices.
#. After the reindexing process is complete and the data in the original indices are moved to the
   new indices, stop the original connector job.

--------
Security
--------

The Elasticsearch connector can read data from secure |ak| by following the instructions in the
|kconnect-long| :ref:`security documentation `. The Elasticsearch connector can write data to a
secure Elasticsearch cluster that supports basic authentication by setting the
``connection.username`` and ``connection.password`` configuration properties.

See also: :ref:`elasticsearch_connector_security`.

-----------------
Suggested Reading
-----------------

Blog post: `Kafka Connect Elasticsearch Connector in Action `__

------------------------
Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   security
   configuration_options
   changelog