Cassandra Sink Connector for Confluent Platform
The Kafka Connect Cassandra Sink Connector is a high-speed mechanism for
writing data to Apache Cassandra and is
compatible with Cassandra 2.1, 2.2, and 3.0.
Important
Cassandra Sink Connector version 2.0.0 is not backward compatible with
versions 1.0.0, 1.1.0, and 1.2.0. See the Upgrading to Version 2.0.x
section for more information.
Features
The Cassandra Sink Connector for Confluent Platform includes the following features:
At least once delivery
This connector guarantees that records from the Kafka topic are delivered at
least once.
Install the Cassandra Connector
You can install this connector by using the Confluent Hub client installation
instructions or by manually
downloading the ZIP file.
Prerequisites
Important
You must install the connector on every machine where Connect will run.
An install of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An install of the latest (latest) connector version.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-cassandra:latest
You can install a specific version by replacing latest with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-cassandra:2.0.0
Upgrading to Version 2.0.x
Cassandra Sink connector version 2.0.x includes changes that break
backward compatibility with versions 1.0.x, 1.1.x, 1.2.x, and 1.3.x. Depending
on your requirements, you can upgrade to the new version of the connector by
taking one of the following paths:
If you don’t want to use SSL or Kerberos with the connector, you can
configure the cassandra.local.datacenter property by completing the
following steps:
Connect to one of your Cassandra nodes defined in
cassandra.contact.points using the cqlsh tool.
Run the following CQL command:
SELECT data_center FROM system.local;
Use the value from running the previous CQL command to configure the
cassandra.local.datacenter property (see the example after the following
note).
Note
The cassandra.contact.points property no longer defaults to localhost,
so if you’re using it in a local environment, explicitly set it.
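For example, if the query returns datacenter1 (a placeholder; your cluster's
value will differ), the relevant connector properties would look like the
following:

cassandra.contact.points=cassandra
cassandra.local.datacenter=datacenter1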
If you want to use SSL:
- Remove the cassandra.ssl.provider configuration.
- Follow the first upgrade path. This shouldn’t have an impact on your SSL
setup.
If you want to use Kerberos:
Ensure each of your Connect workers has a krb5.conf configured that
points to a KDC that can provide tickets to access your Cassandra
contact points.
Set up a user principal for your connector and grab its corresponding
keytab file.
Configure cassandra.security to KERBEROS and configure
connect.cassandra.principal and connect.cassandra.keytab.
Note
The cassandra.kerberos.sasl.protocol configuration property must
match the service portion of your Cassandra service principal.
Follow the first or second upgrade path mentioned previously, depending on
your requirements.
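As a sketch, a Kerberos-enabled configuration might include properties like
the following; the principal, keytab path, and SASL protocol values are
placeholders for your own environment:

cassandra.security=KERBEROS
connect.cassandra.principal=cassandra-connector@EXAMPLE.COM
connect.cassandra.keytab=/etc/security/keytabs/cassandra-connector.keytab
cassandra.kerberos.sasl.protocol=cassandra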
To view the specific changes for the 2.0.x versions of the Cassandra Sink
Connector, see the changelog.
Usage Notes
This connector uses the topic to determine the name of the table to write to.
You can change this dynamically by using a transform, like RegexRouter, to
change the topic name.
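For example, the following transform configuration (the transform name, regex,
and replacement are illustrative) strips a prefix from the topic name so that
records land in a table named after the remainder:

transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=prefix-(.*)
transforms.dropPrefix.replacement=$1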
Schema management
You can configure this connector to manage the schema on the Cassandra cluster.
The key schema is used to generate a primary key for the table when the table
is created, and these fields must also be present in the value schema. When
altering an existing table, the key is ignored; this avoids the potential
issues around changing the primary key of an existing table. Data written to
the table is always read from the record value from Apache Kafka®. As noted
previously, the connector uses the topic to determine the name of the table to
write to, which can be changed on the fly with a transform.
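As an illustration, assuming the connector creates a table for a topic named
topic1 in the test keyspace, with a key schema containing a single field
userid and a value schema containing userid, firstname, and lastname (all
hypothetical names), the resulting table would look roughly like this:

CREATE TABLE test.topic1 (
    userid text,
    firstname text,
    lastname text,
    PRIMARY KEY (userid)
);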
Time to Live (TTL) support
This connector provides support for TTL, by which data can be automatically
expired after a specific period. The TTL value is the time, in seconds, for
which data lives before it is automatically deleted. For example, if the TTL
value is set to 100 seconds, the data is deleted automatically after 100
seconds. To use this feature, set the cassandra.ttl property to the time (in
seconds) for which you want to retain the data. If you don’t specify this
property, records are inserted with the default TTL value of null, meaning
that written data will not expire.
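For example, to expire written rows after 100 seconds:

cassandra.ttl=100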
Offset tracking support in Kafka
The Cassandra Sink Connector supports the following two types of offset
tracking.
Offset stored in Cassandra table
This is the default behavior of the connector: the offset is stored in a
Cassandra table.
Offset stored in Kafka
If you want the offset to be managed in Kafka, you must specify
cassandra.offset.storage.table.enable=false. By default, this property is
true (in which case the offset is stored in a Cassandra table).
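For example, to have offsets managed in Kafka instead of a Cassandra table:

cassandra.offset.storage.table.enable=false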
Troubleshooting
If you encounter error messages like this:
Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB
Or warning messages like this:
Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB
Try adjusting the consumer.max.poll.records setting in the worker.properties
file for Kafka Connect.
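For example, lowering the number of records returned per poll reduces the size
of the batches the connector writes. The value below is an illustrative
starting point, not a tuned recommendation:

consumer.max.poll.records=50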
Examples
Upsert mode
This example will configure the connector to use upserts when writing data to
Cassandra.
Select one of the following configuration methods based on how you have deployed
Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone mode
will use the properties-based example.
Distributed mode JSON
{
  "name": "cassandraSinkConnector1",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic3",
    "cassandra.contact.points": "cassandra",
    "cassandra.keyspace": "test",
    "cassandra.write.mode": "Update"
  }
}
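To load this configuration on a distributed Connect cluster, post it to the
Connect REST API. The following command assumes a worker listening on
localhost:8083 and the JSON saved to a file named cassandra-sink.json (both
placeholders); the same approach applies to the other distributed mode
examples below:

curl -X POST -H "Content-Type: application/json" \
  --data @cassandra-sink.json \
  http://localhost:8083/connectors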
Standalone mode properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update
Standard
This example will connect to an Apache Cassandra instance without
authentication.
Select one of the following configuration methods based on how you have deployed
Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone mode
will use the properties-based example.
Distributed mode JSON
{
  "name": "cassandraSinkConnector1",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic3",
    "cassandra.contact.points": "cassandra",
    "cassandra.keyspace": "test"
  }
}
Standalone mode properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
SSL and authentication
This example will connect to an Apache Cassandra instance with SSL and
username/password authentication.
Select one of the following configuration methods based on how you have deployed
Kafka Connect. Distributed mode will use the JSON/REST examples. Standalone
mode will use the properties-based example.
Distributed mode JSON
{
  "name": "cassandraSinkConnector1",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic3",
    "cassandra.contact.points": "cassandra",
    "cassandra.keyspace": "test",
    "cassandra.ssl.enabled": "true",
    "cassandra.security": "PASSWORD",
    "cassandra.username": "example",
    "cassandra.password": "password",
    "cassandra.ssl.truststore.password": "password",
    "cassandra.ssl.truststore.path": "path_to_truststore"
  }
}
Standalone mode properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.security=PASSWORD
cassandra.username=example
cassandra.password=password
cassandra.ssl.enabled=true
cassandra.ssl.truststore.password=password
cassandra.ssl.truststore.path=path_to_truststore