Apache HBase Sink Connector for Confluent Platform

The Kafka Connect Apache HBase Sink Connector moves data from Apache Kafka® to Apache HBase. It writes data from a topic in Kafka to a table in the specified HBase instance. Auto-creation of tables and column families is also supported.

Features

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Apache HBase Sink Connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can lead to significant performance gains when data is consumed from multiple topic partitions in parallel.

Column mapping

Write operations require the specification of a column family, a column, and a row key for each cell in the table. This connector expects Kafka record values to be formatted as two-level structs so that it can infer a column family and a column for each value. Specifically, each Kafka record value must fit the following schema:

{
  "column family name 1": {
    "column name 1": "value",
    "column name 2": "value",
    "...": "..."
  },
  "column family name 2": {
    "column name 3": "value",
    "column name 4": "value",
    "...": "..."
  },
  "...": "..."
}

For example, consider the following Kafka record value:

{
  "usage_stats": {
    "daily_active_users": "10m",
    "churn_rate": "5%"
  },
  "sales": {
    "Jan": "10k",
    "Feb": "12k"
  }
}

If this record is written to an empty table, it would look like the example below:

                      usage_stats                       sales
                      daily_active_users   churn_rate   Jan     Feb
  “example_row_key”   “10m”                “5%”         “10k”   “12k”

Where the first header row represents the column families and the second header row represents the columns.
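The flattening implied by this mapping can be sketched in a few lines of Python. This is an illustrative sketch of the inference logic, not the connector's implementation; the function name flatten_to_cells is hypothetical.

```python
def flatten_to_cells(record_value):
    """Flatten a two-level struct into (column_family, column, value)
    cells, mirroring how the connector infers families and columns."""
    cells = []
    for family, columns in record_value.items():
        for column, value in columns.items():
            cells.append((family, column, value))
    return cells

record = {
    "usage_stats": {"daily_active_users": "10m", "churn_rate": "5%"},
    "sales": {"Jan": "10k", "Feb": "12k"},
}
# Each cell is addressed as family:column within the row.
for cell in flatten_to_cells(record):
    print(cell)
```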

If the record does not conform to this two-level struct schema, the connector attempts to handle the following cases gracefully:

  • If the record is a struct but some of the top-level fields are not structs, the values of those fields are mapped to a default column family.

    As an example of this case, consider the following Kafka record value:

    {
      "usage_stats": {
        "daily_active_users": "10m",
        "churn_rate": "5%"
      },
      "sales": "10"
    }
    

    If this record is written to an empty table, the table would look like the example below:

                          usage_stats                       default_column_family
                          daily_active_users   churn_rate   sales
    “example_row_key”     “10m”                “5%”         “10”

    Note

    The default column family is the topic name, and the default column name is KAFKA_VALUE.

  • If the record value is not a struct, the connector writes the entire value as a byte array to the default column and default column family.

    If such a value were to be written to an empty table, the table would look like:

                          default_column_family
                          default_column
    “example_row_key”     kafka value
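The fallback behavior described in these cases can be sketched as follows. This is an illustrative Python sketch, not the connector's code; the function name map_to_cells is hypothetical, while the topic-name default family and the KAFKA_VALUE default column follow the note above.

```python
DEFAULT_COLUMN = "KAFKA_VALUE"

def map_to_cells(topic, record_value):
    """Map a record value to (family, column, value) cells, falling back
    to the topic name as the default column family (illustrative)."""
    if not isinstance(record_value, dict):
        # Non-struct value: write the whole value to the default cell.
        return [(topic, DEFAULT_COLUMN, record_value)]
    cells = []
    for field, value in record_value.items():
        if isinstance(value, dict):
            # Conforming two-level field: field name is the column family.
            for column, inner in value.items():
                cells.append((field, column, inner))
        else:
            # Top-level non-struct field: mapped to the default family.
            cells.append((topic, field, value))
    return cells

print(map_to_cells("hbase-test", "value1"))
print(map_to_cells("t", {"usage_stats": {"daily_active_users": "10m"},
                         "sales": "10"}))
```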

Row key construction

This connector supports the construction of a row key from the Kafka record key.

Fields within the key can be concatenated together to form a row key. See the Apache HBase Sink Connector Configuration Properties for additional information.

Tip

For more complex row key construction, consider using Single Message Transformation to format the record key as desired.
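Concatenating key fields into a row key can be pictured with the hedged sketch below; the field order and delimiter are governed by the connector's row-key configuration properties, and the '#' delimiter and field names here are illustrative, not the connector's defaults.

```python
def build_row_key(key_struct, fields, delimiter="#"):
    """Concatenate selected fields of the record key into a row key.
    The '#' delimiter is an illustrative choice for this sketch."""
    return delimiter.join(str(key_struct[f]) for f in fields)

key = {"region": "us-east", "user_id": 42}
print(build_row_key(key, ["region", "user_id"]))  # → us-east#42
```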

Data types

Data from the Kafka record types is serialized into byte arrays before being written. This connector uses the HBase Bytes utility class to handle serialization. The following table shows how Kafka record types are serialized by this connector.

Kafka Record Type                                              Byte Array Serialization
INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING   HBase Bytes
BYTES                                                          Used as is
DATE, TIMESTAMP                                                Serialized as a long (through HBase Bytes)
ARRAY, MAP, STRUCT                                             Serialized as a stringified JSON object
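HBase's Bytes class encodes fixed-width numeric types in big-endian byte order. As an illustration, the standard-library sketch below reproduces that encoding for a 64-bit value, which is why a long value of 10 appears as \x00\x00\x00\x00\x00\x00\x00\x0A in scan output.

```python
import struct

def long_to_bytes(value):
    """Big-endian 8-byte encoding, equivalent to HBase Bytes.toBytes(long)."""
    return struct.pack(">q", value)

# A long value of 10 is stored as eight big-endian bytes.
print(long_to_bytes(10).hex())  # → 000000000000000a
```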

Auto table creation and auto column family creation

If auto.create.tables is enabled, the connector can create the destination table in cases where the table is missing.

If auto.create.column.families is enabled, the connector can create missing column families in the table, based on the record schema.

Note

Because HBase tables are sparse, individual columns are created on the fly if they don’t already exist in the table, regardless of these settings.

Proxy settings

Note

When the proxy.url proxy settings are configured, the system property variables (https.proxyHost and https.proxyPort) are set globally for the entire JVM.

Prerequisites

The following are required to run the Kafka Connect Apache HBase Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8
  • HBase 1.1.x, 1.2.x, 1.3.x, or 1.4.x

Install the Apache HBase Sink Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-hbase:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-hbase:1.0.1-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Apache HBase Sink Connector for Confluent Platform for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Apache HBase Sink Connector Configuration Properties.

Troubleshooting and Task Failures

You can use the Kafka Connect REST Interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace. The vast majority of the errors thrown by this connector fall into two categories:

  • Record-level failures
  • Connector-level failures

Table creation errors

Table creation can be a time-intensive operation, and the connector can sometimes fail while attempting to create a table. In such cases, consider increasing the retry.timeout.ms value.

Errors related to table creation might surface not only during table creation, but also when attempting to insert records. The following are example stack traces for these errors.

Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:

Note

The retry.timeout.ms property defaults to 90 seconds and specifies the maximum time in milliseconds allocated for retrying database operations. If auto.create.tables is enabled, consider leaving this configuration as is or increasing it, because table creation generally takes at least a minute or two.

Schema errors

If auto.create.column.families is not enabled, many record-level failures can occur because the connector may attempt to write to a column family that does not exist. This is likely to occur when the connector does not receive a two-level struct record value and then attempts to write the data to the default column family (named after the Kafka topic). If this happens, consider using Single Message Transformation to reshape the record to fit the connector’s expectation, or enable auto.create.column.families.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

In this quick start, the HBase sink connector is used to export data produced by the Avro console producer to a table in a dockerized HBase instance.

Prerequisites

HBase Prerequisites
Confluent Prerequisites

Create a Dockerized HBase Instance

  1. Get the Docker image.

    docker pull aaionap/hbase:1.2.0
    
  2. Start the HBase Docker image.

    docker run -d --name hbase --hostname hbase -p 2182:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 aaionap/hbase:1.2.0
    
  3. Add an entry 127.0.0.1   hbase to /etc/hosts.

Install and Load the Connector

  1. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-hbase:latest
    

    Tip

    By default, it will install the plugin into share/confluent-hub-components and add the directory to the plugin path.

  2. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

    confluent local services connect stop && confluent local services connect start
    
  3. Create a hbase-qs.json file with the following contents.

    {
      "name": "hbase",
      "config": {
        "topics": "hbase-test",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor":1,
        "hbase.zookeeper.quorum": "localhost",
        "hbase.zookeeper.property.clientPort": "2182",
        "auto.create.tables": "true",
        "auto.create.column.families": "true",
        "table.name.format": "example_table"
      }
    }
    
  4. Load the HBase Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load hbase --config hbase-qs.json
    

    Important

    Don’t use the CLI commands in production environments.

  5. Check the status of the connector to confirm that it is in a RUNNING state.

    confluent local status hbase
    

    Your output should resemble the following:

    {
      "name": "hbase",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.200.7.192:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        }
      ],
      "type": "sink"
    }
    

Send Data to Kafka

Produce test data to the hbase-test topic in Kafka using the Confluent CLI confluent local produce command.

echo key1,value1 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local produce hbase-test --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local produce hbase-test --property parse.key=true --property key.separator=,

Check HBase for Data

  1. Start the HBase Shell.

    docker exec -it hbase /bin/bash entrypoint.sh
    
  2. Verify the table exists.

    list
    

    The output should resemble:

    TABLE
    example_table
    1 row(s) in 0.2750 seconds
    => ["example_table"]
    
  3. Verify the data was written.

    scan 'example_table'
    

    The output should resemble:

    ROW                                COLUMN+CELL
     key1                              column=hbase-test:KAFKA_VALUE, timestamp=1572400726104, value=value1
     key2                              column=hbase-test:KAFKA_VALUE, timestamp=1572400726105, value=value2
     key3                              column=hbase-test:KAFKA_VALUE, timestamp=1572400726106, value=value3
    3 row(s) in 0.1570 seconds
    

Clean up resources

  1. Delete the connector.

    confluent local unload hbase
    
  2. Stop Confluent Platform.

    confluent local stop
    
  3. Delete the Dockerized HBase Instance.

    docker stop hbase
    docker rm -f hbase
    

Write JSON message values into Apache HBase

The example settings file is shown below:

  1. Create a hbase-json.json file with the following contents.

    {
       "name": "hbase",
       "config": {
        "topics": "products",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
    
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor":1,
        "hbase.zookeeper.quorum": "localhost",
        "hbase.zookeeper.property.clientPort": "2182",
        "auto.create.tables": "true",
        "auto.create.column.families": "true",
        "table.name.format": "hbase-products"
      }
    }
    
  2. Load the HBase Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load hbase --config hbase-json.json
    

    Important

    Don’t use the CLI commands in production environments.

  3. Check the status of the connector to confirm that it is in a RUNNING state.

    confluent local status hbase
    
  4. Produce JSON records to products topic.

    kafka-console-producer \
     --broker-list localhost:9092 \
     --topic products \
     --property parse.key=true \
     --property key.separator=,
    
    key1, {"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "registertime"},{"type": "string", "optional": false, "field": "userid"}, {"type": "string","optional":false,"field": "regionid"},{"type": "string","optional": false,"field": "gender"},{"field" : "usage_stats","type" : "struct","fields" : [ {"field" : "daily_active_users","type" : "int64"}, {"field" : "churn_rate","type" : "float"} ]}],"optional": false,"name": "ksql.users"}, "payload": {"registertime": 1493819497170,"userid": "User_1","regionid": "Region_5","gender": "MALE","usage_stats": {"daily_active_users": 10,"churn_rate": 0.05}}}
    

Check HBase for Data

  1. Start the HBase Shell.

    docker exec -it hbase /bin/bash entrypoint.sh
    
  2. Verify the table exists.

    list
    

    The output should resemble:

    TABLE
    hbase-products
    1 row(s) in 0.2750 seconds
    => ["hbase-products"]
    
  3. Verify the data was written.

    scan 'hbase-products'
    

    The output should resemble:

    ROW     COLUMN+CELL
    key1    column=products:gender, timestamp=1574790075499, value=MALE
    key1    column=products:regionid, timestamp=1574790075496, value=Region_5
    key1    column=products:registertime, timestamp=1574790075485, value=\x00\x00\x01[\xCE\x94\x9A\xD2
    key1    column=products:userid, timestamp=1574790075492, value=User_1
    key1    column=usage_stats:churn_rate, timestamp=1574790075507, value==L\xCC\xCD
    key1    column=usage_stats:daily_active_users, timestamp=1574790075502, value=\x00\x00\x00\x00\x00\x00\x00\x0A
    

Write Avro message values into Apache HBase

The example settings file is shown below:

  1. Create a hbase-avro.json file with the following contents.

    {
      "name": "hbase",
      "config": {
        "topics": "products",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
    
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor":1,
        "hbase.zookeeper.quorum": "localhost",
        "hbase.zookeeper.property.clientPort": "2182",
        "auto.create.tables": "true",
        "auto.create.column.families": "true",
        "table.name.format": "products-avro"
      }
    }
    
  2. Load the HBase Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load hbase --config hbase-avro.json
    

    Important

    Don’t use the CLI commands in production environments.

  3. Check the status of the connector to confirm that it is in a RUNNING state.

    confluent local status hbase
    
  4. Produce Avro records to products topic.

    kafka-avro-console-producer \
     --broker-list localhost:9092 --topic products \
     --property parse.key=true \
     --property key.separator=, \
     --property key.schema='{"type":"string"}' \
     --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'
    
    "key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}
    

Check HBase for Data

  1. Start the HBase Shell.

    docker exec -it hbase /bin/bash entrypoint.sh
    
  2. Verify the table exists.

    list
    

    The output should resemble:

    TABLE
    products-avro
    1 row(s) in 0.2750 seconds
    => ["products-avro"]
    
  3. Verify the data was written.

    scan 'products-avro'
    

    The output should resemble:

    ROW     COLUMN+CELL
    key1    column=dimensions:dimensions1, timestamp=1574787507772, value=InstanceID
    key1    column=dimensions:dimensions2, timestamp=1574787507777, value=i-aaba32d4
    key1    column=products:name, timestamp=1574787507755, value=test_meter
    key1    column=products:timestamp, timestamp=1574787507767, value=\x00\x00\x01n\xA1\x81t=
    key1    column=products:type, timestamp=1574787507763, value=meter
    key1    column=values:count, timestamp=1574787507780, value=@\xDF\xA9\xC0\x00\x00\x00\x00
    key1    column=values:fifteenMinuteRate, timestamp=1574787507794, value=A@2\xB9\x8C\xCC\xCC\xCD
    key1    column=values:fiveMinuteRate, timestamp=1574787507787, value=@\xE0\xB7Fffff
    key1    column=values:meanRate, timestamp=1574787507797, value=AA\xA4<\x0C\xCC\xCC\xCD
    key1    column=values:oneMinuteRate, timestamp=1574787507784, value=A\x14\xE5\x18\xCC\xCC\xCC\xCD
    

Authorization Failures

The HBase connector can authenticate with an HBase instance and establish a connection using Kerberos. If a connection fails because of authentication, the connector stops immediately. These errors may require changes in your connector configuration or your HBase configuration. Rerun the connector after you make the changes.

Enabling Debug Logging

The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.

Note

Trace-level logging is verbose, contains many more details, and may be useful for solving certain failures. Trace-level logging is enabled the same way as debug-level logging, except that TRACE is used in place of DEBUG.

On-Premises Installation

For local or on-premises installations of Confluent Platform, the etc/kafka/connect-log4j.properties file defines the logging configuration of the Connect worker process. To enable DEBUG on just the HBase connector, modify the etc/kafka/connect-log4j.properties file to include the following line:

log4j.logger.io.confluent.hbase=DEBUG

To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:

log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the Connect worker code:

log4j.rootLogger=DEBUG, stdout

Note

This setting may generate a large amount of log output from the org.apache.kafka.clients packages, which can be suppressed by setting log4j.logger.org.apache.kafka.clients=ERROR.