Confluent’s JMS Client for Apache Kafka

Overview

The Java Message Service (JMS) is a client messaging API used by distributed Java applications for publish/subscribe and point to point communications.

The kafka-jms-client is an implementation of the JMS 1.1 provider interface that uses the Apache Kafka wire protocol to talk to one or more Kafka brokers.

[kafka-jms-client]<---kafka protocol--->[kafka broker]

Applications written using this client library can exchange messages with other applications written with the kafka-jms-client as well as with any other Kafka clients that produce or consume messages using any of the other Kafka client libraries.

The purpose of this client is to allow existing applications written to the JMS API to be able to publish and subscribe to Kafka as a replacement for any other JMS broker.

If you are interested in integrating Kafka with an existing JMS broker (i.e. without replacing it) then you are encouraged to look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.

If you are interested in writing new Java applications then you are encouraged to use the Java Kafka Producer/Consumer APIs as they provide advanced features not available when using the kafka-jms-client.

Examples

Using the kafka-jms-client should be familiar to existing users of the JMS API.

Producing messages

The following examples create a JMS compliant Connection and Session and then either a JMS MessageProducer is created and this wraps a Kafka producer. Messages are stored in Kafka using an Avro message body and key:

ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testTopic = session.createQueue("test_topic");
MessageProducer producer = session.createProducer(testTopic);
for (int i = 0; i < 50; i++) {
  TextMessage message = session.createTextMessage();
  message.setText("This is a text message");
  producer.send(message);
}

Consuming messages

ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testTopic = session.createQueue("test_topic");
MessageConsumer consumer = this.session.createConsumer(testTopic);
while(true) {
    Message message = consumer.receiveNoWait();
    //TODO: Do things.
}

Requirements

Java 7 (or higher) is required to run applications written with the kafka-jms-client library.

Applications using this client library are required to have the confluent-kafka-jms-<version>.jar configured in the CLASSPATH.

The client requires a connection to a running Kafka cluster (brokers and zookeeper). The Kafka brokers must be version 0.10.0 or higher (included in Confluent Enterprise 3.2 or higher).

The Confluent Schema Registry is required to store the Avro Schemas associated with the JMS messages stored in Kafka and to enable support for JMS Header Properties. The Confluent Schema Registry is part of the Confluent Platform, you can read more on how to use the schema registry

To use the kafka-jms-client you must have a current Confluent Enterprise subcription.

Installation

Download and install the Confluent Enterprise distribution (Version 3.2 or higher) from Confluent.

JMS 1.1 Compatibility

The kafka-jms-client is a very complete implementation of the JMS 1.1 specification. However there are some JMS concepts that either do not map 1:1 to Kafka, or simply do not make sense at all in Kafka (such as nonpersistent messages).

JMS brokers typically have separate Topics and Queues while Kafka has Topics and Topic Partitions (which can act like Queues). This JMS client stores all messages in a Kafka topic. If Session.createQueue() or Session.createTopic() is called and KafkaDestination will be returned. This will always map to a Topic on the Kafka Broker.

JMS Header Notes
JMSDeliveryMode Delivery mode is ignored. Kafka only has one delivery mode which is persisted.
JMSPriority Priority is stored in the message envelope but is not recognized.
JMSExpiration The JMS Expiration is stored but not enforced on the broker. In a future release the client could be configured to drop messages that have reached the expiration.

Features

All JMS Messaging Models are supported including:

  • Publish/Subscribe (Topics)
  • Point-to-Point (Queues)

All JMS Message Types are supported including:

  • TextMessage
  • BytesMessage
  • MapMessage
  • StreamMessage
  • ObjectMessage

All JMS Message Header values are transported although some are ignored because they do not make sense in Kafka:

  • JMSDestination
  • JMSDeliveryMode
  • JMSExpiration
  • JMSPriority
  • JMSMessageID
  • JMSTimestamp
  • JMSCorrelationID
  • JMSReplyTo
  • JMSType
  • JMSRedelivered

Request/Reply features are supported using durable Topics/Queues (but not using using temporary endpoints)

Unsupported Features

The kafka-jms-client supports the full JMS 1.1 specification with the following exceptions which are classes which have either limited support, no support, or other important notes about how the feature is supported.

Session

