RabbitMQ Sink Connector for Confluent Platform

The Kafka Connect RabbitMQ Sink connector integrates with RabbitMQ servers, using the Advanced Message Queuing Protocol (AMQP) protocol. The RabbitMQ Sink connector reads data from one or more Apache Kafka® topics and sends the data to a RabbitMQ exchange.

Features

The RabbitMQ Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The RabbitMQ Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Header forwarding

The connector supports forwarding Kafka headers and metadata to the RabbitMQ message as headers. The Kafka message key can also be forwarded as the correlationID on the RabbitMQ message.

Delivery to RabbitMQ exchange

The connector supports delivering to one configured RabbitMQ exchange. When multiple Kafka topics are specified to read from, the messages will be produced to this one RabbitMQ exchange.

Publishing bytes as payload

The RabbitMQ message supports publishing bytes as payload. The connector supports storing raw bytes in RabbitMQ using the value.converter as org.apache.kafka.connect.converters.ByteArrayConverter. Use the ByteArrayConverter to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in RabbitMQ as byte arrays. Applications accessing these values can then read this information from RabbitMQ and deserialize the bytes into a usable form. If your data in Kafka is not in the format you want to persist in RabbitMQ, consider using a Single Message Transformation to change records before they are sent to RabbitMQ.

SSL/TLS security

The connector also supports SSL/TLS security to connect to the RabbitMQ server.

Record batching

The connector batches the records from Kafka while publishing to RabbitMQ. This is controlled by rabbitmq.publish.max.batch.size configuration. Please note this does not supersede the consumer.max.poll.records configuration, and in effect will always be smaller than that. Please also note that the configurations prefixed by rabbitmq.publish can, in tandem, influence throughput.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for RabbitMQ Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the RabbitMQ Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

Note

  • You must install the connector on every machine where Connect will run.
  • This connector is based on the AMQP 0-9-1 protocol, so it may work with other servers that implement this protocol.
  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect).

  • Java 1.8.

  • RabbitMQ Server version 3.x.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:1.7.6
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Usage Notes

Caution

The Kafka topics that this connector will be reading from must exist prior to starting the connector.

Examples

Property-based example

This configuration is typically used with standalone workers.

 name=RabbitMQSinkConnector
 connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
 tasks.max=1
 confluent.topic.bootstrap.servers=< Required Configuration >
 topics=< Required Configuration >
 rabbitmq.host=< Required Configuration >
 rabbitmq.port=< Required Configuration >
 rabbitmq.username=< Required Configuration >
 rabbitmq.password=< Required Configuration >
 rabbitmq.exchange=< Required Configuration >
 rabbitmq.routing.key=< Required Configuration >
 rabbitmq.delivery.mode=< Required Configuration >
 key.converter=< Required Configuration >
 value.converter==< Required Configuration >

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one or more distributed connect workers. Check here for more information about the Kafka Connect Kafka Connect REST Interface

Connect Distributed REST example

 {
   "name" : "RabbitMQSinkConnector",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
     "tasks.max" : "1",
     "confluent.topic.bootstrap.servers" : "< Required Configuration >",
     "topics" : "< Required Configuration >",
     "rabbitmq.host" : "< Required Configuration >",
     "rabbitmq.port" : "< Required Configuration >",
     "rabbitmq.username" : "< Required Configuration >",
     "rabbitmq.password" : "< Required Configuration >",
     "rabbitmq.exchange" : "< Required Configuration >",
     "rabbitmq.routing.key" : "< Required Configuration >",
     "rabbitmq.delivery.mode" : "< Required Configuration >",
     "key.converter" : "< Required Configuration >",
     "value.converter" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to the Connect worker. Change http://localhost:8083/` the endpoint of one of your Kafka Connect worker(s).

Create a new connector

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing Connector

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSinkConnector1/config

Quick Start

The RabbitMQ Sink connector streams records from Kafka topics to a RabbitMQ exchange with high throughput. This quick start shows example data production and consumption setups in detail.

  1. Start the RabbitMQ Server broker, specifying the docker image on basis of required RabbitMQ version.

    docker run -it --rm --name rabbitmq \
        -p 5672:5672 \
        -p 15672:15672 \
        rabbitmq:3.8.4-management
    
  2. Create a RabbitMQ exchange. To produce messages from Kafka to RabbitMQ, you also create a queue and binding.

    1. Once the RabbitMQ docker container has started, navigate to http://localhost:15672 in your browser and login with guest/guest.
    2. In the Exchanges tab click on Add a new exchange. Name it exchange1 and leave other options as the default settings.
    3. In the Queues tab click on Add a new queue. Name it queue1 and leave other options as the default settings.
    4. In the Exchanges tab click on the exchange created exchange1. In the Bindings section add a binding in the field To queue to queue1 with routing key rkey1.
  3. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-rabbitmq-sink:latest
    
  4. Start Confluent Platform.

    confluent local services start
    
  5. Produce test data to a pre-created rabbitmq-messages topic in Kafka.

    seq 10 | confluent local services kafka produce rabbitmq-messages
    
  6. Create a rabbitmq-sink.json file with the following contents:

    {
      "name": "RabbitMQSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
        "tasks.max": "1",
        "topics": "rabbitmq-messages",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "rabbitmq.host": "localhost",
        "rabbitmq.port": "5672",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.exchange": "exchange1",
        "rabbitmq.routing.key": "rkey1",
        "rabbitmq.delivery.mode": "PERSISTENT"
      }
    }
    
  7. Load the RabbitMQ Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load RabbitMQSinkConnector --config rabbitmq-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  8. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status RabbitMQSinkConnector
    
  9. Navigate to the RabbitMQ UI to confirm the messages were delivered to the queue1 queue.

    Tip

    The default credentials for the RabbitMQ UI are guest/guest