TIBCO Sink Connector for Confluent Platform¶
The Kafka Connect TIBCO Sink connector is used to move messages from Kafka to TIBCO Enterprise Message Service (EMS).
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to TIBCO EMS, there is a general JMS Sink connector for Confluent Platform available that uses a JNDI-based mechanism to connect to the JMS broker.
Features¶
The TIBCO Sink connector includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- JMS message format configuration
- Kafka property forwarding to JMS
At least once delivery¶
This connector guarantees that records are delivered to the JMS broker at least once. If the connector restarts, there may be some duplicate messages in the JMS destination.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
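Routing to the DLQ is controlled by the standard Kafka Connect sink error-handling properties rather than anything TIBCO-specific. A minimal sketch, added to the connector's config block; the topic name dlq-tibco-sink is an arbitrary example, not a connector default:
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-tibco-sink",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"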
Multiple tasks¶
The TIBCO Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.
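For example, to run three tasks, set the following in the connector's config block (the value 3 is illustrative and should be tuned to your topic's partition count):
"tasks.max": "3"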
JMS message format¶
The format of outgoing JMS message values is configured with the jms.message.format property, using one of the following options:
string (default)¶
When using the string message format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.
Primitive values are converted to their string equivalent, and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays), which are unquoted.
Note that a Single Message Transformation can be used with the configured jms.message.format to transform the record value to the desired string representation before the connector processes each record.
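Because string is the default, the property can be omitted entirely; a sketch making it explicit in the connector's config block:
"jms.message.format": "string"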
avro¶
Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.
It is important to note that the connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.
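A sketch for this format, pairing it with the Avro converter so records reach the connector with a schema already attached; the local Schema Registry URL is an assumption:
"jms.message.format": "avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"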
json¶
Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.
bytes¶
Record values are passed along in bytes form without any conversion. Note that record values must be converted to bytes form before the connector processes them. Configure the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure the record values arrive in byte format.
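Putting the two properties together, a minimal sketch for passing record values through untouched:
"jms.message.format": "bytes",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"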
Forward Kafka properties to JMS¶
The connector can be configured to forward various values from the Kafka record to the JMS Message (see the config sketch after the note below).
- Enable jms.forward.kafka.key to convert the record’s key to a string and forward it as the JMSCorrelationID.
- Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset as JMS Message properties.
  - The Kafka topic is applied to the message as a string property named KAFKA_TOPIC.
  - The partition is applied to the message as an Int property named KAFKA_PARTITION.
  - The offset is applied to the message as a Long property named KAFKA_OFFSET.
- Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a string property.
Note
The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays), which are unquoted. No other conversion is done to the key and headers before forwarding them on to the JMS Message. If another format is needed, out-of-the-box or custom Single Message Transformation can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.
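A sketch enabling all three forwarding options in the connector's config block:
"jms.forward.kafka.key": "true",
"jms.forward.kafka.metadata": "true",
"jms.forward.kafka.headers": "true"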
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license, and for information about the license topic, refer to License topic configuration.
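Subscribers supply their key through the confluent.license property; during the trial period the property is simply left unset. A sketch with a placeholder value:
"confluent.license": "<your-license-key>"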
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for TIBCO Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the TIBCO Sink connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect).
- TIBCO EMS with JMS 1.1 support.
- tibjms client library. For more details, see Installing TIBCO JMS client library.
- Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, see Java compatibility with Confluent Platform by version.
- An installation of the Confluent Hub Client. Note that this is installed by default with Confluent Enterprise.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-tibco-sink:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-tibco-sink:1.0.0-preview
TIBCO client library¶
The Kafka Connect TIBCO connector does not come with the TIBCO JMS client library. If you are running a multi-node Connect cluster, the TIBCO connector and TIBCO JMS client JAR must be installed on every Connect worker in the cluster. For detailed instructions, see Installing TIBCO JMS client library.
Installing TIBCO JMS client library¶
This connector relies on a provided tibjms client JAR that is included in the TIBCO EMS installation. The connector will fail to create a connection to TIBCO EMS if you have not installed the JAR on each Connect worker node.
The installation steps are:
1. Download and install TIBCO Enterprise Message Service™ (Mac or Linux). If you have already installed TIBCO EMS, skip to the next step.
2. Unzip the download and copy only the tibco/ems/{version}/lib/tibjms.jar file into the share/java/kafka-connect-tibco-sink directory of your Confluent Platform installation on each worker node.
   Note that the share/java/kafka-connect-tibco-sink directory is for Confluent Platform. If you are using a different installation, find the location of the Confluent TIBCO Sink connector JAR files and place the tibjms JAR file into the same directory.
3. Restart all of the Connect worker nodes.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick start¶
This quick start uses the TIBCO Sink connector to consume records from Kafka and send them to TIBCO Enterprise Message Service™ - Community Edition.
1. Download TIBCO Enterprise Message Service™ - Community Edition (Mac or Linux) and run the appropriate installer. See the TIBCO Enterprise Message Service™ Installation Guide for more details. Similar documentation is available for each version of TIBCO EMS.

2. Install the connector using the Confluent CLI.

   # run from your CP installation directory
   confluent connect plugin install confluentinc/kafka-connect-tibco-sink:latest

3. Start Confluent Platform.

   confluent local start

4. Produce test data to the sink-messages topic in Kafka.

   seq 10 | confluent local produce sink-messages

5. Create a tibco-sink.json file with the following contents:

   {
     "name": "TibcoSinkConnector",
     "config": {
       "connector.class": "io.confluent.connect.jms.TibcoSinkConnector",
       "tasks.max": "1",
       "topics": "sink-messages",
       "tibco.url": "tcp://localhost:7222",
       "tibco.username": "admin",
       "tibco.password": "",
       "jms.destination.type": "queue",
       "jms.destination.name": "connector-quickstart",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.storage.StringConverter",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     }
   }

6. Load the TIBCO Sink connector.

   Caution
   You must include a double dash (--) between the connector name and your flag. For more information, see this post.

   confluent local load tibco --config tibco-sink.json

7. Confirm that the connector is in a RUNNING state.

   confluent local status tibco

8. Confirm the messages were delivered to the connector-quickstart queue in TIBCO.

   # open the TIBCO admin tool (password is empty)
   tibco/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
   > show queue connector-quickstart