Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Cassandra Sink Connector

The Cassandra Sink connector is used to write data to a Cassandra Cluster. This connector works by utilizing the Batch functionality to write all of the records in each poll in a single batch.

Important

This connector can be configured to manage the schema on the Cassandra cluster. When altering an existing table the key is ignored. This is due to the potential issues changing a primary key on an existing table. The key schema is used to generate a primary key for the table when it is newly created. These fields must be in the value schema as well. Data written to the table is always read from the value from Kafka. This connector uses the topic to determine the name of the table to write to. This can be changed on the fly by using a transform to change the topic name.

Tip

If you encounter error messages like this Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB. or warning messages like Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB. Try adjusting the consumer.max.poll.records setting in the worker.properties for Kafka Connect.

Note

This connector uses the topic to determine the name of the table to write to. This can be changed on the fly by using a transform like Regex Router to change the topic name.

Examples

Upsert mode

This example will configure the connector to use upserts when writing data to Cassandra.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode Json

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.write.mode" : "Update"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update

Standard

This example will connect to an Apache Cassandra instance without authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode Json

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test

SSL and Authentication

This example will connect to an Apache Cassandra instance with SSL and username / password authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode Json

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.ssl.enabled" : "true",
    "cassandra.username" : "example",
    "cassandra.password" : "password"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.ssl.enabled=true
cassandra.username=example
cassandra.password=password