Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Elasticsearch Connector with Security¶
Complete the following instructions to configure the Kafka Connect Elasticsearch connector with security. These instructions are based on the Elasticsearch document Encrypting HTTP Client communications.
- Prerequisites:
- Confluent Platform version 4.1 (or later) using the bundled Elasticsearch connector.
- Elasticsearch with the X-Pack plugin. This plugin is pre-installed in Elasticsearch versions 6.3 or later.
Step 1: Download and extract the Elastic archive¶
Enter the following commands:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0
Step 2: Generate Certificates¶
Elasticsearch provides a certificate-generation utility named bin/elasticsearch-certutil
. By default, this utility creates self-signed certificates. Use the commands below to generate your own certificates. This allows you to simulate a production-like environment.
Make a certificate working directory in the Elasticsearch
config
directory:mkdir config/certs
cd config/certs
Generate the Certificate Authority (make sure your FQDN is
localhost
):openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
Generate a client certificate:
openssl genrsa -out client1.key 2048
Generate a certificate signing request:
openssl req -new -key client1.key -out client1.csr
Sign the request with the CA:
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \ -CAcreateserial -out client1.crt -days 1825 -sha256
Repeat the previous steps for the next client:
openssl genrsa -out client2.key 2048
openssl req -new -key client2.key -out client2.csr
openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \ -CAcreateserial -out client2.crt -days 1825 -sha256
Package the connector keys as JKS:
openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
Step 3: Configure Elasticsearch¶
Return to the main Elasticsearch directory:
cd ../..
Update the Elasticsearch configuration file:
cat <<EOF >> config/elasticsearch.yml xpack.security.enabled: true xpack.security.http.ssl.enabled: true xpack.security.http.ssl.client_authentication: required xpack.security.http.ssl.key: certs/client1.key xpack.security.http.ssl.certificate: certs/client1.crt xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ] EOF
Set the passwords:
bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
Run Elasticsearch:
bin/elasticsearch
Test the connection:
curl --key config/certs/client2.key --cert config/certs/client2.crt \ --cacert config/certs/cacert.pem https://localhost:9200
Step 4: Configure the Connector¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Open a new terminal and change your current directory to
<path-to-confluent>
.Save the configuration file as
elastic.properties
inetc/kafka-connect-elasticsearch
and add the certificate paths.cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=test-elasticsearch-sink key.ignore=true connection.url=https://localhost:9200 type.name=kafka-connect elastic.security.protocol=SSL elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks elastic.https.ssl.keystore.password=asdfasdf elastic.https.ssl.key.password=asdfasdf elastic.https.ssl.keystore.type=JKS elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks elastic.https.ssl.truststore.password=asdfasdf elastic.https.ssl.truststore.type=JKS elastic.https.ssl.protocol=TLS EOF
Start Connect and load the connector:
bin/confluent local start connect
bin/confluent local load elasticssl -d etc/kafka-connect-elasticsearch/elastic-secure.properties
Step 5: Test the System¶
Enter the following command:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "secret1"} {"f1": "secret2"}
Query Elasticsearch:
curl --key config/certs/client2.key --cert config/certs/client2.crt / --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'