.. _client_jms_developing:

|cjms| Development Guide
----------------------------

Import ``KafkaConnectionFactory`` and the standard JMS interfaces that your application uses:

.. codewithvars:: bash

    import io.confluent.kafka.jms.KafkaConnectionFactory;

    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.Session;
    import javax.jms.TextMessage;

.. _client_jms_config:

Configuration
^^^^^^^^^^^^^

Required Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* ``bootstrap.servers`` - A list of host/port pairs to use for establishing the initial connection to your |ak-tm| cluster
  (of the form ``host1:port1,host2:port2,...``). The client uses all servers in your cluster, irrespective of which
  servers are specified in this property for bootstrapping. You may want to specify more than one server in case one of
  the servers in your list is down at the time of initialization.

* ``client.id`` - Under the hood, the |cjms-full| uses one or more |ak| clients to communicate with your
  |ak| cluster. The ``client.id`` of these clients is set to the value of this configuration property with a
  globally unique ID (GUID) appended. The ``client.id`` string is passed to the server when making requests and is useful for
  debugging purposes.

* ``confluent.license`` - A license key string provided to you by Confluent under the terms of a Confluent Enterprise
  subscription agreement. If not specified, you may use the client for a trial period of 30 days, after which it will
  stop working.

  .. include:: ../../includes/license-ref.rst

* ``confluent.topic`` - Name of the |ak| topic used for |cp| configuration, including licensing information.
  The default name for this topic is ``_confluent-command``. To learn more, see
  :platform:`License topic configuration|connect/license.html#license-topic-configuration` and
  :platform:`License topic ACLs|connect/license.html#license-topic-acls`.
* ``confluent.topic.replication.factor`` - The replication factor for the |ak| topic used for |cp| configuration,
  including licensing information. This is used only if the topic does not already exist. The default of three
  is appropriate for production use. If you are using a development environment with fewer than three brokers, you must
  set this to the number of brokers (for example, ``1``).

Set configuration properties in the same way as for any other |ak| client:

.. codewithvars:: bash

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("confluent.topic", "foo_confluent-command");
    props.put("confluent.topic.replication.factor", "3");
    props.put("client.id", "my-jms-client");

Optional Configuration Properties
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* ``allow.out.of.order.acknowledge`` - If true, no exception is thrown when a message is acknowledged out of order
  (which implicitly acknowledges any messages before it). The default value is ``false``.

* ``jms.fallback.message.type`` - If the JMS Message type header is not associated with a message, fall back to this
  message type.

* ``consumer.group.id`` - A string that uniquely identifies the group of consumer processes to which this client belongs.
  If not specified, this defaults to ``confluent-jms`` in the case of queues and ``confluent-jms-{uuid}`` in the case
  of topics, where ``{uuid}`` is a unique value for each consumer. This naming strategy provides `load balancer` semantics
  in the case of queues and `publish-subscribe` semantics in the case of topics, as required by the JMS Specification.

* ``jms.consumer.poll.timeout.ms`` - The maximum length of time |ak| consumers should block when retrieving records
  from |ak|. You should not need to adjust this value.

* ``jms.consumer.close.timeout.ms`` - The maximum number of milliseconds to wait for a clean shutdown when closing
  a ``MessageConsumer``.
* ``message.listener.null.wait.ms`` - The number of milliseconds to wait before polling |ak| for new messages if no
  messages were retrieved in a message listener poll loop. Reducing this value improves consume latency in
  low-throughput scenarios at the expense of higher network and CPU overhead.

* ``connection.stop.timeout.ms`` - The maximum number of milliseconds to wait for the message listener threads to
  shut down cleanly after ``connection.stop()`` has been called.

* ``jms.create.connection.ignore.authenticate`` - If true, connection creation methods on ``ConnectionFactory`` that have
  username and password parameters fall through to the corresponding methods that do not have these parameters
  (the parameters are ignored). If false, use of these methods results in a ``JMSException`` being thrown.

* ``message.listener.max.redeliveries`` - The maximum number of times a message will be redelivered to a
  ``MessageConsumer`` listener when the session is in ``AUTO_ACKNOWLEDGE`` mode. The default value is 10.

Standard |ak| Configuration Properties (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can specify any configuration property of the underlying Java |ak| client library. Prefix the
desired property with ``producer.`` or ``consumer.`` as appropriate. For example:

.. codewithvars:: bash

    props.put("producer.linger.ms", "1");
    props.put("consumer.heartbeat.interval.ms", "1000");

Confluent Specific Features (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring,
set the following |ak| properties:

.. codewithvars:: bash

    props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
    props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

Enabling TLS Encryption (Optional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Security settings match those of the native |ak| Java producer and consumer. They are applied to
both production and consumption of messages, so you do not need to prefix them with ``consumer.`` or
``producer.``.

If client authentication is not required in the broker, then the following is a minimal configuration example:

.. codewithvars:: bash

    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
    props.put("ssl.truststore.password", "test1234");

If client authentication is required, then a keystore must be created as in step 1 and the following must also be
configured:

.. codewithvars:: bash

    props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
    props.put("ssl.keystore.password", "test1234");
    props.put("ssl.key.password", "test1234");

Developing JMS Applications
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Creating a ConnectionFactory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A connection factory can be created as follows:

.. codewithvars:: bash

    ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

``KafkaConnectionFactory`` also implements the ``QueueConnectionFactory`` and ``TopicConnectionFactory`` interfaces:

.. codewithvars:: bash

    QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
    TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);

Creating a Destination
~~~~~~~~~~~~~~~~~~~~~~

Keep in mind that topics and queues are both backed by |ak| topics, so if you create and use a topic and a queue with
the same name, they will both be associated with the same |ak| topic.
Also note that ``Destination`` names must follow the same naming restrictions as |ak| topics: the maximum length
is 249 characters, and only letters, digits, . (dot), _ (underscore), and - (hyphen) can be used. Take care to avoid
the '/' character, which is sometimes used in other JMS Topic names.

.. codewithvars:: bash

    Queue testQueue = session.createQueue("test_queue");
    Topic testTopic = session.createTopic("test_topic");
    Destination destination = testTopic;

You can specify a queue or topic backed by more than one |ak| topic using a regular expression. For example:

.. codewithvars:: bash

    Queue testQueue = session.createQueue("regex(test_queue[12])");

Creating a Connection
~~~~~~~~~~~~~~~~~~~~~

Create a ``Connection`` object as follows:

.. codewithvars:: bash

    Connection connection = connectionFactory.createConnection();

Creating Sessions
~~~~~~~~~~~~~~~~~

|ak| sessions support both ``AUTO_ACKNOWLEDGE`` and ``CLIENT_ACKNOWLEDGE`` modes for consumer acknowledgements.
Transactions are not supported, so set the ``transacted`` argument to ``false``.

.. codewithvars:: bash

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Producing / Consuming Messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``MessageProducer`` example:

.. codewithvars:: bash

    MessageProducer producer = session.createProducer(testTopic);
    TextMessage message = session.createTextMessage();
    message.setText("This is a text message");
    producer.send(message);

``MessageConsumer`` example:

.. codewithvars:: bash

    MessageConsumer consumer = session.createConsumer(testQueue);
    connection.start(); // per the JMS specification, delivery begins only once the connection is started
    while (true) {
        Message message = consumer.receiveNoWait(); // returns null if no message is available
        if (message != null) {
            // TODO: Process message
        }
    }

JNDI Context
~~~~~~~~~~~~

A simple JNDI ``InitialContextFactory`` implementation is provided that can be used to look up JMS ``ConnectionFactory``,
``QueueConnectionFactory`` and ``TopicConnectionFactory`` objects as well as ``Destination`` objects. For example:

.. codewithvars:: bash

    Context ctx = new InitialContext();
    ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
    QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
    TopicConnectionFactory tcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");
    Queue queue = (Queue)ctx.lookup("foo");
    Topic topic = (Topic)ctx.lookup("bar");

You'll need to set the ``java.naming.factory.initial`` system property to
``io.confluent.kafka.jms.KafkaInitialContextFactory``, either using a ``-D`` command line option or in a
``jndi.properties`` file located somewhere on your classpath. You must also specify the queue and topic name lookups
and the |cjms| configuration properties. Here's an example ``jndi.properties`` file:

.. codewithvars:: bash

    java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory

    # JMS Client properties
    client.id = testing-01
    confluent.topic = _confluent-command
    confluent.topic.replication.factor = 3
    bootstrap.servers = localhost:9092

    # Register queues in JNDI using the form:
    #   queue.[jndiName] = [physicalName]
    # Register topics in JNDI using the form:
    #   topic.[jndiName] = [physicalName]
    queue.foo = foo
    topic.bar = bar

As an alternative to using system properties or a ``jndi.properties`` file, you can programmatically pass properties
into the ``InitialContext`` constructor for all or a subset of your configuration. For example:

.. codewithvars:: bash

    Hashtable<String, Object> props = new Hashtable<>();
    props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
    props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
    props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
    props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("topic.bar", "bar");
    Context ctx = new InitialContext(props);

As a convenience, ``KafkaInitialContextFactory`` automatically translates TLS system properties to the relevant
|ak| properties.


================================= =======================
System Property                   |ak| Property
================================= =======================
javax.net.ssl.trustStore          ssl.truststore.location
javax.net.ssl.trustStoreType      ssl.truststore.type
javax.net.ssl.trustStorePassword  ssl.truststore.password
javax.net.ssl.keyStore            ssl.keystore.location
javax.net.ssl.keyStoreType        ssl.keystore.type
javax.net.ssl.keyStorePassword    ssl.keystore.password
================================= =======================

Threading
~~~~~~~~~

The Java Message Service (JMS) specification states that a session may not be operated on by more than one thread at a
time. This restriction applies to the |cjms|. However, different sessions created from a single connection may be used
concurrently, as per the specification.
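
One way to respect the one-thread-per-session rule is to give every worker thread its own session. The following is a minimal sketch of that pattern using only the Java standard library. ``PerThreadResource`` is a hypothetical helper, not part of the |cjms|, and the string factory in ``main`` is a stand-in for real session creation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hands each thread its own instance of S (for example, a JMS Session), so
 * that no instance is ever operated on by two threads at once.
 * PerThreadResource is a hypothetical helper, not part of the JMS client.
 */
public class PerThreadResource<S> {
    private final ThreadLocal<S> local;

    public PerThreadResource(Supplier<S> factory) {
        // The factory runs at most once per thread, on that thread's first get().
        this.local = ThreadLocal.withInitial(factory);
    }

    /** Returns the calling thread's own instance, creating it on first use. */
    public S get() {
        return local.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        // Stand-in factory (assumption): a real application would create a
        // session from the shared connection here instead of a string.
        PerThreadResource<String> sessions =
                new PerThreadResource<>(() -> "session-" + created.incrementAndGet());

        Runnable work = () -> System.out.println(
                Thread.currentThread().getName() + " uses " + sessions.get());
        Thread a = new Thread(work, "worker-a");
        Thread b = new Thread(work, "worker-b");
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("instances created: " + created.get());
    }
}
```

In a JMS application, the factory would typically wrap ``connection.createSession(false, Session.AUTO_ACKNOWLEDGE)`` and convert the checked ``JMSException`` into an unchecked one, and each thread would close its own session when it finishes.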