Google Cloud BigTable Sink Connector for Confluent Platform¶
Kafka Connect BigTable Sink Connector では、データを Apache Kafka® から Google Cloud BigTable に移動できます。コネクターにより、Kafka のトピックのデータが、指定された BigTable インスタンスのテーブルに書き込まれます。テーブルの自動作成と、列ファミリの自動作成もサポートされています。
機能¶
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
BigTable Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
列マッピング¶
書き込み操作では、テーブル内の各 cell
の column family
、column
、row key
の指定が必要です。このコネクターでは、各レコード値の column family
と column
を推測できるように、Kafka レコード値が 2 レベルの構造体のフォーマットであることが求めらます。具体的には、各 Kafka レコード値は、以下のスキーマに適合する必要があります。
{
"column family name 1": {
"column name 1": "value",
"column name 2": "value",
"...": "...",
},
"column family name 2": {
"column name 3": "value",
"column name 4": "value",
"...": "...",
},
"...": "..."
}
たとえば、次のような Kafka レコード値があるとします。
{
"usage_stats": {
"daily_active_users": "10m",
"churn_rate": "5%"
},
"sales": {
"Jan": "10k",
"Feb": "12k"
}
}
このレコードが空のテーブルに書き込まれた場合、以下の例のようになります。
usage_stats | sales | |||
---|---|---|---|---|
daily_active_users | churn_rate | Jan | Feb | |
"example_row_key" | "10m" | "5%" | "10k" | "12k" |
1 行目は "列ファミリ"、2 行目は "列" を示しています。
レコードがこの 2 レベルの構造体のスキーマに従っていない場合、コネクターでは、以下のケースについて、正しく処理しようとします。
レコードは構造体であっても最上位レベルに構造体ではないフィールドがある場合、これらのフィールドの値は、デフォルト列ファミリにマップされます。
このケースの例として、次のような Kafka レコード値があるとします。
{ "usage_stats": { "daily_active_users": "10m", "churn_rate": "5%" }, "sales": "10" }
このレコードが空のテーブルに書き込まれた場合は、以下の例のようになります。
usage_stats default_column_family daily_active_users churn_rate sales "example_row_key" "10m" "5%" "10k" 注釈
デフォルト列ファミリはトピック名です。また、デフォルトの列名は
KAFKA_VALUE
です。レコード値が構造体ではない場合、コネクターにより、値全体がバイト配列としてデフォルト列とデフォルト列ファミリに書き込まれます。
このような値が空のテーブルに書き込まれた場合は、以下の例のようになります。
default_column_family default_column "example_row_key" kafka 値
行キーの構造¶
このコネクターは、Kafka レコードキーからの行キーの構築をサポートしています。
キー内のフィールドを連結して、行キーを形成できます。詳細については、「Google Cloud BigTable Sink Connector 構成プロパティ」を参照してください。
ちなみに
複雑な行キーを構築する場合は、Single Message Transformation を使用してレコードキーを必要なフォーマットにすることを検討してください。
データ型¶
Kafka レコード型のデータは、書き込みの前にバイト配列にシリアル化されます。このコネクターでは、hbase Bytes ライブラリを使用してシリアル化が処理されます。次の表は、Kafka レコード型がこのコネクターでどのようにシリアル化されるかを示しています。
Kafka レコード型 | バイト配列のシリアル化 |
---|---|
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING | Hbase Bytes |
BYTES | そのまま |
DATE、TIMESTAMP | Long としてシリアル化(Hbase Bytes を使用) |
ARRAY、MAP、STRUCT | 文字列化 JSON オブジェクトとしてシリアル化 |
テーブルの自動作成および列ファミリの自動作成¶
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成することができます。
auto.create.column.families
を有効にすると、テーブルにレコードスキーマに関連する列ファミリが存在しない場合は、コネクターで作成することができます。
注釈
作成時は疎であるため、列がまだテーブルに存在しない場合は、列の設定にかかわらず、オンザフライで列が作成されます。
プロキシ設定¶
proxy.url
プロキシ設定を構成する際に、システムプロパティの変数(https.proxyHost
および https.proxyPort
)が JVM 全体に対してグローバルに設定されます。
Input data formats¶
The BigTable Sink connector supports AVRO, JSON Schema, and PROTOBUF input data.
制限¶
- このコネクターは、Google Bigtable によって適用されるすべての割り当てに従います。
- このコネクターでは
insert
のバッチ操作がサポートされていないため、挿入の場合はスループットの低下が想定されます。 - BigTable では
update
操作はサポートされていません。 - コネクターでは
delete
操作はサポートされていません。
BigTable Sink Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従ってインストールできます。ZIP ファイルを手動でダウンロードすることもできます。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-gcp-bigtable:1.0.0-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Google Cloud BigTable Sink Connector 構成プロパティ」を参照してください。
トラブルシューティングとタスクの失敗¶
Connect Kafka Connect REST インターフェイス を使用して、コネクターやタスクのステータスを確認できます。タスクやコネクターが失敗した場合、trace
フィールドで原因とスタックトレースを確認できます。このコネクターでスローされたエラーの大半は、2 つのカテゴリに当てはまります。
- レコードレベルの失敗
- コネクターレベルの失敗
テーブル作成エラー¶
テーブル作成タスクは時間がかかる可能性があるため、テーブルの作成試行中にコネクターが失敗することがあります。この場合は、retry.timeout.ms
を増やすことを検討してください。
テーブル作成に関連するエラーは、テーブル作成中だけではなく、insert
の試行中にも発生することがあります。以下は、これらのエラーのスタックトレースの例です。
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:
注釈
retry.timeout.ms
はデフォルトで 90
秒であり、データベース操作の再試行に割り当てられる最大時間をミリ秒で指定します。auto.create.tables
が構成されている場合は、この構成のままにするか、数値を大きくすることを検討してください。通常、テーブル作成には最低でも 1 ~ 2 分かかります。
スキーマエラー¶
auto.create.column.families
が有効ではない場合、コネクターによって、存在しない列ファミリへの書き込みが試行されることがあるため、レコードレベルの失敗が多数発生する可能性があります。これは、コネクターが 2 レベルの構造体のレコード値を受け取らなかった場合に、デフォルト列ファミリ(kafka のトピック)へのデータの書き込みを試行することで発生する可能性が高くなります。この場合は、Single Message Transformation を使用してコネクターの想定に合ったレコードに再構成するか、auto.create.column.families
を有効にします。
認可の失敗¶
BigTable コネクターは、BigTable インスタンスで認証し、接続を確立する必要があります。認証が原因で接続に失敗した場合、コネクターは即座に停止します。これらのエラーにより、場合によっては使用中の Google Cloud アカウントの変更が要求され、サービスアカウントキーの作成が必要になることもあります。アカウントを変更した後、コネクターを再度実行してみてください。詳細については、サービスアカウントキー の説明を参照してください。
クォータの失敗¶
BigTable のクォータ(割り当て上限値) のいくつかを超えたことにより、コネクターが失敗することがあります。
よく見られるクォータエラーについて以下で説明します。
このコネクターは、
per user per 100 seconds
と定義されたメッセージクォータを超えた場合に失敗する可能性があります。この場合、クォータをリセットした後にコネクターで処理を再試行できる程度にretry.timeout.ms
を高く設定してください。以下は、スタックトレースの例を示しています。
Caused by: org.apache.kafka.connect.errors.ConnectException: ... ... ERROR Could not complete RPC. Failure #0, got: Status{code=RESOURCE_EXHAUSTED, description=Quota exceeded for quota group 'TablesWriteGroup' and limit 'USER-100s' of service 'bigtableadmin.googleapis.com' for consumer 'project_number: ..
コネクターが
per project per day
で定義されたクォータを超える場合もあります。この場合は、コネクターを再起動してもエラーは修正されません。一部のクォータエラーは、列ファミリが過度に作成されることに関連して起こる場合があります(BigTable では列ファミリの上限はテーブルにつき 100 です)。コネクターで作成される列ファミリが多くなりすぎないように、テーブルスキーマを変更することを検討してください。詳細については、BigTable のスキーマの設計 を参照してください。
デバッグログの有効化¶
Connect ワーカーログ構成で、ログに詳細をどの程度まで含めるかを制御できます。デフォルトでは、ワーカーログに基本的な機能を特定できる程度の詳細が含まれています。さらに詳しい情報を含めるには、Connect ワーカーのログ構成でデバッグログを有効にします。この変更は、ワーカーごとに行う必要があります。変更が有効になるのは、ワーカーの起動後です。以下の説明どおりに各 Connect ワーカーのログ構成を変更した後、すべての Connect ワーカーを再起動してください。必要な場合は、ローリング再起動を使用できます。
注釈
トレースレベルのログは冗長で、詳細情報が多く含まれているので、特定の失敗を解決するのに役立つ可能性があります。トレースレベルのログ記録は、デバッグレベルのログ記録を有効にする場合と同じように有効にできます。ただし、DEBUG
の代わりに TRACE
を使用します。
オンプレミスのインストール¶
Confluent Platform をローカルまたはオンプレミスでインストールしている場合は、etc/kafka/connect-log4j.properties
ファイルで、Connect ワーカープロセスのログ構成を定義しています。BigTable コネクターのみで DEBUG を有効にするには、etc/kafka/connect-log4j.properties
ファイルに以下の行を含めます。
log4j.logger.io.confluent.gcp.bigtable=DEBUG
すべてのコネクターを含むすべての Connect ワーカーのコードで DEBUG を有効にするには、log4j.rootLogger=
行で変更し、 INFO
の代わりに DEBUG
を使用します。たとえば、Connect のデフォルトのログ構成には、次の行があります。
log4j.rootLogger=INFO, stdout
すべての Connect ワーカーのコードで DEBUG を有効にするには、この行を次のように変更します。
log4j.rootLogger=DEBUG, stdout
注釈
この設定によって、org.apache.kafka.clients
パッケージから生成されるログの数が大幅に増える可能性があります。これは、log4j.logger.org.apache.kafka.clients=ERROR
を設定することで抑制できます。
クイックスタート¶
このクイックスタートでは、BigTable Sink Connector を使用して、Avro コンソールプロデューサーで生成されたデータを BigTable インスタンスのテーブルにエクスポートします。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
前提条件¶
- Cloud BigTable の前提条件
- Google Cloud Platform(GCP)アカウント
- GCP プロジェクト、およびその請求が有効になっていること(手順は こちら を参照)。このリンクの手順 3 は省略可能です。
- Cloud SDK および cbt を こちらの手順 を使用してセットアップします。
- Confluent の前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
認証情報のセットアップ¶
GCP プロジェクトにサービスアカウントおよびサービスアカウントキーを作成します。
- GCP Console で IAM と管理 ページを開きます。
- 使用するプロジェクトを選択し、続行 をクリックします。
- 左側のナビゲーションで、サービスアカウント をクリックします。
- 上部のツールバーで、サービスアカウントの作成 をクリックします。
- サービスアカウントの名前と説明(
test-service-account
など)を入力します。 - 作成 をクリックし、次のページで、
Cloud BigTable
の下のBigTable Administrator
ロールを選択します。 - 次のページで、キーを作成 をクリックして、JSON ファイルをダウンロードします。
- このクイックスタート用に、ファイルを自分の
$home
ディレクトリに保存し、bigtable-test-credentials.json
という名前を付けます。
サービスアカウントキーの詳細については、こちら を参照してください。
BigTable インスタンスの作成¶
コンソールを使用して、BigTable に test-instance
という名前のテストインスタンスを作成します。インスタンスの作成については 詳細な手順 を参照してください。
コネクターのインストールと読み込み¶
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your CP installation directory confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest
ちなみに
デフォルトで、プラグインが
share/confluent-hub-components
にインストールされ、このディレクトリがプラグインパスに追加されます。新しいコネクタープラグインを追加した場合は、Connect の再起動が必要です。Connect を再起動するには、Confluent CLI を使用します。
confluent local services connect stop && confluent local services connect start
etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties
ファイルを以下のプロパティで追加して、コネクターを構成します。name=BigTableSinkConnector topics=stats tasks.max=1 connector.class=io.confluent.connect.gcp.bigtable.BigtableSinkConnector gcp.bigtable.credentials.path=$home/bigtable-test-credentials.json gcp.bigtable.project.id=YOUR-PROJECT-ID gcp.bigtable.instance.id=test-instance auto.create.tables=true aut.create.column.families=true table.name.format=example_table # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses. # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster, # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations. confluent.license= confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1
注釈
YOUR-PROJECT-ID
を、このクイックスタートの前提条件で作成したプロジェクト ID と置き換えてください。$home
は、自分のホームディレクトリのパス、または認証情報ファイルを保存した場所のパスと置き換えてください。以下のコマンドを使用してコネクターの構成を読み込むことにより、BigTable Sink Connector を起動します。
confluent local services connect connector load bigtable --config etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties
このコマンドの出力は次のようになります。
{ "name": "bigtable", "config": { "topics": "stats", "tasks.max": "1", "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector", "gcp.bigtable.credentials.path": "$home/bigtable-test-credentials.json", "gcp.bigtable.instance.id": "test-instance", "gcp.bigtable.project.id": "YOUR-PROJECT-ID", "auto.create.tables": "true", "auto.create.column.families": "true", "table.name.format": "example_table", "confluent.license": "", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "name": "bigtable" }, "tasks": [ { "connector": "bigtable", "task": 0 } ], "type": "sink" }
コネクターのステータスをチェックして、
RUNNING
ステートであることを確認します。confluent local services connect connector status bigtable
このコマンドの出力は次のようになります。
{ "name": "bigtable", "connector": { "state": "RUNNING", "worker_id": "10.200.7.192:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.200.7.192:8083" } ], "type": "sink" }
Kafka へのデータ送信¶
stats
トピックにレコードを生成するには、まず、Kafka プロデューサーを起動します。bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic stats \ --property parse.key=true \ --property key.separator=, \ --property key.schema='{"type" : "string", "name" : "id"}' \ --property value.schema='{"type":"record","name":"myrecord", "fields":[{"name":"users","type":{"name": "columnfamily", "type":"record","fields":[{"name": "name", "type": "string"}, {"name": "friends", "type": "string"}]}}]}'
コンソールプロデューサーが入力待ちになっているので、引き続いてトピックにレコードを挿入できます。
"simple-key-1", {"users": {"name":"Bob","friends": "1000"}} "simple-key-2", {"users": {"name":"Jess","friends": "10000"}} "simple-key-3", {"users": {"name":"John","friends": "10000"}}
BigTable でのデータチェック¶
cbt を使用して、データが BigTable に書き込まれたことを検証します。
cbt read example_table
出力は、以下の例のようになります。
simple-key-1
user:name @ 2019/09/10-14:51:01.365000
Bob
user:friends @ 2019/09/10-14:51:01.365000
1000
simple-key-2
user:name @ 2019/09/10-14:51:01.365000
Jess
user:friends @ 2019/09/10-14:51:01.365000
10000
simple-key-3
user:name @ 2019/09/10-14:51:01.365000
John
user:friends @ 2019/09/10-14:51:01.365000
10000
リソースのクリーンアップ¶
テーブルを削除します。
cbt deletetable example_table
テストインスタンスを削除します。
- 左のサイドバーの
Instance details
をクリックします。 - 上のツールバーで Delete Instance をクリックし、インスタンス名を入力して、削除することを確認します。
- 左のサイドバーの
テストに使用したサービスアカウントの認証情報を削除します。
- GCP Console で IAM と管理 ページを開きます。
- 使用するプロジェクトを選択し、続行 をクリックします。
- 左側のナビゲーションで、サービスアカウント をクリックします。
test-service-account
を見つけたら、Actions の下の More ボタンをクリックします。- Delete をクリックし、削除することを確認します。