Google Cloud Spanner Sink Connector for Confluent Platform¶
注釈
Confluent Cloud を使用している場合は、「Google Cloud Spanner Sink Connector for Confluent Cloud」でクラウドのクイックスタートを参照してください。
Google Cloud Spanner Sink Connector では、データを Apache Kafka® から Google Cloud Spanner データベースに移動します。コネクターにより、Kafka 内のトピックのデータが、指定された Spanner データベースのテーブルに書き込まれます。テーブルの自動作成と、制限付きの自動進化もサポートされています。
機能¶
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Google Cloud Spanner Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
データマッピング¶
シンクコネクターは、スキーマを認識していないので、スキーマレジストリ に付属の Avro コンバーター、またはスキーマが有効になっている JSON コンバーターなどの適切なコンバーターを使用する必要があります。Kafka レコードキーが存在する場合は、プリミティブ型または Connect 構造体にできます。レコード値は Connect 構造体にする必要があります。トピック内のデータのフォーマットと互換性がない場合は、カスタムの Converter
の実装が必要になることがあります。
データ型¶
Kafka レコード型から Spanner のデータ型 へのマッピングについて、以下で説明します。
Kafka レコード型 | Spanner の型 |
---|---|
INT8、INT16、INT32、INT64 | INT64 |
FLOAT32、FLOAT64 | FLOAT64 |
BOOLEAN | BOOL |
STRING | STRING(MAX) |
BYTES | BYTES(MAX) |
DATE | DATE |
TIMESTAMP | TIMESTAMP |
ARRAY<INT8>、ARRAY<INT16>、ARRAY<INT32>、ARRAY<INT64> | ARRAY<INT64> |
ARRAY<FLOAT32>、ARRAY<FLOAT64> | ARRAY<FLOAT64> |
ARRAY<BOOLEAN> | ARRAY<BOOL> |
ARRAY<STRING> | ARRAY<STRING(MAX)> |
サポートなし | ARRAY<BYTES(MAX)> |
サポートなし | ARRAY<DATE> |
サポートなし | ARRAY<TIMESTAMP> |
MAP | サポートなし |
STRUCT | サポートなし |
自動作成と自動進化¶
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成することができます。コネクターは、テーブル定義のベースとしてレコードスキーマを使用します。したがって、作成は、レコードがトピックから消費される際にオンラインで実行されます。
auto.evolve
を有効にすると、レコードに列が存在しない場合に、コネクターでは制限付きで自動進化を実行することができます。データ型の変更と列の削除は障害につながる可能性があるので、その種の進化がコネクターによってテーブルで実行されることはありません。
重要
テーブルスキーマ進化の後方互換性を維持するため、レコードスキーマの新規フィールドは、省略可能にするか、デフォルト値を設定する必要があります。
前提条件¶
Kafka Connect GCP Spanner Sink Connector を実行するには、以下が必要です。
- Confluent Platform 4.0.0 以上、または Kafka 1.0.0 以上
- Java 1.8
- このコネクターには、少なくとも
roles/spanner.databaseUser
が必要です。「Cloud Spanner のアクセス制御 」を参照してください。
制限¶
- Cloud Spanner には テーブルサイズ と クエリの制限 があり、コネクターに適用されます。
- インスタンスの パフォーマンスの制限 がコネクターに適用されます。
- コネクターでは、インターリーブされたテーブルの作成 はサポートされていません。
auto.evolve
を有効にしている場合にデフォルト値を持つ新規列が追加された際は、新規レコードでのみ、そのデフォルト値が使用されます。既存のレコードでは、新しい列の値は<NULL>
となります。- 現在、コネクターではトピック名を変更する Single Message Transformations(SMT) をサポートしていません。また、以下の変換は許可されていません。
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
Cloud Spanner コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-gcp-spanner:1.0.1
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、コネクターのサブスクリプションを購入する必要があります。これには、サブスクライバー用の Confluent エンタープライズライセンス キーと、Confluent Platform およびコネクターに対する `エンタープライズレベルのサポート<https://www.confluent.io/subscription/>`__ が含まれています。サブスクリプションをご購入済みの場合、詳細については Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Google Cloud Spanner Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、Spanner Sink Connector を使用して、Avro コンソールプロデューサーで生成されたデータを Spanner インスタンスのデータベースにエクスポートします。
前提条件¶
Cloud Spanner の前提条件¶
- Google Cloud Platform(GCP)アカウント
- GCP プロジェクト、およびその請求が有効になっていること。手順については、『Console を使用したクイックスタート』を参照してください。
Confluent の前提条件¶
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
認証情報のセットアップ¶
GCP プロジェクトにサービスアカウントおよびサービスアカウントキーを作成します。
- GCP Console で IAM と管理 ページを開きます。
- 使用するプロジェクトを選択し、続行 をクリックします。
- 左側のナビゲーションで、サービスアカウント をクリックします。
- 上部のツールバーで、サービスアカウントの作成 をクリックします。
- サービスアカウントの名前と説明(
test-service-account
など)を入力します。 - 作成 をクリックし、次のページで、
Cloud Spanner
の下のCloud Spanner データベース管理者
ロールを選択します。 - 次のページで、キーを作成 をクリックして、JSON ファイルをダウンロードします。
- このクイックスタート用に、ファイルを自分の
$home
ディレクトリに保存し、spanner-test-credentials.json
という名前を付けます。
サービスアカウントキーの作成の詳細については、『 サービスアカウントキーの作成と管理 』を参照してください。
Spanner インスタンスとデータベースの作成¶
- Console を使用して、Spanner に
test-instance
という名前のテストインスタンスを作成します。手順については、『 Console を使用したクイックスタート 』を参照してください。 test-instance
の下に、example-db
という名前のデータベースを作成します。手順については、『Console を使用したクイックスタート』を参照してください。
コネクターのインストールと読み込み¶
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your CP installation directory confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest
ちなみに
デフォルトで、プラグインが
share/confluent-hub-components
にインストールされ、このディレクトリがプラグインパスに追加されます。新しいコネクタープラグインを追加した場合は、Kafka Connect の再起動が必要です。Connect を再起動するには、Confluent CLI を使用します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されました。該当するコマンドは
confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。confluent local services connect stop && confluent local services connect start
spanner-sink.properties
ファイルを以下のプロパティで追加して、コネクターを構成します。name=SpannerSinkConnector topics=products tasks.max=1 connector.class=io.confluent.connect.gcp.spanner.SpannerSinkConnector gcp.spanner.credentials.path=$home/spanner-test-credentials.json gcp.spanner.instance.id=test-instance gcp.spanner.database.id=example-db auto.create=true table.name.format=kafka_${topic} # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses. # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster, # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations. confluent.license= confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1
注釈
$home
は、自分のホームディレクトリのパス、または認証情報ファイルを保存した場所のパスと置き換えてください。以下のコマンドを使用してコネクターの構成を読み込み、Spanner Sink Connector を起動します。
confluent local services connect connector load spanner --config spanner-sink.properties
このコマンドの出力は次のようになります。
{ "name": "spanner", "config": { "topics": "products", "tasks.max": "1", "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector", "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "example-db", "auto.create": "true", "table.name.format": "kafka_${topic}", "confluent.license": "", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "name": "spanner" }, "tasks": [ { "connector": "spanner", "task": 0 } ], "type": "sink" }
コネクターのステータスをチェックして、
RUNNING
ステートであることを確認します。confluent local services connect connector status spanner
このコマンドの出力は次のようになります。
{ "name": "spanner", "connector": { "state": "RUNNING", "worker_id": "10.200.7.192:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.200.7.192:8083" } ], "type": "sink" }
Kafka へのデータ送信¶
products
トピックにレコードを生成するには、まず、Kafka プロデューサーを起動します。kafka-avro-console-producer \ --broker-list localhost:9092 --topic products \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price", "type": "float"}, {"name":"quantity", "type": "int"}]}'
コンソールプロデューサーが入力待ちになっているので、引き続いてトピックにレコードを挿入できます。
{"name": "scissors", "price": 2.75, "quantity": 3} {"name": "tape", "price": 0.99, "quantity": 10} {"name": "notebooks", "price": 1.99, "quantity": 5}
Spanner データベースでのデータチェック¶
データが Spanner に書き込まれたことを検証するには、GCP Console(https://console.cloud.google.com/spanner/instances/test-instance/databases)を使用できます。
データベースの下に、kafka_products
テーブルが表示されます。テーブルをクリックすると、DATA
に、先ほど挿入した 3 行が表示されます。
connect_topic__ |
connect_partition__ |
connect_offset__ |
quantity | price | name |
---|---|---|---|---|---|
products | 0 | 0 | 3 | 2.75 | scissors |
products | 0 | 1 | 10 | 0.99 | tape |
products | 0 | 2 | 5 | 1.99 | notebooks |
不要なリソースの削除¶
テストインスタンスを削除します。
- 左のサイドバーの
test-instance
をクリックします。 - 上のツールバーで Delete Instance をクリックし、インスタンス名を入力して、削除することを確認します。
テストに使用したサービスアカウントの認証情報を削除します。
- GCP Console で IAM と管理 ページを開きます。
- 使用するプロジェクトを選択し、続行 をクリックします。
- 左側のナビゲーションで、サービスアカウント をクリックします。
test-service-account
を見つけたら、Actions の下の More ボタンをクリックします。- 削除 をクリックし、削除を確認します。
コネクターおよびタスクの失敗に関するトラブルシューティング¶
Connect REST API を使用して、コネクターやタスクのステータスをチェックできます。タスクやコネクターが失敗した場合、trace
フィールドで原因とスタックトレースを確認できます。
認可の失敗¶
Cloud Spanner コネクターは、Spanner インスタンスで認証し、接続を確立する必要があります。認証が原因で接続に失敗した場合、コネクターは即座に停止します。これらのエラーにより、場合によっては使用中の Google Cloud アカウントの変更が要求され、サービスアカウントキーの作成が必要になることもあります。アカウントを変更した後、コネクターを再度実行してみてください。サービスアカウントキーの詳細については、『サービスアカウントキーの作成と管理』を参照してください。
Spanner のエラーコード¶
コネクターまたはタスクが失敗するたびに、Spanner のエラーコードとエラーメッセージを含むメッセージが取り込まれます。メッセージには、失敗の理由または修正の提案が含まれていることもあります。エラーコードの詳細については、Spanner のエラーコード を参照してください。
接続の失敗が繰り返される場合¶
接続のタイムアウトによってコネクターが頻繁に停止する場合は、以下のコネクター構成プロパティを変更して再起動することを検討してください。
request.timeout.ms
はデフォルトが 6 秒(6000 ミリ秒)であり、このプロパティによって、Spanner の 1 つの操作が成功するのをコネクターが待つ最大時間を指定します。この時間が経過すると、コネクターによって再試行されます。retry.timeout.ms
はデフォルトが 1 分(60000 ミリ秒)であり、このプロパティによって、失敗したリクエストの再試行をコネクターが継続する最大時間を指定します。実際の遅延時間はランダムで、再試行ごとに指数関数的に長くなります。すべての再試行にかかった時間がこの値より長くなった場合、コネクターのタスクは失敗します。これは初期接続の試行には適用されませんが、以降の再接続リクエストに適用されます。Spanner からの再試行が可能なエラーのみ再試行されます。
クォータの失敗¶
書き込み制限の超過によってコネクターが失敗する場合は、max.batch.size
構成のプロパティを変更してコネクターを再起動することを検討してください。max.batch.size
は、デフォルトが 1000 で、1 回の Spanner 操作(INSERT、UPDATE、UPSERT)でバッチ処理される最大行数を指定します。行のサイズが大きい場合は、最大バッチサイズを小さくすることを検討してください。
コネクターのエラーモード¶
デフォルトで、コネクターの error.mode
は FAIL
となっています。つまり、レコードのバッチを Spanner に書き込む際にエラーが発生した場合、コネクターは失敗します。代わりに、特定のバッチが失敗した後もコネクターが実行を続けるように、error.mode
に WARN
または INFO
を設定したほうが便利な場合もあります。
プロキシ設定¶
gcp.spanner.proxy.url
プロキシ設定を構成する際に、システムプロパティの変数(https.proxyHost
および https.proxyPort
)が JVM 全体に対してグローバルに設定されます。
デバッグログの有効化¶
Connect ワーカーログ構成で、ログに詳細をどの程度まで含めるかを制御できます。デフォルトでは、ワーカーログに基本的な機能を特定できる程度の詳細が含まれています。さらに詳しい情報を含めるには、Connect ワーカーのログ構成でデバッグログを有効にします。この変更は、ワーカーごとに行う必要があります。変更が有効になるのは、ワーカーの起動後です。以下の説明どおりに各 Connect ワーカーのログ構成を変更した後、すべての Connect ワーカーを再起動してください。必要であれば、ローリング再起動を使用できます。
注釈
トレースレベルのログは冗長で、詳細情報が多く含まれているので、特定の失敗を解決するのに役立つ可能性があります。トレースレベルのログ記録は、デバッグレベルのログ記録のように有効になっています。ただし、DEBUG
の代わりに TRACE
を使用します。
オンプレミスのインストール¶
Confluent Platform をローカルまたはオンプレミスでインストールしている場合は、etc/kafka/connect-log4j.properties
ファイルで、Connect ワーカープロセスのログ構成を定義しています。Spanner コネクターのみで DEBUG を有効にするには、etc/kafka/connect-log4j.properties
ファイルに以下の行を含めます。
log4j.logger.io.confluent.gcp.spanner=DEBUG
すべてのコネクターを含むすべての Connect ワーカーのコードで DEBUG を有効にするには、log4j.rootLogger=
行で変更し、 INFO
の代わりに DEBUG
を使用します。たとえば、Connect のデフォルトのログ構成には、次の行があります。
log4j.rootLogger=INFO, stdout
すべての Connect ワーカーのコードで DEBUG を有効にするには、上の行を次のように変更します。
log4j.rootLogger=DEBUG, stdout
注釈
この設定によって、org.apache.kafka.clients
パッケージから生成されるログの量が大幅に増える可能性があります。これは、log4j.logger.org.apache.kafka.clients=ERROR
を設定することで抑制できます。