Elasticsearch Service Sink Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Elasticsearch Service Sink Connector for Confluent Platform.

The Kafka Connect Elasticsearch Service sink connector for Confluent Cloud moves data from Apache Kafka® to Elasticsearch. The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) data output from Apache Kafka® topics. It writes data from a topic in Kafka to an Elasticsearch index. Elasticsearch is often used for text queries, analytics, and as a key-value store.

The connector supports both the analytics and key-value store use cases. For the analytics use case, each message in Kafka is treated as an event and the connector uses topic+partition+offset as a unique identifier for events, which are then converted to unique documents in Elasticsearch.

For the key-value store use case, the connector supports using keys from Kafka messages as document IDs in Elasticsearch, while providing configurations that ensure updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantee exactly-once delivery.
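The two ID strategies described above can be sketched as follows. This is a minimal illustration, not the connector's code: the function name `document_id` is hypothetical, and the literal `+` separator simply mirrors the topic+partition+offset notation used in this document.

```python
from typing import Optional


def document_id(topic: str, partition: int, offset: int,
                key: Optional[str], key_ignore: bool) -> str:
    """Return the document ID this sketch would assign to a Kafka record."""
    if key_ignore:
        # Analytics use case: each record is a distinct event, so the ID is
        # derived from the record's coordinates in Kafka.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        raise ValueError("key.ignore is false but the record has no key")
    # Key-value use case: records that share a key map to the same document,
    # so a later record overwrites an earlier one.
    return key


print(document_id("orders", 0, 42, "customer-7", key_ignore=True))
print(document_id("orders", 0, 42, "customer-7", key_ignore=False))
```

Because the ID is a pure function of the record, writing the same record twice targets the same document, which is what makes a repeated write harmless.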

All data for a topic have the same type in Elasticsearch, which allows schemas for data from different topics to evolve independently. This simplifies schema evolution because Elasticsearch enforces one constraint on mappings: all fields with the same name in the same index must have the same mapping type.

Important

After this connector moves from Preview to General Availability (GA), Confluent Cloud Enterprise customers must have a Confluent Cloud annual commitment to use this connector. Contact your Confluent Account Executive to learn more and to update your subscription, if necessary.

Features

The Elasticsearch Service Sink connector inserts Kafka records into an Elasticsearch index (it supports inserts only). The connector provides the following features:

  • Database authentication: Uses username and password authentication.
  • Input data formats: The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) input data formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • Select configuration properties: Provides several optional configuration properties that allow you to fine-tune the connector’s behavior and performance. These properties are described below:
    • key.ignore: Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (i.e., topic+partition+offset).
    • schema.ignore: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (default) for JSON (schemaless).
    • compact.map.entries: Defines how map entries with string keys in record values should be written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}).
    • behavior.on.null.values: How to handle records with a non-null key and a null value (i.e., Kafka tombstone records). Valid options are ignore, delete, and fail. Defaults to ignore.
    • drop.invalid.message: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false.
    • batch.size: The number of records to process as a batch when writing to Elasticsearch. This value defaults to 2000.
    • linger.ms: Linger time in milliseconds for batching. Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this only occurs under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than immediately sending it out, the task waits up to the given delay. This allows other records to be added so that they can be batched into a single request. This value defaults to 1 ms.
    • flush.timeout.ms: The timeout in milliseconds to use for periodic flushing and waiting for buffer space to be made available by completed requests, as records are added. If this timeout is exceeded, the task fails. This value defaults to 10000 ms.
    • connection.compression: Whether to use Gzip compression on the HTTP connection to Elasticsearch. To make this setting work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings. Defaults to false.
    • auto.create.indices.at.start: Automatically create the Elasticsearch indices at startup. This is useful when indices are directly mapped from the Kafka topics. Defaults to true.
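
The difference between the two compact.map.entries settings is easiest to see side by side. The following sketch is illustrative only: `render_map` is a hypothetical helper, and representing several non-compact entries as a JSON array of key/value documents is an assumption based on the single-entry form shown above.

```python
import json


def render_map(entries: dict, compact: bool) -> str:
    """Serialize a map with string keys in either of the two styles."""
    if compact:
        # compact.map.entries=true: one JSON object, one field per entry.
        return json.dumps(entries)
    # compact.map.entries=false: one nested {"key", "value"} document per
    # entry (assumed here to be collected in an array).
    return json.dumps([{"key": k, "value": v} for k, v in entries.items()])


sample = {"entryKey": "entryValue"}
print(render_map(sample, compact=True))
print(render_map(sample, compact=False))
```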

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Elasticsearch Sink Connector Configuration Properties.

Refer to Cloud connector limitations for additional information.

Caution

Preview connectors are not currently supported and are not recommended for production use.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Elasticsearch Service Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to an Elasticsearch deployment.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).

  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • The Elasticsearch Service deployment must be in the same region as your Confluent Cloud deployment.

  • A valid Elasticsearch Service username and password to add to the connector configuration. You get these when you create your Elastic deployment. An example is shown below:

    Elasticsearch Deployment username and password
  • Kafka cluster credentials. You can use one of the following ways to get credentials:
    • Create a Confluent Cloud API key and secret. To create a key and secret, go to Kafka API keys in your cluster, or autogenerate the API key and secret directly in the UI when setting up the connector.
    • Create a Confluent Cloud service account for the connector.

Using the Confluent Cloud GUI

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the Elasticsearch Service Sink connector icon.

Elasticsearch Service Sink Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

Complete the following and click Continue.

  1. Select one or more topics.
  2. Enter a Connector Name.
  3. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
  4. Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, or JSON (schemaless). A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  5. Provide Elasticsearch connection information.
    • Enter the connection URI. This is the Elasticsearch endpoint you can copy from your Elasticsearch deployment console. The URI you enter should look like this: https://ec5bfac80bc14c26a77eefb6585f196c.us-west-2.aws.found.io:9243.
    • Enter the Elasticsearch deployment username and password. An example showing where these are on the Elastic deployment console is shown in the prerequisites.
  6. Enter the remaining Elasticsearch deployment, error handling, and connection details. Other than Type name, these properties are optional.
    • Type name: This is a name that Elasticsearch uses when indexing and to divide documents into logical groups. This can be anything you choose (for example, customer or item). For more information about this property and mapping in general, see Elasticsearch Mapping: The Basics, Updates & Examples.
    • Key ignore: Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (i.e., topic+partition+offset).
    • Schema ignore: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (default) for JSON (schemaless).
    • Compact map entries: Defines how map entries with string keys in record values should be written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}).
    • Behavior on null values: How to handle records with a non-null key and a null value (i.e., Kafka tombstone records). Options are delete, fail, and ignore (default).
    • Drop invalid message: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false.
    • Batch size: The number of records to process as a batch when writing to Elasticsearch. This value defaults to 2000.
    • Linger (ms): Linger time in milliseconds for batching. Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the Batch size value. Normally this only occurs under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than immediately sending it out, the task waits up to the given delay. This allows other records to be added so that they can be batched into a single request. This value defaults to 1 ms.
    • Flush timeout (ms): The timeout in milliseconds to use for periodic flushing and waiting for buffer space to be made available by completed requests, as records are added. If this timeout is exceeded, the task fails. This value defaults to 10000 ms.
    • Connection compression: Whether to use Gzip compression on the HTTP connection to Elasticsearch. To make this setting work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings.
    • Auto create indices at start: Automatically create the Elasticsearch indices at startup. This is useful when indices are directly mapped from the Kafka topics. This defaults to true.
  7. Enter the number of tasks for the connector. See Confluent Cloud connector limitations for additional task information.
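
To build intuition for how Batch size and Linger (ms) interact, the following sketch groups record arrival times into bulk requests. It is a deliberately simplified model, not the connector's implementation: the function `plan_batches` is hypothetical, and it assumes a pending batch is sent either when it is full or when the next record arrives more than the linger time after the batch's first record.

```python
def plan_batches(arrivals_ms, batch_size, linger_ms):
    """Group record arrival times (in ms) into simulated bulk requests."""
    batches = []
    pending = []
    for t in arrivals_ms:
        # The pending batch has waited longer than the linger time: send it.
        if pending and t - pending[0] > linger_ms:
            batches.append(pending)
            pending = []
        pending.append(t)
        # The pending batch is full: send it without waiting.
        if len(pending) == batch_size:
            batches.append(pending)
            pending = []
    if pending:
        batches.append(pending)
    return batches


arrivals = [0, 1, 2, 10, 11, 30]
print(plan_batches(arrivals, batch_size=3, linger_ms=5))
print(plan_batches(arrivals, batch_size=3, linger_ms=0))
```

In this model, a larger linger value produces fewer, fuller requests under light load, at the cost of a short delay before each record is written.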

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Elasticsearch Service Sink Connector Configuration Properties.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Launch the connector

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Connector running

Step 7: Check the results in Elasticsearch.

Verify that new records are being written to your Elasticsearch deployment.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.

For additional information about this connector, see Elasticsearch Service Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.


Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe ElasticsearchSink

Example output:

Following are the required configs:
connector.class: ElasticsearchSink
name
kafka.api.key
kafka.api.secret
topics
input.data.format
connection.url
connection.username
connection.password
type.name
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.

{
  "connector.class": "ElasticsearchSink",
  "name": "elasticsearch-connector",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "topics": "<topic1>, <topic2>",
  "input.data.format": "JSON",
  "connection.url": "<elasticsearch-URI>",
  "connection.username": "<elasticsearch-username>",
  "connection.password": "<elasticsearch-password>",
  "type.name": "<type-name>",
  "key.ignore": "true",
  "schema.ignore": "true",
  "tasks.max": "1"
}
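
Before creating the connector, it can help to confirm that the file parses as JSON and contains every required property. The sketch below is one possible check, not part of the Confluent Cloud CLI: the helper `missing_properties` is hypothetical, and the list of required names is copied from the example `ccloud connector-catalog describe` output in Step 2.

```python
import json

# Required property names, taken from the example catalog output in Step 2.
REQUIRED = [
    "connector.class", "name", "kafka.api.key", "kafka.api.secret",
    "topics", "input.data.format", "connection.url",
    "connection.username", "connection.password", "type.name", "tasks.max",
]


def missing_properties(config: dict) -> list:
    """Return the required property names that are absent from config."""
    return [name for name in REQUIRED if name not in config]


# Parsing with json.loads also catches syntax errors such as a missing comma.
example = json.loads("""
{
  "connector.class": "ElasticsearchSink",
  "name": "elasticsearch-connector",
  "topics": "orders",
  "input.data.format": "JSON",
  "tasks.max": "1"
}
""")
print(missing_properties(example))
```

To check a real file, load it with `json.load(open("<file-name>.json"))` and pass the result to the same function.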

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "connection.url": Enter the connection URI. This is the Elasticsearch endpoint you can copy from your Elasticsearch deployment console. The URI you enter should look like this: https://ec5bfac80bc14c26a77eefb6585f196c.us-west-2.aws.found.io:9243.
  • "connection.username" and "connection.password": Enter the Elasticsearch deployment username and password. An example showing where these are on the Elastic deployment console is shown in the prerequisites.
  • "type.name": This is a name that Elasticsearch uses when indexing and to divide documents into logical groups. This can be anything you choose (for example, customer or item). For more information about this property and mapping in general, see Elasticsearch Mapping: The Basics, Updates & Examples.

The following are optional properties you can include in the configuration:

  • key.ignore: Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (i.e., topic+partition+offset). Defaults to false if not used.
  • schema.ignore: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (default) for JSON. Defaults to false if not used.
  • compact.map.entries: Defines how map entries with string keys in record values should be written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}). Defaults to false if not used.
  • behavior.on.null.values: How to handle records with a non-null key and a null value (i.e., Kafka tombstone records). Valid options are ignore, delete, and fail. Defaults to ignore if not used.
  • drop.invalid.message: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false if not used.
  • batch.size: The number of records to process as a batch when writing to Elasticsearch. This value defaults to 2000 if not used.
  • linger.ms: Linger time in milliseconds for batching. Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this only occurs under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than immediately sending it out, the task waits up to the given delay. This allows other records to be added so that they can be batched into a single request. This value defaults to 1 ms if not used.
  • flush.timeout.ms: The timeout in milliseconds to use for periodic flushing and waiting for buffer space to be made available by completed requests, as records are added. If this timeout is exceeded, the task fails. This value defaults to 10000 ms if not used.
  • connection.compression: Whether to use Gzip compression on the HTTP connection to Elasticsearch. To make this setting work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings. Defaults to false if not used.
  • auto.create.indices.at.start: Automatically create the Elasticsearch indices at startup. This is useful when indices are directly mapped from the Kafka topics. Defaults to true if not used.
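
The three behavior.on.null.values options can be pictured as a small decision function. This is a sketch of one reading of the options, not the connector's code: `action_for_null_value` is a hypothetical name, and it assumes that delete removes the document whose ID equals the record key, expressed here in the shape of an Elasticsearch bulk API delete action.

```python
from typing import Optional


def action_for_null_value(index: str, key: str,
                          behavior: str = "ignore") -> Optional[dict]:
    """Decide what to do with a record that has a key and a null value."""
    if behavior == "ignore":
        # Skip the tombstone; nothing is sent to Elasticsearch.
        return None
    if behavior == "delete":
        # Assumption: remove the document whose ID is the record key.
        return {"delete": {"_index": index, "_id": key}}
    if behavior == "fail":
        raise ValueError(f"null value for key {key!r} is not allowed")
    raise ValueError(f"unknown behavior.on.null.values: {behavior!r}")


print(action_for_null_value("orders", "customer-7"))
print(action_for_null_value("orders", "customer-7", "delete"))
```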

Configuration properties that are not listed use the default values. For default values and property definitions, see Elasticsearch Service Sink Connector Configuration Properties.

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config elasticsearch-sink-config.json

Example output:

Created connector elasticsearch-connector lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |       Name                 | Status  | Type
+-----------+----------------------------+---------+------+
lcc-ix4dl   | elasticsearch-connector    | RUNNING | sink
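
If you want to check the status from a script, the table can be parsed as plain text. The sketch below assumes the output has exactly the layout of the example above (a header row, a separator row, and pipe-delimited data rows); `parse_connector_list` is a hypothetical helper, and the actual CLI output format may differ between versions.

```python
EXAMPLE_OUTPUT = """\
ID          |       Name                 | Status  | Type
+-----------+----------------------------+---------+------+
lcc-ix4dl   | elasticsearch-connector    | RUNNING | sink
"""


def parse_connector_list(output: str) -> list:
    """Turn the pipe-delimited table into a list of dictionaries."""
    rows = []
    header = None
    for line in output.splitlines():
        if "|" not in line:
            continue  # skips the +----+ separator row and blank lines
        cells = [cell.strip() for cell in line.split("|")]
        if header is None:
            header = [cell.lower() for cell in cells]  # first row is the header
            continue
        rows.append(dict(zip(header, cells)))
    return rows


for row in parse_connector_list(EXAMPLE_OUTPUT):
    print(row["name"], row["status"])
```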

Step 6: Check the results in Elasticsearch.

Verify that new records are being added to the Elasticsearch deployment.
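
One way to verify this is to ask Elasticsearch how many documents an index holds, using its `_count` endpoint with the same username and password you gave the connector. The sketch below only builds the request. The helper names are hypothetical, the host and credentials are placeholders, and it assumes the index is named after the topic, which you should confirm in your deployment.

```python
import base64
import json
import urllib.request


def count_request(base_url: str, index: str,
                  username: str, password: str) -> urllib.request.Request:
    """Build a GET request for <base_url>/<index>/_count with basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/{index}/_count",
        headers={"Authorization": f"Basic {token}"},
    )


def fetch_count(request: urllib.request.Request) -> int:
    """Send the request and return the document count (needs network access)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["count"]


# Placeholder values; substitute your own endpoint, index, and credentials.
req = count_request("https://example.invalid:9243", "orders", "elastic", "secret")
print(req.full_url)
```

Calling `fetch_count(req)` twice, a few seconds apart, should show the count increasing while records are flowing into the topic.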

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.

For additional information about this connector, see Elasticsearch Service Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Suggested Reading

The following blog post shows how to use the Confluent Cloud Elasticsearch sink connector to get user contact information from a Kafka topic into an Elasticsearch index for use by multiple business systems.

Blog post: Announcing the Elasticsearch Service Sink Connector for Apache Kafka in Confluent Cloud

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL example. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.