JMS Client 開発ガイド

KafkaConnectionFactory および標準の JMS インターフェイスをすべてインポートします。

import io.confluent.kafka.jms.KafkaConnectionFactory;

import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;

構成

必須の構成プロパティ

  • bootstrap.servers - Apache Kafka® クラスターとの初期接続の確立に使用するホストとポートのペアのリスト(形式 : host1:port1,host2:port2,....)。このブートストラップ用のプロパティでどのサーバーが指定されているかにかかわらず、クライアントではクラスター内のすべてのサーバーが使用されることに注意してください。初期化時にリスト内のいずれかのサーバーがダウンした場合に備えて、複数指定しておくことができます。

  • client.id - 内部的に、Confluent JMS Client は、Kafka クラスターとの通信に 1 つ以上の Kafka クライアントを使用します。それらのクライアントの client.id には、この構成プロパティの値に、グローバルに一意の ID(guid)を付加したものが設定されます。client.id 文字列は、リクエストの送信時にサーバーに渡され、デバッグに役立ちます。

  • confluent.license - Confluent エンタープライズサブスクリプション契約条件に基づいて、Confluent からお客様に提供されたライセンスキー文字列。指定されていない場合、試用期間の 30 日間クライアントを使用することができますが、その後は使用できなくなります。

    ちなみに

    Confluent Platform のライセンスの詳細については、「Confluent Platform ライセンス」を参照してください。

  • confluent.topic - Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックの名前。このトピックのデフォルトの名前は _confluent-command です。詳細については、「ライセンストピックの構成 」および「ライセンストピックの ACL 」を参照してください。

  • confluent.topic.replication.factor - Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックのレプリケーション係数。これは、トピックがまだ存在しない場合にのみ使用されます。デフォルトの 3 は、本稼働環境での使用に適しています。ブローカー数が 3 未満の開発環境で使用する場合は、そのブローカー数をこのプロパティに設定する必要があります("1" など)。

構成プロパティの設定方法は、他の Kafka クライアントと同様です。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("confluent.topic", "foo_confluent-command");
props.put("confluent.topic.replication.factor", "3");
props.put("client.id", "my-jms-client");

省略可能な構成プロパティ

  • allow.out.of.order.acknowledge - true にすると、メッセージの確認応答が順番どおりでない場合に例外がスローされません(順番が前のメッセージは暗黙的に確認応答が行われたことになります)。デフォルト値は false です。
  • jms.fallback.message.type - JMS メッセージタイプヘッダーがメッセージと関連付けられていない場合、このメッセージタイプにフォールバックされます。
  • consumer.group.id - このクライアントが属するコンシューマープロセスのグループを一意に識別する文字列。指定しないと、キューの場合は confluent-jms、トピックの場合は confluent-jms-{uuid} がデフォルトで使用されます({uuid} はコンシューマーごとに一意の値)。この命名方法により、JMS 仕様の要件に従って、キューの場合は "ロードバランサー" セマンティクスが、トピックの場合は "パブリッシュ/サブスクライブ" セマンティクスが提供されます。
  • jms.consumer.poll.timeout.ms - Kafka からレコードを取得する際に、Kafka コンシューマーがブロックする最長時間。この値を調整する必要はありません。
  • jms.consumer.close.timeout.ms - MessageConsumer の終了時にクリーンシャットダウンを待つための待機時間の最大値(単位: ミリ秒)。
  • message.listener.null.wait.ms - メッセージリスナーのポーリングループでメッセージが取得されなかった場合に、Kafka に対して新しいメッセージのポーリングを行うまでの待機時間(単位: ミリ秒)。スループットが低い場合、この値を小さくすると、消費のレイテンシが改善しますが、ネットワークや CPU のオーバーヘッドが高くなります。
  • connection.stop.timeout.ms - connection.stop() が呼び出された場合に、メッセージリスナースレッドのクリーンシャットダウンを待つための待機時間の最大値(単位: ミリ秒)。
  • jms.create.connection.ignore.authenticate - true の場合、ユーザー名とパスワードのパラメーターを持つ ConnectionFactory の接続作成メソッドが、これらのパラメーターを持たない、対応するメソッドにフォールスルーされます(パラメーターは無視されます)。false の場合、これらのメソッドを使用すると、JMSException がスローされます。
  • message.listener.max.redeliveries - セッションが AUTO_ACKNOWLEDGE モードの場合に、メッセージが MessageConsumer リスナーに再配信される回数の上限。デフォルト値は 10 です。

標準の Kafka の構成プロパティ(省略可)

基盤となる Java Kafka クライアントライブラリの構成プロパティはすべて指定できます。必要に応じて、目的のプロパティに producer. または consumer. のプレフィックスを付加します。以下に例を示します。

props.put("producer.linger.ms", "1");
props.put("consumer.heartbeat.interval.ms", "1000");

Confluent 固有の機能(省略可)

