Azure Cognitive Search Sink Connector for Confluent Platform

The Kafka Connect Azure Cognitive Search Sink connector allows moving data from Apache Kafka® to Azure Cognitive Search. It writes each event from a topic in Kafka as document to an index in Azure Cognitive Search.

The connector leverages Azure Cognitive Search’s REST API to send records as documents.

Features

The Azure Cognitive Search connector offers the following features:

  • Ordered Writes: The connector writes records in exactly the same order that it receives them. And for uniqueness, the Kafka coordinates (topic, partition, and offset) can be used as the document key. Otherwise, the connector can use the record key as the document key.
  • Reporting: The connector writes the HTTP responses from Azure Cognitive Search to success and error topics for each individual record.
  • Automatic Retries: Occasionally, there may be issues writing to the Azure Cognitive Search service and the connector will retry all retry-able requests. The maximum amount of time that the connector spends retrying can be specified by the max.retry.ms config.

Prerequisites

The following are required to run the Kafka Connect Azure Cognitive Search Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-azure-search:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-search:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure Cognitive Search Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

This quick start uses the Azure Cognitive Search Sink Connector to consume records and write them as documents to an Azure Cognitive Search service.

Prerequisites
  1. Before starting the connector, create and deploy an Azure Cognitive Search service.

    Note

    Make sure the index has the default name hotels-sample-index and only has the fields HotelId, HotelName, Description. All others should be deleted.

  2. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-azure-seach:latest
    
  3. Start Confluent Platform using the Confluent CLI commands.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  4. Produce test data to the hotels-sample topic in Kafka.

    Start the Avro console producer to import a few records to Kafka:

    <path-to-confluent>/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic hotels-sample \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"HotelName","type":"string"},{"name":"Description","type":"string"}]}' \
    --property key.schema='{"type":"string"}' \
    --property "parse.key=true" \
    --property "key.separator=,"
    

    Then in the console producer, enter:

    "marriotId",{"HotelName": "Marriot", "Description": "Marriot description"}
    "holidayinnId",{"HotelName": "HolidayInn", "Description": "HolidayInn description"}
    "motel8Id",{"HotelName": "Motel8", "Description": "motel8 description"}
    

    The three records entered are published to the Kafka topic hotels-sample in Avro format.

  5. Create a azure-search.json file with the following contents:

    {
      "name": "azure-search",
      "config": {
        "topics": "hotels-sample",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "azure.search.service.name": "<the created Search service name>",
        "azure.search.api.key": "<the copied api key>",
        "index.name": "${topic}-index",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Caution

    Do not forget to change the azure.search.service.name and azure.search.api.key values in the JSON file to the copied name and admin key respectively. Find your azure.search.service.name.

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  6. Load the Azure Cognitive Search Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load azure-search --config path/to/azure-search.json
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status azure-search
    
  8. Confirm that the messages were delivered to the result topic in Kafka

    confluent local services kafka consume test-result --from-beginning
    
  9. Confirm that the messages were delivered to Azure Cognitive Search. Login to the service and check that the index hotel-samples-index contains the three written records from before.

  10. Cleanup resources

    • Delete the connector

      confluent local services connect connector unload azure-search
      
    • Stop Confluent Platform

      confluent local stop
      
    • Delete the created Azure Cognitive Search service and its resource group in the Azure portal.

Additional Documentation