JMS Client

Overview

Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. kafka-jms-client is an implementation of the JMS 1.1 provider interface that allows Apache Kafka or the Confluent Platform to be used as a JMS message broker.

[kafka-jms-client] <--- kafka protocol ---> [kafka broker]

Applications written using Confluent’s kafka-jms-client can exchange messages with other application written using kafka-jms-client as well as any application that produces or consumes messages using any other Kafka client library.

The purpose of this client is to allow existing applications written to the JMS API to be able to publish and subscribe to Kafka as a replacement for any other JMS broker.

If you are interested in integrating Kafka with an existing JMS broker (i.e. without replacing it) then you are encouraged to look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.

If you are interested in writing new Java applications then you are encouraged to use the Java Kafka Producer/Consumer APIs as they provide advanced features not available when using the kafka-jms-client.

Example

Usage of kafka-jms-client should be familiar to existing users of the JMS API.

The following example program uses a KafkaConnectionFactory instance to create JMS compliant Connection, Session and MessageProducer objects. The MessageProducer is then used to send 50 TextMessage messages to the Kafka topic test-queue, which is acting as a queue. A MessageConsumer is then created and used to read back these messages.

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnectionFactory;

public class App {

    public static void main(String[] args) throws JMSException {
        Properties settings = new Properties();
        settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2");
        settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");

        ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testQueue = session.createQueue("test-queue");

        MessageProducer producer = session.createProducer(testQueue);
        for (int i=0; i<50; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("This is a text message");
            producer.send(message);
        }

        MessageConsumer consumer = session.createConsumer(testQueue);
        while (true) {
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println(message.getText());
        }
    }
}

Requirements

  • Java 7 (or higher).
  • A Kafka cluster running verision 0.11.0 brokers or higher (included in Confluent Enterprise 3.3 or higher).
  • A current Confluent Enterprise subscription. Note: The client can also be used without a license key for a trial period of 30 days.

Installation

The JMS client is a library that you use from within your Java applications.

To reference kafka-jms-client in a maven based project, first add the Confluent Maven repository to your pom.xml:

<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Then add a dependency on the Confluent JMS client as well as the JMS API specification (note: replace the text ‘[version]’ with ‘4.1.1‘):

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-jms-client</artifactId>
    <version>[version]</version>
</dependency>
<dependency>
    <groupId>org.apache.geronimo.specs</groupId>
    <artifactId>geronimo-jms_1.1_spec</artifactId>
    <version>1.1</version>
</dependency>

If you don’t use Maven, you can download the JMS Client .jar file directly by navigating to the following URL (note: replace the text ‘[version]’ with ‘4.1.1‘):

http://packages.confluent.io/maven/io/confluent/kafka-jms-client/[version]/kafka-jms-client
-[version].jar

If you require a ‘fat’ .jar (one that includes the JMS Client and all of it’s dependencies), you can make one by following the instructions in appendix 1.

Development Guide

Import KafkaConnectionFactory as well as all the standard JMS interfaces you would with any other JMS provider:

import io.confluent.kafka.jms.KafkaConnectionFactory;

import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;

Configuration

Required Configuration Properties

  • bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to your Kafka cluster (of the form: host1:port1,host2:port2,....). Note that the client will make use of all servers in your cluster irrespective of which servers are specified via this property for bootstrapping. You may want to specify more than one in case one of the servers in your list is down at the time of initialization.
  • client.id - Under the hood, the JMS Client makes use of one or more Kafka clients for communication your Kafka cluster. The client.id of these clients is set to the value of this configuration property appended with a globally unique id (guid). The client.id string is passed to the server when making requests and is useful for debugging purposes.
  • confluent.license - A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement. If not specified, you may use the client for a trial period of 30 days after which it will stop working.
  • zookeeper.connect - [only required if you have not provided a valid license key]. A ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server associated with your Kafka cluster.

Configuration properties are set in the same way as any other Kafka client:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181);
props.put("client.id", "my-jms-client");

