ActiveMQ Sink Connector for Confluent Platform¶
The Kafka Connect ActiveMQ Sink Connector is used to move messages from Apache Kafka® to an ActiveMQ cluster.
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to ActiveMQ, there is a general JMS Sink Connector available that uses a JNDI-based mechanism to connect to the JMS broker.
Prerequisites¶
The following are required to run the Kafka Connect ActiveMQ Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
- ActiveMQ 5.x
- Java 1.8
Install the ActiveMQ Sink Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-activemq-sink:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-activemq-sink:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see ActiveMQ Sink Connector Configuration Properties.
Client Library JARs¶
The Kafka Connect ActiveMQ connector includes all of the client libraries required to work with ActiveMQ.
Note
The ActiveMQ Sink Connector uses the org.apache.activemq:activemq-client:5.14.4 client library.
Quick Start¶
This quick start uses the ActiveMQ Sink Connector to consume records from Kafka and send them to an ActiveMQ broker.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-activemq-sink:latest
Start Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local start
Produce test data to the sink-messages topic in Kafka.

seq 10 | confluent local produce sink-messages
Create an activemq-sink.json file with the following contents:

{
  "name": "AMQSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.jms.ActiveMqSinkConnector",
    "tasks.max": "1",
    "topics": "sink-messages",
    "activemq.url": "tcp://localhost:61616",
    "activemq.username": "connectuser",
    "activemq.password": "connectuser",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the ActiveMQ Sink Connector.

Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local load AMQSinkConnector -- -d activemq-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

confluent local status AMQSinkConnector
Navigate to the ActiveMQ Admin UI or use the following ActiveMQ CLI command to confirm the messages were delivered to the connector-quickstart queue.

Tip
The default credentials for the ActiveMQ Admin UI are admin/admin.

./bin/activemq consumer --destination connector-quickstart --messageCount 10
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
JMS Message Formats¶
The format of outgoing JMS Message values is configured with the jms.message.format property, using one of the following options:
string (default)¶
When using the string Message Format, record values are run through Values.convertToString(...) from the Connect Data package and produced as a JMS TextMessage.
Primitive values are converted to their String equivalent and structured objects are transformed to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted.
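For example (an illustrative sketch; the field names and values are hypothetical), the conversion behaves roughly as follows:

hello world (simple string value)         →  hello world
Struct with fields name=Alice, age=42     →  {"name":"Alice","age":42}

Note that the top-level string arrives unquoted, while the string field inside the structured value is quoted, consistent with the JSON-like representation described above.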
Tip
Kafka Connect Transformations can be used with the configured jms.message.format to transform the record value to the desired string representation before the connector processes each record.
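For instance, a minimal sketch combining the default string format with the out-of-the-box ExtractField transformation, so that only a single field of the record value is sent (the transform alias extractText and the field name text are hypothetical; add these properties to the config block of the quick start file):

"jms.message.format": "string",
"transforms": "extractText",
"transforms.extractText.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractText.field": "text"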
avro¶
Record values are serialized without the Avro schema information and produced as a JMS BytesMessage.
JMS consumers must have the schema to deserialize the data.
Important
The connector attempts to infer the Avro schema for records that have no schema. If the connector cannot infer the schema, the task is killed. If you are processing data without a schema, consider using one of the other jms.message.format configurations.
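As a sketch, assuming the source topic is populated with Avro data registered in a Schema Registry at localhost:8081 (an assumption, not part of the quick start), the relevant configuration excerpt might look like:

"jms.message.format": "avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"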
json¶
Record values are converted to a UTF-8 encoded JSON representation and produced as a JMS TextMessage.
bytes¶
Record values are passed along in bytes form without any conversion.
Important
Record values must be converted to bytes form before the connector processes them. Configure the value.converter property to org.apache.kafka.connect.converters.ByteArrayConverter to ensure that record values arrive in byte format.
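Following the note above, a minimal configuration excerpt for the bytes format (all other properties as in the quick start):

"jms.message.format": "bytes",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"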
Forwarding Kafka Properties to JMS¶
The connector can be configured to forward various values from the Kafka record to the JMS Message.
- Enable jms.forward.kafka.key to convert the record's key to a String and forward it as the JMSCorrelationID.
- Enable jms.forward.kafka.metadata to forward the record's topic, partition, and offset as JMS Message properties.
  - The Kafka topic is applied to the message as a String property named KAFKA_TOPIC.
  - The partition is applied to the message as an Int property named KAFKA_PARTITION.
  - The offset is applied to the message as a Long property named KAFKA_OFFSET.
- Enable jms.forward.kafka.headers to add each header from the SinkRecord to the JMS Message as a String property.
Note
The connector converts the record key and headers to a sensible string representation that is similar to the JSON representation, with the exception of simple string values (not in objects or arrays) which are unquoted. No other conversion is done to the key and headers before forwarding them on the JMS Message. If another format is needed, out-of-the-box or custom Kafka Connect Transformations can be used with the connector to transform the record keys and/or headers to the desired string representation before the JMS sink connector processes each record.
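As an illustrative sketch, enabling all three forwarding options in the connector configuration would look like the following excerpt (all other properties as in the quick start):

"jms.forward.kafka.key": "true",
"jms.forward.kafka.metadata": "true",
"jms.forward.kafka.headers": "true"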