RabbitMQ Source Connector for Confluent Platform¶
The Kafka Connect RabbitMQ Connector integrates with RabbitMQ servers using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in an Apache Kafka® topic.
Note
This connector is based on the AMQP protocol so it may work with other servers that implement this protocol.
Install the RabbitMQ Source Connector¶
Note
The kafka-connect-rabbitmq package contains only the source connector.
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite: The Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-rabbitmq:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-rabbitmq:1.1.1
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see RabbitMQ Source Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Usage Notes¶
Warning
The queue or topic that this connector will be reading from must exist prior to starting the connector.
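For example, you can create the queue ahead of time with a short pika script. The following is a minimal sketch; the queue name myqueue matches the Quick Start below:
#!/usr/bin/env python3
# Minimal sketch: declare the queue before starting the connector.
# Assumes a local RabbitMQ broker and the pika client library.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# queue_declare is idempotent: it creates the queue if it does not exist
# and is a no-op if it already exists with the same settings.
channel.queue_declare(queue='myqueue')
connection.close()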
Important
All of the headers associated with each message from RabbitMQ are prefixed with rabbitmq. and copied over to each record produced to Kafka. Because of this, your configured message format must support headers.
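To inspect these headers on the Kafka side, you can poll the topic with a small consumer. The following is a minimal sketch using the confluent-kafka Python client (the topic name rabbitmq and the group id are illustrative); it prints every header whose key carries the rabbitmq. prefix:
#!/usr/bin/env python3
# Minimal sketch: print RabbitMQ-derived headers from a Kafka topic.
# Assumes the confluent-kafka Python client is installed.
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'rabbitmq-header-inspector',  # illustrative group id
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['rabbitmq'])              # illustrative topic name

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    # msg.headers() returns a list of (key, value) tuples, or None.
    for key, value in (msg.headers() or []):
        if key.startswith('rabbitmq.'):
            print(key, value)
consumer.close()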
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST API documentation for more information.
Connect Distributed REST example:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "< Required Configuration >",
"rabbitmq.queue" : "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config
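After creating or updating the connector, you can confirm that it is running through the Connect REST API status endpoint. The following is a minimal sketch using the Python requests library, assuming a worker at http://localhost:8083 and the connector name from the example above:
#!/usr/bin/env python3
# Minimal sketch: check connector status via the Kafka Connect REST API.
# Assumes the requests library and a worker listening on localhost:8083.
import requests

resp = requests.get('http://localhost:8083/connectors/RabbitMQSourceConnector1/status')
resp.raise_for_status()
status = resp.json()
print(status['connector']['state'])      # e.g. RUNNING
for task in status['tasks']:
    print(task['id'], task['state'])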
Quick Start¶
The RabbitMQ Source Connector streams records from RabbitMQ queues into Kafka topics with high throughput. This quick start shows example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.
Install the Connector¶
For the following tutorial you need to have Confluent Platform running locally. Navigate to your Confluent Platform installation directory and enter the following command:
confluent-hub install confluentinc/kafka-connect-rabbitmq:latest
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
./confluent local stop connect && ./confluent local start connect
Wait for Connect to come up before continuing:
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Stopping connect
connect is [DOWN]
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting connect
connect is [UP]
Verify that the RabbitMQ source connector plugin has been installed correctly and recognized by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep RabbitMQSourceConnector
This should print the name of at least one class that has been found by the classloader.
"io.confluent.connect.rabbitmq.RabbitMQSourceConnector"
Install and test RabbitMQ broker locally¶
Install and deploy the broker¶
From a terminal session, install the RabbitMQ broker. For a macOS environment, you can use Homebrew:
brew update
brew install rabbitmq
Configure the terminal environment to include the RabbitMQ sbin folder in the PATH:
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
In a new terminal session, start the RabbitMQ broker:
rabbitmq-server
The server should print the following startup message:
## ##
## ## RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
########## Licensed under the MPL. See https://www.rabbitmq.com/
###### ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
Starting broker...
completed with 6 plugins.
Do a health check using the rabbitmqctl status command:
rabbitmqctl status
A healthy server should show a number of running_applications, as shown in the following example output:
Status of node rabbit@localhost ...
[{pid,82126},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
{cowboy,"Small, fast, modern HTTP server.","2.6.1"},
{cowlib,"Support library for manipulating Web protocols.","2.7.0"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
{rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
{rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
{rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
{rabbit,"RabbitMQ","3.7.15"},
{amqp_client,"RabbitMQ AMQP Client","3.7.15"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.15"},
...
remainder clipped for brevity
Install Python for data generation¶
brew install python3
pip3 install pika --upgrade
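Optionally, verify that pika is installed and can reach the broker with a quick connection check. This is a minimal sketch, assuming the broker started in the previous section is still running on localhost:
#!/usr/bin/env python3
# Minimal sketch: verify that pika can connect to the local broker.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
print('connected:', connection.is_open)
connection.close()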
RabbitMQ data generator in Python¶
Save this program into a file called producer.py:
#!/usr/bin/env python3
# Publish <count> JSON messages to the given RabbitMQ queue.
import pika
import sys
import json

if len(sys.argv) != 3:
    print("Usage: " + sys.argv[0] + " <queueName> <count>")
    sys.exit(1)

queue = sys.argv[1]
count = int(sys.argv[2])
print("count:\t%d\nqueue:\t%s" % (count, queue))

msgBody = {
    "id": 0,
    "body": "010101010101010101010101010101010101010101010101010101010101010101010"
}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# Basic properties are required by every converter except ByteArrayConverter.
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1,
                                  priority=1, content_encoding='utf-8')

for i in range(count):
    msgBody["id"] = i
    jsonStr = json.dumps(msgBody)
    properties.message_id = str(i)
    channel.basic_publish(exchange='', routing_key=queue, body=jsonStr,
                          properties=properties)
    print("Send\t%r" % msgBody)

connection.close()
print('Exiting')
Important
Messages produced to the RabbitMQ queue must have basic properties defined. To read messages lacking basic properties, use the ByteArrayConverter value converter in the connector properties file. All other converters require basic properties.
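For contrast, the following is a minimal sketch that publishes a message with no basic properties at all; a record like this can only be read when value.converter is set to org.apache.kafka.connect.converters.ByteArrayConverter (the queue name myqueue is illustrative):
#!/usr/bin/env python3
# Minimal sketch: publish a raw message with no basic properties.
# The connector can only handle such messages when value.converter is
# set to org.apache.kafka.connect.converters.ByteArrayConverter.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue')
# No properties argument: the message carries only its raw byte payload.
channel.basic_publish(exchange='', routing_key='myqueue', body=b'raw-bytes')
connection.close()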
RabbitMQ consumer program in Python¶
Save this program into a file called consumer.py:
#!/usr/bin/env python3
# Consume JSON messages from the given RabbitMQ queue and print them.
import pika
import sys
import json

if len(sys.argv) != 2:
    print("Usage: " + sys.argv[0] + " <queueName>")
    sys.exit(1)

queue = sys.argv[1]
print("queue:\t%s" % (queue))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

def callback(ch, method, properties, body):
    # Each message body is a JSON document produced by producer.py.
    msgBody = json.loads(body)
    print("Receive\t%r" % msgBody)

channel.basic_consume(queue=queue,
                      auto_ack=True,
                      on_message_callback=callback)

print('Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print('Exiting')
Generate and consume data in RabbitMQ¶
Run the following command to produce five records to a queue called myqueue:
./producer.py myqueue 5
The script should show five generated records in the console output.
count: 5
queue: myqueue
Send {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Exiting
Run the following command to consume all records in the RabbitMQ queue myqueue:
./consumer.py myqueue
The script should show five consumed records in the console output.
queue: myqueue
Waiting for messages. To exit press CTRL+C
Receive {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
^CExiting
Start the Kafka RabbitMQ source connector¶
Create the file register-rabbitmq-connect.json to store the following connector configuration:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "rabbitmq",
"rabbitmq.queue" : "myqueue",
"rabbitmq.host" : "localhost",
"rabbitmq.username" : "guest",
"rabbitmq.password" : "guest"
}
}
Note
The username and password values are the RabbitMQ defaults. These default credentials are only permitted for authentication from the localhost where RabbitMQ is installed and running. See this article for instructions on allowing remote access to RabbitMQ’s default account.
Start the connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json
Verify that the result includes 201 Created, indicating that the RabbitMQ connector was loaded.
Perform the end-to-end test¶
In a new terminal session, navigate to the Kafka bin folder and start a consumer:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq
Next, produce some records to the RabbitMQ queue:
./producer.py myqueue 5
You should see the following output from the kafka-avro-console-consumer command, indicating that the records were successfully produced to Kafka:
"´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
^CProcessed a total of 5 messages
Note
If RabbitMQ is restarted, in-memory data disappears because the message queue used in this test is not durable. Use the rabbitmqctl list_queues command to view active queues and message counts.
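If you need messages to survive a broker restart, both the queue and the messages must be marked durable. The following is a minimal pika sketch; mydurablequeue is an illustrative name, since an existing queue cannot be re-declared with different durability settings:
#!/usr/bin/env python3
# Minimal sketch: a durable queue with persistent messages.
# Use a fresh queue name if the non-durable queue from this test
# still exists, because queue settings cannot be changed in place.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='mydurablequeue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='mydurablequeue',
    body=b'persistent message',
    properties=pika.BasicProperties(delivery_mode=2))  # 2 = persistent
connection.close()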
Clean up resources¶
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
confluent local stop
Stop your RabbitMQ server using CTRL+C.