Azure Cosmos DB Sink Connector for Confluent Cloud

The Azure Cosmos DB Sink connector for Confluent Cloud writes data to a Microsoft Azure Cosmos DB database. The connector polls data from Kafka topics and writes it to database containers.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The Azure Cosmos Sink connector supports the following features:

  • Topic mapping: Maps Kafka topics to Azure Cosmos DB containers.
  • Multiple ID strategies:
    • FullKeyStrategy: The ID generated is the Kafka record key. This is the default option.
    • KafkaMetadataStrategy: The ID generated is a concatenation of the Kafka topic, partition, and offset. For example: ${topic}-${partition}-${offset}.
    • ProvidedInKeyStrategy: The ID generated is the id field found in the key object.
    • ProvidedInValueStrategy: The ID generated is the id field found in the value object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See the lower case id prerequisite.

The following shows an example of each strategy and the resulting id in Azure Cosmos DB.

ID strategies
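
For illustration (record contents are hypothetical), assume a record is produced to the topic orders at partition 0 and offset 42, with the key {"id": "k-1"} and a value whose id field is "order-1001". The strategies would generate document ids such as the following:

  FullKeyStrategy:          {"id":"k-1"}    (the full Kafka record key)
  KafkaMetadataStrategy:    orders-0-42     (${topic}-${partition}-${offset})
  ProvidedInKeyStrategy:    k-1             (the id field of the key object)
  ProvidedInValueStrategy:  order-1001      (the id field of the value object)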

See Configuration Properties for configuration property values and descriptions.

See Cloud connector limitations for additional information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Azure Cosmos DB Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream Kafka events to an Azure Cosmos DB container.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Azure.

  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the sink connector.

  • The Azure Cosmos DB database and the Kafka cluster must be in the same region.

  • Azure Cosmos DB requires an id field in every record. The following strategies are provided to generate the ID (see ID strategies for an example of how each works):

    • FullKeyStrategy: The ID generated is the Kafka record key. This is the default option.

    • KafkaMetadataStrategy: The ID generated is a concatenation of the Kafka topic, partition, and offset. For example: ${topic}-${partition}-${offset}.

    • ProvidedInKeyStrategy: The ID generated is the id field found in the key object.

    • ProvidedInValueStrategy: The ID generated is the id field found in the value object. If you select this ID strategy, you must create a field named id in the record value. You can use the following ksqlDB statement to add one. The example below uses a topic named orders.

      CREATE STREAM ORDERS_STREAM WITH (
         KAFKA_TOPIC = 'orders',
         VALUE_FORMAT = 'AVRO'
      );
      CREATE STREAM ORDER_AUGMENTED AS
         SELECT
            ORDERID AS `id`,
            ORDERTIME,
            ITEMID,
            ORDERUNITS,
            ADDRESS
         FROM ORDERS_STREAM;
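
      A record in the resulting ORDER_AUGMENTED stream then carries the required lower-case id field. For example, a document written by the connector from that stream's topic might look like the following (values and the ADDRESS sub-fields are illustrative):

      {
        "id": "1001",
        "ORDERTIME": 1616000000000,
        "ITEMID": "Item_184",
        "ORDERUNITS": 3,
        "ADDRESS": {"city": "Oakland", "state": "CA", "zipcode": 94601}
      }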
      

Note

  • The connector supports Upsert based on id.
  • The connector does not support Delete for tombstone records.
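
For illustration (values are hypothetical), because writes are upserts based on id, two records that resolve to the same document id (shown below as value objects, as with ProvidedInValueStrategy) result in a single Azure Cosmos DB document holding the later values, and a subsequent tombstone (null value) record does not delete that document:

  {"id": "order-1001", "ORDERUNITS": 2}    first write creates the document
  {"id": "order-1001", "ORDERUNITS": 5}    second write upserts (replaces) the document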

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the Azure Cosmos DB sink connector icon.


Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Select one or more topics.

  2. Enter a connector Name.

  3. Select an Input message format (data coming from the Kafka topic): AVRO, PROTOBUF, JSON_SR (JSON Schema), or JSON (schemaless). A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  4. Enter your Kafka Cluster credentials. The credentials are either the cluster API key and secret or the service account API key and secret.

  5. Enter your Cosmos DB connection details:

    • Cosmos Endpoint: A URI using the form https://ccloud-cosmos-db-1.documents.azure.com:443/.
    • Cosmos Connection Key: The Azure Cosmos master key.
    • Cosmos Database Name: The name of your Cosmos database.
    • Topic-Container Map: A comma-delimited list that maps Kafka topics to Azure Cosmos DB containers. For example: topic1#container1,topic2#container2.
  6. Select an ID Strategy:

    • FullKeyStrategy: The ID generated is the Kafka record key.
    • KafkaMetadataStrategy: The ID generated is a concatenation of the Kafka topic, partition, and offset. For example: ${topic}-${partition}-${offset}.
    • ProvidedInKeyStrategy: The ID generated is the id field found in the key object.
    • ProvidedInValueStrategy: The ID generated is the id field found in the value object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See Lower case id prerequisite.

    See ID strategies for an example of how each of these works.

  7. Enter the number of tasks to use with the connector.

Note

See Configuration Properties for full configuration property descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that records are being produced in your Azure Cosmos database.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe CosmosDbSink

Example output:

Following are the required configs:
connector.class: CosmosDbSink
name
input.data.format
kafka.api.key
kafka.api.secret
connect.cosmos.connection.endpoint
connect.cosmos.master.key
connect.cosmos.databasename
connect.cosmos.containers.topicmap
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "name": "CosmosDbSinkConnector_0",
  "config": {
    "connector.class": "CosmosDbSink",
    "name": "CosmosDbSinkConnector_0",
    "input.data.format": "AVRO",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**********************************************",
    "topics": "pageviews",
    "connect.cosmos.connection.endpoint": "https://myaccount.documents.azure.com:443/",
    "connect.cosmos.master.key": "****************************************",
    "connect.cosmos.databasename": "myDBname",
    "connect.cosmos.containers.topicmap": "pageviews#Container2",
    "cosmos.id.strategy": "FullKeyStrategy",
    "tasks.max": "1"
  }
}

Note

See Configuration Properties for full configuration property descriptions.

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.

  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • "name": Sets a name for your new connector.

  • "kafka.api.key" and "kafka.api.secret": These credentials are either the cluster API key and secret or the service account API key and secret.

  • "connect.cosmos.connection.endpoint": A URI with the form https://ccloud-cosmos-db-1.documents.azure.com:443/.

  • "connect.cosmos.master.key": The Azure Cosmos master key.

  • "connect.cosmos.databasename": The name of your Cosmos DB.

  • "connect.cosmos.containers.topicmap": A comma-delimited list that maps Kafka topics to Azure Cosmos DB containers. For example: topic1#container1,topic2#container2.

  • (Optional) "cosmos.id.strategy": Defaults to FullKeyStrategy. Enter one of the following strategies:

    • FullKeyStrategy: The ID generated is the Kafka record key.
    • KafkaMetadataStrategy: The ID generated is a concatenation of the Kafka topic, partition, and offset. For example: ${topic}-${partition}-${offset}.
    • ProvidedInKeyStrategy: The ID generated is the id field found in the key object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See Lower case id prerequisite.
    • ProvidedInValueStrategy: The ID generated is the id field found in the value object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See Lower case id prerequisite.

    See ID strategies for an example of how each of these works.

  • "tasks.max": The number of tasks to use with the connector. For Confluent Cloud and Confluent Cloud Enterprise, organizations are limited to one task and one connector. Use of this connector is free for a limited time.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config azure-cosmos-sink-config.json

Example output:

Created connector CosmosDbSinkConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID           |             Name              | Status  | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | CosmosDbSinkConnector_0       | RUNNING | sink |       |

Step 6: Check for records.

Verify that records are populating your Azure Cosmos database.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The unique properties used for this connector are defined below.

connect.cosmos.connection.endpoint

Azure Cosmos endpoint URI string.

  • Type: string
  • Importance: high
connect.cosmos.master.key

The Azure Cosmos DB primary key that the sink connects with.

  • Type: string
  • Importance: high
connect.cosmos.databasename

The name of the Azure Cosmos database the sink writes to.

  • Type: string
  • Importance: high
connect.cosmos.containers.topicmap

Mapping between Kafka topics and Azure Cosmos DB containers. Entered as a comma-delimited list. For example: topic#container,topic2#container2.

  • Type: string
  • Importance: high

cosmos.id.strategy

The strategy for generating a record ID. Entered as FullKeyStrategy, KafkaMetadataStrategy, ProvidedInKeyStrategy, or ProvidedInValueStrategy.

  • Type: string
  • Importance: high
  • Default: FullKeyStrategy
  • FullKeyStrategy: (Default) The ID generated is the Kafka record key.
  • KafkaMetadataStrategy: The ID generated is a concatenation of the Kafka topic, partition, and offset. For example: ${topic}-${partition}-${offset}.
  • ProvidedInKeyStrategy: The ID generated is the id field found in the key object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See Lower case id prerequisite.
  • ProvidedInValueStrategy: The ID generated is the id field found in the value object. Every record must have a (lower case) id field. This is an Azure Cosmos DB requirement. See Lower case id prerequisite.

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