Optional Configuration Properties

  • allow.out.of.order.acknowledge - If true, does not throw an exception if a message is acknowledged out of order (which implicitly acknowledges any messages before it). default value is false.
  • jms.fallback.message.type - If the JMS Message type header is not associated with a message, fallback to this message type.
  • consumer.group.id - A string that uniquely identifies the group of consumer processes to which this client belongs. If not specified, this defaults to confluent-jms in the case of queues and confluent-jms-{uuid} in the case of topics, where {uuid} is a unique value for each consumer. This naming strategy provides load balancer semantics in the case of queues and publish-subscribe semantics in the case of topics, as required by the JMS Specification.
  • zookeeper.session.timeout.ms - ZooKeeper connection timeout (unused if you provide a valid license key).
  • zookeeper.connect.timeout.ms - ZooKeeper connection establishment timeout (unused if you provide a valid license key).
  • jms.consumer.poll.timeout.ms - The maximum length of time Kafka consumers should block when retrieving records from Kafka. You should not need to adjust this value.
  • jms.consumer.close.timeout.ms - The maximum number of milliseconds to wait for a clean shutdown when closing a MessageConsumer.
  • message.listener.null.wait.ms - The number of milliseconds to wait before polling Kafka for new messages if no messages were retrieved in a message listener poll loop. You should not need to adjust this value.
  • connection.stop.timeout.ms - The maximum number of milliseconds to wait for the message listener threads to cleanly shutdown when connection.stop() has been called.

Standard Kafka Configuration Properties (Optional)

All of the configuration properties of the underlying Java Kafka client library may be specified. Simply prefix the desired property with producer. or consumer. as appropriate. For example:

props.put("producer.linger.ms", "1");
props.put("consumer.heartbeat.interval.ms", "1000");

Confluent Specific Features (Optional)

To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring, set the following Kafka properties:

props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)

Security settings match those of the native Kafka java producer and consumer. Security settings are applied to both production and consumption of messages (you do not need to prefix security settings with consumer. or producer.).

If client authentication is not required in the broker, then the following is a minimal configuration example:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:

props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");

Developing JMS Applications

Creating a ConnectionFactory

A connection factory can be created as follows:

ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

KafkaConnectionFactory also implements the QueueConnectionFactory and TopicConnectionFactory interfaces:

QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);

Creating a Destination

Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and queue with the same name, they will both be associated with the same Kafka topic.

Also note that Destination names must follow the same naming restrictions of Kafka topics so the maximum length is 249 symbols and letters, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the use of the ‘/’ character which is sometimes used in other JMS Topic names.

Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;

Creating a Connection

Create a Connection object as follows:

Connection connection = connectionFactory.createConnection();

Creating Sessions

Kafka sessions support both AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE modes for consumer acknowledgements. Transactions are not supported - you should set the transacted argument to false.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages

MessageProducer example:

MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);

MessageConsumer example:

MessageConsumer consumer = this.session.createConsumer(testQueue);
while(true) {
    Message message = consumer.receiveNoWait();
    // TODO: Process message
}

JNDI Context

A simple JNDI InitialContextFactory implementation is provided that can be used to lookup JMS ConnectionFactory, QueueConnectionFactory and TopicConnectionFactory objects as well as Destination objects. For example:

Context ctx = new InitialContext();

ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory qcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");

Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");

You’ll need to set the java.naming.factory.initial system property to io.confluent.kafka.jms.KafkaInitialContextFactory, either using a -D command line option or in a jndi.properties file located somewhere on your classpath.

Also, queue and topic name lookups and JMS Client configuration properties need to be specified. Here’s an example jndi.properties file:

java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory

# JMS Client properties

client.id = testing-01
zookeeper.connect = localhost:2181
bootstrap.servers = localhost:9092

# Register queues in JNDI using the form:
#   queue.[jndiName] = [physicalName]

# Register topics in JNDI using the form:
#   topic.[jndiName] = [physicalName]

queue.foo = foo
topic.bar = bar

As an alternative to using system properties or a jndi.properties file, you can programmatically pass properties into the InitialContext constructor for all or a subset of your configuration. For example:

Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");

Context ctx = new InitialContext(props);

As a convenience, KafkaInitialContextFactory automatically translates SSL system properties to the relevant kafka properties.

System Property Kafke Property
javax.net.ssl.trustStore ssl.truststore.location
javax.net.ssl.trustStoreType ssl.truststore.type
javax.net.ssl.trustStorePassword ssl.truststore.password
javax.net.ssl.keyStore ssl.keystore.location
javax.net.ssl.keyStoreType ssl.keystore.type
javax.net.ssl.keyStorePassword ssl.keystore.password

Threading

