Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Azure SQL Data Warehouse Sink Connector for Confluent Platform¶
The Azure SQL Data Warehouse sink connector allows you to export data from Apache Kafka® topics to an Azure SQL Data Warehouse. The connector polls data from Kafka to write to the data warehouse based on the topics subscription. Auto-creation of tables and limited auto-evolution are also supported. This connector is compatible with Azure Synapse SQL pool
Prerequisites¶
The following are required to run the Kafka Connect Azure SQL Data Warehouse Sink Connector:
- Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- At minimum,
INSERT
permission is required for this connector. See Permissions: GRANT, DENY, REVOKE (Azure SQL Data Warehouse, Parallel Data Warehouse).- If
auto.create=true
,CREATE TABLE
andCREATE SCHEMA
permissions are required. - If
auto.evolve=true
,ALTER ANY SCHEMA
permissions are required.
- If
Limitations¶
- This connector can only insert data into an Azure SQL Data Warehouse. Azure SQL Data Warehouse does not support primary keys, and because updates, upserts, and deletes are all performed on the primary keys, these queries are not supported for this connector.
- When
auto.evolve
is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have “null” as the value for the new column.
Features¶
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable
converter. For example, the Avro converter that comes with Schema Registry, or the JSON
converter with schemas enabled. Kafka record keys if present can be primitive
types or a Connect struct, and the record value must be a Connect struct. Fields
selected from Connect structs must be primitive types. You may need to implement
a custom Converter
if the data in the topic is not in a compatible format.
Auto-creation and Auto-evolution¶
If auto.create
is enabled, the connector can create the destination table if
it is found to be missing. The connector uses the record schema as a basis for
the table definition, so the creation takes place online with records consumed
from the topic.
If auto.evolve
is enabled, the connector can perform limited auto-evolution
by issuing alter on the destination table when it encounters a record for which
a column is found to be missing. Since data-type changes and removal of columns
can be dangerous, the connector does not attempt to perform such evolutions on
the table.
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
Install the Azure SQL Data Warehouse Sink Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest
) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-azure-sql-dw:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-azure-sql-dw:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Azure SQL Data Warehouse Sink Connector Configuration Properties.
Quick Start¶
In this quick start, the Azure SQL Data Warehouse sink connector is used to export data produced by the Avro console producer to an Azure SQL Data Warehouse instance.
- Azure Prerequisites
- Confluent Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
- Azure SQL Data Warehouse Sink Connector
Tip
Though this quick start requires the Azure CLI for creating the resources and the mssql-cli for querying the data from the resources, both of these can also be managed through the Azure Portal: see Create and query an Azure SQL Data Warehouse.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Create an Azure SQL Data Warehouse instance¶
For this section, use the Azure CLI to create the necessary resources for this Quick Start.
Login with your Azure account.
az login
The above command will open your default browser and load a sign-in page where you can login into your Azure account.
Create a resource group.
az group create \ --name quickstartResourceGroup \ --location eastus2
Create a SQL server instance.
Choose a unique SQL Server name, username and password and supply them for the
name
,admin-user
, andadmin-password
arguments.az sql server create \ --name <your-sql-server-name> \ --resource-group quickstartResourceGroup \ --location eastus2 \ --admin-user <your-username> \ --admin-password <your-password>
Enable a server-level firewall rule.
Pass your IP address for the
start-ip-address
andend-ip-address
argument to enable connectivity to the server.az sql server firewall-rule create \ --name quickstartFirewallRule \ --resource-group quickstartResourceGroup \ --server <your-sql-server-name> \ --start-ip-address <your-ip-address> --end-ip-address <your-ip-address>
Create a SQL Data Warehouse instance.
az sql dw create \ --name quickstartDataWarehouse \ --resource-group quickstartResourceGroup \ --server <your-sql-server-name>
This can take a couple of minutes.
Load the Connector¶
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to
confluent local
. For example, the syntax forconfluent start
is nowconfluent local start
. For more information, see confluent local.confluent local stop connect && confluent local start connect
Create an
azure-sql-dw-quickstart.properties
file and add the following properties.Make sure to substitute your SQL server name, username, and password in the
azure.sql.dw.url
,azure.sql.dw.user
andazure.sql.dw.password
arguments respectively.name=AzureSqlDwSinkConnector topics=products tasks.max=1 connector.class=io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector azure.sql.dw.url=jdbc:sqlserver://<your-sql-server-name>.database.windows.net:1433; azure.sql.dw.user=<your-username> azure.sql.dw.password=<your-password> azure.sql.dw.database.name=quickstartDataWarehouse auto.create=true auto.evolve=true table.name.format=kafka_${topic} # The following configs define the Confluent license stored in Kafka, so you need the Kafka bootstrap addresses. # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster, # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations. confluent.license= confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1
Start the Azure SQL Data Warehouse sink connector by loading the connector’s configuration with the following command:
confluent local load azure-sql-dw -- -d azure-sql-dw-quickstart.properties
Confirm that the connector is in a
RUNNING
state.confluent local status azure-sql-dw
Send Data to Kafka¶
To produce some records into the
products
topic, first start a Kafka producer.kafka-avro-console-producer \ --broker-list localhost:9092 --topic products \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price","type":"float"}, {"name":"quantity","type":"int"}]}'
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
{"name": "scissors", "price": 2.75, "quantity": 3} {"name": "tape", "price": 0.99, "quantity": 10} {"name": "notebooks", "price": 1.99, "quantity": 5}
Check Azure SQL Data Warehouse for Data¶
For this section, use the mssql-cli to query the data in the data warehouse and validate that the connector was able to export the data from Kafka to the data warehouse.
Connect to the remote data warehouse, supplying the SQL Server name, username, and password as arguments.
mssql-cli -S <your-sql-server-name>.database.windows.net -U <your-username> -P <your-password> -d quickstartDataWarehouse
Now, query the
kafka_products
table to see its contents.select * from kafka_products;
Your output should resemble the one below (the rows and columns may possibly be in a different order):
+------------+---------+-----------+ | quantity | price | name | |------------+---------+-----------| | 10 | 0.99 | tape | | 3 | 2.75 | scissors | | 5 | 1.99 | notebooks | +------------+---------+-----------+
Deleting Unnecessary Resources¶
Once you’ve finished the quick start, go ahead and delete the resources created here to avoid incurring additional charges.
az group delete --name quickstartResourceGroup
This command will delete the resource group and all the resources within it.