Kafka Connect Elasticsearch Connector with Security

This documentation assumes you have the correct version of the connector installed in your Confluent Platform instance. It assumes you are using an Elasticsearch version with X-Pack plugin (pre-installed in Elasticsearch versions 6.3 or later).

Based on Elasticsearch documentation.

Download Elastic

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0

Generate Certificates

Elasticsearch provides a certificate-generation utility, named bin/elasticsearch-certutil, which by default creates self-signed certificates. In this walkthrough, you generate your own certificates, so you can get closer to a production-like environment.

# Make a certificate working directory in config directory
mkdir config/certs
cd config/certs

# Generate the Certificate Authority (be sure your FQDN is ``localhost``)
openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666

# Generate a client certificate
openssl genrsa -out client1.key 2048

# Generate a certificate signing request
openssl req -new -key client1.key -out client1.csr

# Sign the request with the CA
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client1.crt -days 1825 -sha256

# Repeat the steps for the next client
openssl genrsa -out client2.key 2048
openssl req -new -key client2.key -out client2.csr
openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client2.crt -days 1825 -sha256

# Package the connector keys as JKS
openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12

Configure Elasticsearch

cd ../..
# Update the elastic config file
cat <<EOF >> config/elasticsearch.yml
xpack.security.enabled: true
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.client_authentication: required
xpack.security.http.ssl.key:  certs/client1.key
xpack.security.http.ssl.certificate: certs/client1.crt
xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
EOF

# Set the passwords
bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase

# Run elastic
bin/elasticsearch

# Test the connection
curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem https://localhost:9200

Configure the Connector

Prerequisites

Open a new terminal and change your current directory to <path-to-confluent>.

In the etc/kafka-connect-elasticsearch, save this configuration file as elastic.properties, updating the certificate paths.

cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=https://localhost:9200
type.name=kafka-connect

elastic.https.ssl.keystore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/keystore.jks
elastic.https.ssl.keystore.password=asdfasdf
elastic.https.ssl.key.password=asdfasdf
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.truststore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/truststore.jks
elastic.https.ssl.truststore.password=asdfasdf
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.protocol=TLS
EOF

bin/confluent local start connect
bin/confluent local load elasticssl -d etc/kafka-connect-elasticsearch/elastic-secure.properties

Test the System

bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "secret1"}
{"f1": "secret2"}

Now query directly from Elasticsearch

curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'