JMS Client for Confluent Platform

Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. Confluent JMS Client (kafka-jms-client) is an implementation of the JMS 1.1 provider interface that allows Apache Kafka® or Confluent Platform to be used as a JMS message broker.

[kafka-jms-client] <--- kafka protocol ---> [kafka broker]

Applications written using the Confluent kafka-jms-client can exchange messages with other applications written using kafka-jms-client, as well as with any application that produces or consumes messages using any other Kafka client library.

The purpose of this client is to let existing applications written against the JMS API publish and subscribe to Kafka as a replacement for any other JMS broker.

If you are interested in integrating Kafka with an existing JMS broker (that is, without replacing it) then you are encouraged to look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.

If you are interested in writing new Java applications then you are encouraged to use the Java Kafka Producer/Consumer APIs as they provide advanced features not available when using the kafka-jms-client.

JMS 1.1 Compatibility

Where possible, the Kafka JMS Client is a complete implementation of the JMS 1.1 specification. However, there are some JMS concepts that either do not map 1:1 to Kafka or simply do not make sense in Kafka (such as non-persistent messages).

JMS brokers typically provide distinct Topic and Queue functionality that reflects the topic and queue functionality in the JMS Specification. By contrast, Kafka provides only one abstraction - partitioned topics - which are actually distributed commit logs and behave differently than the traditional messaging system notion of a topic. In practice, Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense.
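As a rough illustration of how one log can behave like either a queue or a topic, the following toy model uses the general Kafka idea of consumer groups: consumers that share a group split the messages between them, while each separate group reads the whole log. This is an in-memory sketch only. The class name ToyTopic and its methods are hypothetical, it models a single partition, and it does not describe how kafka-jms-client maps JMS destinations internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, in-memory model of a single-partition log. Not the real Kafka client. */
final class ToyTopic {
    private final List<String> log = new ArrayList<>();
    // One read position per consumer group, similar to a committed offset.
    private final Map<String, Integer> offsets = new HashMap<>();

    void append(String message) {
        log.add(message);
    }

    /** Returns the next unread message for the group, or null if it is caught up. */
    String poll(String groupId) {
        int next = offsets.getOrDefault(groupId, 0);
        if (next >= log.size()) {
            return null;
        }
        offsets.put(groupId, next + 1);
        return log.get(next);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.append("m1");
        topic.append("m2");

        // Queue-like: two consumers share the group "workers", so each
        // message is handed out once across the pair.
        System.out.println("worker A got " + topic.poll("workers"));
        System.out.println("worker B got " + topic.poll("workers"));

        // Topic-like: the group "audit" has its own position and sees every message.
        System.out.println("audit got " + topic.poll("audit"));
        System.out.println("audit got " + topic.poll("audit"));
    }
}
```

Messages are never removed from the log when they are read. Only the per-group position moves, which is why a new group can still read everything from the start.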

Additional notes on JMS 1.1 compatibility are outlined below.

Features

Both JMS messaging models are supported:

  • Publish/Subscribe (Topics)
  • Point-to-Point (Queues)

All JMS message types are supported including:

  • TextMessage
  • BytesMessage
  • MapMessage
  • StreamMessage
  • ObjectMessage

Treatment of JMS header values:

  • JMSDestination - Set (overwritten) on send and available in the message at the destination.
  • JMSDeliveryMode - Set (overwritten) to DEFAULT_DELIVERY_MODE (PERSISTENT) on send.
  • JMSExpiration - Ignored. Overwritten with 0 on send.
  • JMSPriority - Ignored. Set (overwritten) with DEFAULT_PRIORITY (4) on send.
  • JMSMessageID - Set (overwritten) with a unique id for the message on send and available in message at destination.
  • JMSTimestamp - Set (overwritten) on message send and transmitted to destination if this has not been disabled using MessageProducer.setDisableMessageTimestamp.
  • JMSCorrelationID - Value set by user is transmitted to destination.
  • JMSReplyTo - Value set by user is transmitted to destination.
  • JMSType - Value set by user is transmitted to destination.
  • JMSRedelivered - Ignored. Set (overwritten) to false on send.
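The header rules above can be pictured with a small, provider-independent model. The sketch below is not provider code: the class HeaderTreatmentSketch and the method afterSend are hypothetical, headers are represented as plain map entries, and the provider-generated headers (JMSDestination, JMSMessageID, JMSTimestamp) are left out for brevity. It only shows which user-set values are replaced and which pass through.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the send-side header treatment listed above. */
final class HeaderTreatmentSketch {
    static final int PERSISTENT = 2;       // value of javax.jms.DeliveryMode.PERSISTENT
    static final int DEFAULT_PRIORITY = 4; // value of javax.jms.Message.DEFAULT_PRIORITY

    /** Returns a copy of the headers after the send-side overwrites are applied. */
    static Map<String, Object> afterSend(Map<String, Object> userHeaders) {
        Map<String, Object> sent = new LinkedHashMap<>(userHeaders);
        sent.put("JMSDeliveryMode", PERSISTENT);    // always persistent
        sent.put("JMSExpiration", 0L);              // expiration is ignored
        sent.put("JMSPriority", DEFAULT_PRIORITY);  // priority is ignored
        sent.put("JMSRedelivered", false);          // always false on send
        // JMSCorrelationID, JMSReplyTo and JMSType are copied through untouched.
        return sent;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("JMSPriority", 9);
        headers.put("JMSCorrelationID", "order-42");
        System.out.println(afterSend(headers));
    }
}
```

An application that relies on message priority or expiration needs to handle those concerns itself, because the values it sets do not survive the send.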

Only durable topics/queues are supported.

Unsupported Features

The following functionality has either limited support or no support:

Session

Method                                                   Notes
createDurableSubscriber(Topic, String)                   Durable Subscribers are not supported.
createDurableSubscriber(Topic, String, String, boolean)  Durable Subscribers are not supported.
getMessageListener()                                     Session based MessageListeners are not supported.
setMessageListener(MessageListener)                      Session based MessageListeners are not supported.
rollback()                                               Transactions are not supported by Kafka.
recover()                                                Transactions are not supported by Kafka.
commit()                                                 Transactions are not supported by Kafka.
createBrowser(Queue, String)                             QueueBrowsers with message selectors are not supported.
unsubscribe(String)                                      Durable Subscribers are not supported.
createConsumer(Destination, String)                      Message selectors are not supported. An exception will be thrown if one is set.
createConsumer(Destination, String, boolean)             Message selectors are not supported. An exception will be thrown if one is set.
createTemporaryTopic()                                   Kafka does not have a concept of a temporary topic.
getTransacted()                                          Transactions are not supported. This will always return false.
createTemporaryQueue()                                   Kafka does not have a concept of a temporary queue.
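When migrating an existing JMS application, it can help to search the code base for calls to the Session methods listed above. The following is a minimal text-based sketch, not a Confluent tool. The class UnsupportedCallScanner is hypothetical, and because it matches method names only, it can report false positives such as commit() on a JDBC connection. Listener methods are omitted on purpose because a name match cannot tell a Session listener from a MessageConsumer listener, and selector arguments to createConsumer or createBrowser still need manual review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Flags method calls whose names match unsupported Session methods. */
final class UnsupportedCallScanner {
    // Method names taken from the Session table above.
    private static final Pattern UNSUPPORTED = Pattern.compile(
        "\\.(createDurableSubscriber|unsubscribe|rollback|recover|commit"
        + "|createTemporaryTopic|createTemporaryQueue)\\s*\\(");

    /** Returns the matched method names in the order they appear in the source text. */
    static List<String> findUnsupportedCalls(String javaSource) {
        List<String> hits = new ArrayList<>();
        Matcher matcher = UNSUPPORTED.matcher(javaSource);
        while (matcher.find()) {
            hits.add(matcher.group(1));
        }
        return hits;
    }

    public static void main(String[] args) {
        String source = "Queue q = session.createTemporaryQueue();\n"
            + "session.createQueue(\"orders\");\n"
            + "session.commit();\n";
        System.out.println(findUnsupportedCalls(source));
    }
}
```

Each hit is a starting point for review rather than proof of a problem, since the scanner does not resolve the type of the object the method is called on.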

ConnectionFactory

Method                            Notes
createConnection(String, String)  Not supported. Authentication is only supported via Kafka configuration properties.
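As an illustration of authentication through configuration properties, the sketch below builds settings for SASL/PLAIN over TLS using the standard Apache Kafka client property names. It assumes that the cluster is configured for SASL_SSL with the PLAIN mechanism and that kafka-jms-client forwards these properties to the underlying Kafka clients. The class SaslSettingsSketch, the host name, and the credentials are placeholders.

```java
import java.util.Properties;

/** Builds connection settings that carry credentials as Kafka client properties. */
final class SaslSettingsSketch {
    static Properties saslPlainSettings(String bootstrapServers, String username, String password) {
        Properties settings = new Properties();
        settings.put("bootstrap.servers", bootstrapServers);
        // Standard Kafka client security properties (assumed to be passed through).
        settings.put("security.protocol", "SASL_SSL");
        settings.put("sasl.mechanism", "PLAIN");
        settings.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";");
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = saslPlainSettings("broker.example.com:9093", "alice", "secret");
        System.out.println(settings.getProperty("security.protocol"));
    }
}
```

The resulting Properties object takes the place of the user name and password arguments: pass it to the KafkaConnectionFactory constructor and call createConnection() without arguments.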

Connection

Method               Notes
setClientID(String)  Setting the client id is only supported by using client.id in the settings passed to KafkaConnectionFactory.
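Code that previously called setClientID can move the value into the settings instead. The following sketch shows one way to do that with plain java.util.Properties. The helper class ClientIdSettingsSketch is hypothetical, and the client id "orders-app" is a placeholder.

```java
import java.util.Properties;

/** Adds a client id to a copy of existing connection settings. */
final class ClientIdSettingsSketch {
    static Properties withClientId(Properties base, String clientId) {
        Properties settings = new Properties();
        settings.putAll(base);                 // leave the caller's object untouched
        settings.put("client.id", clientId);   // replaces Connection.setClientID(...)
        return settings;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");
        Properties settings = withClientId(base, "orders-app");
        System.out.println(settings.getProperty("client.id"));
    }
}
```

The returned settings are then passed to the KafkaConnectionFactory constructor, so the client id is fixed before any connection is created.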

JMS Client Installation

Prerequisites
See the Confluent Platform System Requirements.

Installation

The JMS Client is a library that you use from within your Java applications.

To reference kafka-jms-client in a Maven-based project, first add the Confluent Maven repository to your pom.xml:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Then add a dependency on the Confluent JMS Client and the JMS API specification:

<dependencies>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-jms-client</artifactId>
        <version>7.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.geronimo.specs</groupId>
        <artifactId>geronimo-jms_1.1_spec</artifactId>
        <version>1.1</version>
    </dependency>
</dependencies>

If you don’t use Maven, you can download the JMS Client JAR file directly by navigating to the following URL.

http://packages.confluent.io/maven/io/confluent/kafka-jms-client/7.7.0/kafka-jms-client-7.7.0.jar
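The download URL follows the standard Maven repository layout: the group id with dots replaced by slashes, then the artifact id, the version, and a file named artifactId-version.jar. A small sketch of that rule follows, useful for deriving the URL for a different version. The class MavenJarUrlSketch is hypothetical, and whether a given version is actually published in the repository is not checked.

```java
/** Derives a JAR URL from Maven coordinates using the standard repository layout. */
final class MavenJarUrlSketch {
    static String jarUrl(String repoBase, String groupId, String artifactId, String version) {
        String base = repoBase.endsWith("/") ? repoBase : repoBase + "/";
        return base
            + groupId.replace('.', '/') + "/"   // io.confluent -> io/confluent
            + artifactId + "/"
            + version + "/"
            + artifactId + "-" + version + ".jar";
    }

    public static void main(String[] args) {
        System.out.println(jarUrl(
            "http://packages.confluent.io/maven/", "io.confluent", "kafka-jms-client", "7.7.0"));
    }
}
```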

If you require a “fat” JAR (one that includes the JMS Client and all of its dependencies), you can make one by following the instructions in Appendix 1 - Creating a Shaded Fat JAR.

Example

Usage of kafka-jms-client is similar to the JMS API.

The following example program uses a KafkaConnectionFactory instance to create JMS-compliant Connection, Session, and MessageProducer objects. The MessageProducer is then used to send 50 TextMessage messages to the Apache Kafka® topic test-queue, which is acting as a queue. A MessageConsumer is then created and used to read back these messages.

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnectionFactory;

public class App {

    public static void main(String[] args) throws JMSException {
        // Client settings: a client id plus the Kafka and ZooKeeper endpoints.
        Properties settings = new Properties();
        settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2");
        settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");

        // Create the standard JMS objects from the Kafka-backed factory.
        ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testQueue = session.createQueue("test-queue");

        // Send 50 text messages to the Kafka topic backing test-queue.
        MessageProducer producer = session.createProducer(testQueue);
        for (int i = 0; i < 50; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("This is a text message");
            producer.send(message);
        }

        // JMS connections are created in stopped mode, so start delivery
        // before receiving.
        connection.start();

        // Read the messages back. receive() blocks until a message arrives,
        // so this loop runs until the process is terminated.
        MessageConsumer consumer = session.createConsumer(testQueue);
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println(message.getText());
        }
    }
}

Appendix 1 - Creating a Shaded Fat JAR

In some scenarios, it is useful to have a “fat” JAR that bundles the JMS Client together with all of its dependencies in a single file. Confluent does not distribute the JMS Client in this form, but you can build a fat JAR yourself easily enough:

  1. Copy the pom.xml below into an empty directory.
  2. Execute the Maven command mvn package.

The resulting artifact will be placed in the target directory alongside the pom.xml file. Note: In addition to bundling dependencies, the provided pom.xml file shades them under the namespace confluent.shaded. to avoid potential namespace clashes.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.confluent</groupId>
    <artifactId>kafka-jms-client-fat</artifactId>
    <version>7.7.0</version>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-jms-client</artifactId>
            <version>7.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <pattern>org.I0Itec.</pattern>
                                    <shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.yammer.metrics.</pattern>
                                    <shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>joptsimple</pattern>
                                    <shadedPattern>confluent.shaded.joptsimple</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.zookeeper.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.jute.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.kafka.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.log4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.thirdparty.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.fasterxml.jackson.</pattern>
                                    <shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>net.jpountz.</pattern>
                                    <shadedPattern>confluent.shaded.net.jpountz.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.xerial.snappy.</pattern>
                                    <shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.jose4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.jose4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.common.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.license.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>kafka.</pattern>
                                    <shadedPattern>confluent.shaded.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>scala.</pattern>
                                    <shadedPattern>confluent.shaded.scala.</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>

    </build>
</project>
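To make the effect of the relocation entries concrete, the following sketch applies a few of the pattern and shadedPattern pairs from the pom.xml to class names. It is a simplified model of prefix rewriting only. The class RelocationSketch is hypothetical, and the real maven-shade-plugin also rewrites references inside the bytecode and handles resources, which this does not attempt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of how a relocation rewrites a fully qualified class name. */
final class RelocationSketch {
    // A few pattern -> shadedPattern pairs copied from the pom.xml above.
    private static final Map<String, String> RELOCATIONS = new LinkedHashMap<>();
    static {
        RELOCATIONS.put("org.apache.kafka.", "confluent.shaded.org.apache.kafka.");
        RELOCATIONS.put("com.google.common.", "confluent.shaded.com.google.common.");
        RELOCATIONS.put("joptsimple", "confluent.shaded.joptsimple");
    }

    /** Returns the relocated name, or the original name if no pattern matches. */
    static String relocate(String className) {
        for (Map.Entry<String, String> entry : RELOCATIONS.entrySet()) {
            String pattern = entry.getKey();
            if (className.startsWith(pattern)) {
                return entry.getValue() + className.substring(pattern.length());
            }
        }
        return className;
    }

    public static void main(String[] args) {
        System.out.println(relocate("org.apache.kafka.clients.producer.KafkaProducer"));
        System.out.println(relocate("javax.jms.Message"));
    }
}
```

Classes outside the listed patterns, such as the javax.jms API types, keep their original names, so application code written against the JMS interfaces is unaffected by the shading.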