Apache HBase Sink Connector for Confluent Platform¶
Kafka Connect Apache HBase Sink Connector では、データを Apache Kafka® から Apache HBase に移動します。コネクターにより、Kafka のトピックのデータが、指定された HBase インスタンスのテーブルに書き込まれます。テーブルの自動作成と、列ファミリの自動作成もサポートされています。
機能¶
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Apache HBase Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
列マッピング¶
書き込み操作では、テーブル内の各 cell
の column family
、column
、row key
の指定が必要です。このコネクターでは、各レコード値の column family
と column
を推測できるように、Kafka レコード値が 2 レベルの構造体のフォーマットであることが求めらます。具体的には、各 Kafka レコード値は、以下のスキーマに適合する必要があります。
{
"column family name 1": {
"column name 1": "value",
"column name 2": "value",
"...": "...",
},
"column family name 2": {
"column name 3": "value",
"column name 4": "value",
"...": "...",
},
"...": "..."
}
たとえば、次のような Kafka レコード値があるとします。
{
"usage_stats": {
"daily_active_users": "10m",
"churn_rate": "5%"
},
"sales": {
"Jan": "10k",
"Feb": "12k"
}
}
このレコードが空のテーブルに書き込まれた場合、以下の例のようになります。
usage_stats | sales | |||
---|---|---|---|---|
daily_active_users | churn_rate | Jan | Feb | |
"example_row_key" | "10m" | "5%" | "10k" | "12k" |
1 行目は列ファミリを、2 行目は列を示しています。
レコードがこの 2 レベルの構造体のスキーマに従っていない場合、コネクターでは、以下のケースについて、正しく処理しようとします。
レコードは構造体であっても最上位レベルに構造体ではないフィールドがある場合、これらのフィールドの値は、デフォルト列ファミリにマップされます。
このケースの例として、次のような Kafka レコード値があるとします。
{ "usage_stats": { "daily_active_users": "10m", "churn_rate": "5%" }, "sales": "10" }
このレコードが空のテーブルに書き込まれた場合は、以下の例のようになります。
usage_stats default_column_family daily_active_users churn_rate sales "example_row_key" "10m" "5%" "10k" 注釈
デフォルト列ファミリはトピック名です。また、デフォルトの列名は
KAFKA_VALUE
です。レコード値が構造体ではない場合、コネクターにより、値全体がバイト配列としてデフォルト列とデフォルト列ファミリに書き込まれます。
このような値が空のテーブルに書き込まれた場合は、以下の例のようになります。
default_column_family default_column "example_row_key" kafka 値
行キーの構造¶
このコネクターは、Kafka レコードキーからの行キーの構築をサポートしています。
キー内のフィールドを連結して、行キーを形成できます。詳細については、「Apache HBase Sink Connector 構成プロパティ」を参照してください。
ちなみに
複雑な行キーを構築する場合は、Single Message Transformation を使用してレコードキーを必要なフォーマットにすることを検討してください。
データ型¶
Kafka レコード型のデータは、書き込みの前にバイト配列にシリアル化されます。このコネクターでは、hbase Bytes ライブラリを使用してシリアル化が処理されます。次の表は、Kafka レコード型がこのコネクターでどのようにシリアル化されるかを示しています。
Kafka レコード型 | バイト配列のシリアル化 |
---|---|
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING | Hbase Bytes |
BYTES | そのまま |
DATE、TIMESTAMP | Long としてシリアル化(Hbase Bytes を使用) |
ARRAY、MAP、STRUCT | 文字列化 JSON オブジェクトとしてシリアル化 |
テーブルの自動作成および列ファミリの自動作成¶
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成することができます。
auto.create.column.families
を有効にすると、テーブルにレコードスキーマに関連する列ファミリが存在しない場合は、コネクターで作成することができます。
注釈
作成時は疎であるため、列がまだテーブルに存在しない場合は、列の設定にかかわらず、オンザフライで列が作成されます。
プロキシ設定¶
注釈
proxy.url
プロキシ設定を構成する際に、システムプロパティの変数(https.proxyHost
および https.proxyPort
)が JVM 全体に対してグローバルに設定されます。
前提条件¶
Kafka Connect Apache HBase Sink Connector を実行するには、以下が必要です。
- Kafka ブローカー: Confluent Platform 3.3.0 以上
- Connect : Confluent Platform 4.1.0 以上
- Java 1.8
- HBase 2.1.x、2.2.x、2.3.x、または 2.4.x
Apache HBase Sink Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従ってインストールできます。ZIP ファイルを手動でダウンロードすることもできます。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-hbase:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-hbase:1.0.1-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックについては、「Apache HBase Sink Connector for Confluent Platform」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Apache HBase Sink Connector 構成プロパティ」を参照してください。
トラブルシューティングとタスクの失敗¶
Connect Kafka Connect REST インターフェイス を使用して、コネクターやタスクのステータスを確認できます。タスクやコネクターが失敗した場合、trace
フィールドで原因とスタックトレースを確認できます。このコネクターでスローされたエラーの大半は、2 つのカテゴリに当てはまります。
- レコードレベルの失敗
- コネクターレベルの失敗
テーブル作成エラー¶
テーブル作成タスクは時間がかかる可能性があるため、テーブルの作成試行中にコネクターが失敗することがあります。この場合は、retry.timeout.ms
を増やすことを検討してください。
テーブル作成に関連するエラーは、テーブル作成中だけではなく、insert
の試行中にも発生することがあります。以下は、これらのエラーのスタックトレースの例です。
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:
注釈
retry.timeout.ms
はデフォルトで 90
秒であり、データベース操作の再試行に割り当てられる最大時間をミリ秒で指定します。auto.create.tables
が構成されている場合は、この構成のままにするか、数値を大きくすることを検討してください。通常、テーブル作成には最低でも 1 ~ 2 分かかります。
スキーマエラー¶
auto.create.column.families
が有効ではない場合、コネクターによって、存在しない列ファミリへの書き込みが試行されることがあるため、レコードレベルの失敗が多数発生する可能性があります。これは、コネクターが 2 レベルの構造体のレコード値を受け取らなかった場合に、デフォルト列ファミリ(kafka のトピック)へのデータの書き込みを試行することで発生する可能性が高くなります。この場合は、Single Message Transformation を使用してコネクターの想定に合ったレコードに再構成するか、auto.create.column.families
を有効にします。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、HBase Sink Connector を使用して、Avro コンソールプロデューサーで生成されたデータを、Docker 化された HBase インスタンスのテーブルにエクスポートします。
前提条件¶
- HBase の前提条件
- Docker の インストール
- Confluent の前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
Docker 化された HBase インスタンスの作成¶
Docker イメージを取得します。
docker pull aaionap/hbase:1.2.0
HBase Docker イメージを起動します。
docker run -d --name hbase --hostname hbase -p 2182:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 aaionap/hbase:1.2.0
エントリ
127.0.0.1 hbase
を/etc/hosts
に追加します。
コネクターのインストールと読み込み¶
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your CP installation directory confluent-hub install confluentinc/kafka-connect-hbase:latest
ちなみに
デフォルトで、プラグインが
share/confluent-hub-components
にインストールされ、このディレクトリがプラグインパスに追加されます。新しいコネクタープラグインを追加した場合は、Connect の再起動が必要です。Confluent CLI を使用して Connect を再起動します。
confluent local services connect stop && confluent local services connect start
hbase-qs.json
ファイルを以下の内容で作成します。{ "name": "hbase", "config": { "topics": "hbase-test", "tasks.max": "1", "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.storage.StringConverter", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor":1, "hbase.zookeeper.quorum": "localhost", "hbase.zookeeper.property.clientPort": "2182", "auto.create.tables": "true", "auto.create.column.families": "true", "table.name.format": "example_table" } }
HBase Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local load hbase --config hbase-qs.json
重要
CLI コマンドは、本稼働環境では使用しないでください。
コネクターのステータスをチェックして、
RUNNING
ステートであることを確認します。confluent local status hbase
出力は以下のようになります。
{ "name": "hbase", "connector": { "state": "RUNNING", "worker_id": "10.200.7.192:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.200.7.192:8083" } ], "type": "sink" }
Kafka へのデータ送信¶
Confluent CLI confluent local produce コマンドを使用して、Kafka の hbase-test
トピックにテストデータを生成します。
echo key1,value1 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
HBase でのデータチェック¶
HBase シェルを起動します。
docker exec -it hbase /bin/bash entrypoint.sh
テーブルが存在することを確認します。
list
出力は以下のようになります。
TABLE example_table 1 row(s) in 0.2750 seconds => ["example_table"]
データが書き込まれたことを確認します。
scan 'example_table'
出力は以下のようになります。
ROW COLUMN+CELL key1 column=hbase-test:KAFKA_VALUE, timestamp=1572400726104, value=value1 key2 column=hbase-test:KAFKA_VALUE, timestamp=1572400726105, value=value2 key3 column=hbase-test:KAFKA_VALUE, timestamp=1572400726106, value=value3 3 row(s) in 0.1570 seconds
リソースのクリーンアップ¶
コネクターを削除します。
confluent local unload hbase
Confluent Platform を停止します。
confluent local stop
Docker 化された HBase インスタンスを削除します。
docker stop hbase docker rm -f hbase
Apache HBase への JSON メッセージ値の書き込み¶
設定ファイルの例を以下に示します。
hbase-json.json
ファイルを以下の内容で作成します。{ "name": "hbase", "config": { "topics": "products", "tasks.max": "1", "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor":1, "hbase.zookeeper.quorum": "localhost", "hbase.zookeeper.property.clientPort": "2182", "auto.create.tables": "true", "auto.create.column.families": "true", "table.name.format": "hbase-products" } }
HBase Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local load hbase --config hbase-json.json
重要
CLI コマンドは、本稼働環境では使用しないでください。
コネクターのステータスをチェックして、
RUNNING
ステートであることを確認します。confluent local status hbase
products
トピックに JSON レコードを生成します。kafka-console-producer \ --broker-list localhost:9092 \ --topic products \ --property parse.key=true \ --property key.separator=,
key1, {"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "registertime"},{"type": "string", "optional": false, "field": "userid"}, {"type": "string","optional":false,"field": "regionid"},{"type": "string","optional": false,"field": "gender"},{"field" : "usage_stats","type" : "struct","fields" : [ {"field" : "daily_active_users","type" : "int64"}, {"field" : "churn_rate","type" : "float"} ]}],"optional": false,"name": "ksql.users"}, "payload": {"registertime": 1493819497170,"userid": "User_1","regionid": "Region_5","gender": "MALE","usage_stats": {"daily_active_users": 10,"churn_rate": 0.05}}}
HBase でのデータチェック¶
HBase シェルを起動します。
docker exec -it hbase /bin/bash entrypoint.sh
テーブルが存在することを確認します。
list
出力は以下のようになります。
TABLE hbase-products 1 row(s) in 0.2750 seconds => ["hbase-products"]
データが書き込まれたことを確認します。
scan 'hbase-products'
出力は以下のようになります。
ROW COLUMN+CELL key1 column=products:gender, timestamp=1574790075499, value=MALE key1 column=products:regionid, timestamp=1574790075496, value=Region_5 key1 column=products:registertime, timestamp=1574790075485, value=\x00\x00\x01[\xCE\x94\x9A\xD2 key1 column=products:userid, timestamp=1574790075492, value=User_1 key1 column=usage_stats:churn_rate, timestamp=1574790075507, value==L\xCC\xCD key1 column=usage_stats:daily_active_users, timestamp=1574790075502, value=\x00\x00\x00\x00\x00\x00\x00\x0A
Apache HBase への Avro メッセージ値の書き込み¶
設定ファイルの例を以下に示します。
hbase-avro.json
ファイルを以下の内容で作成します。{ "name": "hbase", "config": { "topics": "products", "tasks.max": "1", "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor":1, "hbase.zookeeper.quorum": "localhost", "hbase.zookeeper.property.clientPort": "2182", "auto.create.tables": "true", "auto.create.column.families": "true", "table.name.format": "products-avro" } }
HBase Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local load hbase --config hbase-avro.json
重要
CLI コマンドは、本稼働環境では使用しないでください。
コネクターのステータスをチェックして、
RUNNING
ステートであることを確認します。confluent local status hbase
products
トピックに Avro レコードを生成します。kafka-avro-console-producer \ --broker-list localhost:9092 --topic products \ --property parse.key=true \ --property key.separator=, \ --property key.schema='{"type":"string"}' \ --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'
"key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}
HBase でのデータチェック¶
HBase シェルを起動します。
docker exec -it hbase /bin/bash entrypoint.sh
テーブルが存在することを確認します。
list
出力は以下のようになります。
TABLE products-avro 1 row(s) in 0.2750 seconds => ["products-avro"]
データが書き込まれたことを確認します。
scan 'products-avro'
出力は以下のようになります。
ROW COLUMN+CELL key1 column=dimensions:dimensions1, timestamp=1574787507772, value=InstanceID key1 column=dimensions:dimensions2, timestamp=1574787507777, value=i-aaba32d4 key1 column=products:name, timestamp=1574787507755, value=test_meter key1 column=products:timestamp, timestamp=1574787507767, value=\x00\x00\x01n\xA1\x81t= key1 column=products:type, timestamp=1574787507763, value=meter key1 column=values:count, timestamp=1574787507780, value=@\xDF\xA9\xC0\x00\x00\x00\x00 key1 column=values:fifteenMinuteRate, timestamp=1574787507794, value=A@2\xB9\x8C\xCC\xCC\xCD key1 column=values:fiveMinuteRate, timestamp=1574787507787, value=@\xE0\xB7Fffff key1 column=values:meanRate, timestamp=1574787507797, value=AA\xA4<\x0C\xCC\xCC\xCD key1 column=values:oneMinuteRate, timestamp=1574787507784, value=A\x14\xE5\x18\xCC\xCC\xCC\xCD
認可の失敗¶
HBase コネクターは、HBase インスタンスで認証し、Kerberos を使用して接続を確立することができます。認証が原因で接続に失敗した場合、コネクターは即座に停止します。これらのエラーにより、コネクター構成または HBase 構成のアカウントでの変更が必要となる場合があります。変更後にコネクターを再度実行してみます。
デバッグログの有効化¶
Connect ワーカーログ構成で、ログに詳細をどの程度まで含めるかを制御できます。デフォルトでは、ワーカーログに基本的な機能を特定できる程度の詳細が含まれています。さらに詳しい情報を含めるには、Connect ワーカーのログ構成でデバッグログを有効にします。この変更は、ワーカーごとに行う必要があります。変更が有効になるのは、ワーカーの起動後です。以下の説明どおりに各 Connect ワーカーのログ構成を変更した後、すべての Connect ワーカーを再起動してください。必要な場合は、ローリング再起動を使用できます。
注釈
トレースレベルのログは冗長で、詳細情報が多く含まれているので、特定の失敗を解決するのに役立つ可能性があります。トレースレベルのログ記録は、デバッグレベルのログ記録を有効にする場合と同じように有効にできます。ただし、DEBUG
の代わりに TRACE
を使用します。
オンプレミスのインストール¶
Confluent Platform をローカルまたはオンプレミスでインストールしている場合は、etc/kafka/connect-log4j.properties
ファイルで、Connect ワーカープロセスのログ構成を定義しています。HBase コネクターのみで DEBUG を有効にするには、etc/kafka/connect-log4j.properties
ファイルに以下の行を含めます。
log4j.logger.io.confluent.hbase=DEBUG
すべてのコネクターを含むすべての Connect ワーカーのコードで DEBUG を有効にするには、log4j.rootLogger=
行で変更し、 INFO
の代わりに DEBUG
を使用します。たとえば、Connect のデフォルトのログ構成には、次の行があります。
log4j.rootLogger=INFO, stdout
すべての Connect ワーカーのコードで DEBUG を有効にするには、上の行を次のように変更します。
log4j.rootLogger=DEBUG, stdout
注釈
この設定によって、org.apache.kafka.clients
パッケージから生成されるログの量が大幅に増える可能性があります。これは、log4j.logger.org.apache.kafka.clients=ERROR
を設定することで抑制できます。