ActiveMQ Source Connector for Confluent Platform

Note

The JMS Source Connector for Confluent Platform is available for download from Confluent Hub. This connector uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using that connector instead.

The Kafka Connect ActiveMQ Source connector is used to read messages from an ActiveMQ cluster and write them to an Apache Kafka® topic.

Features

The ActiveMQ Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Multiple tasks

The ActiveMQ Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Install the ActiveMQ Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An install of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-activemq:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-activemq:12.2.0
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see ActiveMQ Source Connector Configuration Properties. To understand how the connector internally configures the acknowledgement mode, see the following section.

Acknowledgement mode

The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message will acknowledge every message received (see section 6.2.10 in the JMS 2.0 Specification). To prevent messages from being prematurely acknowledged, the connector processes only one message at time. In other words, the connector will not try to receive new messages until the last message is committed to a Kafka topic. This might compromise the throughput of the connector, but messages will be transferred to Kafka successfully.

Client Libraries

The Kafka Connect ActiveMQ connector includes all the libraries required to work with ActiveMQ, so there is nothing else to install.

JMS Message types

The connector currently supports only TextMessage and BytesMessage but does not currently support ObjectMessage or StreamMessage .

Connecting to ActiveMQ

This connector connects directly to ActiveMQ using a connection URL for your messaging system, using the ActiveMQ client libraries included with the connector.

The following example shows a typical configuration of the connector for use with distributed mode:

{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
    "kafka.topic":"MyKafkaTopicName",
    "activemq.url":"tcp://localhost:61616",
    "jms.destination.name":"testing",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"localhost:9092"
  }
}

The connector supports other configuration options not included in the previous example.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Topics

This connector consumes messages from ActiveMQ using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.

Schemas

The ActiveMQ Connector produces messages with keys and values that adhere to the schemas described in the following sections.

io.confluent.connect.jms.Key

This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur. The schema defines the following fields:

Name Schema Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().

io.confluent.connect.jms.Value

This schema is used to store the value of the JMS message. The schema defines the following fields:

Name Schemna Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().
messageType STRING yes   This field stores the type of message that was received. This corresponds to the sub-interfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface.
timestamp INT64 yes   Data from the getJMSTimestamp() method.
deliveryMode INT32 yes   This field stores the value of Message.getJMSDeliveryMode(). method.
correlationID STRING no   This field stores the value of Message.getJMSCorrelationID(). method.
replyTo Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
destination Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
redelivered BOOLEAN yes   This field stores the value of Message.getJMSRedelivered().
type STRING no   This field stores the value of Message.getJMSType().
expiration INT64 no   This field stores the value of Message.getJMSExpiration().
priority INT32 no   This field stores the value of Message.getJMSPriority().
properties Map of STRING, PropertyValue yes   This field stores the data from all of the properties for the Message indexed by their propertyName.
bytes BYTES no   This field stores the value from BytesMessage.html.readBytes(byte[]).
map Map of STRING, PropertyValue no   This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key.
text STRING no   This field stores the value from TextMessage.html.getText().

io.confluent.connect.jms.Destination

This schema is used to represent a JMS Destination, and is either queue or topic.

The schema defines the following fields:

Name Schema Required Default Value Documentation
destinationType STRING yes   The type of JMS Destination, and either queue or topic.
name STRING yes   The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName().

io.confluent.connect.jms.PropertyValue

This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved field propertyType stores the value type for the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty(). The schema defines the following fields:

Name Schema Required Default Value Documentation
propertyType STRING yes   The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string.
boolean BOOLEAN no   The value stored as a boolean. Null unless propertyType is set to boolean.
byte INT8 no   The value stored as a byte. Null unless propertyType is set to byte.
short INT16 no   The value stored as a short. Null unless propertyType is set to short.
integer INT32 no   The value stored as a integer. Null unless propertyType is set to integer.
long INT64 no   The value stored as a long. Null unless propertyType is set to long.
float FLOAT32 no   The value stored as a float. Null unless propertyType is set to float.
double FLOAT64 no   The value stored as a double. Null unless propertyType is set to double.
string STRING no   The value stored as a string. Null unless propertyType is set to string.