Important
You are viewing documentation for an older version of Confluent Platform.
Azure Service Bus Source Connector for Confluent Platform¶
Azure Service Bus is a multi-tenant cloud messaging service you can use to send information between applications and services. The Kafka Connect Azure Service Bus Source connector reads data from an Azure Service Bus queue or topic and persists the data in a Kafka topic. The schema for the Kafka record key and value is described in the Record Schema section.
Features¶
The Azure Service Bus Source connector offers the following features:
- At Least Once Delivery: The connector guarantees that messages from Azure Service Bus are delivered at least once to the Kafka topic.
- No Ordering Guarantees: Records may be written to the Kafka topic in a different order than they appear in the Service Bus message entity.
- Fetch Multiple Messages: In every poll cycle, the connector fetches the number of messages set by azure.servicebus.max.message.count. By default, this value is 10. You can change it depending on the size of the messages.
- AMQP Protocol: This connector is based on the AMQP protocol, so it should work with other servers that implement this protocol.
Note
When creating the Service Bus queue or topic, set the lock duration to a value high enough to avoid duplicate records in the Kafka topic. This gives the connector time to commit the records and send an acknowledgement for each Service Bus message processed. If the connector fails to write records to the Kafka topic, the messages in the Service Bus topic are made available again.
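Because delivery is at least once, a consumer of the Kafka topic may occasionally see the same Service Bus message more than once. The following Python sketch shows one way a downstream application might drop such duplicates by tracking the MessageId carried in the record key. The function name, the tuple shape, and the in-memory set are illustrative assumptions and not part of the connector; a real application would likely need a bounded or persistent store.

```python
def deduplicate(records):
    """Yield each (message_id, value) pair once, skipping repeated message IDs.

    `records` is any iterable of (message_id, value) tuples. This shape is a
    hypothetical stand-in for records read from the Kafka topic.
    """
    seen = set()  # in-memory only; grows without bound in this sketch
    for message_id, value in records:
        if message_id in seen:
            continue  # duplicate, for example after a redelivery
        seen.add(message_id)
        yield message_id, value
```

For example, passing `[("m1", b"a"), ("m2", b"b"), ("m1", b"a")]` yields only the first two pairs.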
Prerequisites¶
The following are required to run the Kafka Connect Azure Service Bus Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
Install the Azure Service Bus Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-azure-service-bus:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-azure-service-bus:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Azure Service Bus Source Connector Configuration Properties.
Quick Start¶
This quick start uses the Azure Service Bus Source Connector to read messages from Azure Service Bus and write them to a Kafka topic. Before you start, use the Azure Service Bus Quickstart to create a basicqueue queue in Azure Service Bus.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file named ServiceBusSourceConnector.properties. This configuration is typically used with standalone workers.
Important
Append EntityPath=<your-queue-name> at the end of the azure.servicebus.connection.string
name=ServiceBusSourceConnector
connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
tasks.max=1
kafka.topic=servicebus-topic
azure.servicebus.sas.keyname=sas-keyname
azure.servicebus.sas.key=sas-key
azure.servicebus.namespace=servicebus-namespace
azure.servicebus.entity.name=queue-name
azure.servicebus.max.message.count=10
azure.servicebus.max.waiting.time.seconds=30
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Run the connector with this configuration.
confluent local load ServiceBusSourceConnector -- -d ServiceBusSourceConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local status ServiceBusSourceConnector
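The same check can be made against the Kafka Connect REST API, whose GET /connectors/<name>/status endpoint returns the connector and task states as JSON. The Python sketch below assumes a payload shaped like that response. The helper name is_running and the sample payload are illustrative, and treating a connector with no tasks as not running is a design choice of this sketch.

```python
import json


def is_running(status):
    """Return True when the connector and at least one task exist and all report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and bool(tasks) and tasks_ok


# Illustrative payload in the shape returned by the status endpoint.
SAMPLE_STATUS = json.loads("""
{
  "name": "ServiceBusSourceConnector",
  "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"}]
}
""")
```

Calling `is_running(SAMPLE_STATUS)` returns True; a payload with any task in a FAILED state returns False.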
REST-based example¶
Use this configuration with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.
Important
Append EntityPath=<your-queue-name> at the end of the azure.servicebus.connection.string
{
  "name": "ServiceBusSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "servicebus-topic",
    "azure.servicebus.sas.keyname": "sas-keyname",
    "azure.servicebus.sas.key": "sas-key",
    "azure.servicebus.namespace": "namespace",
    "azure.servicebus.entity.name": "queue-name",
    "azure.servicebus.subscription": "",
    "azure.servicebus.max.message.count": "10",
    "azure.servicebus.max.waiting.time.seconds": "30",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
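The property-based and REST-based examples carry the same settings in two shapes: flat key=value lines versus a JSON object with a top-level name and a nested config map. The following Python sketch converts one into the other. The function name is hypothetical, and the parser is deliberately simplified: it handles only plain key=value lines and # comments, not the full Java properties syntax.

```python
def properties_to_payload(text):
    """Turn key=value lines into the {"name": ..., "config": {...}} shape."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name moves to the top level of the payload.
    name = config.pop("name")
    return {"name": name, "config": config}
```

The result can be serialized with `json.dumps(payload, indent=2)` and written to config.json. A properties text without a name line raises KeyError in this sketch.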
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
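For readers who prefer to script this step, the following Python sketch builds the equivalent POST request using only the standard library. The function name build_create_request and its base_url parameter are assumptions made for this example. The function only constructs the request; nothing is sent until it is passed to urllib.request.urlopen.

```python
import json
import urllib.request


def build_create_request(payload, base_url="http://localhost:8083"):
    """Build (but do not send) a POST /connectors request for the payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To submit the connector, load config.json with `json.load` and call `urllib.request.urlopen(build_create_request(payload))` against a reachable worker.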
Use the following command to update the configuration of an existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config
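In the Kafka Connect REST API, PUT /connectors/<name>/config takes the flat map of configuration properties as its request body, rather than the name and config wrapper used when creating a connector with POST. If an update is rejected, the wrapper is a likely cause. The Python sketch below extracts the flat map from a wrapped payload; the function name is hypothetical.

```python
def config_for_put(payload):
    """Return the flat config map for PUT /connectors/<name>/config."""
    if isinstance(payload.get("config"), dict):
        return dict(payload["config"])  # unwrap {"name": ..., "config": {...}}
    return dict(payload)  # already a flat map of properties
```

Applied to the contents of config.json, this returns only the nested properties, which can then be serialized and sent as the body of the PUT request.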
To publish messages to the Service Bus queue, follow the Send and receive messages guide.
java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<SharedAccessKey>;"
To consume the records written by the connector to the configured Kafka topic, run the following command:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic servicebus-topic \
--from-beginning
Record Schema¶
The source connector creates records in the following format:
Key Schema¶
The Key is a struct with the following fields:
Field Name | Schema Type | Description |
---|---|---|
MessageId | String | The message identifier that uniquely identifies the message and its payload. |
Value Schema¶
The Value is a struct with the following fields:
Field Name | Schema Type | Description |
---|---|---|
deliveryCount | int64 | The number of times this message was delivered to clients. |
enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
contentType | String | The content type of this message. |
label | String | The application-specific message label. |
correlationId | Optional String | The correlation identifier. |
messageProperties | Optional String | The map of user application properties of this message. |
partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
replyTo | Optional String | The address of an entity to send replies to. |
replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on before it was dead-lettered. |
timeToLive | int64 | The duration before this message expires. |
lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
sessionId | Optional String | The session identifier for a session-aware entity. |
lockToken | Optional String | The lock token for the current message. |
messageBody | bytes | The body of this message as a byte array. |
getTo | Optional String | The “to” address. |
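As an illustration of how a consumer might read these fields, the Python sketch below summarizes a record value that has already been deserialized into a dictionary. Two things here are assumptions that the table does not confirm: that enqueuedTimeUtc is expressed in epoch milliseconds, and that messageBody holds UTF-8 text. The function name is hypothetical.

```python
from datetime import datetime, timezone


def summarize_value(value):
    """Return a small, human-readable view of a record value dictionary.

    Assumes enqueuedTimeUtc is epoch milliseconds and messageBody is UTF-8
    text; neither is stated in the schema table.
    """
    enqueued = datetime.fromtimestamp(
        value["enqueuedTimeUtc"] / 1000, tz=timezone.utc
    )
    return {
        "enqueued": enqueued.isoformat(),
        "deliveryCount": value["deliveryCount"],
        # Undecodable bytes are replaced rather than raising an error.
        "body": value["messageBody"].decode("utf-8", errors="replace"),
        "correlationId": value.get("correlationId"),  # optional field
    }
```

Optional fields are read with `get` so that a missing correlationId yields None instead of an error.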