Important
You are viewing documentation for an older version of Confluent Platform. For the latest version, see the current Confluent Platform documentation.
AMPS Source Connector for Confluent Platform¶
The Kafka Connect Advanced Message Processing System (AMPS) Source connector allows you to export data from AMPS to Apache Kafka®. The connector subscribes to messages from an AMPS topic and writes this data to a Kafka topic.
Features¶
The AMPS Source Connector supports the following features:
- The AMPS message data is sent to Kafka in its ByteArray format as Kafka record values. The connector can also forward AMPS message headers as Kafka record headers. You can implement a custom Converter or use Single Message Transforms (SMTs) to convert the ByteArray values to any compatible format.
- Every Kafka record produced by the connector has the same timestamp as the corresponding AMPS message, which is the time when AMPS processed the message. If no processed time is available, the connector uses the current system time instead. For example, the current system time is used for AMPS Out of Focus (oof) messages.
- The connector guarantees that messages from the AMPS topic are delivered at least once to the corresponding Kafka topic. If the connector restarts, the Kafka topic may contain duplicate records.
- The connector supports Kerberos-authenticated AMPS installations.
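The two per-record rules above (raw ByteArray values, and processed-time timestamps with a system-time fallback) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the connector's code: the class RecordMapping and its methods are hypothetical, the AMPS processed time is assumed to be already available as epoch milliseconds, and decodeUtf8 only shows what a custom converter might do with JSON payloads.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical model of two per-record rules; it does not use the
// Kafka Connect or AMPS client APIs.
final class RecordMapping {

    // Use the time AMPS processed the message when it is known (epoch millis);
    // otherwise, for example for an oof message, fall back to the current system time.
    static long recordTimestamp(Optional<Long> ampsProcessedMillis) {
        return ampsProcessedMillis.orElseGet(System::currentTimeMillis);
    }

    // The record value is the raw AMPS message data. For JSON payloads, a
    // custom converter might decode those bytes as UTF-8 text like this.
    static String decodeUtf8(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = "{\"id\": 1, \"order\": \"Apples\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(value));
        System.out.println(recordTimestamp(Optional.of(1_700_000_000_000L)));
        System.out.println(recordTimestamp(Optional.empty()));
    }
}
```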
State of the World (SOW) topic mapping¶
Messages from a SOW topic in AMPS are modeled similarly to a KTable. More precisely, the value in a record should be interpreted as an UPDATE of the last value for the same record key, if such a value exists. If no earlier record with the same key exists, the record should be treated as an INSERT.
Additionally, null record values (tombstone records) are interpreted as a DELETE for the record’s key. Note that the oof option must be set in the subscription for message deletes on AMPS to be recorded. See the amps.options configuration property for more details.
The SOW key from the AMPS message is used as the Kafka record key, which makes this interpretation possible. If the corresponding Kafka topic is log compacted, it closely models a SOW topic in AMPS.
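The KTable-style interpretation can be illustrated with a small in-memory view. This is a hypothetical sketch, not part of the connector: SowView and its methods are illustrative names, keys and values are shown as strings rather than bytes, and a null value stands for a tombstone record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: materialize the latest value per SOW key from a stream
// of (key, value) records, the way a KTable or a log-compacted topic would.
final class SowView {
    private final Map<String, String> latest = new LinkedHashMap<>();

    // A non-null value is an INSERT (new key) or an UPDATE (existing key);
    // a null value (tombstone) is a DELETE for that key.
    void apply(String sowKey, String value) {
        if (value == null) {
            latest.remove(sowKey);
        } else {
            latest.put(sowKey, value);
        }
    }

    String get(String sowKey) {
        return latest.get(sowKey);
    }

    int size() {
        return latest.size();
    }

    public static void main(String[] args) {
        SowView view = new SowView();
        view.apply("k1", "{\"id\": 1, \"order\": \"Apples\"}");  // INSERT
        view.apply("k2", "{\"id\": 2, \"order\": \"Oranges\"}"); // INSERT
        view.apply("k1", "{\"id\": 1, \"order\": \"Pears\"}");   // UPDATE
        view.apply("k2", null);                                  // DELETE (tombstone)
        System.out.println(view.latest);
    }
}
```

The keys k1 and k2 are placeholders; in practice the record key is the SOW key that AMPS assigns. A log-compacted topic behaves similarly because compaction retains the latest record for each key.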
Note
Since AMPS doesn’t guarantee any particular order for SOW messages, no ordering can be guaranteed for SOW records in Kafka.
Transactional logged topic mapping¶
A message from a transactional logged topic in AMPS is modeled as-is. The value in a record is the AMPS message data. The key for an AMPS message from a transactional logged topic is the AMPS topic name, which ensures that all the messages go to a single Kafka partition. The connector supports two different bookmarks for subscribing to such an AMPS topic: NOW and EPOCH. For more details, see the amps.bookmark configuration property.
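Why a constant key sends every message to one partition can be sketched as follows. The sketch is a simplified stand-in: Kafka’s default partitioner hashes the serialized key with murmur2, while this example uses Arrays.hashCode, so the partition numbers differ from Kafka’s. The property it demonstrates is the same: identical keys always map to the same partition. KeyPartitioning and partitionFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified stand-in for key-based partitioning. Kafka's default partitioner
// uses murmur2 on the key bytes; Arrays.hashCode is used here instead.
final class KeyPartitioning {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit so the hash is non-negative, then take the modulo.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // For a transactional logged topic, every record key is the AMPS topic name.
        String ampsTopic = "Orders";
        for (int i = 0; i < 3; i++) {
            System.out.println("message " + i + " -> partition " + partitionFor(ampsTopic, partitions));
        }
    }
}
```

A consequence of using a single partition is that the records keep the order in which the connector produced them, because Kafka preserves ordering only within a partition.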
Limitations¶
The connector currently doesn’t support historical SOW queries. That is, the amps.topic.type configuration property can be set only to SOW or Transactional logged.
Prerequisites¶
The following are required to run the Kafka Connect AMPS source connector:
- Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Java 1.8
- AMPS Java Client: The connector is built with version 5.3.0.1; other 5.3.x.x versions should work.
- AMPS Server: Tested with version 5.3; versions 5.3.x.x are recommended. Earlier versions should work because AMPS ensures backward compatibility. However, very old versions may be missing features.
Install the AMPS Source Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-amps:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-amps:1.0.0-preview
If you are running a multi-node Connect cluster, the AMPS connector and AMPS Client JAR must be installed on every Connect worker in the cluster. See below for details.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Installing the AMPS Client JAR¶
The AMPS Source connector uses the AMPS Java Client API to connect to AMPS. For this to work, the AMPS Client for Java JAR file must be installed alongside the connector.
- Download the AMPS Java Client, version 5.3.0.1.
- Place the JAR file into the share/confluent-hub-components/confluentinc-kafka-connect-amps/lib directory in your Confluent Platform installation on each of the Connect worker nodes.
- Restart all of the Connect worker nodes.
Note
The share/confluent-hub-components/confluentinc-kafka-connect-amps/lib
directory referenced above is for Confluent Platform when this connector is installed
through Confluent Hub. If you are using a different installation, find the
directory that contains the Confluent connector JAR files and place the
AMPS Client JAR file into the same directory.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see AMPS Source Connector Configuration Properties.
Quick Start¶
In this quick start guide, the AMPS Source connector is used to consume messages from a SOW topic called Orders on an AMPS server that has Kerberos authentication enabled. The connector sends these messages as records to a Kafka topic named AMPS_Orders and forwards the AMPS message headers as Kafka record headers.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Prerequisites:
- Confluent Platform is installed and services are running by using the Confluent CLI commands.
- Kafka and Schema Registry are running locally on the default ports.
Note
This quick start assumes that you are using the Confluent CLI commands, but standalone installations are also supported. By default, ZooKeeper, Apache Kafka®, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local start command.
Install AMPS locally and publish messages¶
Get the AMPS distribution and follow the Getting Started guide to set up AMPS. Create the AMPS config file (for example, config.xml) and set its contents to the following:
<?xml version="1.0" encoding="UTF-8"?>
<AMPSConfig>
    <Name>AMPS-Quick-Start</Name>
    <Admin>
        <InetAddr>localhost:8085</InetAddr>
    </Admin>
    <Transports>
        <Transport>
            <Name>any-tcp</Name>
            <Type>tcp</Type>
            <InetAddr>9007</InetAddr>
            <Protocol>amps</Protocol>
        </Transport>
    </Transports>
    <SOW>
        <Topic>
            <Name>Orders</Name>
            <FileName>./sow/%n.sow</FileName>
            <MessageType>json</MessageType>
            <Key>/id</Key>
        </Topic>
    </SOW>
</AMPSConfig>
Run the following command from the AMPS installation folder to start the AMPS server:
bin/ampServer <path-to-config.xml>
Use the spark utility provided by AMPS to quickly publish a few records to the Orders topic:
# run from the AMPS installation folder
$ bin/spark publish -server localhost:9007 -topic Orders -type json
{"id": 1, "order": "Apples"}
{"id": 2, "order": "Oranges"}
total messages published: 2 (Infinity/s)
Start the AMPS source connector¶
Create a JSON file for the AMPS Source connector (for example, amps-source.json) with the following contents. Replace the values in angle brackets (<>) with your own configuration, and leave confluent.license empty to use the evaluation license.
{
  "name": "amps-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.amps.AmpsSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "AMPS_Orders",
    "forward.kafka.headers": "true",
    "amps.servers": "<amps-servers-uri>",
    "amps.topic": "Orders",
    "amps.topic.type": "sow",
    "amps.command": "sow_and_subscribe",
    "amps.kerberos.keytab": "<path-to-user-keytab>",
    "amps.kerberos.principal": "<user-principal>",
    "amps.options": "timestamp,oof",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "<license>"
  }
}
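Because SOW deletes reach Kafka as tombstones only when the oof option is set, a pre-flight check on the connector configuration can catch a missing option before deployment. The following is a hypothetical helper, not part of the connector or the Confluent CLI. It assumes that the configuration has already been loaded into a Map<String, String> and that amps.options is a comma-separated list, as in the example above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-flight check (not part of the connector): flags a SOW
// subscription configured without the oof option, in which case AMPS
// deletes would not show up as tombstone records in Kafka.
final class AmpsConfigCheck {

    // Returns true when the topic type is SOW but "oof" is missing from amps.options.
    static boolean needsOofWarning(Map<String, String> config) {
        String topicType = config.getOrDefault("amps.topic.type", "").trim();
        if (!topicType.equalsIgnoreCase("sow")) {
            return false; // the oof requirement described above concerns SOW topics
        }
        String options = config.getOrDefault("amps.options", "");
        return Arrays.stream(options.split(","))
                .map(String::trim)
                .noneMatch(option -> option.equals("oof"));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("amps.topic.type", "sow");
        config.put("amps.command", "sow_and_subscribe");
        config.put("amps.options", "timestamp,oof");
        System.out.println(needsOofWarning(config)); // false: oof is present

        config.put("amps.options", "timestamp");
        System.out.println(needsOofWarning(config)); // true: oof is missing
    }
}
```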
Load the AMPS source connector:
confluent local load amps-source-connector -- -d amps-source.json
Caution
You must include a double dash (--) between the connector name and your flags.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
Confirm that the connector is in a RUNNING state:
confluent local status amps-source-connector
Confirm the messages were delivered to the configured topic in Kafka.
confluent local consume AMPS_Orders -- --from-beginning
Batch Size Tuning¶
The AMPS Source Connector Configuration Properties provide options to tune both the AMPS batch size (amps.batch.size) and the Kafka batch size (batch.size).
- AMPS Batch Size: Tuning the AMPS batch size may improve network utilization and the overall performance of the connector. The larger the AMPS batch size, the more messages AMPS sends to the network layer at a time, which can result in fewer packets being sent. For smaller messages, set this configuration to a higher value, such as 50 or 100. For large messages, however, a large AMPS batch size may still improve network utilization but decrease the overall performance of the connector, because larger batches of large messages require larger buffers to hold them.
- Kafka Batch Size: Tuning the Kafka batch size is recommended for large topics on AMPS. For extremely large SOW topics, that is, topics with upwards of a million messages, set the Kafka batch size to a higher value, such as 1000.
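The effect of the batch sizes on the number of round trips can be estimated with simple ceiling division. This back-of-the-envelope sketch assumes that each AMPS batch corresponds to one send to the network layer and that each Kafka batch corresponds to one group of records handed to Kafka; actual throughput also depends on message size and buffering, as described above. BatchMath and batches are hypothetical names.

```java
// Back-of-the-envelope arithmetic for the batch sizes discussed above.
// Assumption: one AMPS batch = one network send, one Kafka batch = one
// group of records handed to Kafka.
final class BatchMath {

    // Number of batches needed for the given number of messages,
    // rounding up so a partial last batch still counts.
    static long batches(long messages, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (messages + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long sowMessages = 1_000_000L; // an extremely large SOW topic
        System.out.println("AMPS batches at amps.batch.size=1:   " + batches(sowMessages, 1));
        System.out.println("AMPS batches at amps.batch.size=100: " + batches(sowMessages, 100));
        System.out.println("Kafka batches at batch.size=1000:    " + batches(sowMessages, 1000));
    }
}
```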