Apache HBase Sink Connector for Confluent Platform¶
The Kafka Connect Apache HBase Sink connector moves data from Apache Kafka® to Apache HBase. It writes data from a topic in Kafka to a table in the specified HBase instance. Auto-creation of tables and column families is also supported.
Features¶
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Column mapping
- Row key construction
- Data types
- Auto table creation and auto column family creation
- Proxy settings
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The Apache HBase Sink connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max
configuration parameter. Running multiple tasks can improve performance because
records from the Kafka topic partitions can be written to HBase in parallel.
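For example, the following configuration fragment (the value shown is illustrative) allows the connector to run up to three tasks:
"tasks.max": "3"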
Column mapping¶
Write operations require a column family, a column, and a row key for each cell in the table. This connector expects Kafka record values to be formatted as two-level structs so that it can infer a column family and a column for each value. Specifically, each Kafka record value must fit the following schema:
Important
The HBase Sink connector requires a non-null value for every field if the input is a struct or primitive, because writing a null value into a cell causes the connector to delete that cell. To prevent this behavior, ensure either of the following:
- The fields have a non-null default.
- Every incoming record has non-null values for all fields. If the values are primitives, they should be non-null as well.
{
  "column family name 1": {
    "column name 1": "value",
    "column name 2": "value",
    "...": "..."
  },
  "column family name 2": {
    "column name 3": "value",
    "column name 4": "value",
    "...": "..."
  },
  "...": "..."
}
For example, consider the following Kafka record value:
{
  "usage_stats": {
    "daily_active_users": "10m",
    "churn_rate": "5%"
  },
  "sales": {
    "Jan": "10k",
    "Feb": "12k"
  }
}
If this record is written to an empty table, it would look like the following example:
|                   | usage_stats        |            | sales |       |
|-------------------|--------------------|------------|-------|-------|
|                   | daily_active_users | churn_rate | Jan   | Feb   |
| “example_row_key” | “10m”              | “5%”       | “10k” | “12k” |
Where the first row represents the column families and the second row represents the columns.
If the record does not conform to this two-level struct schema, the connector attempts to handle the following cases gracefully:
If the record is a struct but some of the top-level fields are not structs, the values of those fields are mapped to a default column family.
As an example of this case, consider the following Kafka record value:
{ "usage_stats": { "daily_active_users": "10m", "churn_rate": "5%" }, "sales": "10" }
If this record is written to an empty table, the table would look like the following example:
|                   | usage_stats        |            | default_column_family |
|-------------------|--------------------|------------|-----------------------|
|                   | daily_active_users | churn_rate | sales                 |
| “example_row_key” | “10m”              | “5%”       | “10”                  |
The default column family is the topic name and the default column name is KAFKA_VALUE.
If the record value is not a struct, the connector writes the entire value as a byte array to the default column and default column family.
If such a value were to be written to an empty table, the table would look like the following example:
|                   | default_column_family |
|-------------------|-----------------------|
|                   | default_column        |
| “example_row_key” | Kafka value           |
Row key construction¶
This connector supports the construction of a row key from the Kafka record key. Fields within the key can be concatenated together to form a row key. See the Configuration Reference for Apache HBase Sink Connector for Confluent Platform for additional information. For more complex row key construction, consider using Single Message Transformation to format the record key as desired.
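As a minimal sketch of that approach, the following configuration fragment uses the built-in ValueToKey and ExtractField transformations to replace the record key with a user_id field taken from the record value (user_id is a hypothetical field name used only for illustration):
"transforms": "createKey,extractId",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "user_id",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "user_id"
The connector then constructs the HBase row key from this simplified record key.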
Data types¶
Data from Kafka records is serialized into byte arrays before being written. This connector uses the HBase Bytes library to handle serialization. The following table shows how Kafka record types are serialized by this connector.
Kafka Record Type | Byte Array Serialization |
---|---|
INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING | HBase Bytes |
BYTES | Used as is |
DATE, TIMESTAMP | Serialized as a Long (through HBase Bytes) |
ARRAY, MAP, STRUCT | Serialized as a stringified JSON object |
Auto table creation and auto column family creation¶
If auto.create.tables
is enabled, the connector can create the destination
table when the table is missing.
If auto.create.column.families
is enabled, the connector can create missing
column families in the table, based on the record schema. Because HBase tables
are sparse, individual columns are created on the fly if they don't already
exist in the table, regardless of these settings.
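For example, both behaviors are enabled with the following settings, which are also used in the quick start later on this page:
"auto.create.tables": "true",
"auto.create.column.families": "true"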
Proxy settings¶
When the proxy.url
proxy setting is configured, the system properties
https.proxyHost
and https.proxyPort
are set globally for the
entire JVM.
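As a minimal illustration (the host and port below are hypothetical placeholders):
"proxy.url": "https://proxy.example.com:8080"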
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see Apache HBase Sink connector for Confluent Platform.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Apache HBase Sink Connector for Confluent Platform.
Install the Apache HBase Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later.
- Connect: Confluent Platform 4.1.0 or later.
- Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.
- HBase 2.x.x.
- An installation of the latest (latest) connector version.
Install the connector using the Confluent CLI¶
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-hbase:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-hbase:2.0.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Troubleshooting and task failures¶
You can use the Kafka Connect REST
Interface to check the status of the connectors
and tasks. If a task or connector has failed, the trace
field will include a
reason and a stack trace. The vast majority of the errors thrown by this
connector fall into two categories:
- Record-level failures
- Connector-level failures
Table creation errors¶
Table creation can be a time-intensive task and sometimes the connector can fail
when trying to create a table. In such cases, consider increasing retry.timeout.ms.
retry.timeout.ms defaults to 90 seconds (90000 milliseconds) and specifies the
maximum time allocated for retrying database operations. If auto.create.tables
is enabled, consider leaving this configuration as is, or setting it higher,
because table creation generally takes at least a minute or two.
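For example, the following setting (an illustrative value) allows up to three minutes for retried operations such as table creation:
"retry.timeout.ms": "180000"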
Errors might happen not only during table creation, but also when trying to
insert records. The following are a few example stack traces for these errors:
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:
Schema errors¶
If auto.create.column.families
is not enabled, many record-level failures
can occur because the connector may attempt to write to a column family that
does not exist. This is likely to occur if the connector does not receive a
two-level struct record value, and then attempts to write the data to the
default column family (the Kafka topic). If this happens, consider using
Single Message Transformation to
reconfigure the record to fit the connector’s expectation or enable
auto.create.column.families
.
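As a minimal sketch, assuming the incoming record value is a flat (one-level) struct, the built-in HoistField transformation can wrap it under a single column family; the field name usage_stats below is purely illustrative:
"transforms": "wrap",
"transforms.wrap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.wrap.field": "usage_stats"
With this transformation applied, every field of the original value becomes a column in the usage_stats column family.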
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Quick start¶
In this quick start, the HBase Sink connector is used to export data produced by the Avro console producer to a table in a dockerized HBase instance. Before moving forward in this quick start, ensure you have the following:
- Docker installed
- Confluent Platform
- Confluent CLI (requires separate installation)
Create a Dockerized HBase instance¶
Get the Docker image.
docker pull aaionap/hbase:1.2.0
Start the HBase Docker image.
docker run -d --name hbase --hostname hbase -p 2182:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 aaionap/hbase:1.2.0
Add an entry 127.0.0.1 hbase to /etc/hosts.
Install and load the connector¶
Install the connector using the Confluent CLI.
# run from your CP installation directory
confluent connect plugin install confluentinc/kafka-connect-hbase:latest
By default, it will install the plugin into
share/confluent-hub-components
and add the directory to the plugin path. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.
confluent local services connect stop && confluent local services connect start
Create a hbase-qs.json file with the following contents.
{
  "name": "hbase",
  "config": {
    "topics": "hbase-test",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": 1,
    "hbase.zookeeper.quorum": "localhost",
    "hbase.zookeeper.property.clientPort": "2182",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "table.name.format": "example_table"
  }
}
Load the HBase Sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load hbase --config hbase-qs.json
Confluent recommends you don't use the CLI commands in production environments.
Check the status of the connector to confirm that it is in a RUNNING state.
confluent local status hbase
Your output should resemble the following:
{ "name": "hbase", "connector": { "state": "RUNNING", "worker_id": "10.200.7.192:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.200.7.192:8083" } ], "type": "sink" }
Send Data to Kafka¶
Produce test data to the hbase-test
topic in Kafka using the Confluent CLI confluent local produce command.
echo key1,value1 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
Check HBase for data¶
Start the HBase Shell.
docker exec -it hbase /bin/bash entrypoint.sh
Verify the table exists.
list
The output should resemble:
TABLE
example_table
1 row(s) in 0.2750 seconds
=> ["example_table"]
Verify the data was written.
scan 'example_table'
The output should resemble:
ROW     COLUMN+CELL
 key1   column=hbase-test:KAFKA_VALUE, timestamp=1572400726104, value=value1
 key2   column=hbase-test:KAFKA_VALUE, timestamp=1572400726105, value=value2
 key3   column=hbase-test:KAFKA_VALUE, timestamp=1572400726106, value=value3
3 row(s) in 0.1570 seconds
Clean up resources¶
Delete the connector.
confluent local unload hbase
Stop Confluent Platform.
confluent local stop
Delete the Dockerized HBase Instance.
docker stop hbase
docker rm -f hbase
Write JSON message values into Apache HBase¶
The example settings file is shown below:
Create a hbase-json.json file with the following contents.
{
  "name": "hbase",
  "config": {
    "topics": "products",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": 1,
    "hbase.zookeeper.quorum": "localhost",
    "hbase.zookeeper.property.clientPort": "2182",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "table.name.format": "hbase-products"
  }
}
Load the HBase Sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load hbase --config hbase-json.json
Confluent recommends you don't use the CLI commands in production environments.
Check the status of the connector to confirm that it is in a RUNNING state.
confluent local status hbase
Produce JSON records to the products topic.
kafka-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property parse.key=true \
  --property key.separator=,
key1, {"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "registertime"},{"type": "string", "optional": false, "field": "userid"}, {"type": "string","optional":false,"field": "regionid"},{"type": "string","optional": false,"field": "gender"},{"field" : "usage_stats","type" : "struct","fields" : [ {"field" : "daily_active_users","type" : "int64"}, {"field" : "churn_rate","type" : "float"} ]}],"optional": false,"name": "ksql.users"}, "payload": {"registertime": 1493819497170,"userid": "User_1","regionid": "Region_5","gender": "MALE","usage_stats": {"daily_active_users": 10,"churn_rate": 0.05}}}
Check HBase for data¶
Start the HBase Shell.
docker exec -it hbase /bin/bash entrypoint.sh
Verify the table exists.
list
The output should resemble:
TABLE
hbase-products
1 row(s) in 0.2750 seconds
=> ["hbase-products"]
Verify the data was written.
scan 'hbase-products'
The output should resemble:
ROW     COLUMN+CELL
 key1   column=products:gender, timestamp=1574790075499, value=MALE
 key1   column=products:regionid, timestamp=1574790075496, value=Region_5
 key1   column=products:registertime, timestamp=1574790075485, value=\x00\x00\x01[\xCE\x94\x9A\xD2
 key1   column=products:userid, timestamp=1574790075492, value=User_1
 key1   column=usage_stats:churn_rate, timestamp=1574790075507, value==L\xCC\xCD
 key1   column=usage_stats:daily_active_users, timestamp=1574790075502, value=\x00\x00\x00\x00\x00\x00\x00\x0A
Write Avro message values into Apache HBase¶
The example settings file is shown below:
Create a hbase-avro.json file with the following contents.
{
  "name": "hbase",
  "config": {
    "topics": "products",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": 1,
    "hbase.zookeeper.quorum": "localhost",
    "hbase.zookeeper.property.clientPort": "2182",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "table.name.format": "products-avro"
  }
}
Load the HBase Sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load hbase --config hbase-avro.json
Confluent recommends you don't use the CLI commands in production environments.
Check the status of the connector to confirm that it is in a RUNNING state.
confluent local status hbase
Produce Avro records to the products topic.
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'
"key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}
Check HBase for data¶
Start the HBase Shell.
docker exec -it hbase /bin/bash entrypoint.sh
Verify the table exists.
list
The output should resemble:
TABLE
products-avro
1 row(s) in 0.2750 seconds
=> ["products-avro"]
Verify the data was written.
scan 'products-avro'
The output should resemble:
ROW     COLUMN+CELL
 key1   column=dimensions:dimensions1, timestamp=1574787507772, value=InstanceID
 key1   column=dimensions:dimensions2, timestamp=1574787507777, value=i-aaba32d4
 key1   column=products:name, timestamp=1574787507755, value=test_meter
 key1   column=products:timestamp, timestamp=1574787507767, value=\x00\x00\x01n\xA1\x81t=
 key1   column=products:type, timestamp=1574787507763, value=meter
 key1   column=values:count, timestamp=1574787507780, value=@\xDF\xA9\xC0\x00\x00\x00\x00
 key1   column=values:fifteenMinuteRate, timestamp=1574787507794, value=A@2\xB9\x8C\xCC\xCC\xCD
 key1   column=values:fiveMinuteRate, timestamp=1574787507787, value=@\xE0\xB7Fffff
 key1   column=values:meanRate, timestamp=1574787507797, value=AA\xA4<\x0C\xCC\xCC\xCD
 key1   column=values:oneMinuteRate, timestamp=1574787507784, value=A\x14\xE5\x18\xCC\xCC\xCC\xCD
Authorization failures¶
The HBase connector can authenticate with an HBase instance and establish a connection using Kerberos. If a connection fails because of authentication, the connector stops immediately. These errors may require changes to your connector configuration or your HBase configuration. Try rerunning your connector after you make the changes.
Enabling debug logging¶
The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.
Trace-level logging may also be useful in diagnosing certain failures; this form of
logging, however, contains much more detail. Trace-level logging is enabled
much like debug-level logging, except TRACE is used instead of DEBUG.
On-premises installation¶
For local or on-premises installations of Confluent Platform, the
etc/kafka/connect-log4j.properties
file defines the logging configuration of
the Connect worker process. To enable DEBUG
on just the HBase connector,
modify the etc/kafka/connect-log4j.properties
file to include the following
line:
log4j.logger.io.confluent.hbase=DEBUG
To enable DEBUG
on all of the Connect worker’s code, including all
connectors, change the log4j.rootLogger=
line to use DEBUG
instead of
INFO
. For example, instead of using the default log configuration for Connect,
log4j.rootLogger=INFO, stdout
, use the following:
log4j.rootLogger=DEBUG, stdout
Be aware that this setting may generate a large number of log messages from
the org.apache.kafka.clients packages, which can be suppressed by setting
log4j.logger.org.apache.kafka.clients=ERROR.