重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Amazon CloudWatch Metrics Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「Amazon CloudWatch Metrics Sink Connector for Confluent Platform」を参照してください。
Apache Kafka® のトピックから Amazon CloudWatch Metrics にデータをエクスポートするには、Amazon CloudWatch Metrics Sink Connector を使用します。コネクターは、Kafka レコードとして構造体オブジェクトのみを受け取ります。レコードは、name
、type
、timestamp
、dimensions
、values
の各フィールドで構成する必要があります。values
フィールドはメトリックの値を表します。これも、構造体オブジェクトである必要があります。values
の詳細については、「定義済みスキーマ」を参照してください。
以下に、入力構造体オブジェクトレコードの例を示します。
{
"name": string,
"type": string,
"timestamp": long,
"dimensions": {
"<dimension-1>": string,
...
},
"values": {
"<datapoint-1>": double,
"<datapoint-2>": double,
...
}
}
このコネクターは、1 つのタスクから開始して、さらにタスクを追加して水平に拡張できます。ただし、タスクが複数の場合でも、パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。
機能¶
Amazon CloudWatch Metrics Sink Connector には、以下の機能があります。
- 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。ただし、パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。
- サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、および Protobuf 入力フォーマットをサポートします。これらの スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Amazon CloudWatch Metrics Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
定義済みスキーマ¶
コネクターは、type
フィールドに応じて 4 つの定義済みスキーマ(Gauge、Meter、Histogram、Timer)のいずれかに values
構造体を当てはめようとします。サポートされているタイプは、gauge
、meter
、histogram
、timer
、custom
です。
注釈
type
の値がcustom
の場合、いずれのタイプのスキーマにも当てはまる catch all メカニズムがあります。ただし、type
フィールドの値は必ずcustom
とします。values
構造体の各値は、必ずタイプをdouble
とします。
Gauge スキーマ¶
{
"doubleValue": double
}
Meter スキーマ¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double
}
Histogram スキーマ¶
{
"count": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double,
}
Timer スキーマ¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double
}
カスタムスキーマのサンプル¶
{
"posts": double,
"puts": double,
"patches": double,
"deletes": double,
}
レコードマッピング¶
values
構造体の各値は、同じ timestamp
および dimensions
フィールドと、プレフィックスとして name
フィールドを使用して、それぞれの MetricDatum
オブジェクトにマップされます。たとえば、以下では values
構造体に値が 5 つあるので、5 つの MetricDatum
オブジェクトに個々にマップされます。
{
"name": "sample_meter_metric",
"type": "meter",
"timestamp": 23480239402348234,
"dimensions": {
"service": "ec2-2312",
"method": "update"
},
"values": {
"count": 12,
"oneMinuteRate": 5.2,
"fiveMinuteRate": 4.7,
"fifteenMinuteRate": 4.9,
"meanRate": 5.1"
}
}
以下は、oneMinuteRate
フィールドが個別の MetricDatum
オブジェクトにマッピングされている例です。
{
"name": "sample_meter_metric_oneMinuteRate",
"timestamp": 23480239402348234,
"dimensions": {
"service": "ec2-2312",
"method": "update"
},
"value": 5.2
}
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Amazon CloudWatch Metrics Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Amazon CloudWatch にレコードを送信するようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- アクセスキー を構成した AWS アカウント。
- Amazon CloudWatch Metrics リージョンは、Confluent Cloud クラスターが存在するリージョン(コネクターの実行場所)と同じでなければなりません。ハードコーディングされたコネクターのエンドポイント URL は、
https://monitoring.{kafka-cluster-region}.amazonaws.com
に設定されます。これは、Amazon CloudWatch リージョンを Kafka クラスターリージョンに設定するものです。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
Add Amazon CloudWatch Metrics Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- AWS access key ID フィールドに、Amazon CloudWatch Metrics への接続に使用される Amazon アクセスキーを入力します。
- AWS secret access key フィールドに、Amazon CloudWatch Metrics への接続に使用される Amazon シークレットキーを入力します。
- Amazon CloudWatch Metrics namespace フィールドに、使用する CloudWatch Metrics リージョンで有効なメトリクスの名前空間を入力します。詳細については、「CloudWatch メトリクスを発行する AWS のサービス」を参照してください。
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR、または PROTOBUF から選択します。スキーマベースのメッセージフォーマットを使用するには、有効なスキーマが Schema Registry に存在する必要があります。
Show advanced configurations
Behavior on malformed metric: 想定されるフィールドが Kafka レコードに含まれていない場合のコネクターの動作。使用可能なオプションは、
LOG
およびFAIL
です。LOG
の場合、正しくない形式のレコードがログに記録されスキップされます。FAIL
の場合、コネクターがエラーになります。Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
ステップ 5: Amazon CloudWatch メトリクスを確認します。¶
Amazon CloudWatch のメトリクスを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI を使用する場合¶
Confluent Cloud CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CloudWatchMetricsSink
出力例:
Following are the required configs:
connector.class: CloudWatchMetricsSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
aws.access.key.id
aws.secret.access.key
aws.cloudwatch.metrics.namespace
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "CloudWatchMetricsSink",
"name": "CloudWatchMetricsSink_0",
"input.data.format": "AVRO"
"topics": "<my_topic_0>"
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"aws.access.key.id": "****************",
"aws.secret.access.key": "********************************************",
"aws.cloudwatch.metrics.namespace": "<namespace>",
"tasks.max": "1"
}
以下の必須プロパティの定義にご注意ください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR (JSON スキーマ)、および PROTOBUF です。スキーマベースのメッセージフォーマットを使用する場合は、Confluent Cloud Schema Registry を構成しておく必要があります。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"aws.access.key.id"
および"aws.secret.access.key"
: AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、「プログラムによるアクセス」を参照してください。"aws.cloudwatch.metrics.namespace"
: 使用する CloudWatch Metrics リージョンで有効なメトリクスの名前空間を入力します。"tasks.max"
: このコネクターで使用できる タスク の数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。注釈
パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config amazon-cloudwatch-metrics-sink-config.json
出力例:
Created connector CloudWatchMetricsSink_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl | CloudWatchMetricsSink_0 | RUNNING | sink
ステップ 6: Amazon CloudWatch メトリクスを確認します。¶
Amazon CloudWatch のメトリクスを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
AWS 認証情報(AWS Credentials)¶
aws.access.key.id
Amazon CloudWatch への接続に使用される AWS アクセスキー。
- 型: password
- 重要度: 高
aws.secret.access.key
Amazon CloudWatch への接続に使用される AWS シークレットキー。
- 型: password
- 重要度: 高
Amazon CloudWatch Metrics への接続方法(How should we connect to Amazon CloudWatch Metrics?)¶
aws.cloudwatch.metrics.namespace
必要なメトリクスに関連付けられた Amazon CloudWatch Metrics の名前空間。
- 型: string
- 重要度: 高
エラーの処理方法(How should we handle errors?)¶
behavior.on.malformed.metric
想定されるフィールドが kafka レコードに含まれていない場合のコネクターの動作。使用可能なオプションは、「LOG」および「FAIL」です。「LOG」の場合、正しくない形式のレコードがログに記録されスキップされます。「FAIL」の場合、コネクターがエラーになります。
- 型: string
- デフォルト: FAIL
- 重要度: 低
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。