Cassandra Sink Connector¶
The Cassandra Sink connector is used to write data to a Cassandra cluster. It uses Cassandra's batch functionality to write all of the records from each poll in a single batch.
Important
This connector can be configured to manage the schema on the Cassandra cluster. When a table is first created, the key schema is used to generate its primary key; the key fields must also be present in the value schema, because the data written to the table is always read from the record value. When altering an existing table, the key schema is ignored, since changing the primary key of an existing table can cause problems.
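For example, a record destined for a newly created table might look like the following. The topic, field names, and values here are hypothetical; the point is that the key field (`id`) appears in both the key and the value, since the row data is always taken from the value:

```
Key   : {"id": 1}
Value : {"id": 1, "username": "alice", "followers": 1024}
```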
Tip
If you encounter error messages like
Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB.
or warning messages like
Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB.
try reducing the consumer.max.poll.records setting in the worker.properties file for Kafka Connect.
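A minimal sketch of that adjustment, assuming your worker configuration lives in worker.properties; the value 500 is illustrative and should be tuned for your workload:

```properties
# worker.properties (Kafka Connect worker configuration)
# Fetch fewer records per poll so the resulting Cassandra
# batch stays under the configured size threshold.
consumer.max.poll.records=500
```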
Note
This connector uses the topic to determine the name of the table to write to. This can be changed on the fly by using a transform like Regex Router to change the topic name.
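As a sketch, a RegexRouter transform (one of the Single Message Transforms bundled with Apache Kafka) could be added to the connector configuration to rename topics before the table name is derived; the regex and replacement below are illustrative:

```properties
# Route any topic matching "telemetry-.*" to the table "telemetry".
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=telemetry-.*
transforms.route.replacement=telemetry
```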
Examples¶
Upsert mode¶
This example will configure the connector to use upserts when writing data to Cassandra.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.
Distributed Mode JSON
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.write.mode" : "Update"
  }
}
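In distributed mode, the JSON above is submitted to the Kafka Connect REST API. A minimal sketch, assuming the configuration is saved as cassandra-sink-upsert.json and the worker's REST interface listens on localhost:8083:

```
# Submit the connector configuration to the Connect REST API.
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data @cassandra-sink-upsert.json \
  http://localhost:8083/connectors
```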
Standalone Mode Properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update
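In standalone mode, the properties file above is passed to the Connect standalone launcher together with the worker configuration. A sketch, assuming the files are named worker.properties and cassandra-sink-upsert.properties:

```
# Start a standalone Connect worker running this connector.
connect-standalone worker.properties cassandra-sink-upsert.properties
```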
Standard¶
This example will connect to an Apache Cassandra instance without authentication.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.
Distributed Mode JSON
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test"
  }
}
Standalone Mode Properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
SSL and Authentication¶
This example will connect to an Apache Cassandra instance with SSL and username / password authentication.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.
Distributed Mode JSON
{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.ssl.enabled" : "true",
    "cassandra.username" : "example",
    "cassandra.password" : "password"
  }
}
Standalone Mode Properties
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.ssl.enabled=true
cassandra.username=example
cassandra.password=password