Elasticsearch Connector with Security¶
Complete the following instructions to configure the Kafka Connect Elasticsearch connector with security. These instructions are based on the Elasticsearch document Encrypting HTTP Client communications.
- Prerequisites:
- Confluent Platform version 4.1 (or later) using the bundled Elasticsearch connector.
- Elasticsearch with the X-Pack plugin. This plugin is pre-installed in Elasticsearch versions 6.3 or later.
Step 1: Download and extract the Elastic archive¶
Enter the following commands:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0
Step 2: Generate Certificates¶
Elasticsearch provides a certificate-generation utility named bin/elasticsearch-certutil
. By default, this utility creates self-signed certificates. Use the commands below to generate your own certificates. This allows you to simulate a production-like environment.
Make a certificate working directory in the Elasticsearch
config
directory:mkdir config/certs
cd config/certs
Generate the Certificate Authority (make sure your FQDN is
localhost
):openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
Generate a client certificate:
openssl genrsa -out client1.key 2048
Generate a certificate signing request:
openssl req -new -key client1.key -out client1.csr
Sign the request with the CA:
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \ -CAcreateserial -out client1.crt -days 1825 -sha256
Repeat the previous steps for the next client:
openssl genrsa -out client2.key 2048
openssl req -new -key client2.key -out client2.csr
openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \ -CAcreateserial -out client2.crt -days 1825 -sha256
Package the connector keys as JKS:
openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
Step 3: Configure Elasticsearch¶
Return to the main Elasticsearch directory:
cd ../..
Update the Elasticsearch configuration file:
cat <<EOF >> config/elasticsearch.yml xpack.security.enabled: true xpack.security.http.ssl.enabled: true xpack.security.http.ssl.client_authentication: required xpack.security.http.ssl.key: certs/client1.key xpack.security.http.ssl.certificate: certs/client1.crt xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ] EOF
Set the passwords:
bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
Run Elasticsearch:
bin/elasticsearch
Test the connection:
curl --key config/certs/client2.key --cert config/certs/client2.crt \ --cacert config/certs/cacert.pem https://localhost:9200
Step 4: Configure the Connector¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Open a new terminal and change your current directory to
<path-to-confluent>
.Save the configuration file as
elastic.properties
inetc/kafka-connect-elasticsearch
and add the certificate paths.cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=test-elasticsearch-sink key.ignore=true connection.url=https://localhost:9200 type.name=kafka-connect elastic.security.protocol=SSL elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks elastic.https.ssl.keystore.password=asdfasdf elastic.https.ssl.key.password=asdfasdf elastic.https.ssl.keystore.type=JKS elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks elastic.https.ssl.truststore.password=asdfasdf elastic.https.ssl.truststore.type=JKS elastic.https.ssl.protocol=TLS EOF
Start Connect and load the connector:
bin/confluent local services connect start
bin/confluent local services connect connector load elasticssl --config etc/kafka-connect-elasticsearch/elastic-secure.properties
Step 5: Test the System¶
Enter the following command:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "secret1"} {"f1": "secret2"}
Query Elasticsearch:
curl --key config/certs/client2.key --cert config/certs/client2.crt / --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'