PagerDuty Sink Connector for Confluent Platform¶
Kafka Connect PagerDuty Sink Connector を使用して、Apache Kafka® のトピックからレコードを読み込み、PagerDuty インシデント を作成します。
機能¶
PagerDuty Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
コネクターにより、Kafka のトピックの各レコードに PagerDuty インシデントが作成されます。ただし、失敗、再スケジュール、再構成が発生すると、やはり、重複が起こる可能性があります。このセマンティクスが実行されるのは、behavior.on.error
が fail
モードに設定されている場合です。log
モードおよび ignore
モードの場合は、コネクターによって "最大 1 回" のセマンティクスが採用されます。
複数のタスク¶
Google Firebase Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
自動再試行¶
PagerDuty Sink Connector で、PagerDuty エンドポイントとの接続時にネットワーク障害が発生する場合があります。コネクターでは、インシデントの作成を目的として指数関数的なバックオフを使用した再試行が自動的に行われます。コネクターでインシデントの作成を再試行されるまでの最大時間は、プロパティ pagerduty.max.retry.time.seconds
で管理します。
HTTPS プロキシ¶
コネクターは、HTTPS プロキシサーバーを使用して PagerDuty に接続できます。
前提条件¶
Kafka Connect PagerDuty Sink Connector を実行するには、以下が必要です。
- Kafka ブローカー: Confluent Platform 3.3.0 以上、または Kafka 0.11.0 以上
- Connect: Confluent Platform 4.0.0 以上、または Kafka 1.0.0 以上
- Java 1.8
- チーム階層 の PagerDuty アカウント
- PagerDuty のサービス ID
PagerDuty Sink Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
注釈
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-pagerduty:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-pagerduty:1.0.0-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「 Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「 ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「PagerDuty Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
クイックスタートガイドでは、PagerDuty Sink Connector を使用して Kafka トピックからレコードを消費し、PagerDuty でインシデントを作成します。
サービス ID の取得¶
PagerDuty でサービスを作成済みの場合は、サービス ID を見つけるには Configuration の Services に移動します。サービスを選択します。サービス ID は URL バーに表示されます。例 : https://connector-test.pagerduty.com/service-directory/<serviceId>
。
PagerDuty にサービスを追加できます。サービスを追加する場合は、Integration type として Use our API directly を選択します。サービスを追加後、サービス ID をコピーできる URL にリダイレクトされます。
PagerDuty 統合設定
Confluent の開始¶
以下の Confluent CLI コマンドを使用して Confluent サービスを開始します。
confluent local services start
重要
Confluent CLI は、本稼働環境では使用しないでください。
プロパティベースの例¶
構成ファイル pagerduty-sink.properties
を以下の内容で作成します。このファイルは、Confluent Platform のインストールディレクトリ内に配置する必要があります。この構成は、通常、スタンドアロンワーカー で使用されます。
name=pagerduty-sink-connector
topics=incidents
connector.class=io.confluent.connect.pagerduty.PagerDutySinkConnector
tasks.max=1
pagerduty.api.key=****
behavior.on.error=fail
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
この構成でコネクターを実行します。
confluent local services connect connector load pagerduty-sink-connector --config pagerduty-sink.properties
出力は以下のようになります。
{
"name":"pagerduty-sink-connector",
"config":{
"topics":"incidents",
"tasks.max":"1",
"connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector",
"pagerduty.api.key":"****",
"behavior.on.error":"fail",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"reporter.bootstrap.servers": "localhost:9092",
"reporter.result.topic.replication.factor":"1",
"reporter.error.topic.replication.factor":"1"
"name":"pagerduty-sink-connector"
},
"tasks":[
{
"connector":"pagerduty-sink-connector",
"task":0
}
],
"type":"sink"
}
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
コネクターが RUNNING
ステートであることを確認します。
confluent local services connect connector status pagerduty-sink-connector
出力は以下のようになります。
{
"name":"pagerduty-sink-connector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"sink"
}
REST ベースの例¶
この設定は通常、 分散ワーカー で使用されます。以下の JSON を config.json
に書き込み、必要な値をすべて構成し、以下のコマンドを使用して、分散された Connect ワーカーのいずれかに構成をポストします。Kafka Connect Kafka Connect REST インターフェイス の詳細については、こちらを参照してください。
{
"name" : "pagerduty-sink-connector",
"config" : {
"topics":"incidents",
"connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector",
"tasks.max" : "1",
"pagerduty.api.key":"****",
"behavior.on.error":"fail",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"confluent.license":" Omit to enable trial mode ",
"reporter.bootstrap.servers": "localhost:9092",
"reporter.result.topic.replication.factor":"1",
"reporter.error.topic.replication.factor":"1"
}
}
注釈
confluent.topic.bootstrap.servers
プロパティを変更してブローカーのアドレスを含めます。また、ステージングまたは本稼働環境では、confluent.topic.replication.factor
を 3
に変更します。
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/
は、Kafka Connect ワーカーのいずれかのエンドポイントに変更してください。
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
以下のコマンドを使用して、既存のコネクターの構成をアップデートします。
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/pagerduty-sink-connector/config
以下のコマンドを実行して、コネクターが RUNNING
ステートであることを確認します。
curl http://localhost:8083/connectors/pagerduty-sink-connector/status | jq
出力は以下のようになります。
{
"name":"pagerduty-sink-connector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"sink"
}
Kafka トピック incidents
に Avro データを生成するには、以下のコマンドを使用します。
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic incidents --property value.schema='{"type":"record","name":"details","fields":[{"name":"fromEmail","type":"string"}, {"name":"serviceId","type":"string"},{"name":"incidentTitle","type":"string"}]}'
コンソールが入力待ちの間に、以下の 3 つのレコードを使用し、それぞれをコンソールに貼り付けます。
{"fromEmail":"user1@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 0"}
{"fromEmail":"user2@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 1"}
{"fromEmail":"user3@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 2"}
注釈
レコードの fromEmail
には、インシデントの作成が許可されている登録済みの PagerDuty ユーザーのものを指定してください。
最後に、PagerDuty インシデントダッシュボードを表示して、新たに作成されたインシデントを確認します。
レコードスキーマ¶
PagerDuty Sink Connector では、Kafka トピックに含まれるレコードの値が、JSON String 型と Avro 型のどちらかであることが想定されています。いずれの場合でも、値が以下の条件を満たしていることが必要です。
- Kafka レコード値に
fromEmail
、serviceId
、incidentTitle
の各フィールドが含まれている必要があります。 body
、urgency
、escalationPolicy
、priority
の各フィールドは省略可能です。
Pagerduty インシデントの値スキーマを以下に示します。
{
"name":"PagerdutyValueSchema",
"type":"record",
"fields":[
{
"name":"fromEmail",
"type":"string"
},
{
"name":"serviceId",
"type":"string"
},
{
"name":"incidentTitle",
"type":"string"
},
{
"name":"priorityId",
"type":"string",
"isOptional":true
},
{
"name":"urgency",
"type":"string",
"isOptional":true
},
{
"name":"bodyDetails",
"type":"string",
"isOptional":true
},
{
"name":"escalationPolicyId",
"type":"string",
"isOptional":true
}
]
}