The Java Messaging Specification states that a session may not be operated on by more than one thread at a time. This restriction applies to the Kafka JMS Client. However, different sessions created from a single connection may be used concurrently, as per the specification.

JMS 1.1 Compatibility

Where possible, the Kafka JMS Client is a complete implementation of the JMS 1.1 specification. However there are some JMS concepts that either do not map 1:1 to Kafka, or simply do not make sense at all in Kafka (such as nonpersistent messages).

JMS brokers typically provide distinct Topic and Queue functionality that reflects that in the JMS Specification. By contrast, Kafka provides only one abstraction - partitioned topics - which are actually distributed commit logs and behave differently to the traditional messaging system notion of a topic. In practice, Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense.

Additional notes on JMS 1.1 compatibility are outlined below.

Features

Both JMS messaging models are supported:

  • Publish/Subscribe (Topics)
  • Point-to-Point (Queues)

All JMS message types are supported including:

  • TextMessage
  • BytesMessage
  • MapMessage
  • StreamMessage
  • ObjectMessage

All JMS message header values are transported although some are ignored because they do not make sense in Kafka:

  • JMSDestination
  • JMSDeliveryMode - Ignored. Kafka only has one delivery mode which is persisted.
  • JMSExpiration - Stored, but not enforced.
  • JMSPriority - Stored, but not recognized.
  • JMSMessageID
  • JMSTimestamp
  • JMSCorrelationID
  • JMSReplyTo
  • JMSType
  • JMSRedelivered

Request/Reply features are supported using durable Topics/Queues (but not using temporary endpoints).

Unsupported Features

The following functionality has either limited support or no support:

Session

Method Notes
createDurableSubscriber(Topic, String) Durable Subscribers are not supported.
createDurableSubscriber(Topic, String, String, boolean) Durable Subscribers are not supported.
getMessageListener() Session based MessageListeners are not supported.
setMessageListener(MessageListener) Session based MessageListeners are not supported.
rollback() Transactions are not supported by Kafka.
recover() Transactions are not supported by Kafka
commit() Transactions are not supported by Kafka.
createBrowser(Queue, String) QueueBrowsers with message selectors are not supported.
unsubscribe(String) Durable Subscribers are not supported.
createConsumer(Destination, String) Message selectors are not not supported. An exception will be thrown if one is set.
createConsumer(Destination, String, boolean) Message selectors are not not supported. An exception will be thrown if one is set.
createTemporaryTopic() Kafka does not have a concept of a temporary topic.
getTransacted() Transactions are not supported. This will always return false.
createTemporaryQueue() Kafka does not have a concept of a temporary topic.

ConnectionFactory

Method Notes
createConnection(String, String) Not supported. Authentication is only supported via Kafka configuration properties.

Connection

Method Notes
setClientID(String) Setting the client id is only supported by using client.id in the settings passed to KafkaConnectionFactory.

Appendix 1 - Creating a Shaded Fat .jar

In some scenarios, it is useful to have a ‘fat’ .jar that bundles the JMS Client together with all of its dependencies in a single file. Confluent does not distribute the JMS Client in this form, but you can build a fat .jar yourself easily enough:

  1. Copy the pom.xml below into an empty directory (note: replace the text ‘[version]’ with ‘4.1.1‘).
  2. Execute the maven command mvn package.

The resulting artifact will be placed in the target directory along side the pom.xml file. Note: In addition to bundling dependencies, the provided pom.xml file shades them under the namespace confluent.shaded. in order to avoid potential namespace clashes.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.confluent</groupId>
    <artifactId>kafka-jms-client-fat</artifactId>
    <version>[version]</version>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-jms-client</artifactId>
            <version>[version]</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <pattern>org.I0Itec.</pattern>
                                    <shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.yammer.metrics.</pattern>
                                    <shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>joptsimple</pattern>
                                    <shadedPattern>confluent.shaded.joptsimple</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.zookeeper.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.jute.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.kafka.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.log4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.thirdparty.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.fasterxml.jackson.</pattern>
                                    <shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>net.jpountz.</pattern>
                                    <shadedPattern>confluent.shaded.net.jpountz.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.xerial.snappy.</pattern>
                                    <shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.jose4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.jose4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.common.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.license.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>kafka.</pattern>
                                    <shadedPattern>confluent.shaded.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>scala</pattern>
                                    <shadedPattern>confluent.shaded.scala.</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>

    </build>
</project>