Amazon CloudWatch Metrics Sink Connector for Confluent Platform

The Amazon CloudWatch Metrics Sink connector is used to export data from a Kafka topic to Amazon CloudWatch Metrics. The connector accepts only Struct objects as a Kafka record’s value, and the Struct must contain name, type, timestamp, dimensions, and values fields. The values field refers to a metric’s values and is also expected to be a Struct object.

The input Struct object used as the record’s value should look like the following:

{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}

This connector can start with a single task that handles all data export and can scale horizontally by adding more tasks. However, throughput is still limited by Amazon at 150 transactions per second. You can request a limit increase from Amazon directly.

Features

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Amazon CloudWatch Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector must export a high volume of records.

Defined schemas

The connector attempts to fit the values Struct into one of the four defined schemas (Gauge, Meter, Histogram, or Timer), depending on the type field, which must be one of gauge, meter, histogram, timer, or custom. If the value for type is custom, a catch-all mechanism accepts any schema. In all cases, each value in the values Struct must be of type double.

Gauge schema

{
  "doubleValue": double
}

Meter schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}

Histogram schema

{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double,
}

Timer schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}

Sample custom schema

{
  "posts": double,
  "puts": double,
  "patches": double,
  "deletes": double,
}

Record mapping

Each value in the values Struct is mapped to its own MetricDatum object, reusing the record’s timestamp and dimensions fields and using the name field as a prefix for the metric name. For example, the following record maps to five separate MetricDatum objects because there are five values in the values Struct:

{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1"
  }
}

The following is an example of a mapping from the oneMinuteRate field to its own MetricDatum object:

{
  "name": "sample_meter_metric_oneMinuteRate",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "value": 5.2
}
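
The following is a minimal sketch, not the connector’s actual implementation, of how one such datapoint could be represented as a MetricDatum using the AWS SDK for Java (v1); the class name is illustrative only:

import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;

import java.util.Date;

public class MetricDatumMappingSketch {
    public static void main(String[] args) {
        // The datapoint key is prefixed with the record's name field to form the metric name.
        MetricDatum datum = new MetricDatum()
                .withMetricName("sample_meter_metric" + "_" + "oneMinuteRate")
                .withTimestamp(new Date(23480239402348234L)) // the record's timestamp field
                .withValue(5.2)                              // the datapoint's double value
                .withDimensions(                             // the record's dimensions field
                        new Dimension().withName("service").withValue("ec2-2312"),
                        new Dimension().withName("method").withValue("update"));
        System.out.println(datum);
    }
}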

Install the Amazon CloudWatch Metrics Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later.

  • Connect: Confluent Platform 4.1.0 or later.

  • Java 1.8.

  • AWS account.

  • At minimum, the AWS permission cloudwatch:PutMetricData is required for this connector. A sample IAM policy granting this permission is shown after this list.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-aws-cloudwatch-metrics:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-aws-cloudwatch-metrics:2.0.0
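
For reference, a minimal IAM policy that grants only the required cloudwatch:PutMetricData permission might look like the following (PutMetricData does not support resource-level restrictions, so the resource is a wildcard):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    }
  ]
}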
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Amazon CloudWatch Metrics Sink Connector for Confluent Platform.

Quick Start

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Preliminary setup

To add a new connector plugin, you must restart Connect. Use the following Confluent CLI command to restart Connect:

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect stop && confluent local services connect start

Your output should resemble:

Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Check if the Amazon CloudWatch Metrics plugin has been installed correctly and picked up by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep metrics

Your output should resemble:

"io.confluent.connect.aws.cloudwatch.AwsCloudWatchMetricsSinkConnector"

Sink Connector Configuration

Start the services using the Confluent CLI:

confluent local services start

Create a configuration file named aws-cloudwatch-metrics-sink-config.json with the following contents.

{
  "name": "aws-cloudwatch-metrics-sink",
  "config": {
    "name": "aws-cloudwatch-metrics-sink",
    "topics": "cloudwatch-metrics-topic",
    "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",

    "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
    "aws.cloudwatch.metrics.namespace": "service-namespace",
    "behavior.on.malformed.metric": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}

The important configuration parameters used here are:

  • aws.cloudwatch.metrics.url: The endpoint URL that the sink connector uses to push the given metrics.
  • aws.cloudwatch.metrics.namespace: The Amazon CloudWatch Metrics namespace associated with the desired metrics.
  • tasks.max: The maximum number of tasks that should be created for this connector.

Run this command to start the Amazon CloudWatch Metrics sink connector.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load aws-cloudwatch-metrics-sink --config aws-cloudwatch-metrics-sink-config.json

To check that the connector started successfully, view the Connect worker’s log by running:

confluent local services connect log

Produce test data to the cloudwatch-metrics-topic topic in Kafka using the kafka-avro-console-producer tool:

kafka-avro-console-producer \
  --broker-list localhost:9092 --topic cloudwatch-metrics-topic \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'

Important

The timestamp must specify a time within the past two weeks, in milliseconds.
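
For example, you can generate a current timestamp in epoch milliseconds from the shell:

echo $(($(date +%s) * 1000))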

"key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}

Using the AWS CLI, you can view the metrics being produced to Amazon CloudWatch:

aws cloudwatch list-metrics --namespace service-namespace
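
To inspect the individual data points, you can also query a specific metric. The metric and dimension names below assume the sample record produced above (the name field test_meter prefixes the count datapoint); adjust the time window to cover your record’s timestamp:

aws cloudwatch get-metric-statistics \
  --namespace service-namespace \
  --metric-name test_meter_count \
  --dimensions Name=dimensions1,Value=InstanceID Name=dimensions2,Value=i-aaba32d4 \
  --start-time 2019-11-25T00:00:00Z \
  --end-time 2019-11-26T00:00:00Z \
  --period 60 \
  --statistics Maximum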

Finally, stop the Confluent services using the command:

confluent local services stop

AWS Credentials

By default, the Amazon CloudWatch Metrics connector looks for AWS credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY can be used instead, but are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able to find the credentials.

    See AWS Credentials File Format for additional details.

Choose one of the above methods to define the AWS credentials that the Amazon CloudWatch Metrics connector uses, verify that the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note

Confluent recommends using either environment variables or a credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.

Credentials Providers

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the Amazon CloudWatch Metrics connector configuration property aws.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes.
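
For example, the default behavior is equivalent to setting the following in the connector properties file (shown only to illustrate the property; you do not need to set it explicitly):

aws.credentials.provider.class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain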

The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:

  1. Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

  2. Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties aws.accessKeyId and aws.secretKey.

  3. Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able to find the credentials.

    See AWS Credentials File Format for additional details.

Using Other Implementations

You can use a different credentials provider. To do this, set the aws.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Important

If you are using a different credentials provider, do not include the aws.access.key.id and aws.secret.access.key properties in the connector configuration file. If these parameters are included, they will override the custom credentials provider class.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

  2. Put the class file in a JAR file.

  3. Place the JAR file in the share/java/kafka-connect-aws-cloudwatch-metrics directory on all Connect workers.

  4. Restart the Connect workers.

  5. Update the Amazon CloudWatch Metrics connector properties file to use your custom credentials provider by adding the entry aws.credentials.provider.class=<className>.

    Important

    You must use the fully qualified class name in the <className> entry.
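
The following is a minimal sketch of such a provider class, assuming the credentials are read from two hypothetical environment variables; the package, class, and variable names are illustrative only:

package com.example.auth;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

// Hypothetical provider that reads AWS credentials from custom environment variables.
public class EnvCredentialsProvider implements AWSCredentialsProvider {

    @Override
    public AWSCredentials getCredentials() {
        return new BasicAWSCredentials(
                System.getenv("MY_AWS_ACCESS_KEY_ID"),
                System.getenv("MY_AWS_SECRET_ACCESS_KEY"));
    }

    @Override
    public void refresh() {
        // Credentials are re-read on every getCredentials() call, so there is no cached state to refresh.
    }
}

With this class packaged in a JAR on the plugin path, the connector properties file would contain aws.credentials.provider.class=com.example.auth.EnvCredentialsProvider.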

Examples

Property-based example

name=aws-cloudwatch-metrics-sink
topics=cloudwatch-metrics-topic
connector.class=io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector
tasks.max=1
aws.access.key.id=< Optional Configuration >
aws.secret.access.key=< Optional Configuration >
aws.cloudwatch.metrics.namespace=< Required Configuration >
behavior.on.malformed.metric=< Optional Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. Check here for more information about the Kafka Connect REST Interface.

{
  "name" : "aws-cloudwatch-metrics-sink-connector",
  "config" : {
   "name": "aws-cloudwatch-metrics-sink",
   "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
   "tasks.max": "1",
   "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
   "aws.cloudwatch.metrics.namespace": "service-namespace",
   "behavior.on.malformed.metric": "fail",
   "confluent.topic.bootstrap.servers": "localhost:9092",
   "confluent.topic.replication.factor": "1"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers. Use the POST command to create a new connector, or the PUT command to create or update the configuration of an existing connector.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/aws-cloudwatch-metrics-sink-connector/config
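
After posting the configuration, you can confirm that the connector is running by querying the Kafka Connect REST API status endpoint:

curl -s http://localhost:8083/connectors/aws-cloudwatch-metrics-sink-connector/status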