Kafka Connect Cassandra Connector

The Cassandra Connector is a high speed mechanism for writing data to Apache Cassandra. It is compatible with versions 2.1, 2.2, and 3.0 of Cassandra.

Install the Cassandra Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

confluent-hub install confluentinc/kafka-connect-cassandra:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-cassandra:1.0.1-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Usage Notes

This connector uses the topic to determine the name of the table to write to. You can change this dynamically by using a transform like Regex Router to change the topic name.

Schema Management

You can configure this connector to manage the schema on the Cassandra cluster. When altering an existing table the key is ignored. This is to avoid the potential issues around changing a primary key on an existing table. The key schema is used to generate a primary key for the table when it is created. These fields must also be in the value schema. Data written to the table is always read from the value from Kafka. This connector uses the topic to determine the name of the table to write to. This can be changed on the fly by using a transform to change the topic name.

Troubleshooting

If you encounter error messages like this:

Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB

Or warning messages like this:

Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB

Try adjusting the consumer.max.poll.records setting in the worker.properties for Kafka Connect.

Examples

This example will configure the connector to use upserts when writing data to Cassandra.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.write.mode" : "Update"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update

This example will connect to an Apache Cassandra instance without authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test

This example will connect to an Apache Cassandra instance with SSL and username / password authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.ssl.enabled" : "true",
    "cassandra.username" : "example",
    "cassandra.password" : "password"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.ssl.enabled=true
cassandra.username=example
cassandra.password=password

Contents: