JMS Client¶
Overview¶
Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. kafka-jms-client is an implementation of the JMS 1.1 provider interface that allows Apache Kafka or the Confluent Platform to be used as a JMS message broker.
[kafka-jms-client] <--- kafka protocol ---> [kafka broker]
Applications written using Confluent’s kafka-jms-client can exchange messages with other applications written using kafka-jms-client, as well as with any application that produces or consumes messages using any other Kafka client library.
The purpose of this client is to allow existing applications written to the JMS API to be able to publish and subscribe to Kafka as a replacement for any other JMS broker.
If you are interested in integrating Kafka with an existing JMS broker (i.e. without replacing it) then you are encouraged to look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.
If you are interested in writing new Java applications then you are encouraged to use the Java Kafka Producer/Consumer APIs as they provide advanced features not available when using the kafka-jms-client.
Example¶
Usage of kafka-jms-client should be familiar to existing users of the JMS API.
The following example program uses a KafkaConnectionFactory instance to create JMS-compliant Connection, Session and MessageProducer objects. The MessageProducer is then used to send 50 TextMessage messages to the Kafka topic test-queue, which is acting as a queue. A MessageConsumer is then created and used to read back these messages.
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnectionFactory;

public class App {
    public static void main(String[] args) throws JMSException {
        Properties settings = new Properties();
        settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2");
        settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");

        ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testQueue = session.createQueue("test-queue");

        MessageProducer producer = session.createProducer(testQueue);
        for (int i = 0; i < 50; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("This is a text message");
            producer.send(message);
        }

        MessageConsumer consumer = session.createConsumer(testQueue);
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println(message.getText());
        }
    }
}
Requirements¶
- Java 7 (or higher).
- A Kafka cluster running version 0.11.0 brokers or higher (included in Confluent Enterprise 3.3 or higher).
- A current Confluent Enterprise subscription. Note: The client can also be used without a license key for a trial period of 30 days.
Installation¶
The JMS client is a library that you use from within your Java applications.
To reference kafka-jms-client in a Maven-based project, first add the Confluent Maven repository to your pom.xml:
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
Then add a dependency on the Confluent JMS client as well as the JMS API specification (note: replace the text ‘[version]’ with ‘4.1.3’):
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-jms-client</artifactId>
<version>[version]</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1</version>
</dependency>
If you don’t use Maven, you can download the JMS Client .jar file directly by navigating to the following URL (note: replace the text ‘[version]’ with ‘4.1.3’):
http://packages.confluent.io/maven/io/confluent/kafka-jms-client/[version]/kafka-jms-client-[version].jar
If you require a ‘fat’ .jar (one that includes the JMS Client and all of its dependencies), you can make one by following the instructions in Appendix 1.
Development Guide¶
Import KafkaConnectionFactory as well as all the standard JMS interfaces, just as you would with any other JMS provider:
import io.confluent.kafka.jms.KafkaConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
Configuration¶
Required Configuration Properties¶
- bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to your Kafka cluster (of the form host1:port1,host2:port2,...). Note that the client will make use of all servers in your cluster irrespective of which servers are specified via this property for bootstrapping. You may want to specify more than one in case one of the servers in your list is down at the time of initialization.
- client.id - Under the hood, the JMS Client makes use of one or more Kafka clients for communication with your Kafka cluster. The client.id of these clients is set to the value of this configuration property appended with a globally unique id (guid). The client.id string is passed to the server when making requests and is useful for debugging purposes.
- confluent.license - A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement. If not specified, you may use the client for a trial period of 30 days, after which it will stop working.
- zookeeper.connect - [only required if you have not provided a valid license key]. A ZooKeeper connection string in the form hostname:port, where host and port are the host and port of a ZooKeeper server associated with your Kafka cluster.
Configuration properties are set in the same way as any other Kafka client:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("client.id", "my-jms-client");
Optional Configuration Properties¶
- allow.out.of.order.acknowledge - If true, does not throw an exception if a message is acknowledged out of order (which implicitly acknowledges any messages before it). Default value is false.
- jms.fallback.message.type - If the JMS Message type header is not associated with a message, fall back to this message type.
- consumer.group.id - A string that uniquely identifies the group of consumer processes to which this client belongs. If not specified, this defaults to confluent-jms in the case of queues and confluent-jms-{uuid} in the case of topics, where {uuid} is a unique value for each consumer. This naming strategy provides load-balancer semantics in the case of queues and publish-subscribe semantics in the case of topics, as required by the JMS Specification.
- zookeeper.session.timeout.ms - ZooKeeper session timeout (unused if you provide a valid license key).
- zookeeper.connect.timeout.ms - ZooKeeper connection establishment timeout (unused if you provide a valid license key).
- jms.consumer.poll.timeout.ms - The maximum length of time Kafka consumers should block when retrieving records from Kafka. You should not need to adjust this value.
- jms.consumer.close.timeout.ms - The maximum number of milliseconds to wait for a clean shutdown when closing a MessageConsumer.
- message.listener.null.wait.ms - The number of milliseconds to wait before polling Kafka for new messages if no messages were retrieved in a message listener poll loop. You should not need to adjust this value.
- connection.stop.timeout.ms - The maximum number of milliseconds to wait for the message listener threads to cleanly shut down when connection.stop() has been called.
- jms.create.connection.ignore.authenticate - If true, connection creation methods on ConnectionFactory that have username and password parameters will fall through to the corresponding methods that do not have these parameters (the parameters will be ignored). If false, use of these methods will result in a JMSException being thrown.
Standard Kafka Configuration Properties (Optional)¶
All of the configuration properties of the underlying Java Kafka client library may be specified. Simply prefix the desired property with producer. or consumer. as appropriate. For example:
props.put("producer.linger.ms", "1");
props.put("consumer.heartbeat.interval.ms", "1000");
Confluent Specific Features (Optional)¶
To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring, set the following Kafka properties:
props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
Enabling TLS Encryption (Optional)¶
Security settings match those of the native Kafka Java producer and consumer. Security settings are applied to both production and consumption of messages (you do not need to prefix security settings with consumer. or producer.).
If client authentication is not required in the broker, then the following is a minimal configuration example:
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");
If client authentication is required, then a keystore must be created as in step 1, and the following must also be configured:
props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");
Developing JMS Applications¶
Creating a ConnectionFactory¶
A connection factory can be created as follows:
ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);
KafkaConnectionFactory also implements the QueueConnectionFactory and TopicConnectionFactory interfaces:
QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);
Creating a Destination¶
Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and queue with the same name, they will both be associated with the same Kafka topic.
Also note that Destination names must follow the same naming restrictions as Kafka topics: the maximum length is 249 characters, and only alphanumeric characters, . (dot), _ (underscore), and - (hyphen) may be used. Take care to avoid the ‘/’ character, which is sometimes used in the Topic names of other JMS providers.
Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;
Creating a Connection¶
Create a Connection object as follows:
Connection connection = connectionFactory.createConnection();
Creating Sessions¶
Kafka sessions support both AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE modes for consumer acknowledgements. Transactions are not supported - you should set the transacted argument to false.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
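In CLIENT_ACKNOWLEDGE mode, acknowledgement is left to the application. A minimal sketch (assuming a Connection created from a KafkaConnectionFactory as shown above, and an existing test-queue topic; the class and method names here are illustrative, not part of the library):

```java
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class ClientAckExample {
    // Sketch: consume with CLIENT_ACKNOWLEDGE and acknowledge explicitly.
    static void consumeWithClientAck(Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination queue = session.createQueue("test-queue");
        MessageConsumer consumer = session.createConsumer(queue);

        Message message = consumer.receive();
        // ... process the message ...

        // Acknowledge only after processing succeeds. Per JMS, acknowledging a
        // message implicitly acknowledges all earlier messages in the session
        // (see the allow.out.of.order.acknowledge property above).
        message.acknowledge();
    }
}
```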
Producing / Consuming Messages¶
MessageProducer example:
MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);
MessageConsumer example:
MessageConsumer consumer = session.createConsumer(testQueue);
while (true) {
    // receiveNoWait() returns null if no message is available.
    Message message = consumer.receiveNoWait();
    if (message != null) {
        // TODO: Process message
    }
}
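Messages can also be consumed asynchronously by registering a MessageListener on the MessageConsumer, following the standard JMS API (the message.listener.null.wait.ms property above tunes the underlying poll loop; note that Session-based MessageListeners are not supported, as listed under Unsupported Features). A sketch, with illustrative class and method names:

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ListenerExample {
    // Sketch: register an asynchronous listener on an existing MessageConsumer.
    static void listen(MessageConsumer consumer) throws JMSException {
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println(((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    // Handle or log the failure; onMessage may not throw.
                    e.printStackTrace();
                }
            }
        });
    }
}
```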
JNDI Context¶
A simple JNDI InitialContextFactory implementation is provided that can be used to look up JMS ConnectionFactory, QueueConnectionFactory and TopicConnectionFactory objects as well as Destination objects. For example:
Context ctx = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");
Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");
You’ll need to set the java.naming.factory.initial system property to io.confluent.kafka.jms.KafkaInitialContextFactory, either using a -D command line option or in a jndi.properties file located somewhere on your classpath.
Also, queue and topic name lookups and JMS Client configuration properties need to be specified.
Here’s an example jndi.properties file:
java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory
# JMS Client properties
client.id = testing-01
zookeeper.connect = localhost:2181
bootstrap.servers = localhost:9092
# Register queues in JNDI using the form:
# queue.[jndiName] = [physicalName]
# Register topics in JNDI using the form:
# topic.[jndiName] = [physicalName]
queue.foo = foo
topic.bar = bar
As an alternative to using system properties or a jndi.properties file, you can programmatically pass properties into the InitialContext constructor for all or a subset of your configuration. For example:
Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");
Context ctx = new InitialContext(props);
As a convenience, KafkaInitialContextFactory automatically translates SSL system properties to the relevant Kafka properties.
System Property | Kafka Property |
---|---|
javax.net.ssl.trustStore | ssl.truststore.location |
javax.net.ssl.trustStoreType | ssl.truststore.type |
javax.net.ssl.trustStorePassword | ssl.truststore.password |
javax.net.ssl.keyStore | ssl.keystore.location |
javax.net.ssl.keyStoreType | ssl.keystore.type |
javax.net.ssl.keyStorePassword | ssl.keystore.password |
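Under this translation, setting the standard JSSE system properties (with -D flags or programmatically) is enough to configure the truststore. A sketch, reusing the truststore path and password from the TLS example above:

```java
// Sketch: KafkaInitialContextFactory maps these standard JSSE system
// properties to the corresponding ssl.* Kafka properties, so no explicit
// ssl.truststore.* entries are needed in the JNDI environment.
System.setProperty("javax.net.ssl.trustStore", "/var/private/ssl/kafka.client.truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "test1234");
System.setProperty("java.naming.factory.initial", "io.confluent.kafka.jms.KafkaInitialContextFactory");
```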
JMS 1.1 Compatibility¶
Where possible, the Kafka JMS Client is a complete implementation of the JMS 1.1 specification. However, there are some JMS concepts that either do not map 1:1 to Kafka, or simply do not make sense in Kafka (such as non-persistent messages).
JMS brokers typically provide distinct Topic and Queue functionality, reflecting the distinction made in the JMS Specification. By contrast, Kafka provides only one abstraction - the partitioned topic - which is really a distributed commit log and behaves differently from the traditional messaging-system notion of a topic. In practice, Kafka topics can mimic the behavior of either topics or queues in the traditional messaging-system sense.
Additional notes on JMS 1.1 compatibility are outlined below.
Features¶
Both JMS messaging models are supported:
- Publish/Subscribe (Topics)
- Point-to-Point (Queues)
All JMS message types are supported, including:
- TextMessage
- BytesMessage
- MapMessage
- StreamMessage
- ObjectMessage
All JMS message header values are transported, although some are ignored because they do not make sense in Kafka:
- JMSDestination
- JMSDeliveryMode - Ignored. Kafka has only one delivery mode: persistent.
- JMSExpiration - Stored, but not enforced.
- JMSPriority - Stored, but not recognized.
- JMSMessageID
- JMSTimestamp
- JMSCorrelationID
- JMSReplyTo
- JMSType
- JMSRedelivered
Request/Reply features are supported using durable Topics/Queues (but not using temporary endpoints).
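The request/reply pattern can be sketched with the standard JMSReplyTo and JMSCorrelationID headers. Since temporary endpoints are not supported, the reply destination must be a pre-created durable queue; the class and method names below are illustrative, and the sketch assumes the responder sets the reply’s JMSCorrelationID to the request’s JMSMessageID:

```java
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class RequestReplyExample {
    // Sketch: send a request and block until the matching reply arrives.
    static String request(Session session, Destination requestQueue,
                          Destination replyQueue, String body) throws JMSException {
        MessageProducer producer = session.createProducer(requestQueue);
        TextMessage request = session.createTextMessage(body);
        request.setJMSReplyTo(replyQueue);
        producer.send(request);

        // Message selectors are not supported, so correlate replies to the
        // request in application code via JMSCorrelationID.
        MessageConsumer consumer = session.createConsumer(replyQueue);
        while (true) {
            Message reply = consumer.receive();
            if (request.getJMSMessageID().equals(reply.getJMSCorrelationID())) {
                return ((TextMessage) reply).getText();
            }
        }
    }
}
```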
Unsupported Features¶
The following functionality has either limited support or no support:
Session¶
Method | Notes |
---|---|
createDurableSubscriber(Topic, String) | Durable Subscribers are not supported. |
createDurableSubscriber(Topic, String, String, boolean) | Durable Subscribers are not supported. |
getMessageListener() | Session based MessageListeners are not supported. |
setMessageListener(MessageListener) | Session based MessageListeners are not supported. |
rollback() | Transactions are not supported by Kafka. |
recover() | Transactions are not supported by Kafka. |
commit() | Transactions are not supported by Kafka. |
createBrowser(Queue, String) | QueueBrowsers with message selectors are not supported. |
unsubscribe(String) | Durable Subscribers are not supported. |
createConsumer(Destination, String) | Message selectors are not supported. An exception will be thrown if one is set. |
createConsumer(Destination, String, boolean) | Message selectors are not supported. An exception will be thrown if one is set. |
createTemporaryTopic() | Kafka does not have a concept of a temporary topic. |
getTransacted() | Transactions are not supported. This will always return false. |
createTemporaryQueue() | Kafka does not have a concept of a temporary queue. |
ConnectionFactory¶
Method | Notes |
---|---|
createConnection(String, String) | Not supported. Authentication is only supported via Kafka configuration properties. |
Connection¶
Method | Notes |
---|---|
setClientID(String) | Setting the client id is only supported by using client.id in the settings passed to KafkaConnectionFactory. |
Appendix 1 - Creating a Shaded Fat .jar¶
In some scenarios, it is useful to have a ‘fat’ .jar that bundles the JMS Client together with all of its dependencies in a single file. Confluent does not distribute the JMS Client in this form, but you can build a fat .jar yourself easily enough:
- Copy the pom.xml below into an empty directory (note: replace the text ‘[version]’ with ‘4.1.3’).
- Execute the Maven command mvn package.
The resulting artifact will be placed in the target directory alongside the pom.xml file.
Note: In addition to bundling dependencies, the provided pom.xml file shades them under the namespace confluent.shaded. in order to avoid potential namespace clashes.
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.confluent</groupId>
<artifactId>kafka-jms-client-fat</artifactId>
<version>[version]</version>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-jms-client</artifactId>
<version>[version]</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>org.I0Itec.</pattern>
<shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern>
</relocation>
<relocation>
<pattern>com.yammer.metrics.</pattern>
<shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern>
</relocation>
<relocation>
<pattern>joptsimple</pattern>
<shadedPattern>confluent.shaded.joptsimple</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.zookeeper.</pattern>
<shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.jute.</pattern>
<shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.kafka.</pattern>
<shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.log4j.</pattern>
<shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common.</pattern>
<shadedPattern>confluent.shaded.com.google.common.</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.thirdparty.</pattern>
<shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson.</pattern>
<shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz.</pattern>
<shadedPattern>confluent.shaded.net.jpountz.</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial.snappy.</pattern>
<shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern>
</relocation>
<relocation>
<pattern>org.jose4j.</pattern>
<shadedPattern>confluent.shaded.org.jose4j.</shadedPattern>
</relocation>
<relocation>
<pattern>io.confluent.common.</pattern>
<shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern>
</relocation>
<relocation>
<pattern>io.confluent.license.</pattern>
<shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern>
</relocation>
<relocation>
<pattern>kafka.</pattern>
<shadedPattern>confluent.shaded.kafka.</shadedPattern>
</relocation>
<relocation>
<pattern>scala</pattern>
<shadedPattern>confluent.shaded.scala.</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>