Kudu Source Connector for Confluent Platform¶
The Kafka Connect Kudu Source connector allows you to import data to an Apache Kafka® topic from an Apache Kudu columnar relational database using the Impala JDBC driver.
Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. By default, all tables in a database are copied, each to its own output topic. The database is monitored for new or deleted tables and adapts automatically. When copying data from a table, the connector can load only new or modified rows by specifying which columns should be used to detect new or modified data.
You can configure Java streams applications to deserialize and ingest data in multiple ways, including Kafka console producers, JDBC source connectors, and Java client producers. For full code examples, see Pipelining with Kafka Connect and Kafka Streams.
Features¶
The Kudu Source connector supports copying tables with a variety of JDBC data types, adding and removing tables from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each table, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated rows from a table (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.
The Kudu Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.
Multiple tasks¶
The Kudu Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when copying data from multiple tables.
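For example, a fragment like the following (a sketch only, not a complete configuration) lets the connector spread several whitelisted tables across up to three tasks:

connector.class=io.confluent.connect.kudu.KuduSourceConnector
tasks.max=3
table.whitelist=users,products,transactions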
Incremental query modes¶
Each incremental query mode tracks a set of columns for each row, which it uses to keep track of which rows have been processed and which rows are new or have been updated. The mode setting controls this behavior and supports the following options; a configuration sketch for these modes follows the list:
- Incrementing Column: A single column containing a unique ID for each row, where newer rows are guaranteed to have larger IDs (that is, an AUTOINCREMENT column). Note that this mode can only detect new rows. Updates to existing rows cannot be detected, so this mode should only be used for immutable data. One example where you might use this mode is when streaming fact tables in a data warehouse, since those are typically insert-only.
- Timestamp Column: In this mode, a single column containing a modification timestamp is used to track the last time data was processed and to query only for rows that have been modified since that time. Note that because timestamps are not necessarily unique, this mode cannot guarantee all updated data will be delivered: if two rows share the same timestamp and are returned by an incremental query, but only one has been processed before a crash, the second update will be missed when the system recovers.
- Timestamp and Incrementing Columns: This is the most robust and accurate mode, combining an incrementing column with a timestamp column. By combining the two, as long as the timestamp is sufficiently granular, each (ID, timestamp) tuple will uniquely identify an update to a row. Even if an update fails after partially completing, unprocessed updates will still be correctly detected and delivered when the system recovers.
- Custom Query: The source connector supports using custom queries instead of copying whole tables. With a custom query, one of the other automatic update modes can be used as long as the necessary WHERE clause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes, where incrementing and/or timestamp column values are recorded for each record), so the query must track offsets itself.
- Bulk: This mode is unfiltered and therefore not incremental at all. It loads all rows from a table on each iteration. This can be useful if you want to periodically dump an entire table where entries are eventually deleted and the downstream system can safely handle duplicates.
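The following fragments are minimal sketches of the mode-related properties only; the id and modified column names are illustrative, and complete examples appear under Configuration below.

# Incrementing mode: detects new rows only
mode=incrementing
incrementing.column.name=id

# Timestamp mode: detects new and modified rows by modification time
mode=timestamp
timestamp.column.name=modified

# Timestamp and incrementing mode: the most robust option
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id

# Bulk mode: reloads the entire table on every poll
mode=bulk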
Note
All incremental query modes that use certain columns to detect changes will require indexes on those columns to efficiently perform the queries.
For incremental query modes that use timestamps, the source connector uses the timestamp.delay.interval.ms configuration property to control how long to wait after a row with a given timestamp appears before including it in the result. The additional wait allows transactions with earlier timestamps to complete and the related changes to be included in the result. For more information, see Configuration Reference for Kudu Source Connector for Confluent Platform.
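For example, a timestamp-based configuration can delay rows by 30 seconds before they are published; the 30000 value and the modified column name here are only an illustration:

mode=timestamp
timestamp.column.name=modified
timestamp.delay.interval.ms=30000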
Mapping column types¶
The source connector has a few options for controlling how column types are mapped into Connect field types. By default, the connector maps SQL/JDBC types to the most accurate representation in Java, which is straightforward for many SQL types but may be a bit unexpected for some. For example, SQL's DECIMAL types have very clear semantics controlled by the precision and scale, and the most accurate representation is Connect's Decimal logical type, which uses Java's BigDecimal representation. Unfortunately, Avro serializes DECIMAL types as raw bytes that may be difficult to consume.
Limitations¶
- Kudu does not support DATE and TIME types. The Connect Date, Time, and Timestamp types are all mapped to the Impala TIMESTAMP type, which corresponds to the Kudu unixtime_micros type.
- Impala does not support the BINARY type, so the connector does not accept binary data either.
- Complex data types such as Array, Map, and Struct are not supported.
- For the Decimal type, both Impala and Kudu allow a precision of at most 38, and the connector observes this cap.
Install the Kudu Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
If you are running a multi-node Connect cluster, the Kudu Source connector and Impala JDBC driver JARs must be installed on every Connect worker in the cluster. See below for details.
You must install the connector on every machine where Connect will run.
To install the latest (latest) connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-kudu:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-kudu:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
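The manual installation instructions cover the details; as a reminder (and a minimal sketch with an example path), the directory you extract the connector into must be on the plugin.path of every Connect worker:

# In each Connect worker's configuration file (for example, connect-distributed.properties)
plugin.path=/usr/local/share/kafka/plugins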
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic. License requirements are the same for both the sink and source connector.
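The references above describe the exact license properties; as a minimal sketch, a subscriber's connector configuration typically includes the license key and the license-topic settings (the values below are placeholders):

confluent.license=<license-key>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1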
Configuration Properties¶
For a complete list of configuration properties for the source connector, see Configuration Reference for Kudu Source Connector for Confluent Platform.
Quick Start¶
To see the basic functionality of the connector, you copy a single table from a local Kudu database. This quick start assumes each entry in the table is assigned a unique ID and is not modified after creation.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Prerequisites¶
- Confluent Platform is installed and services are running by using the Confluent CLI commands. This quick start assumes that you are using the Confluent CLI. By default, ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. For more information, see Confluent Platform. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated; Confluent recommends KRaft mode for new deployments.
- Kudu and Impala are installed and configured properly (Using Kudu with Impala). DECIMAL type support requires at least Kudu 1.7.0 and Impala 3.0.
- The Impala JDBC driver is available on the Kafka Connect process's CLASSPATH.
- Kafka and Schema Registry are running locally on the default ports.
Create table and load data¶
Start the Impala shell:
impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
Create a database with the following command:
CREATE DATABASE test;
Switch to the new database with the following command:
USE test;
Create a table and seed it with some data:
CREATE TABLE accounts ( id BIGINT, name STRING, PRIMARY KEY(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ("kudu.master_addresses" = "127.0.0.1","kudu.num_tablet_replicas" = "1");
INSERT INTO accounts (id, name) VALUES (1, 'alice');
INSERT INTO accounts (id, name) VALUES (2, 'bob');
Tip
You can run SELECT * from accounts; to verify your table has been created.
Load the Kudu Source Connector¶
Load the predefined Kudu Source connector.
Optional: View the available predefined connectors with this command:
confluent local services connect connector list
Your output should resemble:
Bundled Predefined Connectors (edit configuration under etc/):
  elasticsearch-sink
  file-source
  file-sink
  jdbc-source
  jdbc-sink
  kudu-source
  kudu-sink
  hdfs-sink
  s3-sink
Create a kudu-source.json file for your Kudu Source connector.

{
  "name": "kudu-source",
  "config": {
    "connector.class": "io.confluent.connect.kudu.KuduSourceConnector",
    "tasks.max": "1",
    "impala.server": "127.0.0.1",
    "impala.port": "21050",
    "kudu.database": "test",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test-kudu-",
    "table.whitelist": "accounts",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "impala.ldap.password": "secret",
    "impala.ldap.user": "kudu",
    "name": "kudu-source"
  }
}
Load the kudu-source connector. The kudu-source.json file must be in the same directory where Connect is started.

confluent local services connect connector load kudu-source --config kudu-source.json
Your output should resemble:
{ "name": "kudu-source", "config": { "connector.class": "io.confluent.connect.kudu.KuduSourceConnector", "tasks.max": "1", "impala.server": "127.0.0.1", "impala.port": "21050", "kudu.database": "test", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-kudu-", "table.whitelist": "accounts", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "impala.ldap.password": "<ldap-password>", "impala.ldap.user": "<ldap-user>", "name": "kudu-source" }, "tasks": [], "type": "source" }
To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-kudu-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
The output shows the two records as expected, one per line, in the JSON encoding of the Avro records. Each row is represented as an Avro record and each column is a field in the record. You can see both columns in the table, id and name. The id column is the table's BIGINT primary key, so it cannot be null and is encoded directly as a number. The name column has type STRING and can be NULL. The JSON encoding of Avro encodes the strings in the format {"type": value}, so you can see that both rows have string values with the names specified when you inserted the data.
Add a record to the consumer¶
Add another record using the Impala shell:
INSERT INTO accounts (id, name) VALUES (3, 'cathy');
You can switch back to the console consumer and see that the new record has been added and, importantly, the old entries are not repeated:
{"id":3,"name":{"string":"cathy"}}
Note
The default polling interval is five seconds, so it may take a few seconds for the record to show up. Depending on your expected rate of updates or desired latency, you can use a smaller poll interval to deliver updates more quickly.
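As a sketch, assuming the connector exposes the same poll.interval.ms property as the JDBC-based source connectors it is modeled on, you could lower the interval to one second:

# Poll for new or changed rows every second instead of every five seconds
poll.interval.ms=1000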
All the features of Kafka Connect, including offset management and fault tolerance, work with the source connector. You can restart and kill the processes and they will pick up where they left off, copying only new data (as defined by the mode setting).
Configuration¶
The full set of configuration options are listed in Configuration Reference for Kudu Source Connector for Confluent Platform, but here are a few template configurations that cover some common usage scenarios.
Use a whitelist to limit changes to a subset of tables in a Kudu database, using id and modified columns that are standard on all whitelisted tables to detect rows that have been modified. This mode is the most robust because it can combine the unique, immutable row IDs with modification timestamps to guarantee modifications are not missed even if the process dies in the middle of an incremental update query.
name=whitelist-timestamp-source
connector.class=io.confluent.connect.kudu.KuduSourceConnector
tasks.max=10
connection.url=jdbc:impala://<Impala server>:21050/my_database
table.whitelist=users,products,transactions
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=kudu-
Use a custom query instead of loading tables, allowing you to join data from multiple tables. As long as the query does not include its own filtering, you can still use the built-in modes for incremental queries (in this case, using a timestamp column). Note that this limits you to a single output per connector and because there is no table name, the topic “prefix” is actually the full topic name in this case.
name=whitelist-timestamp-source
connector.class=io.confluent.connect.kudu.KuduSourceConnector
tasks.max=10
connection.url=jdbc:impala://<Impala server>:21050/my_database
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=kudu-joined-data
Troubleshooting¶
HiveServer2 error¶
When you run this connector, you might see the following error message.
java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.
This error means that LDAP authentication has not been set up in Impala, or that the LDAP username and password you provided are not valid.
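If LDAP is enabled in Impala, verify that the connection and LDAP settings in the connector configuration match your environment; the values below are placeholders:

impala.server=<Impala server>
impala.port=21050
impala.ldap.user=<ldap-user>
impala.ldap.password=<ldap-password>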