.. _connect_HBase:

Apache HBase Sink Connector for |cp|
====================================

The |kconnect-long| Apache HBase Sink Connector moves data from |ak-tm| to
Apache HBase. It writes data from a topic in |ak| to a table in the specified
HBase instance. Auto-creation of tables and auto-creation of column families
are also supported.

Limitations
-----------

* The connector does not support batched ``insert`` operations, so insert
  throughput is expected to be lower.
* The connector does not support ``delete`` operations.

Prerequisites
-------------

The following are required to run the |kconnect-long| Apache HBase Sink
Connector:

* |ak| Broker: |cp| 3.3.0 or above
* |kconnect|: |cp| 4.1.0 or above
* Java 1.8
* HBase 1.1.x, 1.2.x, 1.3.x, or 1.4.x

Install the Apache HBase Sink Connector
---------------------------------------

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-hbase:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-hbase:1.0.1-preview

Install the connector manually
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

`Download and extract the ZIP file `__ for your connector and then follow the
manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`hbase-sink-connector-license-config` for license properties and
:ref:`hbase-sink-license-topic-configuration` for information about the
license topic.

Configuration Properties
------------------------

For a complete list of configuration properties for this connector, see
:ref:`hbase_connector_configuration_options`.

.. include:: ../includes/bigtable-index.rst

.. include:: ../includes/connect-to-cloud-note.rst

Quick Start
-----------

In this quick start, the HBase Sink Connector is used to export data produced
by the console producer to a table in a Dockerized HBase instance.

Prerequisites
^^^^^^^^^^^^^

HBase Prerequisites

- Docker `installed `_

Confluent Prerequisites

- :ref:`Confluent Platform `
- :ref:`Confluent CLI ` (requires separate installation)

Create a Dockerized HBase Instance
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Get the Docker image.

   .. codewithvars:: bash

      docker pull aaionap/hbase:1.2.0

#. Start the HBase Docker image.

   .. codewithvars:: bash

      docker run -d --name hbase --hostname hbase -p 2182:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 aaionap/hbase:1.2.0

#. Add an entry ``127.0.0.1 hbase`` to ``/etc/hosts``. An example command is
   shown after this list.
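How you edit ``/etc/hosts`` depends on your operating system. As a minimal
sketch, assuming a Linux or macOS host with ``sudo`` privileges, the entry can
be appended with the following command:

.. codewithvars:: bash

   # Append the hostname mapping used by the HBase Docker container (requires sudo)
   echo "127.0.0.1 hbase" | sudo tee -a /etc/hosts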
Install and Load the Connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Install the connector through the :ref:`confluent_hub_client`.

   .. codewithvars:: bash

      # run from your CP installation directory
      confluent-hub install confluentinc/kafka-connect-hbase:latest

   .. tip:: By default, the plugin is installed into ``share/confluent-hub-components``
      and the directory is added to the plugin path.

#. Adding a new connector plugin requires restarting |kconnect|. Use the
   Confluent CLI to restart Connect.

   .. codewithvars:: bash

      |confluent_stop| connect && |confluent_start| connect

#. Create a ``hbase-qs.json`` file with the following contents.

   .. codewithvars:: json

      {
        "name": "hbase",
        "config": {
          "topics": "hbase-test",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": 1,
          "hbase.zookeeper.quorum": "localhost",
          "hbase.zookeeper.property.clientPort": "2182",
          "auto.create.tables": "true",
          "auto.create.column.families": "true",
          "table.name.format": "example_table"
        }
      }

#. Load the HBase Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| hbase|dash| -d hbase-qs.json

   .. important:: Don't use the CLI commands in production environments.

#. Check the status of the connector to confirm that it is in a ``RUNNING``
   state (a REST API alternative is shown after this procedure).

   .. codewithvars:: bash

      |confluent_status| hbase

   Your output should resemble the following:

   .. sourcecode:: bash

      {
        "name": "hbase",
        "connector": {
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        },
        "tasks": [
          {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "10.200.7.192:8083"
          }
        ],
        "type": "sink"
      }
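As a minimal sketch, assuming the |kconnect| worker's REST interface is
listening on the default port ``8083``, the same status information can also
be retrieved with ``curl``:

.. codewithvars:: bash

   # Query the Kafka Connect REST API for the connector's status
   curl -s http://localhost:8083/connectors/hbase/status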
Send Data to |ak|
^^^^^^^^^^^^^^^^^

Produce test data to the ``hbase-test`` topic in |ak| using the :ref:`cli`
|confluent_produce| command.

.. codewithvars:: bash

   echo key1,value1 | confluent local produce hbase-test -- --property parse.key=true --property key.separator=,
   echo key2,value2 | confluent local produce hbase-test -- --property parse.key=true --property key.separator=,
   echo key3,value3 | confluent local produce hbase-test -- --property parse.key=true --property key.separator=,

Check HBase for Data
^^^^^^^^^^^^^^^^^^^^

#. Start the HBase Shell.

   .. codewithvars:: bash

      docker exec -it hbase /bin/bash entrypoint.sh

#. Verify the table exists.

   .. codewithvars:: bash

      list

   The output should resemble:

   .. sourcecode:: bash

      TABLE
      example_table
      1 row(s) in 0.2750 seconds

      => ["example_table"]

#. Verify the data was written.

   .. codewithvars:: bash

      scan 'example_table'

   The output should resemble:

   .. sourcecode:: bash

      ROW     COLUMN+CELL
      key1    column=hbase-test:KAFKA_VALUE, timestamp=1572400726104, value=value1
      key2    column=hbase-test:KAFKA_VALUE, timestamp=1572400726105, value=value2
      key3    column=hbase-test:KAFKA_VALUE, timestamp=1572400726106, value=value3
      3 row(s) in 0.1570 seconds

Clean up resources
^^^^^^^^^^^^^^^^^^

#. Delete the connector.

   .. codewithvars:: bash

      |confluent_unload| hbase

#. Stop |cp|.

   .. codewithvars:: bash

      |confluent_stop|

#. Delete the Dockerized HBase Instance.

   .. codewithvars:: bash

      docker stop hbase
      docker rm -f hbase

Write JSON message values into Apache HBase
-------------------------------------------

The example settings file is shown below:

#. Create a ``hbase-json.json`` file with the following contents.

   .. codewithvars:: json

      {
        "name": "hbase",
        "config": {
          "topics": "products",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "true",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": 1,
          "hbase.zookeeper.quorum": "localhost",
          "hbase.zookeeper.property.clientPort": "2182",
          "auto.create.tables": "true",
          "auto.create.column.families": "true",
          "table.name.format": "hbase-products"
        }
      }

#. Load the HBase Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| hbase|dash| -d hbase-json.json

   .. important:: Don't use the CLI commands in production environments.

#. Check the status of the connector to confirm that it is in a ``RUNNING``
   state.

   .. codewithvars:: bash

      |confluent_status| hbase

#. Produce JSON records to the ``products`` topic.

   .. codewithvars:: bash

      kafka-console-producer \
        --broker-list localhost:9092 \
        --topic products \
        --property parse.key=true \
        --property key.separator=,

   .. codewithvars:: bash

      key1, {"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "registertime"},{"type": "string", "optional": false, "field": "userid"}, {"type": "string","optional":false,"field": "regionid"},{"type": "string","optional": false,"field": "gender"},{"field" : "usage_stats","type" : "struct","fields" : [ {"field" : "daily_active_users","type" : "int64"}, {"field" : "churn_rate","type" : "float"} ]}],"optional": false,"name": "ksql.users"}, "payload": {"registertime": 1493819497170,"userid": "User_1","regionid": "Region_5","gender": "MALE","usage_stats": {"daily_active_users": 10,"churn_rate": 0.05}}}

Check HBase for Data
^^^^^^^^^^^^^^^^^^^^

#. Start the HBase Shell.

   .. codewithvars:: bash

      docker exec -it hbase /bin/bash entrypoint.sh

#. Verify the table exists.

   .. codewithvars:: bash

      list

   The output should resemble:

   .. sourcecode:: bash

      TABLE
      hbase-products
      1 row(s) in 0.2750 seconds

      => ["hbase-products"]

#. Verify the data was written (a single-row lookup example follows this
   procedure).

   .. codewithvars:: bash

      scan 'hbase-products'

   The output should resemble:

   .. sourcecode:: bash

      ROW     COLUMN+CELL
      key1    column=products:gender, timestamp=1574790075499, value=MALE
      key1    column=products:regionid, timestamp=1574790075496, value=Region_5
      key1    column=products:registertime, timestamp=1574790075485, value=\x00\x00\x01[\xCE\x94\x9A\xD2
      key1    column=products:userid, timestamp=1574790075492, value=User_1
      key1    column=usage_stats:churn_rate, timestamp=1574790075507, value==L\xCC\xCD
      key1    column=usage_stats:daily_active_users, timestamp=1574790075502, value=\x00\x00\x00\x00\x00\x00\x00\x0A
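In addition to ``scan``, the HBase shell can fetch a single row with the
``get`` command. As a minimal sketch, using the ``key1`` row written above:

.. codewithvars:: bash

   # Fetch only the row with row key 'key1' from the table
   get 'hbase-products', 'key1'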
Write Avro message values into Apache HBase
-------------------------------------------

The example settings file is shown below:

#. Create a ``hbase-avro.json`` file with the following contents.

   .. codewithvars:: json

      {
        "name": "hbase",
        "config": {
          "topics": "products",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://localhost:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://localhost:8081",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": 1,
          "hbase.zookeeper.quorum": "localhost",
          "hbase.zookeeper.property.clientPort": "2182",
          "auto.create.tables": "true",
          "auto.create.column.families": "true",
          "table.name.format": "products-avro"
        }
      }

#. Load the HBase Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| hbase|dash| -d hbase-avro.json

   .. important:: Don't use the CLI commands in production environments.

#. Check the status of the connector to confirm that it is in a ``RUNNING``
   state.

   .. codewithvars:: bash

      |confluent_status| hbase

#. Produce Avro records to the ``products`` topic.

   .. codewithvars:: bash

      kafka-avro-console-producer \
        --broker-list localhost:9092 \
        --topic products \
        --property parse.key=true \
        --property key.separator=, \
        --property key.schema='{"type":"string"}' \
        --property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'

   .. codewithvars:: bash

      "key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}

Check HBase for Data
^^^^^^^^^^^^^^^^^^^^

#. Start the HBase Shell.

   .. codewithvars:: bash

      docker exec -it hbase /bin/bash entrypoint.sh

#. Verify the table exists.

   .. codewithvars:: bash

      list

   The output should resemble:

   .. sourcecode:: bash

      TABLE
      products-avro
      1 row(s) in 0.2750 seconds

      => ["products-avro"]

#. Verify the data was written.

   .. codewithvars:: bash

      scan 'products-avro'

   The output should resemble:

   .. sourcecode:: bash

      ROW     COLUMN+CELL
      key1    column=dimensions:dimensions1, timestamp=1574787507772, value=InstanceID
      key1    column=dimensions:dimensions2, timestamp=1574787507777, value=i-aaba32d4
      key1    column=products:name, timestamp=1574787507755, value=test_meter
      key1    column=products:timestamp, timestamp=1574787507767, value=\x00\x00\x01n\xA1\x81t=
      key1    column=products:type, timestamp=1574787507763, value=meter
      key1    column=values:count, timestamp=1574787507780, value=@\xDF\xA9\xC0\x00\x00\x00\x00
      key1    column=values:fifteenMinuteRate, timestamp=1574787507794, value=A@2\xB9\x8C\xCC\xCC\xCD
      key1    column=values:fiveMinuteRate, timestamp=1574787507787, value=@\xE0\xB7Fffff
      key1    column=values:meanRate, timestamp=1574787507797, value=AA\xA4<\x0C\xCC\xCC\xCD
      key1    column=values:oneMinuteRate, timestamp=1574787507784, value=A\x14\xE5\x18\xCC\xCC\xCC\xCD

Authorization Failures
----------------------

The HBase connector can authenticate with an HBase instance and establish a
connection using Kerberos. If a connection fails because of authentication,
the connector stops immediately. These errors may require changes in your
connector configuration or HBase configuration. Rerun the connector after you
make the changes.

Enabling Debug Logging
----------------------

The |kconnect| worker log configuration controls how much detail is included
in the logs. By default, the worker logs include enough detail to identify
basic functionality. Enable DEBUG logs in the |kconnect| worker's log
configuration to include more details. This change must be made on each worker
and only takes effect upon worker startup. After you change the log
configuration as outlined below on each |kconnect| worker, restart all of the
|kconnect| workers. A rolling restart can be used if necessary.

.. note:: Trace-level logging is verbose, contains many more details, and may
   be useful for diagnosing certain failures. Trace-level logging is enabled
   the same way as debug-level logging, except that ``TRACE`` is used instead
   of ``DEBUG``.
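For example, to collect trace-level detail from only this connector, set the
logger shown in the next section to ``TRACE`` instead of ``DEBUG``:

::

   log4j.logger.io.confluent.hbase=TRACE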
On-Premises Installation
^^^^^^^^^^^^^^^^^^^^^^^^

For local or on-premises installations of |cp|, the
``etc/kafka/connect-log4j.properties`` file defines the logging configuration
of the |kconnect| worker process. To enable DEBUG on just the HBase connector,
modify the ``etc/kafka/connect-log4j.properties`` file to include the
following line:

::

   log4j.logger.io.confluent.hbase=DEBUG

To enable DEBUG on all of the |kconnect| worker's code, including all
connectors, change the ``log4j.rootLogger=`` line to use ``DEBUG`` instead of
``INFO``. For example, the default log configuration for |kconnect| includes
this line:

::

   log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the |kconnect|
worker code:

::

   log4j.rootLogger=DEBUG, stdout

.. note:: This setting may generate a large number of log messages from the
   ``org.apache.kafka.clients`` packages, which can be suppressed by setting
   ``log4j.logger.org.apache.kafka.clients=ERROR``.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog