.. _activemq-source-connector: |kconnect-long| ActiveMQ Source Connector ========================================= The ActiveMQ Source Connector is used to read messages from an `ActiveMQ <http://activemq.apache.org/>`_ cluster and write them to a |ak-tm| topic. .. note:: Confluent Platform also includes a general :ref:`JMS source connector <jms-source-connector>` that uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using that connector instead. Install the ActiveMQ Connector ------------------------------ .. include:: ../includes/connector-native-install-cpe.rst .. include:: ../includes/connector-install-hub.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-activemq:latest .. include:: ../includes/connector-install-version.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-activemq:|release| -------------------------- Install Connector Manually -------------------------- `Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-activemq/#download>`_ for your connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`. .. _activeq-source-connector-license-key: License ------- .. include:: ../includes/enterprise-license.rst See :ref:`activemq-source-connector-license-config` for license properties and :ref:`activemq_source_license-topic-configuration` for information about the license topic. .. _activeq-source-connector-client-libs: Client Libraries ---------------- The |kconnect-long| ActiveMQ connector includes all the libraries required to work with ActiveMQ, so there is nothing else to install. .. _activemq-source-connector-messagetypes: JMS Message types ----------------- The connector currently supports only `TextMessage <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html>`_ and `BytesMessage <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html>`_ but does not currently support `ObjectMessage <http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html>`_ or `StreamMessage <http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html>`_ . .. _activemq-source-connection: Connecting to ActiveMQ ---------------------- This connector connects directly to ActiveMQ using a `connection URL <http://activemq.apache.org/connection-configuration-uri.html>`_ for your messaging system, using the ActiveMQ client libraries included with the connector. The following example shows a typical configuration of the connector for use with :ref:`distributed mode <distributed-workers>`: .. codewithvars:: bash { "name": "connector1", "config": { "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector", "kafka.topic":"MyKafkaTopicName", "activemq.url":"tcp://localhost:61616", "jms.destination.name":"testing", "jms.destination.type":"queue", "confluent.license":"", "confluent.topic.bootstrap.servers":"localhost:9092" } } The connector supports other :ref:`configuration options <activemq-source-connector-config>` not included in the example above. .. _activemq-source-connector-topics: Topics ------ This connector consumes messages from ActiveMQ using the configured :ref:`message selectors <activemq-source-connector-config>` and writes them to a single |ak| topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria. .. _activemq-source-connector-schemas: Schemas ------- The ActiveMQ connector produces messages with keys and values that adhere to the schemas described in the following sections. .. _activemq-source-connector-schema-key: ---------------------------- io.confluent.connect.jms.Key ---------------------------- This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur. The schema defines the following fields: +-----------+--------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+ | Name | Schema | Required | Default Value | Documentation | +===========+========+==========+===============+=============================================================================================================================================+ | messageID | STRING | yes | | This field stores the value of `Message.getJMSMessageID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSMessageID()>`_. | +-----------+--------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+ .. _activemq-source-connector-schema-value: ------------------------------ io.confluent.connect.jms.Value ------------------------------ This schema is used to store the value of the JMS message. The schema defines the following fields: +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Name | Schema | Required | Default Value | Documentation | +===============+======================================================================================+==========+===============+=================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================+ | messageID | STRING | yes | | This field stores the value of `Message.getJMSMessageID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSMessageID()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the subinterfaces of `Message <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html>`_. `BytesMessage <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html>`_ = `bytes`, `MapMessage <http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html>`_ = `map`, `ObjectMessage <http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html>`_ = `object`, `StreamMessage <http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html>`_ = `stream` and `TextMessage <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html>`_ = `text`. The corresponding field will be populated with the values from the respective Message subinterface. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | timestamp | INT64 | yes | | Data from the `getJMSTimestamp() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp()>`_ method. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | deliveryMode | INT32 | yes | | This field stores the value of `Message.getJMSDeliveryMode() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | correlationID | STRING | no | | This field stores the value of `Message.getJMSCorrelationID() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | replyTo | :ref:`Destination <activemq-source-connector-schema-destination>` | no | | This schema is used to represent a JMS Destination, and is either `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | destination | :ref:`Destination <activemq-source-connector-schema-destination>` | no | | This schema is used to represent a JMS Destination, and is either `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | redelivered | BOOLEAN | yes | | This field stores the value of `Message.getJMSRedelivered() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | type | STRING | no | | This field stores the value of `Message.getJMSType() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | expiration | INT64 | yes | | This field stores the value of `Message.getJMSExpiration() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | priority | INT32 | yes | | This field stores the value of `Message.getJMSPriority() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSPriority()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | properties | Map of STRING, :ref:`PropertyValue <activemq-source-connector-schema-propertyvalue>` | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | bytes | BYTES | no | | This field stores the value from `BytesMessage.html.readBytes(byte[]) <http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html#readBytes(byte[])>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | map | Map of STRING, :ref:`PropertyValue <activemq-source-connector-schema-propertyvalue>` | no | | This field stores the data from all of the map entries returned from `MapMessage.getMapNames() <http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html#getMapNames()>`_ for the Message indexed by their key. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | text | STRING | no | | This field stores the value from `TextMessage.html.getText() <http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html#getText()>`_. | +---------------+--------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ .. _activemq-source-connector-schema-destination: ------------------------------------ io.confluent.connect.jms.Destination ------------------------------------ This schema is used to represent a JMS Destination, and is either `queue <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_. The schema defines the following fields: +-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Name | Schema | Required | Default Value | Documentation | +=================+========+==========+===============+================================================================================================================================================================================================================================================================+ | destinationType | STRING | yes | | The type of JMS Destination, and either ``queue`` or ``topic``. | +-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | name | STRING | yes | | The name of the destination. This will be the value of `Queue.getQueueName() <http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName()>`_ or `Topic.getTopicName() <http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName()>`_. | +-----------------+--------+----------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ .. _activemq-source-connector-schema-propertyvalue: -------------------------------------- io.confluent.connect.jms.PropertyValue -------------------------------------- This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved field ``propertyType`` stores the value type for the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by `Message.getObjectProperty() <http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getObjectProperty(java.lang.String)>`_. The schema defines the following fields: +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | Name | Schema | Required | Default Value | Documentation | +==============+=========+==========+===============+=====================================================================================================================================================+ | propertyType | STRING | yes | | The java type of the property on the Message. One of ``boolean``, ``byte``, ``short``, ``integer``, ``long``, ``float``, ``double``, or ``string``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | boolean | BOOLEAN | no | | The value stored as a boolean. Null unless ``propertyType`` is set to ``boolean``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | byte | INT8 | no | | The value stored as a byte. Null unless ``propertyType`` is set to ``byte``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | short | INT16 | no | | The value stored as a short. Null unless ``propertyType`` is set to ``short``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | integer | INT32 | no | | The value stored as a integer. Null unless ``propertyType`` is set to ``integer``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | long | INT64 | no | | The value stored as a long. Null unless ``propertyType`` is set to ``long``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | float | FLOAT32 | no | | The value stored as a float. Null unless ``propertyType`` is set to ``float``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | double | FLOAT64 | no | | The value stored as a double. Null unless ``propertyType`` is set to ``double``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ | string | STRING | no | | The value stored as a string. Null unless ``propertyType`` is set to ``string``. | +--------------+---------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ Additional Documentation ------------------------ .. toctree:: :maxdepth: 1 source_connector_config examples changelog