Cassandra Sink Connector for Confluent Platform¶
The Kafka Connect Cassandra Sink connector is a high-speed mechanism for writing data to Apache Cassandra and is compatible with Cassandra 2.1, 2.2, and 3.0.
Important
Cassandra Sink connector version 2.0.0 is not backward-compatible with versions 1.0.0, 1.1.0, and 1.2.0. See the Upgrading to Version 2.0.x section for more information.
Features¶
The Cassandra Sink connector for Confluent Platform includes the following features:
- Exactly once delivery
- Dead Letter Queue
- Multiple tasks
- Time-To-Live support
- Offset tracking support in Kafka
Exactly once delivery¶
The Cassandra Sink connector supports exactly once delivery, which can be enabled using the cassandra.offset.storage.table configuration property. For more details, see the Cassandra table configuration properties.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The Cassandra Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
Time-To-Live support¶
This connector supports Time-To-Live (TTL), in which data expires after a specified period. After the TTL time has elapsed, the data is automatically deleted. For example, if the TTL value is set to 100 seconds, the data is deleted after 100 seconds. To use this feature, you must configure the cassandra.ttl configuration property. If you don’t set this property, the record is inserted with the default TTL value of null, in which case written data does not expire.
Offset tracking support in Kafka¶
The Cassandra Sink connector also provides the following offset tracking support:
- Offset stored in Cassandra table: This is the default behavior of the connector, in which the offset is stored in the Cassandra table.
- Offset stored in Kafka: If you want the offset to be managed in Kafka, you can set cassandra.offset.storage.table.enable to false. By default, cassandra.offset.storage.table.enable is true, in which case the offset is stored in the Cassandra table.
Limitations¶
The Cassandra Sink connector has the following limitations:
- The connector only supports topic names that contain the following characters: [a-z0-9].
- The connector does not support the Avro map data type.
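Because of the topic-name restriction, it can help to check names before deploying the connector. The following is a minimal Python sketch, assuming the rule is exactly the character class listed above applied to the whole name. The helper name is hypothetical and is not part of the connector.

```python
import re

# Character class taken from the limitation above; assumed to cover the whole name.
SUPPORTED_TOPIC = re.compile(r"[a-z0-9]+")

def unsupported_topics(topics):
    """Return the topic names that contain characters outside [a-z0-9]."""
    return [t for t in topics if not SUPPORTED_TOPIC.fullmatch(t)]

print(unsupported_topics(["topic1", "Orders", "web-logs", "clicks2024"]))
# ['Orders', 'web-logs']
```

Names flagged by such a check could be renamed with a transform before they reach the connector.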
Install the Cassandra Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-cassandra:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-cassandra:2.0.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Cassandra Sink Connector for Confluent Platform.
Upgrading to Version 2.0.x¶
Cassandra Sink connector version 2.0.x includes changes that do not allow backward compatibility with versions 1.0.x, 1.1.x, 1.2.x, and 1.3.x. Depending on your requirements, you can upgrade to the new version of the connector by taking one of the following paths:
If you don’t want to use SSL or Kerberos with the connector, you can configure the cassandra.local.datacenter property by completing the following steps:
- Connect to one of your Cassandra nodes defined in cassandra.contact.points using the cqlsh tool.
- Run the following CQL command:
  SELECT data_center FROM system.local;
- Use the value from running the previous CQL command to configure the cassandra.local.datacenter property.
Note
The cassandra.contact.points property no longer defaults to localhost, so if you’re using it in a local environment, explicitly set it.
If you want to use SSL:
- Remove the cassandra.ssl.provider configuration.
- Follow the first upgrade path. This shouldn’t have an impact on your SSL setup.
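The first two upgrade paths amount to a handful of property changes. The following Python sketch applies them to an existing configuration, assuming the configuration is held as a dictionary. The function name is hypothetical, and the sample values "datacenter1" and "JDK" are placeholders rather than values taken from this documentation.

```python
def upgrade_config(old_config, local_datacenter, contact_points=None):
    """Return a copy of a 1.x connector config adjusted for 2.0.x.

    local_datacenter: the value returned by
        SELECT data_center FROM system.local;
    contact_points: explicit hosts, needed if the old config relied on the
        former localhost default.
    """
    new_config = dict(old_config)
    # Set the local datacenter explicitly.
    new_config["cassandra.local.datacenter"] = local_datacenter
    # Contact points no longer default to localhost, so they must be present.
    if contact_points is not None:
        new_config["cassandra.contact.points"] = contact_points
    if "cassandra.contact.points" not in new_config:
        raise ValueError("cassandra.contact.points must be set explicitly")
    # SSL path: remove the provider setting.
    new_config.pop("cassandra.ssl.provider", None)
    return new_config

old = {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "topics": "topic1",
    "cassandra.keyspace": "test",
    "cassandra.ssl.enabled": "true",
    "cassandra.ssl.provider": "JDK",  # placeholder value
}
new = upgrade_config(old, local_datacenter="datacenter1", contact_points="localhost")
print(sorted(set(new) - set(old)))  # properties added by the upgrade
print(sorted(set(old) - set(new)))  # properties removed by the upgrade
```

The original dictionary is left untouched, so the old and new configurations can be compared before redeploying.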
If you want to use Kerberos:
- Ensure each of your Connect workers has a krb5.conf configured that can point to a KDC to provide tickets to access your Cassandra contact points.
- Set up a user principal for your connector and obtain its corresponding keytab file.
- Configure cassandra.security to KERBEROS and configure connect.cassandra.principal and connect.cassandra.keytab.
  Note
  The cassandra.kerberos.sasl.protocol configuration property must match the service portion of your Cassandra service principal.
- Follow the first or second upgrade path mentioned previously, depending on your requirements.
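As a sketch of how the Kerberos properties fit together, the following Python function derives cassandra.kerberos.sasl.protocol from a service principal, assuming the usual service/host@REALM form. The function name, principals, and keytab path are hypothetical examples.

```python
def kerberos_settings(user_principal, keytab_path, service_principal):
    """Build the Kerberos-related connector properties named above."""
    # A service principal is assumed to look like service/host@REALM;
    # keep only the service portion.
    service = service_principal.split("@", 1)[0].split("/", 1)[0]
    return {
        "cassandra.security": "KERBEROS",
        "connect.cassandra.principal": user_principal,
        "connect.cassandra.keytab": keytab_path,
        "cassandra.kerberos.sasl.protocol": service,
    }

settings = kerberos_settings(
    user_principal="connect-user@EXAMPLE.COM",           # hypothetical
    keytab_path="/etc/security/keytabs/connect.keytab",  # hypothetical
    service_principal="cassandra/cassandra.example.com@EXAMPLE.COM",
)
print(settings["cassandra.kerberos.sasl.protocol"])  # cassandra
```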
To view the specific changes for the 2.0.x versions of the Cassandra Sink Connector, see the changelog.
Usage Notes¶
This connector uses the topic to determine the name of the table to write to. You can change this dynamically by using a transform, like Regex Router, to change the topic name.
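To illustrate the renaming step, the following Python sketch mimics the effect of a regex-based rename on a topic name. It is not the transform's implementation. The function name is hypothetical, the pattern is assumed to have to match the whole topic name, and Python's replacement syntax (\1) differs from the Java syntax ($1) used in the transform's own configuration.

```python
import re

def table_for_topic(topic, pattern, replacement):
    """Return the renamed topic, which the connector would use as the table name."""
    if re.fullmatch(pattern, topic):
        return re.sub(pattern, replacement, topic, count=1)
    return topic  # names that do not match pass through unchanged

print(table_for_topic("prod-orders", r"prod-(.*)", r"\1"))  # orders
print(table_for_topic("clicks", r"prod-(.*)", r"\1"))       # clicks
```

In this example the rename also removes the hyphen, which falls outside the [a-z0-9] characters the connector supports in topic names.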
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Schema management¶
You can configure this connector to manage the schema on the Cassandra cluster. The key schema is used to generate a primary key for the table when it is created, and the key fields must also be present in the value schema. When altering an existing table, the key is ignored, which avoids the potential issues around changing a primary key on an existing table. Data written to the table is always read from the record value in Apache Kafka®. This connector uses the topic to determine the name of the table to write to. This can be changed on the fly by using a transform to change the topic name.
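The requirement that key fields also appear in the value schema can be checked before producing data. The following Python sketch works on plain lists of field names. The function name and the sample field names are hypothetical, and it says nothing about how the connector itself validates schemas.

```python
def missing_key_fields(key_fields, value_fields):
    """Return key fields that are absent from the value schema, in key order."""
    value_set = set(value_fields)
    return [f for f in key_fields if f not in value_set]

print(missing_key_fields(["id"], ["id", "name", "email"]))            # []
print(missing_key_fields(["id", "region"], ["id", "name", "email"]))  # ['region']
```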
Time to Live (TTL) support¶
This connector supports TTL, by which data automatically expires after a specified period. The TTL value is the time to live for the data. After that amount of time, the data is automatically deleted. For example, if the TTL value is set to 100 seconds, the data is automatically deleted after 100 seconds. To use this feature, set the cassandra.ttl configuration property to the time (in seconds) for which you want to retain the data. If you don’t specify this property, the record is inserted with the default TTL value of null, meaning that written data will not expire.
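As a rough illustration of these semantics, the following Python sketch sets cassandra.ttl on a connector configuration and computes when a row written at a given moment would expire. The helper names and the sample timestamp are hypothetical, and the expiry arithmetic restates the description above rather than the connector's internals.

```python
from datetime import datetime, timedelta, timezone

def with_ttl(config, ttl_seconds):
    """Return a copy of the connector config with cassandra.ttl set (in seconds)."""
    updated = dict(config)
    updated["cassandra.ttl"] = str(ttl_seconds)
    return updated

def expires_at(written_at, config):
    """Return the expiry time of a row, or None when no TTL is configured."""
    ttl = config.get("cassandra.ttl")
    if ttl is None:
        return None  # default: written data does not expire
    return written_at + timedelta(seconds=int(ttl))

base = {"cassandra.keyspace": "test"}
written = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

print(expires_at(written, base))                 # None
print(expires_at(written, with_ttl(base, 100)))  # 2024-01-01 12:01:40+00:00
```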
Offset tracking support in Kafka¶
The Cassandra Sink connector supports the following two types of offset tracking.
Offset stored in Cassandra table¶
This is the default behavior of the connector. Here, the offset is stored in the Cassandra table.
Offset stored in Kafka¶
If you want the offset to be managed in Kafka, you must specify cassandra.offset.storage.table.enable=false. By default, this property is true, in which case the offset is stored in the Cassandra table.
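The default described above can be expressed as a small check. This Python sketch (the function name is hypothetical) reports where offsets would be tracked for a given configuration, treating a missing cassandra.offset.storage.table.enable as true.

```python
def offset_storage(config):
    """Report where offsets are tracked for a connector config."""
    enabled = str(config.get("cassandra.offset.storage.table.enable", "true")).lower()
    return "cassandra" if enabled == "true" else "kafka"

print(offset_storage({"cassandra.keyspace": "test"}))                      # cassandra
print(offset_storage({"cassandra.offset.storage.table.enable": "false"}))  # kafka
```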
Troubleshooting¶
If you encounter error messages like this:
Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB
Or warning messages like this:
Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB
Try adjusting the consumer.max.poll.records setting in the worker.properties file for Kafka Connect.
Examples¶
Upsert mode¶
This example will configure the connector to use upserts when writing data to Cassandra.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone mode will use the properties based example.
Distributed mode JSON¶
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.write.mode" : "Update"
  }
}
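In distributed mode, a JSON document like the one above is typically submitted to a Connect worker's REST API with a POST to /connectors. The following is a minimal Python sketch, assuming a worker listening on http://localhost:8083. The host and port are assumptions, so adjust them for your deployment. The function only builds the request, and the line that sends it is left commented out.

```python
import json
import urllib.request

def build_create_request(connector, connect_url="http://localhost:8083"):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

connector = {
    "name": "cassandraSinkConnector1",
    "config": {
        "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "topic1,topic2,topic3",
        "cassandra.contact.points": "cassandra",
        "cassandra.keyspace": "test",
        "cassandra.write.mode": "Update",
    },
}

request = build_create_request(connector)
print(request.get_method(), request.full_url)  # POST http://localhost:8083/connectors
# To submit the connector to a running worker:
# urllib.request.urlopen(request)
```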
Standalone mode properties¶
name=cassandraSinkConnector1
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update
Standard¶
This example will connect to an Apache Cassandra instance without authentication.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone mode will use the properties based example.
Distributed mode JSON¶
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test"
  }
}
Standalone mode properties¶
name=cassandraSinkConnector1
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
SSL and authentication¶
This example will connect to an Apache Cassandra instance with SSL and username/password authentication.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone mode will use the properties based example.
Distributed mode JSON¶
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic3",
    "cassandra.contact.points": "cassandra",
    "cassandra.keyspace": "test",
    "cassandra.ssl.enabled": "true",
    "cassandra.security": "PASSWORD",
    "cassandra.username": "example",
    "cassandra.password": "password",
    "cassandra.ssl.truststore.password": "password",
    "cassandra.ssl.truststore.path": "path_to_keystore"
  }
}
Standalone mode properties¶
name=cassandraSinkConnector1
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.security=PASSWORD
cassandra.username=example
cassandra.password=password
cassandra.ssl.enabled=true
cassandra.ssl.truststore.password=password
cassandra.ssl.truststore.path=path_to_keystore