Redis Sink Connector for Confluent Cloud

The Kafka Connect Redis Sink connector for Confluent Cloud is used to export data from Apache Kafka® topics to Redis. The connector works with Redis Enterprise Cloud, Azure Cache for Redis, and Amazon ElastiCache for Redis.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

  • At least once delivery: The connector guarantees that records are delivered at least once.
  • Supports multiple tasks: The connector supports running one or more tasks.
  • SSL support: Supports one-way SSL.
  • Deletions: The connector supports deletions. If the record stored in Kafka has a null value, the connector sends a delete message with the corresponding key to Redis (see the tombstone sketch after this list). Note that the connector always expects non-null keys.
  • Supported input data formats: This connector supports storing raw bytes or strings (as inserts) in Redis.

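For example, you can emit a tombstone (a record with a non-null key and a null value) with a tool such as kcat to trigger a delete. The following is a minimal sketch, not the only way to do it; the topic, bootstrap server, and credentials are placeholders, and the -Z flag makes kcat send the empty value as NULL:

echo "user_123:" | kcat -P -b <bootstrap-server> -t <topic> -K: -Z \
  -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
  -X sasl.username=<api-key> -X sasl.password=<api-secret>

When the connector consumes this record, it deletes the Redis entry stored under the key user_123.
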
See Configuration Properties for configuration property values and descriptions. See Cloud connector limitations for additional information.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Redis Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to Redis.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Environment Limitations for additional information.
  • Access to one of the following managed Redis services:
    • Redis Enterprise Cloud
    • Azure Cache for Redis
    • Amazon ElastiCache for Redis. Note that to use this service, your Kafka cluster must be VPC peered.
  • Redis credentials, hostname, and database index.
  • The Redis instance must be in the same region as your Confluent Cloud cluster.
  • For networking considerations, see Network access. To use static egress IPs, see Static Egress IP Addresses.
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
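
For the API key and secret option, you can create a key scoped to your Kafka cluster with the Confluent CLI. A minimal sketch (the cluster ID lkc-abc123 is a placeholder):

confluent api-key create --resource lkc-abc123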

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the Redis Sink connector icon.

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

Complete the following and click Continue.

  1. Select one or more topics.
  2. Enter a Connector Name.
  3. Select an Input Kafka record value format (data coming from the Kafka topic): BYTES or STRING. If the Kafka topic is using JSON or a schema-based format, like Avro, you should select BYTES.
  4. Select an Input record key format. Sets the input message format for the record key. BYTES passes keys as they are serialized in Kafka, while STRING enforces UTF-8 encoding on keys.
  5. Select the way you want to provide Kafka Cluster credentials. You can either select a service account resource ID or you can enter an API key and secret (or generate these in the Cloud Console).
  6. Enter your Redis connection details.
  7. Add the SSL configuration details:
    • The default SSL mode is disabled. If set to enabled, the connector uses SSL to make the connection. If you select server, the connector additionally uses a truststore CA certificate to secure the connection. If you select server+client, the connector uses both the truststore and a keystore with a valid key pair and the associated certificate.
    • Use the Keystore file button to upload the keystore file. You must also add the keystore password.
    • Use the Truststore file button to upload the truststore file that contains the CA information. You must also add the truststore password.
  8. Enter the number of tasks for the connector. More tasks may improve performance.

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check the results in Redis.

Verify that data is populating the Redis instance.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe RedisSink

Example output:

Following are the required configs:
connector.class: RedisSink
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
redis.hostname
redis.portnumber
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.

{
  "name": "RedisSinkConnector_0",
  "config": {
    "topics": "pageviews",
    "connector.class": "RedisSink",
    "name": "RedisSinkConnector_0",
    "input.data.format": "BYTES",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "redis.hostname": "test.redis.cache.windows.net",
    "redis.portnumber": "6380",
    "redis.database": "1",
    "redis.password": "********************************************",
    "redis.ssl.mode": "enabled",
    "tasks.max": "1"
  }
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "topics": Identifies the topic name or a comma-separated list of topic names.
  • "input.data.format": Sets the input Kafka record value format (data coming from the Kafka topic). Valid entries are BYTES or STRING. If the Kafka topic is using JSON or a schema-based format, like Avro, you should select BYTES.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY. To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "redis.ssl.mode": Sets the SSL mode for the connection. Options are enabled, disabled, server, and server+client. The default is disabled. If set the property to enabled, the connector uses SSL to make the connection. If you select server, the connector uses a trustore. If you select server+client, the connector uses both the trustore and a keystore with a valid key pair and the associated certificate.

    • "redis.ssl.mode": "server": If you use SSL mode server, the connector uses a trustore CA certificate to secure the connection. You add two additional properties: redis.ssl.trustore.file and redis.ssl.trustore.password. A trustore file is a binary file. For the redis.ssl.trustore.file property, you encode the binary trustore file in base64, take the encoded string, add the data:text/plain;base64 prefix, and then specify the entire string as the property entry. For example: "redis.ssl.trustore.file" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==".
    • "redis.ssl.mode": "server+client": If you use SSL mode server+client, the connector uses a trustore CA certificate and an additional keystore to secure the connection. You add four additional properties: redis.ssl.trustore.file, redis.ssl.trustore.password, redis.ssl.keystore.file, and redis.ssl.keystore.password. The trustore and keystore files are binary files. For the redis.ssl.trustore.file and redis.ssl.keystore.file properties, you encode the binary trustore and keystore files in base64, take the encoded string, add the data:text/plain;base64 prefix, and then specify the entire string as the property entry. For example: "redis.ssl.keystore.file" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==".
  • "tasks.max": Maximum tasks for the connector to use. More tasks may improve performance.

See Configuration Properties for configuration property values and descriptions.

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config redis-sink-config.json

Example output:

Created connector RedisSinkConnector_0 lcc-ix4dl
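
If the pageviews topic does not yet contain records, you can produce a few test records with the Confluent CLI before checking the status. A minimal sketch (the record contents are placeholders):

confluent kafka topic produce pageviews --parse-key --delimiter ":"

Each line you type is sent as a key:value record, for example user_1:{"page": "home"}.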

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID          |       Name            | Status  | Type
+-----------+-----------------------+---------+------+
lcc-ix4dl   | RedisSinkConnector_0  | RUNNING | sink

Step 6: Check the results in Redis.

Verify that data is populating the Redis instance.
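
For example, with the connection details from the example configuration, you can list a sample of keys using redis-cli (a sketch; the password placeholder and the --tls and -n flags match the redis.ssl.mode and redis.database values shown earlier):

redis-cli -h test.redis.cache.windows.net -p 6380 --tls -a '<redis-password>' -n 1 SCAN 0 COUNT 10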

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.
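
For example, you can inspect failed records by consuming the DLQ topic, which is named after the connector ID (a sketch using the ID from the example output above):

confluent kafka topic consume dlq-lcc-ix4dl --from-beginning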

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.

Configuration Properties

The following connector configuration properties are used for the Redis Sink connector for Confluent Cloud.

redis.hostname

The hostname of the Redis server to connect to.

  • Type: string
  • Importance: high
redis.portnumber

The Redis server port number to connect to.

  • Type: int
  • Importance: high
input.data.format

Sets the input Kafka record value format (data coming from the Kafka topic). Valid entries are BYTES or STRING. If the Kafka topic is using JSON or a schema-based format, like Avro, you should select BYTES.

  • Type: string
  • Importance: high
  • Valid values: BYTES, STRING
input.key.format

Sets the input record key format. BYTES passes keys as they are serialized in Kafka, while STRING enforces a UTF-8 encoding on keys.

  • Type: string
  • Importance: high
  • Valid values: BYTES, STRING
redis.client.mode

The client mode to use when interacting with the Redis cluster.

  • Type: string
  • Default: Standalone
  • Importance: medium
  • Valid values: Standalone, Cluster
redis.database

The Redis database index.

  • Type: int
  • Default: 1
  • Importance: medium
redis.password

Password used to connect to Redis.

  • Type: password
  • Importance: medium
redis.ssl.mode

How the Redis server is secured. Options are enabled, disabled, server, and server+client. The default is disabled. If you set the property to enabled, the connector uses SSL to make the connection. If you select server, the connector uses a truststore. If you select server+client, the connector uses both the truststore and a keystore with a valid key pair and the associated certificate.

  • Type: string
  • Default: disabled
  • Importance: medium
redis.ssl.keystore.file

The keystore containing the key pair and certificate.

  • Type: password
  • Importance: low
redis.ssl.keystore.password

The password for the SSL keystore.

  • Type: password
  • Importance: medium
redis.ssl.truststore.file

The truststore containing the server CA certificate.

  • Type: password
  • Importance: low
redis.ssl.truststore.password

The password for the SSL truststore.

  • Type: password
  • Importance: medium

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
