Redis Sink Connector for Confluent Platform

The Kafka Connect Redis Sink Connector is used to export data from Apache Kafka® topics to Redis.

Features

The Kafka Connect Redis Sink Connector offers the following features:

  • This connector supports storing raw bytes or strings (as inserts) in Redis. If your keys and values in Kafka records are already serialized as strings, use the StringConverter with this connector to store the record key and value in Redis as strings:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    

    Otherwise, use the ByteArrayConverter with this connector to store the binary serialized form (e.g., JSON, Avro, Strings, etc.) of the Kafka record keys and values in Redis as byte arrays. Applications accessing these values can then read this information from Redis and deserialize the bytes into a useable form. If your data in Kafka is not in the format you want to persist in Redis, consider using a Single Message Transformation to convert the data to a byte array or string before it’s written to Redis.

  • This connector supports deletes. If the record stored in Kafka has a null value, this connector sends a delete with the corresponding key to Redis.

Important

This connector expects non-null keys. Each Kafka record in exported topics must have an explicit key and value.

Prerequisites

The following are required to run the Redis Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 (or later) or Kafka 0.11.0 (or later)
  • Connect: Confluent Platform 4.0 (or later) or Kafka 1.0 (or later)
  • Java 1.8
  • Redis sink connector version 0.0.2.8 (or later)
  • Redis v5.0.0 (or later). See the Redis release cycle document.
  • Lettuce (Redis client library) 5.1.6.RELEASE

Install the Redis Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install jcustenborder/kafka-connect-redis:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install jcustenborder/kafka-connect-redis:0.0.2.8

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

The Redis Sink connector is an open source connector and does not require a Confluent Enterprise License.

Configuration Properties

For a complete list of configuration properties for this connector, see Redis Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Quick Start

  1. Install Redis.

  2. Start the Redis server so it can start listening for Redis connections. This starts Redis using the default port 6379 and no password (for testing purposes only).

    redis-server
    
  3. Use the Redis CLI (redis-cli) to view any insertions being made. You can use the MONITOR command if the instance is being used only for this quick start test (see the note below).

    redis-cli MONITOR
    

    Important

    The MONITOR CLI command is a debugging command that streams back every command processed by the Redis server. It assists you in understanding what is happening to the database. However, using it comes at a performance cost. Do not use this in production environments.

  4. Install the connector. See installation instructions for details.

  5. Start the Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    

    Important

    Make sure the Confluent Platform is started after installing the connector. If not, the Connect workers must be restarted to register the installation and to add the new connector location to the path.

  6. Make sure that the installed connector has been identified by the Confluent Platform.

    confluent local services connect plugin list
    
  7. Produce test data to the users topic in Kafka.

    echo key1,value1 | confluent local services kafka produce users --property parse.key=true --property key.separator=,
    echo key2,value2 | confluent local services kafka produce users --property parse.key=true --property key.separator=,
    echo key3,value3 | confluent local services kafka produce users --property parse.key=true --property key.separator=,
    

    Important

    This connector expects non-null keys. The parse.key and key.separator properties ensure the exported records have explicit keys and values

  8. Create a redis-sink.properties file with the properties below.

    name=kafka-connect-redis
    topics=users
    tasks.max=1
    connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    
  9. Start the connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load kafka-connect-redis --config redis-sink.properties
    
  10. Make sure that the connector status is RUNNING.

    confluent local services connect connector status kafka-connect-redis
    
  11. Observe that data is flowing and the keys and values being inserted into Kafka are going to the desired Redis instance.

  12. Shut down Confluent Platform.

    confluent local destroy
    
  13. Stop the redis-server and redis-cli (use Ctrl+C).

REST-based example

This configuration is used typically along with distributed workers. Write the following JSON to kafka-connect-redis.json, configure all of the required values, and use the command below to post the configuration to one the distributed connect worker(s). Check here for more information about the Kafka Connect REST API.

{
    "name" : "kafka-connect-redis",
     "config" : {
        "name" : "kafka-connect-redis",
        "connector.class" : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
        "topics" : "users",
        "tasks.max" : "1",
        "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
      }
    }

Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-redis.json http://localhost:8083/connectors