Weblogic JMS Source Connector for Confluent Platform
The Kafka Connect Weblogic JMS Source Connector is used to read messages from an
Oracle Weblogic JMS Server and write them into
an Apache Kafka® topic.
Note
Confluent Platform also includes a general JMS Source Connector for
Confluent Platform that uses
a JNDI-based mechanism to connect to the JMS broker. The Weblogic connector also connects using
JNDI, but includes special support for JMS 2.0 Shared Subscriptions.
Features
The Weblogic JMS Source Connector includes the following features:
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka
topic. If the connector restarts, there may be some duplicate
records in the Kafka topic.
Retries and reconnection
The connector can be configured to retry on retriable errors using the
max.retry.time
configuration property. This property sets the maximum time
in milliseconds (ms) the connector will attempt to retry. The property defaults
to 3600000 ms (1 hour). The connector uses exponential backoff after each retry attempt.
That is, each subsequent retry attempt interval increases exponentially with jitter. The Weblogic
JMS connector will attempt a retry for the following exceptions during connection/receive attempts:
- javax.jms.IllegalStateException
- weblogic.jms.common.JMSException
The IllegalStateException is thrown when a receive is attempted while the server is down. The JMSException is
thrown when a connection is attempted while the server is down. Note that these retries only activate if the
connector was already successfully provisioned and a connection or receive attempt fails at some later point. That is,
a failure to connect during provisioning will not result in retries.
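For example, a minimal sketch of shortening the retry window to 10 minutes would set the following in the connector configuration (the 600000 ms value is purely illustrative; the property otherwise defaults to 3600000 ms as noted above):
"max.retry.time": "600000"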
JMS 2.0 Shared Subscription Support
This connector supports consuming from JMS 2.0 shared subscriptions.
Because JMS 2.0 support begins with Weblogic 12.2.1.3, this connector officially supports only Weblogic versions 12.2.1.3 and later. Further,
an implementation incompatibility in Oracle’s Weblogic 12.2.1.3 limits connectors that use shared subscriptions to a tasks.max
connector configuration of 1.
If you wish to leverage JMS shared subscriptions on Weblogic 12.2.1.3, one option is to run multiple connectors, each with tasks.max
set to 1 and each subscribed to the same JMS topic, as sketched below.
When connecting to Weblogic versions later than 12.2.1.3, the connector can use more than one task for a shared subscription, and each task will be a
JMS shared consumer of the Weblogic JMS topic.
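The following is a minimal sketch of that multiple-connector workaround for Weblogic 12.2.1.3. The connector, topic, and subscription names are illustrative, and only the subscription-related properties are shown; the JNDI connection properties would be filled in as in the full configuration example later in this document:
{
  "name": "weblogic-shared-sub-1",
  "config": {
    "connector.class": "io.confluent.connect.weblogic.WeblogicSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyWeblogicTopic",
    "jms.destination.type": "topic",
    "jms.subscription.shared": true,
    "jms.subscription.name": "sub1"
  }
}
A second connector would be configured identically except for its name (for example, weblogic-shared-sub-2), so both consume from the same shared subscription while each stays within the tasks.max limit of 1. Whether you also set jms.client.id and jms.subscription.durable depends on how the subscription is defined on your Weblogic server.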
Install the Weblogic Source Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
- Kafka 1.1+ or Confluent Platform 4.1+
- Oracle Weblogic v12.2.1.3+
- An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the latest connector version.
Note
You must install the connector on every machine where Connect will run.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-weblogic:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-weblogic:11.0.0
Client Libraries
To use the Kafka Connect Weblogic JMS Source connector, you must download the Weblogic JMS
client library JAR files. Complete the following steps to get these JAR files.
Note
The exact list of JAR files from the Weblogic client library may vary depending on
the Java software version.
Follow the instructions at the Weblogic Support page Fusion Middleware Programming Stand-alone Clients for Oracle WebLogic Server.
The page contains an overview of the available clients and how to install them. The connector is designed to be compatible with
the Weblogic JMS thin client (wlthint3client.jar), but it may be compatible with others depending on your Weblogic Server version and which JARs it provides.
Copy all of the required JAR files into the connector’s installation directory under share/confluent-hub-components/:
cp WL_HOME/server/lib/wlthint3client.jar $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-weblogic/
Important
These JAR files need to be copied for each of your Confluent Platform installations
where you want to run this connector.
List the JAR files to verify that they were copied successfully.
ls -l $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-weblogic/
Ensure you have the wlthint3client.jar
file.
Connecting to Weblogic Source
Before you can use this connector, you must install the Weblogic client JARs into
this connector’s installation directory.
This connector connects to Weblogic using JNDI to create
an instance of the JMS ConnectionFactory for
your messaging system. See the section “Setting Up JNDI Environment Properties for the InitialContext” under Oracle’s
Weblogic JNDI documentation.
The following example shows a typical configuration of the connector for use
with distributed mode:
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.weblogic.WeblogicSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "java.naming.factory.initial": "weblogic.jndi.WLInitialContextFactory",
    "jms.destination.name": "MyWeblogicTopic",
    "jms.destination.type": "topic",
    "java.naming.provider.url": "t3://<host>:7001/",
    "connection.factory.name": "com.oracle.webservices.api.jms.ConnectionFactory",
    "confluent.license": "",
    "java.naming.security.principal": "weblogic",
    "java.naming.security.credentials": "welcome1",
    "tasks.max": "1",
    "jms.client.id": "id1",
    "jms.subscription.durable": true,
    "jms.subscription.shared": true,
    "jms.subscription.name": "sub1",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}
The connector supports other configuration options not included in the example above.
Topics
This connector consumes messages from the JMS broker using the configured
message selectors and writes them to a single Kafka topic. If you want to write
messages to multiple topics, use a Single Message Transform (SMT) that routes
the messages based on your criteria, as sketched below.
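As a sketch of one possible approach (not something this connector prescribes), Confluent’s ExtractTopic Single Message Transform re-routes each record to a topic named by one of the record’s fields. It assumes the transform is installed on the Connect worker, and the field name here is a hypothetical choice:
"transforms": "routeByField",
"transforms.routeByField.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.routeByField.field": "messageType"
If a simple rename of the destination topic is all you need, the built-in org.apache.kafka.connect.transforms.RegexRouter transform is another option.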
Acknowledgement Mode
The connector internally uses CLIENT_ACKNOWLEDGE
mode to receive and
acknowledge messages from the JMS broker. In this mode, acknowledging any
message will acknowledge every message received (see section 6.2.10 in the JMS
Spec). To
prevent messages from being prematurely acknowledged, the connector processes
only one message at a time. In other words, the connector will not attempt to
receive new messages until the last message is committed to a Kafka topic. This
might limit the connector’s throughput, but it ensures messages are
transferred to Kafka successfully.
Schemas
The Weblogic connector produces messages with keys and values that adhere to the
schemas described in the following sections.
io.confluent.connect.jms.Key
This schema is used to store the incoming MessageID on the message interface.
This ensures that if the same message ID arrives again, it will end up in
the same partition. In practice this should never occur. The schema defines the
following fields:
| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value
This schema is used to store the value of the JMS message. The schema defines
the following fields:
io.confluent.connect.jms.Destination
This schema is used to represent a JMS Destination, which is either a queue or a topic. The schema
defines the following fields:
| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| destinationType | STRING | yes | | The type of JMS Destination: either queue or topic. |
| name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue
This schema is used to store the data found in the properties of the
message. To ensure that the proper type mappings are preserved, the
propertyType field stores the value type for the field. The corresponding field in
the schema contains the data for the property. This ensures that the data is
retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields (an example follows the table):
| Name | Schema | Required | Default Value | Documentation |
|------|--------|----------|---------------|---------------|
| propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
| boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
| byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
| short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
| integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
| long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
| float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
| double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
| string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
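As an illustration of the table above, a hypothetical string property named region would appear in the record value roughly as follows when using the JSON converter with schemas disabled; only the field matching propertyType carries a value and the rest are null:
"region": {
  "propertyType": "string",
  "boolean": null,
  "byte": null,
  "short": null,
  "integer": null,
  "long": null,
  "float": null,
  "double": null,
  "string": "us-west-2"
}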