Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Elasticsearch Connector with Security

Complete the following instructions to configure the Kafka Connect Elasticsearch connector with security. These instructions are based on the Elasticsearch document Encrypting HTTP Client communications.

Prerequisites:
  • Confluent Platform version 4.1 (or later) using the bundled Elasticsearch connector.
  • Elasticsearch with the X-Pack plugin. This plugin is pre-installed in Elasticsearch versions 6.3 or later.

Step 1: Download and extract the Elastic archive

Enter the following commands:

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0

Step 2: Generate Certificates

Elasticsearch provides a certificate-generation utility named bin/elasticsearch-certutil. By default, this utility creates self-signed certificates. Use the commands below to generate your own certificates. This allows you to simulate a production-like environment.

  1. Make a certificate working directory in the Elasticsearch config directory:

    mkdir config/certs
    
    cd config/certs
    
  2. Generate the Certificate Authority (make sure your FQDN is localhost):

    openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
    
  3. Generate a client certificate:

    openssl genrsa -out client1.key 2048
    
  4. Generate a certificate signing request:

    openssl req -new -key client1.key -out client1.csr
    
  5. Sign the request with the CA:

    openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
    -CAcreateserial -out client1.crt -days 1825 -sha256
    
  6. Repeat the previous steps for the next client:

    openssl genrsa -out client2.key 2048
    
    openssl req -new -key client2.key -out client2.csr
    
    openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
    -CAcreateserial -out client2.crt -days 1825 -sha256
    
  7. Package the connector keys as JKS:

    openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
    
    keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
    
    keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
    

Step 3: Configure Elasticsearch

  1. Return to the main Elasticsearch directory:

    cd ../..
    
  2. Update the Elasticsearch configuration file:

    cat <<EOF >> config/elasticsearch.yml
    xpack.security.enabled: true
    xpack.security.http.ssl.enabled: true
    xpack.security.http.ssl.client_authentication: required
    xpack.security.http.ssl.key:  certs/client1.key
    xpack.security.http.ssl.certificate: certs/client1.crt
    xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
    EOF
    
  3. Set the passwords:

    bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
    
  4. Run Elasticsearch:

    bin/elasticsearch
    
  5. Test the connection:

    curl --key config/certs/client2.key --cert config/certs/client2.crt \
    --cacert config/certs/cacert.pem https://localhost:9200
    

Step 4: Configure the Connector

Prerequisites
  1. Open a new terminal and change your current directory to <path-to-confluent>.

  2. Save the configuration file as elastic.properties in etc/kafka-connect-elasticsearch and add the certificate paths.

    cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=test-elasticsearch-sink
    key.ignore=true
    connection.url=https://localhost:9200
    type.name=kafka-connect
    
    elastic.security.protocol=SSL
    elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks
    elastic.https.ssl.keystore.password=asdfasdf
    elastic.https.ssl.key.password=asdfasdf
    elastic.https.ssl.keystore.type=JKS
    elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks
    elastic.https.ssl.truststore.password=asdfasdf
    elastic.https.ssl.truststore.type=JKS
    elastic.https.ssl.protocol=TLS
    EOF
    
  3. Start Connect and load the connector:

    bin/confluent local start connect
    
    bin/confluent local load elasticssl -d etc/kafka-connect-elasticsearch/elastic-secure.properties
    

Step 5: Test the System

  1. Enter the following command:

    bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "secret1"}
    {"f1": "secret2"}
    
  2. Query Elasticsearch:

    curl --key config/certs/client2.key --cert config/certs/client2.crt /
    --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'