RabbitMQ Sink Connector for Confluent Platform¶
The Kafka Connect RabbitMQ Sink connector integrates with RabbitMQ servers, using the Advanced Message Queuing Protocol (AMQP) protocol. The RabbitMQ Sink connector reads data from one or more Apache Kafka® topics and sends the data to a RabbitMQ exchange.
Features¶
The RabbitMQ Sink connector includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Header forwarding
- Delivery to RabbitMQ exchange
- Publishing bytes as payload
- SSL/TLS security
- Record batching
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The RabbitMQ Sink connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max
configuration parameter. This can lead
to performance gains when multiple files need to be parsed.
Header forwarding¶
The connector supports forwarding Kafka headers and metadata to the RabbitMQ
message as headers. The Kafka message key can also be forwarded as the
correlationID
on the RabbitMQ message.
Delivery to RabbitMQ exchange¶
The connector supports delivering to one configured RabbitMQ exchange. When multiple Kafka topics are specified to read from, the messages will be produced to this one RabbitMQ exchange.
Publishing bytes as payload¶
The RabbitMQ message supports publishing bytes as payload. The connector
supports storing raw bytes in RabbitMQ using the value.converter
as
org.apache.kafka.connect.converters.ByteArrayConverter
. Use the
ByteArrayConverter
to store the binary serialized form (for example, JSON,
Avro, Strings, etc.) of the Kafka record values in RabbitMQ as byte arrays.
Applications accessing these values can then read this information from RabbitMQ
and deserialize the bytes into a usable form. If your data in Kafka is not in the
format you want to persist in RabbitMQ, consider using a Single
Message Transformation to change records before
they are sent to RabbitMQ.
SSL/TLS security¶
The connector also supports SSL/TLS security to connect to the RabbitMQ server.
Record batching¶
The connector batches the records from Kafka while publishing to RabbitMQ. This
is controlled by rabbitmq.publish.max.batch.size
configuration. Please note
this does not supersede the consumer.max.poll.records
configuration, and in
effect will always be smaller than that. Please also note that the
configurations prefixed by rabbitmq.publish
can, in tandem, influence
throughput.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for RabbitMQ Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the RabbitMQ Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
Note
- You must install the connector on every machine where Connect will run.
- This connector is based on the AMQP 0-9-1 protocol, so it may work with other servers that implement this protocol.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect).
Java 1.8.
RabbitMQ Server version 3.x.
An installation of the latest (
latest
) connector version.To install the
latest
connector version, navigate to your Confluent Platform installation directory and run the following command:confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:latest
You can install a specific version by replacing
latest
with a version number as shown in the following example:confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:1.7.6
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Usage Notes¶
Caution
The Kafka topics that this connector will be reading from must exist prior to starting the connector.
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=RabbitMQSinkConnector
connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
tasks.max=1
confluent.topic.bootstrap.servers=< Required Configuration >
topics=< Required Configuration >
rabbitmq.host=< Required Configuration >
rabbitmq.port=< Required Configuration >
rabbitmq.username=< Required Configuration >
rabbitmq.password=< Required Configuration >
rabbitmq.exchange=< Required Configuration >
rabbitmq.routing.key=< Required Configuration >
rabbitmq.delivery.mode=< Required Configuration >
key.converter=< Required Configuration >
value.converter==< Required Configuration >
REST-based example¶
This configuration is typically used with distributed
workers. Write the following JSON to
connector.json
, configure all of the required values, and use the command
below to post the configuration to one or more distributed connect workers.
Check here for more information about the Kafka Connect Kafka
Connect REST Interface
Connect Distributed REST example¶
{
"name" : "RabbitMQSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers" : "< Required Configuration >",
"topics" : "< Required Configuration >",
"rabbitmq.host" : "< Required Configuration >",
"rabbitmq.port" : "< Required Configuration >",
"rabbitmq.username" : "< Required Configuration >",
"rabbitmq.password" : "< Required Configuration >",
"rabbitmq.exchange" : "< Required Configuration >",
"rabbitmq.routing.key" : "< Required Configuration >",
"rabbitmq.delivery.mode" : "< Required Configuration >",
"key.converter" : "< Required Configuration >",
"value.converter" : "< Required Configuration >"
}
}
Use curl to post the configuration to the Connect worker. Change
http://localhost:8083/`
the endpoint of one of your Kafka Connect
worker(s).
Create a new connector¶
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing Connector¶
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSinkConnector1/config
Quick Start¶
The RabbitMQ Sink connector streams records from Kafka topics to a RabbitMQ exchange with high throughput. This quick start shows example data production and consumption setups in detail.
Start the RabbitMQ Server broker, specifying the docker image on basis of required RabbitMQ version.
docker run -it --rm --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ rabbitmq:3.8.4-management
Create a RabbitMQ exchange. To produce messages from Kafka to RabbitMQ, you also create a queue and binding.
- Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with
guest
/guest
. - In the
Exchanges
tab click onAdd a new exchange
. Name itexchange1
and leave other options as the default settings. - In the
Queues
tab click onAdd a new queue
. Name itqueue1
and leave other options as the default settings. - In the
Exchanges
tab click on the exchange createdexchange1
. In theBindings
section add a binding in the fieldTo queue
toqueue1
with routing keyrkey1
.
- Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:latest
Start Confluent Platform.
confluent local services start
Produce test data to a pre-created
rabbitmq-messages
topic in Kafka.seq 10 | confluent local services kafka produce rabbitmq-messages
Create a
rabbitmq-sink.json
file with the following contents:{ "name": "RabbitMQSinkConnector", "config": { "connector.class": "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector", "tasks.max": "1", "topics": "rabbitmq-messages", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "rabbitmq.host": "localhost", "rabbitmq.port": "5672", "rabbitmq.username": "guest", "rabbitmq.password": "guest", "rabbitmq.exchange": "exchange1", "rabbitmq.routing.key": "rkey1", "rabbitmq.delivery.mode": "PERSISTENT" } }
Load the RabbitMQ Sink connector.
Caution
You must include a double dash (
--
) between the topic name and your flag. For more information, see this post.confluent local services connect connector load RabbitMQSinkConnector --config rabbitmq-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a
RUNNING
state.confluent local services connect connector status RabbitMQSinkConnector
Navigate to the RabbitMQ UI to confirm the messages were delivered to the
queue1
queue.Tip
The default credentials for the RabbitMQ UI are
guest
/guest