Kudu Sink Connector for Confluent Platform¶
The Kafka Connect Kudu Sink connector allows you to export data from Apache Kafka® topics to Kudu, a columnar relational database, using the Impala JDBC driver. The connector polls data from Kafka and writes it to Kudu based on the topic subscription. Auto-creation of tables and limited auto-evolution are also supported.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Kudu Sink Connector Configuration Properties.
Features¶
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled).
Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct.
Fields selected from Connect structs must be of primitive types.
If the data in the topic is not in a compatible format, implementing a custom Converter may be necessary.
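For example, a connector configuration that uses the Avro converter with Schema Registry (as in the quick start below) includes converter settings like the following sketch; the Schema Registry URL shown assumes a local default installation:

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"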
Key handling¶
By default, primary keys are not extracted (pk.mode is set to none), which is not suitable for advanced usage such as upsert semantics or for cases where the connector is responsible for auto-creating the destination table.
There are different modes that enable using fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record.
Refer to primary key configuration options for further detail.
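For example, the quick start below uses the id field from the record value as the primary key by adding the following to the connector configuration:

"pk.mode": "record_value",
"pk.fields": "id"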
Auto-creation and Auto-evolution¶
Tip
Make sure the user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing.
The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
Because data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with an error stating the missing columns.
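For example, to enable both behaviors, add the following to the connector configuration (a minimal sketch; the quick start below enables only auto.create):

"auto.create": "true",
"auto.evolve": "true"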
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from Connect schema types to Impala and Kudu types:
| Schema Type | Impala | Kudu |
|---|---|---|
| Int8 | TINYINT | int8 |
| Int16 | SMALLINT | int16 |
| Int32 | INT | int32 |
| Int64 | BIGINT | int64 |
| Float32 | FLOAT | float |
| Float64 | DOUBLE | double |
| Boolean | BOOLEAN | bool |
| String | STRING | string |
| 'Decimal' | DECIMAL(38,s) | decimal |
| 'Date' | TIMESTAMP | unixtime_micros |
| 'Time' | TIMESTAMP | unixtime_micros |
| 'Timestamp' | TIMESTAMP | unixtime_micros |
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
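For example, a backwards-compatible evolution of the quick start's orders value schema could add an optional field with a default value; the discount field below is hypothetical and shown only for illustration:

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price", "type": "float"},
    {"name": "discount", "type": ["null", "float"], "default": null}
  ]
}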
Limitations¶
- Although Impala supports ARRAY and MAP types, the connector does not currently support structured types for column types.
- Impala converts all column names to lowercase, so it is best to use lowercase column names in the first place to avoid name mismatches.
- Upsert and idempotent writes are not currently supported.
- Delete is not currently supported.
Quick Start¶
The following procedure steps you through copying Avro data from a single Kafka topic to a local Kudu database.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Prerequisites¶
- Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI. By default, ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local start command. For more information, see On-Premises Deployments.
- Kudu and Impala are installed and configured properly (see Using Kudu with Impala). DECIMAL type support requires at least Kudu 1.7.0 and Impala 3.0.
- Verify that the Impala JDBC driver is available on the Kafka Connect process's CLASSPATH.
- Kafka and Schema Registry are running locally on the default ports.
Create Kudu Database¶
Start the Impala shell:
impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
Create a database with this command:
CREATE DATABASE test;
Your output should resemble:
Query: create DATABASE test
Fetched 0 row(s) in 0.80s
Load the Kudu Sink Connector¶
Load the predefined Kudu sink connector.
Optional: View the available predefined connectors with this command:
confluent local list connectors
Your output should resemble:
Bundled Predefined Connectors (edit configuration under etc/):
  elasticsearch-sink
  file-source
  file-sink
  jdbc-source
  jdbc-sink
  kudu-source
  kudu-sink
  hdfs-sink
  s3-sink
Create a kudu-sink.json file for your Kudu Sink connector:

{
  "name": "kudu-sink",
  "config": {
    "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "impala.server": "127.0.0.1",
    "impala.port": "21050",
    "kudu.database": "test",
    "auto.create": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "impala.ldap.password": "<ldap-password>",
    "impala.ldap.user": "<ldap-user>",
    "kudu.tablet.replicas": "1",
    "name": "kudu-sink"
  }
}
Load the kudu-sink connector:

confluent local load kudu-sink -- -d kudu-sink.json
Your output should resemble:
{ "name": "kudu-sink", "config": { "connector.class": "io.confluent.connect.kudu.KuduSinkConnector", "tasks.max": "1", "topics": "orders", "impala.server": "127.0.0.1", "impala.port": "21050", "kudu.database": "test", "auto.create": "true", "pk.mode":"record_value", "pk.fields":"id", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "impala.ldap.password": "secret", "impala.ldap.user": "kudu", "kudu.tablet.replicas": "1", "name": "kudu-sink" }, "tasks": [], "type": "sink" }
Produce a Record in Kafka¶
Produce a record into the orders topic:

./bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Use the Impala shell to query the Kudu database. You should see that the orders table was automatically created and contains the record:

USE test;
SELECT * from orders;

foo|50.0|100|999
Troubleshooting¶
HiveServer2 error¶
When you run this connector, you might see the following error message.
java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.
This error means that LDAP has not been set up in Impala, or that the LDAP username and password are not valid.
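Verify that the LDAP credentials in the connector configuration match your Impala LDAP setup; the values below are placeholders:

"impala.ldap.user": "<ldap-user>",
"impala.ldap.password": "<ldap-password>"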
Not enough live tablet servers¶
When you run this connector, you might see the following error message.
com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::test.orders'
CAUSED BY: NonRecoverableException: not enough live tablet servers to create a table with the requested replication factor 3; 1 tablet servers are alive
), Query: CREATE TABLE `orders` (
`id` INT NOT NULL,
`product` STRING NOT NULL,
`quantity` INT NOT NULL,
`price` INT NOT NULL,
PRIMARY KEY(`id`)) PARTITION BY HASH PARTITIONS 2 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '3').
This error means you do not have enough tablet servers to support three replicas. Either increase the number of tablet servers to three, or add the following property to your connector configuration.
"kudu.tablet.replicas":"1"