Google Cloud BigTable Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see Google Cloud BigTable Sink Connector for Confluent Platform.
Kafka Connect Google Cloud BigTable Sink Connector for Confluent Cloud を使用すると、Apache Kafka® のデータを Google Cloud BigTable に移動できます。このコネクターにより、Kafka のトピックのデータが、指定された BigTable インスタンスのテーブルに書き込まれます。
機能¶
- 挿入とアップサートのサポート: Google Cloud BigTable で行の挿入と行のアップデートを実行できます。
- テーブルおよび列ファミリの自動作成: 不足しているテーブルと不足している列ファミリを作成できます。
- レコードフィールドからの行キーの作成: Kafka レコードキーフィールド名のコンマ区切りのリストを連結して行キーを形成できます。
- 少なくとも 1 回のデリバリー: レコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。
- 入力データのフォーマット: Avro、JSON Schema、Protobuf 入力データをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
See Cloud connector limitations for additional information.
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
クイックスタート¶
このクイックスタートを使用して、Google Cloud BigTable Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、BigTable インスタンスにイベントをストリーミングするようにコネクターを構成する基本的な方法について説明します。
- 前提条件
Google Cloud 上の BigTable インスタンスへのアクセスを許可されていること。
Google Cloud サービスアカウントの JSON キーファイル。サービスアカウントの作成時に、キーを作成してダウンロードします。キーは、JSON ファイルとしてダウンロードする必要があります。サービスアカウントに BigTable に対する書き込みアクセス許可 が必要です。最小限のアクセス許可は以下のとおりです。
bigtable.tables.create bigtable.tables.mutateRows bigtable.tables.get bigtable.tables.update bigtable.tables.readRows bigtable.tables.list bigtable.tables.delete
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
- BigTable インスタンスと Kafka クラスターは同じリージョンに存在している必要があります。
- Confluent Cloud CLI がインストールされ、クラスター用に構成されていること。「Confluent Cloud CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
Confluent Cloud Console を使用する場合¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク( * )は必須項目であることを示しています。
At the Add Google Cloud BigTable Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Upload your GCP credentials file, which is the GCP service account JSON file with write permissions for Cloud Bigtable.
- Enter your BigTable Project ID, which is the ID of the Cloud Bigtable project to connect to.
- Enter your BigTable Instance ID-, which is the ID of the Cloud Bigtable instance to connect to.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and definitions.
Select an Input Kafka record value format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON, BYTES. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
Select an insert mode–the insertion mode to use:
INSERT
: 標準的なINSERT
行関数を使用します。該当する行が既にテーブルに存在する場合は、エラーが発生します。UPSERT
: This mode is similar toINSERT
. However, if the row already exists, theUPSERT
function overwrites column values with the new values provided.
Show advanced configurations
Input record key format: AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON, STRING, or BYTES. A valid schema must be available in Schema Registry to use a schema-based message format.
Max batch size: The maximum number of records that can be batched into a batch of upserts. Note that since only a batch size of 1 for inserts is supported, max.batch.size must be exactly 1 when
insert.mode
is set toINSERT
.Table name format: A format string for the destination table name, which may contain
${topic}
as a placeholder for the originating topic name. For example, to create a table namedkafka-orders
based on a Kafka topic namedorders
, you would enterkafka-${topic}
in this field.Roy key definition: A comma separated list of Kafka Record key field names that specifies the order of Kafka key fields to be concatenated to form the row key.
注釈
行キー定義のプロパティが空のままで、Kafka レコードキーが構造体の場合、その構造体のすべてのフィールドが行キーの作成に使用されます。レコードキーがバイト配列の場合、行キーにはバイト配列がそのまま設定されます。レコードキーがプリミティブの場合、行キーにはプリミティブ(文字列化されたもの)が設定されます。
Row key delimiter: The delimiter used in concatenating Kafka key fields in the row key. If this configuration is empty or unspecified, the key fields will be concatenated together directly.
Auto create tables: Whether to automatically create the destination table if it is found to be missing.
Auto create column families: 列ファミリが存在しない場合に自動的に作成するかどうかを指定します。
See 構成プロパティ for all property values and definitions.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
Step 5: Check the results in BigTable.¶
BigTable インスタンスでテーブルにデータが入力されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent Cloud CLI を使用する場合¶
Confluent Cloud CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
ccloud connector-catalog describe BigTableSink
出力例:
Following are the required configs:
connector.class: BigTableSink
input.data.format
name
kafka.api.key
kafka.api.secret
gcp.bigtable.credentials.json
gcp.bigtable.project.id
gcp.bigtable.instance.id
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "BigTableSinkConnector_0",
"config": {
"topics": "pageviews",
"input.data.format": "AVRO",
"input.key.format": "STRING",
"connector.class": "BigTableSink",
"name": "BigTableSinkConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*************************************************",
"gcp.bigtable.credentials.json": "*",
"gcp.bigtable.project.id": "connect-123456789",
"gcp.bigtable.instance.id": "confluent",
"insert.mode": "INSERT",
"auto.create.tables": "true",
"auto.create.column.families": "true",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"input.key.format"
: 入力レコードキーフォーマット( Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、BYTES、JSON、JSON_SR (JSON スキーマ)、PROTOBUF、または STRING です。スキーマベースのメッセージフォーマットを使用する場合は、Confluent Cloud Schema Registry を構成しておく必要があります。"gcp.bigtable.credentials.json"
: このプロパティには、ダウンロードした JSON ファイルの内容が含まれます。ダウンロードした認証情報ファイルのフォーマットを変更して、その内容を使用する方法の詳細については、「キーファイル認証情報のフォーマットの変更」を参照してください。"insert.mode"
: 挿入モードを入力します。デフォルトのモードはUPSERT
です。"INSERT"
: 標準的な挿入行関数です。該当する行が既にテーブルに存在する場合は、エラーが発生します。"UPSERT"
: このモードはINSERT
と似ています。ただし、該当する行が既に存在する場合に、UPSERT
関数は、指定された値で列の値を上書きします。
max.batch.size
:(オプション)テーブルに対する 1 回の挿入またはアップサートの操作で、バッチにまとめることができる最大レコード数。insert.mode
がINSERT
の場合、最大バッチサイズを1
に設定する必要があります。デフォルト値は1000
です。"auto.create.tables"
: テーブルが存在しない場合に自動的に作成するかどうかを指定します。デフォルト値はfalse
です。"auto.create.column.families"
: 列ファミリが存在しない場合に自動的に作成するかどうかを指定します。デフォルト値はfalse
です。
See 構成プロパティ for all property values and descriptions.
ステップ 4: 構成ファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
ccloud connector create --config <file-name>.json
例:
ccloud connector create --config bigtable-sink-config.json
出力例:
Created connector BigTableSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
ccloud connector list
出力例:
ID | Name | Status | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl | BigTableSinkConnector_0 | RUNNING | sink
ステップ 6: BigTable で結果を確認します。¶
BigTable インスタンスでテーブルにデータが入力されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
キーファイル認証情報のフォーマットの変更¶
ダウンロードした認証情報ファイルの内容は、コネクター構成で使用する前に、文字列フォーマットに変換する必要があります。
JSON ファイルの内容を文字列フォーマットに変換します。これは、オンラインのコンバーターツールを使用して実行できます。たとえば、JSON to String Online Converter などがあります。
Private Key セクションの
\n
のすべての出現箇所の前にエスケープ文字\
を追加します。これで、各セクションの先頭が\\n
になります(以下の強調表示された行を参照してください)。以下の例は、\\n
の出現箇所がわかりすいようにフォーマットを整えています。認証情報キーの大部分は省略しています。ちなみに
認証情報を文字列に変換し、さらに必要に応じてエスケープ文字を追加するスクリプトも用意されています。『Stringify GCP Credentials』を参照してください。
{ "name" : "BigTableSinkConnector_0", "connector.class" : "BigTableSink", "kafka.api.key" : "<my-kafka-api-keyk>", "kafka.api.secret" : "<my-kafka-api-secret>", "input.data.format": "AVRO", "topics" : "pageviews", "gcp.bigtable.credentials.json" : "{\"type\":\"service_account\",\"project_id\":\"connect- 1234567\",\"private_key_id\":\"omitted\", \"private_key\":\"-----BEGIN PRIVATE KEY----- \\nMIIEvAIBADANBgkqhkiG9w0BA \\n6MhBA9TIXB4dPiYYNOYwbfy0Lki8zGn7T6wovGS5\opzsIh \\nOAQ8oRolFp\rdwc2cC5wyZ2+E+bhwn \\nPdCTW+oZoodY\\nOGB18cCKn5mJRzpiYsb5eGv2fN\/J \\n...rest of key omitted... \\n-----END PRIVATE KEY-----\\n\", \"client_email\":\"pub-sub@connect-123456789.iam.gserviceaccount.com\", \"client_id\":\"123456789\",\"auth_uri\":\"https:\/\/accounts.google.com\/o\/oauth2\/ auth\",\"token_uri\":\"https:\/\/oauth2.googleapis.com\/ token\",\"auth_provider_x509_cert_url\":\"https:\/\/ www.googleapis.com\/oauth2\/v1\/ certs\",\"client_x509_cert_url\":\"https:\/\/www.googleapis.com\/ robot\/v1\/metadata\/x509\/pub-sub%40connect- 123456789.iam.gserviceaccount.com\"}", "gcp.bigtable.project.id": "<project-id>", "gcp.bigtable.instance.id": "<instance-id", "insert.mode": "UPSERT", "auto.create.tables": "true", "auto.create.column.families": "true", "tasks.max": "1" }
変換したすべての文字列の内容を、前述の例のように構成ファイルの
"gcp.bigtable.credentials.json"
認証情報セクションに追加します。
構成プロパティ¶
Use the following configuration properties with this connector.
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- 型: list
- 重要度: 高
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- 型: string
- 重要度: 高
input.key.format
Sets the input Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, STRING or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF
- 型: string
- Default: JSON
- Valid Values: AVRO, JSON_SR, PROTOBUF, JSON, STRING, BYTES
- 重要度: 高
How should we connect to your data?¶
name
Sets a name for your connector.
- 型: string
- Valid Values: A string at most 64 characters long
- 重要度: 高
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- 型: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- Type: password
- 重要度: 高
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- 型: string
- 重要度: 高
kafka.api.secret
- Type: password
- 重要度: 高
GCP credentials¶
gcp.bigtable.credentials.json
GCP service account JSON file with write permissions for Cloud Bigtable.
- Type: password
- 重要度: 高
How should we connect to your Cloud BigTable instance?¶
gcp.bigtable.project.id
The ID of the Cloud Bigtable project to connect to.
- 型: string
- 重要度: 高
gcp.bigtable.instance.id
The ID of the Cloud Bigtable instance to connect to.
- 型: string
- 重要度: 高
Database details¶
insert.mode
The insertion mode to use.
- 型: string
- Default: UPSERT
- Valid Values: INSERT, UPSERT
- 重要度: 高
Connection details¶
max.batch.size
The maximum number of records that can be batched into a batch of upserts. Note that since only a batch size of 1 for inserts is supported, max.batch.size must be exactly 1 when insert.mode is set to INSERT.
- 型: int
- デフォルト: 1000
- Valid Values: [1,...,5000]
- 重要度: 中
Data mapping¶
table.name.format
A format string for the destination table name, which may contain ${topic} as a placeholder for the originating topic name. For example, kafka_${topic} for the topic ‘orders’ will map to the table name ‘kafka_orders’.
- 型: string
- Default: ${topic}
- 重要度: 中
bigtable.row.key.definition
A comma separated list of Kafka Record key field names that specifies the order of Kafka key fields to be concatenated to form the row key.
For example the list: 'username, post_id, time_stamp' when applied to a Kafka key: {'username': 'bob','post_id': '213', 'time_stamp': '123123'} and with delimiter # gives the row key 'bob#213#123123'. You can also access terms nested in the key by using . as a delimiter. If this configuration is empty or unspecified and the Kafka Message Key is a: STRUCT: all the fields in the struct are used to construct the row key. BYTE ARRAY: the row key is set to the byte array as is. PRIMITIVE: the row key is set to the primitive stringified.
If prefixes, more complicated delimiters, and string constants are required in your Row Key, consider configuring an SMT to add relevant fields to the Kafka Record key.
- 型: list
- デフォルト: ""
- 重要度: 中
bigtable.row.key.delimiter
The delimiter used in concatenating Kafka key fields in the row key. If this configuration is empty or unspecified, the key fields will be concatenated together directly.
- 型: string
- デフォルト: ""
- 重要度: 低
auto.create.tables
送信先テーブルが存在しない場合に、テーブルを自動的に作成するかどうかを指定します。
- 型: ブール値
- デフォルト: false
- 重要度: 中
auto.create.column.families
レコードスキーマに関連する列ファミリがテーブルに存在しない場合に、列ファミリを自動的に作成するかどうかを指定します。
- 型: ブール値
- デフォルト: false
- 重要度: 中
Number of tasks for this connector¶
tasks.max
- 型: int
- Valid Values: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。