JMS Source Connector for Confluent Platform¶
The Kafka Connect JMS Source connector is used to move messages from any JMS-compliant broker into Apache Kafka®. It supports any traditional JMS Broker, such as IBM MQ, ActiveMQ, TIBCO EMS, and Solace Appliance.
This connector uses JNDI to connect to the JMS broker, consume messages from the specified topic or queue, and write them into the specified Kafka topic.
Note
- The IBM MQ Source, ActiveMQ Source, and TIBCO Source connectors are also available.
- These connectors are specializations of the JMS Source connector that avoid JNDI and instead use system-specific APIs to establish connections, which often makes them easier to configure and use.
Features¶
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Multiple tasks¶
The JMS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when messages can be consumed in parallel, as in the configuration fragment shown below.
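For example, a connector configuration fragment that runs three tasks might look like the following (the connector name and task count shown are illustrative):

{
  "name": "jms-source",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "tasks.max": "3"
  }
}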
Support for JMS 2.0¶
Starting with version 11.0.0, the JMS connector adds support for clients and servers compatible with JMS 2.0. This upgrade broadens the set of JMS systems the connector can connect to and allows the connector to add support for features enabled by JMS 2.0, such as shared subscriptions.
Starting with version 11.0.0, the connector also drops support for JMS servers and clients compatible with JMS 1.1 and earlier. JMS 1.1 is still supported in versions prior to 11.0.0.
Install the JMS Source Connector¶
You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the JMS client library JAR files. For help with downloading the JAR files, see the Client libraries section.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-jms:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-jms:12.2.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license and for information about the license topic, see License topic configuration.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for JMS Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Client libraries¶
The Kafka Connect JMS connector works with any JMS-compliant system, but it does not come with client libraries. Instead, you must download the JMS client library JARs for your system and add them to the share/java/kafka-connect-jms directory in each of your Confluent Platform installations. If you plan to use several JMS Source connectors for different types of JMS systems, you must install all the client libraries for those systems into the same location and ensure the libraries don't clash with each other.
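For example, copying the JARs into place might look like the following (the source path and CONFLUENT_HOME variable are illustrative; adjust them for your environment):

# Copy your JMS system's client JARs next to the connector
cp /path/to/jms-client-libs/*.jar $CONFLUENT_HOME/share/java/kafka-connect-jms/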
Note
For clients and servers running QPID JMS, the JMS connector supports sourcing messages to Kafka only from JMS Queues. Sourcing messages from JMS Topics might work for specific use cases but is currently unsupported. Additionally, for QPID JMS, this connector supports only the set of types described in the following section. It doesn’t support the conversion of other AMQP data types supported by QPID.
Note
As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the javax package). An example error message showing this issue is provided below. If you see an error that resembles the example, in addition to adding the plugin path, you must also export CLASSPATH=<path-to-jar-files> when starting the connector.
Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]
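In that case, the workaround might look like the following sketch (the JAR directory and properties file paths are placeholders for your environment):

# Export the client JARs on the CLASSPATH before starting the worker
export CLASSPATH=/usr/share/java/kafka-connect-jms/*
bin/connect-distributed etc/kafka/connect-distributed.properties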
JMS Message types¶
The connector currently supports the following message types:
- BytesMessage
- MapMessage
- StreamMessage
- TextMessage
Note
The JMS Source connector supports only primitive types for values in a MapMessage and doesn't support the ObjectMessage message type.
Support for AMQP types in JMS propertyValues¶
With Qpid Proton-J, the connector can support AMQP types when they appear in the JMS propertyValue of a JMS message. The AMQP types are mapped to Java types as follows:
AMQP Type | Java Type |
---|---|
UnsignedByte | byte |
UnsignedShort | short |
UnsignedInteger | int |
UnsignedLong | long |
Decimal32 | float |
Decimal64 | double |
Decimal128 | double |
Binary | byte[] |
Symbol | String |
If either an unsigned AMQP type or a Decimal128 value exceeds the limits of the corresponding signed Java type, overflow will occur. Because of Qpid Proton-J limitations, Decimal32 is mapped to float, and Decimal64 and Decimal128 are mapped to double; as a result of this mapping, AMQP Decimal propertyValues may lose precision.
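As a hypothetical illustration of the overflow behavior, an AMQP UnsignedByte of 200 does not fit in Java's signed byte range (-128 to 127):

public class OverflowExample {
    public static void main(String[] args) {
        int unsignedByteValue = 200;            // AMQP UnsignedByte value
        byte mapped = (byte) unsignedByteValue; // narrows to the signed Java byte -56
        System.out.println(mapped);             // prints -56
    }
}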
JNDI ConnectionFactory¶
This connector uses JNDI to create an instance of the JMS ConnectionFactory for your messaging system. Because of this, you must ensure that the relevant client JARs for your messaging system are on the classpath alongside this connector.
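Conceptually, the lookup the connector performs resembles the following minimal sketch. It assumes a TIBCO EMS context factory, a placeholder provider URL, and a JNDI entry named "ConnectionFactory"; all three vary by broker and environment.

import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;

public class JndiLookupSketch {
    public static void main(String[] args) throws Exception {
        // The same java.naming.* values you set in the connector configuration
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "com.tibco.tibjms.naming.TibjmsInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tibjmsnaming://<host>:<port>");

        // Look up the ConnectionFactory registered in JNDI and connect
        Context ctx = new InitialContext(env);
        ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
        Connection connection = factory.createConnection();
        connection.close();
        ctx.close();
    }
}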
Use the JMS Source connector with TIBCO EMS¶
You can use the JMS Source connector with TIBCO EMS and its support for JMS. Note that a dedicated TIBCO Source connector is also available; it is a specialization of this connector that avoids JNDI and instead uses system-specific APIs, which often makes it easier to configure and use. To get started with the JMS Source connector, you must install the latest TIBCO EMS JMS client libraries into the same directory where this connector is installed. For more details, see the TIBCO EMS product documentation.
Next, you must create a connector configuration for your environment, using the appropriate configuration properties. The following example shows a typical configuration of the connector for use with distributed mode.
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
    "java.naming.provider.url": "tibjmsnaming://<host>:<port>",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.ssl.truststore.location": "omitted",
    "confluent.topic.ssl.truststore.password": "<password>",
    "confluent.topic.ssl.keystore.location": "omitted",
    "confluent.topic.ssl.keystore.password": "<password>",
    "confluent.topic.ssl.key.password": "<password>",
    "confluent.topic.security.protocol": "SSL"
  }
}
Note that any extra properties defined on the connector will be passed into the JNDI InitialContext. This makes it easy to pass any TIBCO EMS-specific settings.
Finally, deploy your connector by posting it to a Kafka Connect distributed worker.
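For example, assuming the configuration above is saved as connector1.json and a distributed worker is listening on the default REST port (adjust the host and port for your deployment):

curl -X POST -H "Content-Type: application/json" \
     --data @connector1.json \
     http://localhost:8083/connectors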
Connect to IBM MQ using LDAP¶
The IBM MQ Source connector is available for download from Confluent Hub. If possible, you should use the IBM MQ Source connector instead of the general JMS connector. However, you may want to use the more general connector if you are required to connect to IBM MQ using LDAP, or any other JNDI mechanism.
To get started, you must install the latest IBM MQ JMS client libraries into the same directory where this connector is installed. For more details, see the IBM MQ installation documentation.
Next, create a connector configuration for your environment using the appropriate configuration properties. The following example shows a typical but incomplete configuration of the connector for use with distributed mode.
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.sun.jndi.ldap.LdapCtxFactory",
    "java.naming.provider.url": "ldap://<ldap_url>",
    "java.naming.security.principal": "MyUserName",
    "java.naming.security.credentials": "MyPassword",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}
Note that any extra properties defined on the connector will be passed into the JNDI InitialContext. This makes it easy to pass any IBM MQ-specific settings used for connecting to the IBM MQ broker.
Finally, deploy your connector by posting it to a Kafka Connect distributed worker.
Topics¶
This connector consumes messages from the JMS broker using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a single message transform (SMT) that routes the messages based on your criteria, as sketched below.
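As one sketch of this approach, the built-in RegexRouter transform can rewrite the destination topic by pattern. The topic names below are illustrative, and routing on message contents (rather than the topic name) would require a custom transform:

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "MyKafkaTopicName",
"transforms.route.replacement": "MyOtherTopicName"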
Acknowledgement mode¶
The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received (see section 6.2.10 in the JMS Spec). To prevent messages from being prematurely acknowledged, the connector processes only one message at a time. In other words, the connector will not attempt to receive new messages until the last message is committed to a Kafka topic. This might limit the throughput of the connector, but it ensures that messages are transferred to Kafka successfully.
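The JMS API behavior the connector relies on looks roughly like the following sketch. It is a minimal illustration only: the connection and queue are assumed to exist, and the Kafka produce step is represented by a placeholder method.

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

public class ClientAckSketch {
    // Receives one message and acknowledges it only after the Kafka write
    // is known to have succeeded (represented here by a placeholder call).
    static void consumeOne(Connection connection, Queue queue) throws Exception {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        Message message = consumer.receive();
        writeToKafkaAndWaitForCommit(message); // placeholder for the Kafka produce step
        // In CLIENT_ACKNOWLEDGE mode this acknowledges every message received on
        // the session so far, which is why only one message is in flight at a time.
        message.acknowledge();
        session.close();
    }

    static void writeToKafkaAndWaitForCommit(Message message) {
        // Placeholder: the real connector hands the record to the Connect framework.
    }
}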
Schemas¶
The connector produces Kafka messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema stores the incoming message ID from the Message interface. This ensures that if the same message ID arrives again, which is unlikely, it will end up in the same Kafka partition. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value¶
This schema stores the value of the JMS message. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
messageType | STRING | yes | | This field stores the type of message that was received, corresponding to the sub-interfaces of Message: BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream, and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface. |
timestamp | INT64 | yes | | Data from the getJMSTimestamp() method. |
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode(). |
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID(). |
replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered(). |
type | STRING | no | | This field stores the value of Message.getJMSType(). |
expiration | INT64 | no | | This field stores the value of Message.getJMSExpiration(). |
priority | INT32 | no | | This field stores the value of Message.getJMSPriority(). |
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. |
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]). |
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
text | STRING | no | | This field stores the value from TextMessage.getText(). |
io.confluent.connect.jms.Destination¶
This schema represents a JMS Destination, and is either queue or topic. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | | The type of the JMS Destination: either queue or topic. |
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema stores the data found in the properties of the message. To ensure that type mappings are preserved, propertyType stores the type of the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
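As a hypothetical illustration of these schemas, a message property named retryCount holding the integer 3 would appear in the record value's properties map roughly as follows (the property name is invented):

"properties": {
  "retryCount": {
    "propertyType": "integer",
    "integer": 3
  }
}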