Kafka Connect TIBCO Sink Connector

The TIBCO Sink Connector is used to move messages from Kafka to TIBCO Enterprise Messaging Service (EMS).

Note

If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to TIBCO EMS, there is a general JMS Sink Connector available that uses a JNDI-based mechanism to connect to the JMS broker.

Prerequisites

The following are required to run the Kafka Connect TIBCO Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
  • TIBCO EMS with JMS 1.1 support
  • tibjms Client Library (See Installing TIBCO JMS Client Library)
  • Java 1.8

Install TIBCO Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-tibco-sink:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-tibco-sink:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

TIBCO Client Library

The Kafka Connect TIBCO connector does not come with the TIBCO JMS client library.

If you are running a multi-node Connect cluster, the TIBCO connector and TIBCO JMS client JAR must be installed on every Connect worker in the cluster. See below for details.

Installing TIBCO JMS Client Library

This connector relies on a provided tibjms client JAR that is included in the TIBCO EMS installation. The connector will fail to create a connection to TIBCO EMS if you have not installed the JAR on each Connect worker node.

The installation steps are:

  1. Download and Install TIBCO Enterprise Message Service™ (Mac or Linux). If you have already installed TIBCO EMS, skip to the next step.
  2. Unzip the download and copy only the tibco/ems/{version}/lib/tibjms.jar file into the share/java/kafka-connect-tibco-sink directory of your Confluent Platform installation on each worker node.
  3. Restart all of the Connect worker nodes.

Note

The share/java/kafka-connect-tibco-sink directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent TIBCO Sink Connector JAR files and place the tibjms JAR file into the same directory.

JMS Message Formats

The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:

string (default)

When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.

Primitive values are converted to their String equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.

Tip

Kafka Connect Transformations can be used with the configured jms.message.format to transform the record value to the desired string representation before the connector processes each record.

avro

Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.

Important

The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.

json

Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.

bytes

Record values are passed along in bytes form without any conversion.

Important

Record values must be converted to bytes form before the connector processes them. Configure the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure that record values arrive in byte format.

Forwarding Kafka Properties to JMS

The connector can be configured to forward various values from the Kafka record to the JMS Message.

  • Enable jms.forward.kafka.key to convert the record’s key to a String and forward it as the JMSCorrelationID.
  • Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset on JMS Message properties.
    • Kafka topic is applied to the message as a String property named KAFKA_TOPIC.
    • Partition is applied to the message as an Int property named KAFKA_PARTITION.
    • Offset is applied to the message as a Long property named KAFKA_OFFSET.
  • Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a String property.

Note

The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, out-of-the-box or custom Kafka Connect Transformations can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.

Quick Start

This quick start uses the TIBCO Sink Connector to consume records from Kafka and send them to TIBCO Enterprise Message Service™ - Community Edition.

  1. Download TIBCO Enterprise Message Service™ - Community Edition (Mac or Linux) and run the appropriate installer. See the TIBCO Enterprise Message Service™ Installation Guide for more details. Similar documentation is available for each version of TIBCO EMS.

  2. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-tibco-sink:latest
    
  3. Install the TIBCO JMS Client Library.

  4. Start the Confluent Platform.

    confluent start
    
  5. Produce test data to the sink-messages topic in Kafka.

    seq 10 | confluent produce sink-messages
    
  6. Create a tibco-sink.json file with the following contents:

    {
      "name": "TibcoSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.TibcoSinkConnector",
        "tasks.max": "1",
        "topics": "sink-messages",
        "tibco.url": "tcp://localhost:7222",
        "tibco.username": "admin",
        "tibco.password": "",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  7. Load the TIBCO Sink Connector.

    confluent load tibco​ -d tibco-sink.json
    
  8. Confirm that the connector is in a RUNNING state.

    confluent status tibco
    
  9. Confirm the messages were delivered to the connector-quickstart queue in TIBCO.

    # open TIBCO admin tool (password is empty)
    tibco/ems/8.4/bin/tibemsadmin -server "tcp://localhost:7222" -user admin
    
    > show queue connector-quickstart
    

Additional Documentation