Amazon CloudWatch Logs Source Connector for Confluent Platform¶
The AWS CloudWatch Logs Source connector is used to import data from AWS CloudWatch Logs, and write them into a Kafka topic. Moreover, the connector sources from a single log group and writes to one topic per log stream. There is a topic format configuration available to customize the topic names of each log stream. If specific customizations for topics such as multiple log streams writing to the same topic are desired, SMTs can be used for such actions.
This connector can start at one task supporting all importation of data and can scale up to one task per log stream which will raise performance to the highest that Amazon supports (100,000 logs per second or 10 MB per second).
Features¶
The AWS CloudWatch Logs Source connector includes the following features:
At least once delivery¶
Records imported from AWS CloudWatch Logs are delivered with at least once semantics. Duplicates will generally be limited, however, as there will only be repeats in the chance of unexpected termination of the connector.
Multiple tasks¶
The AWS CloudWatch Logs Source connector supports running one or more tasks.
You can specify the number of tasks in the tasks.max
configuration
parameter. Multiple tasks may improve performance when moving a large amount of
data.
Topic format customizability¶
Because this connector is designed to write to a topic per log stream, custom topic formats can be created or all records can be written to exactly one topic.
Log stream selection¶
The log streams from which logs are imported from can be specified, or as a default, all will be used.
Install the AWS CloudWatch Logs Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later.
Connect: Confluent Platform 4.1.0 or later.
Java 1.8.
AWS account.
At least one AWS CloudWatch log group and log stream in AWS CloudWatch Logs.
At minimum, AWS permissions
logs:GetLogEvents
,logs:DescribeLogGroups
, andlogs:DescribeLogStreams
are required for this connector. Confluent recommends you review the CloudWatch Logs Permissions Reference.An installation of the latest (
latest
) connector version.To install the
latest
connector version, navigate to your Confluent Platform installation directory and run the following command:confluent connect plugin install confluentinc/kafka-connect-aws-cloudwatch-logs:latest
You can install a specific version by replacing
latest
with a version number as shown in the following example:confluent connect plugin install confluentinc/kafka-connect-aws-cloudwatch-logs:1.3.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Amazon CloudWatch Logs Source Connector for Confluent Platform.
Quick Start¶
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Preliminary setup¶
To add a new connector plugin you must restart Connect. Use the Confluent CLI command to restart Connect.
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the AWS CloudWatch Logs plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep "cloudwatch logs"
Your output should resemble:
"io.confluent.connect.aws.cloudwatch.AwsCloudWatchSourceConnector"
AWS CloudWatch Logs setup¶
You can use the AWS Management Console to set up your AWS CloudWatch log group and log stream as shown in the Amazon CloudWatch Logs User Guide, or you can complete the following steps:
Sign up for an AWS account.
Set up AWS Credentials.
Create a log group in AWS CloudWatch Logs.
aws logs create-log-group --log-group my-log-group
Create a log stream in AWS CloudWatch Logs.
aws logs create-log-stream --log-group my-log-group --log-stream my-log-stream
Insert records into your log stream. If this is the first time inserting logs into a new log stream, a sequence token is not needed. However, after the first
put-log-event
, a sequence token is returned. You will need the sequence token as a parameter for the nextput-log-event
.aws logs put-log-events --log-group my-log-group --log-stream my-log-stream --log-events timestamp=<time>,message=some-string
The example shows a log event at a specified timestamp with a specified message put into the specified log stream and log group.
Enter the following command to get a sequence token:
aws logs describe-log-streams --log-group my-log-group
Output providing the sequence token is displayed:
{ "logStreams": [ { "logStreamName": "my-log-stream", "creationTime": 1569709821347, "lastIngestionTime": 1569709984113, "uploadSequenceToken": "49595785783592846449895609848346364951147972276040781330", "storedBytes": 0 } ] }
The example below shows how you can use the sequence token to generate logs for your stream.
aws logs put-log-events --log-group my-log-group --log-stream my-log-stream --log-events timestamp=<time>,message=bananas --sequence-token 49595785783592846449895609848346364951147972276040781330
Source Connector Configuration¶
Start the services using the Confluent CLI:
confluent local services start
Create a configuration file named aws-cloudwatch-logs-source-config.json with the following contents.
{
"name": "aws-cloudwatch-logs-source",
"config": {
"connector.class": "io.confluent.connect.aws.cloudwatch.logs.AwsCloudWatchSourceConnector",
"tasks.max": "1",
"aws.cloudwatch.logs.url": "https://logs.us-east-2.amazonaws.com",
"aws.cloudwatch.log.group": "my-log-group",
"aws.cloudwatch.log.streams": "my-log-stream",
"name": "aws-cloudwatch-logs-source",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}
}
The important configuration parameters used here are:
aws.cloudwatch.logs.url: The endpoint URL that the source connector connects to pull the specified logs.
aws.cloudwatch.log.group: The AWS CloudWatch log group under which the log streams are contained.
aws.cloudwatch.log.streams: A list of AWS CloudWatch log streams from which the logs are pulled from. The default value is to use all log streams from the configured log group.
tasks.max: The maximum number of tasks that should be created for this connector.
You may pass your AWS Credentials to the AWS CloudWatch Logs Connector through your source connector configuration. To pass AWS credentials in the source configuration set the aws.access.key.id and the aws.secret.key.id: parameters.
"aws.access.key.id":<your-access-key-id> "aws.secret.access.key":<your-secret-access-key>
Run this command to start the AWS CloudWatch Logs Source connector.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load aws-cloudwatch-logs-source --config aws-cloudwatch-logs-source-config.json
To check that the connector started successfully view the Connect worker’s log by running:
confluent local services connect log
Start a Kafka Consumer in a separate terminal session to view the data exported by the connector into the kafka topic
path/to/confluent/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-log-group.my-log-stream --from-beginning
Finally, stop the Confluent services using the command:
confluent local stop
Remove unused resources¶
Delete your log group and clean up resources to avoid incurring any unintended charges.
aws logs delete-log-group --log-group my-log-group
AWS Credentials¶
By default, the AWS CloudWatch Logs connector looks for AWS credentials in the following locations and in the following order:
The
AWS_ACCESS_KEY_ID
andAWS_SECRET_ACCESS_KEY
environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.export AWS_ACCESS_KEY_ID=<your_access_key_id> export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
The
AWS_ACCESS_KEY
andAWS_SECRET_KEY
can be used instead, but are not recognized by the AWS CLI.The
aws.accessKeyId
andaws.secretKey
Java system properties on the Connect worker processes where the connector will be deployed. However, these variables are only recognized by the AWS SDK for Java and are not recommended.The
~/.aws/credentials
file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:aws configure
You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
[default] aws_access_key_id = <your_access_key_id> aws_secret_access_key = <your_secret_access_key>
Note
When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the AWS CloudWatch Logs connector will not be able to find the credentials.
See AWS Credentials File Format for additional details.
Choose one of the above to define the AWS credentials that the AWS CloudWatch Logs connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
Note
Confluent recommends using either Environment variables or a Credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.
Caution
If you configure one of the AWS key and AWS secret key implementations (as detailed above), credentials can not also be supplied through the following Credentials Providers or by using the Trusted Account Credentials implementation. Attempting to provide credentials using multiple implementations will cause authentication failure.
Credentials Providers¶
A credentials provider is a Java class that implements the com.amazon.auth
.AWSCredentialsProvider interface in the AWS Java library and returns
AWS credentials from the environment. By default the AWS CloudWatch Logs
connector configuration property aws.credentials.provider.class
uses the
com.amazon.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains
together five other credential provider classes.
The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:
Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables
AWS_ACCESS_KEY_ID
andAWS_SECRET_ACCESS_KEY
. Environment variablesAWS_ACCESS_KEY
andAWS_SECRET_KEY
are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties
aws.accessKeyId
andaws.secretKey
.Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path
~/.aws/credentials
. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:aws configure
You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
[default] aws_access_key_id = <your_access_key_id> aws_secret_access_key = <your_secret_access_key>
Note
When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the AWS CloudWatch Logs connector will not be able to find the credentials.
See AWS Credentials File Format for additional details.
Using Trusted Account Credentials¶
This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.
Important
You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.
After you create the trust relationship, an IAM user or an application from the trusted account can
use the AWS Security Token Service (AWS STS)
AssumeRole
API operation. This operation provides temporary security credentials that enable
access to AWS resources for the connector. For details, see
Creating a Role to Delegate Permissions to an IAM User.
- Example:
Profile in ~/.aws/credentials: [default] role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role source_profile=staging role_session_name = OPTIONAL_SESSION_NAME [staging] aws_access_key_id = <STAGING KEY> aws_secret_access_key = <STAGING SECRET>
To allow the connector to assume a role with the right permissions, set the
Amazon Resource Name (ARN)
for this role. Additionally, you must choose between source_profile
or credential_source
as the way to get credentials that have permission to assume the role, in the environment where the
connector is running.
Note
When setting up trusted account credentials, be aware that the approach of loading profiles from
both ~/.aws/credentials
and ~/.aws/config
does not work when configuring this connector.
Assumed role settings and credentials must be placed in the ~/.aws/credentials
file.
Using Other Implementations¶
You can use a different credentials provider. To do this, set the
aws.credentials.provider.class
property to the name of any class that
implements the com.amazon.auth.AWSCredentialsProvider interface.
Important
If you are using a different credentials provider, do not include the
aws.access.key.id
and aws.secret.key.id
in the connector
configuration file. If these parameters are included, they will override the
custom credentials provider class.
Complete the following steps to use a different credentials provider:
Find or create a Java credentials provider class that implements the com.amazon.auth. AWSCredentialsProvider interface.
Put the class file in a JAR file.
Place the JAR file in the
share/java/kafka-connect-aws-cloudwatch-logs
directory on all Connect workers.Restart the Connect workers.
Change the AWS CloudWatch Logs connector property file to use your custom credentials. Add the provider class entry
aws.credentials.provider.class=<className>
in the AWS CloudWatch Logs connector properties file.Important
You must use the fully qualified class name in the
<className>
entry.
Examples¶
Property based example¶
name=aws-cloudwatch-logs-source-connector
connector.class=io.confluent.connect.aws.cloudwatch.logs.AwsCloudWatchSourceConnector
tasks.max=1
aws.access.key.id=< Optional Configuration >
aws.secret.access.key=< Optional Configuration >
aws.cloudwatch.log.group=< Required Configuration >
aws.cloudwatch.log.streams=< Optional Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
REST based example¶
This configuration is used typically along with distributed
workers. Write the following JSON to
connector.json
, configure all of the required values, and use the command
below to post the configuration to one the distributed connect worker(s). Check
here for more information about the Kafka Connect REST
API.
{
"name" : "aws-cloudwatch-logs-source-connector",
"config" : {
"name" : "aws-cloudwatch-logs-source-connector",
"connector.class" : "io.confluent.connect.aws.cloudwatch.logs.AwsCloudWatchSourceConnector",
"tasks.max" : "1",
"aws.access.key.id" : "< Optional Configuration >",
"aws.secret.access.key" : "< Optional Configuration >",
"aws.cloudwatch.log.group" : "< Required Configuration >",
"aws.cloudwatch.log.streams : "< Optional Configuration - defaults to all log streams in
the log group >"
}
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change
http://localhost:8083/
the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/aws-cloudwatch-logs-source-connector/config