Azure Synapse Analytics Sink Connector for Confluent Cloud

The Azure Synapse Analytics Sink connector allows you to export data from Apache Kafka® topics to Azure Synapse Analytics. The connector polls data from Kafka and writes it to the data warehouse based on a topic subscription. Auto-creation of tables and limited auto-evolution are also supported. This connector is compatible with Azure Synapse Analytics SQL pool.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The Azure Synapse Analytics Sink connector supports the following features:

  • At least once delivery: This connector guarantees that records from the Kafka topic are delivered at least once.

  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance.

  • Supports auto-creation and auto-evolution:

    • If Auto create table (auto.create) is enabled, the connector can create the destination table if it is missing. The connector uses the record schema as the basis for the table definition, and the table is created with records consumed from the topic.

    • If Auto add columns (auto.evolve) is enabled, the connector can perform limited auto-evolution by issuing the alter command on the destination table when it encounters a record with a field that has no corresponding column. The connector only adds columns; existing rows will have null as the value for the new column.

      Important

      For backward-compatible schema evolution, new fields in record schemas must be optional or have a default value (see the example schema following this list).

  • Supported data formats: The connector supports Avro, JSON Schema (JSON_SR), and Protobuf input message formats. Schema Registry must be enabled to use these Schema Registry-based formats.
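
As an illustration of a backward-compatible change that auto-evolution can handle, the following hypothetical Avro value schema adds an optional status field with a default of null. The record and field names are placeholders, not part of the connector.

{
  "type": "record",
  "name": "PageView",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "page", "type": "string"},
    {"name": "status", "type": ["null", "string"], "default": null}
  ]
}

Because the new field is optional with a default, the connector can add the corresponding column to the destination table, and rows written before the change contain null in that column.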

See Configuration Properties for configuration property values and descriptions.

See Cloud connector limitations for additional information.

Quick Start

Use this quick start to get up and running with the fully-managed Azure Synapse Analytics Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Microsoft Azure (Azure).
  • An authorized SQL data warehouse user and password for the connector configuration.
  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the sink connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the Azure Synapse Analytics Sink connector icon.

Azure Synapse Analytics Sink Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Select one or more topics.
  2. Enter a connector Name.
  3. Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema) or Protobuf).
  4. Enter or generate your Kafka Cluster credentials. The credentials are either the cluster API key and secret or the service account API key and secret.
  5. Enter the Azure SQL Data Warehouse connection details. Note that the SQL data warehouse server name is in this format: <my_server_name>.database.windows.net.
  6. Enter the Table name format. Sets the name used for the destination table. If left blank, this defaults to $(topic) and the connector uses the topic name for the destination table name. If you enter kafka_$(topic) and the topic is named orders, the resulting table name is kafka_orders.
  7. Enter the names of record fields to include. This can be a comma-separated list of fields. If left empty, all fields from a record are included.
  8. Select a Database timezone. Defaults to UTC.
  9. Enter the number of records to include in a batch. The default is 3000 records.
  10. Select whether or not to Auto create tables. If set to true, the connector creates the destination table if it is missing. The connector uses the record schema as the basis for the table definition. The table is created with records consumed from the topic.
  11. Select whether or not to Auto add columns. If set to true, the connector can perform limited auto-evolution. The connector issues the alter command on the destination table when it encounters a record with a field that has no corresponding column. The connector only adds columns; existing rows will have null as the value for the new column.
  12. Select When to quote SQL identifiers. This sets whether to quote table names, column names, and other identifiers in SQL statements. For backward compatibility, the default is always.
  13. Enter the number of tasks to use with the connector. More tasks may improve performance.

Note

See Configuration Properties for configuration property descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that data is exported from Kafka to the data warehouse.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe AzureSqlDwSink

Example output:

Following are the required configs:
connector.class: AzureSqlDwSink
input.data.format
name
kafka.api.key
kafka.api.secret
azure.sql.dw.server.name
azure.sql.dw.user
azure.sql.dw.password
azure.sql.dw.database.name
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "name": "AzureSqlDwSinkConnector_0",
  "config": {
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "AzureSqlDwSink",
    "name": "AzureSqlDwSinkConnector_0",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**********************************************",
    "azure.sql.dw.server.name": "azure-sql-dw-sink.db.windows.net",
    "azure.sql.dw.user": "<db_user>",
    "azure.sql.dw.password": "**************",
    "azure.sql.dw.database.name": "<db_name>",
    "db.timezone": "UTC",
    "auto.create": "true",
    "auto.evolve": "true",
    "tasks.max": "1"
  }
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "topics": Enter the topic name or a comma-separated list of topic names.
  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, and PROTOBUF. You must have Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "connector.class": Identifies the connector plugin name.
  • "kafka.api.key" and ""kafka.api.secret": These credentials are either the cluster API key and secret or the service account API key and secret.
  • "azure.sql.<>"": Enter the Azure SQL data warehouse connection details. Note that the Azure SQL data warehouse server name is in this format: <my_server_name>.db.windows.net.
  • "db.timezone"": Enter a valid database timezone. Defaults to UTC.
  • "auto.create": If set to true, the connector creates the destination table if it is missing. The connector uses the record schema as the basis for the table definition. The table is created with records consumed from the topic.
  • "auto.evolve": If set to true, the connector can perform limited auto-evolution. The connector issues the alter command on the destination table for a new record with a missing column. The connector will only add a column to a new record. Existing records will have "null" as the value for the new column.
  • "tasks.max": Enter the maximum number of tasks for the connector to use. More tasks may improve performance.

Note

See Configuration Properties for all available configuration property descriptions.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config azure-synapse-analytics-sink-config.json

Example output:

Created connector AzureSqlDwSinkConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID           |             Name           | Status  | Type | Trace
+------------+----------------------------+---------+------+-------+
lcc-do6vzd   | AzureSqlDwSinkConnector_0  | RUNNING | sink |       |
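
Depending on the version of the Confluent Cloud CLI you have installed, you may also be able to view details for a single connector by its ID (the ID shown here is taken from the example output above); verify the command with ccloud connector --help before relying on it:

ccloud connector describe lcc-do6vzd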

Step 6: Check for records.

Verify that data is exported from Kafka to the data warehouse.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The following connector configuration properties can be used with the Azure Synapse Analytics Sink connector for Confluent Cloud.

azure.sql.dw.server.name

Name of the Azure SQL data warehouse server.

  • Type: string
  • Importance: high
azure.sql.dw.user

Azure SQL data warehouse user name.

  • Type: string
  • Importance: high
azure.sql.dw.password

Azure SQL data warehouse user password.

  • Type: password
  • Importance: high
azure.sql.dw.database.name

Azure SQL data warehouse database name.

  • Type: string
  • Importance: high
input.data.format

Sets the input message format. Valid entries are AVRO, JSON_SR, or PROTOBUF. You must have Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • Type: string
  • Importance: high
table.name.format

Sets the name used for the destination table. If left blank, this defaults to $(topic) and the connector uses the topic name for the destination table name. If you enter kafka_$(topic) and the topic is named orders, the resulting table name is kafka_orders.

  • Type: string
  • Default value: $(topic)
  • Importance: medium
fields.whitelist

The names of record fields to include. This can be a comma-separated list of fields. If unused, all fields from a record are included.

  • Type: list
  • Importance: high
db.timezone

A valid database timezone. Defaults to UTC.

  • Type: string
  • Default value: UTC
  • Importance: medium
batch.size

The number of records to include in a batch. The default is 3000 records.

  • Type: int
  • Default value: 3000
  • Importance: medium
auto.create

If set to true, the connector creates the destination table if it is missing. The connector uses the record schema as the basis for the table definition. The table is created with records consumed from the topic.

  • Type: string
  • Default value: false
  • Importance: medium
auto.evolve

If set to true, the connector can perform limited auto-evolution. The connector issues the alter command on the destination table when it encounters a record with a field that has no corresponding column. The connector only adds columns; existing rows will have null as the value for the new column.

  • Type: string
  • Default value: false
  • Importance: medium
quote.sql.identifiers

Sets whether or not to quote table names, column names, and other identifiers in SQL statements. For backward compatibility, the default is always.

  • Type: string
  • Default value: always
  • Importance: medium

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
