Azure Cognitive Search Sink Connector for Confluent Platform¶
The Kafka Connect Azure Cognitive Search Sink connector allows moving data from Apache Kafka® to Azure Cognitive Search. It writes each event from a Kafka topic as a document to an index in Azure Cognitive Search.
The connector leverages Azure Cognitive Search’s REST API to send records as documents.
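For reference, these writes correspond to Azure Cognitive Search's "index documents" REST operation. The following curl sketch shows the general shape of such a request; the service name, index name, API version, and key are placeholders, and the connector issues these calls for you:

curl -X POST "https://<your-service>.search.windows.net/indexes/<your-index>/docs/index?api-version=2020-06-30" \
  -H "Content-Type: application/json" \
  -H "api-key: <your-admin-key>" \
  -d '{"value": [{"@search.action": "upload", "HotelId": "1", "HotelName": "Marriot", "Description": "Marriot description"}]}'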
Features¶
The Azure Cognitive Search Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
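To route failed records to a DLQ, you can set the standard Kafka Connect error-handling properties on the connector. A minimal sketch, assuming a DLQ topic named dlq-azure-search:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-azure-search",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true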
Multiple tasks¶
The Azure Cognitive Search Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can lead to performance gains when the connector consumes from multiple topic partitions.
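For example, setting the following property in the connector configuration requests up to two tasks (two is an arbitrary illustrative value; the Quick Start below uses one):

"tasks.max": "2"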
Ordered writes¶
The connector writes records in exactly the same order that it receives them. For uniqueness, the Kafka coordinates (topic, partition, and offset) can be used as the document key. Otherwise, the connector can use the record key as the document key.
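As an illustration only (the exact key format is internal to the connector), a record from topic hotels-sample, partition 0, offset 42 could be written as a document keyed by those coordinates, for example:

{ "HotelId": "hotels-sample-0-42", "HotelName": "Marriot", "Description": "Marriot description" }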
Reporting¶
The connector writes the HTTP responses from Azure Cognitive Search to success and error topics for each individual record.
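The success and error topics are set with the reporter.* properties. The following fragment shows the relevant settings from the Quick Start configuration later on this page:

"reporter.bootstrap.servers": "localhost:9092",
"reporter.result.topic.name": "test-result",
"reporter.error.topic.name": "test-error"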
Limitations¶
The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent license topic configuration for information about the license topic.
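A minimal sketch of the license-related properties in a connector configuration; the key value is a placeholder, and omitting confluent.license runs the connector in trial mode:

"confluent.license": "<your license key>",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"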
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Cognitive Search Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Azure Cognitive Search Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later.
Connect: Confluent Platform 4.1.0 or later.
Java 1.8.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-azure-search:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-azure-search:1.1.4
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
This quick start uses the Azure Cognitive Search Sink connector to consume records and write them as documents to an Azure Cognitive Search service.
Prerequisites:
- Confluent Platform
- Confluent CLI (requires separate installation)
Before starting the connector, create and deploy an Azure Cognitive Search service.
- Navigate to the Microsoft Azure Portal.
- Create a Search service following this Azure Cognitive Search quick start guide.
- Create an index in the service following this index quick start guide.
- Copy the admin key and the Search service name from the portal and save them for later; you can also retrieve the key with the Azure CLI, as sketched below. Azure Cognitive Search should now be set up for the connector.
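A sketch of retrieving the admin key with the Azure CLI; the service and resource group names are placeholders:

az search admin-key show --service-name <your-service> --resource-group <your-resource-group>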
Note

Ensure the index has the default name hotels-sample-index and only has the fields HotelId, HotelName, and Description. All others should be deleted.

Install the connector using the confluent connect plugin install command:
# run from your CP installation directory
confluent connect plugin install confluentinc/kafka-connect-azure-search:latest
Start Confluent Platform using the following Confluent CLI command:
confluent local services start
Produce test data to the hotels-sample topic in Kafka.

Start the Avro console producer to import a few records to Kafka:

${CONFLUENT_HOME}/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic hotels-sample \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"HotelName","type":"string"},{"name":"Description","type":"string"}]}' \
  --property key.schema='{"type":"string"}' \
  --property "parse.key=true" \
  --property "key.separator=,"

Then in the console producer, enter:

"marriotId",{"HotelName": "Marriot", "Description": "Marriot description"}
"holidayinnId",{"HotelName": "HolidayInn", "Description": "HolidayInn description"}
"motel8Id",{"HotelName": "Motel8", "Description": "motel8 description"}
The three records entered are published to the Kafka topic hotels-sample in Avro format.

Create an azure-search.json file with the following contents:

{
  "name": "azure-search",
  "config": {
    "topics": "hotels-sample",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "azure.search.service.name": "<the created Search service name>",
    "azure.search.api.key": "<the copied api key>",
    "index.name": "${topic}-index",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.error.topic.name": "test-error",
    "reporter.error.topic.replication.factor": 1,
    "reporter.error.topic.key.format": "string",
    "reporter.error.topic.value.format": "string",
    "reporter.result.topic.name": "test-result",
    "reporter.result.topic.key.format": "string",
    "reporter.result.topic.value.format": "string",
    "reporter.result.topic.replication.factor": 1
  }
}
Caution

Do not forget to change the azure.search.service.name and azure.search.api.key values in the JSON file to the copied name and admin key, respectively. Find your azure.search.service.name.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Load the Azure Cognitive Search Sink connector.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services connect connector load azure-search --config path/to/azure-search.json
Important
Don’t use the Confluent CLI commands in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status azure-search
Confirm that the messages were delivered to the result topic in Kafka.
confluent local services kafka consume test-result --from-beginning
Confirm that the messages were delivered to Azure Cognitive Search.
Log in to the service and check that the index hotels-sample-index contains the three records written earlier.
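You can also query the index over REST; a minimal sketch, assuming the service name and admin key from earlier (the API version is an example):

curl "https://<your-service>.search.windows.net/indexes/hotels-sample-index/docs?api-version=2020-06-30&search=*" \
  -H "api-key: <your-admin-key>"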
Clean up resources:

- Delete the connector:
confluent local services connect connector unload azure-search
- Stop Confluent Platform:
confluent local stop
- Delete the created Azure Cognitive Search service and its resource group in the Azure portal.