Kudu Sink Connector for Confluent Platform

The Kafka Connect Kudu Sink connector allows you to export data from an Apache Kafka® topic to a Kudu columnar relational database using an Impala JDBC driver. The connector polls data from Kafka and writes it to Kudu based on the topic subscription. Auto-creation of tables and limited auto-evolution are also supported.

Features

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
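
As an illustrative sketch, DLQ routing for a sink connector is enabled through the standard Kafka Connect error-handling properties in the connector configuration; the topic name dlq-kudu-sink below is an assumed example, not a required value:

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-kudu-sink",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true"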

Multiple tasks

The Kudu Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
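
For example, the following fragment of a sink connector configuration (the orders topic matches the quick start later in this document) requests three tasks:

    "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
    "tasks.max": "3",
    "topics": "orders"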

Data mapping

The Kudu Sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled). Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be of primitive types. If the data in the topic is not in a compatible format, you may need to implement a custom Converter.
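
For example, the quick start later in this document uses the Avro converter with Schema Registry; a minimal converter fragment, assuming Schema Registry is available at http://localhost:8081, looks like the following:

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"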

Key handling

The default is for primary keys not to be extracted (pk.mode set to none), which is not suitable for advanced usage such as upsert semantics, or when the connector is responsible for auto-creating the destination table. Other modes let the connector use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record.

Refer to primary key configuration options for further detail.
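
For example, to derive the primary key from a field of the record value (the id field here matches the quick start later in this document), the connector configuration could include:

    "pk.mode": "record_value",
    "pk.fields": "id"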

Auto-creation and auto-evolution

Tip

Ensure the user has the appropriate permissions for DDL.

If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with an error indicating the missing columns.
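
As a sketch, both behaviors are controlled from the connector configuration; for example:

    "auto.create": "true",
    "auto.evolve": "true"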

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from Connect schema types to Impala and Kudu types:

Schema Type   Impala          Kudu
Int8          TINYINT         int8
Int16         SMALLINT        int16
Int32         INT             int32
Int64         BIGINT          int64
Float32       FLOAT           float
Float64       DOUBLE          double
Boolean       BOOLEAN         bool
String        STRING          string
'Decimal'     DECIMAL(38,s)   decimal
'Date'        TIMESTAMP       unixtime_micros
'Time'        TIMESTAMP       unixtime_micros
'Timestamp'   TIMESTAMP       unixtime_micros

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Limitations

  1. Although Impala supports ARRAY and MAP types, the connector does not currently support any structured types for column types.
  2. Impala converts all column names to lowercase, so it is best to use lowercase column names in the first place to avoid name mismatches.
  3. Upsert and idempotent writes are not currently supported.
  4. Delete is not currently supported.

Install the Kudu Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

If you are running a multi-node Connect cluster, the Kudu Sink connector and Impala JDBC driver JARs must be installed on every Connect worker in the cluster. See below for details.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-kudu:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-kudu:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Install the Impala JDBC Driver

The Kudu Sink connector uses the Java Database Connectivity (JDBC) API. For this to work, the connector must use Impala to query the Kudu database, and the Impala JDBC driver must be installed.

The basic steps of installation are:

  1. Download the Impala JDBC Connector and unzip it to get the JAR files.
  2. Place these JAR files into the share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib directory in your Confluent Platform installation on each of the Connect worker nodes.
  3. Restart all of the Connect worker nodes.

General guidelines

The following are additional guidelines to consider:

  • Use the most recent version of the Impala JDBC driver available.
  • Use the correct JAR file for the Java version used to run the Connect workers. If you install and try to use the Impala JDBC driver JAR file for the wrong version of Java, starting any Kudu source connector or Kudu sink connector will likely fail with UnsupportedClassVersionError. If this happens, remove the Impala JDBC driver JAR file you installed and repeat the driver installation process with the correct JAR file.
  • The share/confluent-hub-components/confluentinc-kafka-connect-kudu/lib directory mentioned above is for Confluent Platform. If you are using a different installation, find the location where the Confluent Kudu Source and Sink connector JAR files are located, and place the Impala JDBC driver JAR file(s) for the target databases into the same directory.
  • If the Impala JDBC driver is not installed correctly, the Kudu Source or Sink Connector will fail on startup. Typically, the system throws the error No suitable driver found. If this happens, install the Impala JDBC driver again.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic. License requirements are the same for both the sink and source connector.
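
If you are a subscriber, the license key is typically supplied through the confluent.license property in the connector configuration; the placeholder value below is an assumption for illustration:

    "confluent.license": "<license-key>"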

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Kudu Sink Connector for Confluent Platform.

Prerequisites

  • Confluent Platform is installed and services are running by using the Confluent CLI commands. This quick start assumes that you are using the Confluent CLI. By default ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. For more information, see Confluent Platform. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
  • Kudu and Impala are installed and configured properly (see Using Kudu with Impala). DECIMAL type support requires at least Kudu 1.7.0 and Impala 3.0.
  • Verify that the Impala JDBC driver is available on the Kafka Connect process’s CLASSPATH.
  • Kafka and Schema Registry are running locally on the default ports.

Quick Start

The following procedure steps you through copying Avro data from a single Kafka topic to a local Kudu database.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Create Kudu database

  1. Start Impala shell.

    impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
    
  2. Create a database with this command:

    CREATE DATABASE test;
    

    Your output should resemble:

    Query: create DATABASE test
    Fetched 0 row(s) in 0.80s
    

Load the Kudu Sink Connector

Load the predefined Kudu Sink connector.

  1. Optional: View the available predefined connectors with this command:

    confluent local services connect connector list
    

    Your output should resemble:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      kudu-source
      kudu-sink
      hdfs-sink
      s3-sink
    
  2. Create a kudu-sink.json file for your Kudu Sink connector.

    {
          "name": "kudu-sink",
          "config": {
            "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
            "tasks.max": "1",
            "topics": "orders",
            "impala.server": "127.0.0.1",
            "impala.port": "21050",
            "kudu.database": "test",
            "auto.create": "true",
            "pk.mode":"record_value",
            "pk.fields":"id",
    
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://localhost:8081",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
            "impala.ldap.password": "<ladp-password>",
            "impala.ldap.user": "<ldap-user>",
            "kudu.tablet.replicas": "1",
            "name": "kudu-sink"
          }
      }
    
  3. Load the kudu-sink connector:

    confluent local services connect connector load kudu-sink --config kudu-sink.json
    

    Your output should resemble:

    {
        "name": "kudu-sink",
        "config": {
          "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
          "tasks.max": "1",
          "topics": "orders",
          "impala.server": "127.0.0.1",
          "impala.port": "21050",
          "kudu.database": "test",
          "auto.create": "true",
          "pk.mode":"record_value",
          "pk.fields":"id",
    
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://localhost:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://localhost:8081",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "impala.ldap.password": "secret",
          "impala.ldap.user": "kudu",
          "kudu.tablet.replicas": "1",
          "name": "kudu-sink"
          },
          "tasks": [],
          "type": "sink"
        }
    

Produce a record in Kafka

  1. Produce a record into the orders topic.

     ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic orders \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
    

    The console producer waits for input.

  2. Copy and paste the following record into the terminal and press Enter:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  3. Use the Impala shell to query the Kudu database. You should see that the orders table was created automatically and contains the record.

    USE test;
    SELECT * from orders;
    foo|50.0|100|999
    

Troubleshooting

HiveServer2 error

When you run this connector, you might see the following error message.

java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.

This error means that LDAP is not set up in Impala, or that the LDAP username or password is not valid.

Not enough live tablet servers

When you run this connector, you might see the following error message:

com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::test.orders'
CAUSED BY: NonRecoverableException: not enough live tablet servers to create a table with the requested replication factor 3; 1 tablet servers are alive
), Query: CREATE TABLE `orders` (
`id` INT NOT NULL,
`product` STRING NOT NULL,
`quantity` INT NOT NULL,
`price` INT NOT NULL,
PRIMARY KEY(`id`)) PARTITION BY HASH PARTITIONS 2 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '3').

This error means you do not have enough tablet servers to support three replicas. Either increase the number of tablet servers to three, or add the following property to your connector configuration:

"kudu.tablet.replicas":"1"