JMS Sink Connector for Confluent Platform¶
The Kafka Connect JMS Sink connector is used to move messages from Apache Kafka® to any JMS-compliant broker.
This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. Additional details can be found in Client Library JARs.
Some of the supported JMS brokers include:
- ActiveMQ
- TIBCO EMS
- IBM MQ
- Solace
- Oracle WebLogic Server
Features¶
The JMS Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
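As a sketch, the standard Kafka Connect dead letter queue properties can be added to the sink configuration; the topic name below is an illustrative assumption:

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-jms-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true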
Multiple tasks¶
The JMS Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance by letting the connector write to the JMS broker in parallel.
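For example, a minimal configuration fragment (the value 3 is illustrative):

tasks.max=3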
Install the JMS Sink Connector¶
You can install this connector by using the Confluent Hub Client installation instructions, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect)
- Java 1.8
- An installation of the JMS 1.1+ client library JAR files. For help with downloading the JAR files, see the Client Library JARs section.
Install the connector using the Confluent CLI¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-jms-sink:2.1.5
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for JMS Sink Connector for Confluent Platform.
Client Library JARs¶
The Kafka Connect JMS connector works with any JMS 1.1 compliant system. The connector does not come with client libraries.
If you are running a multi-node Connect cluster, the JMS connector and JMS Client JARs must be installed on every Connect worker in the cluster. See below for details.
Install JMS Client Libraries¶
This connector uses Java Naming and Directory Interface™ (JNDI) to create an instance of the JMS ConnectionFactory for your messaging system. For this to work, the connector must have the relevant JMS client JARs on the classpath, adjacent to the connector.
The installation steps are:
- Find the JMS client library JAR for any JMS system that will be used.
- Place these JAR files into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node. See the note below if you do not see this directory.
- Restart all of the Connect worker nodes.
Note
The share/confluent-hub-components/kafka-connect-jms-sink/lib
directory mentioned above is the default
installation directory used by the Confluent Hub client. If you have customized this directory or are
using a different installation method, find the location of the Confluent JMS sink connector JAR files
and place the JMS client JAR file(s) into the same directory.
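For example, on a Linux worker the copy-and-restart sequence might look like the following sketch; the JAR name, installation path, and service name are assumptions that depend on your environment:

# copy the provider's JMS client JAR next to the connector JARs
cp vendor-jms-client.jar /opt/confluent/share/confluent-hub-components/kafka-connect-jms-sink/lib/
# restart the Connect worker so the classpath change is picked up (service name is an assumption)
sudo systemctl restart confluent-kafka-connect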
Instructions for several JMS providers are listed below.
TIBCO EMS¶
The following steps show how to download the client JAR for TIBCO EMS v8.5.0. These steps should also apply to other versions.
- Navigate to TIBCO EMS Downloads.
- Accept the User Agreement for the “TIBCO Enterprise Message Service™” download.
- Download “TIBCO Enterprise Message Service™ - Community Edition – Free Download - Linux”
- Unzip the TIB_ems-ce_8.5.0.zip file. This should result in a TIB_ems-ce_8.5.0 directory.
- In the TIB_ems-ce_8.5.0/tar directory, extract the contents of TIB_ems-ce_8.5.0_linux_x86_64-java_client.tar.gz into a temporary directory.
- In the temporary directory, copy only the tibco/ems/8.5/lib/tibjms.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
Important
Do not place any other files from the TIBCO download into
the share/confluent-hub-components/kafka-connect-jms-sink/lib
directory in your Confluent Platform installation.
IBM MQ¶
- Follow IBM’s guide on Getting the IBM MQ classes for Java and JMS to download the IBM MQ client JAR.
- Copy only the com.ibm.mq.allclient.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
Important
Do not place any other files from the IBM download into
the share/confluent-hub-components/kafka-connect-jms-sink/lib
directory in your Confluent Platform installation.
Solace¶
Important
When using the JMS Sink connector with Solace, you must manually install the commons-lang-<version-A>.jar file into the lib/ directory, where <version-A> is the version number of commons-lang that your sol-jms-<version-B>.jar file depends on.
- Download the Solace JMS Client JAR from Solace’s Open APIs and Protocols.
- Copy the downloaded sol-jms-<version-B>.jar file and its dependencies, including the commons-lang-<version-A>.jar file, into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
Note
A comprehensive list of Solace JMS Client JAR versions can be found on Maven at com.solacesystems:sol-jms.
ActiveMQ¶
- Download the latest ActiveMQ Release in the tar.gz format (Unix/Linux/Cygwin).
- Extract the contents of the tar.gz download to a temporary directory.
- From the temporary directory, copy only the activemq-all-{version}.jar file into the share/confluent-hub-components/kafka-connect-jms-sink/lib directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
Important
Do not place any other files from the ActiveMQ download into
the share/confluent-hub-components/kafka-connect-jms-sink/lib
directory in your Confluent Platform installation.
JMS Message Formats¶
The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:
string (default)¶
When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.
Primitive values are converted to their String equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.
Tip
Single Message Transformation can
be used with the configured jms.message.format
to transform the record
value to the desired string representation before the connector processes
each record.
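As a sketch, the fragment below selects the string format explicitly and chains a stock ExtractField transformation so only one field of the record value is written to JMS. The transform and the payload field name are illustrative assumptions, not requirements of this connector:

jms.message.format=string
transforms=extractPayload
transforms.extractPayload.type=org.apache.kafka.connect.transforms.ExtractField$Value
# assumes record values are structs containing a "payload" field
transforms.extractPayload.field=payload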
avro¶
Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.
Important
The connector attempts to infer the Avro schema for records that have no
schema. If the connector cannot infer the schema, the task is killed. If you
are processing data without a schema, consider using one of the other
jms.message.format
configurations.
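A minimal fragment selecting this format follows; it assumes downstream JMS consumers already hold the Avro schema needed to deserialize the BytesMessage payload, and the Schema Registry URL is illustrative:

jms.message.format=avro
# if records are Avro in Kafka, a schema-aware converter is also needed
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081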
json¶
Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.
bytes¶
Record values are passed along in bytes form without any conversion.
Important
Record values must be converted to bytes form before the connector processes
them. Configure the value.converter
property to
org.apache.kafka.connect.converters.ByteArrayConverter
to ensure that
record values arrive in byte format.
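For example, pairing the bytes format with the converter named above:

jms.message.format=bytes
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter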
Forwarding Kafka Properties to JMS¶
The connector can be configured to forward various values from the Kafka record to the JMS Message; a combined configuration sketch follows the list.
- Enable jms.forward.kafka.key to convert the record’s key to a String and forward it as the JMSCorrelationID.
- Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset on JMS Message properties:
  - The Kafka topic is applied to the message as a String property named KAFKA_TOPIC.
  - The partition is applied to the message as an Int property named KAFKA_PARTITION.
  - The offset is applied to the message as a Long property named KAFKA_OFFSET.
- Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a String property.
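The combined sketch referenced above enables all three forwarding options:

jms.forward.kafka.key=true
jms.forward.kafka.metadata=true
jms.forward.kafka.headers=true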
Note
The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, out-of-the-box or custom Single Message Transformation can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.
JMS Reconnection¶
Each JMS provider handles broken connections, reconnections, and failovers in its own way. As a result, the JMS sink
connector delegates most client reconnection capabilities to the provider. The reconnection examples below
illustrate some of the configurations exposed to enable reconnection and failover capabilities. It is recommended
that you review your JMS provider documentation to find examples of how the provider implements and exposes
these features. Any configurations given to the connector that are not part of the connector’s primary configurations
are passed along to the InitialContext
when setting up the JMS connection.
The JMS sink connector tasks handle JMS Exceptions thrown from the JMS Producer by rethrowing them
as a RetriableException
. This triggers the connector to retry the entire batch. To avoid sending
duplicate data to JMS, the connector tasks track the most recent offset processed for a topic and partition and
skip ahead to the first record that has not already been sent to JMS. When retrying a batch in this scenario,
the task automatically refreshes the JMS connection to ensure that any transient connection failures are mitigated.
Reconnection Examples¶
ActiveMQ Failover Transport and Auto Reconnection

java.naming.provider.url=failover:(tcp://amq-1:61616,tcp://amq-2:61616)
# all other properties follow quick start example

Solace SOLACE_JMS_JNDI_RECONNECT_RETRIES

java.naming.provider.url=smf://sol-1:55555,smf://sol-2:55555
SOLACE_JMS_JNDI_RECONNECT_RETRIES=25
# all other properties follow quick start example

TIBCO EMS FACTORY_RECONNECT_ATTEMPT_COUNT

java.naming.provider.url=tibjmsnaming://tibco-1:7222,tibjmsnaming://tibco-2:7222
com.tibco.tibjms.reconnect.attemptcount=25
# all other properties follow quick start example
ActiveMQ Quick Start¶
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
This quick start uses the JMS Sink connector to consume records from Kafka and send them to an ActiveMQ broker.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Install the connector by using the following CLI command:
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
Download the activemq-all JAR and copy it into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node, and you must restart the workers to pick up the client JAR.
Start Confluent Platform using the confluent local command.
confluent local services start
Produce test data to the jms-messages topic in Kafka.

seq 10 | confluent local services kafka produce jms-messages
Create a jms-sink.json file with the following contents:

{
  "name": "JmsSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
    "tasks.max": "1",
    "topics": "jms-messages",
    "java.naming.factory.initial": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
    "java.naming.provider.url": "tcp://localhost:61616",
    "java.naming.security.principal": "connectuser",
    "java.naming.security.credentials": "connectpassword",
    "connection.factory.name": "connectionFactory",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the JMS Sink connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services connect connector load jms --config jms-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status jms
Navigate to the ActiveMQ Admin UI to confirm the messages were delivered to the connector-quickstart queue.

Tip

The default credentials for the ActiveMQ Admin UI are admin/admin.
Solace Quick Start¶
This quick start uses the JMS Sink connector to consume records from Kafka and send them to a Solace PubSub+ broker.
Start the Solace PubSub+ Standard broker.
docker run -d --name "solace" \
  -p 8080:8080 -p 55555:55555 \
  --shm-size=1000000000 \
  --ulimit nofile=2448:38048 \
  -e username_admin_globalaccesslevel=admin \
  -e username_admin_password=admin \
  -e system_scaling_maxconnectioncount=100 \
  solace/solace-pubsub-standard:9.1.0.77
Create a Solace Queue in the default Message VPN.
- Once the Solace docker container has started, navigate to http://localhost:8080 in your browser and log in with admin/admin.
- Select the default Message VPN on the home screen.
- Select “Queues” in the left menu to navigate to the Queues page.
- On the Queues page, select the “+ Queue” button in the upper right and name the Queue connector-quickstart.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
Download the sol-jms JAR and copy it into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node, and the workers must be restarted to pick up the client JAR.
Tip
The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib.
Start Confluent Platform.
confluent local services start
Produce test data to the jms-messages topic in Kafka.

seq 10 | confluent local services kafka produce jms-messages
Create a jms-sink.json file with the following contents:

{
  "name": "JmsSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
    "tasks.max": "1",
    "topics": "jms-messages",
    "java.naming.factory.initial": "com.solacesystems.jndi.SolJNDIInitialContextFactory",
    "java.naming.provider.url": "smf://localhost:55555",
    "java.naming.security.principal": "admin",
    "java.naming.security.credentials": "admin",
    "connection.factory.name": "/jms/cf/default",
    "Solace_JMS_VPN": "default",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the JMS Sink connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services connect connector load jms --config jms-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status jms
Navigate to the Solace UI to confirm the messages were delivered to the connector-quickstart queue.

Tip

The default credentials for the Solace UI are admin/admin.
TIBCO EMS Quick Start¶
This quick start uses the JMS Sink connector to consume records from Kafka and send them to TIBCO Enterprise Message Service - Community Edition.
Download and unzip TIBCO EMS Community Edition.
Run the TIBCOUniversalInstaller-mac.command and step through the TIBCO Universal Installer.

Tip
Remember your TIBCO_HOME directory. It is used in the upcoming steps.
Start TIBCO EMS with default configurations.
~/TIBCO_HOME/ems/8.4/bin/tibemsd
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-jms-sink:latest
Copy ~/TIBCO_HOME/ems/8.4/lib/tibjms.jar into the JMS Sink connector’s plugin folder. This needs to be done on every Connect worker node, and the workers must be restarted to pick up the client JAR.

Tip

The plugin should be located in your Confluent Platform installation at share/confluent-hub-components/kafka-connect-jms-sink/lib.
Start Confluent Platform.
confluent local services start
Produce test data to the jms-messages topic in Kafka.

seq 10 | confluent local services kafka produce jms-messages
Create a jms-sink.json file with the following contents:

{
  "name": "JmsSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
    "tasks.max": "1",
    "topics": "jms-messages",
    "java.naming.provider.url": "tibjmsnaming://localhost:7222",
    "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
    "connection.factory.name": "QueueConnectionFactory",
    "java.naming.security.principal": "admin",
    "java.naming.security.credentials": "",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the JMS Sink connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services connect connector load jms --config jms-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status jms
Validate that there are messages on the queue using the tibemsadmin tool.

~/TIBCO_HOME/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
# admin password is blank by default
tcp://localhost:7222> show queue connector-quickstart
Connecting to Oracle WebLogic Server¶
You can connect to Oracle’s WebLogic Server using the JMS Sink connector. For a working example, see Oracle Weblogic Sink with JMS connector.
The following is a sample connector configuration:
{
"connector.class": "io.confluent.connect.jms.JmsSinkConnector",
"confluent.topic.bootstrap.servers": "broker:9092",
"topics": "sink-messages",
"java.naming.factory.initial": "weblogic.jndi.WLInitialContextFactory",
"java.naming.provider.url": "t3://weblogic-jms:7001",
"jms.destination.name": "myJMSServer/mySystemModule!myJMSServer@MyDistributedQueue",
"confluent.license": "",
"connection.factory.name": "myFactory",
"java.naming.security.principal": "weblogic",
"confluent.topic.replication.factor": "1",
"name": "jms-weblogic-topic-sink",
"jms.destination.type": "queue",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"java.naming.security.credentials": "welcome1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
You must add the WebLogic client library JAR to the connector /lib folder:
Enter the following command to check the connector path:
grep plugin /etc/kafka-connect/kafka-connect.properties
Your output should resemble:
plugin.path=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink
Add the JAR file to the /lib folder and verify the file is present in the folder:

ls -larth /usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib/example3client.jar
-rw-r--r-- 1 <username> <username> 8.7M Sep 29 10:29 /usr/share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib/example3client.jar
See Client Library JARs for more information about installing client JARs.