.. _azure-service-bus-source-connector:

.. |servicebus| replace:: Azure Service Bus

.. |servicebus-short| replace:: Service Bus

|servicebus| Source Connector for |cp|
======================================

|servicebus| is a multi-tenant cloud messaging service you can use to send information between applications and services. The |kconnect-long| |servicebus| Source connector reads data from a |servicebus| queue or topic and persists the data in a |ak| topic. The schema for the |ak| record key and value is described in the :ref:`servicebus_record-schema` section.

Features
--------

The |servicebus| Source connector offers the following features:

* **At Least Once Delivery**: The connector guarantees that messages from |servicebus| are delivered at least once to the |ak| topic.

* **No Ordering Guarantees**: Records written to the |ak| topic may end up in a different order than in the |servicebus-short| message entity.

* **Fetch Multiple Messages**: In every poll cycle, the connector fetches ``azure.servicebus.max.message.count`` messages. The default value is 10; you can change it depending on the size of the messages.

* **AMQP Protocol**: This connector is based on the AMQP protocol, so it should work with other servers that implement this protocol.

.. note:: When creating the |servicebus-short| queue or topic, set the ``lock duration`` to a value high enough to avoid duplicating records in the |ak| topic. This gives the connector time to commit the records and send an acknowledgement for each |servicebus-short| message processed. If the connector fails to write records to the |ak| topic, the messages in the |servicebus-short| topic are made available again.
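Because delivery is at least once, a downstream consumer may see the same |servicebus-short| message more than once, for example when a lock expires before the connector acknowledges the message. The following is a minimal sketch, not part of the connector, of one way a consumer could drop such duplicates by remembering recently seen ``MessageId`` values from the record key. The names ``RecentIdFilter``, ``drop_duplicates``, and ``capacity`` are hypothetical, and the sketch assumes that ``MessageId`` values are unique per message.

.. code-block:: python

    from collections import OrderedDict


    class RecentIdFilter:
        """Remember the most recent message IDs and flag repeats.

        Hypothetical helper for illustration; not part of the connector.
        """

        def __init__(self, capacity=10000):
            if capacity < 1:
                raise ValueError("capacity must be at least 1")
            self.capacity = capacity
            self._seen = OrderedDict()

        def is_duplicate(self, message_id):
            """Return True if message_id was seen recently, else record it."""
            if message_id in self._seen:
                # Refresh the entry so it is evicted last.
                self._seen.move_to_end(message_id)
                return True
            self._seen[message_id] = None
            if len(self._seen) > self.capacity:
                # Evict the oldest ID to bound memory use.
                self._seen.popitem(last=False)
            return False


    def drop_duplicates(records, id_filter):
        """Yield (message_id, value) pairs whose ID was not seen recently."""
        for message_id, value in records:
            if not id_filter.is_duplicate(message_id):
                yield message_id, value

A bounded window like this only catches duplicates that arrive close together. It reduces repeated processing but does not provide exactly-once semantics.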
Prerequisites
-------------

The following are required to run the |kconnect-long| |servicebus| Source Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Java 1.8

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`azure-servicebus-source-connector-license-config` for license properties and :ref:`servicebus-source_license-topic-configuration` for information about the license topic.

Install the |servicebus| Connector
----------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

    confluent-hub install confluentinc/kafka-connect-azure-service-bus:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

    confluent-hub install confluentinc/kafka-connect-azure-service-bus:1.0.0-preview

Quick Start
-----------

This quick start uses the |servicebus| Source Connector to read messages from |servicebus| and write them to a |ak| topic. Before you start, use the `Azure Service Bus Quickstart `__ to create a ``basicqueue`` queue in Azure Service Bus.

---------------
Start Confluent
---------------

Start the Confluent services using the following :ref:`cli` command.

.. include:: ../../includes/cli-new.rst

.. codewithvars:: bash

    |confluent_start|

.. important:: Do not use the :ref:`cli` in production environments.

----------------------
Property-based example
----------------------

Create a configuration file named ``ServiceBusSourceConnector.properties``. This configuration is typically used with :ref:`standalone workers `.

.. important:: Append EntityPath= at the end of the ``azure.servicebus.connection.string``

::

    name=ServiceBusSourceConnector
    connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
    tasks.max=1
    kafka.topic=servicebus-topic
    azure.servicebus.sas.keyname=sas-keyname
    azure.servicebus.sas.key=sas-key
    azure.servicebus.namespace=servicebus-namespace
    azure.servicebus.entity.name=queue-name
    azure.servicebus.max.message.count=10
    azure.servicebus.max.waiting.time.seconds=30
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1

Run the connector with this configuration.

.. codewithvars:: bash

    |confluent_load| ServiceBusSourceConnector|dash| -d ServiceBusSourceConnector.properties

Confirm that the connector is in a ``RUNNING`` state.

.. codewithvars:: bash

    |confluent_status| ServiceBusSourceConnector

------------------
REST-based example
------------------

Use this configuration with :ref:`distributed workers `. Write the following JSON to ``config.json``, configure all of the required values, and post the configuration to one of the distributed |kconnect| workers. For more information, see the |kconnect-long| :ref:`REST API `.

.. important:: Append EntityPath= at the end of the ``azure.servicebus.connection.string``

.. code-block:: json

    {
      "name": "ServiceBusSourceConnector",
      "config": {
        "connector.class": "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "servicebus-topic",
        "azure.servicebus.sas.keyname": "sas-keyname",
        "azure.servicebus.sas.key": "sas-key",
        "azure.servicebus.namespace": "namespace",
        "azure.servicebus.entity.name": "queue-name",
        "azure.servicebus.subscription": "",
        "azure.servicebus.max.message.count": "10",
        "azure.servicebus.max.waiting.time.seconds": "30",
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }

Use ``curl`` to post the configuration to one of the |kconnect-long| workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

.. code-block:: bash

    curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

Use the following command to update the configuration of an existing connector.

.. code-block:: bash

    curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config

To publish messages to the |servicebus-short| queue, follow the `Send and receive messages `_ instructions.

.. code-block:: bash

    java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;"

To consume records written by the connector to the configured |ak| topic, run the following command:

.. codewithvars:: bash

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic servicebus-topic \
        --from-beginning
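The property-based and REST-based examples above carry the same settings in two shapes: a flat ``key=value`` file for standalone workers, and a JSON document with ``name`` and ``config`` fields for the REST API. The following is a minimal sketch, under stated assumptions, of converting one into the other. The function name ``properties_to_rest_payload`` is hypothetical, and the parser handles only simple ``key=value`` lines, blank lines, and ``#`` comments, not the full Java properties syntax.

.. code-block:: python

    import json


    def properties_to_rest_payload(text):
        """Convert simple key=value properties text into a REST payload dict.

        Hypothetical helper for illustration. The connector name moves to
        the top level; every other setting goes under "config" as a string.
        """
        config = {}
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            # Split on the first "=" only, so values may contain "=".
            key, sep, value = line.partition("=")
            if not sep:
                raise ValueError("expected key=value, got: " + line)
            config[key.strip()] = value.strip()
        if "name" not in config:
            raise ValueError("the properties must define 'name'")
        name = config.pop("name")
        return {"name": name, "config": config}


    example = """
    name=ServiceBusSourceConnector
    tasks.max=1
    kafka.topic=servicebus-topic
    confluent.license=
    """

    payload = properties_to_rest_payload(example)
    print(json.dumps(payload, indent=2))

The printed JSON can be saved as ``config.json`` and posted with the ``curl`` command shown earlier.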
.. _servicebus_record-schema:

Record Schema
-------------

The source connector creates records in the following format:

----------
Key Schema
----------

The Key is a ``struct`` with the following fields:

+------------------+-------------+------------------------------------------------------------------------------+
| Field Name       | Schema Type | Description                                                                  |
+==================+=============+==============================================================================+
| ``MessageId``    | String      | The message identifier that uniquely identifies the message and its payload. |
+------------------+-------------+------------------------------------------------------------------------------+

------------
Value Schema
------------

The Value is a ``struct`` with the following fields:

+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| Field Name            | Schema Type     | Description                                                                                          |
+=======================+=================+======================================================================================================+
| ``deliveryCount``     | int64           | The number of times this message was delivered to clients.                                           |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``enqueuedTimeUtc``   | int64           | The time at which this message was enqueued in |servicebus|.                                         |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``contentType``       | String          | The content type of this message.                                                                    |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``label``             | String          | The application-specific message label.                                                              |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``correlationId``     | Optional String | The correlation identifier.                                                                          |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``messageProperties`` | Optional String | The map of user application properties of this message.                                              |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``partitionKey``      | Optional String | The partition key for sending a message to a partitioned entity.                                     |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``replyTo``           | Optional String | The address of an entity to send replies to.                                                         |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``replyToSessionId``  | Optional String | The session identifier augmenting the ReplyTo address.                                               |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``deadLetterSource``  | Optional String | The name of the queue or subscription that this message was enqueued on, before it was deadlettered. |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``timeToLive``        | int64           | The duration before this message expires.                                                            |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``lockedUntilUtc``    | Optional int64  | The time when the lock of this message expires.                                                      |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``sequenceNumber``    | Optional int64  | The unique number assigned to a message by |servicebus|.                                             |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``sessionId``         | Optional String | The session identifier for a session-aware entity.                                                   |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``lockToken``         | Optional String | The lock token for the current message.                                                              |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``messageBody``       | bytes           | The body of this message as a byte array.                                                            |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+
| ``getTo``             | Optional String | The "to" address.                                                                                    |
+-----------------------+-----------------+------------------------------------------------------------------------------------------------------+

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog