.. _client_jms:

JMS Client
==========

.. contents:: Table of Contents

Overview
--------

Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. `kafka-jms-client` is an implementation of the JMS 1.1 provider interface that allows Apache Kafka or the Confluent Platform to be used as a JMS message broker.

``[kafka-jms-client] <--- kafka protocol ---> [kafka broker]``

Applications written using Confluent's `kafka-jms-client` can exchange messages with other applications written using `kafka-jms-client`, as well as with any application that produces or consumes messages using any other Kafka client library.

The purpose of this client is to allow existing applications written to the JMS API to publish and subscribe to Kafka as a replacement for any other JMS broker.

If you want to integrate Kafka with an existing JMS broker (that is, without replacing it), look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.

If you are writing new Java applications, use the Java Kafka Producer/Consumer APIs, as they provide advanced features not available when using `kafka-jms-client`.

Example
-------

Usage of `kafka-jms-client` should be familiar to existing users of the JMS API. The following example program uses a ``KafkaConnectionFactory`` instance to create JMS-compliant ``Connection``, ``Session`` and ``MessageProducer`` objects. The ``MessageProducer`` is then used to send 50 ``TextMessage`` messages to the Kafka topic ``test-queue``, which acts as a queue. A ``MessageConsumer`` is then created and used to read back these messages.

.. code-block:: java

    import java.util.Properties;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import io.confluent.kafka.jms.JMSClientConfig;
    import io.confluent.kafka.jms.KafkaConnectionFactory;

    public class App {

        public static void main(String[] args) throws JMSException {
            // Client settings: identity, Kafka brokers and ZooKeeper.
            Properties settings = new Properties();
            settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2");
            settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");

            ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testQueue = session.createQueue("test-queue");

            // Send 50 text messages to the queue.
            MessageProducer producer = session.createProducer(testQueue);
            for (int i = 0; i < 50; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("This is a text message");
                producer.send(message);
            }

            // JMS connections are created in stopped mode, so start
            // message delivery before receiving.
            connection.start();

            // Read the messages back, blocking until each one arrives.
            MessageConsumer consumer = session.createConsumer(testQueue);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                System.out.println(message.getText());
            }
        }
    }

Requirements
------------

* Java 7 (or higher).
* A Kafka cluster running version 0.11.0 brokers or higher (included in Confluent Enterprise 3.3 or higher).
* A current Confluent Enterprise subscription. Note: The client can also be used without a license key for a trial period of 30 days.

Installation
------------

The JMS client is a library that you use from within your Java applications.

To reference `kafka-jms-client` in a Maven-based project, first add the Confluent Maven repository to your ``pom.xml``::

    <repositories>
      <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
      </repository>
    </repositories>

Then add a dependency on the Confluent JMS client as well as the JMS API specification (note: replace the text '[version]' with '|release|')::

    <dependencies>
      <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-jms-client</artifactId>
        <version>[version]</version>
      </dependency>
      <dependency>
        <groupId>org.apache.geronimo.specs</groupId>
        <artifactId>geronimo-jms_1.1_spec</artifactId>
        <version>1.1</version>
      </dependency>
    </dependencies>

If you don't use Maven, you can download the JMS Client .jar file directly from the following URL (note: replace the text '[version]' with '|release|')::

    http://packages.confluent.io/maven/io/confluent/kafka-jms-client/[version]/kafka-jms-client-[version].jar

If you require a 'fat' .jar (one that includes the JMS Client and all of its dependencies), you can make one by following the instructions in :ref:`appendix 1 <appendix_1>`.

.. _client_jms_developing:

Development Guide
-----------------

Import ``KafkaConnectionFactory`` as well as all the standard JMS interfaces you would use with any other JMS provider:

.. code-block:: java

    import io.confluent.kafka.jms.KafkaConnectionFactory;

    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.Session;
    import javax.jms.TextMessage;

Configuration
^^^^^^^^^^^^^

Required Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* ``bootstrap.servers`` - A list of host/port pairs to use for establishing the initial connection to your Kafka cluster (of the form: ``host1:port1,host2:port2,....``). The client makes use of all servers in your cluster irrespective of which servers are specified via this property for bootstrapping. You may want to specify more than one in case one of the servers in your list is down at the time of initialization.
* ``client.id`` - Under the hood, the JMS Client makes use of one or more Kafka clients for communicating with your Kafka cluster. The ``client.id`` of these clients is set to the value of this configuration property appended with a globally unique id (guid). The ``client.id`` string is passed to the server when making requests and is useful for debugging purposes.
* ``confluent.license`` - A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement. If not specified, you may use the client for a trial period of 30 days, after which it will stop working.
* ``zookeeper.connect`` - `[only required if you have not provided a valid license key]`. A ZooKeeper connection string in the form ``hostname:port``, where hostname and port are the host and port of a ZooKeeper server associated with your Kafka cluster.

Configuration properties are set in the same way as for any other Kafka client:

.. code-block:: java

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("client.id", "my-jms-client");

Optional Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* ``allow.out.of.order.acknowledge`` - If true, no exception is thrown when a message is acknowledged out of order (which implicitly acknowledges any messages before it). The default value is ``false``.
* ``jms.fallback.message.type`` - If the JMS Message type header is not associated with a message, fall back to this message type.
* ``consumer.group.id`` - A string that uniquely identifies the group of consumer processes to which this client belongs. If not specified, this defaults to ``confluent-jms`` in the case of queues and ``confluent-jms-{uuid}`` in the case of topics, where `{uuid}` is a unique value for each consumer. This naming strategy provides `load balancer` semantics in the case of queues and `publish-subscribe` semantics in the case of topics, as required by the JMS Specification.
* ``zookeeper.session.timeout.ms`` - ZooKeeper session timeout (unused if you provide a valid license key).
* ``zookeeper.connect.timeout.ms`` - ZooKeeper connection establishment timeout (unused if you provide a valid license key).
* ``jms.consumer.poll.timeout.ms`` - The maximum length of time Kafka consumers should block when retrieving records from Kafka. You should not need to adjust this value.
* ``jms.consumer.close.timeout.ms`` - The maximum number of milliseconds to wait for a clean shutdown when closing a ``MessageConsumer``.
* ``message.listener.null.wait.ms`` - The number of milliseconds to wait before polling Kafka for new messages if no messages were retrieved in a message listener poll loop. You should not need to adjust this value.
* ``connection.stop.timeout.ms`` - The maximum number of milliseconds to wait for the message listener threads to shut down cleanly when ``connection.stop()`` has been called.

Standard Kafka Configuration Properties (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

All of the configuration properties of the underlying Java Kafka client library may be specified. Prefix the desired property with ``producer.`` or ``consumer.`` as appropriate. For example:

.. code-block:: java

    props.put("producer.linger.ms", "1");
    props.put("consumer.heartbeat.interval.ms", "1000");

Confluent Specific Features (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring, set the following Kafka properties:

.. code-block:: java

    props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
    props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Security settings match those of the native Kafka Java producer and consumer.
Security settings are applied to both production and consumption of messages (you do not need to prefix security settings with ``consumer.`` or ``producer.``).

If client authentication is not required by the broker, then the following is a minimal configuration example:

.. code-block:: java

    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
    props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a client keystore must also be created and the following must be configured as well:

.. code-block:: java

    props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
    props.put("ssl.keystore.password", "test1234");
    props.put("ssl.key.password", "test1234");

Developing JMS Applications
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Creating a ConnectionFactory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A connection factory can be created as follows:

.. code-block:: java

    ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

``KafkaConnectionFactory`` also implements the ``QueueConnectionFactory`` and ``TopicConnectionFactory`` interfaces:

.. code-block:: java

    QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
    TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);

Creating a Destination
~~~~~~~~~~~~~~~~~~~~~~

Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and a queue with the same name, they will both be associated with the same Kafka topic. ``Destination`` names must also follow the naming restrictions of Kafka topics: the maximum length is 249 characters, and only letters, digits, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the '/' character, which is sometimes used in other JMS Topic names.

.. code-block:: java

    Queue testQueue = session.createQueue("test_queue");
    Topic testTopic = session.createTopic("test_topic");
    Destination destination = testTopic;

Creating a Connection
~~~~~~~~~~~~~~~~~~~~~

Create a ``Connection`` object as follows:

.. code-block:: java

    Connection connection = connectionFactory.createConnection();

Creating Sessions
~~~~~~~~~~~~~~~~~

Kafka sessions support both ``AUTO_ACKNOWLEDGE`` and ``CLIENT_ACKNOWLEDGE`` modes for consumer acknowledgements. Transactions are not supported, so you should set the ``transacted`` argument to ``false``.

.. code-block:: java

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``MessageProducer`` example:

.. code-block:: java

    MessageProducer producer = session.createProducer(testTopic);
    TextMessage message = session.createTextMessage();
    message.setText("This is a text message");
    producer.send(message);

``MessageConsumer`` example:

.. code-block:: java

    MessageConsumer consumer = this.session.createConsumer(testQueue);
    while (true) {
        // receiveNoWait() returns null when no message is currently available.
        Message message = consumer.receiveNoWait();
        // TODO: Process message
    }

JNDI Context
~~~~~~~~~~~~

A simple JNDI ``InitialContextFactory`` implementation is provided that can be used to look up JMS ``ConnectionFactory``, ``QueueConnectionFactory`` and ``TopicConnectionFactory`` objects as well as ``Destination`` objects. For example:

.. code-block:: java

    Context ctx = new InitialContext();
    ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
    QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
    TopicConnectionFactory tcf = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
    Queue queue = (Queue) ctx.lookup("foo");
    Topic topic = (Topic) ctx.lookup("bar");

You'll need to set the ``java.naming.factory.initial`` system property to ``io.confluent.kafka.jms.KafkaInitialContextFactory``, either using a ``-D`` command line option or in a ``jndi.properties`` file located somewhere on your classpath. Queue and topic name lookups and JMS Client configuration properties also need to be specified. Here's an example ``jndi.properties`` file:

.. code-block:: properties

    java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory

    # JMS Client properties
    client.id = testing-01
    zookeeper.connect = localhost:2181
    bootstrap.servers = localhost:9092

    # Register queues in JNDI using the form:
    # queue.[jndiName] = [physicalName]
    # Register topics in JNDI using the form:
    # topic.[jndiName] = [physicalName]
    queue.foo = foo
    topic.bar = bar

As an alternative to using system properties or a ``jndi.properties`` file, you can programmatically pass properties into the ``InitialContext`` constructor for all or a subset of your configuration. For example:

.. code-block:: java

    Hashtable<String, Object> props = new Hashtable<>();
    props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
    props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
    props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
    props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("topic.bar", "bar");
    Context ctx = new InitialContext(props);

As a convenience, ``KafkaInitialContextFactory`` automatically translates SSL system properties to the relevant Kafka properties:


================================= ================================
System Property                   Kafka Property
================================= ================================
javax.net.ssl.trustStore          ssl.truststore.location
javax.net.ssl.trustStoreType      ssl.truststore.type
javax.net.ssl.trustStorePassword  ssl.truststore.password
javax.net.ssl.keyStore            ssl.keystore.location
javax.net.ssl.keyStoreType        ssl.keystore.type
javax.net.ssl.keyStorePassword    ssl.keystore.password
================================= ================================

Threading
~~~~~~~~~

The JMS specification states that a session may not be operated on by more than one thread at a time. This restriction applies to the Kafka JMS Client. However, different sessions created from a single connection may be used concurrently, as per the specification.

JMS 1.1 Compatibility
---------------------

The Kafka JMS Client implements the JMS 1.1 specification wherever possible. However, some JMS concepts either do not map 1:1 to Kafka or do not make sense at all in Kafka (such as nonpersistent messages).

JMS brokers typically provide distinct Topic and Queue functionality that mirrors the JMS Specification. By contrast, Kafka provides only one abstraction, partitioned topics, which are actually distributed commit logs and behave differently from the traditional messaging system notion of a topic. In practice, Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense.

Additional notes on JMS 1.1 compatibility are outlined below.

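
To make the point about Kafka topics mimicking both queues and topics more concrete, the following sketch models consumer groups in plain Java, with no Kafka dependency. Consumers that share a group id split the messages between them (queue semantics), while consumers in separate groups each see every message (publish-subscribe semantics). The class ``ConsumerGroupSketch``, its ``deliver`` method, and the round-robin assignment are hypothetical simplifications made for illustration; they are not part of `kafka-jms-client`, and the group ids ``confluent-jms-a`` and ``confluent-jms-b`` merely stand in for the ``confluent-jms-{uuid}`` form.

.. code-block:: java

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, simplified model of Kafka consumer groups.
    // It is not part of kafka-jms-client and uses no Kafka classes.
    public class ConsumerGroupSketch {

        // Delivers every message once to each consumer group, rotating between
        // the members of a group. Returns the messages seen by each consumer.
        public static Map<String, List<String>> deliver(List<String> messages,
                                                        Map<String, List<String>> groups) {
            Map<String, List<String>> received = new LinkedHashMap<>();
            for (List<String> members : groups.values()) {
                for (String consumer : members) {
                    received.put(consumer, new ArrayList<String>());
                }
            }
            for (int i = 0; i < messages.size(); i++) {
                for (List<String> members : groups.values()) {
                    // Assumption: round-robin stands in for partition assignment.
                    String consumer = members.get(i % members.size());
                    received.get(consumer).add(messages.get(i));
                }
            }
            return received;
        }

        public static void main(String[] args) {
            List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");

            // Queue semantics: both consumers share one group and split the messages.
            Map<String, List<String>> queueGroups = new LinkedHashMap<>();
            queueGroups.put("confluent-jms", Arrays.asList("c1", "c2"));
            System.out.println(deliver(messages, queueGroups));

            // Topic semantics: each consumer has its own group and sees every message.
            Map<String, List<String>> topicGroups = new LinkedHashMap<>();
            topicGroups.put("confluent-jms-a", Arrays.asList("c1"));
            topicGroups.put("confluent-jms-b", Arrays.asList("c2"));
            System.out.println(deliver(messages, topicGroups));
        }
    }

In the first call the two consumers each receive half of the messages; in the second call both consumers receive all four.
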

Features
^^^^^^^^

Both JMS messaging models are supported:

* Publish/Subscribe (Topics)
* Point-to-Point (Queues)

All JMS message types are supported, including:

* ``TextMessage``
* ``BytesMessage``
* ``MapMessage``
* ``StreamMessage``
* ``ObjectMessage``

All JMS message header values are transported, although some are ignored because they do not make sense in Kafka:

* ``JMSDestination``
* ``JMSDeliveryMode`` - Ignored. Kafka only has one delivery mode, which is persisted.
* ``JMSExpiration`` - Stored, but not enforced.
* ``JMSPriority`` - Stored, but not recognized.
* ``JMSMessageID``
* ``JMSTimestamp``
* ``JMSCorrelationID``
* ``JMSReplyTo``
* ``JMSType``
* ``JMSRedelivered``

Request/Reply features are supported using durable Topics/Queues (but not using temporary endpoints).

Unsupported Features
^^^^^^^^^^^^^^^^^^^^

The following functionality has either limited support or no support:

Session
~~~~~~~

======================================================= ==========================================================================================
Method                                                  Notes
======================================================= ==========================================================================================
createDurableSubscriber(Topic, String)                  Durable Subscribers are not supported.
createDurableSubscriber(Topic, String, String, boolean) Durable Subscribers are not supported.
getMessageListener()                                    Session based MessageListeners are not supported.
setMessageListener(MessageListener)                     Session based MessageListeners are not supported.
rollback()                                              Transactions are not supported by Kafka.
recover()                                               Transactions are not supported by Kafka.
commit()                                                Transactions are not supported by Kafka.
createBrowser(Queue, String)                            QueueBrowsers with message selectors are not supported.
unsubscribe(String)                                     Durable Subscribers are not supported.
createConsumer(Destination, String)                     Message selectors are not supported. An exception will be thrown if one is set.
createConsumer(Destination, String, boolean)            Message selectors are not supported. An exception will be thrown if one is set.
createTemporaryTopic()                                  Kafka does not have a concept of a temporary topic.
getTransacted()                                         Transactions are not supported. This will always return false.
createTemporaryQueue()                                  Kafka does not have a concept of a temporary queue.
======================================================= ==========================================================================================

ConnectionFactory
~~~~~~~~~~~~~~~~~

================================= ==========================================================================================
Method                            Notes
================================= ==========================================================================================
createConnection(String, String)  Not supported. Authentication is only supported via Kafka configuration properties.
================================= ==========================================================================================

Connection
~~~~~~~~~~

======================================= ========================================================================================================================
Method                                  Notes
======================================= ========================================================================================================================
setClientID(String)                     Setting the client id is only supported by using ``client.id`` in the settings passed to ``KafkaConnectionFactory``.
======================================= ========================================================================================================================

.. _appendix_1:

Appendix 1 - Creating a Shaded Fat .jar
---------------------------------------

In some scenarios, it is useful to have a 'fat' .jar that bundles the JMS Client together with all of its dependencies in a single file.
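Confluent does not distribute the JMS Client in this form, but you can build a fat .jar yourself:

1. Copy the ``pom.xml`` below into an empty directory (note: replace the text '[version]' with '|release|').
2. Execute the Maven command ``mvn package``.

The resulting artifact will be placed in the ``target`` directory alongside the ``pom.xml`` file.

Note: In addition to bundling dependencies, the provided ``pom.xml`` file *shades* them under the namespace ``confluent.shaded.`` in order to avoid potential namespace clashes.

pom.xml::

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>io.confluent</groupId>
      <artifactId>kafka-jms-client-fat</artifactId>
      <version>[version]</version>

      <repositories>
        <repository>
          <id>confluent</id>
          <url>http://packages.confluent.io/maven/</url>
        </repository>
      </repositories>

      <dependencies>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-jms-client</artifactId>
          <version>[version]</version>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
                <configuration>
                  <relocations>
                    <relocation>
                      <pattern>org.I0Itec.</pattern>
                      <shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>com.yammer.metrics.</pattern>
                      <shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>joptsimple</pattern>
                      <shadedPattern>confluent.shaded.joptsimple</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.apache.zookeeper.</pattern>
                      <shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.apache.jute.</pattern>
                      <shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.apache.kafka.</pattern>
                      <shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.apache.log4j.</pattern>
                      <shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>com.google.common.</pattern>
                      <shadedPattern>confluent.shaded.com.google.common.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>com.google.thirdparty.</pattern>
                      <shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>com.fasterxml.jackson.</pattern>
                      <shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>net.jpountz.</pattern>
                      <shadedPattern>confluent.shaded.net.jpountz.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.xerial.snappy.</pattern>
                      <shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>org.jose4j.</pattern>
                      <shadedPattern>confluent.shaded.org.jose4j.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>io.confluent.common.</pattern>
                      <shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>io.confluent.license.</pattern>
                      <shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>kafka.</pattern>
                      <shadedPattern>confluent.shaded.kafka.</shadedPattern>
                    </relocation>
                    <relocation>
                      <pattern>scala</pattern>
                      <shadedPattern>confluent.shaded.scala.</shadedPattern>
                    </relocation>
                  </relocations>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
        <resources>
          <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
          </resource>
        </resources>
      </build>
    </project>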