Solace Source Connector for Confluent Platform¶
The Kafka Connect Solace Source connector is used to move messages from a Solace PubSub+ cluster to Apache Kafka®.
Messages are consumed from the Solace broker using the configured message selectors and written to a single Kafka topic. A Single Message Transformation can be used to route messages to multiple Kafka topics.
Solace PubSub+ uses the Solace Message Format (SMF) protocol, a proprietary binary message format, for client and message broker communications. To ensure compatibility with the Solace Source connector, messages should be published to Solace using SMF; otherwise they may not have a MessageID (required for JMS). For more information, see the Solace documentation on Message Components.
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to Solace, there is a general JMS Source connector for Confluent Platform available that uses a JNDI-based mechanism to connect to the JMS broker.
Features¶
The Solace Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
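Because duplicates are possible after a restart, downstream consumers may want to make their processing idempotent. The following is a minimal sketch of one approach, not part of the connector: it assumes records have already been deserialized into Python dictionaries that carry the `messageID` field described in the Value schema, and the `deduplicate` helper is a hypothetical name. An in-memory set is only suitable for bounded streams or short windows.

```python
def deduplicate(records, seen=None):
    """Yield each record once, keyed on its JMS messageID.

    `records` is any iterable of dicts with a "messageID" key
    (an assumption about how the consumer deserializes values).
    """
    seen = set() if seen is None else seen
    for record in records:
        message_id = record["messageID"]
        if message_id in seen:
            # Already processed: a redelivery after a connector restart.
            continue
        seen.add(message_id)
        yield record


records = [
    {"messageID": "ID:1000", "text": "m1"},
    {"messageID": "ID:1001", "text": "m2"},
    {"messageID": "ID:1000", "text": "m1"},  # duplicate after restart
]
unique = list(deduplicate(records))
```

Passing the same `seen` set across calls lets the check span several batches.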
Multiple tasks¶
The Solace Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when a large number of messages need to be consumed.
JMS messages¶
The connector currently supports consuming the following JMS message types: BytesMessage, MapMessage, and TextMessage.
The connector does not support ObjectMessage or StreamMessage.
Schemas¶
The Solace Source connector produces messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema is used to store the MessageID of the incoming JMS message. Using it as the record key ensures that if the same message ID arrives again, the record ends up in the same partition. In practice this should never occur. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value¶
This schema is used to store the value of the JMS message. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the sub-interfaces of Message: BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream, and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface. |
timestamp | INT64 | yes | | Data from the getJMSTimestamp() method. |
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode(). |
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID(). |
replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered(). |
type | STRING | no | | This field stores the value of Message.getJMSType(). |
expiration | INT64 | no | | This field stores the value of Message.getJMSExpiration(). |
priority | INT32 | no | | This field stores the value of Message.getJMSPriority(). |
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. |
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]). |
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
text | STRING | no | | This field stores the value from TextMessage.getText(). |
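As an illustration of how the messageType field selects the populated body field, here is a small sketch of a consumer-side helper. It assumes the record value has been deserialized into a Python dictionary whose keys match the field names in the table above; `message_body` and `BODY_FIELDS` are hypothetical names, not part of the connector.

```python
# Body types that have a matching field in the Value schema.
BODY_FIELDS = ("bytes", "map", "text")


def message_body(value):
    """Return the body field named by value["messageType"]."""
    message_type = value["messageType"]
    if message_type not in BODY_FIELDS:
        # object and stream are listed as message types, but the schema
        # defines no body field for them.
        raise ValueError(f"unsupported messageType: {message_type!r}")
    return value[message_type]


text_record = {"messageType": "text", "text": "m1", "bytes": None, "map": None}
body = message_body(text_record)
```

Only the field named by messageType is expected to be non-null; the other body fields are optional and can be ignored.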
io.confluent.connect.jms.Destination¶
This schema is used to represent a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | | The type of JMS Destination, either queue or topic. |
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved, the propertyType field stores the value type for the property. The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
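The propertyType indirection described above can be unwrapped on the consumer side. The sketch below assumes each PropertyValue has been deserialized into a Python dictionary with the field names from the table; `property_value` and `flatten_properties` are hypothetical helper names used only for illustration.

```python
PROPERTY_TYPES = (
    "boolean", "byte", "short", "integer",
    "long", "float", "double", "string",
)


def property_value(entry):
    """Return the typed value of one PropertyValue entry."""
    property_type = entry["propertyType"]
    if property_type not in PROPERTY_TYPES:
        raise ValueError(f"unknown propertyType: {property_type!r}")
    # The field named by propertyType holds the value; the others are null.
    return entry[property_type]


def flatten_properties(properties):
    """Turn a map of name -> PropertyValue into a map of name -> value."""
    return {name: property_value(entry) for name, entry in properties.items()}


properties = {
    "retries": {"propertyType": "integer", "integer": 3, "string": None},
    "region": {"propertyType": "string", "string": "emea", "integer": None},
}
flat = flatten_properties(properties)
```

The same helper applies to the map field of the Value schema, which uses the same PropertyValue entries.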
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys as well as enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration¶
For a complete list of configuration properties for this connector, see Configuration Reference for Solace Source Connector for Confluent Platform. To understand how the connector internally configures the acknowledgement mode, see the following section.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Acknowledgement mode¶
The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received so far (see section 6.2.10 in the JMS 2.0 Specification). To prevent messages from being prematurely acknowledged, the connector processes only one message at a time. In other words, the connector does not try to receive new messages until the last message is committed to a Kafka topic. This might reduce the throughput of the connector, but it ensures that a message is acknowledged only after it has been written to Kafka.
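The effect of CLIENT_ACKNOWLEDGE mode can be illustrated with a toy model. This is a simplified simulation written for this page, not the connector's implementation and not the JMS API: `ClientAckSession` and `transfer` are hypothetical names, and the "Kafka write" is just a Python callable.

```python
class ClientAckSession:
    """Toy model of a session in CLIENT_ACKNOWLEDGE mode."""

    def __init__(self, messages):
        self._pending = list(messages)
        self._delivered = []      # received but not yet acknowledged
        self.acknowledged = []

    def receive(self):
        if not self._pending:
            return None
        message = self._pending.pop(0)
        self._delivered.append(message)
        return message

    def acknowledge(self):
        # Acknowledging covers every message received so far on the session.
        self.acknowledged.extend(self._delivered)
        self._delivered.clear()


def transfer(session, write_to_kafka):
    """Receive, write, and acknowledge one message at a time."""
    while (message := session.receive()) is not None:
        write_to_kafka(message)   # must succeed before the acknowledgement
        session.acknowledge()


written = []
session = ClientAckSession(["a", "b", "c"])
transfer(session, written.append)

# Receiving two messages before acknowledging would acknowledge both,
# even if only the first had been written.
eager = ClientAckSession(["a", "b"])
eager.receive()
eager.receive()
eager.acknowledge()
```

In the one-at-a-time loop, at most one message is ever unacknowledged, so a failure before the write completes cannot cause an unwritten message to be acknowledged.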
Install the Solace Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later (requires header support in Connect)
Solace cluster with JMS 1.1 support
com.solacesystems:sol-jms Client Library. For more details, see Install the Solace JMS Client Library
Java 1.8
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-solace-source:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-solace-source:1.0.0-preview
Solace Client Library¶
The Kafka Connect Solace Source connector does not come with the Solace JMS client library.
If you are running a multi-node Connect cluster, the connector and Solace JMS client JAR must be installed on every Connect worker in the cluster. See below for details.
Install the Solace JMS Client Library¶
This connector relies on a provided com.solacesystems:sol-jms client JAR distributed by Solace. The connector will fail to create a connection to Solace if you have not installed the JAR on each Connect worker node.
The installation steps are:
Download the Solace JMS API.
Note
This connector is only compatible with version 10.19.0 of the Solace JMS API.
Unzip the download and copy only the lib/sol-jms-{version}.jar file into the share/java/kafka-connect-solace-source directory of your Confluent Platform installation on each worker node. If downloading the library from Maven, you do not need to unzip anything as the JAR file is the only artifact downloaded.
Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-solace-source directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Solace source connector JAR files and place the sol-jms JAR file into the same directory.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
This quick start uses the Solace Source connector to consume records from a Solace PubSub+ Standard broker and send them to Kafka.
Install the connector using the confluent connect plugin install command.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-solace-source:latest
Start the Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Start a Solace PubSub+ Standard Docker container.
docker run -d --name "solace" \
  -p 8080:8080 -p 55555:55555 -p 9000:9000 \
  --shm-size=1000000000 \
  --tmpfs /dev/shm \
  --ulimit nofile=2448:38048 \
  -e username_admin_globalaccesslevel=admin \
  -e username_admin_password=admin \
  solace/solace-pubsub-standard:9.1.0.77
Once the Solace Docker container has started, navigate to the Solace UI and configure a connector-quickstart queue in the Default message VPN.
Publish messages to the Solace queue using the REST endpoint.
curl -X POST -d "m1" http://localhost:9000/Queue/connector-quickstart -H "Content-Type: text/plain" -H "Solace-Message-ID: 1000"
# repeat the above command to send additional messages (change the Solace-Message-ID header on each message)
Create a solace-source.json file with the following contents:
{
  "name": "SolaceSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.solace.SolaceSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "from-solace-messages",
    "solace.host": "smf://localhost:55555",
    "solace.username": "admin",
    "solace.password": "admin",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the Solace Source connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local load solace --config solace-source.json
Confirm the connector is in a RUNNING state.
confluent local status SolaceSourceConnector
Confirm the messages were delivered to the from-solace-messages topic in Kafka.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic from-solace-messages --from-beginning
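To send several test messages without repeating the curl command by hand, the publish step can be scripted. The following is a rough sketch using only the Python standard library. It reuses the REST endpoint and the Solace-Message-ID header from the quick start; the helper names `build_publish_requests` and `send` are hypothetical, and the starting ID of 1000 simply mirrors the curl example.

```python
from urllib.request import Request, urlopen

# REST endpoint of the quick start queue (same URL as the curl example).
QUEUE_URL = "http://localhost:9000/Queue/connector-quickstart"


def build_publish_requests(bodies, first_id=1000):
    """Build one POST request per body, each with a distinct message ID."""
    requests = []
    for offset, body in enumerate(bodies):
        requests.append(
            Request(
                QUEUE_URL,
                data=body.encode("utf-8"),
                headers={
                    "Content-Type": "text/plain",
                    "Solace-Message-ID": str(first_id + offset),
                },
                method="POST",
            )
        )
    return requests


def send(requests):
    """Send the requests in order and return the HTTP status codes."""
    statuses = []
    for request in requests:
        with urlopen(request) as response:
            statuses.append(response.status)
    return statuses


prepared = build_publish_requests(["m1", "m2", "m3"])
```

With the Solace container running, calling `send(prepared)` publishes the three messages; the sketch deliberately does not send anything on its own.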