Kafka Connect Solace Source Connector

The Solace Source Connector is used to move messages from a Solace PubSub+ cluster to Kafka.

Messages are consumed from the Solace broker using the configured message selectors and written to a single Kafka topic. A Kafka Connect transformation (Single Message Transform, or SMT) can be used to route messages to multiple Kafka topics.
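
As a rough sketch of the routing idea, the snippet below writes a connector configuration with a transform that takes the Kafka topic name from a field of each record. It reuses the connection settings from the quick start later in this document. Everything else is an assumption: it presumes the separately installed Confluent ExtractTopic transformation is available on every worker and that publishers set the JMS type header, which the connector stores in the type field. The file name solace-source-routed.json, the connector name SolaceSourceRouted, and the transform alias routeByType are hypothetical, and the transform's property names should be checked against its own documentation before use.

```shell
# Write a connector config that routes each record to a topic named after
# its JMS type (the "type" field of io.confluent.connect.jms.Value).
# The quoted 'EOF' keeps the shell from expanding $Value in the class name.
cat > solace-source-routed.json <<'EOF'
{
  "name": "SolaceSourceRouted",
  "config": {
    "connector.class": "io.confluent.connect.solace.SolaceSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "from-solace-messages",
    "solace.host": "smf://localhost:55555",
    "solace.username": "admin",
    "solace.password": "admin",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "transforms": "routeByType",
    "transforms.routeByType.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.routeByType.field": "type",
    "transforms.routeByType.skip.missing.or.null": "true",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
```

With the skip setting assumed above, records whose type field is empty would be expected to stay on the topic named in kafka.topic rather than fail the task.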

Solace PubSub+ uses the Solace Message Format (SMF) protocol, a proprietary binary message format, for client and message broker communications. To ensure compatibility with the Solace Source Connector, messages should be published to Solace using SMF or they may not have a MessageID (required for JMS). For more information, see the Solace documentation on Message Components.

The connector currently supports consuming JMS TextMessage and BytesMessage but not ObjectMessage or StreamMessage.

Note

If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to Solace, there is a general JMS Source Connector available that uses a JNDI-based mechanism to connect to the JMS broker.

Prerequisites

The following are required to run the Kafka Connect Solace Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
  • Solace cluster with JMS 1.1 support
  • com.solacesystems:sol-jms Client Library (See Installing the Solace JMS Client Library)
  • Java 1.8

Install Solace Source Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-solace-source:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-solace-source:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Solace Client Library

The Kafka Connect Solace Source Connector does not come with the Solace JMS client library.

If you are running a multi-node Connect cluster, the connector and Solace JMS client JAR must be installed on every Connect worker in the cluster. See below for details.

Installing the Solace JMS Client Library

This connector relies on a provided com.solacesystems:sol-jms client JAR distributed by Solace. The connector will fail to create a connection to Solace if you have not installed the JAR on each Connect worker node.

The installation steps are:

  1. Download the Solace JMS API. Additional versions are available on Maven.
  2. Unzip the download and copy only the lib/sol-jms-{version}.jar file into the share/java/kafka-connect-solace-source directory of your Confluent Platform installation on each worker node. If you download the library from Maven, there is nothing to unzip because the JAR file is the only artifact downloaded.
  3. Restart all of the Connect worker nodes.

Note

The share/java/kafka-connect-solace-source directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Solace source connector JAR files and place the sol-jms JAR file into the same directory.
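
The copy step above can be sketched as a small shell function. This is only an illustration, not a tool shipped with the connector: the function name install_sol_jms is hypothetical, and it assumes the Confluent Platform directory layout described above.

```shell
# Copy the Solace JMS client JAR next to the connector JARs on one worker.
# Usage: install_sol_jms <path-to-sol-jms-jar> <confluent-platform-home>
install_sol_jms() {
  jar="$1"
  target="$2/share/java/kafka-connect-solace-source"

  # Refuse to continue if the JAR or the connector directory is missing.
  [ -f "$jar" ] || { echo "JAR not found: $jar" >&2; return 1; }
  [ -d "$target" ] || { echo "connector directory not found: $target" >&2; return 1; }

  cp "$jar" "$target/" && echo "installed $(basename "$jar") into $target"
}
```

For example, install_sol_jms lib/sol-jms-{version}.jar /opt/confluent (where /opt/confluent is a placeholder for your installation directory) would be run on every worker node, followed by a restart of the workers.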

Schemas

The connector produces Kafka messages with keys and values that adhere to the schemas described in the following sections.

io.confluent.connect.jms.Key

This schema stores the MessageID of the incoming message. Because the message ID is used as the record key, a message ID that arrives more than once (which is unlikely) ends up in the same Kafka partition.

The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID().

io.confluent.connect.jms.Value

This schema stores the value of the JMS message.

The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID().
messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the subinterfaces of Message: BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream, and TextMessage = text. The field matching the message type is populated with the values from the respective Message subinterface.
timestamp | INT64 | yes | | This field stores the value of Message.getJMSTimestamp().
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode().
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID().
replyTo | Destination | no | | This field stores the value of Message.getJMSReplyTo() as a JMS Destination, which is either queue or topic.
destination | Destination | no | | This field stores the value of Message.getJMSDestination() as a JMS Destination, which is either queue or topic.
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered().
type | STRING | no | | This field stores the value of Message.getJMSType().
expiration | INT64 | yes | | This field stores the value of Message.getJMSExpiration().
priority | INT32 | yes | | This field stores the value of Message.getJMSPriority().
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName.
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]).
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key.
text | STRING | no | | This field stores the value from TextMessage.getText().

io.confluent.connect.jms.Destination

This schema represents a JMS Destination, and is either queue or topic.

The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
destinationType | STRING | yes | | The type of JMS Destination, either queue or topic.
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName().

io.confluent.connect.jms.PropertyValue

This schema stores the data found in the properties of the message. To ensure that type mappings are preserved, propertyType stores the type of the field. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().

The schema defines the following fields:

Name | Schema | Required | Default Value | Documentation
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string.
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean.
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte.
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short.
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer.
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long.
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float.
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double.
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string.

Quick Start

This quick start uses the Solace Source Connector to consume records from a Solace PubSub+ Standard broker and send them to Kafka.

  1. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-solace-source:latest
    
  2. Install the Solace JMS Client Library.

  3. Start the Confluent Platform.

    confluent start
    
  4. Start a Solace PubSub+ Standard docker container.

    docker run -d --name "solace" \
      -p 8080:8080 -p 55555:55555 -p 9000:9000 \
      --shm-size=1000000000 \
      --tmpfs /dev/shm \
      --ulimit nofile=2448:38048 \
      -e username_admin_globalaccesslevel=admin \
      -e username_admin_password=admin \
      solace/solace-pubsub-standard:9.1.0.77
    
  5. Once the Solace docker container has started, navigate to the Solace UI and configure a connector-quickstart queue in the Default Message VPN.

  6. Publish messages to the Solace queue using the REST endpoint.

    curl -X POST -d "m1" http://localhost:9000/Queue/connector-quickstart -H "Content-Type: text/plain" -H "Solace-Message-ID: 1000"
    
    # repeat the above command to send additional messages (change the Solace-Message-ID header on each message)
    
  7. Create a solace-source.json file with the following contents:

    {
      "name": "SolaceSourceConnector",
      "config": {
        "connector.class": "io.confluent.connect.solace.SolaceSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "from-solace-messages",
        "solace.host": "smf://localhost:55555",
        "solace.username": "admin",
        "solace.password": "admin",
        "jms.destination.type": "queue",
        "jms.destination.name": "connector-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
    
  8. Load the Solace Source Connector.

    confluent load solace -d solace-source.json
    
  9. Confirm that the connector is in a RUNNING state.

    confluent status SolaceSourceConnector
    
  10. Confirm the messages were delivered to the from-solace-messages topic in Kafka.

    confluent consume from-solace-messages --from-beginning
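
The publish and status-check steps above (steps 6 and 9) can also be scripted. The following is a minimal sketch, not part of the connector. The function names are hypothetical, the status check assumes a Connect worker is reachable through its REST API at localhost on the default port 8083, and the state is extracted with plain text tools rather than a JSON parser, so it relies on the usual shape of the status response.

```shell
# Publish N text messages to the quick start queue through the Solace REST
# endpoint, giving each one a distinct Solace-Message-ID (1000, 1001, ...).
publish_test_messages() {
  count="$1"
  i=0
  while [ "$i" -lt "$count" ]; do
    curl -s -X POST -d "m$((i + 1))" \
      "http://localhost:9000/Queue/connector-quickstart" \
      -H "Content-Type: text/plain" \
      -H "Solace-Message-ID: $((1000 + i))"
    i=$((i + 1))
  done
}

# Read a Connect status document on stdin and print the connector-level
# state (for example RUNNING or FAILED). Task states are ignored.
connector_state() {
  tr -d ' \n' \
    | grep -o '"connector":{[^}]*}' \
    | grep -o '"state":"[A-Z_]*"' \
    | cut -d'"' -f4
}

# Ask the Connect REST API for the connector status and print its state.
# Assumes a worker is listening on localhost:8083.
check_connector() {
  curl -s "http://localhost:8083/connectors/$1/status" | connector_state
}
```

For example, publish_test_messages 5 followed by check_connector SolaceSourceConnector should print RUNNING once the connector from step 8 has started.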
    

Additional Documentation