Confluent Control Center のメッセージデリバリーモニタリングを有効にするために必要な、クライアントインターセプターを構成するには、以下の Kafka プロパティを設定します。

props.put("producer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
props.put("consumer.interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

TLS 暗号化の有効化(省略可)

セキュリティ設定は、ネイティブの Kafka Java プロデューサーおよびコンシューマーの設定と一致します。セキュリティ設定は、メッセージの生成と消費の両方に適用されます(セキュリティ設定に consumer. または producer. のプレフィックスを付加する必要はありません)。

ブローカーでクライアント認証が必須でない場合の、最小限の構成の例を以下に示します。

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "test1234");

クライアント認証が必要な場合は、手順 1 のようにキーストアを作成する必要があり、以下のような構成も必要になります。

props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");

JMS アプリケーションの開発

ConnectionFactory の作成

接続ファクトリを作成する方法は、次のとおりです。

ConnectionFactory connectionFactory = new KafkaConnectionFactory(props);

KafkaConnectionFactory でも QueueConnectionFactory および TopicConnectionFactory インターフェイスが実装されます。

QueueConnectionFactory queueConnectionFactory = new KafkaConnectionFactory(props);
TopicConnectionFactory topicConnectionFactory = new KafkaConnectionFactory(props);

送信先の作成

トピックとキューはどちらも Kafka トピックでサポートされるため、同じ名前のトピックとキューを作成して使用すると、どちらも同じ Kafka トピックに関連付けられます。

また、Destination の名前は、Kafka トピックと同じ命名の制限に従う必要があることに注意してください。長さは最大 249 文字まで、記号と文字、"."(ドット)、"_"(アンダースコア)、"-"(マイナス)を使用できます。"/" 文字は、他の JMS トピック名で使用されている場合がありますが、使用しないようにしてください。

Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;

正規表現を使用して、複数の Kafka トピックでサポートされるキューまたはトピックを指定することもできます。以下に例を示します。

Queue testQueue = session.createQueue("regex(test_queue[12])");

接続の作成

Connection オブジェクトを作成する方法は、次のとおりです。

Connection connection = connectionFactory.createConnection();

セッションの作成

Kafka セッションでは、コンシューマーの確認応答で AUTO_ACKNOWLEDGECLIENT_ACKNOWLEDGE の両方のモードがサポートされます。トランザクションはサポートされません。 transacted 引数には、false を設定する必要があります。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

生成/消費のメッセージ

MessageProducer の例

MessageProducer producer = session.createProducer(testTopic);
TextMessage message = session.createTextMessage();
message.setText("This is a text message");
producer.send(message);

MessageConsumer の例

MessageConsumer consumer = this.session.createConsumer(testQueue);
while(true) {
    Message message = consumer.receiveNoWait();
    // TODO: Process message
}

JNDI コンテキスト

シンプルな JNDI InitialContextFactory の実装が用意されており、JMS の ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory オブジェクトおよび Destination オブジェクトのルックアップに使用できます。以下に例を示します。

Context ctx = new InitialContext();

ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory qcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");

Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");

-D コマンドラインオプションを使用するか、クラスパスのどこかにある jndi.properties ファイルを使用して、java.naming.factory.initial システムプロパティに io.confluent.kafka.jms.KafkaInitialContextFactory を設定する必要があります。

また、キューとトピック名のルックアップおよび JMS Client の構成プロパティを指定する必要があります。jndi.properties ファイルの例を以下に示します。

java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory

# JMS Client properties

client.id = testing-01
confluent.topic = localhost:9092
confluent.topic.replication.factor = 3
bootstrap.servers = localhost:9092

# Register queues in JNDI using the form:
#   queue.[jndiName] = [physicalName]

# Register topics in JNDI using the form:
#   topic.[jndiName] = [physicalName]

queue.foo = foo
topic.bar = bar

システムプロパティまたは jndi.properties ファイルを使用する以外の方法として、構成のすべてまたは一部について、プログラムを使用して InitialContext コンストラクターにプロパティを渡すことができます。以下に例を示します。

Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");

Context ctx = new InitialContext(props);

KafkaInitialContextFactory により、SSL システムプロパティは、関連する Kafka のプロパティに自動的に変換されるため、変換を行う必要はありません。

システムプロパティ Kafka プロパティ
javax.net.ssl.trustStore ssl.truststore.location
javax.net.ssl.trustStoreType ssl.truststore.type
javax.net.ssl.trustStorePassword ssl.truststore.password
javax.net.ssl.keyStore ssl.keystore.location
javax.net.ssl.keyStoreType ssl.keystore.type
javax.net.ssl.keyStorePassword ssl.keystore.password

スレッド

JMS では、1 つのセッションの処理を同時に複数のスレッドで行うことはできないとされています。この制限は、JMS Client にも適用されます。ただし、この仕様に沿って、単一の接続から作成された複数のセッションを同時に使用することはできます。