Amazon CloudWatch Logs Source Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see Amazon CloudWatch Logs Source Connector for Confluent Platform.
Amazon CloudWatch Logs からデータをインポートし、それを Apache Kafka® のトピックに書き込むには、 Kafka Connect CloudWatch Logs Source Connector for Confluent Cloud を使用します。このコネクターは、単一のロググループからデータを取得し、ログストリームごとに 1 つのトピックに書き込むことができます。各ログストリームのトピック名をカスタマイズするために、Kafka のトピックフォーマットの構成プロパティ(CLI プロパティ kafka.topic.format
)を使用できます。
このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
機能¶
Amazon CloudWatch Logs Source Connector には、以下の機能があります。
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
- トピックフォーマットのカスタマイズ: このコネクターは、単一のロググループからデータを取得し、ログストリームごとに 1 つのトピックに書き込むことができます。各ログストリームのトピック名をカスタマイズするために、Kafka のトピックフォーマットの構成プロパティ(CLI プロパティ
kafka.topic.format
)を使用できます。 - サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、および JSON(スキーマレス)出力フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマなど)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
See Cloud connector limitations for additional information.
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Amazon CloudWatch Logs Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
アマゾンウェブサービス (AWS)上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
ネットワークに関する考慮事項については、「ネットワークアクセス」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
アクセスキー を構成した AWS アカウント。コネクターを設定するときに、これらのアクセスキーを使用します。IAM のアイデンティティには以下の権限(IAM ポリシー)が必要です。
logs:GetLogEvents
logs:DescribeLogStreams
詳細については、『CloudWatch Logs でのアイデンティティベースのポリシー(IAM ポリシー)の使用』を参照してください。
Amazon CloudWatch 接続の詳細情報。詳細については、『ロググループとログストリームの操作』を参照してください。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: 接続をセットアップします。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
- Name にコネクター名を入力します。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。
- Kafka Topic Format を入力します。Kafka トピックの名前の生成に使用されるトピックフォーマットです。このフォーマット文字列には、元のロググループ名およびログストリーム名を表すプレースホルダーとして
${log-group}
および${log-stream}
を含めることができます。たとえば、ロググループlog-group-1
およびログストリームlog-stream-1
の場合、confluent.${log-group}.${log-stream}
は、トピック名confluent.log-group-1.log-stream-1
にマップされます。 - Output Kafka record value で Kafka 出力レコード値のフォーマット(Kafka トピックに送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、または JSON(スキーマレス)から選択します。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- AWS 認証情報を入力します。これらのセットアップ方法については、「プログラムによるアクセス」を参照してください。
- Amazon CloudWatch connection の詳細情報を入力します。
- CloudWatch Logs Endpoint URL:
https://logs.us-east-1.amazonaws.com
のように入力します。詳細については、『Amazon CloudWatch Logs エンドポイントとクォータ』を参照してください。 - CloudWatch Logs Group Name: ログストリームが含まれている Amazon CloudWatch 上のロググループの名前。
- CloudWatch Log Stream Name(s): ログレコードをトラッキングする Amazon CloudWatch 上のログストリームのリスト。このフィールドを空にすると、ロググループ内のすべてのログストリームがトラッキングされます。
- AWS Poll Interval in Milliseconds: 更新のために行うエンドポイントのポーリング間にコネクターが待機する時間(ミリ秒)。デフォルト値は
1000
ミリ秒(1 秒)です。
- CloudWatch Logs Endpoint URL:
- このコネクターで使用する タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
- Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 7: レコードを確認します。¶
レコードが Kafka のトピックに生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CloudWatchLogsSource
出力例:
Following are the required configs:
connector.class: CloudWatchLogsSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
output.data.format
aws.access.key.id
aws.secret.access.key
aws.cloudwatch.logs.url
aws.cloudwatch.log.group
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の入力例は、典型的なコネクター構成を示しています。起動時に、コネクターは、ロググループ cloudwatch-group
のログストリーム stream-1
および stream-2
からのデータを消費します。それにより、データを Kafka のトピック logs.cloudwatch-group.stream-1
および logs.cloudwatch-group.stream-2
に生成します。
{
"name": "CloudWatchLogsSourceConnector_0",
"config": {
"connector.class": "CloudWatchLogsSource",
"name": "CloudWatchLogsSourceConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"kafka.topic.format": "logs.${log-group}.${log-stream}",
"output.data.format": "JSON",
"aws.access.key.id": "<INSERT AWS API KEY>",
"aws.secret.access.key": "<INSERT AWS API SECRET>",
"aws.cloudwatch.logs.url": "https://logs.us-east-1.amazonaws.com",
"aws.cloudwatch.log.group": "cloudwatch-group",
"aws.cloudwatch.log.streams": "stream-1, stream-2",
"aws.poll.interval.ms": "1500",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic.format"
: Kafka トピックの名前の生成に使用されるトピックフォーマット。このフォーマット文字列には、元のロググループ名およびログストリーム名を表すプレースホルダーとして${log-group}
および${log-stream}
を含めることができます。たとえば、ロググループlog-group-1
およびログストリームlog-stream-1
の場合、confluent.${log-group}.${log-stream}
は、トピック名confluent.log-group-1.log-stream-1
にマップされます。"output.data.format"
: 出力データフォーマット( Kafka トピックに送られるデータ)として、AVRO、JSON_SR(JSON スキーマ)、または JSON(スキーマレス)を入力します。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。"aws.access.key.id"
および"aws.secret.access.key"
: AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、「プログラムによるアクセス」を参照してください。"aws.cloudwatch.logs.url"
:https://logs.us-east-1.amazonaws.com
のように入力します。詳細については、『Amazon CloudWatch Logs エンドポイントとクォータ』を参照してください。"aws.cloudwatch.log.group"
: ログストリームが含まれている Amazon CloudWatch 上のロググループの名前。"aws.cloudwatch.log.streams"
: ログレコードをトラッキングする Amazon CloudWatch 上のログストリームのリスト。このプロパティを使用しない場合は、ロググループ内のすべてのログストリームがトラッキングされます。"aws.poll.interval.ms"
: 更新のために行うエンドポイントのポーリング間にコネクターが待機する時間(ミリ秒)。デフォルト値は1000
ミリ秒(1 秒)です。"tasks.max"
: このコネクターで使用する タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config cloudwatch-logs-source-config.json
出力例:
Created connector CloudWatchLogsSourceConnector_0 lcc-do6vzd
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+----------------------------- --+---------+--------+-------+
lcc-do6vzd | CloudWatchLogsSourceConnector_0 | RUNNING | source | |
ステップ 6: レコードを確認します。¶
レコードが Kafka のトピック logs.cloudwatch-group.stream-1
および logs.cloudwatch-group.stream-2
に生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
How do you want to define topic names?¶
kafka.topic.format
Topic format to use for generating the names of the Apache Kafka® topics to publish data to. This format string can contain ${log-group} and ${log-stream} as a placeholder for the original log group and log stream names.
- Type: string
- Default: ${log-group}.${log-stream}
- Importance: high
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO and JSON_SR.
- Type: string
- Importance: high
AWS Credentials¶
aws.access.key.id
The Amazon Access Key used to connect to Amazon CloudWatch.
- Type: password
- Importance: high
aws.secret.access.key
The Amazon Secret Key used to connect to Amazon CloudWatch.
- Type: password
- Importance: high
How should we connect to Amazon CloudWatch Logs?¶
aws.cloudwatch.logs.url
The URL to use as the endpoint for connecting to Amazon CloudWatch for Logs. For example, https://logs.us-east-1.amazonaws.com.
- Type: string
- Importance: high
CloudWatch Logs details¶
aws.cloudwatch.log.group
必要なログストリームを含める Amazon CloudWatch のロググループの名前。
- Type: string
- Importance: high
aws.cloudwatch.log.streams
List of the log stream(s) on Amazon CloudWatch under which the desired log records are sent through. If the field is left empty, all log streams under the log group will be tracked.
- Type: list
- Importance: high
aws.poll.interval.ms
Time in milliseconds to wait between two consecutive polls to the Amazon CloudWatch endpoint.
- 型: int
- Default: 1000 (1 second)
- Valid Values: [0,...]
- Importance: high
Number of tasks for this connector¶
tasks.max
- 型: int
- Valid Values: [1,...]
- Importance: high
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。