Solace Sink Connector for Confluent Platform

The Kafka Connect Solace Sink connector is used to move messages from Apache Kafka® to a Solace PubSub+ cluster.

If you must use the Java Naming and Directory Interface™ (JNDI) to connect to Solace, use the general JMS Sink connector for Confluent Platform, which connects to the JMS broker through a JNDI-based mechanism.

Features

The Solace Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once to the Solace destination. If the connector restarts, there may be some duplicate messages on the Solace destination.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Solace Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Because Kafka Connect distributes topic partitions across sink tasks, running more tasks can improve throughput when the subscribed topics have multiple partitions.

JMS message formats

The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:

string (default)

When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.

Primitive values are converted to their string equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.

Tip

A Single Message Transformation (SMT) can be used with the configured jms.message.format to transform the record value into the desired string representation before the connector processes each record.

avro

Record values are serialized without the Avro schema information and produced as a JMS BytesMessage. JMS consumers must have the schema to deserialize the data.

Important

The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.

json

Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.

bytes

Record values are passed along in bytes form without any conversion.

Important

Record values must be converted to bytes form before the connector processes them. Configure the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure that record values arrive in byte format.
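As an illustration of the bytes format, the following minimal sketch writes a connector configuration that pairs jms.message.format set to bytes with the ByteArrayConverter. The connection settings are copied from the quick start in this document; the file name solace-sink-bytes.json and the connector name SolaceSinkConnectorBytes are hypothetical and chosen only for this example.

```shell
# Hypothetical file and connector names; broker settings mirror the quick start.
cat > solace-sink-bytes.json <<'EOF'
{
  "name": "SolaceSinkConnectorBytes",
  "config": {
    "connector.class": "io.confluent.connect.jms.SolaceSinkConnector",
    "tasks.max": "1",
    "topics": "sink-messages",
    "solace.host": "smf://localhost:55555",
    "solace.username": "admin",
    "solace.password": "admin",
    "solace.dynamic.durables": "true",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "jms.message.format": "bytes",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
```

With this pairing, Connect hands the raw record bytes to the connector, so no intermediate conversion takes place before the value is passed along.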

Kafka property forwarding to JMS

The connector can be configured to forward various values from the Kafka record to the JMS Message.

  • Enable jms.forward.kafka.key to convert the record’s key to a string and forward it as the JMSCorrelationID.
  • Enable jms.forward.kafka.metadata to forward the record’s topic, partition, and offset on JMS Message properties.
    • Kafka topic is applied to the message as a string property named KAFKA_TOPIC.
    • Partition is applied to the message as an Int property named KAFKA_PARTITION.
    • Offset is applied to the message as a Long property named KAFKA_OFFSET.
  • Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a string property.

Note

The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays), which are unquoted. No other conversion is applied to the key and headers before they are forwarded to the JMS Message. If another format is needed, an out-of-the-box or custom Single Message Transformation (SMT) can be used with the connector to transform the record keys and headers to the desired string representation before the connector processes each record.
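The forwarding options described above might be enabled together as in the following sketch. It reuses the quick start's connection settings; the file name solace-sink-forwarding.json and the connector name SolaceSinkConnectorForwarding are hypothetical, and each of the three jms.forward.kafka.* properties can be enabled independently.

```shell
# Hypothetical file and connector names; broker settings mirror the quick start.
cat > solace-sink-forwarding.json <<'EOF'
{
  "name": "SolaceSinkConnectorForwarding",
  "config": {
    "connector.class": "io.confluent.connect.jms.SolaceSinkConnector",
    "tasks.max": "1",
    "topics": "sink-messages",
    "solace.host": "smf://localhost:55555",
    "solace.username": "admin",
    "solace.password": "admin",
    "solace.dynamic.durables": "true",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "jms.forward.kafka.key": "true",
    "jms.forward.kafka.metadata": "true",
    "jms.forward.kafka.headers": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
```

With all three options enabled, a JMS consumer should see the record key as the JMSCorrelationID, the KAFKA_TOPIC, KAFKA_PARTITION, and KAFKA_OFFSET properties, and one string property per record header.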

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration

For a complete list of configuration properties for this connector, see Solace Sink Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Solace Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later

  • Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect)

  • Solace cluster with JMS 1.1 support

  • com.solacesystems:sol-jms Client Library. For more details, see Solace Client Library.

  • Java 1.8

  • An install of the Confluent Hub Client

    Note

    This is installed by default with Confluent Enterprise.

  • An install of the latest connector version

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-solace-sink:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-solace-sink:1.0.0-preview
    

Solace Client Library

The Kafka Connect Solace connector does not come with the Solace JMS client library.

If you are running a multi-node Connect cluster, the Solace connector and Solace JMS client JAR must be installed on every Connect worker in the cluster. See below for details.

Install the Solace JMS Client Library

This connector relies on a provided com.solacesystems:sol-jms client JAR distributed by Solace. The connector will fail to create a connection to Solace if you have not installed the JAR on each Connect worker node.

The installation steps are:

  1. Download the Solace JMS API. Additional versions are available on Maven.
  2. Unzip the download and copy only the lib/sol-jms-{version}.jar file into the share/java/kafka-connect-solace-sink directory of your Confluent Platform installation on each worker node. If you download the library from Maven, you do not need to unzip anything because the JAR file is the only artifact downloaded.
  3. Restart all of the Connect worker nodes.

Note

The share/java/kafka-connect-solace-sink directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Solace sink connector JAR files and place the sol-jms JAR file into the same directory.
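Step 2 of the installation can be sketched as a small shell helper that copies the client JAR into the connector directory. The function name install_sol_jms, its arguments, and the CONFLUENT_HOME variable in the usage comment are hypothetical and not part of any Confluent or Solace tooling; the target directory is the Confluent Platform location named above.

```shell
# install_sol_jms is a hypothetical helper, shown only as a sketch.
# Usage: install_sol_jms <path-to-sol-jms-jar> <confluent-platform-root>
install_sol_jms() {
  jar_path="$1"        # the lib/sol-jms-{version}.jar file from the download
  confluent_home="$2"  # root of the Confluent Platform installation
  plugin_dir="$confluent_home/share/java/kafka-connect-solace-sink"

  # Refuse to continue if the JAR or the connector directory is missing.
  if [ ! -f "$jar_path" ]; then
    echo "JAR not found: $jar_path" >&2
    return 1
  fi
  if [ ! -d "$plugin_dir" ]; then
    echo "Connector directory not found: $plugin_dir" >&2
    return 1
  fi

  # Copy only the client JAR next to the connector JAR files.
  cp "$jar_path" "$plugin_dir/" \
    && echo "Installed $(basename "$jar_path") into $plugin_dir"
}

# Example call (placeholder paths, run on every Connect worker node):
#   install_sol_jms /path/to/lib/sol-jms-{version}.jar "$CONFLUENT_HOME"
```

The helper only copies the file. The Connect workers still need to be restarted afterwards, as described in step 3.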

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

This quick start uses the Solace Sink connector to consume records from Kafka and send them to a Solace PubSub+ Standard broker.

  1. Start a Solace PubSub+ Standard Docker container.

    docker run -d --name "solace" --hostname "solace" \
      -p 8080:8080 -p 55555:55555 -p 5550:5550 \
      --shm-size=1000000000 \
      --tmpfs /dev/shm \
      --ulimit nofile=2448:38048 \
      -e username_admin_globalaccesslevel=admin \
      -e username_admin_password=admin \
      -e system_scaling_maxconnectioncount=100 \
      solace/solace-pubsub-standard:9.1.0.77
    
  2. Install the connector using the confluent connect plugin install command.

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-solace-sink:latest
    
  3. Install the Solace JMS Client Library.

  4. Start the Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  5. Produce test data to the sink-messages topic in Kafka.

    seq 10 | confluent local services kafka produce sink-messages
    
  6. Create a solace-sink.json file with the following contents:

    {
      "name": "SolaceSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.SolaceSinkConnector",
        "tasks.max": "1",
        "topics": "sink-messages",
        "solace.host": "smf://localhost:55555",
        "solace.username": "admin",
        "solace.password": "admin",
        "solace.dynamic.durables": "true",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  7. Load the Solace Sink connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load solace --config solace-sink.json
    
  8. Confirm the connector is in a RUNNING state.

    confluent local services connect connector status solace
    
  9. Navigate to the Solace UI to confirm the messages were delivered to the connector-quickstart queue in the default Message VPN.

    Tip

    The default credentials for the Solace Admin UI are admin/admin.
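The state check in step 8 can also be made against the Kafka Connect REST API, which exposes a status endpoint for each connector. The sketch below extracts the connector-level state from such a status document. The helper name connector_state is hypothetical, and the commented curl call assumes a Connect worker listening on the default REST port 8083 on localhost.

```shell
# connector_state is a hypothetical helper: it reads a Connect status JSON
# document on stdin and prints the connector-level state (for example RUNNING).
connector_state() {
  tr -d '\n' \
    | grep -o '"connector"[[:space:]]*:[[:space:]]*{[^}]*}' \
    | grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | head -n 1 \
    | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Against a running worker (assumes localhost and the default port 8083):
#   curl -s http://localhost:8083/connectors/solace/status | connector_state
```

The text-based parsing keeps the sketch free of extra dependencies. It reports only the connector state, so the per-task states in the same document still need to be checked separately.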