RabbitMQ Sink Connector for Confluent Cloud

The fully-managed RabbitMQ Sink connector uses the AMQP protocol to communicate with RabbitMQ servers. The RabbitMQ Sink connector reads data from one or more Apache Kafka® topics and sends the data to a RabbitMQ exchange.

Features

The RabbitMQ Sink connector provides the following features:

  • At least once delivery: The connector guarantees that records are delivered at least once from the Kafka topic to the RabbitMQ exchange.
  • Dead Letter Queue: This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see the Dead Letter Queue docs.
  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance.
  • Header forwarding: The connector supports forwarding Kafka headers and metadata to the RabbitMQ message as headers. The Kafka message key can also be forwarded as the correlationID on the RabbitMQ message.
  • RabbitMQ Exchange delivery: The connector supports delivering to one configured RabbitMQ exchange. When the connector reads from multiple Kafka topics, all messages are produced to this single exchange.
  • Publishes bytes as payload: The connector publishes the RabbitMQ message payload as bytes. It stores raw bytes in RabbitMQ by setting value.converter to org.apache.kafka.connect.converters.ByteArrayConverter. With the ByteArrayConverter for the value, the connector stores the binary serialized form (for example, JSON, Avro, Strings, and so on) of the Kafka record values in RabbitMQ as byte arrays. Applications accessing these values can then read this information from RabbitMQ and deserialize the bytes into a usable form. If your data in Kafka is not in the format you want to persist in RabbitMQ, consider using Single Message Transforms for Managed Connectors to change records before they are sent to RabbitMQ (see the sketch after this list).
  • Supports SSL/TLS security: The connector also supports SSL/TLS security to connect to the RabbitMQ server.
  • Batches records: The connector batches the records from Kafka while publishing to RabbitMQ. This is controlled by the rabbitmq.publish.max.batch.size configuration property.
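
As one illustration of reshaping the stream before it reaches RabbitMQ, the following configuration fragment is a minimal sketch that drops tombstone (null-value) records using the Filter transformation and RecordIsTombstone predicate from Apache Kafka. The names dropTombstones and isTombstone are illustrative, and whether a given SMT is available to this managed connector depends on the supported transformations list:

"transforms": "dropTombstones",
"transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTombstones.predicate": "isTombstone",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"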

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Quick Start

Use this quick start to get up and running with the Confluent Cloud RabbitMQ Sink connector. The quick start shows how to select the connector and configure it to read data from Apache Kafka® topics and persist the data to a RabbitMQ exchange.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • Authorized access to a RabbitMQ host server and exchange, along with the host security details.
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • For networking considerations, see Network access. To use static egress IPs, see Static Egress IP Addresses.
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Refer to Cloud connector limitations for additional information.

Note

There is no input.data.format configuration used with this sink connector. This is because this connector defaults to ByteArrayConverter for value and StringConverter for key. No other converter is useful for this connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the RabbitMQ Sink connector icon.


Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Enter a connector name.
  2. Select the way you want to provide Kafka Cluster credentials. You can either select a service account resource ID or you can enter an API key and secret (or generate these in the Cloud Console).
  3. Select one or more topics.
  4. Set the RabbitMQ Publishing details.
    • Maximum batch size: Maximum number of messages in a batch to block on for acknowledgments. The property defaults to 100 if not entered.
    • Time to wait: Time in milliseconds (ms) that the connector waits for acknowledgment that the message is delivered. The property defaults to 10000 ms (10 seconds) if not entered.
    • Message retries: The number of times the connector attempts to retry when the message is not acknowledged. The property defaults to 1 if not entered.
  5. Select a Security protocol. Options are PLAINTEXT and SSL. If you select SSL, the following options appear:
    • SSL Key Password: The private key password in the keystore file, or the PEM key specified in ssl.keystore.key. This is required for clients only if two-way authentication is configured.
    • Key Store button: Uploads the keystore file.
    • SSL Keystore Password: The keystore password for the keystore file.
    • Trust store button: Uploads the truststore file.
    • SSL Truststore Password: The password for the truststore file. If a password is not set, the truststore file is used, but integrity checking is disabled.
    • SSL Keystore Type: The keystore file format. Defaults to JKS.
    • SSL Truststore Type: The truststore file format. Defaults to JKS.
  6. Add the RabbitMQ Connection details.
    • RabbitMQ host: The RabbitMQ host server address to connect to. For example, 192.168.1.99.
    • RabbitMQ port: The server port the connector uses to connect to the server. For example, 5672.
    • RabbitMQ username: The username to use when authenticating to RabbitMQ.
    • RabbitMQ password: The password to use when authenticating to RabbitMQ.
    • RabbitMQ virtual host: The name of the virtual host created in RabbitMQ.
  7. Add the RabbitMQ destination details.
    • RabbitMQ Destination Exchange: The RabbitMQ destination exchange where messages are delivered. The connector delivers messages to this RabbitMQ exchange only, even when the connector consumes from multiple Kafka topics.
    • RabbitMQ Message Routing Key: The routing key that RabbitMQ uses to determine how to route the message.
    • RabbitMQ Message Delivery Mode: An option that determines the message durability in RabbitMQ. Options are persistent or transient. For more information, see the RabbitMQ docs.
    • Forward Kafka Record Key: If enabled, the Kafka record key is converted to a string and forwarded on the RabbitMQ message correlationID property. The connector does not send a correlationID if the Kafka record key is null and this property is set to true.
    • Forward Kafka Record Metadata: If set to true, the connector forwards Kafka record metadata as RabbitMQ message headers. This includes the record’s topic, partition, and offset. The topic name is forwarded as a header named KAFKA_TOPIC, the partition value is a header named KAFKA_PARTITION, and the offset value is a header named KAFKA_OFFSET.
    • Forward Kafka Record Headers: If set to true, the connector adds Kafka record headers to the RabbitMQ message as headers.
  8. Enter the number of tasks that the connector uses. The connector supports running one or more tasks. More tasks may improve performance.
  9. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details. See Unsupported transformations for a list of SMTs that are not supported with this sink connector.

See the RabbitMQ Sink configuration properties for property values and definitions.

Step 5: Launch the connector.

Verify the connection details by previewing the running configuration. Once you’ve validated that the properties are configured to your satisfaction, click Launch.

Tip

For information about previewing your connector output, see Connector Data Previews.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Step 7: Check the RabbitMQ destination.

After the connector is running, verify that messages from your Kafka topic are arriving at the configured RabbitMQ exchange.
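
One quick way to spot-check delivery is the rabbitmqadmin tool from the RabbitMQ management plugin, assuming the plugin is enabled and a queue (my_queue here is an illustrative name) is bound to the destination exchange:

# Fetch up to five messages from a queue bound to the exchange; by default
# rabbitmqadmin requeues fetched messages, so this check is non-destructive.
rabbitmqadmin get queue=my_queue count=5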

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.


Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe RabbitMQSink

Example output:

Following are the required configs:
connector.class: RabbitMQSink
topics
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
rabbitmq.host
rabbitmq.exchange
rabbitmq.routing.key
rabbitmq.delivery.mode
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
    "name" : "RabbitMQSinkConnector_0",
    "connector.class": "RabbitMQSink",
    "topics" : "pageviews",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "rabbitmq.host" : "192.168.1.99",
    "rabbitmq.exchange" : "exchange_1",
    "rabbitmq.routing.key" : "routingkey_1",
    "rabbitmq.delivery.mode" : "PERSISTENT",
    "tasks.max" : "1"
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "topics": Enter Kafka topic name or comma-separated list of topic names.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "rabbitmq.<...>": See the RabbitMQ Sink configuration properties for property values and definitions. Note that the connector configuration defaults to host port 5672 (i.e., "rabbitmq.port" : "5672").

  • "tasks.max": Enter the number of tasks that the connector uses. The connector supports running one or more tasks. More tasks may improve performance.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI. See Unsupported transformations for a list of SMTs that are not supported with this sink connector.

See the RabbitMQ Sink configuration properties for property values and definitions.
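
For reference, the following is an illustrative sketch of the same configuration with several optional properties layered on top of the required ones. All property names come from the Configuration Properties reference below; the values shown are examples only:

{
    "name" : "RabbitMQSinkConnector_0",
    "connector.class": "RabbitMQSink",
    "topics" : "pageviews",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "rabbitmq.host" : "192.168.1.99",
    "rabbitmq.port" : "5672",
    "rabbitmq.username" : "<rabbitmq-username>",
    "rabbitmq.password" : "<rabbitmq-password>",
    "rabbitmq.virtual.host" : "/",
    "rabbitmq.exchange" : "exchange_1",
    "rabbitmq.routing.key" : "routingkey_1",
    "rabbitmq.delivery.mode" : "PERSISTENT",
    "rabbitmq.publish.max.batch.size" : "100",
    "rabbitmq.publish.ack.timeout" : "10000",
    "rabbitmq.publish.max.retries" : "1",
    "rabbitmq.forward.kafka.key" : "true",
    "tasks.max" : "1"
}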

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config rabbitmq-sink.json

Example output:

Created connector RabbitMQSinkConnector_0 lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID          |            Name           | Status  |  Type
+-----------+---------------------------+---------+-------+
lcc-ix4dl   | RabbitMQSinkConnector_0   | RUNNING | sink

Step 6: Check the RabbitMQ destination.

After the connector is running, verify that messages from your Kafka topic are arriving at the configured RabbitMQ exchange (the rabbitmqadmin sketch in Step 7 of the Console instructions works here as well).

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Configuration Properties

The following connector configuration properties can be used with the RabbitMQ Sink connector for Confluent Cloud.

rabbitmq.host

The RabbitMQ host to connect to.

  • Type: string
  • Importance: high
rabbitmq.port

The RabbitMQ port to connect to.

  • Type: int
  • Default: 5672
  • Importance: medium
rabbitmq.username

The username for authenticating with RabbitMQ.

  • Type: string
  • Default: guest
  • Importance: high
rabbitmq.password

The password for authenticating with RabbitMQ.

  • Type: password
  • Default: guest
  • Importance: high
rabbitmq.virtual.host

The virtual host to use when connecting to the broker.

  • Type: string
  • Default: /
  • Importance: high
rabbitmq.exchange

The destination RabbitMQ exchange where messages are delivered. The connector delivers messages to this one RabbitMQ exchange, even when the connector consumes from multiple specified Kafka topics.

  • Type: string
  • Importance: high
rabbitmq.routing.key

The routing key that RabbitMQ uses to determine how to route the message.

  • Type: string
  • Importance: high
rabbitmq.delivery.mode

Determines message durability in RabbitMQ. Options are PERSISTENT or TRANSIENT. For more information, see the RabbitMQ docs.

  • Type: string
  • Valid Values: [PERSISTENT, TRANSIENT]
  • Importance: high
rabbitmq.security.protocol

The security protocol to use when connecting to RabbitMQ. Values can be PLAINTEXT or SSL.

  • Type: string
  • Default: PLAINTEXT
  • Importance: medium
rabbitmq.https.ssl.key.password

The password of the private key in the key store file.

  • Type: password
  • Default: null
  • Importance: high
rabbitmq.https.ssl.keystorefile

The key store containing the server certificate. Only required if rabbitmq.security.protocol is set to SSL. Note that you encode the binary keystore file in base64, take the encoded string, add the data:text/plain;base64 prefix, and then specify the entire string as the property entry (see the encoding sketch below). For example: "rabbitmq.https.ssl.keystorefile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==".

  • Type: password
  • Default: null
  • Importance: high
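
For example, on a Linux host you might build the property value as follows. This is a sketch: keystore.jks is an assumed file name, and the same encoding applies to rabbitmq.https.ssl.truststorefile.

# Base64-encode the keystore with no line wrapping (GNU coreutils base64).
ENCODED=$(base64 -w 0 keystore.jks)
# Prepend the required prefix to form the property value.
echo "data:text/plain;base64,${ENCODED}"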
rabbitmq.https.ssl.keystore.password

The store password for the keystore file. This is optional and only needed if rabbitmq.https.ssl.keystorefile is configured.

  • Type: password
  • Default: null
  • Importance: high
rabbitmq.https.ssl.truststorefile

The truststore containing the server CA certificate. Only required if rabbitmq.security.protocol is set to SSL. Encode the binary truststore file in base64 in the same way as the keystore file (see the sketch above), add the data:text/plain;base64 prefix, and then specify the entire string as the property entry. For example: "rabbitmq.https.ssl.truststorefile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==".

  • Type: password
  • Default: null
  • Importance: high
rabbitmq.https.ssl.truststore.password

The password for the truststore file. If a password is not set, the truststore file configured is used, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
rabbitmq.https.ssl.keystore.type

The file format of the keystore file. The default used is JKS.

  • Type: string
  • Default: JKS
  • Importance: medium
rabbitmq.publish.max.batch.size

Maximum number of messages in a batch to block on for acknowledgements.

  • Type: int
  • Default: 100
  • Importance: medium
rabbitmq.publish.ack.timeout

Period of time to wait for message acknowledgement in milliseconds (ms). The default is 10000 ms (10 seconds).

  • Type: int
  • Default: 10000
  • Importance: medium
rabbitmq.publish.max.retries

The number of times to retry messages that are unacknowledged (un-acked) or negatively acknowledged (nacked). Defaults to 1 retry.

  • Type: int
  • Default: 1
  • Importance: medium
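
These three publish properties work together: the connector batches up to rabbitmq.publish.max.batch.size messages and blocks for acknowledgements, waits up to rabbitmq.publish.ack.timeout ms for each acknowledgement, and retries unacknowledged messages up to rabbitmq.publish.max.retries times. The following fragment is an illustrative sketch that raises all three above their defaults:

"rabbitmq.publish.max.batch.size" : "200",
"rabbitmq.publish.ack.timeout" : "20000",
"rabbitmq.publish.max.retries" : "3"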
rabbitmq.forward.kafka.key

If enabled, the Kafka record key is converted to a string and forwarded on the correlationID property of the RabbitMQ message. If the Kafka record key is null and this property is true, no correlationID is sent.

  • Type: boolean
  • Default: false
  • Importance: low
rabbitmq.forward.kafka.metadata

If enabled, metadata from the Kafka record is forwarded on the RabbitMQ message as headers. This includes the record’s topic, partition, and offset. The topic name is applied as a header named KAFKA_TOPIC, the partition value is applied as a header named KAFKA_PARTITION, and the offset value is applied as a header named KAFKA_OFFSET.

  • Type: boolean
  • Default: false
  • Importance: low
rabbitmq.forward.kafka.headers

If enabled, Kafka record headers are added to the RabbitMQ message as headers.

  • Type: boolean
  • Default: false
  • Importance: low
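
Enabling all three forwarding flags gives a RabbitMQ consumer the Kafka record key as the correlationID, the KAFKA_TOPIC, KAFKA_PARTITION, and KAFKA_OFFSET headers, and any forwarded Kafka record headers. An illustrative fragment:

"rabbitmq.forward.kafka.key" : "true",
"rabbitmq.forward.kafka.metadata" : "true",
"rabbitmq.forward.kafka.headers" : "true"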

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent CLI to manage your resources in Confluent Cloud.