WebLogic JMS Source Connector for Confluent Platform¶
Caution
Preview connectors aren’t currently supported, nor are they recommended for production use.
The Kafka Connect WebLogic JMS Source connector is used to read messages from an Oracle WebLogic JMS Server and write them into an Apache Kafka® topic.
Note
Confluent Platform also includes a general JMS Source connector that uses a JNDI-based mechanism to connect to the JMS broker. The WebLogic connector also connects using JNDI, but includes special support for JMS 2.0 Shared Subscriptions.
Features¶
The WebLogic JMS Source connector includes the following features:
- At least once delivery
- JMS message types
- Retries and reconnection
- JMS 2.0 Shared Subscription support
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
JMS message types¶
The connector currently supports TextMessage and BytesMessage. The connector does not currently support ObjectMessage or StreamMessage.
Retries and reconnection¶
The connector can be configured to retry on retriable errors using the max.retry.time configuration property. This property sets the maximum time, in milliseconds (ms), that the connector keeps retrying. The property defaults to 3600000 ms (1 hour). The connector uses exponential backoff with jitter: the interval before each subsequent retry attempt increases exponentially. The WebLogic JMS connector attempts a retry for the following exceptions during connection/receive attempts:
- javax.jms.IllegalStateException
- weblogic.jms.common.JMSException
The IllegalStateException is thrown when a receive is attempted but the server is down. The JMSException is thrown when a connection is attempted and the server is down. Note that these retries only activate if the connector was already successfully provisioned and a connection or receive attempt fails at some later point. That is, a failure to connect during provisioning does not result in retries.
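The retry behavior described above can be pictured with a minimal Python sketch. It is not the connector's implementation: the base delay, the multiplier, the "full jitter" scheme, and the function names retry_delays and call_with_retries are all assumptions made for illustration. Only the overall budget mirrors the max.retry.time property.

```python
import random
import time


def retry_delays(max_retry_time_ms, base_delay_ms=1000, multiplier=2.0, rng=None):
    """Yield jittered, exponentially growing delays (ms) until the budget is spent.

    base_delay_ms and multiplier are illustrative assumptions, not connector settings.
    """
    rng = rng or random.Random()
    elapsed = 0.0
    ceiling = float(base_delay_ms)
    while elapsed < max_retry_time_ms:
        # Full jitter: pick a delay uniformly between 0 and the current ceiling.
        delay = rng.uniform(0, ceiling)
        # Never wait past the overall retry budget.
        delay = min(delay, max_retry_time_ms - elapsed)
        yield delay
        elapsed += delay
        # Grow the ceiling exponentially, capped at the total budget.
        ceiling = min(ceiling * multiplier, float(max_retry_time_ms))


def call_with_retries(operation, retriable, max_retry_time_ms, sleep=time.sleep):
    """Run operation, retrying on the given exception types within the budget."""
    try:
        return operation()
    except retriable:
        pass  # fall through to the backoff loop
    for delay_ms in retry_delays(max_retry_time_ms):
        sleep(delay_ms / 1000.0)
        try:
            return operation()
        except retriable:
            continue
    raise TimeoutError("retry budget exhausted")
```

Passing a no-op sleep function makes the helper easy to exercise without waiting, and any exception type not listed in retriable propagates immediately, which matches the idea that only specific errors trigger a retry.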
Limitations¶
The WebLogic JMS Source connector doesn’t support the Java Naming and Directory Interface (JNDI) lookup service.
Installing the WebLogic Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka 1.1+ or Confluent Platform 4.1+
- Oracle WebLogic v12.2.1.3+
- An installation of the WebLogic JMS client library JAR files. For help with downloading the JAR files, see the Client Libraries section.
Installing the connector using Confluent Hub¶
To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-weblogic:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent-hub install confluentinc/kafka-connect-weblogic:12.2.0
Installing the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent license properties for license properties and information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for WebLogic JMS Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Client Libraries¶
To use the Kafka Connect WebLogic JMS Source connector, you must download the WebLogic JMS client library JAR files. Complete the following steps to get these JAR files.
Note
The exact list of JAR files from the WebLogic client library may vary depending on the Java software version.
Follow the instructions at the WebLogic Support page Fusion Middleware Programming Stand-alone Clients for Oracle WebLogic Server. The page contains an overview of the available clients and how to install them. The connector is designed to be compatible with the WebLogic JMS thin client (wlthint3client.jar) but may be compatible with others, depending on your WebLogic Server version and which JARs it provides.
Copy all of the required JAR files to the correct folder in /share/:
cp WL_HOME/server/lib/wlthint3client.jar /$CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-weblogic/
Important
These JAR files need to be copied for each of your Confluent Platform installations where you want to run this connector.
List the JAR files to verify that they were copied successfully.
ls -l /$CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-weblogic/
Ensure you have the wlthint3client.jar file.
Connecting to WebLogic Source¶
Before you can use this connector, you must install the WebLogic client JARs into this connector’s installation directory.
This connector connects to WebLogic using JNDI to create an instance of the JMS ConnectionFactory for your messaging system. See the section “Setting Up JNDI Environment Properties for the InitialContext” under Oracle’s WebLogic JNDI documentation.
The following example shows a typical configuration of the connector for use with distributed mode:
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.weblogic.WeblogicSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "java.naming.factory.initial": "weblogic.jndi.WLInitialContextFactory",
    "jms.destination.name": "MyWeblogicTopic",
    "jms.destination.type": "topic",
    "java.naming.provider.url": "t3://<host>:7001/",
    "connection.factory.name": "com.oracle.webservices.api.jms.ConnectionFactory",
    "confluent.license": "",
    "java.naming.security.principal": "weblogic",
    "java.naming.security.credentials": "welcome1",
    "tasks.max": "1",
    "jms.client.id": "id1",
    "jms.subscription.durable": true,
    "jms.subscription.shared": true,
    "jms.subscription.name": "sub1",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}
The connector supports other configuration options not included in the example above.
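In distributed mode, a configuration like the one above is typically submitted to a Connect worker through the Kafka Connect REST API (POST /connectors). The following Python sketch only builds the request. The worker address http://localhost:8083 is an assumption (8083 is the default Connect REST port), the helper name build_create_request is hypothetical, and the config dictionary is a shortened copy of the example above.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Shortened copy of the example configuration; add the remaining properties as needed.
config = {
    "connector.class": "io.confluent.connect.weblogic.WeblogicSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "java.naming.factory.initial": "weblogic.jndi.WLInitialContextFactory",
    "java.naming.provider.url": "t3://<host>:7001/",
    "jms.destination.name": "MyWeblogicTopic",
    "jms.destination.type": "topic",
    "tasks.max": "1",
}

# Assumed worker address; adjust to wherever your Connect worker listens.
request = build_create_request("http://localhost:8083", "connector1", config)

# Sending requires a running Connect worker, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Keeping the request construction separate from the network call makes it possible to inspect or log the payload before anything is sent to the worker.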
Topics¶
This connector consumes messages from the JMS broker using the configured message selectors and writes them to a single Kafka topic. If you want to write messages to multiple topics, use a simple message transform that routes the messages based upon your criteria.
Acknowledgement Mode¶
The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and acknowledge messages from the JMS broker. In this mode, acknowledging any message acknowledges every message received (see section 6.2.10 in the JMS Spec). To prevent messages from being prematurely acknowledged, the connector processes only one message at a time. In other words, the connector does not attempt to receive new messages until the last message is committed to a Kafka topic. This might reduce the throughput of the connector, but messages will be transferred to Kafka successfully.
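The one-message-at-a-time behavior can be illustrated with a toy model in Python. This is not the JMS API and not the connector's code: ToySession and copy_one_at_a_time are hypothetical names, and the session only imitates the rule that an acknowledgement covers every message delivered so far.

```python
class ToySession:
    """Toy stand-in for a session in CLIENT_ACKNOWLEDGE mode (not the JMS API)."""

    def __init__(self, messages):
        self._pending = list(messages)  # not yet delivered to the client
        self._unacked = []              # delivered but not yet acknowledged
        self.acked = []                 # acknowledged, will not be redelivered

    def receive(self):
        """Deliver the next message, or None when nothing is left."""
        if not self._pending:
            return None
        message = self._pending.pop(0)
        self._unacked.append(message)
        return message

    def acknowledge(self):
        """Acknowledge every message delivered so far on this session."""
        self.acked.extend(self._unacked)
        self._unacked.clear()


def copy_one_at_a_time(session, produce):
    """Receive, write to Kafka, then acknowledge, one message per cycle."""
    while True:
        message = session.receive()
        if message is None:
            return
        produce(message)       # raises if the write to Kafka fails
        session.acknowledge()  # at this point only one message is outstanding
```

Because the loop never receives a second message before the first is written and acknowledged, an acknowledgement can never cover a message that has not yet reached Kafka. If produce raises, the failed message stays unacknowledged in the toy session, which corresponds to the broker being free to redeliver it.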
Schemas¶
The WebLogic connector produces messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key¶
This schema is used to store the incoming MessageID on the message interface. This ensures that if the same message ID arrives more than once, it ends up in the same partition. In practice this should never occur. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value¶
This schema is used to store the value of the JMS message. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
messageType | STRING | yes | | This field stores the type of message that was received. This corresponds to the sub-interfaces of Message: BytesMessage = bytes, MapMessage = map, ObjectMessage = object, StreamMessage = stream, and TextMessage = text. The corresponding field will be populated with the values from the respective message sub-interface. |
timestamp | INT64 | yes | | Data from the getJMSTimestamp() method. |
deliveryMode | INT32 | yes | | This field stores the value of Message.getJMSDeliveryMode(). |
correlationID | STRING | no | | This field stores the value of Message.getJMSCorrelationID(). |
replyTo | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
destination | Destination | no | | This schema is used to represent a JMS Destination, and is either queue or topic. |
redelivered | BOOLEAN | yes | | This field stores the value of Message.getJMSRedelivered(). |
type | STRING | no | | This field stores the value of Message.getJMSType(). |
expiration | INT64 | no | | This field stores the value of Message.getJMSExpiration(). |
priority | INT32 | no | | This field stores the value of Message.getJMSPriority(). |
properties | Map of STRING, PropertyValue | yes | | This field stores the data from all of the properties for the Message indexed by their propertyName. |
bytes | BYTES | no | | This field stores the value from BytesMessage.readBytes(byte[]). |
map | Map of STRING, PropertyValue | no | | This field stores the data from all of the map entries returned from MapMessage.getMapNames() for the Message indexed by their key. |
text | STRING | no | | This field stores the value from TextMessage.getText(). |
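A consumer of the Kafka topic can use messageType to decide which payload field to read. The Python sketch below assumes the record value has already been deserialized into a dictionary keyed by the field names in the table above. How that happens depends on the converter in use, and the helper name extract_payload is hypothetical.

```python
# Payload fields defined by the Value schema; object and stream have no field.
PAYLOAD_FIELDS = ("text", "bytes", "map")


def extract_payload(value):
    """Return the payload of a deserialized Value record based on messageType."""
    message_type = value["messageType"]
    if message_type not in PAYLOAD_FIELDS:
        raise ValueError("no payload field for messageType %r" % message_type)
    # The field named after the message type carries the payload.
    return value[message_type]


# Example record shaped like the Value schema (only a few fields shown).
record = {
    "messageID": "ID:<example>",
    "messageType": "text",
    "text": "hello",
    "bytes": None,
    "map": None,
}
payload = extract_payload(record)
```

Raising on an unexpected messageType keeps unsupported message kinds from being silently treated as empty payloads.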
io.confluent.connect.jms.Destination¶
This schema is used to represent a JMS Destination, and is either queue or topic. The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
destinationType | STRING | yes | | The type of JMS Destination, either queue or topic. |
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue¶
This schema is used to store the data that is found in the properties of the message. To ensure that the proper type mappings are preserved, the propertyType field stores the value type for the property. The corresponding field in the schema contains the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
---|---|---|---|---|
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
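Because each PropertyValue carries its data in the field named by propertyType, unwrapping it is a single lookup. The following Python sketch assumes the properties map has been deserialized into plain dictionaries. The helper names unwrap_property and unwrap_properties are hypothetical.

```python
# Allowed propertyType values, as listed in the PropertyValue schema.
PROPERTY_TYPES = (
    "boolean", "byte", "short", "integer", "long", "float", "double", "string",
)


def unwrap_property(prop):
    """Return the native value held by one deserialized PropertyValue."""
    property_type = prop["propertyType"]
    if property_type not in PROPERTY_TYPES:
        raise ValueError("unknown propertyType %r" % property_type)
    # Only the field matching propertyType is populated; the others are null.
    return prop[property_type]


def unwrap_properties(properties):
    """Flatten a map of property name to PropertyValue into name to value."""
    return {name: unwrap_property(prop) for name, prop in properties.items()}


# Example properties map shaped like the Value schema's properties field.
properties = {
    "retryCount": {"propertyType": "integer", "integer": 3, "string": None},
    "region": {"propertyType": "string", "string": "emea", "integer": None},
}
flat = unwrap_properties(properties)
```

The same helper applies to the map field of a MapMessage, since the Value schema describes it with the same PropertyValue entries.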