Important

You are viewing documentation for an older version of Confluent Platform.

Kafka Connect JMS Sink Connector

The Kafka Connect JMS Sink Connector is used to move messages from Kafka to any JMS-compliant broker.

This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. Additional details can be found in Client Library JARs.

Some of the supported JMS brokers include:

  • TIBCO EMS
  • IBM MQ
  • Solace
  • ActiveMQ

Prerequisites

The following are required to run the Kafka Connect JMS Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
  • Java 1.8
  • JMS 1.1+ client JARs (see the Client Library JARs section)

Install JMS Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-jms-sink:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-jms-sink:1.0.0-preview

Important

This connector does not include any JMS client JARs. See the Client Library JARs section for more details.

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Important

This connector does not include any JMS client JARs. See the Client Library JARs section for more details.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Client Library JARs

The Kafka Connect JMS connector works with any JMS 1.1-compliant system. The connector does not include client libraries.

If you are running a multi-node Connect cluster, the JMS connector and JMS client JARs must be installed on every Connect worker in the cluster. See below for details.

Installing JMS Client Libraries

This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. For this to work, the relevant JMS client JARs must be on the classpath alongside the connector JARs.

The installation steps are:

  1. Find the JMS client library JAR for each JMS system that you will use.
  2. Place these JAR files into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node. See the note below if you do not see this directory.
  3. Restart all of the Connect worker nodes.

Note

The share/confluent-hub-components/kafka-connect-jms-sink/lib directory mentioned above is the default installation directory used by the Confluent Hub client. If you have customized this directory or are using a different installation method, find the location of the Confluent JMS sink connector JAR files and place the JMS client JAR file(s) into the same directory.
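The installation steps above can be sketched as a small shell helper. This is a minimal sketch, not an official installer: the function name install_jms_client_jar and its two arguments (the JAR path and the Confluent Platform installation directory) are assumptions made for illustration. The target directory is the default Confluent Hub location described in the note above.

```shell
#!/bin/sh
# Hypothetical helper: copy one JMS client JAR into the connector's lib directory.
# Usage: install_jms_client_jar /path/to/client.jar /path/to/confluent-platform
install_jms_client_jar() {
  jar_path="$1"
  confluent_home="$2"
  plugin_lib="$confluent_home/share/confluent-hub-components/kafka-connect-jms-sink/lib"

  # Fail early if the JAR is missing or the connector is not installed here.
  if [ ! -f "$jar_path" ]; then
    echo "JAR not found: $jar_path" >&2
    return 1
  fi
  if [ ! -d "$plugin_lib" ]; then
    echo "Plugin directory not found: $plugin_lib" >&2
    return 1
  fi

  cp "$jar_path" "$plugin_lib/" && echo "Copied $(basename "$jar_path") to $plugin_lib"
}
```

Run the helper on each Connect worker node, then restart the workers so that they pick up the new JAR.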

Instructions for several JMS providers are listed below.

TIBCO EMS

The following steps show how to download the client JAR for TIBCO EMS v8.5.0. These steps should also apply to other versions.

  1. Navigate to TIBCO EMS Downloads.
  2. Accept the User Agreement for the “TIBCO Enterprise Message Service™” download.
  3. Download “TIBCO Enterprise Message Service™ - Community Edition – Free Download - Linux”.
  4. Unzip the TIB_ems-ce_8.5.0.zip file. This should result in a TIB_ems-ce_8.5.0 directory.
  5. In the TIB_ems-ce_8.5.0/tar directory, extract the contents of TIB_ems-ce_8.5.0_linux_x86_64-java_client.tar.gz into a temporary directory.
  6. In the temporary directory, copy only the tibco/ems/8.5/lib/tibjms.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  7. Restart all of the Connect worker nodes.

Important

Do not place any other files from the TIBCO download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

IBM MQ

  1. Follow IBM’s guide on Getting the IBM MQ classes for Java and JMS to download the IBM MQ client JAR.
  2. Copy only the com.ibm.mq.allclient.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Important

Do not place any other files from the IBM download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

Solace

  1. Download the Solace JMS Client JAR from Solace’s Open APIs and Protocols.
  2. Copy the downloaded sol-jms-{version}.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Note

A comprehensive list of Solace JMS Client JAR versions can be found on Maven at com.solacesystems:sol-jms.

ActiveMQ

  1. Download the latest ActiveMQ Release in the tar.gz format (Unix/Linux/Cygwin).
  2. Extract the contents of the tar.gz download to a temporary directory.
  3. From the temporary directory, copy only the activemq-all-{version}.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
  4. Restart all of the Connect worker nodes.

Important

Do not place any other files from the ActiveMQ download into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory in your Confluent Platform installation.

JMS Message Formats

The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:

string (default)

When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.

Primitive values are converted to their String equivalents. Structured objects are converted to a string representation similar to JSON, except that simple string values (those not inside objects or arrays) are unquoted.

Tip

Kafka Connect Transformations can be used with the configured jms.message.format to transform the record value to the desired string representation before the connector processes each record.

avro

Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.

Important

The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.

json

Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.

bytes

Record values are passed along in bytes form without any conversion.

Important

Record values must be converted to bytes form before the connector processes them. Set the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure that record values arrive in byte format.
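As an illustration of the bytes format, the sketch below writes a connector configuration that pairs jms.message.format with the ByteArrayConverter. The helper name write_bytes_sink_config and the connector name JmsSinkBytesConnector are hypothetical. The ActiveMQ connection values are borrowed from the ActiveMQ quick start on this page and would need to be adapted to your broker.

```shell
#!/bin/sh
# Hypothetical helper: write a connector config that uses the bytes message format.
# Usage: write_bytes_sink_config jms-sink-bytes.json
write_bytes_sink_config() {
  cat > "$1" <<'EOF'
{
  "name": "JmsSinkBytesConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
    "tasks.max": "1",
    "topics": "jms-messages",
    "java.naming.factory.initial": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
    "java.naming.provider.url": "tcp://localhost:61616",
    "java.naming.security.principal": "connectuser",
    "java.naming.security.credentials": "connectpassword",
    "jndi.connection.factory": "connectionFactory",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "jms.message.format": "bytes",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
}
```

The two lines that matter here are jms.message.format and value.converter. The remaining properties only make the file a complete connector definition.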

Forwarding Kafka Properties to JMS

The connector can be configured to forward various values from the Kafka record to the JMS Message.

  • Enable jms.forward.kafka.key to convert the record’s key to a String and forward it as the JMSCorrelationID.
  • Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset on JMS Message properties.
    • Kafka topic is applied to the message as a String property named KAFKA_TOPIC.
    • Partition is applied to the message as an Int property named KAFKA_PARTITION.
    • Offset is applied to the message as a Long property named KAFKA_OFFSET.
  • Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a String property.

Note

The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, out-of-the-box or custom Kafka Connect Transformations can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.
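The three forwarding options above can be switched on together. The sketch below appends them to a connector configuration in the key=value properties format that standalone Connect workers accept. The helper name enable_kafka_forwarding and the target file are assumptions. In the JSON form used by the quick starts on this page, the same keys would go inside the "config" object with the value "true".

```shell
#!/bin/sh
# Hypothetical helper: append the three forwarding switches to a connector
# properties file (key=value format).
# Usage: enable_kafka_forwarding jms-sink.properties
enable_kafka_forwarding() {
  cat >> "$1" <<'EOF'
# Forward the record key as the JMSCorrelationID.
jms.forward.kafka.key=true
# Forward topic, partition, and offset as KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET.
jms.forward.kafka.metadata=true
# Forward each record header as a String property.
jms.forward.kafka.headers=true
EOF
}
```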

JMS Reconnection

Each JMS provider handles broken connections, reconnections, and failovers in its own way. As a result, the JMS sink connector delegates most client reconnection capabilities to the provider. The reconnection examples below illustrate some of the configurations exposed to enable reconnection and failover capabilities. Review your JMS provider documentation for examples of how the provider implements and exposes these features. Any configurations given to the connector that are not part of the connector’s primary configurations are passed along to the InitialContext when setting up the JMS connection.

The JMS sink connector tasks handle JMS exceptions thrown from the JMS producer by rethrowing them as a RetriableException. This triggers a retry of the entire batch. To avoid sending duplicate data to JMS, each task tracks the most recent offset processed for each topic and partition and skips ahead to the first record that has not already been sent to JMS. When retrying a batch in this scenario, the task automatically refreshes the JMS connection to mitigate transient connection failures.
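The skip-ahead behavior on retry can be pictured with a small filter. This is a conceptual sketch only and not the connector's implementation: the function name skip_already_sent, the state file, and the whitespace-separated line layouts are all assumptions chosen for illustration.

```shell
#!/bin/sh
# Conceptual sketch: filter a retried batch so that records at or below the
# last offset already sent for their topic-partition are skipped.
#   $1    : file of "topic partition last_sent_offset" lines (hypothetical state)
#   stdin : batch of "topic partition offset value" lines
skip_already_sent() {
  awk '
    # First input (the state file): remember the last sent offset per partition.
    FILENAME == ARGV[1] { last[$1 " " $2] = $3 + 0; next }
    {
      key = $1 " " $2
      # Keep the record only if nothing was sent yet for this partition,
      # or its offset is beyond the last one sent.
      if (!(key in last) || ($3 + 0) > last[key]) print
    }
  ' "$1" -
}
```

With a state file containing `jms-messages 0 4`, a retried batch holding offsets 3, 4, and 5 for partition 0 would pass through only the record at offset 5, while records for partitions with no tracked offset pass through unchanged.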

Reconnection Examples
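As one illustration for ActiveMQ, reconnection can be delegated to the provider through ActiveMQ's failover transport, which is selected in the broker URL. The sketch below is an assumption based on that ActiveMQ feature rather than an example taken from this page. The helper name activemq_failover_url, the host names broker1 and broker2, and the option values are hypothetical, so check the ActiveMQ documentation for the options your version supports.

```shell
#!/bin/sh
# Hypothetical helper: build an ActiveMQ failover transport URL from one or
# more broker URIs. The option values below are illustrative, not recommendations.
# Usage: activemq_failover_url tcp://broker1:61616 tcp://broker2:61616
activemq_failover_url() {
  uris=""
  for uri in "$@"; do
    # Join the broker URIs with commas.
    if [ -z "$uris" ]; then
      uris="$uri"
    else
      uris="$uris,$uri"
    fi
  done
  echo "failover:($uris)?maxReconnectAttempts=10&initialReconnectDelay=100"
}
```

The resulting string would be used as the value of java.naming.provider.url in the connector configuration, in place of a single tcp:// broker address.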

ActiveMQ Quick Start

This quick start uses the JMS Sink Connector to consume records from Kafka and send them to an ActiveMQ broker.

  1. Install ActiveMQ

  2. Start ActiveMQ

  3. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-jms-sink:latest
    
  4. Download the activemq-all jar and copy it into the JMS Sink Connector’s plugin folder. This needs to be done on every Connect worker node and the workers must be restarted to pick up the client jar.

    Tip

    The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib

  5. Start Confluent Platform.

    confluent start
    
  6. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent produce jms-messages
    
  7. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
        "java.naming.provider.url": "tcp://localhost:61616",
        "java.naming.security.principal": "connectuser",
        "java.naming.security.credentials": "connectpassword",
        "jndi.connection.factory": "connectionFactory",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  8. Load the JMS Sink Connector.

    confluent load jms -d jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  9. Confirm that the connector is in a RUNNING state.

    confluent status jms
    
  10. Navigate to the ActiveMQ Admin UI to confirm the messages were delivered to the connector-quickstart queue.

    Tip

    The default credentials for the ActiveMQ Admin UI are admin/admin
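Because the Confluent CLI is not intended for production, the load and status steps of this quick start can also be performed through the Kafka Connect REST API. The following is a minimal sketch, assuming the Connect worker's REST interface is reachable at http://localhost:8083 (the default port) and that jms-sink.json is the file created earlier. The function names and the CONNECT_URL variable are hypothetical.

```shell
#!/bin/sh
# Assumption: the Connect worker's REST interface listens here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the status URL for a connector name.
connector_status_url() {
  echo "$CONNECT_URL/connectors/$1/status"
}

# Create the connector from a JSON file shaped like jms-sink.json
# ({"name": ..., "config": {...}}).
load_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}

# Fetch the connector and task state; look for "RUNNING" in the response.
connector_status() {
  curl -sS "$(connector_status_url "$1")"
}
```

For example, `load_connector jms-sink.json` followed by `connector_status JmsSinkConnector` uses the connector name given in the JSON file.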

Solace Quick Start

This quick start uses the JMS Sink Connector to consume records from Kafka and send them to a Solace PubSub+ broker.

  1. Start the Solace PubSub+ Standard broker.

    docker run -d --name "solace" \
        -p 8080:8080 -p 55555:55555 \
        --shm-size=1000000000 \
        --ulimit nofile=2448:38048 \
        -e username_admin_globalaccesslevel=admin \
        -e username_admin_password=admin \
        -e system_scaling_maxconnectioncount=100 \
        solace/solace-pubsub-standard:9.1.0.77
    
  2. Create a Solace Queue in the default Message VPN.

    1. Once the Solace Docker container has started, navigate to http://localhost:8080 in your browser and log in with admin/admin.
    2. Select the default Message VPN on the home screen.
    3. Select “Queues” in the left menu to navigate to the Queues page.
    4. On the Queues page, select the “+ Queue” button in the upper right and name the Queue connector-quickstart.
  3. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-jms-sink:latest
    
  4. Download the sol-jms jar and copy it into the JMS Sink Connector’s plugin folder. This needs to be done on every Connect worker node and the workers must be restarted to pick up the client jar.

    Tip

    The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib

  5. Start Confluent Platform.

    confluent start
    
  6. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent produce jms-messages
    
  7. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "com.solacesystems.jndi.SolJNDIInitialContextFactory",
        "java.naming.provider.url": "smf://localhost:55555",
        "java.naming.security.principal": "admin",
        "java.naming.security.credentials": "admin",
        "jndi.connection.factory": "/jms/cf/default",
        "Solace_JMS_VPN": "default",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  8. Load the JMS Sink Connector.

    confluent load jms -d jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  9. Confirm that the connector is in a RUNNING state.

    confluent status jms
    
  10. Navigate to the Solace UI to confirm the messages were delivered to the connector-quickstart queue.

    Tip

    The default credentials for the Solace UI are admin/admin

TIBCO EMS Quick Start

This quick start uses the JMS Sink Connector to consume records from Kafka and send them to TIBCO Enterprise Message Service - Community Edition.

  1. Download and unzip TIBCO EMS Community Edition.

  2. Run the TIBCOUniversalInstaller-mac.command and step through the TIBCO Universal Installer.

    Tip

    Remember your TIBCO_HOME directory. It is used in the upcoming steps.

  3. Start TIBCO EMS with default configurations.

    ~/TIBCO_HOME/ems/8.4/bin/tibemsd
    
  4. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-jms-sink:latest
    
  5. Copy ~/TIBCO_HOME/ems/8.4/lib/tibjms.jar into the JMS Sink Connector’s plugin folder. This needs to be done on every Connect worker node and the workers must be restarted to pick up the client jar.

    Tip

    The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib

  6. Start Confluent Platform.

    confluent start
    
  7. Produce test data to the jms-messages topic in Kafka.

    seq 10 | confluent produce jms-messages
    
  8. Create a jms-sink.json file with the following contents:

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.provider.url": "tibjmsnaming://localhost:7222",
        "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
        "jndi.connection.factory": "QueueConnectionFactory",
        "java.naming.security.principal": "admin",
        "java.naming.security.credentials": "",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  9. Load the JMS Sink Connector.

    confluent load jms -d jms-sink.json
    

    Important

    Don’t use the Confluent CLI in production environments.

  10. Confirm that the connector is in a RUNNING state.

    confluent status jms
    
  11. Validate that there are messages on the queue using the tibemsadmin tool.

    ~/TIBCO_HOME/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
    # admin password is blank by default
    
    tcp://localhost:7222> show queue connector-quickstart
    
