Solace Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see Solace Sink Connector for Confluent Platform.
Kafka Connect Solace Sink Connector for Confluent Cloud は、データを Apache Kafka® から Solace PubSub+ Event Broker クラスターにエクスポートするために使用されます。
機能¶
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。
- Input data formats: このコネクターは、Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)、文字列、Bytes の入力データフォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。Cloud コネクターの制限事項 も参照してください。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Solace Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Solace にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- Solace アカウントの認証情報、および Solace ブローカーの接続の詳細情報(ホスト、VPN(必要な場合)、JMS 送信先名など)。
- ネットワークに関する考慮事項については、「ネットワークアクセス」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク ( * ) は必須項目であることを示しています。
At the Add Solace Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Enter your Solace connection details:
- Solace host: IP or hostname and port (optional) of the message broker to connect to. If a port is not specified, the default port number is 55555 when compression is not in use, or 55003 when compression is in use.
- Solace username: Username to authenticate with Solace.
- Solace password: Password to authenticate with Solace.
- Message VPN: Message VPN to use when connecting to the Solace message broker.
- SSL Keystore: Keystore of SSL-enabled VPN for Solace.
- SSL Keystore Password: Keystore password for SSL-enabled VPN for Solace.
- SSL Truststore: Truststore containing server CA certificate for SSL-enabled VPN for Solace.
- SSL Truststore Password: Truststore password for SSL-enabled VPN for Solace.
- Validate SSL Certificate: Whether to validate the SSL certificates.
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and descriptions.
Select the Input Kafka record value format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, or JSON (schemaless), STRING, or BYTES. Schema Registry must be enabled to use a スキーマレジストリ-based format (for example, AVRO, JSON_SR (JSON Schema), or PROTOBUF). See 環境の制限 for additional information.
For Dynamic Durables, select whether the connector creates queues or topic endpoints (which are used to support durable subscription names) on the message broker. For a
queue
destination, set this property totrue
, if the queue doesn't exist already. If the queue does exist, set the property tofalse
. The connector will fail if you set this property incorrectly.Enter JMS Destination Name, the name of the JMS destination that messages are written to.
Show advanced configurations
Compression Level: Sets the ZLIB compression level for messages sent to Solace. The default value is
-1
, which specifies that the connector uses the JNDI connection compression level.0
specifies that the connector uses no compression. Levels-1
through9
enable data compression; where1
offers the least amount of compression and fastest data throughput, and9
offers the most compression and slowest data throughput.Client Description: Application description on the message broker for the data connection. Defaults to Kafka Connect.
JMS Destination Type: The type of JMS destination. The default value is
queue
.Forward Kafka Record Key: Whether to convert Kafka record key to a string and forward it on to the JMS Message property
JMSCorrelationID
. Default value isfalse
.Forward Kafka Record Metadata: Whether to forward Kafka record metadata on the JMS Message properties. Defaults to
false
.Forward Kafka Record Headers: Whether to add the Kafka record headers to the JMS Message as string properties. Defaults to
false
.JMS Message Format: The format for JMS message values. The default value is
string
.Character Encoding: The character encoding to use while writing messages. The default is
UTF-8
.
For information about transforms and predicates, see the Single Message Transforms (SMT) documentation for details. See サポートされない変換 for a list of SMTs that are not supported with this connector.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
Step 5: Check the results in Solace.¶
Solace イベントブローカーにメッセージがストリーミングされていることを検証します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SolaceSink
出力例:
Following are the required configs:
connector.class: SolaceSink
name
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
solace.host
solace.username
solace.password
solace.dynamic.durables
jms.destination.name
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。その他の構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
{
"name": "SolaceConnector_0",
"config": {
"topics": "pageviews",
"connector.class": "SolaceSink",
"name": "SolaceSinkConnector_0",
"input.data.format": "STRING",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"solace.host": "tcps://mr83451.messaging.solace.cloud:55443",
"solace.username": "<username>",
"solace.password": "<password>",
"solace.dynamic.durables": "true",
"jms.destination.name": "<destination-name>",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、STRING、JSON (スキーマレス)、BYTES です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用する場合は、Schema Registry を構成しておく必要があります。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"solace.<...>"
: 接続の詳細情報を入力します。solace host
: 接続先メッセージブローカーの IP またはホスト名とポート(省略可能)を指定します。非 SSL ホストエントリで、ポートを指定せず圧縮も使用しない場合に使用されるデフォルトのポート番号は、55555
です。圧縮を使用する場合のデフォルトポートは55003
です。SSL 接続(圧縮ありなし問わず)のデフォルトポートは55443
です。SSL ホストエントリは、以下の例のようになります(有効なスキーム: tcp、tcps、smf、または smfs)。詳しくは、『Data Connection Properties』を参照してください。"solace.dynamic.durables"
: キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)がメッセージブローカーで作成されるかどうかを指定します。queue
を送信先とする場合で、キューがまだ存在しないときは、このプロパティをtrue
に設定します。キューが存在する場合は、このプロパティをfalse
に設定します。このプロパティを誤って設定すると、コネクターでエラーが発生します。"jms.destination.name"
: メッセージの送信先とするqueue
またはtopic
の名前を入力します。"tasks.max"
: コネクターが使用する タスク の最大数。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 4: 構成ファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config solace-sink-config.json
出力例:
Created connector SolaceSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+------------------------+---------+------+
lcc-ix4dl | SolaceSinkConnector_0 | RUNNING | sink
ステップ 6: Solace で結果を確認します。¶
Solace イベントブローカーにメッセージがストリーミングされていることを検証します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
構成プロパティ¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- 型: string
- Valid Values: A string at most 64 characters long
- 重要度: 高
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- Type: list
- 重要度: 高
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- 型: string
- 重要度: 高
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- 型: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
How should we connect to your Solace cluster?¶
solace.host
IP or hostname and port (optional) of the message broker to connect to. If a port is not specified, the default port number is 55555 when compression is not in use, or 55003 when compression is in use.
- 型: string
- 重要度: 高
solace.username
Username to authenticate with Solace.
- 型: string
- 重要度: 高
solace.password
Password to authenticate with Solace.
- 型: password
- 重要度: 高
solace.vpn
Message VPN to use when connecting to the Solace message broker.
- 型: string
- 重要度: 中
Connection details¶
solace.compression.level
ZLIB compression level for messages written to Solace. Valid values for the compression level are -1 through 9. -1 means use the JNDI connection’s compression level. 0 means use no compression. 1 through 9 enables data compression (where 1 offers the least amount of compression and fastest data throughput, and 9 offers the most compression and slowest data throughput)
- 型: int
- Default: -1
- 指定可能な値: [-1,...,9]
- 重要度: 中
solace.dynamic.durables
Whether queues or topic endpoints (which are used to support durable subscription names), are to be created on the message broker. In case of queue destination, set this property to true if the queue doesn't exist already, false otherwise. Setting it incorrectly will fail the connector.
- 型: boolean
- 重要度: 低
solace.client.description
Application description on the message broker for the data connection.
- 型: string
- Default: Kafka Connect
- 重要度: 低
Solace secure connection¶
solace.ssl.keystore.file
Keystore of SSL-enabled VPN for Solace.
- 型: password
- Default: [hidden]
- 重要度: 中
solace.ssl.keystore.password
Keystore password for SSL-enabled VPN for Solace.
- 型: password
- 重要度: 中
solace.ssl.truststore.file
Truststore containing server CA certificate for SSL-enabled VPN for Solace.
- 型: password
- Default: [hidden]
- 重要度: 中
solace.ssl.truststore.password
Truststore password for SSL-enabled VPN for Solace.
- 型: password
- 重要度: 中
solace.ssl.validate.certificate
Whether to validate the SSL certificates.
- 型: boolean
- Default: true
- 重要度: 中
JMS details¶
jms.destination.name
メッセージの書き込み先である JMS 送信先の名前。
- 型: string
- 重要度: 高
jms.destination.type
JMS 送信先のタイプ。
- 型: string
- Default: queue
- 重要度: 高
jms.forward.kafka.key
Convert the Kafka record key to a string and forward it on the JMS Message property JMSCorrelationID.
- 型: boolean
- Default: false
- 重要度: 低
jms.forward.kafka.metadata
Forward the Kafka record metadata on the JMS Message properties. This includes the record topic, partition, and offset.
- 型: boolean
- Default: false
- 重要度: 低
jms.forward.kafka.headers
Add the Kafka record headers to the JMS Message as string properties.
- 型: boolean
- Default: false
- 重要度: 低
JMS formatter¶
jms.message.format
The format of JMS message values.
- 型: string
- デフォルト: string
- 重要度: 高
character.encoding
The character encoding to use while writing the message.
- 型: string
- Default: UTF-8
- 重要度: 低
Number of tasks for this connector¶
tasks.max
- 型: int
- Valid Values: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。