.. _aws_cloudwatch_metrics_sink_connector:

Amazon CloudWatch Metrics Sink Connector for |cp|
==================================================

The |kconnect-long| Amazon CloudWatch Metrics sink connector exports data from a |ak| topic to Amazon CloudWatch Metrics. The connector accepts only Struct objects as a |ak| record's value, and the Struct must contain ``name``, ``type``, ``timestamp``, ``dimensions``, and ``values`` fields. The ``values`` field holds the metric's values and is also expected to be a Struct object.

The input Struct object used as the record's value should look like the following:

.. codewithvars:: bash

   {
     "name": string,
     "type": string,
     "timestamp": long,
     "dimensions": {
       "": string,
       ...
     },
     "values": {
       "": double,
       "": double,
       ...
     }
   }

This connector can start with a single task that handles all data export, and it can scale horizontally by adding more tasks. However, performance is still **limited by Amazon at 150 transactions per second**. You can request a limit increase from Amazon directly.

Prerequisites
-------------

The following are required to run the |kconnect-long| Amazon CloudWatch Metrics connector:

* |ak| Broker: |cp| 3.3.0 or above
* |kconnect|: |cp| 4.1.0 or above
* Java 1.8
* |aws| account
* At minimum, the ``cloudwatch:PutMetricData`` AWS permission is required for this connector.

Features
--------

The connector attempts to fit the ``values`` Struct into one of the four defined schemas (Gauge, Meter, Histogram, Timer), depending on the ``type`` field. Alternatively, there is a catch-all mechanism that accepts any schema, but in that case the ``type`` field's value **must** be ``custom``. Each value in the ``values`` Struct **must** be of type double.

------------
Gauge Schema
------------

.. codewithvars:: bash

   {
     "doubleValue": double
   }

------------
Meter Schema
------------
.. codewithvars:: bash

   {
     "count": double,
     "oneMinuteRate": double,
     "fiveMinuteRate": double,
     "fifteenMinuteRate": double,
     "meanRate": double
   }

----------------
Histogram Schema
----------------

.. codewithvars:: bash

   {
     "count": double,
     "max": double,
     "min": double,
     "mean": double,
     "stdDev": double,
     "sum": double,
     "median": double,
     "percentile75th": double,
     "percentile95th": double,
     "percentile98th": double,
     "percentile99th": double,
     "percentile999th": double
   }

------------
Timer Schema
------------

.. codewithvars:: bash

   {
     "count": double,
     "oneMinuteRate": double,
     "fiveMinuteRate": double,
     "fifteenMinuteRate": double,
     "meanRate": double,
     "max": double,
     "min": double,
     "mean": double,
     "stdDev": double,
     "sum": double,
     "median": double,
     "percentile75th": double,
     "percentile95th": double,
     "percentile98th": double,
     "percentile99th": double,
     "percentile999th": double
   }

--------------------
Sample Custom Schema
--------------------

.. codewithvars:: bash

   {
     "posts": double,
     "puts": double,
     "patches": double,
     "deletes": double
   }

--------------
Record Mapping
--------------

Each value in the ``values`` Struct is mapped to its own ``MetricDatum`` object, reusing the record's ``timestamp`` and ``dimensions`` fields and using the ``name`` field as a prefix. For example, the following record:

.. codewithvars:: bash

   {
     "name": "sample_meter_metric",
     "type": "meter",
     "timestamp": 23480239402348234,
     "dimensions": {
       "service": "ec2-2312",
       "method": "update"
     },
     "values": {
       "count": 12,
       "oneMinuteRate": 5.2,
       "fiveMinuteRate": 4.7,
       "fifteenMinuteRate": 4.9,
       "meanRate": 5.1
     }
   }

is mapped to five separate ``MetricDatum`` objects, because there are five values in the ``values`` Struct. Here is an example of the mapping from the ``oneMinuteRate`` field to its own ``MetricDatum`` object:
.. codewithvars:: bash

   {
     "name": "sample_meter_metric_oneMinuteRate",
     "timestamp": 23480239402348234,
     "dimensions": {
       "service": "ec2-2312",
       "method": "update"
     },
     "value": 5.2
   }

Install the Amazon CloudWatch Metrics Connector
-----------------------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-cloudwatch-metrics:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-cloudwatch-metrics:1.0.0-preview

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `__ for your connector and then follow the manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`aws_cloudwatch_metrics_sink_connector_license_config` for license properties and :ref:`aws_cloudwatch_metrics_sink_license_topic_configuration` for information about the license topic.

.. _aws_cloudwatch_metrics_quickstart:

Quick Start
-----------

-----------------
Preliminary Setup
-----------------

To add a new connector plugin, you must restart |kconnect|. Use the :ref:`Confluent CLI ` command to restart |kconnect|.

.. include:: ../../includes/cli-new.rst
.. codewithvars:: bash

   |confluent_stop| connect && |confluent_start| connect

Your output should resemble:

::

   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Check that the Amazon CloudWatch Metrics plugin has been installed correctly and picked up by the plugin loader:

::

   curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep metrics

Your output should resemble:

::

   "io.confluent.connect.aws.cloudwatch.AwsCloudWatchMetricsSinkConnector"

----------------------------
Sink Connector Configuration
----------------------------

Start the services using the |confluent-cli|:

.. codewithvars:: bash

   |confluent_start|

Create a configuration file named ``aws-cloudwatch-metrics-sink-config.json`` with the following contents:

::

   {
     "name": "aws-cloudwatch-metrics-sink",
     "config": {
       "name": "aws-cloudwatch-metrics-sink",
       "topics": "cloudwatch-metrics-topic",
       "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
       "tasks.max": "1",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://localhost:8081",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://localhost:8081",
       "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
       "aws.cloudwatch.metrics.namespace": "service-namespace",
       "behavior.on.malformed.metric": "FAIL",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     }
   }

The important configuration parameters used here are:

- **aws.cloudwatch.metrics.url**: The endpoint URL that the sink connector uses to push the given metrics.
- **aws.cloudwatch.metrics.namespace**: The Amazon CloudWatch Metrics namespace associated with the desired metrics.
- **tasks.max**: The maximum number of tasks that should be created for this connector.

Run this command to start the Amazon CloudWatch Metrics sink connector.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| aws-cloudwatch-metrics-sink|dash| -d aws-cloudwatch-metrics-sink-config.json

To check that the connector started successfully, view the |kconnect| worker's log by running:

.. codewithvars:: bash

   |confluent_log| connect

Produce test data to the ``cloudwatch-metrics-topic`` topic in |ak| using the :ref:`cli` |confluent_produce| command.

.. codewithvars:: bash

   kafka-avro-console-producer \
   --broker-list localhost:9092 --topic cloudwatch-metrics-topic \
   --property parse.key=true \
   --property key.separator=, \
   --property key.schema='{"type":"string"}' \
   --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'

.. important:: The timestamp must specify a time within the past two weeks, in milliseconds.
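Before producing records, the two-week timestamp window called out above can be sanity-checked. The following Python sketch is illustrative only (the ``is_timestamp_valid`` helper is not part of the connector); it builds a record in the expected shape with a current epoch-millisecond timestamp:

```python
import time

# Two weeks expressed in milliseconds.
TWO_WEEKS_MS = 14 * 24 * 60 * 60 * 1000

def is_timestamp_valid(ts_ms, now_ms=None):
    """Return True if ts_ms (epoch milliseconds) falls within the past two weeks."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - TWO_WEEKS_MS <= ts_ms <= now_ms

# A record shaped like the sample producer input, with a fresh timestamp.
record = {
    "name": "test_meter",
    "type": "meter",
    "timestamp": int(time.time() * 1000),
    "dimensions": {"dimensions1": "InstanceID", "dimensions2": "i-aaba32d4"},
    "values": {
        "count": 32423.0,
        "oneMinuteRate": 342342.2,
        "fiveMinuteRate": 34234.2,
        "fifteenMinuteRate": 2123123.1,
        "meanRate": 2312312.1,
    },
}

print(is_timestamp_valid(record["timestamp"]))                              # True
print(is_timestamp_valid(record["timestamp"] - 15 * 24 * 60 * 60 * 1000))   # False: 15 days old
```

A record that fails this check would be rejected by CloudWatch, so validating before producing can save a debugging round trip.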
.. codewithvars:: bash

   "key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}

Using the |aws| CloudWatch CLI, you can view the metrics being produced to Amazon CloudWatch:

::

   aws cloudwatch list-metrics --namespace service-namespace

Finally, stop the Confluent services using the command:

.. codewithvars:: bash

   |confluent_stop|

.. _aws_cloudwatch_metrics_awscredentials:

-----------------
|aws| Credentials
-----------------

By default, the Amazon CloudWatch Metrics connector looks for |aws| credentials in the following locations, in the following order:

#. The ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables accessible to the |kconnect| worker processes where the connector will be deployed. These variables are recognized by the |aws| CLI and all |aws| SDKs (except for the |aws| SDK for .NET). You use ``export`` to set these variables.

   .. sourcecode:: bash

      export AWS_ACCESS_KEY_ID=
      export AWS_SECRET_ACCESS_KEY=

   The ``AWS_ACCESS_KEY`` and ``AWS_SECRET_KEY`` variables can be used instead, but they are not recognized by the |aws| CLI.

#. The ``aws.accessKeyId`` and ``aws.secretKey`` Java system properties on the |kconnect| worker processes where the connector will be deployed. However, these properties are only recognized by the |aws| SDK for Java and are not recommended.

#. The ``~/.aws/credentials`` file located in the home directory of the operating system user that runs the |kconnect| worker processes. These credentials are recognized by most |aws| SDKs and the |aws| CLI. Use the following |aws| CLI command to create the credentials file:

   .. sourcecode:: bash

      aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
   .. sourcecode:: bash

      [default]
      aws_access_key_id =
      aws_secret_access_key =

   .. note:: When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the |kconnect| worker processes and that the credentials file is in this user's home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able to find the credentials. See `AWS Credentials File Format `__ for additional details.

Choose one of the above to define the |aws| credentials that the Amazon CloudWatch Metrics connectors use, verify the credentials implementation is set correctly, and then restart all of the |kconnect| worker processes.

.. note:: Confluent recommends using either **environment variables** or a **credentials file** because these are the most straightforward, and they can be checked using the |aws| CLI tool before running the connector.

Credentials Providers
^^^^^^^^^^^^^^^^^^^^^

A *credentials provider* is a Java class that implements the `com.amazonaws.auth.AWSCredentialsProvider `__ interface in the |aws| Java library and returns |aws| credentials from the environment. By default, the Amazon CloudWatch Metrics connector configuration property ``aws.credentials.provider.class`` uses the `com.amazonaws.auth.DefaultAWSCredentialsProviderChain `__ class. This class and interface implementation chains together five other credential provider classes.

The `com.amazonaws.auth.DefaultAWSCredentialsProviderChain `__ implementation looks for credentials in the following order:

#. **Environment variables** using the `com.amazonaws.auth.EnvironmentVariableCredentialsProvider `__ class implementation. This implementation uses the environment variables ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``. The environment variables ``AWS_ACCESS_KEY`` and ``AWS_SECRET_KEY`` are also supported by this implementation; however, these two variables are only recognized by the |aws| SDK for Java and are not recommended.
#. **Java system properties** using the `com.amazonaws.auth.SystemPropertiesCredentialsProvider `__ class implementation. This implementation uses the Java system properties ``aws.accessKeyId`` and ``aws.secretKey``.

#. **Credentials file** using the `com.amazonaws.auth.profile.ProfileCredentialsProvider `__ class implementation. This implementation uses a credentials file located at ``~/.aws/credentials``. This credentials provider can be used by most |aws| SDKs and the |aws| CLI. Use the following |aws| CLI command to create the credentials file:

   .. sourcecode:: bash

      aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

   .. sourcecode:: bash

      [default]
      aws_access_key_id =
      aws_secret_access_key =

   .. note:: When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the |kconnect| worker processes and that the credentials file is in this user's home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able to find the credentials. See `AWS Credentials File Format `__ for additional details.

.. _aws_cloudwatch_metrics_other_credentials_implementations:

Using Other Implementations
^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can use a different credentials provider. To do this, set the ``aws.credentials.provider.class`` property to the name of any class that implements the `com.amazonaws.auth.AWSCredentialsProvider `__ interface.

.. important:: If you are using a different credentials provider, do not include the ``aws.access.key.id`` and ``aws.secret.key.id`` parameters in the connector configuration file. If these parameters are included, they will override the custom credentials provider class.

Complete the following steps to use a different credentials provider:

#. Find or create a Java credentials provider class that implements the `com.amazonaws.auth.AWSCredentialsProvider `__ interface.

#. Put the class file in a JAR file.
#. Place the JAR file in the ``share/java/kafka-connect-aws-cloudwatch-metrics`` directory on **all |kconnect| workers**.

#. Restart the |kconnect| workers.

#. Change the Amazon CloudWatch Metrics connector properties file to use your custom credentials by adding the provider class entry ``aws.credentials.provider.class=`` to it.

   .. important:: You must use the fully qualified class name in the ```` entry.

.. _aws_cloudwatch_metrics_sink_connector_examples:

Examples
--------

----------------------
Property-based example
----------------------

.. codewithvars:: properties
   :name: connector.properties
   :emphasize-lines: 6

   name=aws-cloudwatch-metrics-sink
   topics=cloudwatch-metrics-topic
   connector.class=io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector
   tasks.max=1
   aws.access.key.id=< Optional Configuration >
   aws.secret.access.key=< Optional Configuration >
   aws.cloudwatch.metrics.namespace=< Required Configuration >
   behavior.on.malformed.metric=< Optional Configuration >
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

------------------
REST-based example
------------------

This configuration is typically used with :ref:`distributed workers `. Write the following JSON to ``connector.json``, configure all of the required values, and use the command below to post the configuration to one of the distributed |kconnect| workers. See the |kconnect-long| :ref:`REST API ` documentation for more information.
.. codewithvars:: bash
   :emphasize-lines: 8,9

   {
     "name" : "aws-cloudwatch-metrics-sink-connector",
     "config" : {
       "name": "aws-cloudwatch-metrics-sink",
       "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
       "tasks.max": "1",
       "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
       "aws.cloudwatch.metrics.namespace": "service-namespace",
       "behavior.on.malformed.metric": "FAIL",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     }
   }

Use curl to post the configuration to one of the |kconnect-long| workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

.. codewithvars:: bash

   curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Use curl to update the configuration of an existing connector:

.. codewithvars:: bash

   curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
   http://localhost:8083/connectors/aws-cloudwatch-metrics-sink-connector/config

------------------------
Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   aws_cloudwatch_metrics_sink_connector_config
   changelog

.. _this: #AWS-Credentials
.. _AWS credentials: https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html
.. _Default AWS Credentials Provider Chain: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
.. _AWS account: https://docs.aws.amazon.com/streams/latest/dev/before-you-begin.html#setting-up-sign-up-for-aws
.. _Profile Credentials Provider: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/profile/ProfileCredentialsProvider.html
.. _AWS Credentials File Format: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-file-format
.. _Environment Variable Credentials Provider: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/EnvironmentVariableCredentialsProvider.html
.. _System Properties Credentials Provider: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/SystemPropertiesCredentialsProvider.html
.. _SystemPropertiesCredentialsProvider: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/SystemPropertiesCredentialsProvider.html
.. _AWSCredentialsProvider: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
.. _Configurable interface: https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
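As a worked illustration of the fan-out described in the Record Mapping section, the following Python sketch expands one record into per-value datum entries. It is a sketch of the documented behavior only, not the connector's implementation; the ``to_datums`` helper is hypothetical:

```python
def to_datums(record):
    """Fan a metric record out into one datum per entry in its "values" map.

    Each datum reuses the record's timestamp and dimensions, and its name
    uses the record's name as a prefix, as described under Record Mapping.
    """
    return [
        {
            "name": f"{record['name']}_{field}",
            "timestamp": record["timestamp"],
            "dimensions": record["dimensions"],
            "value": value,
        }
        for field, value in record["values"].items()
    ]

# The meter record from the Record Mapping example.
record = {
    "name": "sample_meter_metric",
    "type": "meter",
    "timestamp": 23480239402348234,
    "dimensions": {"service": "ec2-2312", "method": "update"},
    "values": {
        "count": 12.0,
        "oneMinuteRate": 5.2,
        "fiveMinuteRate": 4.7,
        "fifteenMinuteRate": 4.9,
        "meanRate": 5.1,
    },
}

datums = to_datums(record)
print(len(datums))        # 5: one datum per entry in "values"
print(datums[1]["name"])  # sample_meter_metric_oneMinuteRate
```

Because the fan-out is one datum per value, a record with a five-field ``values`` Struct counts as five metric submissions toward CloudWatch's transaction limits.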