重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Amazon CloudWatch Logs Source Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「Amazon CloudWatch Logs Source Connector for Confluent Platform」を参照してください。
Amazon CloudWatch Logs からデータをインポートし、それを Apache Kafka® のトピックに書き込むには、 Kafka Connect CloudWatch Logs Source Connector for Confluent Cloud を使用します。このコネクターは、単一のロググループからデータを取得し、ログストリームごとに 1 つのトピックに書き込むことができます。各ログストリームのトピック名をカスタマイズするために、Kafka のトピックフォーマットの構成プロパティ(CLI プロパティ kafka.topic.format
)を使用できます。
このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
機能¶
Amazon CloudWatch Logs Source Connector には、以下の機能があります。
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
- トピックフォーマットのカスタマイズ: このコネクターは、単一のロググループからデータを取得し、ログストリームごとに 1 つのトピックに書き込むことができます。各ログストリームのトピック名をカスタマイズするために、Kafka のトピックフォーマットの構成プロパティ(CLI プロパティ
kafka.topic.format
)を使用できます。 - サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、および JSON(スキーマレス)出力フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマなど)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Amazon CloudWatch Logs Source Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Amazon CloudWatch Logs Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
アマゾンウェブサービス (AWS)上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
アクセスキー を構成した AWS アカウント。コネクターを設定するときに、これらのアクセスキーを使用します。IAM のアイデンティティには以下の権限(IAM ポリシー)が必要です。
logs:GetLogEvents
logs:DescribeLogStreams
詳細については、『CloudWatch Logs でのアイデンティティベースのポリシー(IAM ポリシー)の使用』を参照してください。
Amazon CloudWatch 接続の詳細情報。詳細については、『ロググループとログストリームの操作』を参照してください。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
At the Add CloudWatch Logs Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
Add the Amazon CloudWatch connection authentication details:
- AWS Access Key ID: The Amazon Access Key used to connect to Amazon CloudWwatch.
- AWS Secret Access Key: The Amazon Secret Key used to connect to Amazon CloudWwatch.
For information about how to set these up, see Access Keys.
Click Continue.
Add the following details:
- Select the output record value format (data going to the Kafka topic): AVRO, JSON, or JSON_SR (JSON Schema). Schema Registry must be enabled to use a スキーマレジストリ-based format (for example, Avro, or JSON Schema). For additional information, see スキーマレジストリ Enabled Environments.
- Amazon CloudWatch Logs Endpoint URL: The URL to use as the
endpoint for connecting to Amazon CloudWatch for Logs. For example,
https://logs.us-east-1.amazonaws.com
. - Amazon CloudWatch Logs Group Name: The name of the log group on Amazon CloudWatch under which the desired log streams are contained.
Show advanced configurations
CloudWatch Log Stream Name(s): ログレコードをトラッキングする Amazon CloudWatch 上のログストリームのリスト。このフィールドを空にすると、ロググループ内のすべてのログストリームがトラッキングされます。
AWS Poll Interval in Milliseconds: 更新のために行うエンドポイントのポーリング間にコネクターが待機する時間(ミリ秒)。デフォルト値は
1000
ミリ秒(1 秒)です。Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
For all property values and definitions, see 構成プロパティ.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of tasks, use the Range Slider to select the desired number of tasks.
- Click Continue.
Step 5: Check for records.¶
レコードが Kafka のトピックに生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CloudWatchLogsSource
出力例:
Following are the required configs:
connector.class: CloudWatchLogsSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
output.data.format
aws.access.key.id
aws.secret.access.key
aws.cloudwatch.logs.url
aws.cloudwatch.log.group
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の入力例は、典型的なコネクター構成を示しています。起動時に、コネクターは、ロググループ cloudwatch-group
のログストリーム stream-1
および stream-2
からのデータを消費します。それにより、データを Kafka のトピック logs.cloudwatch-group.stream-1
および logs.cloudwatch-group.stream-2
に生成します。
{
"name": "CloudWatchLogsSourceConnector_0",
"config": {
"connector.class": "CloudWatchLogsSource",
"name": "CloudWatchLogsSourceConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"kafka.topic.format": "logs.${log-group}.${log-stream}",
"output.data.format": "JSON",
"aws.access.key.id": "<INSERT AWS API KEY>",
"aws.secret.access.key": "<INSERT AWS API SECRET>",
"aws.cloudwatch.logs.url": "https://logs.us-east-1.amazonaws.com",
"aws.cloudwatch.log.group": "cloudwatch-group",
"aws.cloudwatch.log.streams": "stream-1, stream-2",
"aws.poll.interval.ms": "1500",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic.format"
: Kafka トピックの名前の生成に使用されるトピックフォーマット。このフォーマット文字列には、元のロググループ名およびログストリーム名を表すプレースホルダーとして${log-group}
および${log-stream}
を含めることができます。たとえば、ロググループlog-group-1
およびログストリームlog-stream-1
の場合、confluent.${log-group}.${log-stream}
は、トピック名confluent.log-group-1.log-stream-1
にマップされます。"output.data.format"
: 出力データフォーマット( Kafka トピックに送られるデータ)として、AVRO、JSON_SR(JSON スキーマ)、または JSON(スキーマレス)を入力します。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。"aws.access.key.id"
および"aws.secret.access.key"
: AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、「プログラムによるアクセス」を参照してください。"aws.cloudwatch.logs.url"
:https://logs.us-east-1.amazonaws.com
のように入力します。詳細については、『Amazon CloudWatch Logs エンドポイントとクォータ』を参照してください。"aws.cloudwatch.log.group"
: ログストリームが含まれている Amazon CloudWatch 上のロググループの名前。"aws.cloudwatch.log.streams"
: ログレコードをトラッキングする Amazon CloudWatch 上のログストリームのリスト。このプロパティを使用しない場合は、ロググループ内のすべてのログストリームがトラッキングされます。"aws.poll.interval.ms"
: 更新のために行うエンドポイントのポーリング間にコネクターが待機する時間(ミリ秒)。デフォルト値は1000
ミリ秒(1 秒)です。"tasks.max"
: このコネクターで使用する タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。このコネクターは、すべてのデータのインポートをサポートする 1 つのタスクから開始して、ログストリームごとに 1 つのタスクを設定するレベルまで拡張できます。ログストリームごとに 1 つのタスクにすると、Amazon がサポートする最大ログストリーム数(100,000 ログ/秒または 10 MB/秒)にまでパフォーマンスを引き上げることができます。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config cloudwatch-logs-source-config.json
出力例:
Created connector CloudWatchLogsSourceConnector_0 lcc-do6vzd
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+----------------------------- --+---------+--------+-------+
lcc-do6vzd | CloudWatchLogsSourceConnector_0 | RUNNING | source | |
ステップ 6: レコードを確認します。¶
レコードが Kafka のトピック logs.cloudwatch-group.stream-1
および logs.cloudwatch-group.stream-2
に生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
トピック名の定義方法(How do you want to define topic names?)¶
kafka.topic.format
データのパブリッシュ先である Apache Kafka® のトピックの名前の生成に使用されるトピック用フォーマット。このフォーマット文字列には、元のロググループ名およびログストリーム名を表すプレースホルダーとして ${log-group} および ${log-stream} を含めることができます。
- 型: string
- デフォルト: ${log-group}.${log-stream}
- 重要度: 高
出力メッセージ(Output messages)¶
output.data.format
Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
AWS 認証情報(AWS Credentials)¶
aws.access.key.id
Amazon CloudWatch への接続に使用される Amazon アクセスキー。
- 型: password
- 重要度: 高
aws.secret.access.key
Amazon CloudWatch への接続に使用される Amazon シークレットキー。
- 型: password
- 重要度: 高
Amazon CloudWatch Logs への接続方法(How should we connect to Amazon CloudWatch Logs?)¶
aws.cloudwatch.logs.url
ログを取得するために Amazon CloudWatch に接続するエンドポイントとして使用する URL。たとえば、https://logs.us-east-1.amazonaws.com とします。
- 型: string
- 重要度: 高
CloudWatch Logs の詳細¶
aws.cloudwatch.log.group
必要なログストリームを含める Amazon CloudWatch のロググループの名前。
- 型: string
- 重要度: 高
aws.cloudwatch.log.streams
必要なログレコードを送信する Amazon CloudWatch のログストリームのリスト。このフィールドを空にすると、ロググループ内のすべてのログストリームがトラッキングされます。
- 型: list
- 重要度: 高
aws.poll.interval.ms
Amazon CloudWatch エンドポイントに対する 2 回の連続ポーリングの間に待機する時間(ミリ秒)。
- 型: int
- デフォルト: 1000(1 秒)
- 指定可能な値: [0,...]
- 重要度: 高
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。