ServiceNow Sink Connector for Confluent Platform¶
Kafka Connect ServiceNow Sink Connector は、Apache Kafka® のレコードをキャプチャして ServiceNow テーブル にリアルタイムでシンクするために使用されます。データは、ServiceNow テーブル API を使用して消費されます。
機能¶
ServiceNow Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
ServiceNow Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
複数の HTTP リクエストメソッド¶
コネクターでは、POST、DELETE、PUT HTTP の各リクエストメソッドがサポートされています。PUT としての PATCH はサポートされておらず、PATCH の動作は ServiceNow テーブル API とまったく同じですので、注意が必要です。具体的なレコードそれぞれに対して使用されるリクエストメソッドは動的に選択されます。
- POST が選択されるのは、レコードキーが tombstone 値(null)の場合、または構造体のキーに
sysId
フィールドが存在する場合 - DELETE が選択されるのは、レコード値が tombstone 値(null)の場合
- PUT が選択されるのは、レコードに有効なキーと値が揃っている場合
HTTPS プロキシのサポート¶
HTTPS プロキシのサポート: HTTPS プロキシサーバーを使用して ServiceNow に接続できます。
結果レポートの生成¶
結果レポートの生成がサポートされています。ServiceNow Sink Connector では、正常な HTTP 応答の場合に、コネクターによって構成済みの成功トピックに対して、新たに作成された ServiceNow テーブルレコードの sysId
がキーで、フィールド requestMethod
、statusCode
、responseString
を持つ構造体オブジェクトが値であるレコードがレポートされます。この responseString
は、文字列として解析された応答本文です。非正常 HTTP 応答の場合、レポートレコードのキーは指定されていた元の sysId
に、値は上記のスキーマで説明されている構造体オブジェクトになります。POST リクエストが失敗である場合は、sysId
がレポートされません。
制限¶
ServiceNow Sink Connector では、HTTP 基本認証 のみがサポートされます。相互 TLS(mTLS) はサポートされません。
ServiceNow Sink Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従ってインストールするか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-servicenow:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-servicenow:1.0.2-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「 Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「 ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「 ServiceNow Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター 」を参照してください。
前提条件¶
Kafka Connect ServiceNow Sink Connector を実行するには、以下が必要です。
クイックスタート¶
このクイックスタートガイドでは、ServiceNow Sink Connector を使用して、Kafka からレコードを消費し、それらを ServiceNow テーブルに送信します。このガイドでは、マルチテナント環境の使用が前提とされています。ローカルテストの場合については、「スタンドアロンモードでの Connect の実行 」を参照してください。
ServiceNow に
test_table
というテーブルを作成します。テーブルに列を 3 つ定義します。
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your confluent platform installation directory confluent-hub install confluentinc/kafka-connect-servicenow:latest
Confluent Platform を起動します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されています。該当するコマンドは
confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳細については、「confluent local 」を参照してください。confluent local services start
すべてのサービスのステータスを確認します。
confluent local services status
servicenow-sink.json
ファイルを以下の内容で作成します。注釈
ServiceNow のユーザー定義テーブルはすべて
u_
で始まります。// subsitute <> with your config { "name": "ServiceNowSinkConnector", "config": { "connector.class": "io.confluent.connect.servicenow.ServiceNowSinkConnector", "topics": "test_table", "servicenow.url": "https://<endpoint>.service-now.com/", "tasks.max": "1", "servicenow.table": "u_test_table", "servicenow.user": "<username>", "servicenow.password": "<password>", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.license": "<license>", // leave it empty for evaluation license "confluent.topic.replication.factor": "1", "reporter.bootstrap.servers": "localhost:9092", "reporter.error.topic.name": "test-error", "reporter.error.topic.replication.factor": 1, "reporter.error.topic.key.format": "string", "reporter.error.topic.value.format": "string", "reporter.result.topic.name": "test-result", "reporter.result.topic.key.format": "string", "reporter.result.topic.value.format": "string", "reporter.result.topic.replication.factor": 1 } }
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter 」を参照してください。
構成を Connect REST サーバーにポストして ServiceNow Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load ServiceNowSinkConnector --config servicenow-sink.json
コネクターが
RUNNING
ステートであることを確認します。confluent local services connect connector status ServiceNowSinkConnector
test_table
トピックにレコードを生成するには、まず、Kafka プロデューサーを起動します。注釈
ServiceNow のユーザー定義列はすべて
u_
で始まります。kafka-avro-console-producer \ --broker-list localhost:9092 --topic test_table \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"u_name","type":"string"}, {"name":"u_price", "type": "float"}, {"name":"u_quantity", "type": "int"}]}'
コンソールプロデューサーが入力待ちになっているので、引き続いてトピックにレコードを挿入できます。
{"u_name": "scissors", "u_price": 2.75, "u_quantity": 3} {"u_name": "tape", "u_price": 0.99, "u_quantity": 10} {"u_name": "notebooks", "u_price": 1.99, "u_quantity": 5}
ServiceNow UI を使用して、メッセージが ServiceNow テーブルにデリバリーされたことを確認します。
ServiceNow への JSON メッセージ値の書き込み¶
設定ファイルの例を以下に示します。
servicenow-sink-json.json
ファイルを以下の内容で作成します。注釈
ServiceNow のユーザー定義テーブルはすべて
u_
で始まります。// subsitute <> with your config { "name": "ServiceNowSinkJSONConnector", "config": { "connector.class": "io.confluent.connect.servicenow.ServiceNowSinkConnector", "topics": "test_table_json", "servicenow.url": "https://<endpoint>.service-now.com/", "tasks.max": "1", "servicenow.table": "u_test_table", "servicenow.user": "<username>", "servicenow.password": "<password>", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.license": "<license>", // leave it empty for evaluation license "confluent.topic.replication.factor": "1", "reporter.bootstrap.servers": "localhost:9092", "reporter.error.topic.name": "test-error", "reporter.error.topic.replication.factor": 1, "reporter.error.topic.key.format": "string", "reporter.error.topic.value.format": "string", "reporter.result.topic.name": "test-result", "reporter.result.topic.key.format": "string", "reporter.result.topic.value.format": "string", "reporter.result.topic.replication.factor": 1 } }
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter 」を参照してください。
構成を Connect REST サーバーにポストして ServiceNow Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load ServiceNowSinkJSONConnector --config servicenow-sink-json.json
コネクターが
RUNNING
ステートであることを確認します。confluent local services connect connector status ServiceNowSinkJSONConnector
test_table_json
トピックにレコードを生成するには、まず、Kafka プロデューサーを起動します。注釈
ServiceNow のユーザー定義列はすべて
u_
で始まります。kafka-console-producer \ --broker-list localhost:9092 \ --topic test_table_json
コンソールプロデューサーが入力待ちになっているので、引き続いてトピックにレコードを挿入できます。
{"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "u_name"},{"type": "float", "optional": false, "field": "u_price"}, {"type": "int64","optional":false,"field": "u_quantity"}],"optional": false,"name": "products"}, "payload": {"u_name": "laptop", "u_price": 999.50, "u_quantity": 3}} {"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "u_name"},{"type": "float", "optional": false, "field": "u_price"}, {"type": "int64","optional":false,"field": "u_quantity"}],"optional": false,"name": "products"}, "payload": {"u_name": "pencil", "u_price": 0.99, "u_quantity": 10}} {"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "u_name"},{"type": "float", "optional": false, "field": "u_price"}, {"type": "int64","optional":false,"field": "u_quantity"}],"optional": false,"name": "products"}, "payload": {"u_name": "pen", "u_price": 1.99, "u_quantity": 5}}