JMS Client Development Guide¶
Import KafkaConnectionFactory and the standard JMS interfaces that your application uses:
import io.confluent.kafka.jms.KafkaConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
Configuration¶
Required Configuration Properties¶
bootstrap.servers
- A list of host/port pairs to use for establishing the initial connection to your Apache Kafka® cluster, of the form host1:port1,host2:port2,.... Note that the client will make use of all servers in your cluster irrespective of which servers are specified via this property for bootstrapping. You may want to specify more than one in case one of the servers in your list is down at the time of initialization.
client.id
- Under the hood, the Confluent JMS Client makes use of one or more Kafka clients for communication with your Kafka cluster. The client.id of these clients is set to the value of this configuration property appended with a globally unique ID (GUID). The client.id string is passed to the server when making requests and is useful for debugging purposes.
confluent.license
- A license key string provided to you by Confluent under the terms of a Confluent Enterprise subscription agreement. If not specified, you may use the client for a trial period of 30 days, after which it will stop working.
Tip
For complete license information for Confluent Platform, see Confluent Platform Licenses.
confluent.topic
- Name of the Kafka topic used for Confluent Platform configuration, including licensing information. The default name for this topic is _confluent-command. To learn more, see License topic configuration and License topic ACLs.
confluent.topic.replication.factor
- The replication factor for the Kafka topic used for Confluent Platform configuration, including licensing information. This is used only if the topic does not already exist, and the default of three is appropriate for production use. If you are using a development environment with fewer than three brokers, you must set this to the number of brokers (for example, 1).
Configuration properties are set in the same way as any other Kafka client:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("confluent.topic", "foo_confluent-command");
props.put("confluent.topic.replication.factor", "3");
props.put("client.id", "my-jms-client");
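The replication factor rule above is easy to get wrong in development environments, so a small helper can be useful. The following is a minimal sketch, not part of the Confluent JMS Client: the class name JmsClientProperties, the method required, and the brokerCount parameter are all hypothetical names introduced for illustration. It caps confluent.topic.replication.factor at the number of available brokers.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Confluent JMS Client) that assembles
// some of the required configuration properties described above.
class JmsClientProperties {

    // Default replication factor for the license topic, as stated in the text.
    static final int DEFAULT_REPLICATION_FACTOR = 3;

    static Properties required(String bootstrapServers, String clientId, int brokerCount) {
        if (brokerCount < 1) {
            throw new IllegalArgumentException("brokerCount must be at least 1");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
        // With fewer than three brokers, use the broker count instead of the default.
        int replicationFactor = Math.min(DEFAULT_REPLICATION_FACTOR, brokerCount);
        props.put("confluent.topic.replication.factor", Integer.toString(replicationFactor));
        return props;
    }

    public static void main(String[] args) {
        // Single-broker development setup.
        Properties dev = required("localhost:9092", "my-jms-client", 1);
        System.out.println(dev);
    }
}
```

The resulting Properties object can be extended with confluent.license or confluent.topic as needed before it is passed to the connection factory.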
Optional Configuration Properties¶
allow.out.of.order.acknowledge
- If true, does not throw an exception if a message is acknowledged out of order (which implicitly acknowledges any messages before it). The default value is false.
jms.fallback.message.type
- If the JMS Message type header is not associated with a message, fall back to this message type.
consumer.group.id
- A string that uniquely identifies the group of consumer processes to which this client belongs. If not specified, this defaults to confluent-jms in the case of queues and confluent-jms-{uuid} in the case of topics, where {uuid} is a unique value for each consumer. This naming strategy provides load balancer semantics in the case of queues and publish-subscribe semantics in the case of topics, as required by the JMS Specification.
jms.consumer.poll.timeout.ms
- The maximum length of time Kafka consumers should block when retrieving records from Kafka. You should not need to adjust this value.
jms.consumer.close.timeout.ms
- The maximum number of milliseconds to wait for a clean shutdown when closing a MessageConsumer.
message.listener.null.wait.ms
- The number of milliseconds to wait before polling Kafka for new messages if no messages were retrieved in a message listener poll loop. Reducing this value will improve consume latency in low throughput scenarios at the expense of higher network/CPU overhead.
connection.stop.timeout.ms
- The maximum number of milliseconds to wait for the message listener threads to shut down cleanly when connection.stop() has been called.
jms.create.connection.ignore.authenticate
- If true, connection creation methods on ConnectionFactory that have username and password parameters will fall through to the corresponding methods that do not have these parameters (the parameters will be ignored). If false, use of these methods will result in a JMSException being thrown.
message.listener.max.redeliveries
- The maximum number of times a message will be redelivered to a MessageConsumer listener when the session is in AUTO_ACKNOWLEDGE mode. Default value is 10.
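To make the default consumer.group.id naming rule concrete, here is a small illustration. It is a sketch of the rule as described above, not the client's internal code, and the class and method names (DefaultGroupIds, forQueue, forTopic) are hypothetical. Queue consumers share one group ID, so Kafka balances messages across them; each topic consumer gets its own group ID, so every consumer sees every message.

```java
import java.util.UUID;

// Illustration of the documented default naming rule for consumer.group.id.
// Hypothetical class; the real client computes these defaults internally.
class DefaultGroupIds {

    static final String BASE = "confluent-jms";

    // All queue consumers share one group: load balancer semantics.
    static String forQueue() {
        return BASE;
    }

    // Each topic consumer gets a unique group: publish-subscribe semantics.
    static String forTopic() {
        return BASE + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(forQueue());
        System.out.println(forTopic());
    }
}
```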
Standard Kafka Configuration Properties (Optional)¶
All of the configuration properties of the underlying Java Kafka client library may be specified. Simply prefix the desired property with producer. or consumer. as appropriate. For example:
props.put("producer.linger.ms", "1");
props.put("consumer.heartbeat.interval.ms", "1000");
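When many Kafka client settings need to be passed through, the prefixing can be done in one place. The following is a minimal sketch under the assumption that you keep producer and consumer settings in separate maps; KafkaClientOverrides and addPrefixed are hypothetical names, not part of the JMS Client.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that applies the "producer." or "consumer." prefix
// to plain Kafka client property names.
class KafkaClientOverrides {

    static void addPrefixed(Properties target, String prefix, Map<String, String> kafkaProps) {
        for (Map.Entry<String, String> entry : kafkaProps.entrySet()) {
            target.put(prefix + entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        Map<String, String> producerProps = new LinkedHashMap<>();
        producerProps.put("linger.ms", "1");

        Map<String, String> consumerProps = new LinkedHashMap<>();
        consumerProps.put("heartbeat.interval.ms", "1000");

        Properties props = new Properties();
        addPrefixed(props, "producer.", producerProps);
        addPrefixed(props, "consumer.", consumerProps);
        System.out.println(props);
    }
}
```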
Confluent Specific Features (Optional)¶
To configure client interceptors, which are required to enable Confluent Control Center message delivery monitoring, set the following Kafka properties:
props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
Enabling TLS Encryption (Optional)¶
Security settings match those of the native Kafka Java producer and consumer. Security settings are applied to both production and consumption of messages (you do not need to prefix security settings with consumer. or producer.).
If client authentication is not required in the broker, then the following is a minimal configuration example:
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");
If client authentication is required, then a keystore must be created for the client and the following must also be configured:
props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");
Developing JMS Applications¶
Creating a ConnectionFactory¶
A connection factory can be created as follows:
ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);
KafkaConnectionFactory also implements the QueueConnectionFactory and TopicConnectionFactory interfaces:
QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);
Creating a Destination¶
Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and queue with the same name, they will both be associated with the same Kafka topic.
Also note that Destination names must follow the same naming restrictions as Kafka topics: the maximum length is 249 characters, and only letters, digits, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the ‘/’ character, which is sometimes used in other JMS Topic names.
Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;
It’s possible to specify a queue or topic backed by more than one Kafka topic using a regular expression. For example:
Queue testQueue = session.createQueue("regex(test_queue[12])");
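Because an invalid name only fails once it reaches Kafka, it can help to check destination names up front. The following is a minimal sketch of the naming rules stated above, assuming that ASCII digits are allowed alongside letters, as they are in Kafka topic names. DestinationNames and isValid are hypothetical names. The sketch deliberately rejects the regex(...) form, which names a pattern rather than a single Kafka topic, and it does not cover any further restrictions Kafka may apply.

```java
import java.util.regex.Pattern;

// Hypothetical validator for plain (non-regex) destination names.
class DestinationNames {

    static final int MAX_LENGTH = 249;

    // Letters, digits, dot, underscore, and minus. Digits are an assumption
    // based on Kafka topic naming rules.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isValid(String name) {
        return name != null
                && name.length() <= MAX_LENGTH
                && LEGAL_CHARS.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("test_queue"));
        System.out.println(isValid("jms/test/queue"));
    }
}
```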
Creating a Connection¶
Create a Connection object as follows:
Connection connection = connectionFactory.createConnection();
Creating Sessions¶
Kafka sessions support both AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE modes for consumer acknowledgements. Transactions are not supported, so you should set the transacted argument to false.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Producing / Consuming Messages¶
MessageProducer example:
MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);
MessageConsumer example:
MessageConsumer consumer = session.createConsumer(testQueue);
while (true) {
    // receiveNoWait() returns null when no message is immediately available.
    Message message = consumer.receiveNoWait();
    if (message != null) {
        // TODO: Process message
    }
}
JNDI Context¶
A simple JNDI InitialContextFactory implementation is provided that can be used to look up JMS ConnectionFactory, QueueConnectionFactory, and TopicConnectionFactory objects as well as Destination objects. For example:
Context ctx = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");
Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");
You’ll need to set the java.naming.factory.initial system property to io.confluent.kafka.jms.KafkaInitialContextFactory, either using a -D command line option or in a jndi.properties file located somewhere on your classpath.
Also, queue and topic name lookups and JMS Client configuration properties need to be specified.
Here’s an example jndi.properties file:
java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory
# JMS Client properties
client.id = testing-01
confluent.topic = _confluent-command
confluent.topic.replication.factor = 3
bootstrap.servers = localhost:9092
# Register queues in JNDI using the form:
# queue.[jndiName] = [physicalName]
# Register topics in JNDI using the form:
# topic.[jndiName] = [physicalName]
queue.foo = foo
topic.bar = bar
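To check which destinations a jndi.properties file registers, the queue.[jndiName] and topic.[jndiName] entries can be read with the standard java.util.Properties parser. The following is a minimal sketch that only illustrates the file layout; it does not use or reproduce KafkaInitialContextFactory, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper that lists the destinations registered in a
// jndi.properties-style file.
class JndiDestinations {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns jndiName -> physicalName for keys starting with the given
    // prefix ("queue." or "topic.").
    static Map<String, String> withPrefix(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "bootstrap.servers = localhost:9092\n"
                + "queue.foo = foo\n"
                + "topic.bar = bar\n");
        System.out.println("queues: " + withPrefix(props, "queue."));
        System.out.println("topics: " + withPrefix(props, "topic."));
    }
}
```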
As an alternative to using system properties or a jndi.properties file, you can programmatically pass properties into the InitialContext constructor for all or a subset of your configuration. For example:
Hashtable<String, Object> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");
Context ctx = new InitialContext(props);
As a convenience, KafkaInitialContextFactory automatically translates SSL system properties to the relevant Kafka properties.
System Property | Kafka Property |
---|---|
javax.net.ssl.trustStore | ssl.truststore.location |
javax.net.ssl.trustStoreType | ssl.truststore.type |
javax.net.ssl.trustStorePassword | ssl.truststore.password |
javax.net.ssl.keyStore | ssl.keystore.location |
javax.net.ssl.keyStoreType | ssl.keystore.type |
javax.net.ssl.keyStorePassword | ssl.keystore.password |
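The mapping in the table can be expressed as a simple lookup. The following is an illustrative sketch of that translation, not the code KafkaInitialContextFactory actually runs; SslSystemProperties and translate are hypothetical names. It copies each SSL system property that is set to its Kafka equivalent and ignores everything else.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the system property to Kafka property
// mapping shown in the table above.
class SslSystemProperties {

    private static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("javax.net.ssl.trustStore", "ssl.truststore.location");
        MAPPING.put("javax.net.ssl.trustStoreType", "ssl.truststore.type");
        MAPPING.put("javax.net.ssl.trustStorePassword", "ssl.truststore.password");
        MAPPING.put("javax.net.ssl.keyStore", "ssl.keystore.location");
        MAPPING.put("javax.net.ssl.keyStoreType", "ssl.keystore.type");
        MAPPING.put("javax.net.ssl.keyStorePassword", "ssl.keystore.password");
    }

    // Copies every mapped property that is present in the source.
    static Properties translate(Properties systemProps) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, String> entry : MAPPING.entrySet()) {
            String value = systemProps.getProperty(entry.getKey());
            if (value != null) {
                kafkaProps.put(entry.getValue(), value);
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        // Translate whatever SSL settings were passed with -D options.
        System.out.println(translate(System.getProperties()));
    }
}
```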
Threading¶
The Java Message Service (JMS) Specification states that a session may not be operated on by more than one thread at a time. This restriction applies to the JMS Client. However, different sessions created from a single connection may be used concurrently, as per the specification.
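One common way to respect this restriction is to give each thread its own session. The following is a minimal sketch of that pattern using a ThreadLocal; the PerThread class is hypothetical and generic so that it compiles without the JMS API. In a real application the factory would call connection.createSession(false, Session.AUTO_ACKNOWLEDGE) and wrap the checked JMSException, and you would still need to close each session when its thread finishes.

```java
import java.util.function.Supplier;

// Hypothetical holder that lazily creates one instance per thread, so that
// no instance (for example, a JMS Session) is shared between threads.
class PerThread<S> {

    private final ThreadLocal<S> local;

    PerThread(Supplier<S> factory) {
        this.local = ThreadLocal.withInitial(factory);
    }

    // Returns the calling thread's own instance, creating it on first use.
    S get() {
        return local.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // StringBuilder stands in for a session in this self-contained demo.
        PerThread<StringBuilder> sessions = new PerThread<>(StringBuilder::new);
        Runnable work = () -> sessions.get().append(Thread.currentThread().getName());
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        first.join();
        second.join();
    }
}
```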