Elasticsearch Connector with Security

Complete the following instructions to configure the Kafka Connect Elasticsearch connector with security. These instructions are based on the Elasticsearch document Encrypting HTTP Client communications.

Prerequisites:
  • Confluent Platform version 4.1 (or later) using the bundled Elasticsearch connector.
  • Elasticsearch with the X-Pack plugin. This plugin is pre-installed in Elasticsearch versions 6.3 or later.

Step 1: Download and extract the Elastic archive

Enter the following commands:

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0

Step 2: Generate Certificates

Elasticsearch provides a certificate-generation utility named bin/elasticsearch-certutil. By default, this utility creates self-signed certificates. Use the commands below to generate your own certificates. This allows you to simulate a production-like environment.

  1. Make a certificate working directory in the Elasticsearch config directory:

    mkdir config/certs
    
    cd config/certs
    
  2. Generate the Certificate Authority (make sure your FQDN is localhost):

    openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
    
  3. Generate a client certificate:

    openssl genrsa -out client1.key 2048
    
  4. Generate a certificate signing request:

    openssl req -new -key client1.key -out client1.csr
    
  5. Sign the request with the CA:

    openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
    -CAcreateserial -out client1.crt -days 1825 -sha256
    
  6. Repeat the previous steps for the next client:

    openssl genrsa -out client2.key 2048
    
    openssl req -new -key client2.key -out client2.csr
    
    openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
    -CAcreateserial -out client2.crt -days 1825 -sha256
    
  7. Package the connector keys as JKS:

    openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
    
    keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
    
    keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
    

Step 3: Configure Elasticsearch

  1. Return to the main Elasticsearch directory:

    cd ../..
    
  2. Update the Elasticsearch configuration file:

    cat <<EOF >> config/elasticsearch.yml
    xpack.security.enabled: true
    xpack.security.http.ssl.enabled: true
    xpack.security.http.ssl.client_authentication: required
    xpack.security.http.ssl.key:  certs/client1.key
    xpack.security.http.ssl.certificate: certs/client1.crt
    xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
    EOF
    
  3. Set the passwords:

    bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
    
  4. Run Elasticsearch:

    bin/elasticsearch
    
  5. Test the connection:

    curl --key config/certs/client2.key --cert config/certs/client2.crt \
    --cacert config/certs/cacert.pem https://localhost:9200
    

Step 4: Configure the Connector

Prerequisites
  1. Open a new terminal and change your current directory to <path-to-confluent>.

  2. Save the configuration file as elastic.properties in etc/kafka-connect-elasticsearch and add the certificate paths.

    cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=test-elasticsearch-sink
    key.ignore=true
    connection.url=https://localhost:9200
    type.name=kafka-connect
    
    elastic.security.protocol=SSL
    elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks
    elastic.https.ssl.keystore.password=asdfasdf
    elastic.https.ssl.key.password=asdfasdf
    elastic.https.ssl.keystore.type=JKS
    elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks
    elastic.https.ssl.truststore.password=asdfasdf
    elastic.https.ssl.truststore.type=JKS
    elastic.https.ssl.protocol=TLS
    EOF
    
  3. Start Connect and load the connector:

    bin/confluent local services connect start
    
    bin/confluent local services connect connector load elasticssl --config etc/kafka-connect-elasticsearch/elastic-secure.properties
    

Step 5: Test the System

  1. Enter the following command:

    bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "secret1"}
    {"f1": "secret2"}
    
  2. Query Elasticsearch:

    curl --key config/certs/client2.key --cert config/certs/client2.crt /
    --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'