Azure Cognitive Search Sink Connector for Confluent Platform

The Kafka Connect Azure Cognitive Search Sink connector allows moving data from Apache Kafka® to Azure Cognitive Search. It writes each event from a Kafka topic as a document to an index in Azure Cognitive Search.

The connector leverages Azure Cognitive Search’s REST API to send records as documents.
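
For example, a batch of records is sent to the index's documents collection in a single indexing request. The following curl sketch shows the general shape of such a request; the service name, index name, API version, and key are placeholders, and the exact request the connector constructs may differ:

    curl -X POST "https://<service-name>.search.windows.net/indexes/<index-name>/docs/index?api-version=2020-06-30" \
      -H "Content-Type: application/json" \
      -H "api-key: <admin-api-key>" \
      -d '{
            "value": [
              { "@search.action": "upload", "HotelId": "1", "HotelName": "Marriot", "Description": "Marriot description" }
            ]
          }'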

Features

The Azure Cognitive Search Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Azure Cognitive Search Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance, because the topic partitions are divided among the tasks and consumed in parallel.

Ordered writes

The connector writes records in exactly the same order that it receives them. For uniqueness, the Kafka coordinates (topic, partition, and offset) can be used as the document key; otherwise, the connector uses the record key as the document key.

Reporting

The connector writes the HTTP responses from Azure Cognitive Search to success and error topics for each individual record.

Automatic retries

Occasionally, there may be issues writing to the Azure Cognitive Search service, and the connector retries all retryable requests. The maximum amount of time the connector spends retrying can be specified with the max.retry.ms configuration property.
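
For example, to cap the total retry time at two minutes, you could add the following property to the connector configuration (the value shown is illustrative):

    "max.retry.ms": "120000"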

Limitations

The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license topic configuration for information about the license topic.
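
Subscribers typically supply the license key through the confluent.license property in the connector configuration; a minimal sketch with a placeholder value:

    "confluent.license": "<your-license-key>"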

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Azure Cognitive Search Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Azure Cognitive Search Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later.

  • Connect: Confluent Platform 4.1.0 or later.

  • Java 1.8.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-azure-search:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-azure-search:1.1.4
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
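
A minimal sketch of a manual installation, assuming the archive is extracted into a directory that the Connect worker's plugin.path already includes (the paths and version shown are examples):

    # download the connector ZIP from Confluent Hub, then extract it
    unzip confluentinc-kafka-connect-azure-search-1.1.4.zip \
      -d /usr/local/share/kafka/plugins

    # verify the worker configuration includes the target directory, for example:
    # plugin.path=/usr/local/share/kafka/plugins

    # restart the Connect worker so it picks up the new plugin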

Quick Start

This quick start uses the Azure Cognitive Search Sink connector to consume records and write them as documents to an Azure Cognitive Search service.

Prerequisites
  1. Before starting the connector, create and deploy an Azure Cognitive Search service.

    Note

    Ensure the index has the default name hotels-sample-index and contains only the fields HotelId, HotelName, and Description. Delete all other fields.
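
    If you prefer to create the index through the Azure Cognitive Search REST API rather than the portal, the request looks roughly like the following; the service name, API version, and key are placeholders:

    curl -X PUT "https://<service-name>.search.windows.net/indexes/hotels-sample-index?api-version=2020-06-30" \
      -H "Content-Type: application/json" \
      -H "api-key: <admin-api-key>" \
      -d '{
            "name": "hotels-sample-index",
            "fields": [
              { "name": "HotelId",     "type": "Edm.String", "key": true },
              { "name": "HotelName",   "type": "Edm.String" },
              { "name": "Description", "type": "Edm.String" }
            ]
          }'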

  2. Install the connector using the confluent connect plugin install command.

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-azure-search:latest
    
  3. Start Confluent Platform using the Confluent CLI commands.

    confluent local services start
    
  4. Produce test data to the hotels-sample topic in Kafka.

    Start the Avro console producer to import a few records to Kafka:

    ${CONFLUENT_HOME}/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic hotels-sample \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"HotelName","type":"string"},{"name":"Description","type":"string"}]}' \
    --property key.schema='{"type":"string"}' \
    --property "parse.key=true" \
    --property "key.separator=,"
    

    Then in the console producer, enter:

    "marriotId",{"HotelName": "Marriot", "Description": "Marriot description"}
    "holidayinnId",{"HotelName": "HolidayInn", "Description": "HolidayInn description"}
    "motel8Id",{"HotelName": "Motel8", "Description": "motel8 description"}
    

    The three records entered are published to the Kafka topic hotels-sample in Avro format.

  5. Create an azure-search.json file with the following contents:

    {
      "name": "azure-search",
      "config": {
        "topics": "hotels-sample",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "azure.search.service.name": "<the created Search service name>",
        "azure.search.api.key": "<the copied api key>",
        "index.name": "${topic}-index",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Caution

    Do not forget to change the azure.search.service.name and azure.search.api.key values in the JSON file to the name of the Search service you created and its copied admin key, respectively. Note that the index.name value ${topic}-index resolves to hotels-sample-index for the hotels-sample topic, matching the index created in step 1.

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  6. Load the Azure Cognitive Search Sink connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load azure-search --config path/to/azure-search.json
    

    Important

    Don’t use the Confluent CLI commands in production environments.
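
    In environments without the Confluent CLI, you can load the same configuration through the Kafka Connect REST API instead. A sketch, assuming the Connect worker's REST interface listens on the default localhost:8083:

    curl -X POST -H "Content-Type: application/json" \
      --data @path/to/azure-search.json \
      http://localhost:8083/connectors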

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status azure-search
    
  8. Confirm that the messages were delivered to the result topic in Kafka.

    confluent local services kafka consume test-result --from-beginning
    
  9. Confirm that the messages were delivered to Azure Cognitive Search. Log in to the service and check that the index hotels-sample-index contains the three records written earlier.
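
    You can also query the index directly through the REST API; a sketch with a placeholder service name and key:

    curl "https://<service-name>.search.windows.net/indexes/hotels-sample-index/docs?api-version=2020-06-30&search=*" \
      -H "api-key: <admin-api-key>"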

  10. Clean up resources:

    1. Delete the connector

      confluent local services connect connector unload azure-search
      
    2. Stop Confluent Platform

      confluent local stop
      
    3. Delete the created Azure Cognitive Search service and its resource group in the Azure portal.