Data Diode Connector (Source and Sink) for Confluent Platform

The Kafka Connect Data Diode Source and Sink connectors are used in tandem to replicate one or more Apache Kafka® topics from a source Kafka cluster to a destination Kafka cluster over UDP protocol.

The Data Diode connector serves a purpose similar to Confluent Replicator; the key difference is that the Data Diode connector works over UDP, while Confluent Replicator requires TCP/IP.

The Data Diode connector is meant to be used in high-security unidirectional networks. In such networks, the network settings do not permit TCP/IP packets, and UDP packets are allowed in only one direction.

The sink connector serializes one or more Kafka records into a datagram packet and sends it to a remote server running the Data Diode Source connector. The sink connector must be installed in the source Kafka cluster. For more information, see Data Diode Sink Connector Configuration Properties.

The source connector opens a UDP socket to listen for incoming datagram packets. The source connector must be run in standalone mode because only one worker can open the UDP socket. The source connector must be installed in the destination Kafka cluster. For more information, see Data Diode Source Connector Configuration Properties.
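
For example, a minimal way to start the source connector on a standalone worker (the worker properties path shown is illustrative and depends on your installation):

./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties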

Features

Data Diode Sink Connector

The Data Diode Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Data Diode Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when there are many records to send.

Data Diode Source Connector

The Data Diode Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.

Limitations

The Data Diode connector has the following limitations:

  1. No Ordering Guarantees: The UDP protocol does not guarantee ordering. As a result, packets may arrive out of order at the destination, and are inserted out of order into the destination Kafka topic.
  2. Records can be lost: The UDP protocol does not have error reporting or retry mechanisms. In addition, there is no feedback in a unidirectional network. This means that if a datagram packet doesn’t arrive at the destination, the records contained in that packet are lost.
  3. Cannot run source connector in distributed mode: The source connector cannot be started in distributed mode. In standalone mode, if the worker process fails for some reason, packets are lost until the worker is brought online again.
  4. No Retries: The sink connector installed on the source Kafka cluster does not know if the datagram packet was sent successfully or not. As a result, it cannot perform a retry operation.
  5. Only supports records less than 64 KB: The sink connector cannot send records greater than 64 KB in size because a datagram packet has a maximum size of approximately 64 KB. Records greater than 64 KB are ignored, and their record identifiers (topic, partition, offset) are logged for debugging purposes.
  6. Converter properties must be set: The key.converter, value.converter, and header.converter configuration properties must be set to org.apache.kafka.connect.converters.ByteArrayConverter; otherwise, the connector won’t start and will throw an exception.

Install the Data Diode (Sink and Source) Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-data-diode:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-data-diode:1.0.1
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

Source connector: See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Sink connector: See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for the Data Diode Source Connector, see Data Diode Source Connector Configuration Properties.

For a complete list of configuration properties for the Data Diode Sink connector, see Data Diode Sink Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

In this quick start, you will configure the Data Diode Connector to replicate records in the topic diode to the topic dest_diode.

Start the services with one command using the Confluent CLI.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services start

Next, create two topics - diode and dest_diode.

./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic diode
./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dest_diode

Next, start the console producer and produce a few records to the diode topic.

./bin/kafka-console-producer --broker-list localhost:9092 --topic diode

Then, add records (one per line) in the console producer.

silicon
resistor
transistor
capacitor
amplifier

This publishes five records to the Kafka topic diode. Keep the window open.

Next, load the Source connector.

Tip

Before starting the connector, verify that the configuration parameters in etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties are properly set.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

./bin/confluent local services connect connector load datadiode-source-connector --config ./etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties

Your output should resemble the following:

{
    "name": "datadiode-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.diode.source.DataDiodeSourceConnector",
        "tasks.max": "1",
        "kafka.topic.prefix": "dest_"
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "diode.port": "3456",
        "diode.encryption.password": "supersecretpassword",
        "diode.encryption.salt": "secretsalt"
    },
    "tasks": [],
    "type": null
}

Next, load the Sink connector.

Tip

Before starting the connector, verify that the configuration parameters in etc/kafka-connect-datadiode/DataDiodeSinkConnector.properties are properly set. The most important configuration is diode.host, which must point to the hostname or IP address on which the Source connector was started.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

./bin/confluent local services connect connector load datadiode-sink-connector --config ./etc/kafka-connect-datadiode/DataDiodeSinkConnector.properties

Your output should resemble the following:

{
    "name": "datadiode-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.diode.sink.DataDiodeSinkConnector",
        "tasks.max": "1",
        "topics": "diode",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "diode.host": "10.12.13.15",
        "diode.port": "3456",
        "diode.encryption.password": "supersecretpassword",
        "diode.encryption.salt": "secretsalt"
    },
    "tasks": [],
    "type": null
}

View the Connect worker log and verify that the connectors started successfully.

confluent local services connect log

Finally, check that records are now available in the dest_diode topic.

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic dest_diode --from-beginning

You should see five records in the consumer. If you have the console producer running, you can create additional records. These additional records should be immediately visible in the consumer.

Record Metadata

The records that are inserted into topics in the destination Kafka cluster may have different Kafka coordinates (that is, topic, partition, and offset) than the original records in the source cluster.

To ensure traceability, the UDP connector inserts three additional headers into each record in the destination:

  1. sourceTopic is the topic name in the source Kafka cluster
  2. sourcePartition is the partition in the source Kafka cluster
  3. sourceOffset is the offset of this record in the source Kafka cluster

Note that sourcePartition and sourceOffset are stored as UTF-8 strings rather than as a long data type. Downstream applications can use Long.parseLong to convert to a long data type.
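
For example, a downstream Java consumer might read these headers as follows. This is a minimal sketch; the topic name, group id, and error handling are assumptions, and it assumes the headers are present on every record:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class SourceCoordinatesReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "diode-metadata-reader");  // example group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("dest_diode"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // All three headers are stored as UTF-8 strings by the connector.
                String sourceTopic = headerAsString(record, "sourceTopic");
                long sourcePartition = Long.parseLong(headerAsString(record, "sourcePartition"));
                long sourceOffset = Long.parseLong(headerAsString(record, "sourceOffset"));
                System.out.printf("replicated from %s-%d@%d%n", sourceTopic, sourcePartition, sourceOffset);
            }
        }
    }

    private static String headerAsString(ConsumerRecord<byte[], byte[]> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}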

Handling of Missing Records

It is possible for the source connector to detect missing packets at the destination by looking at the offsets within each topic and partition.

By default, the connector logs the topic, partition, and offset for each missing record.

To write missing records to a Kafka topic, set missing.records.topic to the desired topic name and also provide Connect worker configurations such as bootstrap.servers and client.id prefixed with missing.records. (for example, missing.records.bootstrap.servers).
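
For example (the topic name and connection settings shown are illustrative):

missing.records.topic=diode_missing_records
missing.records.bootstrap.servers=localhost:9092
missing.records.client.id=diode-missing-records-client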

Note that detection of missing records is on a best-effort basis. False positives (that is, records logged as missing but successfully delivered) are possible if packets arrive significantly out of order. Similarly, false negatives (that is, no missing-record log entry even though a record was not delivered) are possible when the connector has just started.

If you need to detect missing packets reliably, you can write a Kafka Streams application on the destination cluster. The Streams application would inspect the metadata headers in each record to identify missing records.
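
A minimal sketch of that idea is shown below. For brevity it is written as a plain bookkeeping class rather than a full Kafka Streams topology; the class and method names are illustrative:

import java.util.HashMap;
import java.util.Map;

// Tracks the highest source offset seen per (sourceTopic, sourcePartition),
// using the metadata headers described above, and reports apparent gaps.
public class GapDetector {
    private final Map<String, Long> lastOffsets = new HashMap<>();

    public void check(String sourceTopic, long sourcePartition, long sourceOffset) {
        String key = sourceTopic + "-" + sourcePartition;
        Long previous = lastOffsets.get(key);
        if (previous != null && sourceOffset > previous + 1) {
            // Offsets previous+1 .. sourceOffset-1 have not been seen yet; they may
            // be missing, or they may simply arrive later out of order.
            System.out.printf("possible gap in %s: offsets %d-%d%n",
                    key, previous + 1, sourceOffset - 1);
        }
        if (previous == null || sourceOffset > previous) {
            lastOffsets.put(key, sourceOffset);
        }
    }
}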

Record Batching

The sink connector can batch multiple records in a single UDP packet. To enable batching, set diode.buffer.size.kb to a value between 1 and 64 KB.

Batching multiple records in a single datagram packet increases throughput. However, if the datagram packet is lost, then all records in that packet are also lost.
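
For example, to batch up to 32 KB of records per datagram packet (the value shown is illustrative), add the following to the sink connector properties file:

diode.buffer.size.kb=32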

Encryption of in-flight packets

The sink connector can optionally encrypt in-flight datagram packets as they travel over the network. On the receiving end, the source connector must be configured with the same password and salt for it to decrypt the packets.

The data diode connector uses the following approach to encrypt packets:

  1. The provided diode.encryption.password and diode.encryption.salt are used to generate a 256-bit key. The key is derived with the PBKDF2WithHmacSHA256 algorithm using 65536 rounds.
  2. The sink and source connectors must both have the same diode.encryption.password and diode.encryption.salt, so that they arrive at the same 256-bit symmetric encryption key.
  3. The datagram packet is encrypted using the AES/CBC/PKCS5Padding algorithm.
  4. The initialization vector used for encryption and the ciphertext are concatenated. The first 16 bytes of the payload are the initialization vector.
  5. On the receiving end, the source connector extracts the IV (the first 16 bytes) and the ciphertext (the remaining bytes) from the payload.
  6. Finally, it uses the AES/CBC/PKCS5Padding algorithm to decrypt the ciphertext.

Note that this approach does not guarantee message authentication. In a future release, the UDP connector may switch to AES with authenticated encryption using the GCM mode.
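
The following is a minimal, illustrative sketch of this scheme using the standard javax.crypto APIs. It mirrors the steps above but is not the connector's actual implementation:

import java.security.SecureRandom;
import java.util.Arrays;

import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

public class DiodeCryptoSketch {

    // Derive a 256-bit AES key from the password and salt with PBKDF2 (65536 rounds).
    static SecretKeySpec deriveKey(char[] password, byte[] salt) throws Exception {
        SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        PBEKeySpec spec = new PBEKeySpec(password, salt, 65536, 256);
        byte[] keyBytes = factory.generateSecret(spec).getEncoded();
        return new SecretKeySpec(keyBytes, "AES");
    }

    // Sender side: encrypt with AES/CBC/PKCS5Padding and prepend the 16-byte IV.
    static byte[] encrypt(byte[] plaintext, SecretKeySpec key) throws Exception {
        byte[] iv = new byte[16];
        new SecureRandom().nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
        byte[] ciphertext = cipher.doFinal(plaintext);
        byte[] payload = new byte[iv.length + ciphertext.length];
        System.arraycopy(iv, 0, payload, 0, iv.length);
        System.arraycopy(ciphertext, 0, payload, iv.length, ciphertext.length);
        return payload;
    }

    // Receiver side: the first 16 bytes are the IV, the rest is the ciphertext.
    static byte[] decrypt(byte[] payload, SecretKeySpec key) throws Exception {
        byte[] iv = Arrays.copyOfRange(payload, 0, 16);
        byte[] ciphertext = Arrays.copyOfRange(payload, 16, payload.length);
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
        return cipher.doFinal(ciphertext);
    }
}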

Compression of in-flight messages

Optionally, the sink connector can compress the datagram packets. Compression is always done before encryption.

To enable compression, set diode.compression.class=io.confluent.connect.diode.serde.GzipCompressor. To disable compression (the default), set diode.compression.class=io.confluent.connect.diode.serde.NoopCompressor.

Tuning Guide

  1. Increase the operating system socket receive buffer for datagram packets. On Linux, you can increase the buffer by running the following commands:

    sysctl -w net.core.rmem_max=26214400
    sysctl -w net.core.rmem_default=26214400
    

    To verify, run the following command:

    sysctl -a | grep net.core.rmem
    
  2. Ensure CPU utilization on the server running the source connector is less than 60% on all cores. The operating system is likely to drop datagram packets if CPU utilization rises above that level.

  3. The source connector cannot run in distributed mode, so if CPU utilization is higher than 60%, you must move it to a server with a higher number of CPU cores.

Serialization Format

The serialization format is how records are transferred over the wire. This format is internal to the UDP connectors and does not impact users of the connector.

  1. A list of SinkRecords is serialized to bytes using Avro. These serialized bytes are referred to as payload.
  2. If batching of records is disabled, it is as though the list of records has a size of 1.
  3. The payload is compressed using the configured compression algorithm. If compression is disabled, Noop algorithm is used, which does nothing.
  4. The compressed payload is then encrypted using the configured encryption algorithm. If encryption is disabled, the Noop algorithm is used, which does nothing.
  5. An envelope record is created with the compression algorithm, the encryption algorithm, and the final payload.
  6. The envelope record is once again serialized using Avro, and then sent over the network in a datagram packet.
  7. On the receiving end, the source connector validates that it can understand the compression and encryption algorithms.
  8. Then, it reverses the steps and generates a list of SourceRecords.
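
Conceptually, the envelope can be pictured as a small record like the one below. The field names are illustrative only; the connector defines its own internal Avro schema:

// Illustrative only; not the connector's actual Avro-generated class.
public class EnvelopeSketch {
    String compressionClass;  // e.g. io.confluent.connect.diode.serde.GzipCompressor
    String encryptionClass;   // identifies how the payload was encrypted (or a no-op)
    byte[] payload;           // Avro-serialized records, compressed, then encrypted
}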