RabbitMQ Source Connector for Confluent Platform

The Kafka Connect RabbitMQ Source connector integrates with RabbitMQ servers, using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in an Apache Kafka® topic.

The connector reads Base64 encoded data from RabbitMQ and then writes the data to Kafka. The connector can’t infer the schema from this Base64 encoded string, so it writes the same binary data with a bytes Avro schema to the Kafka topic. Since the schema does not contain any record name, RecordNameStrategy and TopicRecordNameStrategy can’t be used with this connector.

Note

This connector is based on the AMQP protocol so it may work with other servers that implement this protocol.

Features

The RabbitMQ Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
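
Because at-least-once delivery can produce duplicates after a restart, downstream consumers sometimes deduplicate records themselves. The following is a minimal sketch of that idea, not part of the connector. It assumes each record carries a unique message ID (for example, the message_id property that the quick start producer sets). The function name and the (message_id, payload) tuple shape are hypothetical.

```python
from typing import Hashable, Iterable, Iterator, Tuple


def drop_duplicates(
    records: Iterable[Tuple[Hashable, bytes]]
) -> Iterator[Tuple[Hashable, bytes]]:
    """Yield each (message_id, payload) pair once, keeping the first occurrence.

    Assumption: message_id uniquely identifies a RabbitMQ message. The set of
    seen IDs grows without bound, so a real consumer would cap or expire it.
    """
    seen = set()
    for message_id, payload in records:
        if message_id in seen:
            continue  # already processed, for example redelivered after a restart
        seen.add(message_id)
        yield message_id, payload


if __name__ == "__main__":
    replayed = [("0", b"a"), ("1", b"b"), ("1", b"b"), ("2", b"c")]
    print(list(drop_duplicates(replayed)))
```

Keeping the first occurrence preserves the original arrival order of the records.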

Multiple tasks

The RabbitMQ Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can lead to performance gains when a large number of messages need to be consumed.

Install the RabbitMQ Source Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Note

kafka-connect-rabbitmq contains only the source connector.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-rabbitmq:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-rabbitmq:1.1.1
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which provides Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration properties

For a complete list of configuration properties for this connector, see RabbitMQ Source Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Usage notes

As a user of the RabbitMQ Source connector, you should be aware of the following:

  • The queue or topic that the connector will read from must exist prior to starting the connector.
  • All the headers associated with each message from RabbitMQ are prefixed with rabbitmq. and are copied over to each of the records produced to Kafka; therefore, you must configure the message format to support headers.
  • The RabbitMQ Source connector is based on the AMQP protocol, so it may work with other servers that implement the protocol.
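
On the consuming side, the rabbitmq. prefix makes it easy to separate the headers copied from RabbitMQ from any other record headers. The following is a minimal sketch of that filtering. It assumes headers arrive as (key, bytes) pairs, which is how common Python Kafka clients expose them. The helper name and the header keys in the usage example are hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple

PREFIX = "rabbitmq."  # prefix the connector adds to headers copied from RabbitMQ


def rabbitmq_headers(
    headers: Optional[Iterable[Tuple[str, bytes]]]
) -> Dict[str, bytes]:
    """Return only the headers whose key starts with 'rabbitmq.', prefix removed."""
    result = {}
    for key, value in headers or []:
        if key.startswith(PREFIX):
            result[key[len(PREFIX):]] = value
    return result


if __name__ == "__main__":
    # Hypothetical header keys, for illustration only.
    sample = [("rabbitmq.example", b"1"), ("unrelated", b"2")]
    print(rabbitmq_headers(sample))
```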

Examples

Property-based example

This configuration is typically used with standalone workers.

 name=RabbitMQSourceConnector1
 connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
 tasks.max=1
 kafka.topic=< Required Configuration >
 rabbitmq.queue=< Required Configuration >

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST Interface.

Connect Distributed REST example

 {
   "name" : "RabbitMQSourceConnector1",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "< Required Configuration >",
     "rabbitmq.queue" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

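Update an existing connector

The PUT /connectors/{name}/config endpoint expects only the contents of the config object, not the name/config wrapper used for POST, so extract it from connector.json first (this example uses jq):

jq .config connector.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/RabbitMQSourceConnector1/config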
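
The same two requests can also be sketched in Python using only the standard library. This is a minimal sketch, not part of the connector. The function names are hypothetical, and http://localhost:8083 is assumed as in the curl examples. It also assumes that the Kafka Connect REST API takes a name/config wrapper for POST /connectors but only the flat config map for PUT /connectors/{name}/config. The requests are built but not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: same worker endpoint as the curl examples
JSON_HEADERS = {"Content-Type": "application/json"}


def build_create_request(name: str, config: dict,
                         base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build POST /connectors with a {"name": ..., "config": ...} body."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    url = base_url.rstrip("/") + "/connectors"
    return urllib.request.Request(url, data=body, headers=JSON_HEADERS, method="POST")


def build_update_request(name: str, config: dict,
                         base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build PUT /connectors/<name>/config with only the flat config map as body."""
    body = json.dumps(config).encode("utf-8")
    url = base_url.rstrip("/") + "/connectors/" + name + "/config"
    return urllib.request.Request(url, data=body, headers=JSON_HEADERS, method="PUT")


if __name__ == "__main__":
    config = {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "rabbitmq",   # example value
        "rabbitmq.queue": "myqueue",  # example value
    }
    request = build_create_request("RabbitMQSourceConnector1", config)
    print(request.get_method(), request.full_url)
    # To actually send it: urllib.request.urlopen(request)
```

Building the request separately from sending it keeps the payload easy to inspect before it reaches a worker.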

Quick start

The RabbitMQ Source connector streams records from RabbitMQ queues into Kafka topics with high throughput. This quick start shows example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.

Install the connector

For the following tutorial, you must have Confluent Platform running locally. To install the connector, complete the following steps:

  1. Navigate to your Confluent Platform installation directory and enter the following command:

    confluent connect plugin install confluentinc/kafka-connect-rabbitmq:latest
    

    Note that adding a new connector plugin requires restarting Kafka Connect.

  2. Use the Confluent CLI to restart Connect.

    ./confluent local services connect stop && ./confluent local services connect start
    

    Wait for Connect to come up before continuing:

    This CLI is intended for development only, not for production
    https://docs.confluent.io/current/cli/index.html
    
    Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
    Stopping Connect
    Connect is [DOWN]
    This CLI is intended for development only, not for production
    https://docs.confluent.io/current/cli/index.html
    
    Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
    Starting Zookeeper
    Zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting Schema Registry
    Schema Registry is [UP]
    Starting Connect
    Connect is [UP]
    
  3. Verify the RabbitMQ Source connector plugin has been installed correctly and recognized by the plugin loader:

    curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep RabbitMQSourceConnector
    

    This should print the name of at least one class that has been found by the classloader.

    "io.confluent.connect.rabbitmq.RabbitMQSourceConnector"
    
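
The same check can be scripted. The following is a minimal sketch that inspects the JSON body returned by the connector-plugins endpoint. It assumes the response is a JSON array of objects that each have a class field, as the jq filter above implies. The function name is hypothetical, and fetching the response is left to the caller.

```python
import json

RABBITMQ_SOURCE = "io.confluent.connect.rabbitmq.RabbitMQSourceConnector"


def plugin_installed(plugins_json: str, class_name: str = RABBITMQ_SOURCE) -> bool:
    """Return True if class_name appears in a /connector-plugins response body."""
    plugins = json.loads(plugins_json)
    return any(plugin.get("class") == class_name for plugin in plugins)


if __name__ == "__main__":
    # Shortened sample response, for illustration only.
    sample = json.dumps([{"class": RABBITMQ_SOURCE, "type": "source"}])
    print(plugin_installed(sample))
```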

Install and test RabbitMQ broker locally

Use the following steps to install and test a RabbitMQ broker locally.

Install and deploy the broker

  1. From a terminal session, install the RabbitMQ broker. For a macOS environment, you can use Homebrew:

    brew update
    brew install rabbitmq
    
  2. Configure the terminal environment to include the RabbitMQ sbin folder in the PATH:

    export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
    
  3. In a new terminal session, start the RabbitMQ broker:

    rabbitmq-server
    

    The server should print the following startup message:

    ##  ##
    ##  ##      RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
    ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
    ######  ##
    ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                   /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
    
                Starting broker...
    
    completed with 6 plugins.
    
  4. Perform a health check using the rabbitmqctl status command:

    rabbitmqctl status
    

    A healthy server should have a number of running_applications as shown in the following example output:

    Status of node rabbit@localhost ...
    [{pid,82126},
    {running_applications,
    [{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
       {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
       {cowboy,"Small, fast, modern HTTP server.","2.6.1"},
       {cowlib,"Support library for manipulating Web protocols.","2.7.0"},
       {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
       {rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
       {rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
       {rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
       {rabbit,"RabbitMQ","3.7.15"},
       {amqp_client,"RabbitMQ AMQP Client","3.7.15"},
       {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.15"},
       ...
       remainder clipped for brevity
    

Install Python for data generation

brew install python3
pip3 install pika --upgrade

RabbitMQ data generator in Python

Save this program into a file called producer.py:

#!/usr/bin/env python3
"""Publish <count> JSON test messages to a RabbitMQ queue on localhost."""
import json
import sys

import pika

if len(sys.argv) != 3:
    print("Usage: " + sys.argv[0] + " <queueName> <count>")
    sys.exit(1)

queue = sys.argv[1]
count = int(sys.argv[2])

print("count:\t%d\nqueue:\t%s" % (count, queue))

# Template message; only "id" changes between messages.
msgBody = {
    "id": 0,
    "body": "010101010101010101010101010101010101010101010101010101010101010101010",
}

# Connect to the local broker and make sure the (non-durable) queue exists.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# Basic properties are set explicitly; delivery_mode=1 marks messages as non-persistent.
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')
for i in range(count):
    msgBody["id"] = i
    jsonStr = json.dumps(msgBody)
    properties.message_id = str(i)
    # Publish through the default exchange, which routes by queue name.
    channel.basic_publish(exchange='', routing_key=queue, body=jsonStr, properties=properties)
    print("Send\t%r" % msgBody)

connection.close()
print('Exiting')

Important

Messages produced to the RabbitMQ queue must have basic properties defined. In order to read messages lacking basic properties, use the ByteArrayConverter value converter in the connector properties file. All other converters require basic properties.

RabbitMQ consumer program in Python

Save this program into a file called consumer.py:

#!/usr/bin/env python3
"""Consume and print JSON messages from a RabbitMQ queue on localhost."""
import json
import sys

import pika

if len(sys.argv) != 2:
    print("Usage: " + sys.argv[0] + " <queueName>")
    sys.exit(1)

queue = sys.argv[1]

print("queue:\t%s" % (queue))

# Connect to the local broker and make sure the queue exists before consuming.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)


def callback(ch, method, properties, body):
    # Each message body is the JSON document written by producer.py.
    msgBody = json.loads(body)
    print("Receive\t%r" % msgBody)


# auto_ack=True acknowledges each message as soon as it is delivered.
channel.basic_consume(queue=queue,
                      auto_ack=True,
                      on_message_callback=callback)

print('Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print('Exiting')

Generate and consume data in RabbitMQ

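Make sure both scripts are executable (for example, by running chmod +x producer.py consumer.py), then run the following command to produce five records to a queue called myqueue: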

./producer.py myqueue 5

The script should show five generated records in the console output.

count:      5
queue:      myqueue
Send        {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Exiting

Run the following command to consume all records in RabbitMQ queue myqueue:

./consumer.py myqueue

The script should show five consumed records in the console output.

queue:      myqueue
Waiting for messages. To exit press CTRL+C
Receive     {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
^CExiting

Start the Kafka RabbitMQ Source connector

To start the Kafka Connect RabbitMQ Source connector, complete the following steps:

  1. Create the register-rabbitmq-connect.json file containing the following connector configuration:

    {
      "name" : "RabbitMQSourceConnector1",
      "config" : {
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "tasks.max" : "1",
        "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter" : "org.apache.kafka.connect.converters.ByteArrayConverter",
        "kafka.topic" : "rabbitmq",
        "rabbitmq.queue" : "myqueue",
        "rabbitmq.host" : "localhost",
        "rabbitmq.username" : "guest",
        "rabbitmq.password" : "guest"
      }
    }
    

    Note

    The username and password values are defaults in RabbitMQ. These default values are only permitted for authentication when attempted on the localhost where RabbitMQ is installed and running. See this article for instructions on allowing remote access to RabbitMQ’s default account.

  2. Start the connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json
    
  3. Verify that the result includes 201 Created, which indicates that the RabbitMQ connector was loaded.

Perform the end-to-end test

To perform an end-to-end test, complete the following steps:

  1. In a new terminal session, navigate to the Kafka bin folder and start a consumer:

    ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq
    
  2. Produce some records to the RabbitMQ queue:

    ./producer.py myqueue 5
    

    You should expect to see the following from the kafka-avro-console-consumer command, indicating that the records were successfully produced to Kafka:

    "´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
    "´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
    "´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
    "´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
    "´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
    ^CProcessed a total of 5 messages
    

    Note that if RabbitMQ is restarted, data in memory disappears because the message queue used in this test is not durable. Use the command rabbitmqctl list_queues to view active queues and message counts.
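
With the ByteArrayConverter configured for this quick start, a Kafka consumer application receives each record value as raw bytes and has to decode it itself. The following is a minimal sketch of that step. It assumes the value holds exactly the UTF-8 JSON document that producer.py published, and it does not handle any extra leading bytes that a console tool may display. The helper name decode_value is hypothetical.

```python
import json


def decode_value(value: bytes) -> dict:
    """Decode a record value that contains a UTF-8 encoded JSON document."""
    return json.loads(value.decode("utf-8"))


if __name__ == "__main__":
    sample = b'{"id": 0, "body": "0101"}'  # shortened sample payload
    message = decode_value(sample)
    print(message["id"], message["body"])
```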

Clean up resources

  1. Delete the connector and stop Confluent services.

    curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
    confluent local services stop
    
  2. Stop your RabbitMQ server using CTRL+C.