Important
You are viewing documentation for an older version of Confluent Platform.
Kafka Connect TIBCO Source Connector¶
The TIBCO Source Connector is used to move messages from TIBCO Enterprise Message Service (EMS) to Kafka.
Messages are consumed from the TIBCO EMS broker using the configured message selectors and written to a single Kafka topic. Kafka Connect transformations can be used to route messages to multiple Kafka topics.
The connector currently supports consuming JMS TextMessage and BytesMessage but not ObjectMessage or StreamMessage.
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to TIBCO EMS, there is a general JMS Source Connector available that uses a JNDI-based mechanism to connect to the JMS broker.
Prerequisites¶
The following are required to run the Kafka Connect TIBCO Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
- TIBCO EMS with JMS 1.1 support
- tibjms Client Library (see Installing TIBCO JMS Client Library)
- Java 1.8
Install TIBCO Source Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite: Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-tibco-source:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-tibco-source:1.0.0-preview
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
TIBCO Client Library¶
The Kafka Connect TIBCO Source Connector does not come with the TIBCO JMS client library.
If you are running a multi-node Connect cluster, the connector and TIBCO JMS client JAR must be installed on every Connect worker in the cluster. See below for details.
Installing TIBCO JMS Client Library¶
This connector relies on a provided tibjms client JAR that is included in the TIBCO EMS installation.
The connector will fail to create a connection to TIBCO EMS if you have not installed the JAR on each Connect worker node.
The installation steps are:
- Download and install TIBCO Enterprise Message Service™ (Mac or Linux). If you have already installed TIBCO EMS, skip to the next step.
- Unzip the download and copy only the tibco/ems/{version}/lib/tibjms.jar file into the share/java/kafka-connect-tibco-source directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-tibco-source directory mentioned above is for Confluent Platform.
If you are using a different installation, find the location of the Confluent TIBCO Source Connector JAR files and place the tibjms JAR file into the same directory.
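The copy step described above can be scripted so that it is repeated identically on each worker. The following is a minimal sketch, not an official installer: the function name `install_tibjms_jar` and its parameters are hypothetical, and it assumes the Confluent Platform directory layout described in this section.

```python
import shutil
from pathlib import Path


def install_tibjms_jar(ems_root: Path, ems_version: str, confluent_home: Path) -> Path:
    """Copy tibjms.jar from a TIBCO EMS install into the connector's directory.

    ems_root is the directory that contains the versioned EMS folders
    (for example tibco/ems); confluent_home is the Confluent Platform root.
    Both are assumptions about your local layout.
    """
    source = ems_root / ems_version / "lib" / "tibjms.jar"
    target_dir = confluent_home / "share" / "java" / "kafka-connect-tibco-source"
    if not source.is_file():
        raise FileNotFoundError(f"TIBCO client JAR not found: {source}")
    if not target_dir.is_dir():
        # The connector itself must already be installed on this worker.
        raise NotADirectoryError(f"connector directory not found: {target_dir}")
    return Path(shutil.copy2(source, target_dir / source.name))


# Usage with hypothetical locations (adjust to your own installation):
#   install_tibjms_jar(Path("tibco/ems"), "8.4", Path("/opt/confluent"))
```

The Connect worker still has to be restarted afterwards so that it picks up the new JAR.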
Schemas¶
The connector produces Kafka messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema stores the JMS message ID of the incoming message. Keying records by message ID ensures that if the same message ID arrives more than once, which is unlikely, the records end up in the same Kafka partition.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
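To illustrate why keying by message ID keeps records with the same ID together, here is a minimal sketch of hash-based partition assignment. It is not Kafka's actual partitioner (the default producer partitioner uses a murmur2 hash of the serialized key); the `partition_for` function, the CRC32 hash, and the sample message ID are assumptions chosen for illustration only.

```python
import zlib


def partition_for(message_id: str, num_partitions: int) -> int:
    """Map a key to a partition with a deterministic hash (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # zlib.crc32 is stable across runs, unlike Python's built-in hash() for str.
    return zlib.crc32(message_id.encode("utf-8")) % num_partitions


# The same (hypothetical) message ID always maps to the same partition.
first = partition_for("ID:example-1", 6)
second = partition_for("ID:example-1", 6)
print(first == second)  # True
```

Any deterministic hash has this property, which is why the guarantee holds only as long as the number of partitions in the topic does not change.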
io.confluent.connect.jms.Value¶
This schema stores the value of the JMS message.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the subinterfaces of Message. BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream and TextMessage = text. The corresponding field will be populated with the values from the respective Message subinterface. |
timestamp | INT64 | yes | | Data from the getJMSTimestamp() method. |
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode(). |
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID(). |
replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered(). |
type | STRING | no | | This field stores the value of Message.getJMSType(). |
expiration | INT64 | yes | | This field stores the value of Message.getJMSExpiration(). |
priority | INT32 | yes | | This field stores the value of Message.getJMSPriority(). |
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. |
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]). |
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
text | STRING | no | | This field stores the value from TextMessage.getText(). |
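Because messageType names the field that carries the payload, a consumer can use it to pick the populated field. The following is a minimal sketch that assumes the record value has already been deserialized into a Python dict with the field names from the table above; `extract_payload` and the sample record are hypothetical.

```python
from typing import Any, Mapping

# Payload fields defined by the Value schema above. The schema has no
# "object" or "stream" field, so those message types carry no payload here.
PAYLOAD_FIELDS = ("bytes", "map", "text")


def extract_payload(value: Mapping[str, Any]) -> Any:
    """Return the payload field selected by the record's messageType."""
    message_type = value["messageType"]
    if message_type not in PAYLOAD_FIELDS:
        raise ValueError(f"no payload field for messageType {message_type!r}")
    return value[message_type]


# Hypothetical, abbreviated record for illustration.
record = {
    "messageID": "ID:example-1",
    "messageType": "text",
    "text": "m1",
    "bytes": None,
    "map": None,
}
print(extract_payload(record))  # m1
```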
io.confluent.connect.jms.Destination¶
This schema represents a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | | The type of JMS Destination, either queue or topic. |
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema stores the data found in the properties of the message. To ensure that type mappings are preserved, propertyType stores the type of the field.
The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
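Reading a property back follows directly from the schema: look up propertyType, then read the field with that name. The following is a minimal sketch that assumes each PropertyValue has been deserialized into a Python dict; `property_value`, `flatten_properties`, and the sample property names are hypothetical.

```python
from typing import Any, Dict, Mapping

# The propertyType values listed in the PropertyValue schema above.
PROPERTY_TYPES = frozenset(
    {"boolean", "byte", "short", "integer", "long", "float", "double", "string"}
)


def property_value(prop: Mapping[str, Any]) -> Any:
    """Return the value held in the field named by propertyType."""
    property_type = prop["propertyType"]
    if property_type not in PROPERTY_TYPES:
        raise ValueError(f"unknown propertyType {property_type!r}")
    return prop[property_type]


def flatten_properties(properties: Mapping[str, Mapping[str, Any]]) -> Dict[str, Any]:
    """Collapse a map of PropertyValue structs into plain name/value pairs."""
    return {name: property_value(prop) for name, prop in properties.items()}


# Hypothetical properties map for illustration.
sample = {
    "retries": {"propertyType": "integer", "integer": 3, "string": None},
    "region": {"propertyType": "string", "string": "emea", "integer": None},
}
print(flatten_properties(sample))  # {'retries': 3, 'region': 'emea'}
```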
Quick Start¶
This quick start uses the TIBCO Source Connector to consume records from TIBCO Enterprise Message Service™ - Community Edition and send them to Kafka.
Download TIBCO Enterprise Message Service™ - Community Edition (Mac or Linux) and run the appropriate installer. See the TIBCO Enterprise Message Service™ Installation Guide for more details. Similar documentation is available for each version of TIBCO EMS.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-tibco-source:latest
Start the Confluent Platform.
confluent start
Create a connector-quickstart queue with the TIBCO Admin Tool.
# connect to TIBCO with the Admin Tool (PASSWORD IS EMPTY)
tibco/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
> create queue connector-quickstart
Important
Java 6+ must be installed to run the TIBCO samples. Run java -version to check whether Java is installed on your system. The command prints the current Java version if Java is installed. If Java is not installed, go through the Java Installation Guide before moving on to the next step.
Compile the TIBCO Java samples so that they can be run in the following step.
# set up Java's classpath so that the Java compiler can find the imports of the samples
# (TIBEMS_JAVA is made absolute here so that it still resolves after changing directory)
export TIBEMS_JAVA="$(pwd)/tibco/ems/8.4/lib"
cd tibco/ems/8.4/samples/java
CLASSPATH=${TIBEMS_JAVA}/jms-2.0.jar:${CLASSPATH}
CLASSPATH=.:${TIBEMS_JAVA}/tibjms.jar:${TIBEMS_JAVA}/tibjmsadmin.jar:${CLASSPATH}
export CLASSPATH
# compile the java classes (run from the tibco/ems/8.4/samples/java directory)
javac *.java
Produce a set of messages to the connector-quickstart queue.
cd tibco/ems/8.4/samples/java
# produce 5 test messages
java tibjmsMsgProducer -user admin -queue connector-quickstart m1 m2 m3 m4 m5
------------------------------------------------------------------------
tibjmsMsgProducer SAMPLE
------------------------------------------------------------------------
Server....................... localhost
User......................... admin
Destination.................. connector-quickstart
Send Asynchronously.......... false
Message Text................. m1 m2 m3 m4 m5
------------------------------------------------------------------------
Publishing to destination 'connector-quickstart'
Published message: m1
Published message: m2
Published message: m3
Published message: m4
Published message: m5
Create a tibco-source.json file with the following contents:
{
  "name": "TibcoSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.tibco.TibcoSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "from-tibco-messages",
    "tibco.url": "tcp://localhost:7222",
    "tibco.username": "admin",
    "tibco.password": "",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the TIBCO Source Connector.
confluent load tibco -d tibco-source.json
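As an alternative to the CLI, a connector definition in this name/config form can also be submitted to the Kafka Connect REST interface with a POST to /connectors. The following is a minimal sketch: `build_create_request` is a hypothetical helper, and the URL http://localhost:8083 assumes a local worker listening on the default Connect REST port.

```python
import json
import urllib.request


def build_create_request(
    payload: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build a POST /connectors request from a {"name": ..., "config": ...} payload."""
    if "name" not in payload or "config" not in payload:
        raise ValueError("payload must contain 'name' and 'config'")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (requires a running Connect worker at the assumed URL):
#   with open("tibco-source.json", encoding="utf-8") as handle:
#       request = build_create_request(json.load(handle))
#   with urllib.request.urlopen(request) as response:
#       print(response.status)
```

Building the request separately from sending it makes the payload easy to inspect before anything reaches the cluster.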
Confirm that the connector is in a RUNNING state.
confluent status TibcoSourceConnector
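The same check can be automated against the status document returned by the Kafka Connect REST interface (GET /connectors/{name}/status), which reports a state for the connector and for each task. The following is a minimal sketch: `is_fully_running` is a hypothetical helper, and the sample document is illustrative rather than captured from a real cluster.

```python
def is_fully_running(status: dict) -> bool:
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    return (
        connector_state == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


# Illustrative status document with hypothetical worker IDs.
sample_status = {
    "name": "TibcoSourceConnector",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"}],
}
print(is_fully_running(sample_status))  # True
```

Checking the tasks as well as the connector matters because a connector can report RUNNING while one of its tasks has failed.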
Confirm the messages were delivered to the from-tibco-messages topic in Kafka.
confluent consume from-tibco-messages --from-beginning