Azure Service Bus Source Connector for Confluent Platform¶
Azure Service Bus is a multi-tenant cloud messaging service you can use to send information between applications and services. The Azure Service Bus Source connector reads data from an Azure Service Bus queue or topic and persists the data in a Kafka topic. The schema for the Kafka record key and value is described in the Record Schema section.
Features¶
The Azure Service Bus Source connector includes the following features:
At least once delivery¶
The connector guarantees that messages from Azure Service Bus are delivered at least once to the Kafka topic.
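At-least-once delivery means a downstream consumer may occasionally see the same Service Bus message twice, for example after a connector restart. A minimal sketch of de-duplicating on the record key's MessageId field (the consumer loop and record layout here are illustrative, not part of the connector):

```python
# Sketch: de-duplicate records on the MessageId key field, assuming
# at-least-once delivery from the connector. The records list stands in
# for messages polled from the Kafka topic.
def dedupe_by_message_id(records, seen=None):
    """Yield each record whose key MessageId has not been seen before."""
    seen = set() if seen is None else seen
    for record in records:
        message_id = record["key"]["MessageId"]
        if message_id not in seen:
            seen.add(message_id)
            yield record

records = [
    {"key": {"MessageId": "a1"}, "value": b"first"},
    {"key": {"MessageId": "a1"}, "value": b"first"},   # redelivered duplicate
    {"key": {"MessageId": "b2"}, "value": b"second"},
]
unique = list(dedupe_by_message_id(records))
```

In practice the `seen` set would need to be bounded or persisted; this only illustrates why the MessageId key is useful under at-least-once semantics.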
Multiple tasks¶
The Azure Service Bus Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when consuming from Service Bus.
No ordering guarantees¶
Messages from different partitions may be received in a different order, but messages within a partition are delivered in the same order in which they arrived at the Azure Service Bus topic. Ordering is therefore guaranteed only within a partition, not across the topic as a whole.
Fetch multiple messages¶
In every poll cycle, the connector fetches up to the number of messages set in the azure.servicebus.max.message.count property. The default value is 10. You can adjust this value based on the size of your messages.
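For example, a deployment that handles large batches of small messages might raise the count in the connector configuration (the value 100 here is illustrative only, not a recommendation):

```properties
azure.servicebus.max.message.count=100
```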
AMQP protocol¶
This connector is based on the AMQP protocol so it should work with other servers that implement this protocol.
Note
When you create the queue or subscription for the Service Bus queue or topic, set the lock duration to a value high enough to avoid duplicate records in the Kafka topic. The lock duration gives the connector time to commit the records and send an acknowledgement for each Service Bus message it processes. If the connector fails to write records to the Kafka topic, the messages are made available again in Service Bus.
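As one way to set a generous lock duration, assuming you use the Azure CLI and substituting your own resource names, you can pass an ISO-8601 duration when creating the queue (PT5M is five minutes; pick a value that comfortably exceeds your worst-case commit latency):

```shell
# Create a Service Bus queue with a 5-minute lock duration.
# Resource group, namespace, and queue names are placeholders.
az servicebus queue create \
  --resource-group my-resource-group \
  --namespace-name my-servicebus-namespace \
  --name basicqueue \
  --lock-duration PT5M
```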
Limitations¶
- This connector does not currently support reading messages from sessioned Azure Service Bus entities.
- The connector does not currently support proxy configuration.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent license properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Service Bus Source Connector for Confluent Platform.
Install the Azure Service Bus Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Java 1.8.
- An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-azure-service-bus:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-azure-service-bus:1.2.7
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
This quick start uses the Azure Service Bus Source connector to read messages from Azure Service Bus and write them to a Kafka topic. Before you start, use the Azure Service Bus Quickstart to create a queue named basicqueue in Azure Service Bus.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file ServiceBusSourceConnector.properties. This configuration is typically used with standalone workers.
Important
Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.
name=ServiceBusSourceConnector
connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
tasks.max=1
kafka.topic=servicebus-topic
azure.servicebus.sas.keyname=sas-keyname
azure.servicebus.sas.key=sas-key
azure.servicebus.namespace=servicebus-namespace
azure.servicebus.entity.name=queue-name
azure.servicebus.max.message.count=10
azure.servicebus.max.waiting.time.seconds=30
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Run the connector with this configuration.
confluent local services connect connector load ServiceBusSourceConnector --config ServiceBusSourceConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status ServiceBusSourceConnector
REST-based example¶
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST API documentation for more information.
Important
Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.
{
  "name": "ServiceBusSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "servicebus-topic",
    "azure.servicebus.sas.keyname": "sas-keyname",
    "azure.servicebus.sas.key": "sas-key",
    "azure.servicebus.namespace": "namespace",
    "azure.servicebus.entity.name": "queue-name",
    "azure.servicebus.subscription": "",
    "azure.servicebus.max.message.count": "10",
    "azure.servicebus.max.waiting.time.seconds": "30",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config
To publish messages to the Service Bus queue, follow the Send and receive messages quickstart.
java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<SharedAccessKey>;"
To consume records written by the connector to the configured Kafka topic, run the following command:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic servicebus-topic \
--from-beginning
Record Schema¶
The source connector creates records in the following format:
Key Schema¶
The Key is a struct with the following fields:
Field Name | Schema Type | Description
---|---|---
MessageId | String | The message identifier that uniquely identifies the message and its payload.
Value Schema¶
The Value is a struct with the following fields:
Field Name | Schema Type | Description
---|---|---
deliveryCount | int64 | The number of times this message was delivered to clients.
enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus.
contentType | String | The content type of this message.
label | String | The application-specific message label.
correlationId | Optional String | The correlation identifier.
messageProperties | Optional String | The map of user application properties of this message.
partitionKey | Optional String | The partition key for sending a message to a partitioned entity.
replyTo | Optional String | The address of an entity to send replies to.
replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address.
deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on, before it was dead-lettered.
timeToLive | int64 | The duration before this message expires.
lockedUntilUtc | Optional int64 | The time when the lock of this message expires.
sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus.
sessionId | Optional String | The session identifier for a session-aware entity.
lockToken | Optional String | The lock token for the current message.
messageBody | bytes | The body of this message as a byte array.
getTo | Optional String | The "to" address.
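As a rough illustration of how the timestamp fields relate, the sketch below derives a message's expiry time from a deserialized record value. It assumes enqueuedTimeUtc is encoded as epoch milliseconds and timeToLive as a duration in milliseconds; verify both assumptions against your own deserialized records.

```python
# Sketch: derive an approximate expiry time from a deserialized record value.
# Assumes enqueuedTimeUtc is epoch milliseconds and timeToLive is a duration
# in milliseconds -- verify against your own records before relying on this.
from datetime import datetime, timedelta, timezone

def message_expiry(value):
    """Return the UTC datetime at which the message expires."""
    enqueued = datetime.fromtimestamp(value["enqueuedTimeUtc"] / 1000,
                                      tz=timezone.utc)
    return enqueued + timedelta(milliseconds=value["timeToLive"])

value = {
    "enqueuedTimeUtc": 1_700_000_000_000,  # 2023-11-14T22:13:20Z
    "timeToLive": 60_000,                  # one minute
    "messageBody": b"payload",
}
expiry = message_expiry(value)
```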