Azure Service Bus Source Connector for Confluent Platform

Azure Service Bus is a multi-tenant cloud messaging service you can use to send information between applications and services. The Azure Service Bus Source connector reads data from an Azure Service Bus queue or topic and persists the data in a Kafka topic. The schema for the Kafka record key and value is described in the Record Schema section.

Features

The Azure Service Bus Source connector includes the following features:

At least once delivery

The connector guarantees that messages from Azure Service Bus are delivered at least once to the Kafka topic.
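At-least-once delivery means downstream consumers may occasionally see the same Service Bus message more than once, for example after a retried write. One common way to handle this is to deduplicate on the record key's MessageId field (described in the Record Schema section). The sketch below simulates that with in-memory records; a real consumer would read them from the Kafka topic populated by the connector:

```python
# Sketch: deduplicate at-least-once records by their MessageId key.
# The records here are simulated stand-ins for records consumed from
# the Kafka topic written by the connector.

def deduplicate(records):
    """Yield each record once, keyed on its MessageId."""
    seen = set()
    for record in records:
        message_id = record["key"]["MessageId"]
        if message_id in seen:
            continue  # duplicate delivery, skip
        seen.add(message_id)
        yield record

# Simulated redelivery: message "m-1" arrives twice.
records = [
    {"key": {"MessageId": "m-1"}, "value": b"hello"},
    {"key": {"MessageId": "m-2"}, "value": b"world"},
    {"key": {"MessageId": "m-1"}, "value": b"hello"},  # duplicate
]
unique = list(deduplicate(records))
print([r["key"]["MessageId"] for r in unique])  # prints ['m-1', 'm-2']
```

Note that an in-memory set is only a sketch; a production consumer would typically persist the seen identifiers or use an idempotent sink.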

Multiple tasks

The Azure Service Bus Source connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector must consume a large volume of messages.

No ordering guarantees

Messages from different Service Bus partitions may be received in a different order. However, messages within a partition are delivered in the same order in which they were delivered to the Azure Service Bus topic, so ordering is guaranteed within a partition but not across partitions.

Fetch multiple messages

In every poll cycle, the connector fetches at most the number of messages set in the azure.servicebus.max.message.count property. The default value is 10. You can adjust this value based on the size of your messages.
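For example, a deployment with many small messages might raise the batch size while keeping a bounded wait per poll. The values below are illustrative, not recommendations:

```properties
# Fetch up to 50 messages per poll cycle (default is 10).
azure.servicebus.max.message.count=50
# Wait at most 30 seconds for messages before completing the poll.
azure.servicebus.max.waiting.time.seconds=30
```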

AMQP protocol

This connector is based on the AMQP protocol, so it may also work with other messaging servers that implement AMQP.

Note

When creating the Service Bus queue or topic, set the lock duration high enough to avoid duplicate records in the Kafka topic. A sufficiently long lock duration gives the connector time to commit the records and send an acknowledgement for each Service Bus message it processes. If the connector fails to write records to the Kafka topic, the corresponding messages are made available again in the Service Bus topic.
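The interaction is easier to see in a toy model: if the message lock expires before the connector commits the record to Kafka, Service Bus makes the message visible again and it is delivered a second time. The simulation below is purely illustrative, not connector code; commit_latency and lock_duration are hypothetical values in seconds:

```python
# Toy model of Service Bus lock duration vs. connector commit latency.
# If the lock expires before the record is committed to Kafka, Service
# Bus redelivers the message, producing a duplicate record.

def deliveries(commit_latency: float, lock_duration: float) -> int:
    """Return how many times one message ends up delivered to Kafka."""
    if commit_latency <= lock_duration:
        return 1  # committed and acknowledged within the lock window
    return 2      # lock expired first: message is made visible again

# A generous lock duration avoids duplicates...
print(deliveries(commit_latency=5.0, lock_duration=60.0))  # prints 1
# ...while a lock shorter than the commit path produces them.
print(deliveries(commit_latency=5.0, lock_duration=1.0))   # prints 2
```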

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Azure Service Bus Source Connector for Confluent Platform.

Install the Azure Service Bus Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Java 1.8.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-azure-service-bus:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-azure-service-bus:1.2.7
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

This quick start uses the Azure Service Bus Source connector to read messages from Azure Service Bus and write them to a Kafka topic. Before you start, use the Azure Service Bus Quickstart to create a queue named basicqueue in Azure Service Bus.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named ServiceBusSourceConnector.properties. This configuration is typically used with standalone workers.

Important

Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.

name=ServiceBusSourceConnector
connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
tasks.max=1
kafka.topic=servicebus-topic
azure.servicebus.sas.keyname=sas-keyname
azure.servicebus.sas.key=sas-key
azure.servicebus.namespace=servicebus-namespace
azure.servicebus.entity.name=queue-name
azure.servicebus.max.message.count=10
azure.servicebus.max.waiting.time.seconds=30
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Run the connector with this configuration.

confluent local services connect connector load ServiceBusSourceConnector --config ServiceBusSourceConnector.properties

Confirm that the connector is in a RUNNING state.

confluent local services connect connector status ServiceBusSourceConnector

REST-based example

Use this configuration with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

Important

Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.

{
  "name" : "ServiceBusSourceConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "servicebus-topic",
    "azure.servicebus.sas.keyname":"sas-keyname",
    "azure.servicebus.sas.key":"sas-key",
    "azure.servicebus.namespace":"namespace",
    "azure.servicebus.entity.name":"queue-name",
    "azure.servicebus.subscription" : "",
    "azure.servicebus.max.message.count" : "10",
    "azure.servicebus.max.waiting.time.seconds" : "30",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.replication.factor":"1"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

Use the following command to update the configuration of the existing connector.

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config
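Before posting, it can help to verify that config.json is valid JSON and contains the settings the connector needs. The small check below is a sketch: the required-key list mirrors the example configuration above, not an authoritative connector schema.

```python
import json

# Minimal sanity check for a connector config file before POSTing it.
# The required keys below are taken from the example config in this
# quick start, not from an authoritative connector schema.
REQUIRED = {
    "connector.class",
    "kafka.topic",
    "azure.servicebus.sas.keyname",
    "azure.servicebus.sas.key",
    "azure.servicebus.namespace",
    "azure.servicebus.entity.name",
}

def check_config(text: str) -> list:
    """Return the sorted list of required keys missing from the config."""
    doc = json.loads(text)         # raises ValueError on malformed JSON
    config = doc.get("config", {})
    return sorted(REQUIRED - config.keys())

sample = (
    '{"name": "ServiceBusSourceConnector", "config": '
    '{"connector.class": '
    '"io.confluent.connect.azure.servicebus.ServiceBusSourceConnector"}}'
)
print(check_config(sample))  # lists the keys still missing
```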

To publish messages to the Service Bus queue, follow the steps in Send and receive messages.

java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<SharedAccessKey>;"

To consume records written by the connector to the configured Kafka topic, run the following command:

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic servicebus-topic \
--from-beginning

Record Schema

The source connector creates records in the following format:

Key Schema

The Key is a struct with the following fields:

Field Name Schema Type Description
MessageId String The message identifier that uniquely identifies the message and its payload.

Value Schema

The Value is a struct with the following fields:

Field Name Schema Type Description
deliveryCount int64 The number of times this message was delivered to clients.
enqueuedTimeUtc int64 The time at which this message was enqueued in Azure Service Bus.
contentType String The content type of this message.
label String The application specific message label.
correlationId Optional String The correlation identifier.
messageProperties Optional String The map of user application properties of this message.
partitionKey Optional String The partition key for sending a message to a partitioned entity.
replyTo Optional String The address of an entity to send replies to.
replyToSessionId Optional String The session identifier augmenting the ReplyTo address.
deadLetterSource Optional String The name of the queue or subscription that this message was enqueued on, before it was deadlettered.
timeToLive int64 The duration before this message expires.
lockedUntilUtc Optional int64 The time when the lock of this message expires.
sequenceNumber Optional int64 The unique number assigned to a message by Azure Service Bus.
sessionId Optional String The session identifier for a session-aware entity.
lockToken Optional String The lock token for the current message.
messageBody bytes The body of this message as a byte array.
getTo Optional String The “to” address.
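As an illustration of the value schema, the sketch below decodes two of the fields from a record value: enqueuedTimeUtc (assumed here to be epoch milliseconds, as is conventional for Connect int64 timestamps) and messageBody (assumed to hold UTF-8 text). The dict literal stands in for a record already deserialized by the Avro console consumer:

```python
from datetime import datetime, timezone

# Stand-in for a record value already deserialized from Avro.
value = {
    "deliveryCount": 1,
    "enqueuedTimeUtc": 1700000000000,  # int64; assumed epoch milliseconds
    "messageBody": b"hello from service bus",
}

# Convert the int64 timestamp to an aware UTC datetime.
enqueued = datetime.fromtimestamp(
    value["enqueuedTimeUtc"] / 1000, tz=timezone.utc
)
# Decode the raw message body bytes (assuming UTF-8 payloads).
body = value["messageBody"].decode("utf-8")

print(enqueued.isoformat())  # prints 2023-11-14T22:13:20+00:00
print(body)                  # prints hello from service bus
```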