.. _connect-datadiode-connector:

|kconnect-long| Data Diode Connector (Source and Sink)
======================================================

The data diode source and sink connectors are used in tandem to replicate one or more |ak-tm| topics
from a source |ak| cluster to a destination |ak| cluster over the UDP protocol.

The data diode connector serves a similar purpose as :ref:`connect_replicator`; however, the key difference
is that the data diode connector works over UDP, while :ref:`connect_replicator` requires TCP/IP.

The data diode connector is meant to be used in a high-security unidirectional network. In such networks,
the network settings do not permit TCP/IP packets, and UDP packets are allowed in only one direction.


The sink connector serializes one or more |ak| records into a datagram packet
and sends it to a remote server running the Data Diode Source Connector. The sink connector must be installed
in the source |ak| cluster. For more information, see `Data Diode Sink Connector Configuration <datadiode_sink_connector_config>`_.

The source connector opens a UDP socket and listens for incoming datagram packets. The source connector MUST
be run on a |kconnect| worker in standalone mode, because only one worker can open the UDP socket.
The source connector must be installed in the destination |ak| cluster. For more information, see `Data Diode Source Connector Configuration <datadiode_source_connector_config>`_.
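
For example, assuming a default Confluent Platform layout (the file paths shown here are illustrative), the source connector could be started on a standalone worker as follows:

.. sourcecode:: bash

    ./bin/connect-standalone ./etc/kafka/connect-standalone.properties \
        ./etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties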

Limitations
-----------

The data diode connector has the following limitations:

#.  **No Ordering Guarantees** - The UDP protocol does not guarantee ordering. As a result, 
    packets may arrive out of order at the destination, and are inserted out of order
    into the destination |ak| topic.
#.  **Records can be lost** - The UDP protocol does not have error reporting or retry mechanisms. 
    In addition, there is no feedback in a unidirectional network. This means that 
    if a datagram packet doesn't arrive at the destination, the records contained in that packet
    are lost.
#.  **Cannot run source connector in distributed mode** - the source connector cannot be started
    in distributed mode. In standalone mode, if the worker process fails for some reason,
    packets are lost until the worker is brought online again.
#.  **No Retries** - the sink connector installed on the source |ak| cluster does not know if the datagram packet
    was sent successfully or not. As a result, it cannot perform a retry operation.
#.  **Only supports records smaller than 64 KB** - the sink connector fails to send records
    larger than 64 KB because a datagram packet has a maximum size of approximately 64 KB.
    Records larger than 64 KB are ignored, and the record identifiers (topic, partition, offset)
    are logged for debugging purposes.

Install Data Diode Connector
-----------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-data-diode:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-data-diode:1.0.1


Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-datadiode/#download>`_ for your
connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

License
-------

.. include:: ../includes/enterprise-license.rst

**Source connector:** See :ref:`datadiode-source-connector-license-config` for license properties and :ref:`datadiode-source-license-topic-configuration` for information about the license topic.

**Sink connector:** See :ref:`datadiode-sink-connector-license-config` for license properties and :ref:`datadiode-sink-connector-license-topic-configuration` for information about the license topic.

Quick Start
-----------

In this quick start, you will configure the data diode source and sink connectors to replicate
records in the topic ``diode`` to the topic ``dest_diode``.


Start the services with one command using the Confluent CLI:

.. sourcecode:: bash

    |confluent_start|

Next, create two topics, ``diode`` and ``dest_diode``:

.. sourcecode:: bash
    
    ./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic diode
    ./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dest_diode


Next, start the console producer and produce a few records to the ``diode`` topic.

.. sourcecode:: bash
    
    ./bin/kafka-console-producer --broker-list localhost:9092 --topic diode


Then, add records (one per line) in the console producer.

.. sourcecode:: bash

    silicon
    resistor
    transistor
    capacitor
    amplifier

This publishes five records to the |ak| topic ``diode``. Keep the window open.

Next, load the Source Connector.

.. tip:: Before starting the connector, verify that the configurations in ``etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties`` are properly set.
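
For reference, a minimal sketch of what that properties file might contain is shown below. The values mirror the connector configuration shown in the output that follows, and the password and salt are placeholders:

.. sourcecode:: properties

    name=datadiode-source-connector
    connector.class=io.confluent.connect.diode.source.DataDiodeSourceConnector
    tasks.max=1
    kafka.topic.prefix=dest_
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    udp.port=3456
    udp.encryption.password=supersecretpassword
    udp.encryption.salt=secretsalt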

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

    ./bin/|confluent_load| datadiode-source-connector|dash| -d ./etc/kafka-connect-datadiode/DataDiodeSourceConnector.properties

Your output should resemble the following:

.. sourcecode:: bash

    {
        "name": "datadiode-source-connector",
        "config": {
            "connector.class": "io.confluent.connect.diode.source.DataDiodeSourceConnector",
            "tasks.max": "1",
            "kafka.topic.prefix": "dest_",
            "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "udp.port": "3456",
            "udp.encryption.password": "supersecretpassword",
            "udp.encryption.salt": "secretsalt"
        },
        "tasks": [],
        "type": null
    }


Next, load the Sink Connector.

.. tip:: Before starting the connector, verify that the configuration parameters in ``etc/kafka-connect-datadiode/DataDiodeSinkConnector.properties`` are properly set. The most important configuration is ``diode.host``, which must point to the host or IP address on which the Source Connector was started.
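
For reference, a minimal sketch of what that properties file might contain is shown below. The values mirror the connector configuration shown in the output that follows; the host, password, and salt are placeholders:

.. sourcecode:: properties

    name=datadiode-sink-connector
    connector.class=io.confluent.connect.diode.sink.DataDiodeSinkConnector
    tasks.max=1
    topics=diode
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    diode.host=10.12.13.15
    udp.port=3456
    udp.encryption.password=supersecretpassword
    udp.encryption.salt=secretsalt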

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   ./bin/|confluent_load| datadiode-sink-connector|dash| -d ./etc/kafka-connect-datadiode/DataDiodeSinkConnector.properties

Your output should resemble the following:

.. sourcecode:: bash

    {
        "name": "datadiode-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.diode.sink.DataDiodeSinkConnector",
            "tasks.max": "1",
            "topics": "diode",
            "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
            "diode.host": "10.12.13.15",
            "udp.port": "3456",
            "udp.encryption.password": "supersecretpassword",
            "udp.encryption.salt": "secretsalt"
        },
        "tasks": [],
        "type": null
    }


View the |kconnect| worker log and verify that the connectors started successfully.

.. codewithvars:: bash

    |confluent_log| connect

        
Finally, check that the records are now available in the ``dest_diode`` topic.

.. sourcecode:: bash
    
    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic dest_diode --from-beginning


You should see five records in the consumer. If you have the console producer running, 
you can create additional records. These additional records should be immediately 
visible in the consumer.

Record Metadata 
---------------

The records that are inserted into topics in the destination |ak| cluster
may have different |ak| coordinates (topic, partition, and offset) than they had in the source cluster.

To ensure traceability, the UDP connector adds three headers to each record
written to the destination:

#.  ``sourceTopic`` is the topic name in the source |ak| cluster.
#.  ``sourcePartition`` is the partition in the source |ak| cluster.
#.  ``sourceOffset`` is the offset of this record in the source |ak| cluster.

Note that ``sourcePartition`` and ``sourceOffset`` are stored as UTF-8 strings rather than
as a long data type. Downstream applications can use ``Long.parseLong`` to convert them back to numeric types.
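
For example, the following is a minimal Java sketch that reads these headers from a record consumed from the destination topic. The class and method names are illustrative, and the surrounding consumer loop is assumed:

.. sourcecode:: java

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    public class DiodeHeaderExample {

        // Reads the data diode metadata headers from a record consumed from the
        // destination topic and converts the numeric values back from UTF-8 strings.
        public static void printSourceCoordinates(ConsumerRecord<byte[], byte[]> record) {
            Header topic = record.headers().lastHeader("sourceTopic");
            Header partition = record.headers().lastHeader("sourcePartition");
            Header offset = record.headers().lastHeader("sourceOffset");
            if (topic == null || partition == null || offset == null) {
                return; // record was not replicated through the data diode
            }
            String sourceTopic = new String(topic.value(), StandardCharsets.UTF_8);
            int sourcePartition = Integer.parseInt(new String(partition.value(), StandardCharsets.UTF_8));
            long sourceOffset = Long.parseLong(new String(offset.value(), StandardCharsets.UTF_8));
            System.out.printf("replicated from %s-%d@%d%n", sourceTopic, sourcePartition, sourceOffset);
        }
    }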

Handling of Missing Records
---------------------------

It is possible for the source connector to detect missing packets 
at the destination by looking at the offsets within each topic and partition.

By default, the connector logs ``(topic, partition, offset)`` for each missing record.

To write missing records to an |ak| topic, set ``missing.records.topic`` to the desired topic name,
and also provide |kconnect| worker configurations such as ``bootstrap.servers``
and ``client.id`` prefixed with ``missing.records.``.
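
For example, a sketch of these settings in the source connector properties file (the topic name and ``client.id`` values here are illustrative):

.. sourcecode:: properties

    missing.records.topic=diode_missing_records
    missing.records.bootstrap.servers=localhost:9092
    missing.records.client.id=datadiode-missing-records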

Note that detection of missing records is on a best-effort basis.
False positives (that is, records logged as missing but successfully delivered) are possible if packets arrive significantly out of order.
Similarly, false negatives (that is, records never delivered but not logged as missing)
are also possible when the connector has just started.

If you need to detect missing packets reliably, you can write a
|ak| Streams application on the destination cluster. The Streams application would
inspect the metadata headers in each record to identify missing records.
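
A minimal sketch of such gap-detection logic is shown below as a |ak| Streams ``Processor`` (Processor API). The class name and in-memory bookkeeping are illustrative; a production application would typically keep this state in a state store and wire the processor into a ``Topology``:

.. sourcecode:: java

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.Record;

    public class GapDetector implements Processor<byte[], byte[], Void, Void> {

        // last sourceOffset seen per "sourceTopic-sourcePartition"
        private final Map<String, Long> lastOffsets = new HashMap<>();

        @Override
        public void process(Record<byte[], byte[]> record) {
            Header topic = record.headers().lastHeader("sourceTopic");
            Header partition = record.headers().lastHeader("sourcePartition");
            Header offset = record.headers().lastHeader("sourceOffset");
            if (topic == null || partition == null || offset == null) {
                return; // not a record replicated through the data diode
            }
            String key = new String(topic.value(), StandardCharsets.UTF_8)
                    + "-" + new String(partition.value(), StandardCharsets.UTF_8);
            long current = Long.parseLong(new String(offset.value(), StandardCharsets.UTF_8));

            Long last = lastOffsets.get(key);
            if (last != null && current > last + 1) {
                // offsets between last and current never arrived (or arrived out of order)
                System.out.printf("possible gap in %s: offsets %d..%d%n", key, last + 1, current - 1);
            }
            lastOffsets.merge(key, current, Math::max);
        }
    }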


Record Batching
---------------

The sink connector can batch multiple records into a single UDP packet.
To enable batching, set ``diode.buffer.size.kb`` to a value between 1 and 64 (the value is in KB).
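
For example, a sketch of enabling batching in the sink connector properties (the 32 KB value is illustrative):

.. sourcecode:: properties

    diode.buffer.size.kb=32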

Batching multiple records in a single datagram packet increases throughput. However,
if the datagram packet is lost, then all records in that packet are also lost. 

Encryption of In-flight Packets
-------------------------------

The sink connector can optionally encrypt in-flight datagram packets as they travel
over the network. On the receiving end, the source connector must be configured with
the same password and salt so that it can decrypt the packets.

The data diode connector uses the following approach to encrypt packets:

#.  The provided ``diode.encryption.password`` and ``diode.encryption.salt`` are used to generate
    a 256-bit key. The ``PBKDF2WithHmacSHA256`` algorithm with 65536 rounds is used to generate this key.
#.  The sink and source connectors must both have the same ``diode.encryption.password``
    and ``diode.encryption.salt``, so that they arrive at the same 256-bit symmetric encryption key.
#.  The datagram packet is encrypted using the ``AES/CBC/PKCS5Padding`` algorithm.
#.  The initialization vector (IV) used for encryption and the ciphertext are concatenated.
    The first 16 bytes of the payload are the initialization vector.
#.  On the receiving end, the source connector extracts the IV (the first 16 bytes)
    and the ciphertext (the remaining bytes) from the payload.
#.  Finally, it uses the ``AES/CBC/PKCS5Padding`` algorithm to decrypt the ciphertext.

Note that this approach does not guarantee message authentication. In a future release,
the UDP connector may switch to AES with authenticated encryption using the GCM mode.
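
The following Java sketch illustrates this key derivation and encryption scheme using the standard JCE APIs. It is an illustration of the approach described above, not the connector's actual implementation, and the class and method names are assumptions:

.. sourcecode:: java

    import java.nio.charset.StandardCharsets;
    import java.security.SecureRandom;
    import java.util.Arrays;

    import javax.crypto.Cipher;
    import javax.crypto.SecretKeyFactory;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.PBEKeySpec;
    import javax.crypto.spec.SecretKeySpec;

    public class DiodeEncryptionSketch {

        // Derive a 256-bit AES key from the configured password and salt
        // using PBKDF2WithHmacSHA256 with 65536 rounds.
        static SecretKeySpec deriveKey(String password, String salt) throws Exception {
            SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            PBEKeySpec spec = new PBEKeySpec(password.toCharArray(),
                    salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
            return new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");
        }

        // Sink side: encrypt the payload with AES/CBC/PKCS5Padding and prepend the 16-byte IV.
        static byte[] encrypt(byte[] payload, SecretKeySpec key) throws Exception {
            byte[] iv = new byte[16];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
            byte[] ciphertext = cipher.doFinal(payload);
            byte[] packet = new byte[iv.length + ciphertext.length];
            System.arraycopy(iv, 0, packet, 0, iv.length);
            System.arraycopy(ciphertext, 0, packet, iv.length, ciphertext.length);
            return packet;
        }

        // Source side: split the packet into IV (first 16 bytes) and ciphertext, then decrypt.
        static byte[] decrypt(byte[] packet, SecretKeySpec key) throws Exception {
            byte[] iv = Arrays.copyOfRange(packet, 0, 16);
            byte[] ciphertext = Arrays.copyOfRange(packet, 16, packet.length);
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
            return cipher.doFinal(ciphertext);
        }
    }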


Compression of In-flight Messages
---------------------------------

Optionally, the sink connector can compress the datagram packets. Compression is always 
done before encryption. 

To enable compression, set ``diode.compression.class=io.confluent.connect.diode.serde.GzipCompressor``.
To disable compression (the default), set ``diode.compression.class=io.confluent.connect.diode.serde.NoopCompressor``.

Tuning Guide
------------

#. Increase the operating system socket receive buffer for datagram packets. On Linux, you can increase the buffer by running the following commands:

   .. codewithvars:: bash

      sysctl -w net.core.rmem_max=26214400
      sysctl -w net.core.rmem_default=26214400
    
   To verify, run the following command:

   .. codewithvars:: bash

      sysctl -a | grep net.core.rmem
    
#. Ensure CPU utilization on the server running the source connector is less than 60% on all cores. The operating system is likely to drop datagram packets if CPU utilization increases beyond this.

#. The source connector cannot run in distributed mode, so if CPU utilization is higher than 60%, you must switch to a server with more CPU cores.


Serialization Format
--------------------

The serialization format is how records are transferred over the wire. 
This format is internal to the UDP connectors and does not impact users of the connector.


#.  A list of ``SinkRecords`` is serialized to bytes using Avro.
    These serialized bytes are referred to as the payload.
#.  If batching of records is disabled, the list is treated as having a single record.
#.  The payload is compressed using the configured compression algorithm.
    If compression is disabled, the ``Noop`` algorithm is used, which does nothing.
#.  The compressed payload is then encrypted using the configured encryption algorithm.
    If encryption is disabled, the ``Noop`` algorithm is used, which does nothing.
#.  An envelope record is created with the compression algorithm,
    the encryption algorithm, and the final payload.
#.  The envelope record is once again serialized using Avro,
    and then sent over the network in a datagram packet.
#.  On the receiving end, the source connector validates that it can
    understand the compression and encryption algorithms.
#.  Then, it reverses the steps and generates a list of ``SourceRecords``.
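
As an illustration only, the envelope could be described by an Avro schema similar to the following sketch. The actual field names and schema used internally by the connector are not part of the public API and may differ:

.. sourcecode:: json

    {
      "type": "record",
      "name": "Envelope",
      "fields": [
        {"name": "compression", "type": "string"},
        {"name": "encryption", "type": "string"},
        {"name": "payload", "type": "bytes"}
      ]
    }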


Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   datadiode_source_connector_config
   datadiode_sink_connector_config
   changelog