JMS Client Development Guide

Import KafkaConnectionFactory and all of the standard JMS interfaces:

import io.confluent.kafka.jms.KafkaConnectionFactory;

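import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;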

Configuration

Required Configuration Properties

  • bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to your Kafka cluster (of the form: host1:port1,host2:port2,...). Note that the client will make use of all servers in your cluster irrespective of which servers are specified via this property for bootstrapping. You may want to specify more than one in case one of the servers in your list is down at the time of initialization.
  • client.id - Under the hood, the JMS Client makes use of one or more Kafka clients for communication with your Kafka cluster. The client.id of these clients is set to the value of this configuration property appended with a globally unique id (guid). The client.id string is passed to the server when making requests and is useful for debugging purposes.
  • confluent.license - A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement. If not specified, you may use the client for a trial period of 30 days after which it will stop working.
  • zookeeper.connect - [only required if you have not provided a valid license key]. A ZooKeeper connection string in the form host:port, where host and port are the host and port of a ZooKeeper server associated with your Kafka cluster.

Configuration properties are set in the same way as any other Kafka client:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("client.id", "my-jms-client");
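
The requirements listed above can be checked before the properties are handed to the client. The following is a minimal sketch, not part of the JMS Client: the class name RequiredConfig and its method are hypothetical, and the rule it encodes (zookeeper.connect is needed only when confluent.license is absent) is taken from the property descriptions above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the required properties listed above.
final class RequiredConfig {
    // Returns the names of required properties that are absent or blank.
    static List<String> missing(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : new String[] {"bootstrap.servers", "client.id"}) {
            if (isBlank(props.getProperty(key))) {
                missing.add(key);
            }
        }
        // zookeeper.connect is only needed when no license key is supplied.
        if (isBlank(props.getProperty("confluent.license"))
                && isBlank(props.getProperty("zookeeper.connect"))) {
            missing.add("zookeeper.connect");
        }
        return missing;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}
```

Calling RequiredConfig.missing(props) on the properties built above should return an empty list, since bootstrap.servers, client.id, and zookeeper.connect are all set.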

Optional Configuration Properties

  • allow.out.of.order.acknowledge - If true, no exception is thrown when a message is acknowledged out of order (which implicitly acknowledges any messages before it). The default value is false.
  • jms.fallback.message.type - If the JMS Message type header is not associated with a message, fall back to this message type.
  • consumer.group.id - A string that uniquely identifies the group of consumer processes to which this client belongs. If not specified, this defaults to confluent-jms in the case of queues and confluent-jms-{uuid} in the case of topics, where {uuid} is a unique value for each consumer. This naming strategy provides load balancer semantics in the case of queues and publish-subscribe semantics in the case of topics, as required by the JMS Specification.
  • zookeeper.session.timeout.ms - ZooKeeper connection timeout (unused if you provide a valid license key).
  • zookeeper.connect.timeout.ms - ZooKeeper connection establishment timeout (unused if you provide a valid license key).
  • jms.consumer.poll.timeout.ms - The maximum length of time Kafka consumers should block when retrieving records from Kafka. You should not need to adjust this value.
  • jms.consumer.close.timeout.ms - The maximum number of milliseconds to wait for a clean shutdown when closing a MessageConsumer.
  • message.listener.null.wait.ms - The number of milliseconds to wait before polling Kafka for new messages if no messages were retrieved in a message listener poll loop. You should not need to adjust this value.
  • connection.stop.timeout.ms - The maximum number of milliseconds to wait for the message listener threads to shut down cleanly when connection.stop() has been called.
  • jms.create.connection.ignore.authenticate - If true, connection creation methods on ConnectionFactory that have username and password parameters will fall through to the corresponding methods that do not have these parameters (the parameters will be ignored). If false, use of these methods will result in a JMSException being thrown.
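
The default consumer.group.id naming strategy described above can be illustrated with a small sketch. This is not the client's implementation: the class and method names are hypothetical, and only the confluent-jms and confluent-jms-{uuid} formats come from the description of consumer.group.id.

```java
import java.util.UUID;

// Hypothetical illustration of the default consumer.group.id naming strategy.
final class DefaultGroupIds {
    static final String BASE = "confluent-jms";

    // Queues: every consumer shares one group, so Kafka divides the
    // partitions (and therefore the messages) among the consumers.
    static String forQueue() {
        return BASE;
    }

    // Topics: each consumer gets its own group, so every consumer
    // receives every message.
    static String forTopic() {
        return BASE + "-" + UUID.randomUUID();
    }
}
```

Because all queue consumers end up in the same group, each message is delivered to only one of them, whereas the unique suffix for topics places every subscriber in a group of its own.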

Standard Kafka Configuration Properties (Optional)

All of the configuration properties of the underlying Java Kafka client library may be specified. Simply prefix the desired property with producer. or consumer. as appropriate. For example:

props.put("producer.linger.ms", "1");
props.put("consumer.heartbeat.interval.ms", "1000");
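
If you already keep producer and consumer settings in separate maps, the prefixing rule can be applied mechanically. The helper below is a sketch under that assumption; PrefixedKafkaProps and addPrefixed are hypothetical names and not part of the JMS Client.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: copies Kafka client settings into the JMS Client
// properties under the "producer." or "consumer." prefix.
final class PrefixedKafkaProps {
    static void addPrefixed(Properties target, String prefix, Map<String, String> settings) {
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            target.put(prefix + entry.getKey(), entry.getValue());
        }
    }
}
```

For example, PrefixedKafkaProps.addPrefixed(props, "producer.", Map.of("linger.ms", "1")) stores the same producer.linger.ms entry as the first line of the example above.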

Confluent Specific Features (Optional)

To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring, set the following Kafka properties:

props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)

Security settings match those of the native Kafka Java producer and consumer. Security settings are applied to both production and consumption of messages (you do not need to prefix security settings with consumer. or producer.).

If client authentication is not required in the broker, then the following is a minimal configuration example:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a client keystore must also be created and the following must be configured in addition to the settings above:

props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");
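
The two TLS cases above can be kept apart in code so that the keystore settings are only added when the broker requires client authentication. This is a sketch only; TlsSettings and its methods are hypothetical names, and the property keys are the ones shown in the examples above.

```java
import java.util.Properties;

// Hypothetical helper that collects the TLS settings shown above.
final class TlsSettings {
    // Settings for a broker that does not require client authentication.
    static Properties trustOnly(String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }

    // Returns a copy of base with the keystore settings needed when the
    // broker requires client authentication.
    static Properties withClientAuth(Properties base, String keystorePath,
                                     String keystorePassword, String keyPassword) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("ssl.keystore.location", keystorePath);
        props.put("ssl.keystore.password", keystorePassword);
        props.put("ssl.key.password", keyPassword);
        return props;
    }
}
```

The returned properties can be merged into the client configuration with props.putAll(...). In a real deployment the passwords would normally be read from a secure source rather than written into the code.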

Developing JMS Applications

Creating a ConnectionFactory

A connection factory can be created as follows:

ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

KafkaConnectionFactory also implements the QueueConnectionFactory and TopicConnectionFactory interfaces:

QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);

Creating a Destination

Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and queue with the same name, they will both be associated with the same Kafka topic.

Also note that Destination names must follow the same naming restrictions as Kafka topics: the maximum length is 249 characters, and only ASCII letters, digits, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the '/' character, which is sometimes used in the topic names of other JMS providers.

Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;
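
When destination names come from configuration or from an application being migrated from another JMS provider, it can help to check them against the naming restrictions before calling createQueue or createTopic. The class below is a hypothetical sketch, not part of the JMS Client, and it may not cover every rule that Kafka enforces.

```java
import java.util.regex.Pattern;

// Hypothetical pre-flight check for the naming restrictions described above.
final class DestinationNames {
    static final int MAX_LENGTH = 249;

    // ASCII letters, digits, dot, underscore, and minus only.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isValid(String name) {
        return name != null
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches();
    }
}
```

With this check, DestinationNames.isValid("test_queue") is accepted while a name such as "orders/eu" is rejected. The sketch deliberately does not handle the regex(...) form of destination name.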

It's possible to specify a queue or topic backed by more than one Kafka topic using a regular expression. For example:

Queue testQueue = session.createQueue("regex(test_queue[12])");

Creating a Connection

Create a Connection object as follows:

Connection connection = connectionFactory.createConnection();

Creating Sessions

Kafka sessions support both AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE modes for consumer acknowledgements. Transactions are not supported - you should set the transacted argument to false.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages

MessageProducer example:

MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);

MessageConsumer example:

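MessageConsumer consumer = session.createConsumer(testQueue);
// Message delivery does not begin until the connection is started.
connection.start();
while (true) {
    // Block for up to one second; receive(timeout) returns null if no message arrives.
    Message message = consumer.receive(1000);
    if (message != null) {
        // TODO: Process message
    }
}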

JNDI Context

A simple JNDI InitialContextFactory implementation is provided that can be used to look up JMS ConnectionFactory, QueueConnectionFactory and TopicConnectionFactory objects as well as Destination objects. For example:

Context ctx = new InitialContext();

ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");

Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");

You'll need to set the java.naming.factory.initial system property to io.confluent.kafka.jms.KafkaInitialContextFactory, either using a -D command line option or in a jndi.properties file located somewhere on your classpath.

Also, queue and topic name lookups and JMS Client configuration properties need to be specified. Here's an example jndi.properties file:

java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory

# JMS Client properties

client.id = testing-01
zookeeper.connect = localhost:2181
bootstrap.servers = localhost:9092

# Register queues in JNDI using the form:
#   queue.[jndiName] = [physicalName]

# Register topics in JNDI using the form:
#   topic.[jndiName] = [physicalName]

queue.foo = foo
topic.bar = bar
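
To see which JNDI names a given jndi.properties file registers, the queue.[jndiName] and topic.[jndiName] entries can be listed with the standard java.util.Properties parser. This is an illustrative sketch with hypothetical names (JndiRegistrations, find); it does not show how KafkaInitialContextFactory itself reads the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical reader that lists the destinations registered in jndi.properties text.
final class JndiRegistrations {
    // Returns jndiName -> physicalName for every entry whose key starts with
    // the given prefix, which is expected to be "queue." or "topic.".
    static Map<String, String> find(String propertiesText, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }
}
```

Applied to the example file above, the "queue." prefix yields the single mapping foo to foo, and the "topic." prefix yields bar to bar.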

As an alternative to using system properties or a jndi.properties file, you can programmatically pass properties into the InitialContext constructor for all or a subset of your configuration. For example:

Hashtable<String, Object> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");

Context ctx = new InitialContext(props);

As a convenience, KafkaInitialContextFactory automatically translates SSL system properties to the relevant Kafka properties:

System Property                     Kafka Property
javax.net.ssl.trustStore            ssl.truststore.location
javax.net.ssl.trustStoreType        ssl.truststore.type
javax.net.ssl.trustStorePassword    ssl.truststore.password
javax.net.ssl.keyStore              ssl.keystore.location
javax.net.ssl.keyStoreType          ssl.keystore.type
javax.net.ssl.keyStorePassword      ssl.keystore.password
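
The mapping above can be expressed as a simple lookup. The sketch below only illustrates the translation and is not the KafkaInitialContextFactory implementation; the class name SslPropertyTranslation and its method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the system property to Kafka property translation.
final class SslPropertyTranslation {
    private static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("javax.net.ssl.trustStore", "ssl.truststore.location");
        MAPPING.put("javax.net.ssl.trustStoreType", "ssl.truststore.type");
        MAPPING.put("javax.net.ssl.trustStorePassword", "ssl.truststore.password");
        MAPPING.put("javax.net.ssl.keyStore", "ssl.keystore.location");
        MAPPING.put("javax.net.ssl.keyStoreType", "ssl.keystore.type");
        MAPPING.put("javax.net.ssl.keyStorePassword", "ssl.keystore.password");
    }

    // Copies each SSL system property that is set into its Kafka equivalent;
    // properties that are not set, or not in the mapping, are skipped.
    static Properties translate(Properties systemProps) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, String> entry : MAPPING.entrySet()) {
            String value = systemProps.getProperty(entry.getKey());
            if (value != null) {
                kafkaProps.put(entry.getValue(), value);
            }
        }
        return kafkaProps;
    }
}
```

Passing System.getProperties() to translate would pick up any -Djavax.net.ssl.* options given on the command line.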

Threading

The Java Message Service (JMS) specification states that a session may not be operated on by more than one thread at a time. This restriction applies to the Kafka JMS Client. However, different sessions created from a single connection may be used concurrently, as per the specification.
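
One common way to respect this restriction is to give every thread its own session. The sketch below shows the idea with a generic holder; PerThread is a hypothetical class, and the type parameter S stands in for javax.jms.Session so that the example depends only on the JDK.

```java
import java.util.function.Supplier;

// Hypothetical holder that gives each thread its own session-like object.
// S stands in for javax.jms.Session; the supplier would typically wrap a
// call such as connection.createSession(false, Session.AUTO_ACKNOWLEDGE).
final class PerThread<S> {
    private final ThreadLocal<S> holder;

    PerThread(Supplier<S> factory) {
        this.holder = ThreadLocal.withInitial(factory);
    }

    // Returns the calling thread's own instance, creating it on first use.
    S get() {
        return holder.get();
    }
}
```

Because createSession throws the checked JMSException, a real supplier would need to catch it and rethrow an unchecked exception. The sketch also leaves out closing each session when its thread finishes, which production code would need to handle.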