IBM MQ Sink Connector for Confluent Platform¶
The Kafka Connect IBM MQ Sink connector is used to move messages from Kafka to an IBM MQ cluster.
Important
- If you are required to use JNDI to connect to IBM MQ, there is a general JMS Sink connector for Confluent Platform available that uses a JNDI-based mechanism to connect to the JMS broker.
- With the IBM MQ Sink connector, you can set the following record headers to be passed as JMS message properties:
  - `JMSCorrelationID`. This header is ignored if `jms.forward.kafka.key` is set to `true`.
  - `JMSReplyTo`. The supported syntax is:
    - Queue: `queue://queueName` or `queue://queueManager/queueName`
    - Topic: `topic://topicName`
  - `JMSType`
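For example, you can attach a `JMSReplyTo` header to records with the console producer. This is a minimal sketch, assuming a local broker on `localhost:9092`, an existing `sink-messages` topic, and a console producer from Apache Kafka 3.2 or later (which added header support via KIP-798):

```
# Headers and the record value are separated by a tab character.
kafka-console-producer --bootstrap-server localhost:9092 \
  --topic sink-messages \
  --property parse.headers=true
# Example input line:
#   JMSReplyTo:queue://DEV.QUEUE.2<TAB>please reply on the dev queue
```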
Features¶
The IBM MQ Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The IBM MQ Sink connector supports running one or more tasks. You can specify the number of tasks in the `tasks.max` configuration parameter. Running multiple tasks can improve throughput, because the partitions of the Kafka topics are divided among the tasks and processed in parallel.
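For example, you could raise the task count in the connector configuration. This is a sketch; a sensible value depends on the number of partitions in the topics being consumed and your throughput needs:

```json
"tasks.max": "3"
```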
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent license properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for IBM MQ Sink Connector for Confluent Platform.
Install the IBM MQ Sink Connector¶
You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.
Prerequisites¶
The following are required to install and run the Kafka Connect IBM MQ Sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Kafka Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect)
- IBM MQ 8.0.0 or later, or IBM MQ on Cloud service
- Java 1.8
- An installation of the IBM MQ client library JAR files. For help with downloading the JAR files, see the IBM MQ Client Library section.
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

```
confluent connect plugin install confluentinc/kafka-connect-ibmmq-sink:latest
```
You can install a specific version by replacing `latest` with a version number, as shown in the following example:

```
confluent connect plugin install confluentinc/kafka-connect-ibmmq-sink:2.1.4
```
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
IBM MQ Client Library¶
The Kafka Connect IBM MQ connector does not come with the IBM MQ client library.
If you are running a multi-node Connect cluster, the IBM MQ connector and IBM MQ client JAR must be installed on every Connect worker in the cluster. See below for details.
Install IBM MQ Client Library¶
This connector relies on the `com.ibm.mq.allclient` client JAR distributed by IBM. The connector will not run if you have not installed the JAR on each Connect worker node.

The installation steps are:

1. Follow IBM's guide on Obtaining the IBM MQ classes for JMS separately to download the IBM MQ client JAR.
2. The installation should have created a `wmq/JavaSE` directory. From this directory, copy only the `com.ibm.mq.allclient.jar` file into the `share/java/kafka-connect-ibmmq-sink` directory of your Confluent Platform installation on each worker node.
3. Restart all of the Connect worker nodes.
Note
The `share/java/kafka-connect-ibmmq-sink` directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent IBM MQ Sink connector JAR files and place the IBM MQ client JAR file into the same directory.
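The copy step might look like the following on a worker node; the paths are illustrative and depend on where you extracted the IBM MQ client and installed Confluent Platform:

```
# Illustrative paths; adjust for your environment.
cp wmq/JavaSE/com.ibm.mq.allclient.jar \
   /opt/confluent/share/java/kafka-connect-ibmmq-sink/
```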
JMS Message Formats¶
The format of outgoing JMS message values is configured with the `jms.message.format` property, using one of the following options:
string (default)¶
When using the `string` message format, record values are run through `Values.convertToString(...)` from the Connect Data package and produced as a JMS `TextMessage`.
Primitive values are converted to their String equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.
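For example, a plain string value `hello` is produced as the unquoted text `hello`, while a struct value is produced as a JSON-like string along the lines of `{"id":42,"name":"alice"}`.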
Tip
A Single Message Transformation can be used with the configured `jms.message.format` to transform the record value to the desired string representation before the connector processes each record.
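For example, if record values are structs and only one field should be sent to IBM MQ, the built-in `ExtractField` transformation can pull that field out before the connector formats the message. This is a sketch, assuming a value struct with a `message` field:

```json
"transforms": "extractMessage",
"transforms.extractMessage.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractMessage.field": "message"
```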
avro¶
Record values are serialized without the Avro schema information and produced as a JMS `BytesMessage`. JMS consumers must have the schema to deserialize the data.
Important
The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other `jms.message.format` options.
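A minimal configuration fragment for this format, assuming records were produced with Schema Registry Avro serialization and a local Schema Registry at `http://localhost:8081`:

```json
"jms.message.format": "avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
```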
json¶
Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS `TextMessage`.
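A minimal fragment selecting this format; the configured value converter must still produce data the connector can represent as JSON:

```json
"jms.message.format": "json"
```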
bytes¶
Record values are passed along in bytes form without any conversion.
Important
Record values must be converted to bytes form before the connector processes them. Configure the `value.converter` property to `org.apache.kafka.connect.converters.ByteArrayConverter` to ensure that record values arrive in byte format.
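Putting the two settings together, a fragment for pass-through byte delivery looks like the following:

```json
"jms.message.format": "bytes",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
```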
Forwarding Kafka Properties to JMS¶
The connector can be configured to forward various values from the Kafka record to the JMS Message.
- Enable `jms.forward.kafka.key` to convert the record's key to a String and forward it as the `JMSCorrelationID`.
- Enable `jms.forward.kafka.metadata` to forward the record's topic, partition, and offset as JMS message properties:
  - The Kafka topic is applied to the message as a String property named `KAFKA_TOPIC`.
  - The partition is applied to the message as an Int property named `KAFKA_PARTITION`.
  - The offset is applied to the message as a Long property named `KAFKA_OFFSET`.
- Enable `jms.forward.kafka.headers` to add each header from the `SinkRecord` to the JMS message as a String property.

A configuration fragment enabling these options appears after the note below.
Note
The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, out-of-the-box or custom Single Message Transformation can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.
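As referenced above, a fragment enabling all three forwarding options:

```json
"jms.forward.kafka.key": "true",
"jms.forward.kafka.metadata": "true",
"jms.forward.kafka.headers": "true"
```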
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
This quick start uses the IBM MQ Sink connector to consume records from Kafka and send them to an IBM MQ broker running in a Docker container.
Start the IBM MQ broker.
```
# The queue manager name (mq) must match mq.queue.manager in the connector configuration.
docker run -d \
  -p 1414:1414 -p 9443:9443 \
  -e LICENSE=accept \
  -e MQ_QMGR_NAME=mq \
  ibmcom/mq:9.1.2.0
```
Install the connector through the Confluent CLI.
```
# Run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-ibmmq-sink:latest
```
Start Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to `confluent local`. For example, the syntax for `confluent start` is now `confluent local services start`. For more information, see confluent local.

```
confluent local services start
```
Produce test data to the `sink-messages` topic in Kafka.

```
seq 10 | confluent local services kafka produce sink-messages
```
Create an `ibmmq-sink.json` file with the following contents:

```json
{
  "name": "IbmMqSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.IbmMqSinkConnector",
    "tasks.max": "1",
    "topics": "sink-messages",
    "mq.username": "app",
    "mq.channel": "DEV.APP.SVRCONN",
    "mq.hostname": "localhost",
    "mq.port": "1414",
    "mq.password": "",
    "mq.queue.manager": "mq",
    "mq.transport.type": "client",
    "jms.destination.type": "queue",
    "jms.destination.name": "DEV.QUEUE.1",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.replication.factor": "1",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}
```
Load the IBM MQ Sink connector.
Caution
You must include a double dash (`--`) between the topic name and your flag. For more information, see this post.

```
confluent local services connect connector load mq --config ibmmq-sink.json
```
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a `RUNNING` state.

```
confluent local services connect connector status mq
```
Navigate to the IBM MQ Console to confirm the messages were delivered to the `DEV.QUEUE.1` queue. For the Docker image used above, the console is typically available at https://localhost:9443/ibmmq/console.

Tip
The default credentials for the IBM MQ Console are `admin` / `passw0rd`.
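You can also check the queue depth from the command line. This is a sketch, assuming the broker container started in the first step; replace `<container-id>` with the actual container ID:

```
# DISPLAY QLOCAL ... CURDEPTH is standard MQSC; "mq" is the queue manager name.
docker exec <container-id> bash -c \
  'echo "DISPLAY QLOCAL(DEV.QUEUE.1) CURDEPTH" | runmqsc mq'
```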