Azure Event Hubs Source Connector for Confluent Platform¶
The Kafka Connect Azure Event Hubs Source connector is used to poll data from Azure Event Hubs and persist the data to an Apache Kafka® topic.
The Event Hubs EventData record has System Property and custom User Property map fields. System properties are set by Event Hubs and custom user properties can include string data that is useful for downstream processing (sender ID, message importance, and so on). These properties are added to the Kafka record header as maps of string keys to string values. When the records are transported out of Confluent Platform, these properties can be reconstructed for downstream applications.
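For example, a downstream consumer can read these property headers directly off the Kafka record. The following is a minimal sketch using the confluent-kafka Python client; the broker address, topic name, and consumer group are illustrative assumptions, not values set by the connector:

from confluent_kafka import Consumer

# Assumptions: a local broker and a topic the connector writes to.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "eventhubs-header-inspector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["event_hub_topic"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Headers arrive as (key, value) pairs; the connector stores the
        # Event Hubs system and user properties under string keys.
        for key, value in (msg.headers() or []):
            print(f"{key} = {value.decode('utf-8', errors='replace')}")
finally:
    consumer.close()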
Features¶
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Multiple tasks¶
The Azure Event Hubs Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter.
Multiple tasks may improve performance when moving a large amount of data.
Install the Azure Event Hubs Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-azure-event-hubs:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-azure-event-hubs:1.1.4
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Event Hubs Source Connector for Confluent Platform.
Usage Notes¶
Azure Event Hubs uses Shared access policies for authentication and authorization. For more information, see the Event Hubs Authentication and Security Model Overview.
REST-based Example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. Refer to REST API for more information about the Kafka Connect REST interface.
Connect Distributed REST example:
{
"name": "EventHubsSourceConnector1",
"config": {
"confluent.topic.bootstrap.servers": "< Required Configuration >",
"connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector",
"kafka.topic": "< Required Configuration >",
"tasks.max": "1",
"max.events": "< Optional Configuration >",
"azure.eventhubs.sas.keyname": "< Required Configuration >",
"azure.eventhubs.sas.key": "< Required Configuration >",
"azure.eventhubs.namespace": "< Required Configuration >",
"azure.eventhubs.hub.name": "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/EventHubsSourceConnector1/config
Quick Start¶
The Azure Event Hubs Source connector is used to poll data from an event hub and write it into a Kafka topic. Before you begin, you need an Azure subscription with the privilege to create resource groups and services. Using the Azure portal, create a namespace and event hub. Then produce some events to the hub using the Event Hubs API (a producer sketch appears in the Azure Event Hubs setup section below).
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Preliminary setup¶
- Prerequisites
  - Confluent Platform
  - Confluent CLI (requires separate installation)
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version:
confluent connect plugin install confluentinc/kafka-connect-azure-event-hubs:latest
You can install a specific version by replacing latest with a version number. For example:
confluent connect plugin install confluentinc/kafka-connect-azure-event-hubs:1.1.1-preview
Adding a new connector plugin requires restarting Connect. Use the following Confluent CLI command to restart Connect:
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the Azure Event Hubs plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep eventhubs
"io.confluent.connect.azure.eventhubs.EventHubsSourceConnector"
Azure Event Hubs setup¶
Follow the official tutorial to create a resource group, then a namespace, then an event hub, and then produce events to the hub. The producer needs SAS credentials to access the event hub.
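If you do not have a producer handy, the following is a minimal sketch using the azure-eventhub Python package (installed with pip install azure-eventhub); the connection-string placeholders are assumptions you must replace with values from one of your namespace's shared access policies:

from azure.eventhub import EventData, EventHubProducerClient

# Placeholders: copy these values from a shared access policy on your namespace.
CONNECTION_STR = (
    "Endpoint=sb://<-Your namespace->.servicebus.windows.net/;"
    "SharedAccessKeyName=<-Your Shared Access Policy name->;"
    "SharedAccessKey=<-Your Shared Access key->"
)

producer = EventHubProducerClient.from_connection_string(
    conn_str=CONNECTION_STR, eventhub_name="<-Your event hub->")

with producer:
    batch = producer.create_batch()
    for i in range(10):
        event = EventData(f"test event {i}")
        # Optional user properties; the connector copies these into the
        # Kafka record headers.
        event.properties = {"sender-id": "quickstart-producer"}
        batch.add(event)
    producer.send_batch(batch)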
Connector configuration¶
To configure the Azure Event Hubs Source connector, complete the following steps:
Start the services using the Confluent CLI command:
confluent local services start
Create a configuration file named event-hubs-source-config.json with the following contents.
{ "name": "EventHubsSourceConnector1", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector", "kafka.topic": "event_hub_topic", "tasks.max": "1", "max.events": "1", "azure.eventhubs.sas.keyname": "<-Your Shared Access Policy name->", "azure.eventhubs.sas.key": "<-Your Shared Access key->", "azure.eventhubs.namespace": "<-Your namespace->", "azure.eventhubs.hub.name": "<-Your event hub->" } }
The important configuration parameters used here are:

- azure.eventhubs.hub.name: The event hub name to subscribe to.
- azure.eventhubs.namespace: The Event Hubs namespace where the source hub resides.
- kafka.topic: The Kafka topic into which the events received from Event Hubs are produced.
- tasks.max: The maximum number of tasks that should be created for this connector. Each task can be assigned one or more event hub partitions. The connector assigns partitions to tasks in round-robin fashion.
- You must pass your shared access policy credentials to the Event Hubs connector through your source connector configuration. To pass SAS credentials in the source configuration, set the azure.eventhubs.sas.keyname and azure.eventhubs.sas.key parameters. You can look up the shared access key in the Azure portal after a namespace has been created: navigate to the namespace panel, click Shared access policies under Settings, and a list of policies appears on the right. Click any one of the policies to see both the primary key and the secondary key.

"azure.eventhubs.sas.keyname": "<-Your Shared Access Policy name->",
"azure.eventhubs.sas.key": "<-Your Shared Access key->"
Depending on the serialization of data sent to Event Hubs, you may need to configure the value.converter and key.converter properties accordingly. For example, when sending a string serialized to bytes to Event Hubs, use the following:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

Note that the connector always converts Event Hubs records into bytes. For more information about converters, see Kafka Connect Deep Dive – Converters and Serialization Explained.
Run the following command to start the Azure Event Hubs Source connector:
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services connect connector load EventHubsSourceConnector1 --config event-hubs-source-config.json
To check that the connector started, view the Connect worker’s log by running the following command:
confluent local services connect log
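You can also query the standard Kafka Connect REST API for the connector's status. Here is a minimal sketch using Python's requests package, assuming the worker listens on localhost:8083 and the connector name used above:

import requests

# GET /connectors/<name>/status reports the connector and task states.
status = requests.get(
    "http://localhost:8083/connectors/EventHubsSourceConnector1/status"
).json()
print("connector:", status["connector"]["state"])
for task in status["tasks"]:
    print(f"task {task['id']}: {task['state']}")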
Start a Kafka Consumer in a separate terminal session to view the data exported by the connector into the Kafka topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic event_hub_topic --from-beginning
Finally, stop the Confluent services using:
confluent local services stop
Remove unused resources¶
To avoid any unintended charges, delete your resource group using the Azure portal. All the namespaces and event hubs in the resource group will be deleted as well.