TIBCO Source Connector for Confluent Platform

The Kafka Connect TIBCO Source connector is used to move messages from TIBCO Enterprise Message Service (EMS) to Apache Kafka®.

Messages are consumed from the TIBCO EMS broker using the configured message selectors and written to a single Kafka topic. A Single Message Transformation can be used to route messages to multiple Kafka topics.

The connector currently supports consuming JMS TextMessage and BytesMessage but not ObjectMessage or StreamMessage.

Note

If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to TIBCO EMS, there is a general JMS Source connector for Confluent Platform available that uses a JNDI-based mechanism to connect to the JMS broker.

Features

At least once delivery

This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.
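
Because duplicates are possible after a restart, a downstream consumer that cannot tolerate them may want to deduplicate on its own side. The following is a minimal sketch, assuming the consumer can read the messageID from each record and that a bounded in-memory window is acceptable; the class SeenMessageIds and its capacity parameter are hypothetical and not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the connector): remembers recently seen
// message IDs so that a consumer can skip records it has already processed.
class SeenMessageIds {
    private final Map<String, Boolean> seen;

    SeenMessageIds(final int capacity) {
        // Access-ordered LinkedHashMap that evicts the least recently used ID
        // once more than `capacity` IDs are stored.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is offered, false for a duplicate
    // that is still inside the remembered window.
    boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

A consumer would call firstTime(messageID) for each record and process the record only when it returns true. A bounded window only catches duplicates that arrive while the original ID is still remembered, so this reduces duplicates rather than eliminating them.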

Multiple tasks

The TIBCO Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when a large number of messages need to be consumed.

Prerequisites

The following are required to run the Kafka Connect TIBCO Source connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
  • TIBCO EMS with JMS 1.1 support
  • tibjms Client Library (See Installing TIBCO JMS Client Library)
  • Java 1.8

Install the TIBCO Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-tibco-source:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-tibco-source:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see TIBCO Source Connector Configuration Properties. To understand how the connector internally configures the acknowledgement mode, see the following section.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Acknowledgement mode

The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received so far on the session (see section 6.2.10 in the JMS 2.0 Specification). To prevent messages from being acknowledged prematurely, the connector processes only one message at a time. In other words, the connector does not try to receive a new message until the last message is committed to a Kafka topic. This may reduce the throughput of the connector, but it ensures that a message is acknowledged only after it has been written to Kafka.
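
To illustrate why handling one message at a time avoids premature acknowledgement, the following plain-Java simulation models a session in which an acknowledgement covers everything received so far. It is only a sketch: ToySession and OneAtATimeBridge are hypothetical stand-ins and are neither the JMS API nor the connector's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a CLIENT_ACKNOWLEDGE session (hypothetical; not the JMS API).
class ToySession {
    private final Deque<String> pending = new ArrayDeque<>();  // still on the broker
    private final List<String> unacked = new ArrayList<>();    // received, not yet acknowledged
    private final List<String> acked = new ArrayList<>();      // acknowledged

    ToySession(List<String> messages) {
        pending.addAll(messages);
    }

    // Hands the next message to the client, or null if none are left.
    String receive() {
        String message = pending.poll();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    // Acknowledges every message received so far, not just the latest one.
    void acknowledge() {
        acked.addAll(unacked);
        unacked.clear();
    }

    List<String> acknowledged() {
        return acked;
    }
}

class OneAtATimeBridge {
    // Receive one message, write it to the sink, then acknowledge, so that an
    // acknowledgement never covers a message that has not been written yet.
    static List<String> drain(ToySession session) {
        List<String> kafkaTopic = new ArrayList<>();  // stand-in for the Kafka topic
        String message;
        while ((message = session.receive()) != null) {
            kafkaTopic.add(message);  // write first ...
            session.acknowledge();    // ... then acknowledge that single message
        }
        return kafkaTopic;
    }
}
```

If the loop instead received several messages before writing any of them, a single acknowledge() call would also cover messages that had not yet reached the sink, which is the situation the one-at-a-time approach rules out.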

TIBCO Client Library

The Kafka Connect TIBCO Source connector does not come with the TIBCO JMS client library.

If you are running a multi-node Connect cluster, the connector and TIBCO JMS client JAR must be installed on every Connect worker in the cluster. See below for details.

Installing TIBCO JMS Client Library

This connector relies on a provided tibjms client JAR that is included in the TIBCO EMS installation. The connector will fail to create a connection to TIBCO EMS if you have not installed the JAR on each Connect worker node.

The installation steps are:

  1. Download and install TIBCO Enterprise Message Service™ (Mac or Linux). If you have already installed TIBCO EMS, skip to the next step.
  2. Unzip the download and copy only the tibco/ems/{version}/lib/tibjms.jar file into the share/java/kafka-connect-tibco-source directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Note

The share/java/kafka-connect-tibco-source directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent TIBCO Source connector JAR files and place the tibjms JAR file into the same directory.

Schemas

The TIBCO Source connector produces messages with keys and values that adhere to the schemas described in the following sections.

io.confluent.connect.jms.Key

This schema is used to store the incoming message ID, that is, the value of Message.getJMSMessageID(). Using the message ID as the record key ensures that if the same message ID arrives more than once, the records end up in the same partition. In practice, this should never occur. The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID().
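
The statement that records with the same message ID land in the same partition follows from key-based partitioning. The following is a simplified sketch, assuming a non-null key and a fixed number of partitions; KeyPartitionSketch is hypothetical, and String.hashCode() stands in for the murmur2 hash of the serialized key that Kafka's default partitioner applies.

```java
// Simplified stand-in for key-based partitioning (hypothetical helper).
// Kafka's default partitioner hashes the serialized key with murmur2; here
// String.hashCode() is used only to show that the mapping is deterministic.
class KeyPartitionSketch {
    static int partitionFor(String messageId, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(messageId.hashCode(), numPartitions);
    }
}
```

Because the result depends only on the key and the partition count, two records carrying the same message ID map to the same partition for as long as the partition count does not change.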

io.confluent.connect.jms.Value

This schema is used to store the value of the JMS message. The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID().
messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the sub-interfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface.
timestamp | INT64 | yes | | This field stores the value of Message.getJMSTimestamp().
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode().
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID().
replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic.
destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic.
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered().
type | STRING | no | | This field stores the value of Message.getJMSType().
expiration | INT64 | no | | This field stores the value of Message.getJMSExpiration().
priority | INT32 | no | | This field stores the value of Message.getJMSPriority().
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName.
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]).
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key.
text | STRING | no | | This field stores the value from TextMessage.getText().

io.confluent.connect.jms.Destination

This schema is used to represent a JMS Destination, and is either queue or topic.

The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
destinationType | STRING | yes | | The type of JMS Destination, either queue or topic.
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName().

io.confluent.connect.jms.PropertyValue

This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved, the propertyType field stores the value type for the property. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty(). The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string.
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean.
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte.
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short.
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer.
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long.
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float.
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double.
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string.
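
The type mapping in this table can be sketched as a small function from a Java property object to a propertyType name plus the single populated field. This is only an illustration under stated assumptions: PropertyValueSketch is hypothetical and returns a plain Map, whereas the connector itself produces Kafka Connect records that follow the schema above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the PropertyValue layout as a plain Map.
class PropertyValueSketch {
    static Map<String, Object> toPropertyValue(Object property) {
        final String type;
        if (property instanceof Boolean) {
            type = "boolean";
        } else if (property instanceof Byte) {
            type = "byte";
        } else if (property instanceof Short) {
            type = "short";
        } else if (property instanceof Integer) {
            type = "integer";
        } else if (property instanceof Long) {
            type = "long";
        } else if (property instanceof Float) {
            type = "float";
        } else if (property instanceof Double) {
            type = "double";
        } else if (property instanceof String) {
            type = "string";
        } else {
            throw new IllegalArgumentException("Unsupported property: " + property);
        }
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("propertyType", type);
        // Only the field named by propertyType is set; every other field stays
        // absent, so looking it up yields null, as the table describes.
        value.put(type, property);
        return value;
    }
}
```

A reader of the record can then switch on propertyType and read exactly one field, which recovers the value with its original Java type.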

Quick Start

This quick start uses the TIBCO Source connector to consume records from TIBCO Enterprise Message Service™ - Community Edition and send them to Kafka.

  1. Download TIBCO Enterprise Message Service™ - Community Edition (Mac or Linux) and run the appropriate installer. See the TIBCO Enterprise Message Service™ Installation Guide for more details. Similar documentation is available for each version of TIBCO EMS.

  2. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-tibco-source:latest
    
  3. Install the TIBCO JMS Client Library.

  4. Start Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  5. Create a connector-quickstart queue with the TIBCO Admin Tool.

    # connect to TIBCO with the Admin Tool (PASSWORD IS EMPTY)
    tibco/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
    
    > create queue connector-quickstart
    

    Important

    Java 6+ must be installed to run the TIBCO Samples. Run java -version to check whether Java is installed on your system. The command prints the current Java version if Java is installed. If Java is not installed, go through the Java Installation Guide before moving on to the next step.

  6. Compile the TIBCO Java samples so that they can be run in the following step.

    # set up Java's classpath so that the Java compiler can find the imports of the samples
    # (TIBEMS_JAVA is made absolute before changing directory so the paths still resolve)
    export TIBEMS_JAVA="$(pwd)/tibco/ems/8.4/lib"
    cd tibco/ems/8.4/samples/java
    CLASSPATH=${TIBEMS_JAVA}/jms-2.0.jar:${CLASSPATH}
    CLASSPATH=.:${TIBEMS_JAVA}/tibjms.jar:${TIBEMS_JAVA}/tibjmsadmin.jar:${CLASSPATH}
    export CLASSPATH
    
    # compile the java classes (run from the tibco/ems/8.4/samples/java directory)
    javac *.java
    
  7. Produce a set of messages to the connector-quickstart queue.

    cd tibco/ems/8.4/samples/java
    
    # produce 5 test messages
    java tibjmsMsgProducer -user admin -queue connector-quickstart m1 m2 m3 m4 m5
    
    ------------------------------------------------------------------------
    tibjmsMsgProducer SAMPLE
    ------------------------------------------------------------------------
    Server....................... localhost
    User......................... admin
    Destination.................. connector-quickstart
    Send Asynchronously.......... false
    Message Text.................
    m1
    m2
    m3
    m4
    m5
    ------------------------------------------------------------------------
    
    Publishing to destination 'connector-quickstart'
    
    Published message: m1
    Published message: m2
    Published message: m3
    Published message: m4
    Published message: m5
    
  8. Create a tibco-source.json file with the following contents:

    {
      "name": "TibcoSourceConnector",
      "config": {
        "connector.class": "io.confluent.connect.tibco.TibcoSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "from-tibco-messages",
        "tibco.url": "tcp://localhost:7222",
        "tibco.username": "admin",
        "tibco.password": "",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  9. Load the TIBCO Source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load TibcoSourceConnector --config tibco-source.json
    
  10. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status TibcoSourceConnector
    
  11. Confirm the messages were delivered to the from-tibco-messages topic in Kafka.

    confluent local services kafka consume from-tibco-messages --from-beginning