Azure Synapse Analytics Sink Connector for Confluent Platform¶
The Azure Synapse Analytics Sink connector allows you to export data from Apache Kafka® topics to Azure Synapse Analytics. The connector polls data from Kafka and writes it to the data warehouse based on the topic subscription. Auto-creation of tables and limited auto-evolution are also supported. This connector is compatible with Azure Synapse Analytics SQL pools.
Features¶
The Azure Synapse Analytics Sink connector for Confluent Platform includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Data mapping
- Auto-creation and auto-evolution
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered to the destination table at least once. If the connector restarts, there may be duplicate records in the destination table.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
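For example, the DLQ can be enabled through the standard Kafka Connect error-handling properties in the connector configuration. The following is a minimal sketch; the topic name dlq-azure-sql-dw is illustrative.
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-azure-sql-dw
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true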
Multiple tasks¶
The Azure Synapse Analytics Sink connector supports running one or more tasks.
You can specify the number of tasks in the tasks.max
configuration
parameter. Multiple tasks may improve performance when moving a large amount of
data.
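For example, setting the following in the connector configuration allows up to three tasks to run (three is an illustrative value):
tasks.max=3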
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable
converter, such as the Avro converter that comes with Schema Registry or the JSON
converter with schemas enabled. Kafka record keys, if present, can be primitive
types or a Connect struct, and the record value must be a Connect struct. Fields
selected from Connect structs must be primitive types. You may need to implement
a custom Converter
if the data in the topic is not in a compatible format.
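For example, the following converter properties (set on the worker or per connector) configure the Avro converter for record values; this is a minimal sketch that assumes Schema Registry is running at http://localhost:8081.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081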
Auto-creation and auto-evolution¶
If auto.create
is enabled, the connector can create the destination table if
it is found to be missing. The connector uses the record schema as a basis for
the table definition, so the creation takes place online with records consumed
from the topic.
If auto.evolve
is enabled, the connector can perform limited auto-evolution
by issuing an ALTER statement on the destination table when it encounters a record for which
a column is found to be missing. Because data-type changes and removal of columns
can be dangerous, the connector does not attempt to perform such evolutions on
the table.
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
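As an illustration, for records like those produced later in this quick start (fields name, price, and quantity), auto-creation of the kafka_products table would issue a statement roughly like the following; the exact column types and nullability are chosen by the connector and may differ.
CREATE TABLE kafka_products (
  name varchar(8000) NOT NULL,
  price real NOT NULL,
  quantity int NOT NULL
);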
Limitations¶
- This connector can only insert data into Azure Synapse Analytics. Azure Synapse Analytics does not support primary keys, and because updates, upserts, and deletes are all performed on the primary keys, these queries are not supported for this connector.
- When auto.evolve is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have "null" as the value for the new column (see the sketch after this list).
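For example, if the record schema later gains an optional discount field (a hypothetical field name used only for illustration), the connector would add the column with a statement roughly like the following, and rows written before the change would contain null in that column.
ALTER TABLE kafka_products ADD discount real NULL;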
Install the Azure Synapse Analytics Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Java 1.8.
- At minimum, the INSERT permission is required for this connector. For more details, see Permissions: GRANT, DENY, REVOKE (Azure Synapse Analytics, Parallel Data Warehouse). If auto.create=true, the CREATE TABLE and CREATE SCHEMA permissions are required. Additionally, if auto.evolve=true, the ALTER ANY SCHEMA permission is required.
- An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-azure-sql-dw:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-azure-sql-dw:1.0.3
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Synapse Analytics Sink Connector for Confluent Platform.
Quick Start¶
In this quick start, the Azure Synapse Analytics Sink connector is used to export data produced by the Avro console producer to an Azure Synapse Analytics instance.
- Azure Prerequisites
- Confluent Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
- Azure Synapse Analytics Sink connector
Tip
Though this quick start uses the Azure CLI to create resources and the mssql-cli to query the data, both tasks can also be performed through the Azure Portal. See Create and query Azure Synapse Analytics.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Create an Azure Synapse Analytics instance¶
For this section, use the Azure CLI to create the necessary resources for this Quick Start.
Login with your Azure account.
az login
This command opens your default browser and loads a sign-in page where you can log in to your Azure account.
Create a resource group.
az group create \
  --name quickstartResourceGroup \
  --location eastus2
Create a SQL server instance.
Choose a unique SQL Server name, username, and password and supply them for the name, admin-user, and admin-password arguments.
az sql server create \
  --name <your-sql-server-name> \
  --resource-group quickstartResourceGroup \
  --location eastus2 \
  --admin-user <your-username> \
  --admin-password <your-password>
Enable a server-level firewall rule.
Pass your IP address for the start-ip-address and end-ip-address arguments to enable connectivity to the server.
az sql server firewall-rule create \
  --name quickstartFirewallRule \
  --resource-group quickstartResourceGroup \
  --server <your-sql-server-name> \
  --start-ip-address <your-ip-address> \
  --end-ip-address <your-ip-address>
Create a Synapse Analytics instance.
az sql dw create \
  --name quickstartDataWarehouse \
  --resource-group quickstartResourceGroup \
  --server <your-sql-server-name>
This can take a couple of minutes.
Load the connector¶
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect stop && confluent local services connect start
Create an azure-sql-dw-quickstart.properties file and add the following properties. Make sure to substitute your SQL server name, username, and password in the azure.sql.dw.url, azure.sql.dw.user, and azure.sql.dw.password properties, respectively.
name=AzureSqlDwSinkConnector
topics=products
tasks.max=1
connector.class=io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
azure.sql.dw.url=jdbc:sqlserver://<your-sql-server-name>.database.windows.net:1433;
azure.sql.dw.user=<your-username>
azure.sql.dw.password=<your-password>
azure.sql.dw.database.name=quickstartDataWarehouse
auto.create=true
auto.evolve=true
table.name.format=kafka_${topic}
# The following configs define the Confluent license stored in Kafka, so you need the Kafka bootstrap addresses.
# `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
# so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Start the Azure Synapse Analytics Sink connector by loading the connector’s configuration with the following command:
confluent local services connect connector load azure-sql-dw --config azure-sql-dw-quickstart.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status azure-sql-dw
Send data to Kafka¶
To produce some records into the products topic, first start a Kafka producer.
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price","type":"float"}, {"name":"quantity","type":"int"}]}'
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
{"name": "scissors", "price": 2.75, "quantity": 3} {"name": "tape", "price": 0.99, "quantity": 10} {"name": "notebooks", "price": 1.99, "quantity": 5}
Check Azure Synapse Analytics for data¶
For this section, use the mssql-cli to query the data in the data warehouse and validate that the connector was able to export the data from Kafka to the data warehouse.
Connect to the remote data warehouse, supplying the SQL Server name, username, and password as arguments.
mssql-cli -S <your-sql-server-name>.database.windows.net -U <your-username> -P <your-password> -d quickstartDataWarehouse
Now, query the kafka_products table to see its contents.
select * from kafka_products;
Your output should resemble the following (the rows and columns may be in a different order):
+------------+---------+-----------+
| quantity   | price   | name      |
|------------+---------+-----------|
| 10         | 0.99    | tape      |
| 3          | 2.75    | scissors  |
| 5          | 1.99    | notebooks |
+------------+---------+-----------+
Delete unnecessary resources¶
Once you have finished the quick start, delete the resources created here to avoid incurring additional charges.
az group delete --name quickstartResourceGroup
This command will delete the resource group and all the resources within it.