Important

You are viewing documentation for an older version of Confluent Platform.

JMS Source Connector for Confluent Platform

The Kafka Connect JMS Source Connector moves messages from any JMS-compliant broker into Apache Kafka®. It supports any traditional JMS broker, such as IBM MQ, ActiveMQ, TIBCO EMS, and Solace Appliance. This connector uses JNDI to connect to the JMS broker, consumes messages from the specified topic or queue, and writes them to the specified Kafka topic.

Note

  • IBM MQ, ActiveMQ, and TIBCO Source Connectors are also available.
  • The IBM MQ and ActiveMQ connectors are included in Confluent Platform.
  • These are specializations of the JMS Source Connector that avoid JNDI and instead use system-specific APIs to establish connections. They are often easier to configure and use.

Install the JMS Source Connector

Important

This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, there are no additional steps required to install.

If you are running Confluent Platform with only Confluent Community components, you can install the connector using the Confluent Hub client (recommended) or manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-jms:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-jms:5.5.15

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see JMS Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Client Libraries

The Kafka Connect JMS connector works with any JMS-compliant system, but it does not come with client libraries. Instead, you must download the JMS client library JARs for your system and add them into the share/java/kafka-connect-jms directory in each of the Confluent Platform installations. If you plan to use multiple JMS source connectors to different types of JMS systems, all of the client libraries for those systems should be installed into the same location. However, make sure the libraries don’t clash with each other.
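
One way to check for clashes before starting Connect is to look for class files that appear in more than one JAR. The following is a minimal sketch, not part of the connector; the function name find_class_clashes is hypothetical, and it assumes that a clash means the same .class entry shipped by two different JARs in the directory.

```python
import zipfile
from collections import defaultdict
from pathlib import Path


def find_class_clashes(jar_dir):
    """Return {class entry: [JAR names]} for classes found in more than one JAR."""
    owners = defaultdict(list)
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as archive:
            for entry in archive.namelist():
                if entry.endswith(".class"):
                    owners[entry].append(jar.name)
    # Keep only the entries that more than one JAR provides.
    return {entry: jars for entry, jars in owners.items() if len(jars) > 1}
```

Calling find_class_clashes("share/java/kafka-connect-jms") returns an empty dictionary when no class is provided twice. A non-empty result is only a hint that two client libraries may conflict, not proof that they do.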

Note

As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors also require that you export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended in general, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the javax package). If you see an error that resembles the following example, you must export CLASSPATH=<path-to-jar-files> when starting the connector, in addition to setting the plugin path.

Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]

JMS Message types

The connector currently supports the following message types:

Note

Currently, the JMS source connector supports only primitive types for values in a MapMessage.

JNDI Connection Factory

This connector uses JNDI to create an instance of the JMS ConnectionFactory for your messaging system. For this reason, you must ensure that the relevant client JARs for your messaging system are on the classpath alongside this connector.

Using with TIBCO EMS

Note

A TIBCO Source Connector is available for installation from Confluent Hub.

This is a specialization of this connector that avoids JNDI and instead uses system-specific APIs to establish connections. It is often easier to configure and use.

This connector can be used with TIBCO EMS and its support for JMS. First, install the latest TIBCO EMS JMS client libraries into the same directory where this connector is installed. Check the TIBCO EMS installation or documentation for more details.

Then, create a connector configuration for your environment, using the appropriate configuration properties. The following example shows a typical configuration of the connector for use with distributed mode.

{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
    "java.naming.provider.url": "tibjmsnaming://<host>:<port>",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.ssl.truststore.location": "omitted",
    "confluent.topic.ssl.truststore.password": "<password>",
    "confluent.topic.ssl.keystore.location": "omitted",
    "confluent.topic.ssl.keystore.password": "<password>",
    "confluent.topic.ssl.key.password": "<password>",
    "confluent.topic.security.protocol": "SSL"
  }
}

Note

Any extra properties defined on the connector are passed into the JNDI InitialContext. This makes it easy to use any TIBCO EMS-specific settings.

Finally, deploy your connector by posting it to a Kafka Connect distributed worker.
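
Posting the configuration means sending the JSON document to the worker's REST API with POST /connectors. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, and the worker address http://localhost:8083 is an assumption; replace it with the REST listener of your own Connect worker.

```python
import json
import urllib.request


def build_create_request(connector, worker_url="http://localhost:8083"):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{worker_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def deploy(connector, worker_url="http://localhost:8083"):
    """Send the request and return the worker's JSON response as a dict."""
    request = build_create_request(connector, worker_url)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Here connector is the parsed JSON document, a dictionary with the keys "name" and "config". Keeping the request construction separate from the network call makes it possible to inspect the request before sending it.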

Using with IBM MQ via LDAP

Confluent Platform includes a dedicated connector for IBM MQ that is easier to configure. Where possible, use that connector instead of this general JMS connector. However, you may want to use this more general connector if you are required to connect to IBM MQ via LDAP or another JNDI mechanism.

First, install the latest IBM MQ JMS client libraries into the same directory where this connector is installed. Check your IBM installation or documentation for more details.

Then, create a connector configuration for your environment, using the appropriate configuration properties. The following example shows a typical but incomplete configuration of the connector for use with distributed mode.

{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.sun.jndi.ldap.LdapCtxFactory",
    "java.naming.provider.url": "ldap://<ldap_url>",
    "java.naming.security.principal": "MyUserName",
    "java.naming.security.credentials": "MyPassword",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}

Note

Any extra properties defined on the connector are passed into the JNDI InitialContext. This makes it easy to pass any IBM MQ-specific settings used for connecting to the IBM MQ broker.

Finally, deploy your connector by posting it to a Kafka Connect distributed worker.

Topics

This connector consumes messages from the JMS broker using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a Single Message Transform (SMT) that routes the messages based on your criteria.

Acknowledgement Mode

The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received so far (see section 6.2.10 in the JMS Spec). To prevent messages from being acknowledged prematurely, the connector processes only one message at a time. In other words, the connector does not attempt to receive a new message until the previous message is committed to a Kafka topic. This may reduce the throughput of the connector, but it ensures that messages are transferred to Kafka successfully.
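
The effect of this mode can be illustrated with a toy simulation. The sketch below is not the connector's code and does not use a real JMS API; FakeSession and transfer_one_at_a_time are hypothetical names, and the session models only the rule that one acknowledgement covers every message delivered so far.

```python
class FakeSession:
    """Toy stand-in for a JMS session in CLIENT_ACKNOWLEDGE mode (not a real JMS API)."""

    def __init__(self, messages):
        self.pending = list(messages)  # not yet delivered to the client
        self.delivered = []            # delivered but not yet acknowledged
        self.acknowledged = []

    def receive(self):
        if not self.pending:
            return None
        message = self.pending.pop(0)
        self.delivered.append(message)
        return message

    def acknowledge(self):
        # One acknowledgement covers everything delivered so far.
        self.acknowledged.extend(self.delivered)
        self.delivered.clear()


def transfer_one_at_a_time(session, write_to_kafka):
    """Receive, write, then acknowledge, so at most one message is ever unacknowledged."""
    while (message := session.receive()) is not None:
        write_to_kafka(message)  # must succeed before the acknowledgement
        session.acknowledge()
```

If write_to_kafka raises for a message, that message stays in delivered and is never acknowledged, while every earlier message has already been written and acknowledged. Receiving several messages before acknowledging would instead risk acknowledging messages that were never written.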

Schemas

The connector produces Kafka messages with keys and values that adhere to the schemas described in the following sections.

io.confluent.connect.jms.Key

This schema stores the message ID of the incoming message, as returned by the Message interface. Using it as the key ensures that if the same message ID arrives again, which is unlikely, the record ends up in the same Kafka partition.

The schema defines the following fields:

| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
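
The partitioning claim follows from how keyed records are assigned: the partition is derived from a hash of the key, so equal keys map to the same partition. The sketch below illustrates the idea only. The function choose_partition is hypothetical, it treats the key as the bare messageID string rather than the serialized key struct, and it uses CRC32 as a stand-in hash, which is not the hash Kafka's default partitioner uses.

```python
import zlib


def choose_partition(message_id, num_partitions):
    """Map a key to a partition; equal keys always map to the same partition."""
    # Stand-in hash for illustration only; Kafka's partitioner uses a different one.
    return zlib.crc32(message_id.encode("utf-8")) % num_partitions
```

Because the function is deterministic, two records with the same message ID are assigned the same partition as long as the number of partitions does not change.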

io.confluent.connect.jms.Value

This schema stores the value of the JMS message.

The schema defines the following fields:

| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
| messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the subinterfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective Message subinterface. |
| timestamp | INT64 | yes | | This field stores the value of Message.getJMSTimestamp(). |
| deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode(). |
| correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID(). |
| replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
| destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
| redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered(). |
| type | STRING | no | | This field stores the value of Message.getJMSType(). |
| expiration | INT64 | yes | | This field stores the value of Message.getJMSExpiration(). |
| priority | INT32 | yes | | This field stores the value of Message.getJMSPriority(). |
| properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. |
| bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]). |
| map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
| text | STRING | no | | This field stores the value from TextMessage.getText(). |
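
A consumer of these records can use messageType to decide which body field to read. The following is a minimal sketch, assuming the record value has already been deserialized into a Python dictionary keyed by the field names above (for example, by a JSON deserializer); extract_payload is a hypothetical helper, not part of any Confluent library.

```python
BODY_FIELDS = ("bytes", "map", "text")  # body fields listed in the Value schema


def extract_payload(value):
    """Return the body field named by messageType, or None if it is not populated."""
    message_type = value["messageType"]
    if message_type not in BODY_FIELDS:
        # The schema lists no body field for this message type.
        raise ValueError(f"no body field listed for messageType {message_type!r}")
    return value.get(message_type)
```

For a record whose messageType is text, the function returns the content of the text field. The other body fields are optional and are expected to be empty for that record.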

io.confluent.connect.jms.Destination

This schema represents a JMS Destination, and is either queue or topic.

The schema defines the following fields:

| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| destinationType | STRING | yes | | The type of JMS Destination: either queue or topic. |
| name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |

io.confluent.connect.jms.PropertyValue

This schema stores the data found in the properties of the message. To ensure that type mappings are preserved, propertyType stores the type of the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().

The schema defines the following fields:

| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
| boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
| byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
| short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
| integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
| long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
| float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
| double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
| string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
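
Because propertyType names the field that holds the data, reading a property is a two-step lookup. The sketch below assumes each PropertyValue has been deserialized into a Python dictionary keyed by the field names above; unwrap_property and unwrap_properties are hypothetical helpers.

```python
PROPERTY_TYPES = ("boolean", "byte", "short", "integer", "long", "float", "double", "string")


def unwrap_property(property_value):
    """Return the value stored in the field that propertyType points to."""
    property_type = property_value["propertyType"]
    if property_type not in PROPERTY_TYPES:
        raise ValueError(f"unexpected propertyType {property_type!r}")
    return property_value[property_type]


def unwrap_properties(properties):
    """Convert a map of name -> PropertyValue into a map of name -> plain value."""
    return {name: unwrap_property(value) for name, value in properties.items()}
```

Applied to the properties field of a Value record, unwrap_properties yields an ordinary dictionary of property names and values, which is usually more convenient for downstream processing.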