AMPS Source Connector for Confluent Platform

The Kafka Connect Advanced Message Processing System (AMPS) Source connector allows you to export data from AMPS to Apache Kafka®. The connector subscribes to messages from an AMPS topic and writes this data to a Kafka topic.

Features

The AMPS Source Connector supports the following features:

  • AMPS message data is sent to Kafka in its ByteArray format as Kafka record values. The connector can also optionally forward AMPS message headers as Kafka record headers. You can implement a custom Converter or use Single Message Transforms (SMTs) to convert ByteArray values to any compatible format.
  • Any Kafka record produced by the connector has the same timestamp as the corresponding AMPS message, which translates to the time when AMPS processed the message. If such a processed time isn’t available, the connector uses Kafka’s current system time. For example, the current system time is used for AMPS Out-of-Focus (OOF) messages.
  • The connector guarantees that messages from the AMPS topic are delivered at least once to the corresponding Kafka topic. In the case of a connector restart, there may be duplicate records in the Kafka topic.
  • The connector supports Kerberos authenticated AMPS installations.
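
Because record values arrive as raw bytes, a downstream consumer has to decode them itself unless a Converter or SMT has already done so. The following is a minimal sketch of that decoding step, assuming the AMPS payload is UTF-8 encoded JSON (as in the quick start's json message type). The function name decode_value is hypothetical and not part of the connector.

```python
import json
from typing import Any, Optional


def decode_value(raw: Optional[bytes]) -> Optional[Any]:
    """Decode a Kafka record value produced by the connector.

    Assumption: the AMPS payload is UTF-8 encoded JSON. Other AMPS
    message types would need a different decoder. A None value is
    passed through unchanged.
    """
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


order = decode_value(b'{"id": 1, "order": "Apples"}')
print(order["order"])  # prints "Apples"
```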

State of the World (SOW) topic mapping

The messages from a SOW topic in AMPS are modeled similarly to a KTable. More precisely, the value in a record should be interpreted as an UPDATE of the last value for the same record key, if applicable. If the same record key doesn’t exist, the update should be considered an INSERT.

Additionally, null record values, or tombstone records, are interpreted as DELETE for the record’s key. Note that the oof option must be set in the subscription in order to record message deletes on AMPS. See config property amps.options for more details.

The SOW key from the AMPS message is used as the Kafka record key, which facilitates this interpretation. Note that the corresponding Kafka topic, when log compacted, should closely model a SOW topic in AMPS.

Note

Since AMPS doesn’t guarantee any particular order for SOW messages, no ordering can be guaranteed for SOW records in Kafka.
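
The UPDATE, INSERT, and DELETE interpretation described in this section can be sketched as a fold over the records read from the Kafka topic. This is an illustrative consumer-side model only, not connector code; the function name apply_records and the string keys are hypothetical stand-ins for real SOW keys.

```python
from typing import Dict, Iterable, Optional, Tuple

# A record is (key, value); a value of None models a tombstone.
Record = Tuple[str, Optional[bytes]]


def apply_records(records: Iterable[Record]) -> Dict[str, bytes]:
    """Build the latest-value-per-key view of a stream of records."""
    table: Dict[str, bytes] = {}
    for key, value in records:
        if value is None:
            # Tombstone: interpreted as DELETE for this key.
            table.pop(key, None)
        else:
            # INSERT if the key is new, UPDATE otherwise.
            table[key] = value
    return table


records = [
    ("1", b'{"id": 1, "order": "Apples"}'),   # INSERT key 1
    ("2", b'{"id": 2, "order": "Oranges"}'),  # INSERT key 2
    ("1", b'{"id": 1, "order": "Pears"}'),    # UPDATE key 1
    ("2", None),                              # DELETE key 2
]
table = apply_records(records)
print(sorted(table))  # prints ['1']
```

Log compaction in Kafka retains the same "last value per key" information, which is why a compacted topic is a natural target for SOW data.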

Transactional logged topic mapping

A message from a transactional logged topic in AMPS is modeled as is. The value in a record is the AMPS message data. The key for an AMPS message from a transactional logged topic is the AMPS topic name. This ensures all the messages go to a single partition of the Kafka topic. The connector supports two different bookmarks to subscribe to such an AMPS topic: NOW and EPOCH. For more details, see config property amps.bookmark.
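
To illustrate why a constant key keeps every record in one partition, the sketch below models key-based partitioning as a hash of the key modulo the partition count. It is a simplified stand-in: it uses CRC32 rather than the hash Kafka's default partitioner actually applies, and the function name choose_partition and the topic name Trades are hypothetical.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition index.

    Assumption: a generic hash (CRC32) stands in for Kafka's real key
    hash. The property shown is the same either way: equal keys always
    map to the same partition.
    """
    return zlib.crc32(key) % num_partitions


# Every record from the AMPS topic carries the topic name as its key,
# so all of them resolve to the same partition.
topic_key = b"Trades"
partitions = {choose_partition(topic_key, 6) for _ in range(100)}
print(len(partitions))  # prints 1
```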

Limitations

The connector currently doesn’t support historical SOW queries. That is, the amps.topic.type configuration property can be either SOW or Transactional logged.

Prerequisites

The following are required to run the Kafka Connect AMPS source connector:

  • Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • AMPS Java Client: Connector built with version 5.3.0.1; versions 5.3.x.x should work
  • AMPS Server: Tested with version 5.3; versions 5.3.x.x recommended. Earlier versions should work because AMPS ensures backward compatibility. However, very old versions may be missing features.

Install the AMPS Source Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-amps:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-amps:1.0.0-preview

If you are running a multi-node Connect cluster, the AMPS connector and AMPS Client JAR must be installed on every Connect worker in the cluster. See below for details.

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Installing the AMPS Client JAR

The AMPS Source connector uses the AMPS Java Client API to connect to AMPS. In order for this to work, the connector must have the AMPS Client for Java.

  1. Download the AMPS Java Client, version 5.3.0.1.
  2. Place the JAR file into the share/confluent-hub-components/confluentinc-kafka-connect-amps/lib directory in your Confluent Platform installation on each of the Connect worker nodes.
  3. Restart all of the Connect worker nodes.

Note

The share/confluent-hub-components/confluentinc-kafka-connect-amps/lib directory referenced above is for Confluent Platform when this connector is installed through Confluent Hub. If you are using a different installation, find the location where the Confluent connector JAR files are located and place the AMPS Client JAR file into the same directory.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see AMPS Source Connector Configuration Properties.

Quick Start

In this quick start guide, the AMPS Source connector is used to consume messages from an SOW topic called Orders on AMPS that has Kerberos authentication enabled. It then sends these messages as records to a Kafka topic named AMPS_Orders with headers being forwarded from the AMPS messages.

Note

For an example of how to connect Kafka Connect to Confluent Cloud, see the Distributed Cluster section in Connect Kafka Connect to Confluent Cloud.

Prerequisites:

  • Confluent Platform is installed and its services have been started with the Confluent CLI commands.

    Note

    This quick start assumes that you are using the Confluent CLI commands, but standalone installations are also supported. By default ZooKeeper, Apache Kafka®, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local start command.

  • Kafka and Schema Registry are running locally on the default ports.

Install AMPS locally and publish messages

  1. Get the AMPS distribution and follow the Getting Started guide to set up AMPS. Create the AMPS config file (for example, config.xml) with the following content:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <AMPSConfig>
    
      <Name>AMPS-Quick-Start</Name>
    
      <Admin>
        <InetAddr>localhost:8085</InetAddr>
      </Admin>
    
      <Transports>
        <Transport>
          <Name>any-tcp</Name>
          <Type>tcp</Type>
          <InetAddr>9007</InetAddr>
          <Protocol>amps</Protocol>
        </Transport>
      </Transports>
    
      <SOW>
        <Topic>
          <Name>Orders</Name>
          <FileName>./sow/%n.sow</FileName>
          <MessageType>json</MessageType>
          <Key>/id</Key>
        </Topic>
      </SOW>
    
    </AMPSConfig>
    
  2. Run bin/ampServer <path-to-config.xml> from the AMPS installation folder to start the AMPS server.

  3. Use the spark utility provided by AMPS to quickly publish a few records to the Orders topic.

    # run from the AMPS installation folder
    $ bin/spark publish -server localhost:9007 -topic Orders -type json
    {"id": 1, "order": "Apples"}
    {"id": 2, "order": "Oranges"}
    total messages published: 2 (Infinity/s)
    

Start the AMPS source connector

  1. Create a JSON file for the AMPS Source connector (for example, amps-source.json). Substitute the values in angle brackets (<>) with your configuration. Leave confluent.license empty for an evaluation license.

    {
      "name": "amps-source-connector",
      "config": {
          "connector.class": "io.confluent.connect.amps.AmpsSourceConnector",
          "tasks.max": "1",
          "kafka.topic": "AMPS_Orders",
          "forward.kafka.headers": "true",
          "amps.servers": "<amps-servers-uri>",
          "amps.topic": "Orders",
          "amps.topic.type": "sow",
          "amps.command": "sow_and_subscribe",
          "amps.kerberos.keytab": "<path-to-user-keytab>",
          "amps.kerberos.principal": "<user-principal>",
          "amps.options": "timestamp,oof",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "confluent.license": "<license>"
      }
    }
    
  2. Load the AMPS source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

    confluent local load amps -- -d amps-source.json
    
  3. Confirm that the connector is in a RUNNING state.

    confluent local status amps-source-connector
    
  4. Confirm the messages were delivered to the configured topic in Kafka.

    confluent local consume AMPS_Orders -- --from-beginning
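
As an alternative to the Confluent CLI, the connector can also be created and checked through the Kafka Connect REST API. The sketch below assumes the Connect REST API is listening on its default address, http://localhost:8083, and that the connector JSON has the name and config layout used in this quick start. The helper names build_create_request, fetch_status, and is_running are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: default Kafka Connect REST address; adjust for your cluster.
CONNECT_URL = "http://localhost:8083"


def build_create_request(
    connector: Dict[str, Any], base_url: str = CONNECT_URL
) -> urllib.request.Request:
    """Build the POST /connectors request that creates a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(name: str, base_url: str = CONNECT_URL) -> Dict[str, Any]:
    """Call GET /connectors/{name}/status and return the parsed JSON."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status: Dict[str, Any]) -> bool:
    """Return True if the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks_ok = all(t.get("state") == "RUNNING" for t in status.get("tasks", []))
    return connector_ok and tasks_ok
```

To use it, load amps-source.json with json.load, pass the result to build_create_request, send the request with urllib.request.urlopen, and then call is_running(fetch_status("amps-source-connector")) once the worker has had time to start the task.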
    

Batch Size Tuning

The AMPS Source Connector Configuration Properties provide options to tune both the AMPS batch size (amps.batch.size) and the Kafka batch size (batch.size).

  • AMPS Batch Size: Tuning the AMPS batch size may produce better network utilization and improve the overall performance of the connector. The larger the AMPS batch size, the more messages AMPS sends to the network layer at a time, which can result in fewer packets being sent. For smaller messages, you should set this configuration to a higher value, like 50 or 100. However, for large messages, even though a large AMPS batch size may improve network utilization, the overall performance of the connector may decrease because larger buffers are required to hold each batch.
  • Kafka Batch Size: Tuning the Kafka batch size is recommended for large topics on AMPS. You should set the Kafka batch size configuration to a higher value, like 1000. This is recommended for extremely large SOW topics; that is, topics with upwards of a million messages.
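
The two settings above can be layered onto an existing connector configuration. The following sketch shows one way to do that before writing the JSON file. The helper name with_batch_tuning is hypothetical, and the values 100 and 1000 are simply the examples mentioned above, not measured recommendations for any particular workload.

```python
import json
from typing import Any, Dict


def with_batch_tuning(
    config: Dict[str, Any], amps_batch_size: int, kafka_batch_size: int
) -> Dict[str, Any]:
    """Return a copy of a connector config with both batch sizes set."""
    if amps_batch_size < 1 or kafka_batch_size < 1:
        raise ValueError("batch sizes must be positive")
    tuned = dict(config)  # leave the caller's dict untouched
    tuned["amps.batch.size"] = str(amps_batch_size)
    tuned["batch.size"] = str(kafka_batch_size)
    return tuned


base = {"amps.topic": "Orders", "amps.topic.type": "sow"}
# Example values from the text: small messages, very large SOW topic.
tuned = with_batch_tuning(base, amps_batch_size=100, kafka_batch_size=1000)
print(json.dumps(tuned, indent=2))
```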