Kafka Connect RabbitMQ Source Connector

The RabbitMQ connector integrates with RabbitMQ servers over AMQP. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in a Kafka topic.

Note

This connector is based on AMQP, so it may also work with other servers that implement the protocol.

Install RabbitMQ Connector

You can install this connector with the Confluent Hub client (recommended) or by manually downloading the ZIP file.

Confluent Hub

Navigate to your Confluent Platform installation directory and run this command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-rabbitmq:1.0.0-preview

Download

Download the ZIP file and extract it into a directory that is listed on the plugin path of the Connect worker configuration properties (e.g. plugin.path=/usr/local/share/kafka/plugins). This must be done on each of the installations where Connect will be run. For more information, see Installing Plugins.

Usage Notes

Warning

The queue or topic that this connector will be reading from must exist prior to starting the connector.

Important

All headers associated with a RabbitMQ message are prefixed with rabbitmq. and copied to the record that is produced to Kafka. Because of this, your configured message format must support headers.
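The header handling described above can be pictured with a short sketch. This is not the connector's code: prefix_headers is a hypothetical helper, and the sample header names and values are invented for illustration.

```python
def prefix_headers(amqp_headers, prefix="rabbitmq."):
    """Return a copy of the headers with every key prefixed (illustrative only)."""
    return {f"{prefix}{key}": value for key, value in amqp_headers.items()}


# Hypothetical headers attached to a RabbitMQ message.
sample = {"content-type": "application/json", "x-retries": 2}

# What the corresponding Kafka record headers would look like under this sketch.
kafka_headers = prefix_headers(sample)
print(kafka_headers)
```

A consumer that needs the original header name can strip the rabbitmq. prefix again on the Kafka side.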

Examples

This configuration is typically used with standalone workers.

name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
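Before starting a standalone worker, it can help to confirm that every placeholder has been replaced. The following is a minimal sketch, assuming the properties file uses simple key=value lines as shown above; parse_properties and unfilled_keys are hypothetical helpers, and the topic name rabbitmq-events is made up.

```python
REQUIRED_KEYS = ("kafka.topic", "rabbitmq.queue")
PLACEHOLDER = "< Required Configuration >"


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unfilled_keys(props):
    """Return required keys that are missing, empty, or still a placeholder."""
    missing = []
    for key in REQUIRED_KEYS:
        value = props.get(key, "")
        if not value or value == PLACEHOLDER:
            missing.append(key)
    return missing


example = """
name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=rabbitmq-events
rabbitmq.queue=< Required Configuration >
"""

print(unfilled_keys(parse_properties(example)))  # prints ['rabbitmq.queue']
```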

This configuration is typically used with distributed workers. Write the following JSON to connector.json, fill in all of the required values, and use the commands below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

Connect Distributed REST example
{
  "name" : "RabbitMQSourceConnector1",
  "config" : {
    "name" : "RabbitMQSourceConnector1",
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "rabbitmq.queue" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
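Update an existing connector. This endpoint expects only the contents of the "config" object as the request body, not the outer wrapper used above.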
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config
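The two requests can also be built from a script. The sketch below uses only the Python standard library and constructs the requests without sending them. It assumes the Kafka Connect REST API shapes: POST /connectors takes an object with "name" and "config", while PUT /connectors/{name}/config takes the flat configuration map. The build_request helper and the topic and queue values are hypothetical.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8083"  # change to the endpoint of one of your workers
NAME = "RabbitMQSourceConnector1"

# Hypothetical values for the two required settings.
config = {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "rabbitmq-events",
    "rabbitmq.queue": "events",
}


def build_request(method, path, payload):
    """Build (but do not send) a JSON request against the Connect worker."""
    return urllib.request.Request(
        url=f"{BASE_URL}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


# Create: the body wraps the configuration together with the connector name.
create_req = build_request("POST", "/connectors", {"name": NAME, "config": config})

# Update: the body is the configuration map itself.
update_req = build_request("PUT", f"/connectors/{NAME}/config", config)

# To send a request against a running worker:
# with urllib.request.urlopen(create_req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

Building both payloads from one config dictionary keeps the create and update bodies in sync.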
