Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

RabbitMQ Sink Connector for Confluent Platform

The Kafka Connect RabbitMQ Sink Connector is used to integrate with RabbitMQ servers utilizing the AMQP protocol. The RabbitMQ Sink connector reads data from one or more Apache Kafka® topics and sends the data to a RabbitMQ exchange.

Prerequisites

The following are required to run the Kafka Connect RabbitMQ Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
  • Java 1.8
  • RabbitMQ Server version 3.x

Note

This connector is based on the AMQP 0-9-1 protocol so it may just work with other servers that implement this protocol.

Install the RabbitMQ Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:1.3.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Configuration Properties

For a complete list of configuration properties for this connector, see RabbitMQ Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see RabbitMQ Sink Connector Configuration Properties.

Usage Notes

Caution

The Kafka topics that this connector will be reading from must exist prior to starting the connector.

Features

The Kafka Connect RabbitMQ Sink Connector offers the following features:

  • The connector supports forwarding Kafka headers and metadata to the RabbitMQ message as headers. The Kafka message key can also be forwarded as the correlationID on the RabbitMQ message.
  • The connector supports at-least-once message delivery. Messages may be reprocessed because of task failures or delivery failures, which may cause duplication.
  • The connector supports delivering to one configured RabbitMQ exchange. When multiple Kafka topics are specified to read from, the messages will be produced to this one RabbitMQ exchange.
  • The RabbitMQ message supports publishing bytes as payload. The connector supports storing raw bytes in RabbitMQ using the value.converter as org.apache.kafka.connect.converters.ByteArrayConverter. Use the ByteArrayConverter to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in RabbitMQ as byte arrays. Applications accessing these values can then read this information from RabbitMQ and deserialize the bytes into a usable form. If your data in Kafka is not in the format you want to persist in RabbitMQ, consider using a Single Message Transformation to change records before they are sent to RabbitMQ.
  • The connector additionally supports SSL/TLS security to connect to the RabbitMQ server.
  • The connector works with RabbitMQ publisher confirms for message acknowledgement from RabbitMQ.
  • The connector batches the records from Kafka while publishing to RabbitMQ. This is controlled by rabbitmq.publish.max.batch.size configuration. Please note this does not supersede the consumer.max.poll.records configuration, and in effect will always be smaller than that. Please also note that the configurations prefixed by rabbitmq.publish can, in tandem, influence throughput.

Examples

Property-based example

This configuration is typically used with standalone workers.

 name=RabbitMQSinkConnector
 connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
 tasks.max=1
 confluent.topic.bootstrap.servers=< Required Configuration >
 topics=< Required Configuration >
 rabbitmq.host=< Required Configuration >
 rabbitmq.port=< Required Configuration >
 rabbitmq.username=< Required Configuration >
 rabbitmq.password=< Required Configuration >
 rabbitmq.exchange=< Required Configuration >
 rabbitmq.routing.key=< Required Configuration >
 rabbitmq.delivery.mode=< Required Configuration >
 key.converter=< Required Configuration >
 value.converter==< Required Configuration >

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one or more distributed connect workers. Check here for more information about the Kafka Connect REST API

Connect Distributed REST example:

 {
   "name" : "RabbitMQSinkConnector",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
     "tasks.max" : "1",
     "confluent.topic.bootstrap.servers" : "< Required Configuration >",
     "topics" : "< Required Configuration >",
     "rabbitmq.host" : "< Required Configuration >",
     "rabbitmq.port" : "< Required Configuration >",
     "rabbitmq.username" : "< Required Configuration >",
     "rabbitmq.password" : "< Required Configuration >",
     "rabbitmq.exchange" : "< Required Configuration >",
     "rabbitmq.routing.key" : "< Required Configuration >",
     "rabbitmq.delivery.mode" : "< Required Configuration >",
     "key.converter" : "< Required Configuration >",
     "value.converter" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to the Connect worker. Change http://localhost:8083/` the endpoint of one of your Kafka Connect worker(s).

Create a new connector:

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing connector:

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSinkConnector1/config

Quick Start

The RabbitMQ Sink Connector streams records from Kafka topics to a RabbitMQ exchange with high throughput. This quick start shows example data production and consumption setups in detail.

  1. Start the RabbitMQ Server broker, specifying the docker image on basis of required RabbitMQ version.

    docker run -it --rm --name rabbitmq \
        -p 5672:5672 \
        -p 15672:15672 \
        rabbitmq:3.8.4-management
    
  2. Create a RabbitMQ exchange. To produce messages from Kafka to RabbitMQ, you also create a queue and binding.

    1. Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with guest/guest.
    2. In the Exchanges tab click on Add a new exchange. Name it exchange1 and leave other options as the default settings.
    3. In the Queues tab click on Add a new queue. Name it queue1 and leave other options as the default settings.
    4. In the Exchanges tab click on the exchange created exchange1. In the Bindings section add a binding in the field To queue to queue1 with routing key rkey1.
  3. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-rabbitmq-sink:latest
    
  4. Start Confluent Platform.

    confluent local start
    
  5. Produce test data to a pre-created rabbitmq-messages topic in Kafka.

    seq 10 | confluent local produce rabbitmq-messages
    
  6. Create a rabbitmq-sink.json file with the following contents:

    {
      "name": "RabbitMQSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
        "tasks.max": "1",
        "topics": "rabbitmq-messages",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "rabbitmq.host": "localhost",
        "rabbitmq.port": "5672",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.exchange": "exchange1",
        "rabbitmq.routing.key": "rkey1",
        "rabbitmq.delivery.mode": "PERSISTENT"
      }
    }
    
  7. Load the RabbitMQ Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load RabbitMQSinkConnector -- -d rabbitmq-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  8. Confirm that the connector is in a RUNNING state.

    confluent local status RabbitMQSinkConnector
    
  9. Navigate to the RabbitMQ UI to confirm the messages were delivered to the queue1 queue.

    Tip

    The default credentials for the RabbitMQ UI are guest/guest