AppDynamics Metrics Sink Connector for Confluent Platform

The Kafka Connect AppDynamics Metrics Sink connector exports metrics from Apache Kafka® topics to AppDynamics using the AppDynamics Machine Agent. The connector accepts Struct and schemaless JSON as a Kafka record’s value. The name and values fields are required. The values field holds the metric’s values: it must be a Struct when the Kafka record’s value is a Struct, and a nested JSON object when the Kafka record’s value is schemaless JSON.

The input Struct or schemaless JSON object used as the record’s value should resemble the following:

{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions":{
    "aggregatorType": string,
    ...
  },
  "values":{
    "doubleValue": double
  }
}

Note

The qualifier value AVERAGE is used by default if the aggregatorType property is not present in the dimensions struct. The possible values for aggregatorType are AVERAGE, SUM, and OBSERVATION. Refer to the AppDynamics documentation for details.
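
As an illustration of the rules above, the following minimal Python sketch checks a record value for the required name and values fields and resolves the aggregatorType, falling back to AVERAGE. The function name resolve_aggregator_type is hypothetical and not part of the connector. Rejecting unknown aggregator values is an assumption of this sketch, not documented connector behavior.

```python
# Hypothetical pre-flight check, not part of the connector itself.
ALLOWED_AGGREGATORS = {"AVERAGE", "SUM", "OBSERVATION"}


def resolve_aggregator_type(record: dict) -> str:
    """Return the aggregatorType for a record value, defaulting to AVERAGE."""
    # The name and values fields are required.
    for field in ("name", "values"):
        if field not in record:
            raise ValueError(f"missing required field: {field}")
    # dimensions is optional; treat a missing or null struct as empty.
    dimensions = record.get("dimensions") or {}
    aggregator = dimensions.get("aggregatorType", "AVERAGE")
    # Assumption of this sketch: anything outside the listed values is an error.
    if aggregator not in ALLOWED_AGGREGATORS:
        raise ValueError(f"unsupported aggregatorType: {aggregator}")
    return aggregator


if __name__ == "__main__":
    record = {"name": "sample_metric", "values": {"doubleValue": 1.5}}
    print(resolve_aggregator_type(record))
```

Running a check like this on sample data before producing to the topic can catch malformed records early.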

This connector can start with one task that exports data to AppDynamics. The connector can scale by adding more tasks. Note that as more tasks are added, connector performance may be limited by AppDynamics transaction processing.

Features

The AppDynamics Metrics Sink connector includes the following features:

Exactly Once Delivery

The connector ensures exactly once delivery of metrics to the AppDynamics machine agent. However, exactly once delivery is not ensured if the machine agent fails while sending metrics to the AppDynamics Controller.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The AppDynamics Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.

Supported types for Kafka record value

The connector accepts Kafka record values as Struct type, schemaless JSON type, and JSON string type.

Supported Metrics and Schemas

The connector supports metrics of type Gauge. Kafka topics that contain these metrics must have records that adhere to the following schema.

Gauge schema

{
  "doubleValue": double
}

Record Mapping

Each Kafka record is converted to an AppDynamics metric object. The example below shows the original Kafka record value:

{
  "name": "sample_metric",
  "type": "gauge",
  "timestamp": 23480239402348234,
  "dimensions": {
    "aggregatorType": "AVERAGE"
  },
  "values": {
    "doubleValue": 28945
  }
}

The example below shows the converted AppDynamics metric object:

{
  "metricName": "sample_metric",
  "aggregatorType": "AVERAGE",
  "value": 28945
}
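
The mapping shown above can be sketched in Python as follows. This is an illustration of the field correspondence only, not the connector's actual implementation. The function name to_appdynamics_metric is hypothetical.

```python
def to_appdynamics_metric(record: dict) -> dict:
    """Map a Kafka record value to the AppDynamics metric shape shown above."""
    # dimensions may be absent; AVERAGE is the documented default aggregator.
    dimensions = record.get("dimensions") or {}
    return {
        "metricName": record["name"],
        "aggregatorType": dimensions.get("aggregatorType", "AVERAGE"),
        "value": record["values"]["doubleValue"],
    }


if __name__ == "__main__":
    sample = {
        "name": "sample_metric",
        "type": "gauge",
        "timestamp": 23480239402348234,
        "dimensions": {"aggregatorType": "AVERAGE"},
        "values": {"doubleValue": 28945},
    }
    print(to_appdynamics_metric(sample))
```

Note that the type and timestamp fields of the input do not appear in the converted object in the example above, so the sketch does not copy them.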

Install the AppDynamics Metrics Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

Install the connector using the Confluent CLI

To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-appdynamics-metrics:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-appdynamics-metrics:2.0.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for AppDynamics Metrics Sink Connector for Confluent Platform.

Quick Start

Complete the following instructions.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Preliminary setup

Before running the connector, set up the AppDynamics account and the Controller. Once these are configured, install and configure the Machine Agent as described in the AppDynamics documentation.

Set the following properties in the machine agent controller-info.xml file. Use the information from the AppDynamics account and the Controller configurations.

<controller-info>
    <controller-host></controller-host>
    <controller-port></controller-port>
    <controller-ssl-enabled></controller-ssl-enabled>
    <enable-orchestration></enable-orchestration>
    <account-access-key></account-access-key>
    <account-name></account-name>
    <sim-enabled></sim-enabled>
    <application-name></application-name>
    <tier-name></tier-name>
    <node-name></node-name>
</controller-info>
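
Because the template above ships with empty elements, it can help to check which ones are still blank before starting the Machine Agent. The following Python sketch lists the empty child elements of a controller-info document. The function name empty_controller_settings is hypothetical, and the sketch does not know which elements your deployment actually requires. It only reports the ones with no value.

```python
import xml.etree.ElementTree as ET


def empty_controller_settings(xml_text: str) -> list:
    """Return the tags of direct child elements that have no text value."""
    root = ET.fromstring(xml_text)
    # An element counts as empty when its text is missing or only whitespace.
    return [child.tag for child in root if not (child.text or "").strip()]


if __name__ == "__main__":
    sample = """
    <controller-info>
        <controller-host>controller.example.com</controller-host>
        <controller-port></controller-port>
        <account-name></account-name>
    </controller-info>
    """
    print(empty_controller_settings(sample))
```

To check a real file, read it with open(path).read() and pass the contents to the function.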

To add the new connector plugin you must restart Connect. Use the Confluent CLI command to restart Connect.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect stop && confluent local services connect start

Your output should resemble:

Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Verify that the AppDynamics plugin has been installed correctly and recognized by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep appdynamics

Example output:

"io.confluent.connect.appdynamics.metrics.AppDynamicsMetricsSinkConnector"

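If jq is not available, the same check can be sketched in Python. The function find_plugin_classes below is a hypothetical helper that filters the JSON body returned by the connector-plugins endpoint. The fetch_plugins helper assumes the Connect worker listens on localhost:8083, as in the curl command above.

```python
import json
import urllib.request


def find_plugin_classes(response_body: str, needle: str = "appdynamics") -> list:
    """Return plugin class names from a /connector-plugins response that contain needle."""
    plugins = json.loads(response_body)
    return [p["class"] for p in plugins if needle in p["class"].lower()]


def fetch_plugins(base_url: str = "http://localhost:8083") -> str:
    """Fetch the raw /connector-plugins response body from a Connect worker."""
    with urllib.request.urlopen(base_url.rstrip("/") + "/connector-plugins") as resp:
        return resp.read().decode("utf-8")
```

With a running worker, find_plugin_classes(fetch_plugins()) should return a list containing the connector class shown in the example output above.
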
Sink Connector Configuration

If not running, start Confluent Platform:

confluent local services start

Create a configuration file named appdynamics-metrics-sink-config.json with the following contents.

{
  "name": "appdynamics-metrics-sink",
  "config": {
    "topics": "appdynamics-metrics-topic",
    "connector.class": "io.confluent.connect.appdynamics.metrics.AppDynamicsMetricsSinkConnector",
    "tasks.max": "1",
    "machine.agent.host": "<host>",
    "machine.agent.port": "<port>",
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

Enter the following command to start the AppDynamics Metrics sink connector:

confluent local services connect connector load appdynamics-metrics-sink --config appdynamics-metrics-sink-config.json

Verify that the connector started by viewing the Connect worker log. Enter the following command:

confluent local services connect log
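
As an alternative to reading the log, the connector state can also be queried through the Kafka Connect REST API status endpoint. The following Python sketch is not part of the original procedure. The helper names are hypothetical, and the worker address localhost:8083 is an assumption carried over from the other examples on this page.

```python
import json
import urllib.request


def connector_is_running(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks yet is not considered fully started.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch GET /connectors/<name>/status from a Connect worker."""
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)
```

For example, connector_is_running(fetch_status("appdynamics-metrics-sink")) returns False while a task is still starting or has failed.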

Produce test data to the appdynamics-metrics-topic topic in Kafka using the kafka-avro-console-producer command. The last line below is a sample record to enter once the producer is running.

kafka-avro-console-producer \
  --broker-list localhost:9092 --topic appdynamics-metrics-topic \
  --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "aggregatorType", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
{"name":"Custom Metrics|Tier-1|CPU-Usage", "dimensions":{"aggregatorType":"AVERAGE"}, "values":{"doubleValue":5.639623848362502}}

You can view the metrics being produced using an AppDynamics Dashboard. You can produce Avro, schemaless JSON, and JSON string data to the Kafka topic.
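
To produce more than one test record, a small generator can print records in the same shape as the sample line above, one JSON object per line. The following Python sketch is an illustration only. The function name make_metric_line and the random value range are assumptions.

```python
import json
import random


def make_metric_line(name: str, value: float, aggregator: str = "AVERAGE") -> str:
    """Build one JSON line matching the Avro value schema used in the quick start."""
    record = {
        "name": name,
        "dimensions": {"aggregatorType": aggregator},
        # The schema declares doubleValue as a double, so force a float.
        "values": {"doubleValue": float(value)},
    }
    return json.dumps(record)


if __name__ == "__main__":
    rng = random.Random(0)  # fixed seed so repeated runs emit the same values
    for _ in range(5):
        print(make_metric_line("Custom Metrics|Tier-1|CPU-Usage", rng.uniform(0, 100)))
```

The output of the script can be piped into the kafka-avro-console-producer command shown earlier, which reads one record per line from standard input.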

When you are ready, stop Confluent services using the following command:

confluent local services stop

Examples

Property-based example

Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-appdynamics-metrics/appdynamics-metrics-sink-connector.properties. This configuration is typically used for standalone workers.

name=appdynamics-metrics-sink
topics=appdynamics-metrics-topic
connector.class=io.confluent.connect.appdynamics.metrics.AppDynamicsMetricsSinkConnector
tasks.max=1
machine.agent.host=<host>
machine.agent.port=<port>
behavior.on.error=fail
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Note

Before starting the connector:

  • Make sure to supply the machine.agent.host, machine.agent.port and behavior.on.error properties.
  • Make sure that the machine agent is set up and the controller configurations in the <machine-agent-path>/conf/controller-info.xml file are properly set. See Preliminary setup for additional information.
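
The first check above can be sketched in Python. The helpers below parse a simple key=value properties file and report which of the three properties are missing or still set to a placeholder such as <host>. The function names are hypothetical, and the parser is deliberately minimal: it does not handle line continuations or escapes from the full Java properties format.

```python
REQUIRED = ("machine.agent.host", "machine.agent.port", "behavior.on.error")


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unset_properties(props: dict) -> list:
    """Return required keys that are missing, empty, or left as <placeholder>."""
    unset = []
    for key in REQUIRED:
        value = props.get(key, "")
        if not value or (value.startswith("<") and value.endswith(">")):
            unset.append(key)
    return unset


if __name__ == "__main__":
    sample = "machine.agent.host=<host>\nmachine.agent.port=<port>\nbehavior.on.error=fail\n"
    print(unset_properties(parse_properties(sample)))
```

An empty list means all three properties have concrete values.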

Tip

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

Enter the following command to load the configuration and start the connector:

confluent local services connect connector load appdynamics-metrics-sink --config appdynamics-metrics-sink-connector.properties

Example output:

{
  "name": "appdynamics-metrics-sink",
  "config": {
    "connector.class": "io.confluent.connect.appdynamics.metrics.AppDynamicsMetricsSinkConnector",
    "tasks.max": "1",
    "topics": "appdynamics-metrics-topic",
    "machine.agent.host": "<host>",
    "machine.agent.port": "<port>",
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  },
  "tasks": []
}

REST-based example

This configuration is typically used with distributed workers. Create a JSON file named connector.json and enter all the required properties. An example of the JSON to use is provided below:

{
    "name": "appdynamics-metrics-sink",
    "config": {
        "connector.class": "io.confluent.connect.appdynamics.metrics.AppDynamicsMetricsSinkConnector",
        "tasks.max":"1",
        "topics":"appdynamics-metrics-topic",
        "machine.agent.host": "<host>",
        "machine.agent.port": "<port>",
        "behavior.on.error": "fail",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.replication.factor": "1",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

Use curl to post the configuration to one of the Connect workers. Change http://localhost:8083/ to the endpoint of the Connect worker.

curl -sS -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
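
The same request can be built from a script. The following Python sketch uses only the standard library and mirrors the curl command above. The function name build_create_request is hypothetical, and the default worker address is the same localhost:8083 assumption.

```python
import json
import urllib.request


def build_create_request(config: dict, base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build a POST /connectors request carrying the connector JSON."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Load the same connector.json file that the curl command posts.
    with open("connector.json") as f:
        request = build_create_request(json.load(f))
    # urlopen sends the request; it raises HTTPError on a 4xx or 5xx response.
    with urllib.request.urlopen(request) as resp:
        print(resp.status, resp.read().decode("utf-8"))
```

Separating request construction from sending keeps the payload easy to inspect before it reaches the worker.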

For more information, see the Kafka Connect REST Interface.