PagerDuty Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see PagerDuty Sink Connector for Confluent Platform.
Kafka Connect PagerDuty Sink Connector は、Apache Kafka® トピックからレコードを読み取り、PagerDuty インシデント を作成します。
機能¶
PagerDuty Sink Connector は、次の機能をサポートしています。
- 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
- 自動再試行: PagerDuty Sink Connector が PagerDuty エンドポイントへの接続に失敗すると、指数関数的バックオフを使用して接続を自動的に再試行します。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
詳しくは、Cloud コネクターの制限事項 を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud PagerDuty Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、PagerDuty ディレクトリにイベントをストリーミングするようにコネクターを構成する基本的な方法について説明します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- PagerDuty API キー。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
- シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク( * )は必須項目であることを示しています。
At the Add PagerDuty Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- In the PagerDuty API Key field, enter PagerDuty API key with write permissions to create incidents. For more information, see the PagerDuty docs.
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and definitions.
Select an Input Kafka record value format (data coming from the Kafka topic): AVRO, JSON_SR, or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format.
Show advanced configurations
Maximum Retry Time (ms): PagerDuty Sink Connector が PagerDuty エンドポイントへの接続に失敗すると、指数関数的バックオフを使用して接続を自動的に再試行します。このプロパティは、コネクターがリクエストを再試行する時間の長さ(ミリ秒)を制御します。デフォルト値は
10000
ミリ秒(10 秒)です。この値は、必ず1000
ミリ秒(1 秒)以上に設定してください。Behavior on Error: 想定されるフィールドが Kafka レコードに含まれていない場合のコネクターの動作。指定可能なオプションは、
log
、fail
(デフォルト)、ignore
です。log
を選択すると、コネクターはエラーをログに記録し、正しくない形式のレコードをスキップします。fail
を選択すると、コネクタータスクが失敗します。ignore
を選択すると、エラーがスキップされます。Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and definitions.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: PagerDuty インシデントを確認します。¶
インシデントが PagerDuty ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe PagerDutySink
出力例:
The following are required configs:
connector.class : PagerDutySink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
pagerduty.api.key
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "PagerDutySink",
"topics": "incidents",
"input.data.format": "JSON",
"name": "PagerDutySinkConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"pagerduty.api.key": "a1b2CDe3...",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"input.data.format"
:(Kafka トピックから送られるデータ)AVRO、PROTOBUF、または JSON_SR を設定します。有効なスキーマが Schema Registry に存在する必要があります。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"pagerduty.api.key"
: インシデントを作成するための書き込みアクセス許可を持つ PagerDuty API キー。詳細については、PagerDuty のドキュメント を参照してください。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config pagerduty-sink-config.json
出力例:
Created connector PagerDutySinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | PagerDutySinkConnector_0 | RUNNING | sink | |
ステップ 5: PagerDuty インシデントを確認します。¶
インシデントが PagerDuty ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- Type: list
- Importance: high
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR and PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured
- Type: string
- Importance: high
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
PagerDuty details¶
pagerduty.api.key
PagerDuty API key with write permissions to create incidents.
- Type: password
- Importance: high
pagerduty.max.retry.time.ms
In case of error, while executing a post request, the connector will retry until this time (in ms) elapses. The default value is 10000 (10 seconds). It's recommended to set this value to be at least 1 second.
- Type: int
- Default: 10000 (10 seconds)
- Valid Values: [1000,...]
- Importance: low
behavior.on.error
The connector's behavior if the kafka record does not contain an expected field. Valid options are 'log', 'fail' ans 'ignore'. 'log' will log and skip the malformed records, 'fail' will fail the connector and 'ignore' will ignore the error.
- Type: string
- Default: fail
- Importance: low
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,...]
- Importance: high
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。