Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
RabbitMQ Sink Connector for Confluent Platform¶
The Kafka Connect RabbitMQ Sink Connector is used to integrate with RabbitMQ servers utilizing the AMQP protocol. The RabbitMQ Sink connector reads data from one or more Apache Kafka® topics and sends the data to a RabbitMQ exchange.
Prerequisites¶
The following are required to run the Kafka Connect RabbitMQ Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
- Java 1.8
- RabbitMQ Server version 3.x
Note
This connector is based on the AMQP 0-9-1 protocol so it may just work with other servers that implement this protocol.
Install the RabbitMQ Sink Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest
) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:1.3.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Configuration Properties¶
For a complete list of configuration properties for this connector, see RabbitMQ Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see RabbitMQ Sink Connector Configuration Properties.
Usage Notes¶
Caution
The Kafka topics that this connector will be reading from must exist prior to starting the connector.
Features¶
The Kafka Connect RabbitMQ Sink Connector offers the following features:
- The connector supports forwarding Kafka headers and metadata to the RabbitMQ message as headers. The Kafka message key can also be forwarded as the
correlationID
on the RabbitMQ message. - The connector supports at-least-once message delivery. Messages may be reprocessed because of task failures or delivery failures, which may cause duplication.
- The connector supports delivering to one configured RabbitMQ exchange. When multiple Kafka topics are specified to read from, the messages will be produced to this one RabbitMQ exchange.
- The RabbitMQ message supports publishing bytes as payload. The connector supports storing raw bytes in RabbitMQ using the
value.converter
asorg.apache.kafka.connect.converters.ByteArrayConverter
. Use theByteArrayConverter
to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in RabbitMQ as byte arrays. Applications accessing these values can then read this information from RabbitMQ and deserialize the bytes into a usable form. If your data in Kafka is not in the format you want to persist in RabbitMQ, consider using a Single Message Transformation to change records before they are sent to RabbitMQ. - The connector additionally supports SSL/TLS security to connect to the RabbitMQ server.
- The connector works with RabbitMQ publisher confirms for message acknowledgement from RabbitMQ.
- The connector batches the records from Kafka while publishing to RabbitMQ. This is controlled by
rabbitmq.publish.max.batch.size
configuration. Please note this does not supersede theconsumer.max.poll.records
configuration, and in effect will always be smaller than that. Please also note that the configurations prefixed byrabbitmq.publish
can, in tandem, influence throughput.
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=RabbitMQSinkConnector
connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
tasks.max=1
confluent.topic.bootstrap.servers=< Required Configuration >
topics=< Required Configuration >
rabbitmq.host=< Required Configuration >
rabbitmq.port=< Required Configuration >
rabbitmq.username=< Required Configuration >
rabbitmq.password=< Required Configuration >
rabbitmq.exchange=< Required Configuration >
rabbitmq.routing.key=< Required Configuration >
rabbitmq.delivery.mode=< Required Configuration >
key.converter=< Required Configuration >
value.converter==< Required Configuration >
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json
,
configure all of the required values, and use the command below to post the
configuration to one or more distributed connect workers. Check here for more
information about the Kafka Connect REST API
Connect Distributed REST example:
{
"name" : "RabbitMQSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers" : "< Required Configuration >",
"topics" : "< Required Configuration >",
"rabbitmq.host" : "< Required Configuration >",
"rabbitmq.port" : "< Required Configuration >",
"rabbitmq.username" : "< Required Configuration >",
"rabbitmq.password" : "< Required Configuration >",
"rabbitmq.exchange" : "< Required Configuration >",
"rabbitmq.routing.key" : "< Required Configuration >",
"rabbitmq.delivery.mode" : "< Required Configuration >",
"key.converter" : "< Required Configuration >",
"value.converter" : "< Required Configuration >"
}
}
Use curl to post the configuration to the Connect worker. Change
http://localhost:8083/`
the endpoint of one of your Kafka Connect
worker(s).
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSinkConnector1/config
Quick Start¶
The RabbitMQ Sink Connector streams records from Kafka topics to a RabbitMQ exchange with high throughput. This quick start shows example data production and consumption setups in detail.
Start the RabbitMQ Server broker, specifying the docker image on basis of required RabbitMQ version.
docker run -it --rm --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ rabbitmq:3.8.4-management
Create a RabbitMQ exchange. To produce messages from Kafka to RabbitMQ, you also create a queue and binding.
- Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with
guest
/guest
. - In the
Exchanges
tab click onAdd a new exchange
. Name itexchange1
and leave other options as the default settings. - In the
Queues
tab click onAdd a new queue
. Name itqueue1
and leave other options as the default settings. - In the
Exchanges
tab click on the exchange createdexchange1
. In theBindings
section add a binding in the fieldTo queue
toqueue1
with routing keyrkey1
.
- Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:latest
Start Confluent Platform.
confluent local start
Produce test data to a pre-created
rabbitmq-messages
topic in Kafka.seq 10 | confluent local produce rabbitmq-messages
Create a
rabbitmq-sink.json
file with the following contents:{ "name": "RabbitMQSinkConnector", "config": { "connector.class": "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector", "tasks.max": "1", "topics": "rabbitmq-messages", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "rabbitmq.host": "localhost", "rabbitmq.port": "5672", "rabbitmq.username": "guest", "rabbitmq.password": "guest", "rabbitmq.exchange": "exchange1", "rabbitmq.routing.key": "rkey1", "rabbitmq.delivery.mode": "PERSISTENT" } }
Load the RabbitMQ Sink Connector.
Caution
You must include a double dash (
--
) between the topic name and your flag. For more information, see this post.confluent local load RabbitMQSinkConnector -- -d rabbitmq-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a
RUNNING
state.confluent local status RabbitMQSinkConnector
Navigate to the RabbitMQ UI to confirm the messages were delivered to the
queue1
queue.Tip
The default credentials for the RabbitMQ UI are
guest
/guest