IBM MQ Source Connector for Confluent Platform
The Kafka Connect IBM MQ Source Connector is used to read messages from an IBM MQ cluster and write them to a Apache Kafka® topic.
Note
Confluent Platform also includes a general JMS Source Connector for Confluent Platform that uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using the source connector instead.
The 11.0.x version of this is connector does not support at-least-once semantics and is no longer supported. To upgrade to a 12.0.x version, see Migrate from 11.0.x to 12.0.x.
Features
The IBM MQ Source connector includes the following features:
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Multiple tasks
The IBM MQ Source connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max configuration parameter. This can lead
to performance gains when multiple files need to be parsed.
JMS message types
The connector currently supports TextMessage and BytesMessage. The connector does not currently support ObjectMessage or StreamMessage.
Retries and reconnection
The IBM MQ Source connector uses the general retry policy implemented for most Confluent connectors; however, the IBM MQ Source connector uses exponential backoff after each retry attempt. The backoff time is the time between retries and is a random value between zero and the exponentially increasing bound:
initialbackoffTime * 2 ^ (retry-1)
The exponential bound is capped at one minute, and the initial backoff time is
100 milliseconds (ms). While the total amount of retries attempted is not
configurable in the IBM MQ Source connector, the maximum total time spent
retrying before failure can be configured by the max.retry.time
configuration property. The max.retry.time
configuration property sets the maximum time in milliseconds that the connector
will attempt to retry–the default value is 3600000 ms (one hour).
The following errors will trigger a retry:
MQRC_GET_INHIBITEDMQRC_BACKED_OUTMQRC_CHANNEL_NOT_AVAILABLEMQRC_CONNECTION_BROKENMQRC_HOST_NOT_AVAILABLEMQRC_NOT_AUTHORIZEDMQRC_Q_MGR_NOT_AVAILABLEMQRC_Q_MGR_QUIESCINGMQRC_Q_MGR_STOPPINGMQRC_UNEXPECTED_ERROR
With the exception of MQRC_GET_INHIBITED, all of the above errors will first
shut down the connection and then reconnect before retrying.
Message acknowledgement
The IBM MQ Source connector sends an acknowledgement message when one of the following occurs:
The
max.poll.durationis breached.Message Queue (MQ) no longer has any more records.
The
max.pending.messagesconfiguration limit has been breached.
To ensure each batch sends an acknowledgement, you must set batch.size and
max.pending.messages to an equal value.
License
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent license properties for license properties and information about the license topic.
Configuration Properties
For a complete list of configuration properties for this connector, see IBM MQ Source Connector Configuration Properties. To understand how the connector internally configures the acknowledgement mode, see the following section.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Acknowledgement mode
The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and
acknowledge messages from the JMS broker. In this mode, acknowledging any
message will acknowledge every message received (see section 6.2.10 in the JMS
2.0 Specification). To prevent
messages from being prematurely acknowledged, the connector processes only one
message at time. In other words, the connector will not try to receive new
messages until the last message is committed to a Kafka topic. This might
compromise the throughput of the connector, but messages will be transferred to
Kafka successfully.
Migrate from 11.0.x to 12.0.x
Important
This section is only for users who have previously deployed a 11.0.x version of the IBM MQ Source connector.
You can migrate to a newer version of the IBM MQ Source connector–that is, from a 11.0.x version to a 12.0.x version–to leverage at-least-once semantics using the following steps:
Step 1: Shut down Connect
Shut down Connect using the Shutting Down Kafka Connect instructions.|kconnect| performs classloading only during worker startup. To identify any newer plugins, you must restart the entire worker.
Step 2: Install a newer version of IBM MQ Source connector
To install a newer version of IBM MQ Source connector using the Confluent Hub Client run a command similar to the following:
confluent-hub install confluentinc/kafka-connect-ibmmq:12.0.x
This will install the 12.0.x version of IBM MQ Source connector. While running the previous command, ensure you choose the option to replace the existing connector.
To install an newer version of IBM MQ Source connector manually, follow the instructions in Installing plugins.
Step 3: Start Kafka Connect
For standalone mode, follow the steps in Standalone Mode.
For distributed mode, follow the steps in Distributed Mode.
For running Kafka in production, see Running Kafka in Production.
Install the IBM MQ Source Connector
You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.
Prerequisites
You must install the connector on every machine where Connect will run.
If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
An installation of the IBM MQ client library JAR files. For help with downloading the JAR files, see the Client Libraries section.
Install the connector using Confluent Hub
To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent-hub install confluentinc/kafka-connect-ibmmq:latest
You can install a specific version by replacing latest with a version number
as shown in the following example:
confluent-hub install confluentinc/kafka-connect-ibmmq:11.0.2
Install the connector manually
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Client Libraries
To use the Kafka Connect IBM MQ connector, you must download the IBM MQ client library JAR files. Complete the following steps to get these JAR files.
Note
The exact list of JAR files from the IBM client library may vary depending on the Java software version.
Follow the instructions at the IBM Support page Getting IBM MQ classes for Java and JMS. Once completed, you should have a downloaded file named
9.2.0.3-IBM-MQ-Install-Java-All.jar(your version number may be different).To extract the JAR files, enter the following command:
java -jar 9.2.0.3-IBM-MQ-Install-Java-All.jar Before you can use, extract, or install IBM MQ V9.2, you must accept the terms of 1. IBM International License Agreement for Evaluation of Programs 2. IBM International Program License Agreement and additional license information. Please read the following license agreements carefully. ...omitted Target directory for product files? /tmp/ibmmq-jars/ Extracting files to /tmp/ibmmq-jars/wmq Successfully extracted all product files.
Copy all of the required JAR files to the same folder as the
kafka-connect-ibmmqJAR file and dependencies.cp /tmp/ibmmq-jars/wmq/JavaSE/* /u01/connectors/kafka-connect-ibmmq/
Important
These JAR files need to be copied for each of your Confluent Platform installations where you want to run this connector.
List the JAR files to verify that they were copied successfully.
ls -l /u01/connectors/kafka-connect-ibmmq/
Ensure you have the following files:
com.ibm.mq.allclient.jar
com.ibm.mq.traceControl.jar
jms.jar
Connecting to IBM MQ
Before you can use this connector, you must install the IBM MQ client JARs into this connector’s installation directory. See the IBM MQ documentation for details.
This connector connects directly to the IBM MQ using a number of configuration properties that should match your environment.
The following example shows a typical configuration of the connector for use with distributed mode:
{
"name": "connector1",
"config": {
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"MyKafkaTopicName",
"mq.hostname":"localhost",
"mq.port":"61616",
"mq.transport.type":"client",
"mq.queue.manager":"QMA",
"mq.channel":"SYSTEM.DEF.SVRCONN",
"jms.destination.name":"testing",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}
The connector supports other configuration options not included in the example above.
Topics
This connector consumes messages from IBM MQ using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.
Schemas
The IBM MQ connector produces messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key
This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur. The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
|---|---|---|---|---|
messageID |
STRING |
yes |
This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value
This schema is used to store the value of the JMS message. The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
|---|---|---|---|---|
messageID |
STRING |
yes |
This field stores the value of Message.getJMSMessageID(). |
|
messageType |
STRING |
yes |
This field stores the type of message that was received. This corresponds
to the sub-interfaces of Message.
BytesMessage =
|
|
timestamp |
INT64 |
yes |
Data from the getJMSTimestamp() method. |
|
deliveryMode |
INT32 |
yes |
This field stores the value of Message.getJMSDeliveryMode(). method. |
|
correlationID |
STRING |
no |
This field stores the value of Message.getJMSCorrelationID(). method. |
|
replyTo |
no |
This schema is used to represent a JMS Destination, and is either queue or topic. |
||
destination |
no |
This schema is used to represent a JMS Destination, and is either queue or topic. |
||
redelivered |
BOOLEAN |
yes |
This field stores the value of Message.getJMSRedelivered(). |
|
type |
STRING |
no |
This field stores the value of Message.getJMSType(). |
|
expiration |
INT64 |
no |
This field stores the value of Message.getJMSExpiration(). |
|
priority |
INT32 |
no |
This field stores the value of Message.getJMSPriority(). |
|
properties |
Map of STRING, PropertyValue |
yes |
This field stores the data from all of the properties for the Message indexed by their propertyName. |
|
bytes |
BYTES |
no |
This field stores the value from BytesMessage.html.readBytes(byte[]). |
|
map |
Map of STRING, PropertyValue |
no |
This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
|
text |
STRING |
no |
This field stores the value from TextMessage.html.getText(). |
io.confluent.connect.jms.Destination
This schema is used to represent a JMS Destination, and is either queue or topic. The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
|---|---|---|---|---|
destinationType |
STRING |
yes |
The type of JMS Destination, and either |
|
name |
STRING |
yes |
The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue
This schema is used to store the data that is found in the properties of the
message. To ensure that the proper type mappings are preserved field
propertyType stores the value type for the field. The corresponding field in
the schema will contain the data for the property. This ensures that the data is
retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
|---|---|---|---|---|
propertyType |
STRING |
yes |
The Java type of the property on the Message. One of |
|
boolean |
BOOLEAN |
no |
The value stored as a boolean. Null unless |
|
byte |
INT8 |
no |
The value stored as a byte. Null unless |
|
short |
INT16 |
no |
The value stored as a short. Null unless |
|
integer |
INT32 |
no |
The value stored as a integer. Null unless |
|
long |
INT64 |
no |
The value stored as a long. Null unless |
|
float |
FLOAT32 |
no |
The value stored as a float. Null unless |
|
double |
FLOAT64 |
no |
The value stored as a double. Null unless |
|
string |
STRING |
no |
The value stored as a string. Null unless |