Important: This documentation is for an older version of Confluent Platform.
Kafka Connect Elasticsearch Connector with Security
This walkthrough assumes that a compatible version of the connector is installed in your Confluent Platform instance and that your Elasticsearch version includes the X-Pack plugin (pre-installed in Elasticsearch 6.3 and later).
It is based on the Elasticsearch documentation.
Download Elastic
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0
Generate Certificates
Elasticsearch provides a certificate-generation utility, bin/elasticsearch-certutil, which by default creates self-signed certificates. In this walkthrough, you generate your own certificates with OpenSSL instead, which gets you closer to a production-like environment.
# Make a certificate working directory in config directory
mkdir config/certs
cd config/certs
# Generate the Certificate Authority (when prompted for the Common Name / FQDN, enter localhost)
openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
# Generate a private key for client1 (Elasticsearch uses this key pair for HTTPS below)
openssl genrsa -out client1.key 2048
# Generate a certificate signing request
openssl req -new -key client1.key -out client1.csr
# Sign the request with the CA
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
-CAcreateserial -out client1.crt -days 1825 -sha256
# Repeat the steps for client2 (used by curl and by the connector below)
openssl genrsa -out client2.key 2048
openssl req -new -key client2.key -out client2.csr
openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
-CAcreateserial -out client2.crt -days 1825 -sha256
# Package the connector keys as JKS
openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
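Before moving on, it can help to confirm that a signed certificate chains back to the CA and matches its private key. The following is a minimal sketch, not part of the original walkthrough: it works in a throwaway directory, uses -subj and -nodes to skip the interactive prompts, and uses the illustrative subject names "Example Test CA" and "localhost". The CA is given a different subject from the client on purpose, because some OpenSSL versions treat a certificate whose subject equals its issuer as self-signed during verification.

```shell
set -eu

# Work in a throwaway directory so the real config/certs files are untouched.
workdir=$(mktemp -d)
cd "$workdir"

# CA key and self-signed CA certificate. -nodes leaves the key unencrypted;
# the subject "Example Test CA" is an illustrative assumption.
openssl req -new -x509 -nodes -newkey rsa:2048 \
  -keyout cacert.key -out cacert.pem -days 30 \
  -subj "/CN=Example Test CA" 2>/dev/null

# Key, signing request, and CA-signed certificate for client1.
openssl genrsa -out client1.key 2048 2>/dev/null
openssl req -new -key client1.key -out client1.csr -subj "/CN=localhost"
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client1.crt -days 30 -sha256 2>/dev/null

# Check 1: the certificate chains back to the CA.
verify_result=$(openssl verify -CAfile cacert.pem client1.crt)
echo "$verify_result"

# Check 2: the certificate and the private key carry the same public key.
cert_pub=$(openssl x509 -in client1.crt -noout -pubkey | openssl sha256)
key_pub=$(openssl pkey -in client1.key -pubout | openssl sha256)
if [ "$cert_pub" = "$key_pub" ]; then
  echo "private key matches certificate"
else
  echo "private key does NOT match certificate"
fi
```

The same two checks can be run against the files in config/certs by dropping the generation steps and pointing the commands at cacert.pem, client1.crt, and client1.key (and likewise for client2).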
Configure Elasticsearch
cd ../..
# Update the elastic config file
cat <<EOF >> config/elasticsearch.yml
xpack.security.enabled: true
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.client_authentication: required
xpack.security.http.ssl.key: certs/client1.key
xpack.security.http.ssl.certificate: certs/client1.crt
xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
EOF
# Set the passwords
bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
# Run elastic
bin/elasticsearch
# Test the connection
curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem https://localhost:9200
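Because the configuration step appends to config/elasticsearch.yml with >>, running it a second time would write every setting twice, and Elasticsearch may reject a file with duplicate keys at startup. A guarded append is one way to make the step safe to repeat. This is a minimal sketch that runs against a temporary file rather than a real installation; the function name append_security_settings and the grep guard are illustrative assumptions, not part of Elasticsearch.

```shell
# Stand-in for config/elasticsearch.yml so the sketch does not touch a real install.
config_file=$(mktemp)

# Hypothetical helper: append the security settings only if they are missing.
append_security_settings() {
  if grep -q '^xpack\.security\.enabled:' "$1"; then
    echo "security settings already present in $1"
    return 0
  fi
  cat <<EOF >> "$1"
xpack.security.enabled: true
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.client_authentication: required
xpack.security.http.ssl.key: certs/client1.key
xpack.security.http.ssl.certificate: certs/client1.crt
xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
EOF
}

# Calling the function twice still leaves a single copy of each setting.
append_security_settings "$config_file"
append_security_settings "$config_file"
grep -c '^xpack\.security\.enabled:' "$config_file"
```

To use the idea for real, pass config/elasticsearch.yml instead of the temporary file.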
Configure the Connector
Open a new terminal and change your current directory to <path-to-confluent>.
In etc/kafka-connect-elasticsearch, save this configuration file as elastic-secure.properties, updating the certificate paths to match your system.
cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=https://localhost:9200
type.name=kafka-connect
elastic.https.ssl.keystore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/keystore.jks
elastic.https.ssl.keystore.password=asdfasdf
elastic.https.ssl.key.password=asdfasdf
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.truststore.location=/home/cyrus/elasticsearch-6.6.0/config/certs/truststore.jks
elastic.https.ssl.truststore.password=asdfasdf
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.protocol=TLS
EOF
bin/confluent start connect
bin/confluent load elasticssl -d etc/kafka-connect-elasticsearch/elastic-secure.properties
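The keystore and truststore paths in the configuration file are specific to one machine. One way to avoid editing them by hand is to derive them from a single variable. The sketch below is illustrative only: CERT_DIR, STORE_PASSWORD, their placeholder defaults, and the temporary output file are assumptions introduced here, and it writes only the connection and SSL settings from the configuration file. Because the file holds passwords in plain text, the sketch also restricts its permissions.

```shell
# Hypothetical variables; point CERT_DIR at the directory that holds your JKS files.
CERT_DIR="${CERT_DIR:-/path/to/elasticsearch-6.6.0/config/certs}"
STORE_PASSWORD="${STORE_PASSWORD:-changeit}"

# Temporary stand-in for etc/kafka-connect-elasticsearch/elastic-secure.properties.
props_file=$(mktemp)

# The unquoted EOF delimiter lets the shell expand the variables in the body.
cat <<EOF > "$props_file"
connection.url=https://localhost:9200
elastic.https.ssl.keystore.location=${CERT_DIR}/keystore.jks
elastic.https.ssl.keystore.password=${STORE_PASSWORD}
elastic.https.ssl.key.password=${STORE_PASSWORD}
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.truststore.location=${CERT_DIR}/truststore.jks
elastic.https.ssl.truststore.password=${STORE_PASSWORD}
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.protocol=TLS
EOF

# Only the owner should be able to read a file that contains passwords.
chmod 600 "$props_file"

# Show the resolved store locations for a quick visual check.
grep 'location=' "$props_file"
```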
Test the System
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "secret1"}
{"f1": "secret2"}
Now query Elasticsearch directly, from the Elasticsearch directory:
curl --key config/certs/client2.key --cert config/certs/client2.crt --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'
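Both curl commands in this walkthrough repeat the same three TLS options. A small shell function can keep them in one place. This is a sketch under assumptions: the function name es_curl and the ES_CERT_DIR variable are introduced here for illustration, and the default path assumes the function is called from the Elasticsearch directory.

```shell
# Hypothetical variable: directory that holds client2.key, client2.crt, and cacert.pem.
ES_CERT_DIR="${ES_CERT_DIR:-config/certs}"

# Hypothetical helper: runs curl with the client key, client certificate,
# and CA certificate, then passes any remaining arguments through unchanged.
es_curl() {
  curl --key "$ES_CERT_DIR/client2.key" \
       --cert "$ES_CERT_DIR/client2.crt" \
       --cacert "$ES_CERT_DIR/cacert.pem" \
       "$@"
}

# Example calls (they need the secured Elasticsearch instance to be running):
#   es_curl https://localhost:9200
#   es_curl 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'
```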