重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Redis Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Redis Sink Connector for Confluent Platform」を参照してください。

Kafka Connect Redis Sink Connector for Confluent Cloud は、データを Apache Kafka® のトピックから Redis へエクスポートするために使用されます。このコネクターは Redis Enterprise Cloud、Azure Cache for Redis、および Amazon ElastiCache for Redis で機能します。

機能

  • 少なくとも 1 回のデリバリー: レコードが少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。
  • SSL のサポート: 一方向 SSL をサポートします。
  • 削除: コネクターでは "削除" がサポートされています。Kafka に格納されているレコードに null 値がある場合、このコネクターは対応するキーを使用して削除メッセージを Redis に送信します。このコネクターでは常に null ではないキーが想定されています。
  • サポートされている入力データフォーマット: このコネクターでは、Redis での raw 型バイトまたは文字列の(挿入としての)格納がサポートされています。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Redis Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Redis にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • 以下のいずれかの Redis マネージドサービスにアクセスします。
    • Redis Enterprise Cloud
    • Azure Cache for Redis
    • Amazon ElastiCache for Redis。このサービスを使用するためには、Kafka クラスターを VPC ピアリング接続する必要があります。
  • Redis 認証情報、ホスト名、およびデータベースインデックス名。
  • Redis インスタンスは Confluent Cloud クラスターと同じリージョンに存在している必要があります。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Redis Sink connector card.

Redis Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add Redis Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: Redis で結果を確認します。

データが Redis インスタンスに取り込まれていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe RedisSink

出力例:

Following are the required configs:
connector.class: RedisSink
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
redis.hostname
redis.portnumber
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティとオプションのプロパティを示しています。

{
  "name": "RedisSinkConnector_0",
  "config": {
    "topics": "pageviews",
    "connector.class": "RedisSink",
    "name": "RedisSinkConnector_0",
    "input.data.format": "BYTES",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "redis.hostname": "test.redis.cache.windows.net",
    "redis.portnumber": "6380",
    "redis.database": "1",
    "redis.password": "********************************************",
    "redis.ssl.mode": "enabled",
    "tasks.max": "1"
  }
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは BYTES または STRING です。Kafka トピックで、JSON または Avro のようなスキーマベースのフォーマットが使用されている場合は、BYTES を選択する必要があります。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "redis.ssl.mode": 接続の SSL モードを設定します。指定可能なオプションは enableddisabledserver、および server+client です。デフォルトは disabled (無効)です。このプロパティを enabled (有効)に設定すると、コネクターは SSL を使用して接続を確立します。server (サーバー)を選択すると、コネクターはトラストストアを使用します。server+client (サーバーとクライアント)を選択した場合、コネクターは有効なキーペアおよび関連付けられた証明書を使ってトラストストアとキーストアの両方を使用します。

    • "redis.ssl.mode": "server": SSL モードで server を使用する場合、コネクターは接続を保護するためにトラストストアの CA 証明書を使用します。2 つのプロパティ redis.ssl.trustore.file および redis.ssl.trustore.password を追加します。トラストストアファイルはバイナリファイルです。redis.ssl.trustore.file プロパティの場合、バイナリトラストファイルを base64 でエンコードし、エンコードした文字列を取得してから data:text/plain;base64 プレフィックスを追加した後、その文字列全体をプロパティのエントリとして指定します。たとえば、"redis.ssl.trustore.file" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==" のように指定します。
    • "redis.ssl.mode": "server+client": SSL モードで server+client を使用する場合、コネクターでは接続を保護するためにトラストストアの CA 証明書と追加のキーストアを使用します。4 つのプロパティ、redis.ssl.trustore.fileredis.ssl.trustore.passwordredis.ssl.keystore.file、および redis.ssl.keystore.password を追加します。トラストストアおよびキーストアファイルはバイナリファイルです。redis.ssl.trustore.file および redis.ssl.keystore.file プロパティの場合、バイナリトラストストアおよびキーストアファイルを base64 でエンコードし、エンコードした文字列を取得してから data:text/plain;base64 プレフィックスを追加した後、その文字列全体をプロパティのエントリとして指定します。たとえば、"redis.ssl.keystore.file" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==" のように指定します。
  • "tasks.max": コネクターが使用する タスク の最大数。タスクが多いほどパフォーマンスが向上する可能性があります。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 4: 構成ファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config redis-sink-config.json

出力例:

Created connector RedisSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name            | Status  | Type
+-----------+-----------------------+---------+------+
lcc-ix4dl   | RedisSinkConnector_0  | RUNNING | sink

ステップ 6: Redis で結果を確認します。

データが Redis インスタンスに取り込まれていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。BYTES では、Kafka でシリアル化された状態のキーが渡され、STRING では、レコード値に対して UTF-8 エンコードが強制的に使用されます。

  • 型: string
  • 重要度: 高
input.key.format

Kafka 入力レコードキーのフォーマットを設定します。BYTES では、Kafka でシリアル化された状態のキーが渡され、STRING では、キーに対して UTF-8 エンコードが強制的に使用されます。

  • 型: string
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

Redis サーバーへの接続方法(How should we connect to your Redis server?)

redis.hostname

Redis サーバーのホスト名。

  • 型: string
  • 重要度: 高
redis.portnumber

Redis サーバーのポート番号。

  • 型: int
  • 重要度: 高
redis.client.mode

Redis サーバーを 1 つまたは複数のノードで実行するかどうか。

  • 型: string
  • デフォルト: Standalone
  • 重要度: 高
redis.database

書き込み先のデータベースインデックス。

  • 型: int
  • デフォルト: 0
  • 重要度: 高
redis.password

Redis サーバーのパスワード。

  • 型: password
  • 重要度: 中

SSL の構成(SSL configuration)

redis.ssl.mode

Redis サーバーを保護する方法。「server」では、サーバー CA 証明書が含まれるトラストストアが必要です。「server+client」ではさらに、有効な証明書とキーのペアが含まれるキーストアが必要です。

  • 型: string
  • デフォルト: disabled
  • 重要度: 中
redis.ssl.keystore.file

サーバー証明書が含まれるキーストア。

  • 型: password
  • 重要度: 低
redis.ssl.keystore.password

キーストアファイルのストアパスワード。

  • 型: password
  • 重要度: 低
redis.ssl.truststore.file

サーバー CA 証明書情報が含まれるトラストストア。

  • 型: password
  • 重要度: 低
redis.ssl.truststore.password

サーバー CA 証明書情報が含まれるトラストストアのパスワード。

  • 型: password
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png