.. _activemq-source-connector:

|kconnect-long| ActiveMQ Source Connector
=========================================


The ActiveMQ Source Connector reads messages from an
`ActiveMQ <http://activemq.apache.org/>`_
cluster and writes them to a |ak-tm| topic.

.. note::
   Confluent Platform also includes a general
   :ref:`JMS source connector <jms-source-connector>` that uses a JNDI-based mechanism to connect to the JMS broker.
   If you must use JNDI to connect to your JMS broker, consider using that connector instead.

Install the ActiveMQ Connector
------------------------------

.. include:: ../includes/connector-native-install-cpe.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash


   confluent-hub install confluentinc/kafka-connect-activemq:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash


   confluent-hub install confluentinc/kafka-connect-activemq:|release|


--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-activemq/#download>`_ for your
connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

.. _activeq-source-connector-license-key:


License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`activemq-source-connector-license-config` for license properties and :ref:`activemq_source_license-topic-configuration` for information about the license topic.


.. _activeq-source-connector-client-libs:


Client Libraries
----------------

The |kconnect-long| ActiveMQ connector includes all the libraries required to work with ActiveMQ, so there is nothing else to install.

.. _activemq-source-connector-messagetypes:


JMS Message Types
-----------------

The connector currently supports only `TextMessage <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html>`_
and `BytesMessage <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html>`_.
It does not currently support `ObjectMessage <http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html>`_
or `StreamMessage <http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html>`_.


.. _activemq-source-connection:

Connecting to ActiveMQ
----------------------

This connector connects directly to ActiveMQ using the `connection URL <http://activemq.apache.org/connection-configuration-uri.html>`_
for your messaging system and the ActiveMQ client libraries included with the connector.

The following example shows a typical configuration of the connector for use with
:ref:`distributed mode <distributed-workers>`:

.. codewithvars:: bash

   {
     "name": "connector1",
     "config": {
       "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
       "kafka.topic":"MyKafkaTopicName",
       "activemq.url":"tcp://localhost:61616",
       "jms.destination.name":"testing",
       "jms.destination.type":"queue",
       "confluent.license":"",
       "confluent.topic.bootstrap.servers":"localhost:9092"
     }
   }


The connector supports other :ref:`configuration options <activemq-source-connector-config>` not included in the example above.


.. _activemq-source-connector-topics:


Topics
------

This connector consumes messages from ActiveMQ using the configured :ref:`message selectors <activemq-source-connector-config>` and writes them to a single |ak| topic.
If you want to write messages to multiple topics, use a Single Message Transform (SMT) that routes the messages based on your criteria.
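
As a rough sketch of such routing, the following configuration adds a transform that uses the value of the ``type`` field (the JMS type, as listed in the value schema) as the name of the destination topic. It assumes that the ``ExtractTopic`` transform from Confluent's separately installed connect-transforms plugin is available on the worker. The alias ``routeByType`` is a hypothetical name chosen for illustration. With ``skip.missing.or.null`` set to ``true``, messages that carry no JMS type are expected to stay on the topic named in ``kafka.topic``. Treat this as a starting point and check the transform's own documentation before relying on these settings.

.. codewithvars:: bash

   {
     "name": "connector1",
     "config": {
       "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
       "kafka.topic":"MyKafkaTopicName",
       "activemq.url":"tcp://localhost:61616",
       "jms.destination.name":"testing",
       "jms.destination.type":"queue",
       "confluent.license":"",
       "confluent.topic.bootstrap.servers":"localhost:9092",
       "transforms":"routeByType",
       "transforms.routeByType.type":"io.confluent.connect.transforms.ExtractTopic$Value",
       "transforms.routeByType.field":"type",
       "transforms.routeByType.skip.missing.or.null":"true"
     }
   }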


.. _activemq-source-connector-schemas:


Schemas
-------

The ActiveMQ connector produces messages with keys and values that adhere to the schemas described in the following sections.


.. _activemq-source-connector-schema-key:

----------------------------
io.confluent.connect.jms.Key
----------------------------

This schema is used to store the incoming message ID of the JMS message. Using the message ID as the key ensures that if the same message ID arrives again, the record ends up in the same partition. In practice this should never occur.

The schema defines the following fields:

+-----------+--------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| Name      | Schema | Required | Default Value | Documentation                                                                                                                               |
+===========+========+==========+===============+=============================================================================================================================================+
| messageID | STRING | yes      |               | This field stores the value of `Message.getJMSMessageID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSMessageID()>`_. |
+-----------+--------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+



.. _activemq-source-connector-schema-value:

------------------------------
io.confluent.connect.jms.Value
------------------------------

This schema is used to store the value of the JMS message.

The schema defines the following fields:

+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Name          | Schema                                                                               | Required | Default Value | Documentation                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+===============+======================================================================================+==========+===============+=================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================+
| messageID     | STRING                                                                               | yes      |               | This field stores the value of `Message.getJMSMessageID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSMessageID()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| messageType   | STRING                                                                               | yes      |               | This field stores the type of message that was received. This corresponds to the subinterfaces of `Message <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html>`_. `BytesMessage <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html>`_ = `bytes`, `MapMessage <http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html>`_ = `map`, `ObjectMessage <http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html>`_ = `object`, `StreamMessage <http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html>`_ = `stream` and `TextMessage <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html>`_ = `text`. The corresponding field will be populated with the values from the respective Message subinterface. |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| timestamp     | INT64                                                                                | yes      |               | Data from the `getJMSTimestamp() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp()>`_ method.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| deliveryMode  | INT32                                                                                | yes      |               | This field stores the value of `Message.getJMSDeliveryMode() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| correlationID | STRING                                                                               | no       |               | This field stores the value of `Message.getJMSCorrelationID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| replyTo       | :ref:`Destination <activemq-source-connector-schema-destination>`                    | no       |               | This schema is used to represent a JMS Destination, and is either `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| destination   | :ref:`Destination <activemq-source-connector-schema-destination>`                    | no       |               | This schema is used to represent a JMS Destination, and is either `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| redelivered   | BOOLEAN                                                                              | yes      |               | This field stores the value of `Message.getJMSRedelivered() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type          | STRING                                                                               | no       |               | This field stores the value of `Message.getJMSType() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| expiration    | INT64                                                                                | yes      |               | This field stores the value of `Message.getJMSExpiration() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| priority      | INT32                                                                                | yes      |               | This field stores the value of `Message.getJMSPriority() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSPriority()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| properties    | Map of STRING, :ref:`PropertyValue <activemq-source-connector-schema-propertyvalue>` | yes      |               | This field stores the data from all of the properties for the Message indexed by their propertyName.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| bytes         | BYTES                                                                                | no       |               | This field stores the value from `BytesMessage.html.readBytes(byte[]) <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html#readBytes(byte[])>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| map           | Map of STRING, :ref:`PropertyValue <activemq-source-connector-schema-propertyvalue>` | no       |               | This field stores the data from all of the map entries returned from `MapMessage.getMapNames() <http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html#getMapNames()>`_ for the Message indexed by their key.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| text          | STRING                                                                               | no       |               | This field stores the value from `TextMessage.html.getText() <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html#getText()>`_.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



.. _activemq-source-connector-schema-destination:

------------------------------------
io.confluent.connect.jms.Destination
------------------------------------

This schema represents a JMS Destination, which is either a `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or a `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_.

The schema defines the following fields:

+-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Name            | Schema | Required | Default Value | Documentation                                                                                                                                                                                                                                                  |
+=================+========+==========+===============+================================================================================================================================================================================================================================================================+
| destinationType | STRING | yes      |               | The type of JMS Destination, and either ``queue`` or ``topic``.                                                                                                                                                                                                |
+-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| name            | STRING | yes      |               | The name of the destination. This will be the value of `Queue.getQueueName() <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName()>`_ or `Topic.getTopicName() <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName()>`_. |
+-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
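
As an illustration, a downstream consumer might turn a ``Destination`` value into a single readable label. The following Python sketch assumes the value has already been deserialized into a plain dictionary (for example, from JSON). The function name ``describe_destination`` and the ``type://name`` output format are choices made for this example and are not produced by the connector.

.. code-block:: python

   # The two values that the destinationType field can take, per the table above.
   VALID_DESTINATION_TYPES = ("queue", "topic")


   def describe_destination(destination):
       """Format a Destination dict as '<destinationType>://<name>'.

       Hypothetical helper; the output format is chosen for this example only.
       """
       destination_type = destination["destinationType"]
       if destination_type not in VALID_DESTINATION_TYPES:
           raise ValueError("unexpected destinationType: %r" % (destination_type,))
       return "%s://%s" % (destination_type, destination["name"])


   # Sample value, made up for illustration.
   label = describe_destination({"destinationType": "queue", "name": "orders"})
   print(label)

Both fields are required by the schema, so the sketch indexes them directly instead of supplying defaults.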



.. _activemq-source-connector-schema-propertyvalue:

--------------------------------------
io.confluent.connect.jms.PropertyValue
--------------------------------------

This schema stores the data found in the properties of the message. To preserve the proper type mappings, the ``propertyType`` field stores the value type of the property, and the schema field of the same name contains the property's data. This ensures that the data is retrievable as the type returned by `Message.getObjectProperty() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getObjectProperty(java.lang.String)>`_.

The schema defines the following fields:

+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| Name         | Schema  | Required | Default Value | Documentation                                                                                                                                       |
+==============+=========+==========+===============+=====================================================================================================================================================+
| propertyType | STRING  | yes      |               | The Java type of the property on the Message. One of ``boolean``, ``byte``, ``short``, ``integer``, ``long``, ``float``, ``double``, or ``string``. |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| boolean      | BOOLEAN | no       |               | The value stored as a boolean. Null unless ``propertyType`` is set to ``boolean``.                                                                  |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| byte         | INT8    | no       |               | The value stored as a byte. Null unless ``propertyType`` is set to ``byte``.                                                                        |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| short        | INT16   | no       |               | The value stored as a short. Null unless ``propertyType`` is set to ``short``.                                                                      |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| integer      | INT32   | no       |               | The value stored as an integer. Null unless ``propertyType`` is set to ``integer``.                                                                 |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| long         | INT64   | no       |               | The value stored as a long. Null unless ``propertyType`` is set to ``long``.                                                                        |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| float        | FLOAT32 | no       |               | The value stored as a float. Null unless ``propertyType`` is set to ``float``.                                                                      |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| double       | FLOAT64 | no       |               | The value stored as a double. Null unless ``propertyType`` is set to ``double``.                                                                    |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| string       | STRING  | no       |               | The value stored as a string. Null unless ``propertyType`` is set to ``string``.                                                                    |
+--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
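
To recover a property's value downstream, a consumer reads ``propertyType`` and then the field with that name. The following Python sketch illustrates the lookup, assuming the record has already been deserialized into plain dictionaries (for example, from JSON). The helper name ``unwrap_property_value`` and the sample data are hypothetical and not part of the connector.

.. code-block:: python

   # Field names defined by the io.confluent.connect.jms.PropertyValue schema.
   PROPERTY_TYPES = (
       "boolean", "byte", "short", "integer", "long", "float", "double", "string",
   )


   def unwrap_property_value(property_value):
       """Return the value held by a PropertyValue that was deserialized into a dict.

       Hypothetical helper: it reads the field whose name matches propertyType.
       """
       property_type = property_value["propertyType"]
       if property_type not in PROPERTY_TYPES:
           raise ValueError("unexpected propertyType: %r" % (property_type,))
       # Fields other than the one named by propertyType are null, so they are ignored.
       return property_value.get(property_type)


   # Sample data (made up), shaped like a map of STRING to PropertyValue.
   properties = {
       "retries": {"propertyType": "integer", "integer": 3, "string": None},
       "origin": {"propertyType": "string", "string": "warehouse-7"},
   }

   unwrapped = {name: unwrap_property_value(value) for name, value in properties.items()}
   print(unwrapped)

The same lookup applies to each entry of a map of STRING to ``PropertyValue``, as the dictionary comprehension in the sketch shows.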

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   source_connector_config
   examples
   changelog