Redis Sink Connector for Confluent Platform¶
Kafka Connect Redis Sink Connector は、Apache Kafka® のトピックから Redis にデータをエクスポートするために使用されます。
機能¶
Kafka Connect Redis Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Redis Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
バイトと文字列のストレージ¶
このコネクターでは、Redis での raw 型バイトまたは文字列の(挿入としての)格納がサポートされています。Kafka レコードのキーと値が文字列としてシリアル化済みの場合は、このコネクターと StringConverter
を組み合わせて、レコードのキーと値を Redis に文字列として格納します。
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
それ以外の場合は、このコネクターを ByteArrayConverter
とともに使用して、Kafka レコードキーおよび値のバイナリシリアル化形式(JSON、Avro、文字列など)を Redis にバイト配列として格納します。これで、これらの値にアクセスするアプリケーションが、この情報を Redis から読み取り、使用可能な形式にバイトを逆シリアル化することができます。Kafka のデータが Redis で保持するフォーマットではない場合は、Single Message Transformation を使用してデータをバイト配列または文字列に変換してから Redis に書き込むことを検討してください。
削除¶
このコネクターでは、削除のみサポートされます。Kafka に格納されているレコードに null
値がある場合、このコネクターでは対応するキーを使用して削除コマンドが Redis に送信されます。
重要
このコネクターでは null ではないキーが想定されています。エクスポートされるトピックにおける各 Kafka レコードには、明示的なキーと値が必要です。
前提条件¶
Redis Sink Connector を実行するには、以下が必要です。
- Kafka ブローカー : Confluent Platform 3.3.0(以上)、または Kafka 0.11.0(以上)
- Connect: Confluent Platform 4.0(以上)、または Kafka 1.0(以上)
- Java 1.8
- Redis Sink Connector バージョン 0.0.2.8(以上)
- Redis v5.0.0(以降)。Redis のリリースサイクルに関するドキュメント を参照してください。
- Lettuce(Redis クライアントライブラリ)5.1.6.RELEASE
Redis Sink Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従ってインストールするか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install jcustenborder/kafka-connect-redis:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install jcustenborder/kafka-connect-redis:0.0.2.8
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
Redis Sink Connector は、オープンソースコネクターであり、Confluent エンタープライズライセンスは不要です。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Redis Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
Redis をインストールします。
Redis 接続のリッスンを開始できるよう、Redis サーバーを起動します。これにより、Redis が(テスト目的専用の)デフォルトポート 6379、パスワードなしで起動されます。
redis-server
Redis CLI(
redis-cli
)を使用して、実行されたすべての挿入を表示します。インスタンスがこのクイックスタートテスト目的に限って使用されている場合は、MONITOR
コマンドを使用できます(下記の注を参照)。redis-cli MONITOR
重要
CLI コマンド
MONITOR
はデバッグ用コマンドで、Redis サーバーで処理された全コマンドがバックストリームされます。この機能はデータベースで何が起こっているか、理解する上で役に立ちます。ただし、使用するとパフォーマンスコストに影響します。本稼働環境ではこれを使用しないでください。コネクターをインストールします。詳細については、 インストールの手順 を参照してください。
Confluent Platform を起動します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されています。該当するコマンドは
confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。confluent local services start
重要
必ず、コネクターのインストール後に Confluent Platform を起動します。そうしない場合は、インストールを登録し、新しいコネクターの場所をパスに追加するために、Connect ワーカーを再起動する必要があります。
インストールされたコネクターが Confluent Platform によって認識されていることを確認します。
confluent local services connect plugin list
テストデータを Kafka の
users
トピックに 生成 します。echo key1,value1 | confluent local services kafka produce users --property parse.key=true --property key.separator=, echo key2,value2 | confluent local services kafka produce users --property parse.key=true --property key.separator=, echo key3,value3 | confluent local services kafka produce users --property parse.key=true --property key.separator=,
重要
このコネクターでは null ではないキーが想定されています。
parse.key
およびkey.separator
プロパティで、エクスポートされるレコードに明示的なキーと値があることが保証されます。以下のプロパティを使用して
redis-sink.properties
ファイルを作成します。name=kafka-connect-redis topics=users tasks.max=1 connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
コネクターを起動します。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load kafka-connect-redis --config redis-sink.properties
コネクターのステータスが
RUNNING
であることを確認します。confluent local services connect connector status kafka-connect-redis
データが流れており、Kafka に挿入されているキーと値が目的の Redis インスタンスに送信されていることを確認します。
Confluent Platform をシャットダウンします。
confluent local destroy
redis-server
およびredis-cli
を停止します(Ctrl + C キーを使用)。
REST ベースの例¶
この構成は通常、 分散ワーカー と併せて使用します。以下の JSON を kafka-connect-redis.json
に書き込み、必要なすべての値を構成し、以下のコマンドを使用して、分散 Connect ワーカーのいずれかに構成をポストします。Kafka Connect の詳細については、「REST API」を参照してください。
{
"name" : "kafka-connect-redis",
"config" : {
"name" : "kafka-connect-redis",
"connector.class" : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"topics" : "users",
"tasks.max" : "1",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter"
}
}
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/
を、Kafka Connect ワーカーのいずれかのエンドポイントに変更します。
curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-redis.json http://localhost:8083/connectors