Kafka Connect Cassandra Connector

The Cassandra Connector is a high-speed mechanism for writing data to Apache Cassandra. The Cassandra Sink connector is used to write data to a Cassandra cluster. It works by using the batch functionality to write all of the records received in each poll as a single batch.

Install the Cassandra Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Confluent Hub

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-cassandra:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-cassandra:1.0.1-preview

Download

Download the ZIP file and extract it into a directory that is listed on the plugin path of the Connect worker configuration properties (e.g. plugin.path=/usr/local/share/kafka/plugins). This must be done on each of the installations where Connect will be run. For more information, see Installing Plugins.

Usage Notes

This connector uses the topic name to determine the name of the table to write to. You can change this dynamically by using a transformation, such as RegexRouter, to rewrite the topic name.
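
For example, the following transformation configuration is a minimal sketch of this approach; the transform alias route and the target table name example_table are placeholders, not values from this connector's documentation. It rewrites every topic name to example_table before the connector sees the record, so all records are written to that table:

transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=.*
transforms.route.replacement=example_table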

Schema Management

You can configure this connector to manage the schema on the Cassandra cluster. The key schema is used to generate the primary key for the table when the table is created, and these fields must also be present in the value schema. When an existing table is altered, the key is ignored; this avoids the potential issues around changing the primary key of an existing table. Data written to the table is always taken from the value of the Kafka record.
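
As a hypothetical illustration, assume records on a topic named users have a key schema with an id field and a value schema with id and name fields, and the connector is configured with cassandra.keyspace=test. With schema management enabled, the table the connector creates would be roughly equivalent to the following CQL (the field names and types here are assumptions, not output from the connector):

-- sketch of the table generated from the key and value schemas described above
CREATE TABLE test.users (
  id int,
  name text,
  PRIMARY KEY (id)
);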

Troubleshooting

If you encounter error messages like this:

Batch for [test.twitter] is of size 127.661KiB, exceeding specified threshold of 50.000KiB by 77.661KiB

Or warning messages like this:

Batch for [test.twitter] is of size 25.885KiB, exceeding specified threshold of 5.000KiB by 20.885KiB

Try adjusting the consumer.max.poll.records setting in the worker.properties file for Kafka Connect.
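
For example, lowering the number of records returned per poll reduces the size of each batch. The value below is only an illustration; tune it for your record sizes:

consumer.max.poll.records=50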

Examples

This example will configure the connector to use upserts when writing data to Cassandra.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.write.mode" : "Update"
  }
}
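
For distributed mode, one way to create the connector is to save the JSON above to a file and POST it to the Connect REST API. The file name and the REST endpoint (localhost:8083) below are assumptions; adjust them for your environment.

curl -X POST -H "Content-Type: application/json" \
  --data @cassandraSinkConnector1.json \
  http://localhost:8083/connectors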

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.write.mode=Update
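
For standalone mode, save the properties above to a file and pass it to the connect-standalone command along with your worker configuration. The file names below are assumptions:

connect-standalone worker.properties cassandra-sink.properties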

This example will connect to an Apache Cassandra instance without authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test

This example will connect to an Apache Cassandra instance with SSL and username / password authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode uses the JSON / REST example, and standalone mode uses the properties-based example.

Distributed Mode JSON

{
  "name" : "cassandraSinkConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "cassandra.contact.points" : "cassandra",
    "cassandra.keyspace" : "test",
    "cassandra.ssl.enabled" : "true",
    "cassandra.username" : "example",
    "cassandra.password" : "password"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
cassandra.contact.points=cassandra
cassandra.keyspace=test
cassandra.ssl.enabled=true
cassandra.username=example
cassandra.password=password
