JMS Sink Connector for Confluent Platform

The Kafka Connect JMS Sink connector is used to move messages from Apache Kafka® to any JMS-compliant broker.

This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. Additional details can be found in Client Library JARs.

Some of the supported JMS brokers include TIBCO EMS, IBM MQ, Solace, and ActiveMQ.

Features

The JMS Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The JMS Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the topics being consumed have multiple partitions.

Install the JMS Sink Connector

You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
  • Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect)
  • Java 1.8
  • An installation of the JMS 1.1+ client library JAR files. For help with downloading the JAR files, see the Client Library JARs section.

Install the connector using the Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-jms-sink:2.1.5

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for JMS Sink Connector for Confluent Platform.

Client Library JARs

The Kafka Connect JMS connector works with any JMS 1.1 compliant system. The connector does not come with client libraries.

If you are running a multi-node Connect cluster, the JMS connector and JMS Client JARs must be installed on every Connect worker in the cluster. See below for details.

Install JMS Client Libraries

This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. For this to work, the connector must have the relevant JMS client JARs on the classpath, in the same directory as the connector JARs.

The installation steps are:

  1. Find the JMS client library JAR for any JMS system that will be used.
  2. Place these JAR files into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node. See the note below if you do not see this directory.
  3. Restart all of the Connect worker nodes.

Note

The share/confluent-hub-components/kafka-connect-jms-sink/lib directory mentioned above is the default installation directory used by the Confluent Hub client. If you have customized this directory or are using a different installation method, find the location of the Confluent JMS sink connector JAR files and place the JMS client JAR file(s) into the same directory.
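As a rough illustration of step 2, the following Python sketch copies a single client JAR into the connector's lib directory on one worker. The function name install_client_jar and its arguments are hypothetical; only the share/confluent-hub-components/kafka-connect-jms-sink/lib layout comes from the steps above. You would still need to run it on each worker node and restart the workers afterwards.

```python
import shutil
from pathlib import Path

# Default plugin lib directory, relative to the Confluent Platform
# installation directory (taken from the installation steps above).
PLUGIN_LIB = Path("share/confluent-hub-components/kafka-connect-jms-sink/lib")


def install_client_jar(jar_path, confluent_home):
    """Copy one JMS client JAR next to the connector JARs (hypothetical helper).

    Raises FileNotFoundError if the JAR or the plugin lib directory is
    missing, which usually means the connector was installed elsewhere.
    """
    jar = Path(jar_path)
    lib_dir = Path(confluent_home) / PLUGIN_LIB
    if not jar.is_file():
        raise FileNotFoundError(f"client JAR not found: {jar}")
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"plugin lib directory not found: {lib_dir}")
    target = lib_dir / jar.name
    shutil.copy2(jar, target)
    return target
```

Failing when the lib directory is missing, rather than creating it, is deliberate in this sketch: a missing directory suggests a customized installation path, which is the case the note above describes.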

Instructions for several JMS providers are listed below.

TIBCO EMS

The following steps show how to download the client JAR for TIBCO EMS v8.5.0. These steps should also apply to other versions.

  1. Navigate to TIBCO EMS Downloads.
  2. Accept the User Agreement for the “TIBCO Enterprise Message Service™” download.
  3. Download “TIBCO Enterprise Message Service™ - Community Edition – Free Download - Linux”.
  4. Unzip the TIB_ems-ce_8.5.0.zip file. This should result in a TIB_ems-ce_8.5.0 directory.
  5. In the TIB_ems-ce_8.5.0/tar directory, extract the contents of TIB_ems-ce_8.5.0_linux_x86_64-java_client.tar.gz into a temporary directory.
  6. In the temporary directory, copy only the tibco/ems/8.5/lib/tibjms.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  7. Restart all of the Connect worker nodes.

Important

Do not place any other files from the TIBCO download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

IBM MQ

  1. Follow IBM’s guide on Getting the IBM MQ classes for Java and JMS to download the IBM MQ client JAR.
  2. Copy only the com.ibm.mq.allclient.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Important

Do not place any other files from the IBM download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

Solace

Important

When using the JMS Sink connector with Solace, you must manually install the commons-lang-<version-A>.jar file into the lib/ directory, where <version-A> is the version number of commons-lang that your sol-jms-<version-B>.jar file depends on.

  1. Download the Solace JMS Client JAR from Solace’s Open APIs and Protocols.
  2. Copy the downloaded sol-jms-<version-B>.jar file and its dependencies, including the commons-lang-<version-A>.jar file, into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Note

A comprehensive list of Solace JMS Client JAR versions can be found on Maven at com.solacesystems:sol-jms.

ActiveMQ

  1. Download the latest ActiveMQ Release in the tar.gz format (Unix/Linux/Cygwin).
  2. Extract the contents of the tar.gz download to a temporary directory.
  3. From the temporary directory, copy only the activemq-all-{version}.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  4. Restart all of the Connect worker nodes.

Important

Do not place any other files from the ActiveMQ download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

JMS Message Formats

The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:

string (default)

When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.

Primitive values are converted to their String equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.
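The rule described above can be approximated in a few lines of Python. This is only a sketch of the described behavior, not the connector's conversion code: the function name to_jms_text is hypothetical, and json.dumps stands in for the JSON-like rendering, so the exact output for some types may differ from what Values.convertToString(...) produces.

```python
import json


def to_jms_text(value):
    """Approximate the string message format for a record value.

    A top-level string is returned unquoted; every other value is
    rendered in a compact JSON form, so strings nested inside objects
    or arrays keep their quotes.
    """
    if isinstance(value, str):
        return value
    return json.dumps(value, separators=(",", ":"))


print(to_jms_text("hello"))                  # hello
print(to_jms_text(42))                       # 42
print(to_jms_text({"id": 1, "name": "a"}))   # {"id":1,"name":"a"}
```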

Tip

Single Message Transformation can be used with the configured jms.message.format to transform the record value to the desired string representation before the connector processes each record.

avro

Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.

Important

The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.

json

Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.

bytes

Record values are passed along in bytes form without any conversion.

Important

Record values must be converted to bytes form before the connector processes them. Configure the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure that record values arrive in byte format.
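To make the pairing of jms.message.format and value.converter concrete, here is a small Python sketch that derives a connector configuration for a chosen format. The helper name with_message_format is hypothetical; the property names, format names, and converter class are the ones given above.

```python
BYTE_ARRAY_CONVERTER = "org.apache.kafka.connect.converters.ByteArrayConverter"
VALID_FORMATS = {"string", "avro", "json", "bytes"}


def with_message_format(config, message_format):
    """Return a copy of a connector config with jms.message.format set.

    For the bytes format, value.converter is also set to
    ByteArrayConverter so record values reach the connector as bytes.
    """
    if message_format not in VALID_FORMATS:
        raise ValueError(f"unsupported jms.message.format: {message_format}")
    updated = dict(config)
    updated["jms.message.format"] = message_format
    if message_format == "bytes":
        updated["value.converter"] = BYTE_ARRAY_CONVERTER
    return updated
```

The input dictionary is copied rather than modified, so the same base configuration can be reused to try several formats.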

Forwarding Kafka Properties to JMS

The connector can be configured to forward various values from the Kafka record to the JMS Message.

  • Enable jms.forward.kafka.key to convert the record’s key to a String and forward it as the JMSCorrelationID.
  • Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset on JMS Message properties.
    • Kafka topic is applied to the message as a String property named KAFKA_TOPIC.
    • Partition is applied to the message as an Int property named KAFKA_PARTITION.
    • Offset is applied to the message as a Long property named KAFKA_OFFSET.
  • Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a String property.

Note

The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays), which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, an out-of-the-box or custom Single Message Transformation can be used with the connector to transform the record keys and headers to the desired string representation before the JMS Sink connector processes each record.
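The mapping from a Kafka record to JMS fields can be sketched in Python as follows. This is an illustration of the three options, not connector code: the record dictionary is a hypothetical stand-in for a SinkRecord, the function name is invented, and str() is used in place of the connector's own string conversion.

```python
def forwarded_jms_fields(record, forward_key=False, forward_metadata=False,
                         forward_headers=False):
    """Return (correlation_id, properties) for a Kafka record dict.

    `record` is a hypothetical stand-in for a SinkRecord with the keys
    topic, partition, offset, key, and headers.
    """
    correlation_id = None
    properties = {}
    if forward_key and record.get("key") is not None:
        # jms.forward.kafka.key: key as a String in JMSCorrelationID
        correlation_id = str(record["key"])
    if forward_metadata:
        # jms.forward.kafka.metadata: String, Int, and Long properties
        properties["KAFKA_TOPIC"] = str(record["topic"])
        properties["KAFKA_PARTITION"] = int(record["partition"])
        properties["KAFKA_OFFSET"] = int(record["offset"])
    if forward_headers:
        # jms.forward.kafka.headers: each header as a String property
        for name, value in record.get("headers", {}).items():
            properties[name] = str(value)
    return correlation_id, properties
```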

JMS Reconnection

Each JMS provider handles broken connections, reconnections, and failovers in its own way. As a result, the JMS Sink connector delegates most client reconnection capabilities to the provider. The reconnection examples below illustrate some of the configurations exposed to enable reconnection and failover capabilities. Review your JMS provider documentation for examples of how the provider implements and exposes these features. Any configurations given to the connector that are not part of the connector’s primary configurations are passed along to the InitialContext when setting up the JMS connection.

The JMS Sink connector tasks handle JMS Exceptions thrown from the JMS Producer by rethrowing them as a RetriableException. This triggers the connector to retry the entire batch. To avoid sending duplicate data to JMS, the connector tasks track the most recent offset processed for a topic and partition and skip ahead to the first record that has not already been sent to JMS. When retrying a batch in this scenario, the task automatically refreshes the JMS connection to ensure that any transient connection failures are mitigated.
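The skip-ahead behavior on retry can be sketched in Python as follows. This is a simplified model under stated assumptions, not the connector's implementation: OffsetTracker and send_batch are hypothetical names, records are plain dictionaries, and send is any callable that may raise when the JMS connection fails.

```python
class OffsetTracker:
    """Track the last offset sent to JMS for each (topic, partition)."""

    def __init__(self):
        self._last_sent = {}

    def mark_sent(self, topic, partition, offset):
        self._last_sent[(topic, partition)] = offset

    def unsent(self, batch):
        """Return only the records that have not been sent yet."""
        return [
            r for r in batch
            if r["offset"] > self._last_sent.get((r["topic"], r["partition"]), -1)
        ]


def send_batch(batch, tracker, send):
    """Send unsent records in order, marking each one after it succeeds."""
    for record in tracker.unsent(batch):
        send(record)  # may raise; the caller then retries the whole batch
        tracker.mark_sent(record["topic"], record["partition"], record["offset"])
```

When send_batch is called again with the same batch after a failure, records that were already marked as sent are filtered out, so the retry resumes at the first unsent record.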

Reconnection Examples
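As one hedged illustration for ActiveMQ, reconnection can be delegated to the provider by using ActiveMQ's failover transport in java.naming.provider.url. The Python sketch below only builds such a URL. The helper name, the second broker address, and the maxReconnectAttempts value are assumptions chosen for the example; check the ActiveMQ documentation for the options that suit your deployment.

```python
from urllib.parse import urlencode


def activemq_failover_url(broker_uris, **options):
    """Build an ActiveMQ failover transport URI from a list of broker URIs."""
    if not broker_uris:
        raise ValueError("at least one broker URI is required")
    url = "failover:(" + ",".join(broker_uris) + ")"
    if options:
        url += "?" + urlencode(options)
    return url


# Hypothetical two-broker setup; only the first address appears in this document.
reconnect_config = {
    "java.naming.provider.url": activemq_failover_url(
        ["tcp://localhost:61616", "tcp://localhost:61617"],
        maxReconnectAttempts=5,
    ),
}
print(reconnect_config["java.naming.provider.url"])
# failover:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectAttempts=5
```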

ActiveMQ Quick Start

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

This quick start uses the JMS Sink connector to consume records from Kafka and send them to an ActiveMQ broker.

Prerequisites
  1. Install ActiveMQ

  2. Start ActiveMQ

  3. Install the connector by using the following CLI command:

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
    
  4. Download the activemq-all JAR and copy it into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node, and you must restart the workers to pick up the client JAR.

  5. Start Confluent Platform using the confluent local command.

    confluent local services start
    
  6. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent local services kafka produce jms-messages
    
  7. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
        "java.naming.provider.url": "tcp://localhost:61616",
        "java.naming.security.principal": "connectuser",
        "java.naming.security.credentials": "connectpassword",
        "connection.factory.name": "connectionFactory",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  8. Load the JMS Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load jms --config jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  9. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status jms
    
  10. Navigate to the ActiveMQ Admin UI to confirm the messages were delivered to the connector-quickstart queue.

    Tip

    The default credentials for the ActiveMQ Admin UI are admin/admin
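Outside of a local development setup, the same jms-sink.json file can be submitted through the Kafka Connect REST API instead of the Confluent CLI. The Python sketch below assumes a Connect worker listening on http://localhost:8083 (the default REST port); the function names are hypothetical. Note that the connector is registered under the name given in the file, which is JmsSinkConnector in this quick start.

```python
import json
import urllib.request


def build_create_request(config_path, connect_url="http://localhost:8083"):
    """Build a POST /connectors request from a {"name", "config"} JSON file."""
    with open(config_path, "rb") as f:
        body = f.read()
    json.loads(body)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def connector_state(name, connect_url="http://localhost:8083"):
    """Return the connector state (for example RUNNING) from the status endpoint."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)["connector"]["state"]
```

To send the request against a running worker, pass it to urllib.request.urlopen, for example urllib.request.urlopen(build_create_request("jms-sink.json")), and then call connector_state("JmsSinkConnector").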

Solace Quick Start

This quick start uses the JMS Sink connector to consume records from Kafka and send them to a Solace PubSub+ broker.

  1. Start the Solace PubSub+ Standard broker.

    docker run -d --name "solace" \
        -p 8080:8080 -p 55555:55555 \
        --shm-size=1000000000 \
        --ulimit nofile=2448:38048 \
        -e username_admin_globalaccesslevel=admin \
        -e username_admin_password=admin \
        -e system_scaling_maxconnectioncount=100 \
        solace/solace-pubsub-standard:9.1.0.77
    
  2. Create a Solace Queue in the default Message VPN.

    1. Once the Solace Docker container has started, navigate to http://localhost:8080 in your browser and log in with admin/admin.
    2. Select the default Message VPN on the home screen.
    3. Select “Queues” in the left menu to navigate to the Queues page.
    4. On the Queues page, select the “+ Queue” button in the upper right and name the Queue connector-quickstart.
  3. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
    
  4. Download the sol-jms JAR and copy it into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node and the workers must be restarted to pick up the client JAR.

    Tip

    The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib

  5. Start Confluent Platform.

    confluent local services start
    
  6. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent local services kafka produce jms-messages
    
  7. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "com.solacesystems.jndi.SolJNDIInitialContextFactory",
        "java.naming.provider.url": "smf://localhost:55555",
        "java.naming.security.principal": "admin",
        "java.naming.security.credentials": "admin",
        "connection.factory.name": "/jms/cf/default",
        "Solace_JMS_VPN": "default",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  8. Load the JMS Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load jms --config jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  9. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status jms
    
  10. Navigate to the Solace UI to confirm the messages were delivered to the connector-quickstart queue.

    Tip

    The default credentials for the Solace UI are admin/admin

TIBCO EMS Quick Start

This quick start uses the JMS Sink connector to consume records from Kafka and send them to TIBCO Enterprise Message Service - Community Edition.

  1. Download and unzip TIBCO EMS Community Edition.

  2. Run the TIBCOUniversalInstaller-mac.command and step through the TIBCO Universal Installer.

    Tip

    Remember your TIBCO_HOME directory. It is used in the upcoming steps.

  3. Start TIBCO EMS with default configurations.

    ~/TIBCO_HOME/ems/8.4/bin/tibemsd
    
  4. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
    
  5. Copy ~/TIBCO_HOME/ems/8.4/lib/tibjms.jar into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node and the workers must be restarted to pick up the client JAR.

    Tip

    The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib

  6. Start Confluent Platform.

    confluent local services start
    
  7. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent local services kafka produce jms-messages
    
  8. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.provider.url": "tibjmsnaming://localhost:7222",
        "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
        "connection.factory.name": "QueueConnectionFactory",
        "java.naming.security.principal": "admin",
        "java.naming.security.credentials": "",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  9. Load the JMS Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load jms --config jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  10. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status jms
    
  11. Validate that there are messages on the queue using the tibemsadmin tool.

    ~/TIBCO_HOME/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
    # admin password is blank by default
    
    tcp://localhost:7222> show queue connector-quickstart
    

Connecting to Oracle WebLogic Server

You can connect to Oracle’s WebLogic Server using the JMS Sink connector. For a working example, see Oracle Weblogic Sink with JMS connector.

The following is a sample connector configuration:

{
     "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
     "confluent.topic.bootstrap.servers": "broker:9092",
     "topics": "sink-messages",
     "java.naming.factory.initial": "weblogic.jndi.WLInitialContextFactory",
     "java.naming.provider.url": "t3://weblogic-jms:7001",
     "jms.destination.name": "myJMSServer/mySystemModule!myJMSServer@MyDistributedQueue",
     "confluent.license": "",
     "connection.factory.name": "myFactory",
     "java.naming.security.principal": "weblogic",
     "confluent.topic.replication.factor": "1",
     "name": "jms-weblogic-topic-sink",
     "jms.destination.type": "queue",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "java.naming.security.credentials": "welcome1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
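Unlike the quick start files, this sample is a flat property map that carries the connector name as one of its entries. If you want to submit it in the wrapped form used by the quick starts, with separate name and config fields, a conversion along the following lines may help. The function name to_create_payload is hypothetical.

```python
def to_create_payload(flat_config):
    """Wrap a flat config containing "name" into {"name": ..., "config": {...}}."""
    if "name" not in flat_config:
        raise ValueError('flat config must contain a "name" entry')
    config = {k: v for k, v in flat_config.items() if k != "name"}
    return {"name": flat_config["name"], "config": config}
```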

You must add the WebLogic client library JAR to the connector /lib folder:

  1. Enter the following command to check the connector path:

    grep plugin /etc/kafka-connect/kafka-connect.properties
    
  2. Your output should resemble:

    plugin.path=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink
    
  3. Add the JAR file to the /lib folder and verify the file is present in the folder:

    ls -larth /usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib/example3client.jar
    -rw-r--r-- 1 <username> <username> 8.7M Sep 29 10:29 /usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib/example3client.jar
    

    See Client Library JARs for more information about installing client JARs.