.. _connect_elasticsearch:

Confluent's JMS Client for Apache Kafka
=========================================

.. contents:: Table of Contents

Overview
---------

The Java Message Service (JMS) is a client messaging API used by distributed Java applications for publish/subscribe and point-to-point communications. The kafka-jms-client is an implementation of the `JMS 1.1 provider interface `_ that uses the Apache Kafka wire protocol to talk to one or more Kafka brokers.

``[kafka-jms-client]<---kafka protocol--->[kafka broker]``

Applications written using this client library can exchange messages with other applications written with the kafka-jms-client, as well as with any application that produces or consumes messages using one of the other Kafka client libraries.

The purpose of this client is to allow existing applications written to the JMS API to publish and subscribe to Kafka as a replacement for any other JMS broker.

If you are interested in integrating Kafka with an existing JMS broker (i.e. without replacing it), then you are encouraged to look at the Kafka Connect API or any pre-built Kafka Source or Sink Connectors for JMS.

If you are interested in writing new Java applications, then you are encouraged to use the Java Kafka Producer/Consumer APIs, as they provide advanced features not available when using the kafka-jms-client.

Examples
----------

Using the kafka-jms-client should be familiar to existing users of the JMS API.

Producing messages
^^^^^^^^^^^^^^^^^^^^^

The following example creates a JMS-compliant Connection and Session, and then a JMS MessageProducer that wraps a Kafka producer. Messages are stored in Kafka using an Avro message body and key:

.. code-block:: java

    // "settings" is a java.util.Properties object holding the client configuration.
    ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination testTopic = session.createQueue("test_topic");

    MessageProducer producer = session.createProducer(testTopic);
    for (int i = 0; i < 50; i++) {
        TextMessage message = session.createTextMessage();
        message.setText("This is a text message");
        producer.send(message);
    }

Consuming messages
^^^^^^^^^^^^^^^^^^^

The following example creates a JMS MessageConsumer on the same destination and polls it for messages:

.. code-block:: java

    ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination testTopic = session.createQueue("test_topic");

    MessageConsumer consumer = session.createConsumer(testTopic);
    // JMS delivers messages only after the connection has been started.
    connection.start();
    while (true) {
        // Wait up to one second; receive() returns null if nothing arrived.
        Message message = consumer.receive(1000);
        if (message != null) {
            //TODO: Do things.
        }
    }

Requirements
--------------

Java 7 (or higher) is required to run applications written with the kafka-jms-client library.

Applications using this client library are required to have the ``confluent-kafka-jms-.jar`` configured in the CLASSPATH.

The client requires a connection to a running Kafka cluster (brokers and ZooKeeper). The Kafka brokers must be version 0.10.0 or higher (included in Confluent Enterprise 3.2 or higher).

The `Confluent Schema Registry `_ is required to store the Avro schemas associated with the JMS messages stored in Kafka and to enable support for JMS Header Properties. The Confluent Schema Registry is part of the Confluent Platform; you can read more on how to `use the schema registry `_

To use the kafka-jms-client you must have a current Confluent Enterprise subscription.

Installation
-------------

Download and install the Confluent Enterprise distribution (Version 3.2 or higher) from `Confluent `_.

JMS 1.1 Compatibility
-----------------------

The kafka-jms-client implements most of the JMS 1.1 specification. However, there are some JMS concepts that either do not map 1:1 to Kafka, or simply do not make sense at all in Kafka (such as nonpersistent messages).

JMS brokers typically have separate Topics and `Queues `_ while Kafka has Topics and Topic Partitions (which can act like Queues). This JMS client stores all messages in a Kafka topic. If `Session.createQueue() `_ or `Session.createTopic() `_ is called, a KafkaDestination is returned. This will always map to a Topic on the Kafka Broker.

================== ================================================================================
JMS Header         Notes
================== ================================================================================
JMSDeliveryMode    Delivery mode is ignored. Kafka only has one delivery mode, which is persistent.
JMSPriority        Priority is stored in the message envelope but is not recognized.
JMSExpiration      The JMS Expiration is stored but not enforced on the broker. In a future release the client could be configured to drop messages that have reached the expiration.
================== ================================================================================

Features
---------

All JMS Messaging Models are supported including:

* Publish/Subscribe (Topics)
* Point-to-Point (Queues)

All JMS Message Types are supported including:

* TextMessage
* BytesMessage
* MapMessage
* StreamMessage
* ObjectMessage

All JMS Message Header values are transported, although some are ignored because they do not make sense in Kafka:

* JMSDestination
* JMSDeliveryMode
* JMSExpiration
* JMSPriority
* JMSMessageID
* JMSTimestamp
* JMSCorrelationID
* JMSReplyTo
* JMSType
* JMSRedelivered

Request/Reply features are supported using durable Topics/Queues (but not using temporary endpoints).

Unsupported Features
---------------------

The kafka-jms-client supports the JMS 1.1 specification with the following exceptions. The tables below list, per class, the methods that have limited support or no support, along with other important notes about how each feature behaves.

Session
^^^^^^^^

======================================================= =======================================================================================
Method                                                  Notes
======================================================= =======================================================================================
createDurableSubscriber(Topic, String)                  Durable Subscribers are not supported.
createDurableSubscriber(Topic, String, String, boolean) Durable Subscribers are not supported.
getMessageListener()                                    Session based MessageListeners are not supported.
setMessageListener(MessageListener)                     Session based MessageListeners are not supported.
rollback()                                              Transactions are not supported by Kafka. This is a no-op.
recover()                                               Transactions are not supported by Kafka.
commit()                                                Transactions are not supported by Kafka. This is a no-op.
createBrowser(Queue, String)                            QueueBrowsers with message selectors are not supported.
unsubscribe(String)                                     Durable Subscribers are not supported.
createConsumer(Destination, String)                     Message selectors are not supported. An exception will be thrown if one is set.
createConsumer(Destination, String, boolean)            Message selectors are not supported. An exception will be thrown if one is set.
createTemporaryTopic()                                  Kafka does not have a concept of a temporary topic.
getTransacted()                                         Transactions are not supported by Kafka. This will always return false.
createTemporaryQueue()                                  Kafka does not have a concept of a temporary topic.
======================================================= =======================================================================================

ConnectionFactory
^^^^^^^^^^^^^^^^^^

================================= =========================================================================================================
Method                            Notes
================================= =========================================================================================================
createConnection(String, String)  This method calls createConnection() internally. The username and password passed to this overload are ignored.
================================= =========================================================================================================

Connection
^^^^^^^^^^^^

======================================= ================================================================================================================
Method                                  Notes
======================================= ================================================================================================================
setExceptionListener(ExceptionListener) ExceptionListeners are not supported. This currently does nothing.
setClientID(String)                     Setting the client id is only supported by using ``client.id`` in the settings passed to KafkaConnectionFactory.
getExceptionListener()                  ExceptionListeners are not supported. This currently does nothing.
======================================= ================================================================================================================

Developing JMS Clients
--------------------------

Dependencies
^^^^^^^^^^^^^

Add the Confluent Maven repository to your pom.xml::

    <repositories>
      <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
      </repository>
    </repositories>

Add a dependency on the Confluent JMS client::

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-jms-client</artifactId>
      <version>3.2.0</version>
    </dependency>

Import the JMS KafkaConnectionFactory from Confluent and then use the standard JMS interfaces as you would with any other JMS provider.

.. code-block:: java

    import io.confluent.kafka.jms.KafkaConnectionFactory;

    import java.util.Properties;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

As documented in the JMS specification, a typical JMS-based application executes the following steps:

* create a ConnectionFactory
* create Destination objects
* create a Connection
* create one or more Sessions
* create MessageProducers and MessageConsumers
* start message delivery on the Connection

Configure and Create a ConnectionFactory
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The KafkaConnectionFactory requires the following settings to work:

* ``bootstrap.servers`` - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form ``host1:port1,host2:port2,....`` Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
* ``schema.registry.url`` - Address of your Confluent Schema Registry in URL format.
* ``zookeeper.connect`` - Specifies the ZooKeeper connection string in the form ``hostname:port`` where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form ``hostname1:port1,hostname2:port2,hostname3:port3``.
* ``client.id`` - An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
* ``group.id`` - A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
* ``confluent.license`` - A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement.

Setting Required Properties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Set your configuration settings just as you would for any other Kafka client.

.. code-block:: java

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("client.id", "MySampleJMSProducer");
    props.put("group.id", "jmsconsumergroup1");
    props.put("confluent.license", "RLKGWF3S8Q7LA5N4AEQ74VOWYD3VL297");

Enabling Confluent Specific Features (Optional)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

All of the features of the underlying Kafka client library are included in the single jar and can be set up in the same way as documented in the native Kafka Producer / Consumer API documentation.

For example, to configure client interceptors that enable Control Center stream monitoring, set the corresponding Kafka properties as follows:

.. code-block:: java

    props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
    props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

All security settings are the same as for native Kafka Java client applications. Make sure to set all the security settings before creating the ConnectionFactory.

If client authentication is not required by the broker, then the following is a minimal configuration example:

.. code-block:: java

    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
    props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a client keystore must also be created and the following must be configured as well:

.. code-block:: java

    props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
    props.put("ssl.keystore.password", "test1234");
    props.put("ssl.key.password", "test1234");

Create a ConnectionFactory
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Then use the configuration settings to create a ConnectionFactory.

.. code-block:: java

    ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

Creating a Destination
^^^^^^^^^^^^^^^^^^^^^^^^^

Keep in mind that Topics and Queues are treated as the same thing in Kafka, so if you create and use a Topic and a Queue with the same name, you will be using the same underlying JMS Destination, which is actually a Kafka Topic.

Also note that Destination names must follow the same naming restrictions as Kafka Topics: the maximum length is 249 characters, and only letters, digits, ``.`` (dot), ``_`` (underscore), and ``-`` (minus) can be used. Take care to avoid the ``/`` character, which is sometimes used in other JMS Topic names.

.. code-block:: java

    Destination testTopic = session.createQueue("test_topic");

Creating a Connection
^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: java

    Connection connection = connectionFactory.createConnection();

Creating Sessions
^^^^^^^^^^^^^^^^^^^^

Create a non-transacted JMS session (transacted sessions are not currently supported) and select either ``AUTO_ACKNOWLEDGE`` or ``CLIENT_ACKNOWLEDGE`` mode for consumer acknowledgements.

.. code-block:: java

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Example JMS Producer:

.. code-block:: java

    MessageProducer producer = session.createProducer(testTopic);
    TextMessage message = session.createTextMessage();
    message.setText("This is a text message");
    producer.send(message);

Example JMS Consumer:

.. code-block:: java

    MessageConsumer consumer = session.createConsumer(testTopic);
    // JMS delivers messages only after the connection has been started.
    connection.start();
    while (true) {
        // Wait up to one second; receive() returns null if nothing arrived.
        Message message = consumer.receive(1000);
        if (message != null) {
            //TODO: Do things.
        }
    }

Threading
-----------

Producers and Consumers must only call the JMS APIs from a single thread because the underlying client libraries are not thread safe.
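
One way to respect the single-thread rule in a multi-threaded application is to hand every JMS call to one dedicated thread. The following is a minimal sketch of that idea, not part of the kafka-jms-client itself: ``ThreadConfinedSender`` and ``TextSink`` are hypothetical names introduced for illustration, and ``TextSink`` stands in for whatever code would create a message and call ``producer.send()``. It uses only ``java.util.concurrent`` so that it runs without a Kafka cluster.

.. code-block:: java

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    /**
     * Sketch: funnel every send through one dedicated thread.
     * This class is hypothetical and not provided by the kafka-jms-client.
     */
    class ThreadConfinedSender implements AutoCloseable {

        /** Hypothetical stand-in for code that wraps a JMS MessageProducer. */
        interface TextSink {
            void send(String text) throws Exception;
        }

        // A single-thread executor runs all submitted tasks on one worker thread.
        private final ExecutorService jmsThread = Executors.newSingleThreadExecutor();
        private final TextSink sink;

        ThreadConfinedSender(TextSink sink) {
            this.sink = sink;
        }

        /** May be called from any thread; the sink itself only runs on jmsThread. */
        Future<Void> send(final String text) {
            Callable<Void> task = () -> {
                sink.send(text);
                return null;
            };
            return jmsThread.submit(task);
        }

        /** Stops accepting work and waits briefly for queued sends to finish. */
        @Override
        public void close() throws InterruptedException {
            jmsThread.shutdown();
            jmsThread.awaitTermination(10, TimeUnit.SECONDS);
        }

        public static void main(String[] args) throws Exception {
            // Records the name of every thread on which the sink was invoked.
            final Set<String> threadsSeen =
                    Collections.synchronizedSet(new HashSet<String>());
            try (ThreadConfinedSender sender = new ThreadConfinedSender(
                    text -> threadsSeen.add(Thread.currentThread().getName()))) {
                Thread[] callers = new Thread[4];
                for (int i = 0; i < callers.length; i++) {
                    final int id = i;
                    callers[i] = new Thread(() -> {
                        try {
                            // Block until the dedicated thread has handled the send.
                            sender.send("message from caller " + id).get();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                    callers[i].start();
                }
                for (Thread caller : callers) {
                    caller.join();
                }
            }
            System.out.println("distinct sink threads: " + threadsSeen.size());
        }
    }

In this sketch four caller threads submit messages, yet the sink records a single thread name, because the executor owns the only thread that touches it. In a real application the Session, MessageProducer, and MessageConsumer would presumably also be created on that same thread; that part is an assumption and is left out here.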