Cassandra Sink Connector for Confluent Platform¶
Kafka Connect Cassandra Sink Connector は、データを Apache Cassandra に書き込むための高速メカニズムで、Cassandra 2.1、2.2、3.0 と互換性があります。
重要
Cassandra Sink Connector バージョン 2.0.0 には、バージョン 1.0.0、1.1.0、1.2.0 との後方互換性はありません。詳細については、「バージョン 2.0.x へのアップグレード」セクションを参照してください。
機能¶
Cassandra Sink Connector for Confluent Platform には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Cassandra Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
Cassandra コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-cassandra:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-cassandra:2.0.0
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、30 日間の試用期間の間は、ライセンスキーなしで使用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Cassandra Sink Connector 構成プロパティ」を参照してください。
バージョン 2.0.x へのアップグレード¶
Cassandra Sink Connector バージョン 2.0.x には、バージョン 1.0.x、1.1.x、1.2.x、1.3.x との後方互換性のない変更が含まれています。要件に応じて、以下のいずれかのパスで新しいバージョンのコネクターにアップグレードできます。
コネクターで SSL も Kerberos も使用しない場合は、
cassandra.local.datacenter
プロパティを以下の手順に従って構成できます。cqlsh
ツールを使用して、cassandra.contact.points
で定義された Cassandra ノードのいずれかに接続します。以下の CQL コマンドを実行します。
SELECT data_center FROM system.local;
上の CQL コマンドで取得した値を使用して、
cassandra.local.datacenter
プロパティを構成します。注釈
cassandra.contact.points
プロパティのデフォルトはlocalhost
ではなくなったので、ローカル環境でこれを使用している場合は、明示的に設定します。
SSL を使用する場合
cassandra.ssl.provider
の構成を削除します。- 1 番目のアップグレードパスに従います。これによる SSL セットアップへの影響はありません。
Kerberos を使用する場合
KDC で Cassandra コンタクトポイントにアクセスするためのチケットを発行するために、KDC を指定できる
krb5.conf
が各 Connect ワーカーに構成されていることを確認します。コネクターのユーザープリンシパルをセットアップし、それに対応するキータブファイルを取得します。
cassandra.security
にKERBEROS
を構成し、connect.cassandra.principal
およびconnect.cassandra.keytab
を構成します。注釈
cassandra.kerberos.sasl.protocol
構成プロパティは、Cassandra サービスプリンシパルのサービス部分と一致する必要があります。要件に応じて、上で説明した 1 番目または 2 番目のアップグレードパスに従います。
Cassandra Sink Connector のバージョン 2.0.x で行われた具体的な各変更を確認するには、「更新履歴」を参照してください。
使用上の注意¶
このコネクターでは、トピックを使用して書き込み先のテーブルの名前を決定します。テーブル名は、RegexRouter などの変換を使用してトピック名を変更することにより、動的に変更できます。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
スキーマ管理¶
このコネクターを構成して、Cassandra クラスターのスキーマを管理することができます。既存のテーブルを変更する場合、キーは無視されます。これは、既存のテーブルのプライマリキーの変更に関連する問題の発生を回避するためです。キースキーマは、テーブルを作成する際にそのプライマリキーを生成するために使用されます。これらのフィールドは、値スキーマでも必要です。テーブルに書き込まれるデータは、常に Apache Kafka® の値から読み取られます。このコネクターでは、トピックを使用して書き込み先のテーブルの名前を決定します。テーブル名は、変換を使用してトピック名を変更することにより、オンザフライで変更できます。
Time To Live(TTL)サポート¶
このコネクターでは、TTL のサポートを提供します。TTL では、一定期間が経過したデータを自動的に期限切れにすることができます。TTL
値は、データの有効期間の値です。指定期間が経過すると、データは自動的に削除されます。たとえば、TTL 値に 100 秒を設定した場合、データは 100 秒後に自動的に削除されます。この機能を使用するには、データの保持期間(秒)を cassandra.ttl
構成に指定する必要があります。このプロパティを指定しない場合、レコードは、デフォルトの TTL 値の null で挿入されます。この場合、書き込まれたデータは期限切れにはなりません。
Kafka でのオフセットトラッキングサポート¶
Cassandra Sink Connector には、以下の 2 種類のオフセットトラッキングサポートが用意されています。
オフセットを Cassandra テーブルに保存¶
これは、このコネクターのデフォルトの動作です。この場合、オフセットは Cassandra テーブルに保存されます。
オフセットを Kafka に保存¶
オフセットを Kafka で管理する必要がある場合は、cassandra.offset.storage.table.enable=false
を指定する必要があります。デフォルトでは、このプロパティは true です(この場合、オフセットは Cassandra テーブルに保存されます)。
トラブルシューティング¶
次のようなエラーメッセージが表示されることがあります。
Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB
また、次のような警告が表示されることがあります。
Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB
このような場合は、Kafka Connect の worker.properties の consumer.max.poll.records
設定を調整してください。
サンプル¶
アップサートモード¶
この例では、Cassandra にデータを書き込む際に、アップサートを使用するようにコネクターを構成します。
Kafka Connect のデプロイ状況に基づいて、以下のいずれかの構成方法を選択します。分散モードでは JSON および REST の例を使用します。スタンドアロンモードでは、プロパティをべースにした例を使用します。
分散モード JSON¶
{
"name" : "cassandraSinkConnector1",
"config" : {
"connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max" : "1",
"topics" : "topic1,topic2,topic3",
"cassandra.contact.points" : "cassandra",
"cassandra.keyspace" : "test",
"cassandra.write.mode" : "Update"
}
}
スタンドアロンモードのプロパティ¶
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update
標準¶
この例では、認証なしで Apache Cassandra インスタンスに接続します。
Kafka Connect のデプロイ状況に基づいて、以下のいずれかの構成方法を選択します。分散モードでは JSON および REST の例を使用します。スタンドアロンモードでは、プロパティをべースにした例を使用します。
分散モード JSON¶
{
"name" : "cassandraSinkConnector1",
"config" : {
"connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max" : "1",
"topics" : "topic1,topic2,topic3",
"cassandra.contact.points" : "cassandra",
"cassandra.keyspace" : "test"
}
}
スタンドアロンモードのプロパティ¶
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
SSL および認証¶
この例では、SSL とユーザー名およびパスワードによる認証で Apache Cassandra インスタンスに接続します。
Kafka Connect のデプロイ状況に基づいて、以下のいずれかの構成方法を選択します。分散モードでは JSON および REST の例を使用します。スタンドアロンモードでは、プロパティをべースにした例を使用します。
分散モード JSON¶
{
"name" : "cassandraSinkConnector1",
"config" : {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "1",
"topics": "topic1,topic2,topic3",
"cassandra.contact.points": "cassandra",
"cassandra.keyspace": "test",
"cassandra.ssl.enabled": "true",
"cassandra.security": "PASSWORD",
"cassandra.username": "example",
"cassandra.password": "password",
"cassandra.ssl.truststore.password": "password",
"cassandra.ssl.truststore.path": "path_to_keystore"
}
}
スタンドアロンモードのプロパティ¶
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.security=PASSWORD
cassandra.username=example
cassandra.password=password
cassandra.ssl.enabled=true
cassandra.ssl.truststore.password=password
cassandra.ssl.truststore.path=path_to_keystore