Azure Search Sink Connector for Confluent Platform

The Kafka Connect Azure Search Sink connector moves data from Apache Kafka® to Azure Search. It writes each event from a Kafka topic as a document to an index in Azure Search.

The connector leverages Azure Search’s REST API to send records as documents.

Features

The Azure Search connector offers the following features:

  • Ordered Writes: The connector writes records in exactly the same order that it receives them. To keep documents unique, the connector can use the Kafka coordinates (topic, partition, and offset) as the document key. Otherwise, it can use the record key as the document key.
  • Reporting: The connector writes the HTTP responses from Azure Search to success and error topics for each individual record.
  • Automatic Retries: When a write to the Azure Search service fails, the connector retries all retryable requests. The max.retry.ms configuration property sets the maximum amount of time that the connector spends retrying.
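
The uniqueness claim in the Ordered Writes feature can be illustrated with a minimal sketch. It assumes a hypothetical key format that joins the topic, partition, and offset with underscores. The connector's actual key format is not stated here, and the function name coordinates_key is made up for illustration. Because no two records in Kafka share the same topic, partition, and offset, a key built this way is distinct for every record.

```shell
# Hypothetical key format (an assumption, not the connector's documented
# behavior): topic, partition, and offset joined by underscores.
coordinates_key() {
  # $1 = topic, $2 = partition, $3 = offset
  printf '%s_%s_%s\n' "$1" "$2" "$3"
}

# Two records with the same record key still get different document keys,
# because their offsets differ.
coordinates_key hotels-sample 0 42   # prints hotels-sample_0_42
coordinates_key hotels-sample 0 43   # prints hotels-sample_0_43
```

By contrast, using the record key as the document key means a later record with the same key targets the same document.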

Prerequisites

The following are required to run the Kafka Connect Azure Search Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-azure-search:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-search:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

This quick start uses the Azure Search Sink Connector to consume records and write them as documents to an Azure Search service.

Prerequisites
  1. Before starting the connector, create and deploy an Azure Search service.

    • Navigate to the Microsoft Azure Portal.
    • Create a Search service following this Azure Search quickstart guide.
    • Create an index in the service following this index quickstart guide.
    • Copy the admin key and the Search service name from the portal and save them for later. Azure Search should now be set up for the connector.

    Note

    Make sure the index has the default name hotels-sample-index and contains only the fields HotelId, HotelName, and Description. Delete all other fields.

  2. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-azure-search:latest
    
  3. Start Confluent Platform using the Confluent CLI commands.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

    confluent local start
    
  4. Produce test data to the hotels-sample topic in Kafka.

    Start the Avro console producer to import a few records to Kafka:

    <path-to-confluent>/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic hotels-sample \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"HotelName","type":"string"},{"name":"Description","type":"string"}]}' \
    --property key.schema='{"type":"string"}' \
    --property "parse.key=true" \
    --property "key.separator=,"
    

    Then in the console producer, enter:

    "marriotId",{"HotelName": "Marriot", "Description": "Marriot description"}
    "holidayinnId",{"HotelName": "HolidayInn", "Description": "HolidayInn description"}
    "motel8Id",{"HotelName": "Motel8", "Description": "motel8 description"}
    

    The three records entered are published to the Kafka topic hotels-sample in Avro format.

  5. Create an azure-search.json file with the following contents:

    {
      "name": "azure-search",
      "config": {
        "topics": "hotels-sample",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "azure.search.service.name": "<the created Search service name>",
        "azure.search.api.key": "<the copied api key>",
        "index.name": "${topic}-index",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Caution

    Replace the azure.search.service.name and azure.search.api.key values in the JSON file with the Search service name and the admin key that you copied from the Azure portal.

  6. Load the Azure Search Sink Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local load azure-search -- -d path/to/azure-search.json
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local status azure-search
    
  8. Confirm that the messages were delivered to the result topic in Kafka.

    confluent local consume test-result -- --from-beginning
    
  9. Confirm that the messages were delivered to Azure Search. Log in to the service and check that the index hotels-sample-index contains the three records written earlier.

  10. Clean up resources.

    • Delete the connector

      confluent local unload azure-search
      
    • Stop Confluent Platform

      confluent local stop
      
    • Delete the created Azure Search service and its resource group in the Azure portal.
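
Because the Confluent CLI commands are not meant for production, the load, status, verify, and unload steps above can also be sketched as plain REST calls. This is a minimal sketch, not part of the connector. It assumes the Kafka Connect REST API is reachable at its default address http://localhost:8083, that the Azure Search api-version 2020-06-30 is available on your service, and that the ${topic} placeholder in index.name resolves to the topic name (as the hotels-sample topic and hotels-sample-index index in this quick start suggest). The function names are hypothetical helpers.

```shell
# Assumptions (not stated in the quick start): Connect REST endpoint and
# Azure Search api-version. Override them through the environment if needed.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
AZURE_API_VERSION="${AZURE_API_VERSION:-2020-06-30}"

# Step 6: create the connector from the JSON file written in step 5.
# $1 = path to azure-search.json
load_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
       --data @"$1" "${CONNECT_URL}/connectors"
}

# Step 7: show connector and task state (expect RUNNING).
# $1 = connector name
connector_status() {
  curl -sS "${CONNECT_URL}/connectors/$1/status"
}

# Step 10: delete the connector.
# $1 = connector name
unload_connector() {
  curl -sS -X DELETE "${CONNECT_URL}/connectors/$1"
}

# Mirror the "index.name": "${topic}-index" setting: topic name plus "-index".
# $1 = topic name
index_for_topic() {
  printf '%s-index\n' "$1"
}

# Build the Azure Search query URL that returns all documents in an index.
# $1 = Search service name, $2 = index name
search_url() {
  printf 'https://%s.search.windows.net/indexes/%s/docs?api-version=%s&search=*\n' \
    "$1" "$2" "$AZURE_API_VERSION"
}

# Step 9: list the documents written for a topic.
# $1 = Search service name, $2 = topic name, $3 = admin key
list_documents() {
  curl -sS -H "api-key: $3" "$(search_url "$1" "$(index_for_topic "$2")")"
}
```

After sourcing these functions, load_connector path/to/azure-search.json creates the connector, connector_status azure-search reports its state, and list_documents with your service name, the topic hotels-sample, and your admin key should return the three hotel documents. unload_connector azure-search removes the connector again.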

Additional Documentation