Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Kafka Connect IBM MQ Source Connector¶
The IBM MQ Source Connector is used to read messages from an IBM MQ cluster and write them to a Apache Kafka® topic.
Note
Confluent Platform also includes a general JMS source connector that uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using that connector instead.
Install the IBM MQ Source Connector¶
Important
This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, there are no additional steps required to install.
If you are using Confluent Platform using only Confluent Community components, you can install the connector using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (latest
) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-ibmmq:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-ibmmq:5.1.4
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Client Libraries¶
To use the Kafka Connect IBM MQ connector, you must download the IBM MQ client library JAR files. Complete the following steps to get these JAR files.
Note
The exact list of JAR files from the IBM client library may vary depending on the Java software version.
Follow the instructions at the IBM Support page Getting IBM MQ classes for Java and JMS. Once completed, you should have a downloaded file named
9.0.0.7-IBM-MQ-Install-Java-All.jar
(your version number may be different).To extract the JAR files, enter the following command:
java -jar 9.0.0.7-IBM-MQ-Install-Java-All.jar Before you can use, extract, or install IBM MQ V9.0, you must accept the terms of 1. IBM International License Agreement for Evaluation of Programs 2. IBM International Program License Agreement and additional license information. Please read the following license agreements carefully. ...omitted Target directory for product files? /tmp/ibmmq-jars/ Extracting files to /tmp/ibmmq-jars/wmq Successfully extracted all product files.
Copy all of the required JAR files to the same folder as the kafka-connect-ibmmq JAR file and dependencies.
cp /tmp/ibmmq-jars/wmq/JavaSE/* /u01/connectors/kafka-connect-ibmmq/
Important
These JAR files need to be copied for each of your Confluent Platform installations where you want to run this connector.
List the JAR files to verify that they were copied successfully.
ls -l /u01/connectors/kafka-connect-ibmmq/
Make sure that you have the following files:
- com.ibm.mq.allclient.jar
- com.ibm.mq.traceControl.jar
- jms.jar
JMS Message types¶
The connector currently supports only TextMessage and BytesMessage but does not currently support ObjectMessage or StreamMessage .
Connecting to IBM MQ¶
Before you can use this connector, you must install the IBM MQ client JARs into this connector’s installation directory. See the IBM MQ documentation for details.
This connector connects directly to the IBM MQ using a number of configuration properties that should match your environment.
The following example shows a typical configuration of the connector for use with distributed mode:
{
"name": "connector1",
"config": {
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"MyKafkaTopicName",
"mq.hostname":"localhost",
"mq.port":"61616",
"mq.transport.type":"client",
"mq.queue.manager":"QMA",
"mq.channel":"SYSTEM.DEF.SVRCONN",
"jms.destination.name":"testing",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}
The connector supports other configuration options not included in the example above.
Topics¶
This connector consumes messages from IBM MQ using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.
Schemas¶
The IBM MQ connector produces messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value¶
This schema is used to store the value of the JMS message.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | This field stores the value of Message.getJMSMessageID(). | |
messageType | STRING | yes | This field stores the type of message that was received. This corresponds to the subinterfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective Message subinterface. | |
timestamp | INT64 | yes | Data from the getJMSTimestamp() method. | |
deliveryMode | INT32 | yes | This field stores the value of Message.getJMSDeliveryMode(). | |
correlationID | STRING | no | This field stores the value of Message.getJMSCorrelationID(). | |
replyTo | Destination | no | This schema is used to represent a JMS Destination, and is either queue or topic. | |
destination | Destination | no | This schema is used to represent a JMS Destination, and is either queue or topic. | |
redelivered | BOOLEAN | yes | This field stores the value of Message.getJMSRedelivered(). | |
type | STRING | no | This field stores the value of Message.getJMSType(). | |
expiration | INT64 | yes | This field stores the value of Message.getJMSExpiration(). | |
priority | INT32 | yes | This field stores the value of Message.getJMSPriority(). | |
properties | Map of STRING, PropertyValue | yes | This field stores the data from all of the properties for the Message indexed by their propertyName. | |
bytes | BYTES | no | This field stores the value from BytesMessage.html.readBytes(byte[]). | |
map | Map of STRING, PropertyValue | no | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. | |
text | STRING | no | This field stores the value from TextMessage.html.getText(). |
io.confluent.connect.jms.Destination¶
This schema is used to represent a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | The type of JMS Destination, and either queue or topic . |
|
name | STRING | yes | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved field propertyType
stores the value type for the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | The java type of the property on the Message. One of boolean , byte , short , integer , long , float , double , or string . |
|
boolean | BOOLEAN | no | The value stored as a boolean. Null unless propertyType is set to boolean . |
|
byte | INT8 | no | The value stored as a byte. Null unless propertyType is set to byte . |
|
short | INT16 | no | The value stored as a short. Null unless propertyType is set to short . |
|
integer | INT32 | no | The value stored as a integer. Null unless propertyType is set to integer . |
|
long | INT64 | no | The value stored as a long. Null unless propertyType is set to long . |
|
float | FLOAT32 | no | The value stored as a float. Null unless propertyType is set to float . |
|
double | FLOAT64 | no | The value stored as a double. Null unless propertyType is set to double . |
|
string | STRING | no | The value stored as a string. Null unless propertyType is set to string . |