Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
JMS Source Connector¶
The Confluent JMS Source Connector is used to move messages from any JMS-compliant broker into Kafka. It supports any traditional JMS Broker, such as IBM MQ , ActiveMQ , TIBCO EMS , and Solace Appliance . This connector uses JNDI to connect to the JMS broker, consume messages from the specified topic or queue, and write them into the specified Kafka topic.
Note
See also the IBM MQ connector and ActiveMQ connector that are also included in the Confluent Platform. These are specializations of this connector that avoid JNDI and instead use system-specific APIs to establish connections. These are often easier to configure and use in most situations.
Client Libraries¶
The Confluent JMS connector works with any JMS compliant system, but it does not come with client libraries.
Instead, you must download the JMS client library JARs for your system and add them into the share/java/kafka-connect-jms
directory in each of the Confluent Platform installations.
If you need plan to use multiple JMS source connectors to different types of JMS systems, all of the client libraries for those systems should be installed into the same location.
Just be careful the libraries don’t clash with each other.
License Key¶
Without the license key, you can use the JMS Connector for a 30-day trial period. Simply remove the confluent.license
from your connector configuration file or leave that property blank.
See the configuration options for more details.
If you are a Confluent customer, you can contact customer support and ask for a JMS Connector license key. Then add confluent.license
configuration to the JMS Connector configuration file (see below) followed by the key you received from Confluent support.
JMS Message types¶
The connector currently supports only TextMessage and BytesMessage but does not currently support ObjectMessage or StreamMessage .
Acknowledgement Mode¶
The connector internally uses CLIENT_ACKNOWLEDGE
mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message will acknowledge every message received (see section 6.2.10 in the JMS Spec
). To prevent messages from being prematurely acknowledged, the connector processes only one message at time. In other words, the connector will not attempt to receive new messages until the last message is committed to a Kafka topic. This might compromise the throughput of the Connector, but messages will be transferred to Kafka successfully.
JNDI Connection Factory¶
This connector uses JNDI to create an instance of the JMS ConnectionFactory for your messaging system. Because of this you must ensure that the relevant client jars for your messaging system are in the classpath along side this connector.
Using with TIBCO EMS¶
This connector can be used with TIBCO EMS and its support for JMS. First, install the latest TIBCO EMS JMS client libraries into the same directory where this connector is installed. Check the TIBCO EMS installation or documentation for more details.
Then, create a connector configuration for your environment, using the appropriate configuration properties. The following example shows a typical configuration of the connector for use with distributed mode.
{
"name": "connector1",
"config": {
"connector.class": "io.confluent.connect.jms.JmsSourceConnector",
"kafka.topic":"MyKafkaTopicName",
"jms.destination.name":"MyQueueName",
"jms.destination.type":"queue",
"java.naming.factory.initial":"com.tibco.tibjms.naming.TibjmsInitialContextFactory",
"java.naming.provider.url":"tibjmsnaming://<host>:<port>"
"confluent.license":""
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}
Note
Any extra properties defined on the connector will be passed into the JNDI InitialContext. This makes it easy to use any TIBCO EMS specific settings.
Finally, deploy your connector by posting it to a Kafka Connect distributed worker.
Using with IBM MQ via LDAP¶
The Confluent Platform includes a custom connector for IBM MQ that is very easy to configure, and where possible you should use that connector instead of this general JMS connector. However, you may want to use this more general connector if you are required to connect to IBM MQ via LDAP or other JNDI mechanism.
First, install the latest IBM MQ JMS client libraries into the same directory where this connector is installed. Check your IBM installation or documentation for more details.
Then, create a connector configuration for your environment, using the appropriate configuration properties. The following example shows a typical but incomplete configuration of the connector for use with distributed mode.
{
"name": "connector1",
"config": {
"connector.class": "io.confluent.connect.jms.JmsSourceConnector",
"kafka.topic":"MyKafkaTopicName",
"jms.destination.name":"MyQueueName",
"jms.destination.type":"queue",
"java.naming.factory.initial":"com.sun.jndi.ldap.LdapCtxFactory",
"java.naming.provider.url":"ldap://<ldap_url>"
"java.naming.security.principal":"MyUserName",
"java.naming.security.credentials":"MyPassword",
"confluent.license":""
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}
Note
Any extra properties defined on the connector will be passed into the JNDI InitialContext. This makes it easy to pass any IBM MQ specific settings used for connecting to the IBM MQ broker.
Finally, deploy your connector by posting it to a Kafka Connect distributed worker.
Topics¶
This connector consumes messages from the JMS broker using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.
Schemas¶
The JMS connector produces messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value¶
This schema is used to store the value of the JMS message.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | This field stores the value of Message.getJMSMessageID(). | |
messageType | STRING | yes | This field stores the type of message that was received. This corresponds to the subinterfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective Message subinterface. | |
timestamp | INT64 | yes | Data from the getJMSTimestamp() method. | |
deliveryMode | INT32 | yes | This field stores the value of Message.getJMSDeliveryMode(). | |
correlationID | STRING | no | This field stores the value of Message.getJMSCorrelationID(). | |
replyTo | Destination | no | This schema is used to represent a JMS Destination, and is either queue or topic. | |
destination | Destination | no | This schema is used to represent a JMS Destination, and is either queue or topic. | |
redelivered | BOOLEAN | yes | This field stores the value of Message.getJMSRedelivered(). | |
type | STRING | no | This field stores the value of Message.getJMSType(). | |
expiration | INT64 | yes | This field stores the value of Message.getJMSExpiration(). | |
priority | INT32 | yes | This field stores the value of Message.getJMSPriority(). | |
properties | Map of STRING, PropertyValue | yes | This field stores the data from all of the properties for the Message indexed by their propertyName. | |
bytes | BYTES | no | This field stores the value from BytesMessage.html.readBytes(byte[]). | |
map | Map of STRING, PropertyValue | no | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. | |
text | STRING | no | This field stores the value from TextMessage.html.getText(). |
io.confluent.connect.jms.Destination¶
This schema is used to represent a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | The type of JMS Destination, and either queue or topic . |
|
name | STRING | yes | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved field propertyType
stores the value type for the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | The java type of the property on the Message. One of boolean , byte , short , integer , long , float , double , or string . |
|
boolean | BOOLEAN | no | The value stored as a boolean. Null unless propertyType is set to boolean . |
|
byte | INT8 | no | The value stored as a byte. Null unless propertyType is set to byte . |
|
short | INT16 | no | The value stored as a short. Null unless propertyType is set to short . |
|
integer | INT32 | no | The value stored as a integer. Null unless propertyType is set to integer . |
|
long | INT64 | no | The value stored as a long. Null unless propertyType is set to long . |
|
float | FLOAT32 | no | The value stored as a float. Null unless propertyType is set to float . |
|
double | FLOAT64 | no | The value stored as a double. Null unless propertyType is set to double . |
|
string | STRING | no | The value stored as a string. Null unless propertyType is set to string . |