.. _elasticsearch_connector_security:

|kconnect-long| Elasticsearch Connector with Security
=====================================================

This walkthrough assumes that a compatible version of the connector is
installed in your |CP| instance and that your Elasticsearch version includes
the X-Pack plugin (pre-installed in Elasticsearch 6.3 and later).

The steps are based on the `Elasticsearch documentation
<https://www.elastic.co/guide/en/elasticsearch/reference/6.6/configuring-tls.html#tls-http>`_.


Download Elasticsearch
----------------------

.. sourcecode:: bash

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
    tar xzvf elasticsearch-6.6.0.tar.gz
    cd elasticsearch-6.6.0


Generate Certificates
---------------------

Elasticsearch provides a certificate-generation utility, ``bin/elasticsearch-certutil``, that creates self-signed certificates by default. This walkthrough instead uses ``openssl`` to create a certificate authority (CA) and CA-signed certificates, which is closer to a production environment.

.. sourcecode:: bash

    # Make a certificate working directory in config directory
    mkdir config/certs
    cd config/certs

    # Generate the certificate authority (enter localhost as the FQDN when prompted)
    openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666

    # Generate a private key for the first certificate (used by Elasticsearch)
    openssl genrsa -out client1.key 2048

    # Generate a certificate signing request
    openssl req -new -key client1.key -out client1.csr

    # Sign the request with the CA
    openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
      -CAcreateserial -out client1.crt -days 1825 -sha256

    # Repeat the steps for the second certificate (used by the connector and curl)
    openssl genrsa -out client2.key 2048
    openssl req -new -key client2.key -out client2.csr
    openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
      -CAcreateserial -out client2.crt -days 1825 -sha256

    # Package the connector key and certificate as a JKS keystore, and the CA as a JKS truststore
    openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
    keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
    keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
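
The ``openssl req`` commands above prompt for each subject field. For scripted or repeatable setups, a non-interactive variant may be more convenient. The following is a minimal sketch, not part of the original procedure. It writes to a scratch directory, and the subject names (``/CN=Example Test CA`` and ``/CN=localhost``) and the 30-day lifetime are assumptions chosen for illustration. The ``-nodes`` option leaves the CA key unencrypted, which is appropriate for testing only. The CA and the signed certificate are given different subjects so that the signed certificate is not mistaken for a self-signed one during verification.

.. sourcecode:: bash

    # Scratch directory, so certificates from the walkthrough are not overwritten
    workdir=$(mktemp -d)

    # CA key and self-signed CA certificate in one step.
    # -nodes leaves the key unencrypted (testing only); the subject is an assumption.
    openssl req -new -x509 -nodes -newkey rsa:2048 -subj "/CN=Example Test CA" \
      -keyout "$workdir/cacert.key" -out "$workdir/cacert.pem" -days 30

    # Private key, signing request, and CA-signed certificate for localhost
    openssl genrsa -out "$workdir/client1.key" 2048
    openssl req -new -key "$workdir/client1.key" -subj "/CN=localhost" \
      -out "$workdir/client1.csr"
    openssl x509 -req -in "$workdir/client1.csr" \
      -CA "$workdir/cacert.pem" -CAkey "$workdir/cacert.key" \
      -CAcreateserial -out "$workdir/client1.crt" -days 30 -sha256

    # Confirm that the signed certificate chains to the CA
    verify_output=$(openssl verify -CAfile "$workdir/cacert.pem" "$workdir/client1.crt")
    echo "$verify_output"

The same ``openssl verify`` check can be run against the ``client1.crt`` and ``client2.crt`` files created in the walkthrough before they are packaged or referenced in any configuration.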


Configure Elasticsearch
-----------------------

.. sourcecode:: bash

    # Return to the Elasticsearch home directory
    cd ../..
    # Append the security settings to the Elasticsearch configuration file
    cat <<EOF >> config/elasticsearch.yml
    xpack.security.enabled: true
    xpack.security.http.ssl.enabled: true
    xpack.security.http.ssl.client_authentication: required
    xpack.security.http.ssl.key: certs/client1.key
    xpack.security.http.ssl.certificate: certs/client1.crt
    xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
    EOF

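    # Store the private key passphrase in the Elasticsearch keystore
    bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase

    # Start Elasticsearch (runs in the foreground)
    bin/elasticsearch

    # From a second terminal in the same directory, test the connection
    # using the second certificate as the client certificate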
    curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem https://localhost:9200
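
If Elasticsearch rejects the TLS settings at startup, one possible cause is that the key and certificate listed in ``elasticsearch.yml`` do not belong together. The helper below is a hypothetical sketch: ``key_matches_cert`` is an illustrative name, not part of Elasticsearch or OpenSSL. It compares the public key derived from a private key with the public key embedded in a certificate, using the standard ``openssl pkey`` and ``openssl x509`` subcommands.

.. sourcecode:: bash

    # Hypothetical helper: succeeds when the private key and the certificate
    # contain the same public key; returns 2 if either file cannot be read.
    key_matches_cert() {
      local key_file=$1 cert_file=$2
      local key_pub cert_pub
      # Public key derived from the private key (PEM)
      key_pub=$(openssl pkey -in "$key_file" -pubout 2>/dev/null) || return 2
      # Public key embedded in the certificate (PEM)
      cert_pub=$(openssl x509 -in "$cert_file" -noout -pubkey 2>/dev/null) || return 2
      [ "$key_pub" = "$cert_pub" ]
    }

For the files in this walkthrough, it could be called from the Elasticsearch home directory as ``key_matches_cert config/certs/client1.key config/certs/client1.crt && echo match``.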


Configure the Connector
-----------------------

Open a new terminal and change your current directory to ``<path-to-confluent>``.

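The following commands save the connector configuration as ``etc/kafka-connect-elasticsearch/elastic-secure.properties`` and load it. Before running them, update the keystore and truststore paths and passwords to match your environment.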

.. sourcecode:: bash

    cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=test-elasticsearch-sink
    key.ignore=true
    connection.url=https://localhost:9200
    type.name=kafka-connect

    elastic.https.ssl.keystore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/keystore.jks
    elastic.https.ssl.keystore.password=asdfasdf
    elastic.https.ssl.key.password=asdfasdf
    elastic.https.ssl.keystore.type=JKS
    elastic.https.ssl.truststore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/truststore.jks
    elastic.https.ssl.truststore.password=asdfasdf
    elastic.https.ssl.truststore.type=JKS
    elastic.https.ssl.protocol=TLS
    EOF

    # Start Connect, then load the connector from the properties file
    bin/confluent start connect
    bin/confluent load elasticssl -d etc/kafka-connect-elasticsearch/elastic-secure.properties
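
Because the keystore and truststore locations are absolute paths, a path left over from another machine is an easy mistake to make. The following sketch uses a hypothetical helper (``check_store_paths`` is an illustrative name, not a Confluent tool) that scans a properties file for ``elastic.https.ssl.*.location`` entries and reports any referenced file that is missing or unreadable. It only checks that the files exist, not that the passwords or store contents are valid.

.. sourcecode:: bash

    # Hypothetical helper: returns non-zero if any keystore or truststore
    # path named in the given properties file is missing or unreadable.
    check_store_paths() {
      local properties_file=$1 missing=0 line path
      # The "|| [ -n ... ]" clause handles a final line without a newline
      while IFS= read -r line || [ -n "$line" ]; do
        case $line in
          elastic.https.ssl.*.location=*)
            path=${line#*=}   # everything after the first "="
            if [ ! -r "$path" ]; then
              echo "missing or unreadable: $path" >&2
              missing=1
            fi
            ;;
        esac
      done < "$properties_file"
      return "$missing"
    }

It could be run before loading the connector, for example ``check_store_paths etc/kafka-connect-elasticsearch/elastic-secure.properties && echo "paths OK"``.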


Test the System
---------------

.. sourcecode:: bash

    # Start the producer, then enter the two records below on standard input
    bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
       --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "secret1"}
    {"f1": "secret2"}


From the Elasticsearch home directory, query the index directly to confirm that the records were written:

.. sourcecode:: bash

    curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'