重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Solace Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「Solace Sink Connector for Confluent Platform」を参照してください。
Kafka Connect Solace Sink Connector for Confluent Cloud は、データを Apache Kafka® から Solace PubSub+ Event Broker クラスターにエクスポートするために使用されます。
機能¶
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。
- Input data formats: このコネクターは、Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)、文字列、Bytes の入力データフォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Solace Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Solace Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Solace にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- Solace アカウントの認証情報、および Solace ブローカーの接続の詳細情報(ホスト、VPN(必要な場合)、JMS 送信先名など)。
- ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細情報を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
Add Solace Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- Azure Cognitive Search Sink Connector の詳細情報を入力します。
- Solace host: 接続先メッセージブローカーの IP またはホスト名とポート(省略可)。ポートを指定しない場合、デフォルトのポート番号は、55555(圧縮未使用時)または 55003(圧縮使用時)です。
- Solace username: Solace での認証に使用するユーザー名。
- Solace password: Solace での認証に使用するパスワード。
- Message VPN: Solace メッセージブローカーに接続する際に使用するメッセージ VPN。
- SSL Keystore: Solace 用の SSL 対応 VPN のキーストア。
- SSL Keystore Password: Solace 用の SSL 対応 VPN のキーストアのパスワード。
- SSL Truststore: Solace の SSL 対応 VPN 用のサーバー CA 証明書情報を含むトラストストア。
- SSL Truststore Password: Solace 用の SSL 対応 VPN のトラストストアのパスワード。
- Validate SSL Certificate: SSL 証明書を検証するかどうかを選択します。
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、PROTOBUF、JSON(スキーマレス)、STRING または BYTES から選択します。スキーマレジストリ ベースのフォーマット(AVRO、JSON_SR(JSON スキーマ)、PROTOBUF など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
Dynamic Durables では、コネクターが、キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)をメッセージブローカーで作成するかどうかを選択します。
queue
を送信先とする場合で、キューがまだ存在しないときは、このプロパティをtrue
に設定します。キューが存在する場合は、このプロパティをfalse
に設定します。このプロパティを誤って設定すると、コネクターでエラーが発生します。JMS Destination Name に、メッセージが書き込まれる JMS の送信先 の名前を入力します。
Show advanced configurations
Compression Level: Solace に送信されるメッセージの ZLIB 圧縮レベルを設定します。デフォルト値は
-1
で、コネクターが JNDI 接続圧縮レベルを使用することを指定します。0
はコネクターでの圧縮なしを指定します。レベル-1
から9
を指定すると、データ圧縮が有効になります。1
では圧縮率が最も低く、データスループットは最も高速になります。9
では圧縮率が最も高く、データスループットは最も低速になります。Client Description: データ接続用のメッセージブローカーのアプリケーション説明。デフォルトは Kafka Connect です。
JMS Destination Type: JMS 送信先のタイプ。デフォルト値は
queue
です。Forward Kafka Record Key: Kafka レコードキーが文字列に変換され、JMS Message プロパティ
JMSCorrelationID
に転送されるかどうかを指定します。デフォルト値はfalse
です。Forward Kafka Record Metadata: Kafka レコードのメタデータが JMS メッセージのプロパティに転送されるかどうかを指定します。デフォルトは
false
です。Forward Kafka Record Headers: Kafka レコードヘッダーを string プロパティとして JMS メッセージに追加するかどうかを指定します。デフォルトは
false
です。JMS Message Format: JMS メッセージの値のフォーマット。デフォルト値は
"string"
です。Character Encoding: メッセージを書き込む際に使用する文字エンコーディング。デフォルトは
UTF-8
です。
変換と述語については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
接続の詳細情報を確認します。
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: Solace で結果を確認します。¶
Solace イベントブローカーにメッセージがストリーミングされていることを検証します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SolaceSink
出力例:
Following are the required configs:
connector.class: SolaceSink
name
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
solace.host
solace.username
solace.password
solace.dynamic.durables
jms.destination.name
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。その他の構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
{
"name": "SolaceConnector_0",
"config": {
"topics": "pageviews",
"connector.class": "SolaceSink",
"name": "SolaceSinkConnector_0",
"input.data.format": "STRING",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"solace.host": "tcps://mr83451.messaging.solace.cloud:55443",
"solace.username": "<username>",
"solace.password": "<password>",
"solace.dynamic.durables": "true",
"jms.destination.name": "<destination-name>",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、STRING、JSON (スキーマレス)、BYTES です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用する場合は、Schema Registry を構成しておく必要があります。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"solace.<...>"
: 接続の詳細情報を入力します。solace host
: 接続先メッセージブローカーの IP またはホスト名とポート(省略可能)を指定します。非 SSL ホストエントリで、ポートを指定せず圧縮も使用しない場合に使用されるデフォルトのポート番号は、55555
です。圧縮を使用する場合のデフォルトポートは55003
です。SSL 接続(圧縮ありなし問わず)のデフォルトポートは55443
です。SSL ホストエントリは、以下の例のようになります(有効なスキーム: tcp、tcps、smf、または smfs)。詳しくは、『Data Connection Properties』を参照してください。"solace.dynamic.durables"
: キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)がメッセージブローカーで作成されるかどうかを指定します。queue
を送信先とする場合で、キューがまだ存在しないときは、このプロパティをtrue
に設定します。キューが存在する場合は、このプロパティをfalse
に設定します。このプロパティを誤って設定すると、コネクターでエラーが発生します。"jms.destination.name"
: メッセージの送信先とするqueue
またはtopic
の名前を入力します。"tasks.max"
: コネクターが使用する タスク の最大数。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 4: 構成ファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config solace-sink-config.json
出力例:
Created connector SolaceSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+------------------------+---------+------+
lcc-ix4dl | SolaceSinkConnector_0 | RUNNING | sink
ステップ 6: Solace で結果を確認します。¶
Solace イベントブローカーにメッセージがストリーミングされていることを検証します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、STRING、BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
Solace クラスターへの接続方法(How should we connect to your Solace cluster?)¶
solace.host
接続先メッセージブローカーの IP またはホスト名とポート(省略可)。ポートを指定しない場合、デフォルトのポート番号は、55555(圧縮未使用時)または 55003(圧縮使用時)です。
- 型: string
- 重要度: 高
solace.username
Solace での認証に使用するユーザー名。
- 型: string
- 重要度: 高
solace.password
Solace での認証に使用するパスワード。
- 型: password
- 重要度: 高
solace.vpn
Solace メッセージブローカーに接続する際に使用するメッセージ VPN。
- 型: string
- 重要度: 中
接続の詳細(Connection details)¶
solace.compression.level
Solace に書き込まれるメッセージの ZLIB 圧縮レベル。圧縮レベルに指定可能な値は -1 から 9 で、-1 を指定すると、JNDI 接続の圧縮レベルが使用されます。0 は圧縮なしです。1 から 9 を指定すると、データ圧縮が有効になります(1 では圧縮率が最も低く、データスループットは最も高速になります。9 では圧縮率が最も高く、データスループットは最も低速になります)。
- 型: int
- デフォルト: -1
- 指定可能な値: [-1,...,9]
- 重要度: 中
solace.dynamic.durables
キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)がメッセージブローカーで作成されるかどうかを指定します。queue を送信先とする場合で、キューがまだ存在しないときは、このプロパティを true に設定します。それ以外では false にします。このプロパティを誤って設定すると、コネクターがエラーになります。
- 型: boolean
- 重要度: 低
solace.client.description
データ接続用のメッセージブローカーのアプリケーション説明。
- 型: string
- デフォルト: Kafka Connect
- 重要度: 低
Solace へのセキュアな接続(Solace secure connection)¶
solace.ssl.keystore.file
Solace 用の SSL 対応 VPN のキーストア。
- 型: password
- デフォルト: [hidden]
- 重要度: 中
solace.ssl.keystore.password
Solace 用の SSL 対応 VPN のキーストアのパスワード。
- 型: password
- 重要度: 中
solace.ssl.truststore.file
Solace の SSL 対応 VPN 用のサーバー CA 証明書情報を含むトラストストア。
- 型: password
- デフォルト: [hidden]
- 重要度: 中
solace.ssl.truststore.password
Solace 用の SSL 対応 VPN のトラストストアのパスワード。
- 型: password
- 重要度: 中
solace.ssl.validate.certificate
SSL 証明書を検証するかどうかを指定します。
- 型: boolean
- デフォルト: true
- 重要度: 中
JMS の詳細(JMS details)¶
jms.destination.name
メッセージの書き込み先である JMS 送信先の名前。
- 型: string
- 重要度: 高
jms.destination.type
JMS 送信先のタイプ。
- 型: string
- デフォルト: queue
- 重要度: 高
jms.forward.kafka.key
Kafka レコードキーを文字列に変換し、JMS メッセージのプロパティ JMSCorrelationID に転送します。
- 型: boolean
- デフォルト: false
- 重要度: 低
jms.forward.kafka.metadata
Kafka レコードのメタデータを JMS メッセージのプロパティに転送します。これには、レコードのトピック、パーティション、オフセットが含められます。
- 型: boolean
- デフォルト: false
- 重要度: 低
jms.forward.kafka.headers
Kafka レコードの各ヘッダーを JMS メッセージに string プロパティとして追加します。
- 型: boolean
- デフォルト: false
- 重要度: 低
JMS フォーマッター(JMS formatter)¶
jms.message.format
JMS メッセージ値のフォーマット。
- 型: string
- デフォルト: string
- 重要度: 高
character.encoding
メッセージの書き込み時に使用する文字エンコーディング。
- 型: string
- デフォルト: UTF-8
- 重要度: 低
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。