重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Solace Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Solace Sink Connector for Confluent Platform」を参照してください。

Kafka Connect Solace Sink Connector for Confluent Cloud は、データを Apache Kafka® から Solace PubSub+ Event Broker クラスターにエクスポートするために使用されます。

機能

  • 少なくとも 1 回のデリバリー: コネクターによって、レコードが少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。
  • Input data formats: このコネクターは、Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)、文字列、Bytes の入力データフォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Solace Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Solace にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • Solace アカウントの認証情報、および Solace ブローカーの接続の詳細情報(ホスト、VPN(必要な場合)、JMS 送信先名など)。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Solace Sink connector card.

Solace Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

Add Solace Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: Solace で結果を確認します。

Solace イベントブローカーにメッセージがストリーミングされていることを検証します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe SolaceSink

出力例:

Following are the required configs:
connector.class: SolaceSink
name
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
solace.host
solace.username
solace.password
solace.dynamic.durables
jms.destination.name
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。その他の構成プロパティの値と詳細については、「構成プロパティ」を参照してください。

{
  "name": "SolaceConnector_0",
  "config": {
    "topics": "pageviews",
    "connector.class": "SolaceSink",
    "name": "SolaceSinkConnector_0",
    "input.data.format": "STRING",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "solace.host": "tcps://mr83451.messaging.solace.cloud:55443",
    "solace.username": "<username>",
    "solace.password": "<password>",
    "solace.dynamic.durables": "true",
    "jms.destination.name": "<destination-name>",
    "tasks.max": "1"
  }
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SRPROTOBUFSTRINGJSON (スキーマレス)、BYTES です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用する場合は、Schema Registry を構成しておく必要があります。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "solace.<...>": 接続の詳細情報を入力します。solace host: 接続先メッセージブローカーの IP またはホスト名とポート(省略可能)を指定します。非 SSL ホストエントリで、ポートを指定せず圧縮も使用しない場合に使用されるデフォルトのポート番号は、55555 です。圧縮を使用する場合のデフォルトポートは 55003 です。SSL 接続(圧縮ありなし問わず)のデフォルトポートは 55443 です。SSL ホストエントリは、以下の例のようになります(有効なスキーム: tcp、tcps、smf、または smfs)。詳しくは、『Data Connection Properties』を参照してください。

  • "solace.dynamic.durables": キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)がメッセージブローカーで作成されるかどうかを指定します。queue を送信先とする場合で、キューがまだ存在しないときは、このプロパティを true に設定します。キューが存在する場合は、このプロパティを false に設定します。このプロパティを誤って設定すると、コネクターでエラーが発生します。

  • "jms.destination.name": メッセージの送信先とする queue または topic の名前を入力します。

  • "tasks.max": コネクターが使用する タスク の最大数。タスクが多いほどパフォーマンスが向上する可能性があります。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 4: 構成ファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config solace-sink-config.json

出力例:

Created connector SolaceSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name             | Status  | Type
+-----------+------------------------+---------+------+
lcc-ix4dl   | SolaceSinkConnector_0  | RUNNING | sink

ステップ 6: Solace で結果を確認します。

Solace イベントブローカーにメッセージがストリーミングされていることを検証します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、STRING、BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

Solace クラスターへの接続方法(How should we connect to your Solace cluster?)

solace.host

接続先メッセージブローカーの IP またはホスト名とポート(省略可)。ポートを指定しない場合、デフォルトのポート番号は、55555(圧縮未使用時)または 55003(圧縮使用時)です。

  • 型: string
  • 重要度: 高
solace.username

Solace での認証に使用するユーザー名。

  • 型: string
  • 重要度: 高
solace.password

Solace での認証に使用するパスワード。

  • 型: password
  • 重要度: 高
solace.vpn

Solace メッセージブローカーに接続する際に使用するメッセージ VPN。

  • 型: string
  • 重要度: 中

接続の詳細(Connection details)

solace.compression.level

Solace に書き込まれるメッセージの ZLIB 圧縮レベル。圧縮レベルに指定可能な値は -1 から 9 で、-1 を指定すると、JNDI 接続の圧縮レベルが使用されます。0 は圧縮なしです。1 から 9 を指定すると、データ圧縮が有効になります(1 では圧縮率が最も低く、データスループットは最も高速になります。9 では圧縮率が最も高く、データスループットは最も低速になります)。

  • 型: int
  • デフォルト: -1
  • 指定可能な値: [-1,...,9]
  • 重要度: 中
solace.dynamic.durables

キューまたはトピックのエンドポイント(永続サブスクリプション名のサポートに使用される)がメッセージブローカーで作成されるかどうかを指定します。queue を送信先とする場合で、キューがまだ存在しないときは、このプロパティを true に設定します。それ以外では false にします。このプロパティを誤って設定すると、コネクターがエラーになります。

  • 型: boolean
  • 重要度: 低
solace.client.description

データ接続用のメッセージブローカーのアプリケーション説明。

  • 型: string
  • デフォルト: Kafka Connect
  • 重要度: 低

Solace へのセキュアな接続(Solace secure connection)

solace.ssl.keystore.file

Solace 用の SSL 対応 VPN のキーストア。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中
solace.ssl.keystore.password

Solace 用の SSL 対応 VPN のキーストアのパスワード。

  • 型: password
  • 重要度: 中
solace.ssl.truststore.file

Solace の SSL 対応 VPN 用のサーバー CA 証明書情報を含むトラストストア。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中
solace.ssl.truststore.password

Solace 用の SSL 対応 VPN のトラストストアのパスワード。

  • 型: password
  • 重要度: 中
solace.ssl.validate.certificate

SSL 証明書を検証するかどうかを指定します。

  • 型: boolean
  • デフォルト: true
  • 重要度: 中

JMS の詳細(JMS details)

jms.destination.name

メッセージの書き込み先である JMS 送信先の名前。

  • 型: string
  • 重要度: 高
jms.destination.type

JMS 送信先のタイプ。

  • 型: string
  • デフォルト: queue
  • 重要度: 高
jms.forward.kafka.key

Kafka レコードキーを文字列に変換し、JMS メッセージのプロパティ JMSCorrelationID に転送します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
jms.forward.kafka.metadata

Kafka レコードのメタデータを JMS メッセージのプロパティに転送します。これには、レコードのトピック、パーティション、オフセットが含められます。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
jms.forward.kafka.headers

Kafka レコードの各ヘッダーを JMS メッセージに string プロパティとして追加します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低

JMS フォーマッター(JMS formatter)

jms.message.format

JMS メッセージ値のフォーマット。

  • 型: string
  • デフォルト: string
  • 重要度: 高
character.encoding

メッセージの書き込み時に使用する文字エンコーディング。

  • 型: string
  • デフォルト: UTF-8
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png