Kafka Connect RabbitMQ Source Connector

The RabbitMQ connector integrates with RabbitMQ servers using the AMQP protocol. The RabbitMQ source connector reads data from a RabbitMQ queue or topic and persists it in an Apache Kafka® topic.

Note

This connector is based on the AMQP protocol, so it may work with other servers that implement this protocol.

Install RabbitMQ Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
The Confluent Hub client must be installed. It is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-rabbitmq:1.1.1

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Usage Notes

Warning

The queue or topic that this connector reads from must exist before the connector is started.

Important

All headers associated with each RabbitMQ message are prefixed with rabbitmq. and copied to each record the connector produces to Kafka. Because of this, your configured message format must support headers.
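To illustrate the naming rule, here is a minimal Python sketch that applies the rabbitmq. prefix to a message's AMQP headers. The function name to_kafka_headers and the list-of-pairs output are hypothetical; this only mirrors the rule stated above and is not the connector's actual implementation.

```python
def to_kafka_headers(amqp_headers):
    """Hypothetical illustration: prefix each AMQP header name with 'rabbitmq.'.

    Returns a list of (name, value) pairs, mirroring the key/value
    structure of Kafka record headers.
    """
    prefix = "rabbitmq."
    return [(prefix + name, value) for name, value in (amqp_headers or {}).items()]


if __name__ == "__main__":
    print(to_kafka_headers({"source": "producer.py", "retries": 0}))
    # [('rabbitmq.source', 'producer.py'), ('rabbitmq.retries', 0)]
```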

Examples

Property-based example

This configuration is typically used with standalone workers.

 name=RabbitMQSourceConnector1
 connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
 tasks.max=1
 kafka.topic=< Required Configuration >
 rabbitmq.queue=< Required Configuration >
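The REST-based form of this configuration places name at the top level and the remaining keys, all as strings, under a config object. A minimal Python sketch of that conversion, assuming a simple key=value file with no line continuations or escape sequences (properties_to_rest_payload is a hypothetical helper):

```python
import json


def properties_to_rest_payload(text):
    """Hypothetical helper: convert simple key=value lines into a
    Kafka Connect REST payload of the form {"name": ..., "config": {...}}.

    Assumes no line continuations or escapes; blank lines and lines
    starting with '#' or '!' are skipped. Raises KeyError if 'name' is absent.
    """
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values may contain '='.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    name = props.pop("name")
    return {"name": name, "config": props}


if __name__ == "__main__":
    sample = """
    name=RabbitMQSourceConnector1
    connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
    tasks.max=1
    """
    print(json.dumps(properties_to_rest_payload(sample), indent=2))
```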

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST API documentation for more information.

Connect Distributed REST example:

 {
   "name" : "RabbitMQSourceConnector1",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "< Required Configuration >",
     "rabbitmq.queue" : "< Required Configuration >"
   }
 }
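Before posting, it can help to confirm that no placeholder is left in the file. A minimal Python sketch, assuming the file follows the layout above and that unfilled values still read < Required Configuration >; missing_values and check_file are hypothetical names:

```python
import json

PLACEHOLDER = "< Required Configuration >"


def missing_values(payload):
    """Return the config keys whose value is still the placeholder, sorted."""
    return sorted(k for k, v in payload.get("config", {}).items() if v == PLACEHOLDER)


def check_file(path="connector.json"):
    """Load a connector JSON file and return its unfilled config keys."""
    with open(path) as f:
        return missing_values(json.load(f))
```

For example, check_file() returns an empty list once every required value has been filled in.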

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector:

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

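Update an existing connector. The PUT /connectors/(name)/config endpoint expects only the contents of the "config" object, not the full connector.json wrapper, so this example uses jq to extract it:

jq '.config' connector.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/RabbitMQSourceConnector1/config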
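The same create and update calls can be made from Python with only the standard library. This is a minimal sketch, assuming a worker at http://localhost:8083 and a connector.json shaped like the example above; build_request is a hypothetical helper. Because PUT /connectors/(name)/config takes the bare config map, the sketch sends only payload["config"] for updates.

```python
import json
import urllib.parse
import urllib.request


def build_request(payload, base_url="http://localhost:8083", update=False):
    """Build (but do not send) a Kafka Connect REST request.

    Create: POST /connectors with {"name": ..., "config": {...}}.
    Update: PUT /connectors/<name>/config with only the config map.
    """
    base_url = base_url.rstrip("/")
    if update:
        name = urllib.parse.quote(payload["name"], safe="")
        url = f"{base_url}/connectors/{name}/config"
        body, method = payload["config"], "PUT"
    else:
        url, body, method = f"{base_url}/connectors", payload, "POST"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


# Usage (sends the request to a running worker):
# with open("connector.json") as f:
#     req = build_request(json.load(f))
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```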

Quick Start

The RabbitMQ Source Connector streams records from RabbitMQ queues into Kafka topics with high throughput. This quick start shows example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.

Install the Connector

For the following tutorial you need to have Confluent Platform running locally. Navigate to your Confluent Platform installation directory and enter the following command:

confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

./confluent local stop connect && ./confluent local start connect

Wait for Connect to come up before continuing. The output should be similar to the following:

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Stopping connect
connect is [DOWN]
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting connect
connect is [UP]

Verify that the RabbitMQ source connector plugin has been installed correctly and recognized by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep RabbitMQSourceConnector

This should print the name of at least one class that has been found by the classloader.

"io.confluent.connect.rabbitmq.RabbitMQSourceConnector"

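If jq is not available, the same check can be written in Python. A minimal sketch, assuming the worker listens on localhost:8083 and that /connector-plugins returns a JSON list of objects with a class field, as the jq filter above implies; find_plugin_classes and fetch_plugins are hypothetical names.

```python
import json
import urllib.request


def find_plugin_classes(plugins, name="RabbitMQSourceConnector"):
    """Return the plugin class names that contain `name`."""
    return [p["class"] for p in plugins if name in p.get("class", "")]


def fetch_plugins(base_url="http://localhost:8083"):
    """GET /connector-plugins from a running worker and parse the JSON list."""
    with urllib.request.urlopen(base_url.rstrip("/") + "/connector-plugins") as resp:
        return json.load(resp)


# Usage against a running worker:
# print(find_plugin_classes(fetch_plugins()))
```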
Install and test RabbitMQ broker locally

Install and deploy the broker

From a terminal session, install the RabbitMQ broker. On macOS, you can use Homebrew:

brew update
brew install rabbitmq

Configure the terminal environment to include the RabbitMQ sbin folder in the PATH:

export PATH=$PATH:/usr/local/opt/rabbitmq/sbin

In a new terminal session, start the RabbitMQ broker:

rabbitmq-server

The server should print the following startup message:

 ##  ##
 ##  ##      RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
 ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
 ######  ##
 ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                   /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

             Starting broker...

completed with 6 plugins.

Do a health check using the rabbitmqctl status command:

rabbitmqctl status

A healthy server lists a number of running_applications, as shown in the following example output:

Status of node rabbit@localhost ...
[{pid,82126},
 {running_applications,
  [{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
   {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
   {cowboy,"Small, fast, modern HTTP server.","2.6.1"},
   {cowlib,"Support library for manipulating Web protocols.","2.7.0"},
   {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
   {rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
   {rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
   {rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
   {rabbit,"RabbitMQ","3.7.15"},
   {amqp_client,"RabbitMQ AMQP Client","3.7.15"},
   {rabbit_common,
       "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
       "3.7.15"},
   ...
   remainder clipped for brevity

Install Python for data generation

brew install python3
pip3 install pika --upgrade

RabbitMQ data generator in Python

Save this program into a file called producer.py:

#!/usr/bin/env python3
import json
import sys

import pika

# Usage: ./producer.py <queueName> <count>
if len(sys.argv) != 3:
    print("Usage: " + sys.argv[0] + " <queueName> <count>")
    sys.exit(1)

queue = sys.argv[1]
count = int(sys.argv[2])

print("count:\t%d\nqueue:\t%s" % (count, queue))

# Message template; only "id" changes between messages.
msgBody = {
    "id": 0,
    "body": "010101010101010101010101010101010101010101010101010101010101010101010"
}

# Connect to the local broker and declare the queue
# (idempotent if it already exists with the same settings).
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# delivery_mode=1 publishes non-persistent messages.
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')
for i in range(count):
    msgBody["id"] = i
    jsonStr = json.dumps(msgBody)
    properties.message_id = str(i)
    # The default exchange ('') routes each message to the queue named by routing_key.
    channel.basic_publish(exchange='', routing_key=queue, body=jsonStr, properties=properties)
    print("Send\t%r" % msgBody)

connection.close()
print('Exiting')

RabbitMQ consumer program in Python

Save this program into a file called consumer.py:

#!/usr/bin/env python3
import json
import sys

import pika

# Usage: ./consumer.py <queueName>
if len(sys.argv) != 2:
    print("Usage: " + sys.argv[0] + " <queueName>")
    sys.exit(1)

queue = sys.argv[1]

print("queue:\t%s" % queue)

# Connect to the local broker; declaring the queue lets the consumer start before the producer.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# Called once per delivered message; the body is the JSON written by producer.py.
def callback(ch, method, properties, body):
    msgBody = json.loads(body)
    print("Receive\t%r" % msgBody)

# auto_ack=True acknowledges each message as soon as it is delivered.
channel.basic_consume(queue=queue,
                      auto_ack=True,
                      on_message_callback=callback)

print('Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
    print('Exiting')
connection.close()

Generate and consume data in RabbitMQ

Make both scripts executable (chmod +x producer.py consumer.py), then run the following command to produce five records to a queue called myqueue:

./producer.py myqueue 5

The script should show five generated records in the console output.

count:      5
queue:      myqueue
Send        {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Exiting

Run the following command to consume all records in the RabbitMQ queue myqueue:

./consumer.py myqueue

The script should show five consumed records in the console output.

queue:      myqueue
Waiting for messages. To exit press CTRL+C
Receive     {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
^CExiting

Start the Kafka RabbitMQ source connector

Create the file register-rabbitmq-connect.json to store the following connector configuration:

 {
   "name" : "RabbitMQSourceConnector1",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "rabbitmq",
     "rabbitmq.queue" : "myqueue",
     "rabbitmq.host" : "localhost",
     "rabbitmq.username" : "guest",
     "rabbitmq.password" : "guest"
   }
 }

Note

The guest username and password are RabbitMQ's default credentials. By default, RabbitMQ allows the guest account to authenticate only on connections from localhost, the host where RabbitMQ is installed and running. See the RabbitMQ access control documentation for instructions on allowing remote access to the default account.

Start the connector:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json

Verify that the result includes 201 Created, indicating that the RabbitMQ connector was loaded.
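A 201 response means the worker accepted the configuration; it does not by itself show that the task is running. One way to check is sketched below in Python, assuming GET /connectors/(name)/status returns the standard Kafka Connect status JSON with a connector.state field and a tasks list; is_running and fetch_status are hypothetical helpers.

```python
import json
import urllib.request


def is_running(status):
    """True if the connector and every task report state RUNNING.

    An empty task list counts as not running, since tasks may not be
    assigned yet immediately after the connector is created.
    """
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


def fetch_status(name="RabbitMQSourceConnector1", base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from a running worker."""
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


# Usage against a running worker:
# print(is_running(fetch_status()))
```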

Perform the end-to-end test

In a new terminal session, navigate to the bin directory of your Confluent Platform installation and start a consumer:

./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq

Next, produce some records to the RabbitMQ queue:

./producer.py myqueue 5

The kafka-avro-console-consumer output should look similar to the following, indicating that the records were successfully produced to Kafka:

"´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
^CProcessed a total of 5 messages
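To check this output programmatically, the following Python sketch recovers the original producer.py message from each printed line. It assumes each line is a JSON string literal, as in the sample above, and that the producer's JSON object begins at the first { in the decoded string; any characters before it are skipped without interpretation. extract_message and ids_from_lines are hypothetical helpers.

```python
import json


def extract_message(line):
    """Decode one printed line and parse the embedded producer.py JSON."""
    text = json.loads(line)   # each line is a JSON string literal
    start = text.index("{")   # skip any characters before the JSON object
    return json.loads(text[start:])


def ids_from_lines(lines):
    """Collect the "id" field from every line that looks like a JSON string."""
    return [extract_message(l)["id"] for l in lines if l.startswith('"')]


# Usage, for example on saved console output:
# with open("consumer-output.txt") as f:
#     print(ids_from_lines(f))
```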

Note

If RabbitMQ is restarted, the queue and its messages are lost, because producer.py declares a non-durable queue and publishes non-persistent messages (delivery_mode=1). Use the command rabbitmqctl list_queues to view active queues and message counts.

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
confluent local stop

Stop your RabbitMQ server using CTRL+C.

Additional Documentation