.. _rabbit-m-q-source-connector:

|kconnect-long| RabbitMQ Source Connector
=========================================

The RabbitMQ connector integrates with RabbitMQ servers using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in a Kafka topic.

.. note:: Because this connector is based on the AMQP protocol, it may also work with other servers that implement that protocol.

Install RabbitMQ Connector
--------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-rabbitmq:1.1.1

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `_ for your connector and then follow the manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`rabbitmq-source-connector-license-config` for license properties and :ref:`rabbit-m-q-source-license-topic-configuration` for information about the license topic.

Usage Notes
-----------

.. warning:: The queue or topic that this connector reads from must exist before the connector is started.

.. important:: All of the headers associated with each message from RabbitMQ are prefixed with ``rabbitmq.`` and copied over to each of the records produced to Kafka. Because of this, your configured message format must support headers.

Examples
--------

----------------------
Property-based example
----------------------

This configuration is typically used along with :ref:`standalone workers `.
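Before deploying, it can help to sanity-check that no required settings in the property file below are still placeholders. The following is a minimal, illustrative sketch in Python (the ``parse_properties`` and ``missing_required`` helpers and the required-key list are hypothetical, not part of the connector):

```python
# Sketch: sanity-check a connector .properties file before deploying it.
# The helper names and the required-key list are illustrative assumptions.

REQUIRED_KEYS = {"name", "connector.class", "kafka.topic", "rabbitmq.queue"}

def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def missing_required(props):
    """Return required keys that are absent or still placeholders."""
    return sorted(
        k for k in REQUIRED_KEYS
        if k not in props or props[k].startswith("< Required")
    )

example = """
name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
"""

print(missing_required(parse_properties(example)))
# -> ['kafka.topic', 'rabbitmq.queue']
```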
.. codewithvars:: bash
   :emphasize-lines: 4,5

   name=RabbitMQSourceConnector1
   connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
   tasks.max=1
   kafka.topic=< Required Configuration >
   rabbitmq.queue=< Required Configuration >

------------------
REST-based example
------------------

This configuration is typically used along with :ref:`distributed workers `. Write the following JSON to ``connector.json``, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the |kconnect-long| :ref:`REST API ` for more information.

**Connect Distributed REST example:**

.. codewithvars:: json
   :emphasize-lines: 6,7

   {
     "name" : "RabbitMQSourceConnector1",
     "config" : {
       "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
       "tasks.max" : "1",
       "kafka.topic" : "< Required Configuration >",
       "rabbitmq.queue" : "< Required Configuration >"
     }
   }

Use curl to post the configuration to one of the |kconnect-long| workers. Change `http://localhost:8083/` to the endpoint of one of your |kconnect-long| workers.

**Create a new connector:**

.. codewithvars:: bash

   curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

**Update an existing connector:**

.. codewithvars:: bash

   curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config

.. _rabbitmq_quickstart:

Quick Start
-----------

The RabbitMQ Source connector streams records from RabbitMQ queues into |ak| topics with high throughput. This quick start walks through example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.

---------------------
Install the Connector
---------------------

For the following tutorial you need to have |cp| running locally. Navigate to your |cp| installation directory and enter the following command:
.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

Adding a new connector plugin requires restarting |kconnect-long|. Use the Confluent CLI to restart |kconnect|:

.. codewithvars:: bash

   ./|confluent_stop| connect && ./|confluent_start| connect

The restart prints output similar to the following. Wait for |kconnect| to come up before continuing:

.. codewithvars:: bash

   This CLI is intended for development only, not for production
   https://docs.confluent.io/current/cli/index.html

   Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
   Stopping connect
   connect is [DOWN]
   This CLI is intended for development only, not for production
   https://docs.confluent.io/current/cli/index.html

   Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting connect
   connect is [UP]

Verify that the RabbitMQ Source connector plugin has been installed correctly and is recognized by the plugin loader:

.. codewithvars:: bash

   curl -sS localhost:8083/connector-plugins | jq .[].class | grep RabbitMQSourceConnector

This should print the name of at least one class found by the classloader:

.. codewithvars:: bash

   "io.confluent.connect.rabbitmq.RabbitMQSourceConnector"

----------------------------------------
Install and test RabbitMQ broker locally
----------------------------------------

Install and deploy the broker
"""""""""""""""""""""""""""""

From a terminal session, `install the RabbitMQ broker `_. For a macOS environment, you can use `homebrew `_:

.. codewithvars:: bash

   brew update
   brew install rabbitmq

Configure the terminal environment to include the RabbitMQ ``sbin`` folder in the ``PATH``:

.. codewithvars:: bash

   export PATH=$PATH:/usr/local/opt/rabbitmq/sbin

In a new terminal session, start the RabbitMQ broker:

.. codewithvars:: bash

   rabbitmq-server

The server should print the following startup message:
.. codewithvars:: bash

     ##  ##
     ##  ##      RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
     ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
     ######  ##
     ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                       /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

                 Starting broker...
    completed with 6 plugins.

Do a health check using the ``rabbitmqctl status`` command:

.. codewithvars:: bash

   rabbitmqctl status

A healthy server should list a number of ``running_applications``, as shown in the following example output:

.. codewithvars:: bash

   Status of node rabbit@localhost ...
   [{pid,82126},
    {running_applications,
        [{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
         {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
         {cowboy,"Small, fast, modern HTTP server.","2.6.1"},
         {cowlib,"Support library for manipulating Web protocols.","2.7.0"},
         {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
         {rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
         {rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
         {rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
         {rabbit,"RabbitMQ","3.7.15"},
         {amqp_client,"RabbitMQ AMQP Client","3.7.15"},
         {rabbit_common,
             "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
             "3.7.15"},
   ... remainder clipped for brevity

Install Python for data generation
""""""""""""""""""""""""""""""""""

.. codewithvars:: bash

   brew install python3
   pip3 install pika --upgrade

RabbitMQ data generator in Python
"""""""""""""""""""""""""""""""""

Save this program into a file called ``producer.py``:
.. codewithvars:: python

   #!/usr/bin/env python3
   import pika
   import sys
   import json

   if len(sys.argv) != 3:
       print("Usage: " + sys.argv[0] + " <queue> <count>")
       sys.exit(1)

   queue = sys.argv[1]
   count = int(sys.argv[2])
   print("count:\t%d\nqueue:\t%s" % (count, queue))

   msgBody = { "id" : 0, "body" : "010101010101010101010101010101010101010101010101010101010101010101010" }

   connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   channel = connection.channel()
   channel.queue_declare(queue=queue)

   properties = pika.BasicProperties(content_type='application/json',
                                     delivery_mode=1,
                                     priority=1,
                                     content_encoding='utf-8')

   for i in range(count):
       msgBody["id"] = i
       jsonStr = json.dumps(msgBody)
       properties.message_id = str(i)
       channel.basic_publish(exchange='', routing_key=queue, body=jsonStr, properties=properties)
       print("Send\t%r" % msgBody)

   connection.close()
   print('Exiting')

RabbitMQ consumer program in Python
"""""""""""""""""""""""""""""""""""

Save this program into a file called ``consumer.py``:

.. codewithvars:: python

   #!/usr/bin/env python3
   import pika
   import sys
   import json

   if len(sys.argv) != 2:
       print("Usage: " + sys.argv[0] + " <queue>")
       sys.exit(1)

   queue = sys.argv[1]
   print("queue:\t%s" % (queue))

   connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   channel = connection.channel()
   channel.queue_declare(queue=queue)

   def callback(ch, method, properties, body):
       msgBody = json.loads(body)
       print("Receive\t%r" % msgBody)

   channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)

   print('Waiting for messages. To exit press CTRL+C')
   try:
       channel.start_consuming()
   except KeyboardInterrupt:
       print('Exiting')

Generate and consume data in RabbitMQ
"""""""""""""""""""""""""""""""""""""

Run the following command to produce five records to a queue called ``myqueue``:

.. codewithvars:: bash

   ./producer.py myqueue 5

The script should show five generated records in the console output:
.. codewithvars:: bash

   count:  5
   queue:  myqueue
   Send    {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Send    {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Send    {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Send    {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Send    {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Exiting

Run the following command to consume all records in the RabbitMQ queue ``myqueue``:

.. codewithvars:: bash

   ./consumer.py myqueue

The script should show five consumed records in the console output:

.. codewithvars:: bash

   queue:  myqueue
   Waiting for messages. To exit press CTRL+C
   Receive {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Receive {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Receive {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Receive {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   Receive {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
   ^CExiting

----------------------------------------
Start the |ak| RabbitMQ source connector
----------------------------------------

Create the file ``register-rabbitmq-connect.json`` to store the following connector configuration:

.. codewithvars:: json
   :name: register-rabbitmq-connect.json

   {
     "name" : "RabbitMQSourceConnector1",
     "config" : {
       "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
       "tasks.max" : "1",
       "kafka.topic" : "rabbitmq",
       "rabbitmq.queue" : "myqueue",
       "rabbitmq.host" : "localhost",
       "rabbitmq.username" : "guest",
       "rabbitmq.password" : "guest"
     }
   }

.. note::
   The ``username`` and ``password`` values are defaults in RabbitMQ.
   These default values are only permitted for authentication when attempted on the localhost where RabbitMQ is installed and running. See this `article `_ for instructions on allowing remote access to RabbitMQ's default account.

Start the connector:

.. codewithvars:: bash

   curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json

Verify that the result includes ``201 Created``, indicating that the RabbitMQ connector was loaded.

---------------------------
Perform the end-to-end test
---------------------------

In a new terminal session, navigate to the |ak| bin folder and start a consumer:

.. codewithvars:: bash

   ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq

Next, produce some records to the RabbitMQ queue:

.. codewithvars:: bash

   ./producer.py myqueue 5

You should see the following output from the ``kafka-avro-console-consumer`` command, indicating that the records were successfully produced to |ak|:

.. codewithvars:: bash

   "´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
   "´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
   "´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
   "´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
   "´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
   ^CProcessed a total of 5 messages

.. note:: If RabbitMQ is restarted, data in memory disappears because the message queue used in this test is not durable. Use the command ``rabbitmqctl list_queues`` to view active queues and message counts.

------------------
Clean up resources
------------------

Delete the connector and stop Confluent services.
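The connector deletion can also be scripted. The following sketch is a standard-library Python equivalent of the ``curl`` delete command shown below; the ``delete_connector`` helper is hypothetical, and the worker URL and connector name are the ones used in this tutorial:

```python
# Sketch: delete a connector through the Connect REST API using only the
# Python standard library. Helper name, worker URL, and connector name are
# illustrative assumptions based on this tutorial.
import urllib.request
import urllib.error

def delete_connector(name, worker="http://localhost:8083"):
    """Send DELETE /connectors/<name>; return the HTTP status, or None on failure."""
    url = "%s/connectors/%s" % (worker, name)
    req = urllib.request.Request(url, method="DELETE")
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status          # 204 No Content on success
    except urllib.error.URLError:
        return None                     # worker not reachable or request rejected

if __name__ == "__main__":
    print(delete_connector("RabbitMQSourceConnector1"))
```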
.. codewithvars:: bash

   curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
   |confluent_stop|

Stop your RabbitMQ server using CTRL+C.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   rabbit_m_q_source_connector_config