Azure Service Bus Source Connector for Confluent Platform

Azure Service Bus is a multi-tenant cloud messaging service you can use to send information between applications and services. The Kafka Connect Azure Service Bus Source connector reads data from an Azure Service Bus queue or topic and persists the data in a Kafka topic. The schema for the Kafka record key and value is described in the Record Schema section.

Features

The Azure Service Bus Source connector offers the following features:

  • At-Least-Once Delivery: The connector guarantees that messages from Azure Service Bus are delivered at least once to the Kafka topic.
  • No Ordering Guarantees: Records written to the Kafka topic may end up in a different order than in the Service Bus message entity.
  • Fetch Multiple Messages: In every poll cycle, the connector fetches up to azure.servicebus.max.message.count messages. The default value is 10, and you can change it depending on the size of your messages.
  • AMQP Protocol: This connector is based on the AMQP protocol, so it should work with other servers that implement this protocol.
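
Because delivery is at least once, a downstream consumer may see the same Service Bus message more than once. The following is a minimal sketch of one way to filter repeats by the MessageId field of the record key (described in the Record Schema section). The class and function names are hypothetical and not part of the connector, and the sketch assumes each record has already been deserialized into a (key, value) pair of Python dicts.

```python
from collections import OrderedDict


class MessageIdDeduplicator:
    """Remember recently seen message IDs and flag repeats.

    Hypothetical helper; not part of the connector or any Confluent library.
    """

    def __init__(self, capacity=10_000):
        self.capacity = capacity
        self._seen = OrderedDict()  # message ID -> None, least recently seen first

    def is_duplicate(self, message_id):
        if message_id in self._seen:
            # Refresh the ID so it is evicted last.
            self._seen.move_to_end(message_id)
            return True
        self._seen[message_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the least recently seen ID
        return False


def drop_duplicates(records, dedup):
    """Yield only (key, value) pairs whose key MessageId has not been seen."""
    for key, value in records:
        if not dedup.is_duplicate(key["MessageId"]):
            yield key, value
```

The bounded cache keeps memory use fixed, at the cost of missing a duplicate that arrives after its ID has been evicted. Pick a capacity that comfortably covers the window in which redelivery can happen in your setup.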

Note

When you create the Service Bus queue or topic, set the lock duration high enough to avoid duplicate records in the Kafka topic. A sufficiently long lock gives the connector time to commit the records and send an acknowledgement for each Service Bus message it has processed. If the connector fails to write records to the Kafka topic, the messages in the Service Bus queue or topic are made available again.

Prerequisites

The following are required to run the Kafka Connect Azure Service Bus Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8

Install the Azure Service Bus Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-azure-service-bus:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-service-bus:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure Service Bus Source Connector Configuration Properties.

Quick Start

This quick start uses the Azure Service Bus Source Connector to read messages from Azure Service Bus and write them to a Kafka topic. Before you start, use the Azure Service Bus Quickstart to create a queue named basicqueue in Azure Service Bus.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Start Confluent

Start the Confluent services using the following Confluent CLI command.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named ServiceBusSourceConnector.properties. This type of configuration is typically used with standalone workers.

Important

Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.

name=ServiceBusSourceConnector
connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
tasks.max=1
kafka.topic=servicebus-topic
azure.servicebus.sas.keyname=sas-keyname
azure.servicebus.sas.key=sas-key
azure.servicebus.namespace=servicebus-namespace
azure.servicebus.entity.name=queue-name
azure.servicebus.max.message.count=10
azure.servicebus.max.waiting.time.seconds=30
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
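
The note about EntityPath can be illustrated with a small helper. This is a sketch only: the function names are hypothetical, and the connection string layout is assumed to follow the Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=...; form used in the publishing command later in this quick start.

```python
def append_entity_path(connection_string, entity_name):
    """Return the connection string with EntityPath=<entity_name> at the end.

    Hypothetical helper; leaves the string unchanged if it already
    contains an EntityPath segment.
    """
    if "EntityPath=" in connection_string:
        return connection_string
    # Segments are separated by semicolons; add one only if it is missing.
    separator = "" if connection_string.endswith(";") else ";"
    return f"{connection_string}{separator}EntityPath={entity_name}"


def build_connection_string(namespace, key_name, key, entity_name):
    """Assemble a connection string from its parts (assumed layout)."""
    base = (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
    )
    return append_entity_path(base, entity_name)
```

For example, build_connection_string("servicebus-namespace", "sas-keyname", "sas-key", "basicqueue") produces a string ending in EntityPath=basicqueue.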

Run the connector with this configuration.

confluent local load ServiceBusSourceConnector -- -d ServiceBusSourceConnector.properties

Confirm that the connector is in a RUNNING state.

confluent local status ServiceBusSourceConnector
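
Outside the Confluent CLI, the same check can be made against the Kafka Connect REST API, which exposes connector state at /connectors/<name>/status. The sketch below assumes a worker listening on http://localhost:8083 and a status document containing a connector object and a tasks list, each with a state field. The function names are hypothetical.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET the status document for a connector from a Connect worker."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.loads(response.read().decode("utf-8"))


def is_fully_running(status):
    """Return True only if the connector and every task report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # An empty task list means no work is being done, so treat it as not running.
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok
```

A typical use would be is_fully_running(fetch_status("ServiceBusSourceConnector")), polled a few times after loading the connector, since tasks can take a moment to start.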

REST-based example

Use this configuration with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

Important

Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.

{
  "name" : "ServiceBusSourceConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "servicebus-topic",
    "azure.servicebus.sas.keyname":"sas-keyname",
    "azure.servicebus.sas.key":"sas-key",
    "azure.servicebus.namespace":"namespace",
    "azure.servicebus.entity.name":"queue-name",
    "azure.servicebus.subscription" : "",
    "azure.servicebus.max.message.count" : "10",
    "azure.servicebus.max.waiting.time.seconds" : "30",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.replication.factor":"1"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

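Use the following command to update the configuration of an existing connector. This endpoint expects only the contents of the config object as the request body, not the name and config wrapper used when creating the connector.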

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config
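
The two requests take differently shaped bodies: creating a connector uses a document with name and config, while updating an existing connector takes the configuration map on its own. The sketch below shows one way to derive both from a single configuration. The helper names are hypothetical, and the worker URL is the same localhost example used above.

```python
import json
import urllib.request


def create_payload(name, config):
    """Body for POST /connectors: the name plus the configuration map."""
    return {"name": name, "config": dict(config)}


def update_payload(create_body):
    """Body for PUT /connectors/<name>/config: the configuration map only."""
    return dict(create_body["config"])


def send_json(method, url, body):
    """Send a JSON request body to a Connect worker and return the parsed reply."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, after loading config.json into a dict named body, send_json("PUT", "http://localhost:8083/connectors/ServiceBusSourceConnector/config", update_payload(body)) would apply the same settings to a connector that already exists.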

To publish messages to the Service Bus queue, follow the Send and receive messages guide, and then run the sample application:

java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<SharedAccessKey>;"

To consume the records written by the connector to the configured Kafka topic, run the following command:

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic servicebus-topic \
--from-beginning

Record Schema

The source connector creates records in the following format:

Key Schema

The Key is a struct with the following fields:

| Field Name | Schema Type | Description |
|---|---|---|
| MessageId | String | The message identifier that uniquely identifies the message and its payload. |

Value Schema

The Value is a struct with the following fields:

| Field Name | Schema Type | Description |
|---|---|---|
| deliveryCount | int64 | The number of times this message was delivered to clients. |
| enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
| contentType | String | The content type of this message. |
| label | String | The application-specific message label. |
| correlationId | Optional String | The correlation identifier. |
| messageProperties | Optional String | The map of user application properties of this message. |
| partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
| replyTo | Optional String | The address of an entity to send replies to. |
| replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
| deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on before it was dead-lettered. |
| timeToLive | int64 | The duration before this message expires. |
| lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
| sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
| sessionId | Optional String | The session identifier for a session-aware entity. |
| lockToken | Optional String | The lock token for the current message. |
| messageBody | bytes | The body of this message as a byte array. |
| getTo | Optional String | The "to" address. |
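
As an illustration of how a consumer might work with these fields, the sketch below pulls a few of them out of a record. It assumes the key and value have already been deserialized into Python dicts keyed by the field names above, and that the message body is UTF-8 text, which depends entirely on what the sender published. The function names are hypothetical.

```python
def decode_body(value, encoding="utf-8"):
    """Return messageBody as text, or None if it is not valid in the encoding.

    Assumption: the sender published text; binary payloads return None.
    """
    try:
        return bytes(value["messageBody"]).decode(encoding)
    except UnicodeDecodeError:
        return None


def summarize(key, value):
    """Collect a few fields from a deserialized record key and value."""
    return {
        "message_id": key["MessageId"],
        "delivery_count": value["deliveryCount"],
        # Optional fields may be absent or null, so read them with .get().
        "sequence_number": value.get("sequenceNumber"),
        "session_id": value.get("sessionId"),
        "body_text": decode_body(value),
    }
```

The timestamp fields (enqueuedTimeUtc, lockedUntilUtc) are left out on purpose, because the schema above gives their type but not their unit.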