JMS Client のインストール

前提条件
「Confluent Platform Confluent のシステム要件」を参照してください。

インストール

JMS Client は、Java アプリケーション内で使用するライブラリです。

Maven ベースのプロジェクトで kafka-jms-client を参照するには、まず Confluent Maven リポジトリ を pom.xml に追加します。:

<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

次に、Confluent JMS Client および JMS API 仕様の依存関係を追加します。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-jms-client</artifactId>
    <version>6.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.geronimo.specs</groupId>
    <artifactId>geronimo-jms_1.1_spec</artifactId>
    <version>1.1</version>
</dependency>

Maven を使用しない場合は、次の URL にアクセスして、JMS Client JAR ファイルを直接ダウンロードできます。

http://packages.confluent.io/maven/io/confluent/kafka-jms-client/6.2.4/kafka-jms-client
-6.2.4.jar

"Fat" JAR(JMS Client およびその依存関係がすべて含まれているファイル)が必要な場合は、「付録 1 - シェーディングされた Fat JAR の作成」の手順に従って作成することができます。

kafka-jms-client の使用方法は、JMS API と似ています。

次のプログラム例では、KafkaConnectionFactory インスタンスを使用して、JMS 準拠の ConnectionSessionMessageProducer オブジェクトを作成しています。MessageProducer を使用して、50 件の TextMessage メッセージを Apache Kafka® トピック test-queue に送信します。このトピックがキューの役割を果たします。MessageConsumer が作成され、これらのメッセージの読み出しに使用されます。

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnectionFactory;

public class App {

    public static void main(String[] args) throws JMSException {
        Properties settings = new Properties();
        settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2");
        settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");

        ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testQueue = session.createQueue("test-queue");

        MessageProducer producer = session.createProducer(testQueue);
        for (int i=0; i<50; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("This is a text message");
            producer.send(message);
        }

        MessageConsumer consumer = session.createConsumer(testQueue);
        while (true) {
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println(message.getText());
        }
    }
}

付録 1 - シェーディングされた Fat JAR の作成

シナリオによっては、1 つのファイルに JMS Client とその依存関係がすべてまとめられた "Fat" JAR が役立つ場合があります。Confluent ではこの形式の JMS Client を配布していませんが、Fat JAR は簡単に作成することができます。

  1. 下記の pom.xml ファイルを空のディレクトリにコピーします。
  2. Maven コマンド mvn package を実行します。

作成されたアーティファクトは、target ディレクトリに pom.xml ファイルとともに保存されます。注: 提供されている pom.xml ファイルでは、依存関係がバンドルされるだけでなく、名前空間の衝突を避けるため、名前空間 confluent.shaded. で "シェーディング" されます。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.confluent</groupId>
    <artifactId>kafka-jms-client-fat</artifactId>
    <version>6.2.4</version>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-jms-client</artifactId>
            <version>6.2.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <pattern>org.I0Itec.</pattern>
                                    <shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.yammer.metrics.</pattern>
                                    <shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>joptsimple</pattern>
                                    <shadedPattern>confluent.shaded.joptsimple</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.zookeeper.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.jute.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.kafka.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.log4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.thirdparty.</pattern>
                                    <shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.fasterxml.jackson.</pattern>
                                    <shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>net.jpountz.</pattern>
                                    <shadedPattern>confluent.shaded.net.jpountz.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.xerial.snappy.</pattern>
                                    <shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.jose4j.</pattern>
                                    <shadedPattern>confluent.shaded.org.jose4j.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.common.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.confluent.license.</pattern>
                                    <shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>kafka.</pattern>
                                    <shadedPattern>confluent.shaded.kafka.</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>scala</pattern>
                                    <shadedPattern>confluent.shaded.scala.</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>

    </build>
</project>