PagerDuty Sink Connector for Confluent Platform

The Kafka Connect PagerDuty Sink Connector is used to read records from kafka topic and create Pagerduty incidents.

Features

The PagerDuty Sink Connector offers the following features:

  • At-least-Once Semantics: The connector creates PagerDuty incident for each record in kafka topic. However, duplicates are still possible to occur when failure, rescheduling or re-configuration happens. This semantics is followed when behavior.on.error is set to fail mode. In case of log and ignore modes, the connector promises at-most semantics.
  • Automatic Retries: The PagerDuty sink connector may experience network failures while connecting to the PagerDuty endpoint. The connector will automatically retry with exponential backoff to create incidents. The property pagerduty.max.retry.time.seconds controls the maximum time until which the connector will retry creating the incidents.
  • Supports HTTPS Proxy: The connector can connect to Pagerduty using an HTTPS proxy server.

Prerequisites

The following are required to run the Kafka Connect Pagerduty Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • PagerDuty account with Team tier. Follow the link to create a PagerDuty account.

Install PagerDuty Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-pagerduty-sink:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-pagerduty-sink:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

The quick start guide uses PagerDuty Sink Connector to consume records from a Kafka topic and create incidents in PagerDuty.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file pagerduty-sink.properties with the following content. This file should be placed inside the Confluent Platform installation directory. This configuration is used typically along with standalone workers.

name=pagerduty-sink-connector

topics=incidents
connector.class=io.confluent.connect.pagerduty.PagerDutySinkConnector
tasks.max=1

pagerduty.api.key=****
behavior.on.error=fail

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Run the connector with this configuration.

confluent local load pagerduty-sink-connector -- -d pagerduty-sink.properties

The output should resemble:

 {
    "name":"pagerduty-sink-connector",
    "config":{
        "topics":"incidents",
        "tasks.max":"1",
        "connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector",

        "pagerduty.api.key":"****",
        "behavior.on.error":"fail",

        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://localhost:8081",

        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1",
        "name":"pagerduty-sink-connector"
     },
    "tasks":[
      {
        "connector":"pagerduty-sink-connector",
        "task":0
      }
     ],
     "type":"sink"
}

Confirm that the connector is in a RUNNING state.

confluent local status pagerduty-sink-connector

The output should resemble:

{
   "name":"pagerduty-sink-connector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

REST-based example

Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect REST API

{
  "name" : "pagerduty-sink-connector",
  "config" : {
    "topics":"incidents",
    "connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector",
    "tasks.max" : "1",

    "pagerduty.api.key":"****",
    "behavior.on.error":"fail",

    "key.converter":"io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://localhost:8081",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",

    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.replication.factor":"1",
    "confluent.license":" Omit to enable trial mode "
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

Use the following command to update the configuration of existing connector.

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/pagerduty-sink-connector/config

Confirm that the connector is in a RUNNING state by running the following command:

curl http://localhost:8083/connectors/pagerduty-sink-connector/status | jq

The output should resemble:

{
   "name":"pagerduty-sink-connector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

To produce Avro data to Kafka topic: incidents, use the following command.

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic incidents --property value.schema='{"type":"record","name":"details","fields":[{"name":"fromEmail","type":"string, {"name":"serviceId","type":"string"},{"name":"incidentTitle","type":"string"}]}'

While the console is waiting for the input, use the following three records and paste each of them on the console.

{"fromEmail":"user1@abc.com", "serviceId":"pagerduty-service-id", "incidentTitle":"Incident Title x 0"}
{"fromEmail":"user2@abc.com", "serviceId":"pagerduty-service-id", "incidentTitle":"Incident Title x 1"}
{"fromEmail":"user3@abc.com", "serviceId":"pagerduty-service-id", "incidentTitle":"Incident Title x 2"}

Note

The fromEmail in the records should be of registered PagerDuty user to allow creation of incidents

Finally, check the PagerDuty incident dashboard to see the newly created incidents.

Record Schema

The PagerDuty Sink Connector expects the value of records in Kafka topic to be either of type JSON String or Avro. In either case, the value should adhere to the following conditions:

  1. Must include fromEmail, serviceId and incidentTitle fields in the value of the Kafka record.
  2. The body, urgency, escalationPolicy and priority fields are optional.

Following is the value schema for Pagerduty incident:

{
   "name":"PagerdutyValueSchema",
   "type":"record",
   "fields":[
      {
         "name":"fromEmail",
         "type":"string"
      },
      {
         "name":"serviceId",
         "type":"string"
      },
      {
         "name":"incidentTitle",
         "type":"string"
      },
      {
         "name":"priorityId",
         "type":"string",
         "isOptional":true
      },
      {
         "name":"urgency",
         "type":"string",
         "isOptional":true
      },
      {
         "name":"bodyDetails",
         "type":"string",
         "isOptional":true
      },
      {
         "name":"escalationPolicyId",
         "type":"string",
         "isOptional":true
      }
   ]
}

Additional Documentation