Kudu Sink Connector for Confluent Platform¶
The Kafka Connect Kudu Sink connector allows you to export data from an Apache Kafka® topic to a Kudu columnar relational database using an Impala JDBC driver. The connector polls data from Kafka and writes it to Kudu based on the topic subscription. Auto-creation of tables and limited auto-evolution are also supported.
Features¶
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Data mapping
- Key handling
- Auto-creation and auto-evolution
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
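For example, a minimal sketch of routing failed records to a DLQ using the standard Connect error-handling properties (the topic name shown here is illustrative) would add the following to the connector configuration:
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-kudu-sink",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"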
Multiple tasks¶
The Kudu Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
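For example, a minimal fragment that allows up to three tasks (the value is illustrative; effective parallelism is also bounded by the number of partitions of the subscribed topics):
"tasks.max": "3"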
Data mapping¶
The Kudu Sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled). Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be of primitive types. If the data in the topic is not in a compatible format, implementing a custom Converter may be necessary.
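For example, a minimal sketch that enables the JSON converter with schemas for record values (the quick start later in this document uses the Avro converter instead):
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"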
Key handling¶
The default, pk.mode set to none, means that primary keys are not extracted. This is not suitable for advanced usage such as upsert semantics, or when the connector is responsible for auto-creating the destination table. Other modes enable the use of fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record. Refer to the primary key configuration options for further detail.
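For example, a minimal fragment that extracts the primary key from a field of the record value, as used in the quick start later in this document (the field name id is illustrative):
"pk.mode": "record_value",
"pk.fields": "id"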
Auto-creation and auto-evolution¶
Tip
Ensure the user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with an error stating the missing columns.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field, if applicable. The connector uses the following mapping from Connect schema types to Impala and Kudu types:
Schema Type | Impala | Kudu |
---|---|---|
Int8 | TINYINT | int8 |
Int16 | SMALLINT | int16 |
Int32 | INT | int32 |
Int64 | BIGINT | int64 |
Float32 | FLOAT | float |
Float64 | DOUBLE | double |
Boolean | BOOLEAN | bool |
String | STRING | string |
'Decimal' | DECIMAL(38,s) | decimal |
'Date' | TIMESTAMP | unixtime_micros |
'Time' | TIMESTAMP | unixtime_micros |
'Timestamp' | TIMESTAMP | unixtime_micros |
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
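For example, a minimal fragment that enables both behaviors (primary key columns are derived from the pk.mode and pk.fields settings described above):
"auto.create": "true",
"auto.evolve": "true"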
Limitations¶
- Although Impala supports ARRAY and MAP types, the connector does not currently support structured types for columns.
- Impala converts all column names to lowercase, so use lowercase column names in the first place to avoid name mismatches.
- Upsert and idempotent writes are not currently supported.
- Deletes are not currently supported.
Install the Kudu Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
If you are running a multi-node Connect cluster, the Kudu Sink connector and Impala JDBC driver JARs must be installed on every Connect worker in the cluster. See below for details.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-kudu:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-kudu:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Install the Impala JDBC Driver¶
The Kudu Sink connector uses the Java Database Connectivity (JDBC) API. The connector queries the Kudu database through Impala, so the Impala JDBC driver must be installed.
The basic installation steps are as follows (example commands are sketched after the list):
- Download the Impala JDBC Connector and unzip it to get the JAR files.
- Place these JAR files into the share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib directory in your Confluent Platform installation on each of the Connect worker nodes.
- Restart all of the Connect worker nodes.
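The following is a minimal sketch of these steps, assuming a Confluent Platform installation under $CONFLUENT_HOME and an illustrative archive name:
# Unzip the downloaded Impala JDBC Connector archive (the file name is illustrative)
unzip ClouderaImpalaJDBC.zip -d impala-jdbc
# Copy the extracted JAR files into the connector's lib directory on each Connect worker
cp impala-jdbc/*.jar $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib/
# Restart the Connect worker (the exact command depends on your deployment)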
General guidelines¶
The following are additional guidelines to consider:
- Use the most recent version of the Impala JDBC driver available.
- Use the correct JAR file for the Java version used to run the Connect workers. If you install and try to use the Impala JDBC driver JAR file for the wrong version of Java, starting any Kudu source or sink connector will likely fail with UnsupportedClassVersionError. If this happens, remove the Impala JDBC driver JAR file you installed and repeat the driver installation process with the correct JAR file.
- The share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Kudu Source and Sink connector JAR files and place the Impala JDBC driver JAR file(s) into the same directory.
- If the Impala JDBC driver is not installed correctly, the Kudu Source or Sink connector fails on startup, typically with the error No suitable driver found. If this happens, install the Impala JDBC driver again.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic. License requirements are the same for both the sink and source connector.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Kudu Sink Connector for Confluent Platform.
Prerequisites¶
- Confluent Platform is installed and services are running by using the Confluent CLI commands. This quick start assumes that you are using the Confluent CLI. By default, ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. For more information, see Confluent Platform. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
- Kudu and Impala are installed and configured properly (see Using Kudu with Impala). DECIMAL type support requires at least Kudu 1.7.0 and Impala 3.0.
- Verify that the Impala JDBC driver is available on the Kafka Connect process's CLASSPATH (one way to check is sketched after this list).
- Kafka and Schema Registry are running locally on the default ports.
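One way to verify the driver is in place, assuming the standard Confluent Platform directory layout (the connector picks up JARs placed in its lib directory, as described in the installation section above):
ls $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib/ | grep -i -e impala -e jdbc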
Quick Start¶
The following procedure steps you through copying Avro data from a single Kafka topic to a local Kudu database.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Create Kudu database¶
Start Impala shell.
impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
Create a database with this command:
CREATE DATABASE test;
Your output should resemble:
Query: create DATABASE test
Fetched 0 row(s) in 0.80s
Load the Kudu Sink Connector¶
Load the predefined Kudu Sink connector.
Optional: View the available predefined connectors with this command:
confluent local services connect connector list
Your output should resemble:
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
kudu-source
kudu-sink
hdfs-sink
s3-sink
Create a kudu-sink.json file for your Kudu Sink connector.
{
  "name": "kudu-sink",
  "config": {
    "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "impala.server": "127.0.0.1",
    "impala.port": "21050",
    "kudu.database": "test",
    "auto.create": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "impala.ldap.password": "<ldap-password>",
    "impala.ldap.user": "<ldap-user>",
    "kudu.tablet.replicas": "1",
    "name": "kudu-sink"
  }
}
Load the kudu-sink connector:
confluent local services connect connector load kudu-sink --config kudu-sink.json
Your output should resemble:
{ "name": "kudu-sink", "config": { "connector.class": "io.confluent.connect.kudu.KuduSinkConnector", "tasks.max": "1", "topics": "orders", "impala.server": "127.0.0.1", "impala.port": "21050", "kudu.database": "test", "auto.create": "true", "pk.mode":"record_value", "pk.fields":"id", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "impala.ldap.password": "secret", "impala.ldap.user": "kudu", "kudu.tablet.replicas": "1", "name": "kudu-sink" }, "tasks": [], "type": "sink" }
Produce a record in Kafka¶
Produce a record into the orders topic.
./bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Use Impala shell to query the Kudu database. You should see that the orders table was automatically created and contains the record.
USE test;
SELECT * from orders;
foo|50.0|100|999
Troubleshooting¶
HiveServer2 error¶
When you run this connector, you might see the following error message.
java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.
This means that LDAP is not set up in Impala, or that the LDAP username and password are not valid.
Not enough live tablet servers¶
When you run this connector, you might see the following error message:
com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::test.orders'
CAUSED BY: NonRecoverableException: not enough live tablet servers to create a table with the requested replication factor 3; 1 tablet servers are alive
), Query: CREATE TABLE `orders` (
`id` INT NOT NULL,
`product` STRING NOT NULL,
`quantity` INT NOT NULL,
`price` INT NOT NULL,
PRIMARY KEY(`id`)) PARTITION BY HASH PARTITIONS 2 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '3').
This means you don't have enough tablet servers to support three replicas. Either increase the number of tablet servers to three, or add the following property to your connector configuration:
"kudu.tablet.replicas":"1"