Kafka Connect IBM MQ Connector

The IBM MQ Source Connector is used to read messages from an IBM MQ cluster and write them to a Kafka topic.

Note

Confluent Platform also includes a general JMS source connector that uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using that connector instead.

Install the IBM MQ Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Confluent Hub

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-ibmmq:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-ibmmq:5.0.0

Download

Download the ZIP file and extract it into a directory that is listed on the plugin path of the Connect worker configuration properties (e.g. plugin.path=/usr/local/share/kafka/plugins). This must be done on each of the installations where Connect will be run. For more information, see Installing Plugins.

Client Libraries

Before you use the Kafka Connect IBM MQ connector, you must first download the IBM client library JARs , run the installer to extract the libraries, and then copy/move the JAR files into the share/java/kafka-connect-ibmmq directory in each of your Confluent Platform installations where you want to run the connector.

License Key

Without the license key, you can use the IBM MQ Connector for a 30-day trial period. Simply remove the confluent.license from your connector configuration file or leave that property blank. See the configuration options for more details.

If you are a Confluent customer, you can contact customer support and ask for an IBM MQ Connector license key. Then add confluent.license configuration to the IBM MQ Connector configuration file (see below) followed by the key you received from Confluent support.

JMS Message types

The connector currently supports only TextMessage and BytesMessage but does not currently support ObjectMessage or StreamMessage .

Connecting to IBM MQ

Before you can use this connector, you must install the IBM MQ client JARs into this connector’s installation directory. See the IBM MQ documentation for details.

This connector connects directly to the IBM MQ using a number of configuration properties that should match your environment.

The following example shows a typical configuration of the connector for use with distributed mode:

{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnectorConfig",
    "kafka.topic":"MyKafkaTopicName",
    "mq.hostname":"localhost",
    "mq.port":"61616",
    "mq.transport.type":"client",
    "mq.queue.manager":"QMA",
    "mq.channel":"SYSTEM.DEF.SVRCONN",
    "jms.destination.name":"testing",
    "jms.destination.type":"queue",
    "confluent.license":""
    "confluent.topic.bootstrap.servers":"localhost:9092"
  }
}

The connector supports other configuration options not included in the example above.

Topics

This connector consumes messages from IBM MQ using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.

Schemas

The IBM MQ connector produces messages with keys and values that adhere to the schemas described in the following sections.

This schema is used to store the incoming MessageID on the message interface. This will ensure that when that if the same message id arrives it will end up in the same partition. In practice this should never occur.

The schema defines the following fields:

Name Schema Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().

This schema is used to store the value of the JMS message.

The schema defines the following fields:

Name Schema Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().
messageType STRING yes   This field stores the type of message that was received. This corresponds to the subinterfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective Message subinterface.
timestamp INT64 yes   Data from the getJMSTimestamp() method.
deliveryMode INT32 yes   This field stores the value of Message.getJMSDeliveryMode().
correlationID STRING no   This field stores the value of Message.getJMSCorrelationID().
replyTo Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
destination Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
redelivered BOOLEAN yes   This field stores the value of Message.getJMSRedelivered().
type STRING no   This field stores the value of Message.getJMSType().
expiration INT64 yes   This field stores the value of Message.getJMSExpiration().
priority INT32 yes   This field stores the value of Message.getJMSPriority().
properties Map of STRING, PropertyValue yes   This field stores the data from all of the properties for the Message indexed by their propertyName.
bytes BYTES no   This field stores the value from BytesMessage.html.readBytes(byte[]).
map Map of STRING, PropertyValue no   This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key.
text STRING no   This field stores the value from TextMessage.html.getText().

This schema is used to represent a JMS Destination, and is either queue or topic.

The schema defines the following fields:

Name Schema Required Default Value Documentation
destinationType STRING yes   The type of JMS Destination, and either queue or topic.
name STRING yes   The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName().

This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved field propertyType stores the value type for the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().

The schema defines the following fields:

Name Schema Required Default Value Documentation
propertyType STRING yes   The java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string.
boolean BOOLEAN no   The value stored as a boolean. Null unless propertyType is set to boolean.
byte INT8 no   The value stored as a byte. Null unless propertyType is set to byte.
short INT16 no   The value stored as a short. Null unless propertyType is set to short.
integer INT32 no   The value stored as a integer. Null unless propertyType is set to integer.
long INT64 no   The value stored as a long. Null unless propertyType is set to long.
float FLOAT32 no   The value stored as a float. Null unless propertyType is set to float.
double FLOAT64 no   The value stored as a double. Null unless propertyType is set to double.
string STRING no   The value stored as a string. Null unless propertyType is set to string.

Contents: