Azure SQL Data Warehouse Sink Connector for Confluent Platform

The Azure SQL Data Warehouse sink connector allows you to export data from Apache Kafka® topics to an Azure SQL Data Warehouse. The connector polls data from Kafka to write to the data warehouse based on the topics subscription. Auto-creation of tables and limited auto-evolution are also supported. This connector is compatible with Azure Synapse SQL pool

Prerequisites

The following are required to run the Kafka Connect Azure SQL Data Warehouse Sink Connector:

Limitations

  • This connector can only insert data into an Azure SQL Data Warehouse. Azure SQL Data Warehouse does not support primary keys, and because updates, upserts, and deletes are all performed on the primary keys, these queries are not supported for this connector.
  • When auto.evolve is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have “null” as the value for the new column.

Features

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter. For example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled. Kafka record keys if present can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be primitive types. You may need to implement a custom Converter if the data in the topic is not in a compatible format.

Auto-creation and Auto-evolution

If auto.create is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing alter on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.

Install the Azure SQL Data Warehouse Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-azure-sql-dw:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-sql-dw:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure SQL Data Warehouse Sink Connector Configuration Properties.

Quick Start

In this quick start, the Azure SQL Data Warehouse sink connector is used to export data produced by the Avro console producer to an Azure SQL Data Warehouse instance.

Azure Prerequisites
Confluent Prerequisites

Tip

Though this quick start requires the Azure CLI for creating the resources and the mssql-cli for querying the data from the resources, both of these can also be managed through the Azure Portal: see Create and query an Azure SQL Data Warehouse.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Create an Azure SQL Data Warehouse instance

For this section, use the Azure CLI to create the necessary resources for this Quick Start.

  1. Login with your Azure account.

    az login
    

    The above command will open your default browser and load a sign-in page where you can login into your Azure account.

  2. Create a resource group.

    az group create \
        --name quickstartResourceGroup \
        --location eastus2
    
  3. Create a SQL server instance.

    Choose a unique SQL Server name, username and password and supply them for the name, admin-user, and admin-password arguments.

    az sql server create \
        --name <your-sql-server-name> \
        --resource-group quickstartResourceGroup \
        --location eastus2  \
        --admin-user <your-username> \
        --admin-password <your-password>
    
  4. Enable a server-level firewall rule.

    Pass your IP address for the start-ip-address and end-ip-address argument to enable connectivity to the server.

    az sql server firewall-rule create \
        --name quickstartFirewallRule \
        --resource-group quickstartResourceGroup \
        --server <your-sql-server-name> \
        --start-ip-address <your-ip-address> --end-ip-address <your-ip-address>
    
  5. Create a SQL Data Warehouse instance.

    az sql dw create \
        --name quickstartDataWarehouse \
        --resource-group quickstartResourceGroup \
        --server <your-sql-server-name>
    

    This can take a couple of minutes.

Load the Connector

  1. Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect stop && confluent local services connect start
    
  2. Create an azure-sql-dw-quickstart.properties file and add the following properties.

    Make sure to substitute your SQL server name, username, and password in the azure.sql.dw.url, azure.sql.dw.user and azure.sql.dw.password arguments respectively.

    name=AzureSqlDwSinkConnector
    topics=products
    tasks.max=1
    connector.class=io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
    
    azure.sql.dw.url=jdbc:sqlserver://<your-sql-server-name>.database.windows.net:1433;
    azure.sql.dw.user=<your-username>
    azure.sql.dw.password=<your-password>
    azure.sql.dw.database.name=quickstartDataWarehouse
    auto.create=true
    auto.evolve=true
    table.name.format=kafka_${topic}
    
    # The following configs define the Confluent license stored in Kafka, so you need the Kafka bootstrap addresses.
    # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
    # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
  3. Start the Azure SQL Data Warehouse sink connector by loading the connector’s configuration with the following command:

    confluent local services connect connector load azure-sql-dw --config azure-sql-dw-quickstart.properties
    
  4. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status azure-sql-dw
    

Send Data to Kafka

  1. To produce some records into the products topic, first start a Kafka producer.

    kafka-avro-console-producer \
    --broker-list localhost:9092 --topic products \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price","type":"float"}, {"name":"quantity","type":"int"}]}'
    
  2. The console producer is now waiting for input, so you can go ahead and insert some records into the topic.

    {"name": "scissors", "price": 2.75, "quantity": 3}
    {"name": "tape", "price": 0.99, "quantity": 10}
    {"name": "notebooks", "price": 1.99, "quantity": 5}
    

Check Azure SQL Data Warehouse for Data

For this section, use the mssql-cli to query the data in the data warehouse and validate that the connector was able to export the data from Kafka to the data warehouse.

  1. Connect to the remote data warehouse, supplying the SQL Server name, username, and password as arguments.

    mssql-cli -S <your-sql-server-name>.database.windows.net -U <your-username> -P <your-password> -d quickstartDataWarehouse
    
  2. Now, query the kafka_products table to see its contents.

    select * from kafka_products;
    

    Your output should resemble the one below (the rows and columns may possibly be in a different order):

    +------------+---------+-----------+
    | quantity   | price   | name      |
    |------------+---------+-----------|
    | 10         | 0.99    | tape      |
    | 3          | 2.75    | scissors  |
    | 5          | 1.99    | notebooks |
    +------------+---------+-----------+
    

Deleting Unnecessary Resources

Once you’ve finished the quick start, go ahead and delete the resources created here to avoid incurring additional charges.

az group delete --name quickstartResourceGroup

This command will delete the resource group and all the resources within it.