重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Amazon CloudWatch Metrics Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Amazon CloudWatch Metrics Sink Connector for Confluent Platform」を参照してください。

Apache Kafka® のトピックから Amazon CloudWatch Metrics にデータをエクスポートするには、Amazon CloudWatch Metrics Sink Connector を使用します。コネクターは、Kafka レコードとして構造体オブジェクトのみを受け取ります。レコードは、nametypetimestampdimensionsvalues の各フィールドで構成する必要があります。values フィールドはメトリックの値を表します。これも、構造体オブジェクトである必要があります。values の詳細については、「定義済みスキーマ」を参照してください。

以下に、入力構造体オブジェクトレコードの例を示します。

{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}

このコネクターは、1 つのタスクから開始して、さらにタスクを追加して水平に拡張できます。ただし、タスクが複数の場合でも、パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。

機能

Amazon CloudWatch Metrics Sink Connector には、以下の機能があります。

  • 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。ただし、パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。
  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、および Protobuf 入力フォーマットをサポートします。これらの スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

定義済みスキーマ

コネクターは、type フィールドに応じて 4 つの定義済みスキーマ(Gauge、Meter、Histogram、Timer)のいずれかに values 構造体を当てはめようとします。サポートされているタイプは、gaugemeterhistogramtimercustom です。

注釈

  • type の値が custom の場合、いずれのタイプのスキーマにも当てはまる catch all メカニズムがあります。ただし、type フィールドの値は必ず custom とします。
  • values 構造体の各値は、必ずタイプを double とします。

Gauge スキーマ

{
  "doubleValue": double
}

Meter スキーマ

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}

Histogram スキーマ

{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double,
}

Timer スキーマ

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}

カスタムスキーマのサンプル

{
  "posts": double,
  "puts": double,
  "patches": double,
  "deletes": double,
}

レコードマッピング

values 構造体の各値は、同じ timestamp および dimensions フィールドと、プレフィックスとして name フィールドを使用して、それぞれの MetricDatum オブジェクトにマップされます。たとえば、以下では values 構造体に値が 5 つあるので、5 つの MetricDatum オブジェクトに個々にマップされます。

{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1"
  }
}

以下は、oneMinuteRate フィールドが個別の MetricDatum オブジェクトにマッピングされている例です。

{
  "name": "sample_meter_metric_oneMinuteRate",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "value": 5.2
}

クイックスタート

このクイックスタートを使用して、Confluent Cloud Amazon CloudWatch Metrics Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Amazon CloudWatch にレコードを送信するようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • アクセスキー を構成した AWS アカウント。
  • Amazon CloudWatch Metrics リージョンは、Confluent Cloud クラスターが存在するリージョン(コネクターの実行場所)と同じでなければなりません。ハードコーディングされたコネクターのエンドポイント URL は、https://monitoring.{kafka-cluster-region}.amazonaws.com に設定されます。これは、Amazon CloudWatch リージョンを Kafka クラスターリージョンに設定するものです。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Amazon CloudWatch Metrics Sink connector card.

Amazon CloudWatch Metrics Sink Connector Card

ステップ 4: コネクターの詳細を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

Add Amazon CloudWatch Metrics Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: Amazon CloudWatch メトリクスを確認します。

Amazon CloudWatch のメトリクスを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI を使用する場合

Confluent Cloud CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe CloudWatchMetricsSink

出力例:

Following are the required configs:
connector.class: CloudWatchMetricsSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
aws.access.key.id
aws.secret.access.key
aws.cloudwatch.metrics.namespace
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "CloudWatchMetricsSink",
  "name": "CloudWatchMetricsSink_0",
  "input.data.format": "AVRO"
  "topics": "<my_topic_0>"
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "aws.access.key.id": "****************",
  "aws.secret.access.key": "********************************************",
  "aws.cloudwatch.metrics.namespace": "<namespace>",
  "tasks.max": "1"
}

以下の必須プロパティの定義にご注意ください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SR (JSON スキーマ)、および PROTOBUF です。スキーマベースのメッセージフォーマットを使用する場合は、Confluent Cloud Schema Registry を構成しておく必要があります。

  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • "aws.access.key.id" および "aws.secret.access.key": AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、「プログラムによるアクセス」を参照してください。

  • "aws.cloudwatch.metrics.namespace": 使用する CloudWatch Metrics リージョンで有効なメトリクスの名前空間を入力します。

  • "tasks.max": このコネクターで使用できる タスク の数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。

    注釈

    パフォーマンスは Amazon による制限(150 トランザクション/秒)を受けます。アカウントのトランザクションの制限を増やす場合は、Amazon にお問い合わせください。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config amazon-cloudwatch-metrics-sink-config.json

出力例:

Created connector CloudWatchMetricsSink_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name              | Status  | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl   | CloudWatchMetricsSink_0 | RUNNING | sink

ステップ 6: Amazon CloudWatch メトリクスを確認します。

Amazon CloudWatch のメトリクスを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

AWS 認証情報(AWS Credentials)

aws.access.key.id

Amazon CloudWatch への接続に使用される AWS アクセスキー。

  • 型: password
  • 重要度: 高
aws.secret.access.key

Amazon CloudWatch への接続に使用される AWS シークレットキー。

  • 型: password
  • 重要度: 高

Amazon CloudWatch Metrics への接続方法(How should we connect to Amazon CloudWatch Metrics?)

aws.cloudwatch.metrics.namespace

必要なメトリクスに関連付けられた Amazon CloudWatch Metrics の名前空間。

  • 型: string
  • 重要度: 高

エラーの処理方法(How should we handle errors?)

behavior.on.malformed.metric

想定されるフィールドが kafka レコードに含まれていない場合のコネクターの動作。使用可能なオプションは、「LOG」および「FAIL」です。「LOG」の場合、正しくない形式のレコードがログに記録されスキップされます。「FAIL」の場合、コネクターがエラーになります。

  • 型: string
  • デフォルト: FAIL
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png