IBM MQ Source Connector for Confluent Platform

The Kafka Connect IBM MQ Source Connector is used to read messages from an IBM MQ cluster and write them to an Apache Kafka® topic.

Before using this connector, consider the following:

  • Confluent Platform also includes a general JMS Source connector that uses a JNDI-based mechanism to connect to the JMS broker. If you have to use JNDI to connect to your JMS broker, consider using the JMS Source connector instead.
  • The 11.x version of this connector does not support at-least-once semantics and is no longer supported. To upgrade to a 12.x version, see Migrate from 11.x to 12.x.
  • You must download the IBM MQ client library JAR files to use this connector.

Features

The IBM MQ Source connector includes the following features:

Delivery guarantee

The connector can run in either of the following delivery modes, depending on the connector and worker configuration:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Note that the IBM MQ Source connector allows you to configure a JMS destination type of either queue or topic. If the connector is the only consumer of the source destination, Confluent recommends you configure the queue JMS destination type. With a destination type of queue, the connector delivers messages at least once; that is, messages are persisted in the queue.

If many consumers consume messages from the same JMS destination, Confluent recommends you configure the topic destination type, because a topic broadcasts messages to the connector and the other consumers. If you configure topic, you must explicitly set the durable subscription configuration parameter, jms.subscription.durable, in the connector configuration; JMS topics don’t persist messages unless you use a durable subscription.
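For example, the relevant fragment of a connector configuration that consumes from a topic with a durable subscription might look like the following (the destination name is illustrative; durable subscriptions also require a subscription name, so see the configuration reference for the related properties):

```json
{
  "jms.destination.name": "MY.TOPIC",
  "jms.destination.type": "topic",
  "jms.subscription.durable": "true"
}
```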

Note

Enabling the exactly once settings shifts the connector from at least once delivery to exactly once delivery.

Exactly once delivery

The connector supports exactly once semantics when the following conditions are met:

  • All the Connect workers in the cluster have the exactly.once.support property set to enabled. For more information, see exactly once source worker.
  • The Connect worker is running in distributed mode. Exactly once delivery is not supported in standalone mode.
  • The Connect worker principal has the required ACLs. For more information about the required ACLs, see ACLs for exactly once source.
  • The connector is configured with the state.topic.name property.

When these conditions are met, the connector processes each record exactly once, even through failures or restarts. It uses the state topic to track progress of the records it has processed, allowing it to resume from the last processed record in case of a failure. You must set the state topic only when you first create the connector. Changing the topic name after the connector creation can result in duplicates.

For exactly once semantics, the connector requires only one consumer of the MQ destination. Hence, it doesn’t support more than one task or receiver thread.

The connector uses a transactional producer to write records to the Kafka topic, guaranteeing exactly once delivery. Any Kafka consumer reading from the topic must also set the isolation.level property to read_committed.
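Taken together, the settings involved in exactly once delivery look like the following sketch (the state topic name is illustrative; set it once when you create the connector):

```properties
# Connect worker configuration (distributed mode only)
exactly.once.support=enabled

# IBM MQ Source connector configuration
state.topic.name=mq-source-state

# Any Kafka consumer reading from the target topic
isolation.level=read_committed
```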

Note

If any of the above conditions are not fulfilled, the connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, it may deliver duplicate records to the Kafka topic.

When you configure the connector to consume from a priority queue in MQ, the MQ may deliver messages out of order. Under this condition, the connector cannot guarantee exactly once semantics and may fail.

Multiple tasks

The IBM MQ Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance by consuming messages from the JMS destination in parallel.

Note

The connector does not support running multiple tasks when the exactly once settings are enabled for the connector.

JMS message types

The connector currently supports TextMessage and BytesMessage. The connector does not currently support ObjectMessage or StreamMessage.

Retries and reconnection

The IBM MQ Source connector uses the general retry policy implemented for most Confluent connectors; however, the IBM MQ Source connector uses exponential backoff after each retry attempt. The backoff time is the time between retries and is a random value between zero and the exponentially increasing bound:

initialbackoffTime * 2 ^ (retry-1)

The exponential bound is capped at one minute, and the initial backoff time is 100 milliseconds (ms). While the total number of retries attempted is not configurable in the IBM MQ Source connector, the maximum total time spent retrying before failure can be set with the max.retry.time configuration property. This property sets the maximum time in milliseconds that the connector will spend retrying; the default value is 3600000 ms (one hour).
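The backoff calculation described above can be sketched as follows (the class and method names are illustrative, not part of the connector):

```java
import java.util.concurrent.ThreadLocalRandom;

class RetryBackoff {
    static final long INITIAL_BACKOFF_MS = 100L;  // initial backoff time
    static final long MAX_BACKOFF_MS = 60_000L;   // bound capped at one minute

    // Exponentially increasing upper bound: initialBackoffTime * 2^(retry - 1),
    // capped at one minute.
    static long backoffBound(int retry) {
        if (retry >= 11) {
            return MAX_BACKOFF_MS; // 100 ms * 2^10 already exceeds the cap
        }
        return Math.min(INITIAL_BACKOFF_MS << (retry - 1), MAX_BACKOFF_MS);
    }

    // The actual wait is a random value between zero and the bound.
    static long backoffMs(int retry) {
        return ThreadLocalRandom.current().nextLong(backoffBound(retry) + 1);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 12; retry++) {
            System.out.printf("retry %2d: bound = %d ms%n", retry, backoffBound(retry));
        }
    }
}
```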

The following errors will trigger a retry:

  • MQRC_GET_INHIBITED
  • MQRC_BACKED_OUT
  • MQRC_CHANNEL_NOT_AVAILABLE
  • MQRC_CONNECTION_BROKEN
  • MQRC_HOST_NOT_AVAILABLE
  • MQRC_NOT_AUTHORIZED
  • MQRC_Q_MGR_NOT_AVAILABLE
  • MQRC_Q_MGR_QUIESCING
  • MQRC_Q_MGR_STOPPING
  • MQRC_UNEXPECTED_ERROR

With the exception of MQRC_GET_INHIBITED, all of the above errors will first shut down the connection and then reconnect before retrying.

Message acknowledgement

The IBM MQ Source connector sends an acknowledgement message when one of the following occurs:

  • IBM MQ has no more records to deliver.
  • The max.pending.messages configuration limit has been reached.

To ensure that each batch sends an acknowledgement, you must set batch.size and max.pending.messages to the same value.
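For example, a configuration fragment that sets both properties to the same (illustrative) value:

```json
{
  "batch.size": "100",
  "max.pending.messages": "100"
}
```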

CSFLE (Client-side field level encryption)

This connector supports the CSFLE functionality. For more information, see Manage CSFLE.

Custom Credentials Provider Support

You can configure the IBM MQ connector to use a custom credentials provider. To do this, you implement a custom credentials provider, build it as a JAR file, and deploy the JAR file to use the custom provider.

Complete the following steps to use a custom credentials provider:

  1. Set a custom credentials provider class: Set the credentials.provider.classpath property to the fully qualified name of a class that implements the io.confluent.connect.ibm.mq.creds.IbmMqCredentialsProvider interface.
  2. Configure additional settings (optional): If your custom credentials provider needs to accept additional configuration, implement the org.apache.kafka.common.Configurable interface, which lets the connector pass your provider every configuration key prefixed with credentials.provider..
  3. Package your provider: Once your custom credentials provider class is implemented, package it into a JAR file.
  4. Copy the JAR file to Connect Worker: Copy the built JAR file to the share/java/kafka-connect-ibmmq-source directory on all Connect workers. This step ensures that the IBM MQ connector can access and use your custom credentials provider.
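A connector configuration that wires in such a provider might include entries like the following (the class name and the token.endpoint key are hypothetical; only keys prefixed with credentials.provider. are passed to the provider):

```json
{
  "credentials.provider.classpath": "com.example.mq.EnvVarCredentialsProvider",
  "credentials.provider.token.endpoint": "https://auth.example.com/token"
}
```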

To implement the custom credentials provider, you need to implement the following interface:

IbmMqCredentialsProvider Interface

package io.confluent.connect.ibm.mq.creds;

public interface IbmMqCredentialsProvider {
    /**
      * Interface for providing dynamic credentials to IBM MQ connections.
      * Can be used with any other authentication mode (like OAuth2) combined
      * with security exits for authentication flow.
      */
    void setCredentials();

    /**
      * Gets the username/password required for IBM MQ authentication.
      */
    IbmMqCredentials getCredentials();
}

IbmMqCredentials Class

package io.confluent.connect.ibm.mq.creds;

public class IbmMqCredentials {
    private final String username;
    private final String password;

    public IbmMqCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String username() {
        return username;
    }

    public String password() {
        return password;
    }
}
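A minimal provider implementation might look like the following sketch. The EnvVarCredentialsProvider class and the IBMMQ_USERNAME/IBMMQ_PASSWORD environment variable names are hypothetical; the IbmMqCredentialsProvider interface and IbmMqCredentials class are repeated from above (without the package declaration) so the sketch is self-contained:

```java
// Repeated from the connector API shown above so this sketch compiles on its
// own; in a real project these types come from the connector JAR, in the
// io.confluent.connect.ibm.mq.creds package.
interface IbmMqCredentialsProvider {
    void setCredentials();
    IbmMqCredentials getCredentials();
}

class IbmMqCredentials {
    private final String username;
    private final String password;

    IbmMqCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    String username() { return username; }
    String password() { return password; }
}

// Hypothetical provider that resolves credentials from environment variables.
// Any dynamic source (a secrets vault, an OAuth2 token endpoint, and so on)
// could be queried here instead.
class EnvVarCredentialsProvider implements IbmMqCredentialsProvider {
    private volatile IbmMqCredentials credentials;

    @Override
    public void setCredentials() {
        // The variable names below are illustrative.
        String user = System.getenv().getOrDefault("IBMMQ_USERNAME", "app");
        String pass = System.getenv().getOrDefault("IBMMQ_PASSWORD", "");
        credentials = new IbmMqCredentials(user, pass);
    }

    @Override
    public IbmMqCredentials getCredentials() {
        if (credentials == null) {
            setCredentials();
        }
        return credentials;
    }
}
```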

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent license properties for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for IBM MQ Source Connector for Confluent Platform. To understand how the connector internally configures the acknowledgement mode, see the following section.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Acknowledgement mode

The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received (see section 6.2.10 in the JMS 2.0 Specification). To prevent messages from being acknowledged prematurely, the connector processes only one message at a time. In other words, the connector does not try to receive new messages until the last message is committed to a Kafka topic. This might limit the throughput of the connector, but it ensures that messages are transferred to Kafka successfully.

Migrate from 11.x to 12.x

Users who have previously deployed an 11.x version can leverage at-least-once semantics by migrating to a 12.x version using the following steps:

Step 1: Stop Connect

Stop Connect using the Shutting Down Kafka Connect instructions. Connect loads plugins only during worker startup, so for a worker to discover a newly installed plugin version, you must restart the entire worker.

Step 2: Install a later version of IBM MQ Source connector

  • To install a later version of IBM MQ Source connector using the Confluent CLI, run a command similar to the following:

    confluent connect plugin install confluentinc/kafka-connect-ibmmq:12.x
    

    This command installs the 12.x version of the IBM MQ Source connector. While running the command, ensure you choose the option to replace the existing connector.

  • To install a later version of the IBM MQ Source connector manually, follow the instructions in Installing plugins.

Step 3: Start Kafka Connect

Restart the Connect workers that you stopped in Step 1. Each worker loads the newly installed plugin version during startup.

Install the IBM MQ Source Connector

You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.

Prerequisites

The following are required to install and run the Connect IBM MQ Source connector:

  • IBM MQ 8.0.0 or later, or IBM MQ on Cloud service.
  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • An installation of the IBM MQ client library JAR files. For help with downloading the JAR files, see the Client Libraries section.

Install the connector using the Confluent CLI

To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-ibmmq:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-ibmmq:13.0.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Client Libraries

To use the Kafka Connect IBM MQ connector, you must download the IBM MQ client library JAR files. To download the required JAR files, complete the following steps:

  1. Follow IBM’s Getting the IBM MQ classes for Java and JMS instructions to download the IBM MQ client JAR.
  2. Copy only the com.ibm.mq.allclient.jar file into the share/confluent-hub-components/confluentinc-kafka-connect-ibmmq/lib directory of your Confluent Platform installation on each Connect worker node.
  3. Restart all of the Connect worker nodes.

Connecting to IBM MQ

Before you can use this connector, you must install the IBM MQ client JARs into this connector’s installation directory. See the IBM MQ documentation for details.

This connector connects directly to IBM MQ using a number of configuration properties that must match your environment.

The following example shows a typical configuration of the connector for use with distributed mode:

{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"MyKafkaTopicName",
    "mq.hostname":"localhost",
    "mq.port":"61616",
    "mq.transport.type":"client",
    "mq.queue.manager":"QMA",
    "mq.channel":"SYSTEM.DEF.SVRCONN",
    "jms.destination.name":"testing",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"localhost:9092"
  }
}

The connector supports other configuration options not included in the example above.

Topics

This connector consumes messages from IBM MQ using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to other topics, use a single message transform (SMT) that routes the messages based on your criteria.
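For example, the RegexRouter transform that ships with Kafka Connect can rename the topic a record is written to; the regex and replacement values below are illustrative. Routing based on message content requires a custom transform.

```json
{
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "MyKafkaTopicName",
  "transforms.route.replacement": "mq-messages"
}
```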

Schemas

The IBM MQ connector produces messages with keys and values that adhere to the schemas described in the following sections.

io.confluent.connect.jms.Key

This schema is used to store the incoming MessageID on the message interface. This ensures that if a message with the same message ID arrives again, it ends up in the same partition. In practice, this should never occur. The schema defines the following fields:

Name Schema Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().

io.confluent.connect.jms.Value

This schema is used to store the value of the JMS message. The schema defines the following fields:

Name Schema Required Default Value Documentation
messageID STRING yes   This field stores the value of Message.getJMSMessageID().
messageType STRING yes   This field stores the type of message that was received. This corresponds to the sub-interfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface.
timestamp INT64 yes   Data from the getJMSTimestamp() method.
deliveryMode INT32 yes   This field stores the value of Message.getJMSDeliveryMode().
correlationID STRING no   This field stores the value of Message.getJMSCorrelationID().
replyTo Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
destination Destination no   This schema is used to represent a JMS Destination, and is either queue or topic.
redelivered BOOLEAN yes   This field stores the value of Message.getJMSRedelivered().
type STRING no   This field stores the value of Message.getJMSType().
expiration INT64 no   This field stores the value of Message.getJMSExpiration().
priority INT32 no   This field stores the value of Message.getJMSPriority().
properties Map of STRING, PropertyValue yes   This field stores the data from all of the properties for the Message indexed by their propertyName.
bytes BYTES no   This field stores the value from BytesMessage.readBytes(byte[]).
map Map of STRING, PropertyValue no   This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key.
text STRING no   This field stores the value from TextMessage.getText().

io.confluent.connect.jms.Destination

This schema is used to represent a JMS Destination, and is either queue or topic. The schema defines the following fields:

Name Schema Required Default Value Documentation
destinationType STRING yes   The type of JMS Destination: either queue or topic.
name STRING yes   The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName().

io.confluent.connect.jms.PropertyValue

This schema is used to store the data found in the properties of the message. To ensure that the proper type mappings are preserved, the propertyType field stores the value type of the property. The corresponding field in the schema contains the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty(). The schema defines the following fields:

Name Schema Required Default Value Documentation
propertyType STRING yes   The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string.
boolean BOOLEAN no   The value stored as a boolean. Null unless propertyType is set to boolean.
byte INT8 no   The value stored as a byte. Null unless propertyType is set to byte.
short INT16 no   The value stored as a short. Null unless propertyType is set to short.
integer INT32 no   The value stored as an integer. Null unless propertyType is set to integer.
long INT64 no   The value stored as a long. Null unless propertyType is set to long.
float FLOAT32 no   The value stored as a float. Null unless propertyType is set to float.
double FLOAT64 no   The value stored as a double. Null unless propertyType is set to double.
string STRING no   The value stored as a string. Null unless propertyType is set to string.