Method Notes
createDurableSubscriber(Topic, String) Durable Subscribers are not supported.
createDurableSubscriber(Topic, String, String, boolean) Durable Subscribers are not supported.
getMessageListener() Session based MessageListeners are not supported.
setMessageListener(MessageListener) Session based MessageListeners are not supported.
rollback() Transactions are not supported by Kafka. This is a noop.
recover() Transactions are not supported by Kafka
commit() Transactions are not supported by Kafka. This is a noop.
createBrowser(Queue, String) QueueBrowsers with message selectors are not supported.
unsubscribe(String) Durable Subscribers are not supported.
createConsumer(Destination, String) Message selectors are not not supported. An exception will be thrown if one is set.
createConsumer(Destination, String, boolean) Message selectors are not not supported. An exception will be thrown if one is set.
createTemporaryTopic() Kafka does not have a concept of a temporary topic.
getTransacted() Transactions are not supported by Kafka. This will always return false.
createTemporaryQueue() Kafka does not have a concept of a temporary topic.

ConnectionFactory

Method Notes
createConnection(String, String) This method calls createConnection() internally. Username and password of this override are ignored.

Connection

Method Notes
setExceptionListener(ExceptionListener) ExceptionListeners are not supported. This currently does nothing.
setClientID(String) Setting the client id is only supported by using client.id in the settings passed to KafkaConnectionFactory.
getExceptionListener() ExceptionListeners are not supported. This currently does nothing.

Developing JMS Clients

Dependancies

Add the Confluent Maven repository to your pom.xml:

<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Add a dependency on the Confluent JMS client:

<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-jms-client</artifactId>
   <version>3.2.0</version>
 </dependency>

Import the JMS KafkaConnectionFactory from Confluent and then just use all the standard JMS interfaces as you would with any other JMS provider.

import io.confluent.kafka.jms.KafkaConnectionFactory;

import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;

As documented in the JMS specification, a typical JMS-based application executes the following steps:

  • create a ConnectionFactory
  • create Destination objects
  • create a Connection
  • create one or more Sessions
  • create MessageProducers and MessageConsumers
  • start message delivery on the Connection

Configure and Create a ConnectionFactory

The KafkaConnectionFactory requires the following settings to work:

  • bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
  • schema.registry.url - Address of your Confluent Schema Registry in URL format
  • zookeeper.connect - Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.
  • client.id - An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
  • group.id - A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
  • confluent.license - a license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement.

Setting Required Properties

Set your configuration settings just as you would for a any other Kafka client.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put("zookeeper.connect", "localhost:2181");
props.put("client.id", "MySampleJMSProducer");
props.put("group.id", "jmsconsumergroup1");
props.put("confluent.license", "RLKGWF3S8Q7LA5N4AEQ74VOWYD3VL297");

Enabling Confluent Specific Features (Optional)

All of the features of the underlying Kafka Client library are included in the single jar and can be setup in the same way as documented in the Native Kafka Producer / Consumer API documentation.

For example to configure client interceptors for enable Control Center stream monitoring simply set the correct Kafka properties as follows:

props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)

All security settings as the same as with a Native Kafka Java client applications. Just make sure to set all the security settings before creating the ConnectionFactory as follows:

If client authentication is not required in the broker, then the following is a minimal configuration example:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:

props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");

Create a ConnectionFactory

Then use the configuration settings to create a ConnectionFactory.

ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

Creating a Destination

Keep in mind that Topics and Queues are treated as the same thing in Kafka so if you create and use a Topic and a Queue with the same name, you will be using the same underlying JMS Destination which is actually a Kafka Topic.

Also note that Destination names must follow the same naming restrictions of Kafka Topics so max length is 249 symbols and letters, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the use of the ‘/’ character which is sometimes used in other JMS Topic names.

Destination testTopic = session.createQueue("test_topic");

Creating a Connection

Connection connection = connectionFactory.createConnection();

Creating Sessions

Create a JMS session, without specifying it to be a transacted session (not currently supported) and selecting either AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE mode for consumer acknowledgements.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages

Example JMS Producer

MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);

Example JMS Consumer

MessageConsumer consumer = this.session.createConsumer(testTopic);
while(true) {
    Message message = consumer.receiveNoWait();
    //TODO: Do things.
}

Threading

Producer and Consumer must only call the JMS APIs from a single thread because the underlying client libraries are not thread safe.