RabbitMQ Source Connector for Confluent Platform¶
The Kafka Connect RabbitMQ Source connector integrates with RabbitMQ servers, using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in an Apache Kafka® topic.
The connector reads Base64 encoded data from RabbitMQ and then writes the data to Kafka. The connector can’t infer the schema from this Base64 encoded string, so it writes the same binary data with a bytes Avro schema to the Kafka topic. Since the schema does not contain any record name, RecordNameStrategy and TopicRecordNameStrategy can’t be used with this connector.
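Because the connector writes the raw message body as bytes, a downstream consumer is responsible for interpreting that payload. The following is a minimal sketch, assuming the original RabbitMQ producer published UTF-8 encoded JSON (as the quick start below does); the function name is illustrative, not part of the connector:

```python
import json

def decode_record_value(raw: bytes) -> dict:
    """Decode a bytes-schema record value back into the JSON object
    the RabbitMQ producer originally published (assumes UTF-8 JSON)."""
    return json.loads(raw.decode("utf-8"))

# Example: a payload like the one written by the quick start's producer.py
payload = b'{"id": 0, "body": "0101"}'
print(decode_record_value(payload))  # {'id': 0, 'body': '0101'}
```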
Note
This connector is based on the AMQP protocol so it may work with other servers that implement this protocol.
Features¶
The RabbitMQ Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
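Downstream applications that need each message exactly once must therefore deduplicate. The following is a sketch of one common approach, keying on a message ID; the `deduplicate` helper is hypothetical, and in practice the ID could come from a record header the connector copies over (header names are an assumption here, not taken from this document):

```python
def deduplicate(records, seen=None):
    """Yield each (message_id, payload) pair at most once.
    `seen` tracks IDs already processed; duplicates produced after a
    connector restart are skipped."""
    seen = set() if seen is None else seen
    for msg_id, payload in records:
        if msg_id in seen:
            continue  # already processed; drop the duplicate
        seen.add(msg_id)
        yield msg_id, payload

# A connector restart can redeliver a record, e.g. ID "1" below:
records = [("0", "a"), ("1", "b"), ("1", "b"), ("2", "c")]
print(list(deduplicate(records)))  # [('0', 'a'), ('1', 'b'), ('2', 'c')]
```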
Multiple tasks¶
The RabbitMQ Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when messages are consumed in parallel.
Install the RabbitMQ Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Note
kafka-connect-rabbitmq contains only the source connector.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-rabbitmq:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-rabbitmq:1.1.1
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for RabbitMQ Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Usage notes¶
As a user of the RabbitMQ Source connector, you should be aware of the following:
- The queue or topic that the connector will read from must exist prior to starting the connector.
- All the headers associated with each message from RabbitMQ are prefixed with rabbitmq. and are copied over to each of the records produced to Kafka; therefore, you must configure the message format to support headers.
- The RabbitMQ Source connector is based on the AMQP protocol, so it may work with other servers that implement the protocol.
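To illustrate the prefixing convention described above, the following sketch shows how AMQP property names map onto Kafka record header names. The helper and the example property names are illustrative only; the exact header names the connector emits may differ:

```python
def prefix_headers(amqp_properties: dict) -> dict:
    """Illustrate the naming convention only: the connector copies
    AMQP message properties into Kafka record headers under a
    `rabbitmq.` prefix."""
    return {f"rabbitmq.{name}": value for name, value in amqp_properties.items()}

props = {"content.type": "application/json", "delivery.mode": 1}
print(prefix_headers(props))
# {'rabbitmq.content.type': 'application/json', 'rabbitmq.delivery.mode': 1}
```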
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST Interface documentation.
Connect Distributed REST example¶
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "< Required Configuration >",
"rabbitmq.queue" : "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector¶
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector¶
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config
Quick start¶
The RabbitMQ Source connector streams records from RabbitMQ queues into Kafka topics with high throughput. This quick start shows example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.
Install the connector¶
For the following tutorial, you must have Confluent Platform running locally. To install the connector, complete the following steps:
Navigate to your Confluent Platform installation directory and enter the following command:
confluent connect plugin install confluentinc/kafka-connect-rabbitmq:latest
Note that adding a new connector plugin requires restarting Kafka Connect.
Use the Confluent CLI to restart Connect.
./confluent local services connect stop && ./confluent local services connect start
Wait for Connect to come up before continuing:
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Stopping Connect
Connect is [DOWN]
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Connect
Connect is [UP]
Verify the RabbitMQ Source connector plugin has been installed correctly and recognized by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep RabbitMQSourceConnector
This should print the name of at least one class that has been found by the classloader.
"io.confluent.connect.rabbitmq.RabbitMQSourceConnector"
Install and test RabbitMQ broker locally¶
Use the following steps to install and test a RabbitMQ broker locally.
Install and deploy the broker¶
From a terminal session, install the RabbitMQ broker. For a macOS environment, you can use Homebrew:

brew update
brew install rabbitmq

Configure the terminal environment to include the RabbitMQ sbin folder in the PATH:

export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
In a new terminal session, start the RabbitMQ broker:
rabbitmq-server
The server should print the following startup message:
  ##  ##
  ##  ##      RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                    /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

              Starting broker...
 completed with 6 plugins.
Perform a health check using the rabbitmqctl status command:

rabbitmqctl status

A healthy server should have a number of running_applications as shown in the following example output:

Status of node rabbit@localhost ...
[{pid,82126},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
      {cowboy,"Small, fast, modern HTTP server.","2.6.1"},
      {cowlib,"Support library for manipulating Web protocols.","2.7.0"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
      {rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
      {rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
      {rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
      {rabbit,"RabbitMQ","3.7.15"},
      {amqp_client,"RabbitMQ AMQP Client","3.7.15"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.15"},
 ... remainder clipped for brevity
Install Python for data generation¶
brew install python3
pip3 install pika --upgrade
RabbitMQ data generator in Python¶
Save this program into a file called producer.py:
#!/usr/bin/env python3
import pika
import sys
import json

if len(sys.argv) != 3:
    print("Usage: " + sys.argv[0] + " <queueName> <count>")
    sys.exit(1)

queue = sys.argv[1]
count = int(sys.argv[2])
print("count:\t%d\nqueue:\t%s" % (count, queue))

msgBody = {
    "id": 0,
    "body": "010101010101010101010101010101010101010101010101010101010101010101010"
}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')

for i in range(count):
    msgBody["id"] = i
    jsonStr = json.dumps(msgBody)
    properties.message_id = str(i)
    channel.basic_publish(exchange='', routing_key=queue, body=jsonStr, properties=properties)
    print("Send\t%r" % msgBody)

connection.close()
print('Exiting')
Important
Messages produced to the RabbitMQ queue must have basic properties defined. In order to read messages lacking basic properties, use the ByteArrayConverter value converter in the connector properties file. All other converters require basic properties.
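For example, a connector properties file handling messages without basic properties would set (a sketch based on the note above; the surrounding properties are omitted):

```
# Required when RabbitMQ messages lack basic properties
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```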
RabbitMQ consumer program in Python¶
Save this program into a file called consumer.py:
#!/usr/bin/env python3
import pika
import sys
import json

if len(sys.argv) != 2:
    print("Usage: " + sys.argv[0] + " <queueName>")
    sys.exit(1)

queue = sys.argv[1]
print("queue:\t%s" % (queue))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

def callback(ch, method, properties, body):
    msgBody = json.loads(body)
    print("Receive\t%r" % msgBody)

channel.basic_consume(queue=queue,
                      auto_ack=True,
                      on_message_callback=callback)

print('Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print('Exiting')
Generate and consume data in RabbitMQ¶
Run the following command to produce five records to a queue called myqueue:
./producer.py myqueue 5
The script should show five generated records in the console output.
count: 5
queue: myqueue
Send {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Exiting
Run the following command to consume all records in the RabbitMQ queue myqueue:
./consumer.py myqueue
The script should show five consumed records in the console output.
queue: myqueue
Waiting for messages. To exit press CTRL+C
Receive {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
^CExiting
Start the Kafka RabbitMQ Source connector¶
To start the Kafka Connect RabbitMQ Source connector, complete the following steps:
Create the register-rabbitmq-connect.json file containing the following connector configuration:

{
  "name" : "RabbitMQSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.converters.ByteArrayConverter",
    "kafka.topic" : "rabbitmq",
    "rabbitmq.queue" : "myqueue",
    "rabbitmq.host" : "localhost",
    "rabbitmq.username" : "guest",
    "rabbitmq.password" : "guest"
  }
}
Note
The username and password values are defaults in RabbitMQ. These default values are only permitted for authentication when attempted on the localhost where RabbitMQ is installed and running. See this article for instructions on allowing remote access to RabbitMQ’s default account.

Start the connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json
Verify the result includes 201 Created, which indicates that the RabbitMQ connector was loaded.
Perform the end-to-end test¶
To perform an end-to-end test, complete the following steps:
In a new terminal session, navigate to the Kafka bin folder and start a consumer:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq
Produce some records to the RabbitMQ queue:
./producer.py myqueue 5
You should expect to see the following from the kafka-avro-console-consumer command, indicating that the records were successfully produced to Kafka:

"´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
^CProcessed a total of 5 messages
Note that if RabbitMQ is restarted, data in memory disappears because the message queue used in this test is not durable. Use the command rabbitmqctl list_queues to view active queues and message counts.
Clean up resources¶
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
confluent local stop
Stop your RabbitMQ server using CTRL+C.