Azure Synapse Analytics Sink Connector for Confluent Platform

The Azure Synapse Analytics Sink connector exports data from Apache Kafka® topics to Azure Synapse Analytics. The connector polls data from Kafka and writes it to the data warehouse based on its topic subscription. It also supports auto-creation of tables and limited auto-evolution. This connector is compatible with Azure Synapse Analytics SQL pool.

Features

The Azure Synapse Analytics Sink connector for Confluent Platform includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once to Azure Synapse Analytics. If the connector restarts, there may be some duplicate records in the destination table.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Azure Synapse Analytics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter, for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be primitive types. You may need to implement a custom Converter if the data in the topic is not in a compatible format.
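As a minimal sketch of the converter settings described above, the following writes a properties fragment that selects the Avro converter. The file name converter-example.properties is hypothetical, and the Schema Registry URL assumes a local Schema Registry on its default port; adjust both for your environment.

```shell
# Write an example converter fragment (hypothetical file name).
# Assumes Schema Registry is reachable at http://localhost:8081.
cat > converter-example.properties <<'EOF'
# Record values must carry a schema, so use a schema-aware converter.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Alternative: JSON with embedded schemas (uncomment to use instead).
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true
EOF
```

These lines can be merged into the connector properties file, or set at the worker level if every connector on the worker should use the same converter.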

Auto-creation and auto-evolution

If auto.create is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER statements on the destination table when it encounters a record with a field that has no matching column. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
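To illustrate the rule above, here is a sketch of an evolved value schema based on the products schema used in the quick start. The added category field and the file name products-v2.avsc are hypothetical; the point is only that the new field is nullable and declares a default.

```shell
# Write an evolved version of the value schema to a file (hypothetical file name).
# The added "category" field is illustrative only. It is a nullable union with a
# null default, so records written with the older schema remain compatible.
cat > products-v2.avsc <<'EOF'
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "price", "type": "float"},
    {"name": "quantity", "type": "int"},
    {"name": "category", "type": ["null", "string"], "default": null}
  ]
}
EOF
```

To try it, you could pass the file contents as the value.schema property of the Avro console producer, for example with --property value.schema="$(cat products-v2.avsc)".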

Limitations

  • This connector can only insert data into Azure Synapse Analytics. Azure Synapse Analytics does not support primary keys, and because updates, upserts, and deletes are all performed on the primary keys, these queries are not supported for this connector.
  • When auto.evolve is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have “null” as the value for the new column.

Install the Azure Synapse Analytics Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Java 1.8.

  • At minimum, the INSERT permission is required for this connector. For more details, see Permissions: GRANT, DENY, REVOKE (Azure Synapse Analytics, Parallel Data Warehouse).

  • If auto.create=true, the CREATE TABLE and CREATE SCHEMA permissions are required. Additionally, if auto.evolve=true, the ALTER ANY SCHEMA permission is required.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-azure-sql-dw:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-azure-sql-dw:1.0.3
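The permissions listed in the prerequisites above could be granted with statements along the following lines. This is a sketch under assumptions: the database user connect_user, the dbo schema, and the file name connector-grants.sql are all hypothetical, and the exact securables depend on how your data warehouse is organized.

```shell
# Write example GRANT statements to a file (hypothetical file, user, and schema names).
cat > connector-grants.sql <<'EOF'
-- Minimum permission: insert rows into tables in the dbo schema.
GRANT INSERT ON SCHEMA::dbo TO connect_user;
-- Needed only when auto.create=true.
GRANT CREATE TABLE TO connect_user;
GRANT CREATE SCHEMA TO connect_user;
-- Needed only when auto.evolve=true.
GRANT ALTER ANY SCHEMA TO connect_user;
EOF
```

An administrator would run the statements against the target database with a SQL client of their choice.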
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Azure Synapse Analytics Sink Connector for Confluent Platform.

Quick Start

In this quick start, the Azure Synapse Analytics Sink connector is used to export data produced by the Avro console producer to an Azure Synapse Analytics instance.


Tip

Though this quick start uses the Azure CLI to create the resources and the mssql-cli to query the data, you can do both through the Azure Portal. See Create and query Azure Synapse Analytics.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Create an Azure Synapse Analytics instance

In this section, you use the Azure CLI to create the resources needed for this quick start.

  1. Log in with your Azure account.

    az login
    

    The above command opens your default browser and loads a sign-in page where you can log in to your Azure account.

  2. Create a resource group.

    az group create \
        --name quickstartResourceGroup \
        --location eastus2
    
  3. Create a SQL server instance.

    Choose a unique SQL Server name, username and password and supply them for the name, admin-user, and admin-password arguments.

    az sql server create \
        --name <your-sql-server-name> \
        --resource-group quickstartResourceGroup \
        --location eastus2  \
        --admin-user <your-username> \
        --admin-password <your-password>
    
  4. Enable a server-level firewall rule.

    Pass your IP address for the start-ip-address and end-ip-address arguments to enable connectivity to the server.

    az sql server firewall-rule create \
        --name quickstartFirewallRule \
        --resource-group quickstartResourceGroup \
        --server <your-sql-server-name> \
        --start-ip-address <your-ip-address> --end-ip-address <your-ip-address>
    
  5. Create a Synapse Analytics instance.

    az sql dw create \
        --name quickstartDataWarehouse \
        --resource-group quickstartResourceGroup \
        --server <your-sql-server-name>
    

    This can take a couple of minutes.

Load the connector

  1. Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect stop && confluent local services connect start
    
  2. Create an azure-sql-dw-quickstart.properties file and add the following properties.

    Make sure to substitute your SQL server name, username, and password in the azure.sql.dw.url, azure.sql.dw.user and azure.sql.dw.password arguments respectively.

    name=AzureSqlDwSinkConnector
    topics=products
    tasks.max=1
    connector.class=io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
    
    azure.sql.dw.url=jdbc:sqlserver://<your-sql-server-name>.database.windows.net:1433;
    azure.sql.dw.user=<your-username>
    azure.sql.dw.password=<your-password>
    azure.sql.dw.database.name=quickstartDataWarehouse
    auto.create=true
    auto.evolve=true
    table.name.format=kafka_${topic}
    
    # The following configs define the Confluent license stored in Kafka, so you need the Kafka bootstrap addresses.
    # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
    # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
  3. Start the Azure Synapse Analytics Sink connector by loading the connector’s configuration with the following command:

    confluent local services connect connector load azure-sql-dw --config azure-sql-dw-quickstart.properties
    
  4. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status azure-sql-dw
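The same check can be scripted against the Kafka Connect REST API, which serves connector status at /connectors/<name>/status. The helper below is a minimal sketch: the function name connector_state is hypothetical, and it relies on the connector-level state appearing before the task states in the status JSON.

```shell
# Print the connector-level state from a Connect status JSON document read on stdin.
# The first "state" field in the document belongs to the connector itself;
# later ones belong to its tasks, so only the first match is kept.
connector_state() {
  grep -o '"state" *: *"[A-Z_]*"' | head -n 1 | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Example usage (assumes the Connect REST API listens on localhost:8083):
# curl -s http://localhost:8083/connectors/azure-sql-dw/status | connector_state
```

For anything beyond a quick check, a JSON-aware tool would be more robust than pattern matching, and task states should be inspected as well.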
    

Send data to Kafka

  1. To produce some records into the products topic, first start a Kafka producer.

    kafka-avro-console-producer \
    --broker-list localhost:9092 --topic products \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price","type":"float"}, {"name":"quantity","type":"int"}]}'
    
  2. The console producer is now waiting for input, so you can go ahead and insert some records into the topic.

    {"name": "scissors", "price": 2.75, "quantity": 3}
    {"name": "tape", "price": 0.99, "quantity": 10}
    {"name": "notebooks", "price": 1.99, "quantity": 5}
    

Check Azure Synapse Analytics for data

For this section, use the mssql-cli to query the data in the data warehouse and validate that the connector was able to export the data from Kafka to the data warehouse.

  1. Connect to the remote data warehouse, supplying the SQL Server name, username, and password as arguments.

    mssql-cli -S <your-sql-server-name>.database.windows.net -U <your-username> -P <your-password> -d quickstartDataWarehouse
    
  2. Now, query the kafka_products table to see its contents.

    select * from kafka_products;
    

    Your output should resemble the following (the rows and columns may be in a different order):

    +------------+---------+-----------+
    | quantity   | price   | name      |
    |------------+---------+-----------|
    | 10         | 0.99    | tape      |
    | 3          | 2.75    | scissors  |
    | 5          | 1.99    | notebooks |
    +------------+---------+-----------+
    

Delete unnecessary resources

Once you’ve finished the quick start, go ahead and delete the resources created here to avoid incurring additional charges.

    az group delete --name quickstartResourceGroup

This command will delete the resource group and all the resources within it.