Encrypt and Authenticate with TLS/SSL¶
TLS/SSL Overview¶
With TLS/SSL authentication, the server authenticates the client (also called “2-way authentication”). Because TLS authentication requires TLS encryption, this page shows you how to configure both at the same time and is a superset of configurations required just for SSL encryption.
By default, Apache Kafka® communicates in PLAINTEXT, which means that all data is sent in the clear. To encrypt communication, you should configure all the Confluent Platform components in your deployment to use TLS/SSL encryption.
Secure Sockets Layer (SSL) is the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. However, for historical reasons, Kafka (like Java) uses the term/acronym “SSL” instead of “TLS” in configuration and code.
You can configure TLS/SSL for encryption, but you can also configure TLS/SSL for authentication. You can configure just TLS/SSL encryption (by default, TLS/SSL encryption includes certificate authentication of the server) and independently choose a separate mechanism for client authentication (for example, SSL, SASL). Technically speaking, TLS/SSL encryption already enables 1-way authentication in which the client authenticates the server certificate. In this topic, “TLS/SSL authentication” refers to the 2-way authentication, where the broker also authenticates the client certificate.
Enabling TLS/SSL may have a performance impact due to encryption overhead.
TLS/SSL uses private-key/certificate pairs, which are used during the TLS/SSL handshake process.
- Each broker needs its own private-key/certificate pair, and the client uses the certificate to authenticate the broker.
- Each logical client needs a private-key/certificate pair if client authentication is enabled, and the broker uses the certificate to authenticate the client.
You can configure each broker and logical client with a truststore, which is used to determine which certificates (broker or logical client identities) to trust (authenticate). You can configure the truststore in many ways. Consider the following two examples:
- The truststore contains one or many certificates: the broker or logical client will trust any certificate listed in the truststore.
- The truststore contains a Certificate Authority (CA): the broker or logical client will trust any certificate that was signed by the CA in the truststore.
Using the CA method is more convenient, because adding a new broker or client doesn’t require a change to the truststore. The CA method is outlined in this diagram.
However, with the CA method, Kafka does not conveniently support blocking authentication for individual brokers or clients that were previously trusted using this mechanism (certificate revocation is typically done using Certificate Revocation Lists or the Online Certificate Status Protocol), so you would have to rely on authorization to block access.
In contrast, if you use one or many certificates, blocking authentication is achieved by removing the broker or client’s certificate from the truststore.
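The difference between the two truststore styles can be illustrated with a deliberately simplified toy model. This is only a sketch: the `Cert` class and `trusted` function are hypothetical names invented for illustration, and real TLS validation checks cryptographic signatures, validity dates, and more rather than comparing names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cert:
    """Toy stand-in for a certificate: only a subject and an issuer name."""
    subject: str
    issuer: str


def trusted(cert: Cert, truststore: set) -> bool:
    """Trust a cert if it is listed itself, or if its issuer is listed as a CA.

    Assumption: issuer-name matching stands in for real signature checking.
    """
    return any(cert == entry or cert.issuer == entry.subject for entry in truststore)


ca = Cert(subject="CN=ca", issuer="CN=ca")
broker1 = Cert(subject="CN=kafka1", issuer="CN=ca")
broker2 = Cert(subject="CN=kafka2", issuer="CN=ca")  # added to the cluster later

# CA method: the new broker is trusted without touching the truststore.
ca_truststore = {ca}

# Individual-certificate method: only listed certificates are trusted,
# so removing an entry blocks that broker or client.
pinned_truststore = {broker1}
```

In this model `trusted(broker2, ca_truststore)` holds while `trusted(broker2, pinned_truststore)` does not, which mirrors the convenience versus revocation trade-off described above.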
See also
For an example that shows this in action, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml file for a configuration reference.
Creating TLS/SSL Keys and Certificates¶
Refer to the Security Tutorial, which describes how to create SSL keys and certificates.
Brokers¶
Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.
Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for the Confluent Metrics Reporter.
Note
For details on all required and optional broker configuration properties, see Kafka Broker Configurations.
Configure the truststore, keystore, and password in the server.properties file of every broker. Because this stores passwords directly in the broker configuration file, it is important to restrict access to these files using file system permissions.
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Note that ssl.truststore.password is technically optional, but strongly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
If you want to enable TLS/SSL for inter-broker communication, add the following to the broker properties file, which defaults to PLAINTEXT:
security.inter.broker.protocol=SSL
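Because the broker properties file holds keystore and truststore passwords in plain text, it can be worth checking its permissions before starting the broker. The following is a minimal sketch, assuming a POSIX file system; the `is_private` helper is a hypothetical name, and the demonstration runs against a throwaway temporary file rather than a real server.properties.

```python
import os
import stat
import tempfile


def is_private(path: str) -> bool:
    """Return True if neither group nor other users have any access to `path`."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # Any group or "other" permission bit means the passwords may be exposed.
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


# Demonstration on a temporary file standing in for server.properties.
with tempfile.NamedTemporaryFile("w", suffix=".properties", delete=False) as f:
    f.write("ssl.keystore.password=test1234\n")
    demo_path = f.name

os.chmod(demo_path, 0o644)           # readable by everyone: too permissive
too_open = is_private(demo_path)
os.chmod(demo_path, 0o600)           # owner read/write only
locked_down = is_private(demo_path)
os.remove(demo_path)
```

Here `too_open` is False and `locked_down` is True. On a real broker you would point `is_private` at the actual configuration file and at the keystore and truststore files.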
Configure the ports for the Apache Kafka® brokers to listen for client and inter-broker SSL connections. You should configure listeners, and optionally, advertised.listeners if the value is different from listeners.
listeners=SSL://kafka1:9093
advertised.listeners=SSL://localhost:9093
Configure both SSL ports and PLAINTEXT ports if:
- TLS/SSL is not enabled for inter-broker communication
- Some clients connecting to the cluster do not use TLS/SSL
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
Note that advertised.host.name and advertised.port configure a single PLAINTEXT port and are incompatible with secure protocols. Use advertised.listeners instead.
To enable the broker to authenticate clients (2-way authentication), you must configure all the brokers for client authentication. Configure this to use required rather than requested: with requested, misconfigured clients can still connect successfully, which provides a false sense of security.
ssl.client.auth=required
Note
If any SASL authentication mechanisms are enabled for a given listener, then SSL client authentication is disabled on that listener, even if you have specified ssl.client.auth=required, and the broker authenticates clients only using SASL on that listener.
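The broker settings described above can be sanity-checked before a rolling restart. The following is a minimal sketch, not a Confluent tool: `parse_properties` and `check_broker_tls` are hypothetical helpers, the parser handles only simple key=value lines, and the listener check assumes that listener names are the protocol names themselves (no custom listener name mapping).

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_broker_tls(props: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []

    # Each listener looks like PROTOCOL://host:port (assumption: no custom names).
    listeners = props.get("listeners", "")
    protocols = [
        entry.split("://", 1)[0].strip()
        for entry in listeners.split(",")
        if entry.strip()
    ]
    if "SSL" not in protocols:
        problems.append("listeners has no SSL endpoint")

    # 'requested' lets misconfigured clients connect, so insist on 'required'.
    if props.get("ssl.client.auth") != "required":
        problems.append("ssl.client.auth is not 'required'")

    # These legacy settings configure a single PLAINTEXT port.
    for legacy in ("advertised.host.name", "advertised.port"):
        if legacy in props:
            problems.append(f"{legacy} is incompatible with secure protocols")

    return problems
```

For example, `check_broker_tls(parse_properties(open("server.properties").read()))` would return an empty list for a file containing the SSL listener and ssl.client.auth=required lines shown above.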
Optional settings¶
ssl.endpoint.identification.algorithm
- The endpoint identification algorithm used by clients to validate server host name. The default value is https. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. Disable server host name verification by setting ssl.endpoint.identification.algorithm to an empty string.
- Type: string
- Default: https
- Importance: medium
ssl.cipher.suites
- A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithm used to negotiate the security settings for a network connection (using the TLS/SSL network protocol).
- Type: list
- Default: null (by default, all supported cipher suites are enabled)
- Importance: medium
ssl.enabled.protocols
- The list of protocols enabled for SSL connections.
- Type: list
- Default: TLSv1.2,TLSv1.1,TLSv1
- Importance: medium
ssl.truststore.type
- The file format of the truststore file.
- Type: string
- Default: JKS
- Importance: medium
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.
Clients¶
The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.
If you are using the Kafka Streams API, refer to its documentation on how to configure the equivalent SSL and SASL parameters.
The following configuration example assumes that the broker requires client authentication. You can store the configuration in a client properties file, client-ssl.properties. Because this stores passwords directly in the client configuration file, it is important to restrict access to this file using file system permissions.
Important
If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, security.protocol becomes confluent.license.security.protocol.
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Note that ssl.truststore.password is technically optional, but strongly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
The following examples use kafka-console-producer and kafka-console-consumer, and pass in the client-ssl.properties defined above:
bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning
Optional settings¶
ssl.endpoint.identification.algorithm
- The endpoint identification algorithm used by clients to validate server host name. The default value is https. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. Disable server host name verification by setting ssl.endpoint.identification.algorithm to an empty string.
- Type: string
- Default: https
- Importance: medium
ssl.provider
- The name of the security provider used for TLS/SSL connections. Default value is the default security provider of the JVM.
- Type: string
- Default: null
- Importance: medium
ssl.cipher.suites
- A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithm used to negotiate the security settings for a network connection (using the TLS/SSL network protocol).
- Type: list
- Default: null (by default, all supported cipher suites are enabled)
- Importance: medium
ssl.enabled.protocols
- The list of protocols enabled for TLS/SSL connections.
- Type: list
- Default: TLSv1.2,TLSv1.1,TLSv1
- Importance: medium
ssl.truststore.type
- The file format of the truststore file.
- Type: string
- Default: JKS
- Importance: medium
ZooKeeper¶
Starting in Confluent Platform version 5.5.0, the version of ZooKeeper bundled with Kafka supports TLS/SSL. For details, refer to Adding security to a running cluster.
Kafka Connect¶
This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:
- Kafka Connect workers: part of the Kafka Connect API, a worker is really just an advanced client under the covers
- Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
- Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use SSL using additional properties
Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for the Confluent Monitoring Interceptors.
Configure the top-level settings in the Connect workers to use TLS/SSL by adding these properties in connect-distributed.properties. These top-level settings are used by the Connect worker for group coordination and to read and write to the internal topics that are used to track the cluster’s state (for example, configs and offsets). The assumption here is that client authentication is required by the brokers.
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you must also override the default producer/consumer configuration that the worker uses. The assumption here is that client authentication is required by the brokers.
For source connectors: configure the same properties adding the producer prefix.
producer.bootstrap.servers=kafka1:9093
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
producer.ssl.keystore.password=test1234
producer.ssl.key.password=test1234
For sink connectors: configure the same properties adding the consumer prefix.
consumer.bootstrap.servers=kafka1:9093
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
consumer.ssl.keystore.password=test1234
consumer.ssl.key.password=test1234
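Since the worker, producer, and consumer settings differ only by prefix, the three groups can be derived from one set of base properties instead of being maintained by hand. The following is a minimal sketch of that idea; `with_prefix` and `render` are hypothetical helper names, and the values are the example values used in this section.

```python
def with_prefix(props: dict, prefix: str) -> dict:
    """Return a copy of `props` with `prefix` prepended to every key."""
    return {f"{prefix}{key}": value for key, value in props.items()}


def render(props: dict) -> str:
    """Render properties as key=value lines, one per property."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


# Top-level settings used by the Connect worker itself.
base = {
    "bootstrap.servers": "kafka1:9093",
    "security.protocol": "SSL",
    "ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
    "ssl.truststore.password": "test1234",
    "ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
    "ssl.keystore.password": "test1234",
    "ssl.key.password": "test1234",
}

# Worker settings plus the overrides for source (producer.) and
# sink (consumer.) connectors.
worker_config = {
    **base,
    **with_prefix(base, "producer."),
    **with_prefix(base, "consumer."),
}
```

Calling `render(worker_config)` yields text that could be pasted into connect-distributed.properties. The same helper would apply to other prefixes that appear on this page, provided the component accepts the same underlying property names.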
Tip
For more information, see Kafka Connect Security Basics.
Confluent Replicator¶
Confluent Replicator is a type of Kafka source connector that replicates data from a source to destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.
Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.
To configure Confluent Replicator security, you must configure the Replicator connector as shown below, and additionally you must configure security for the Kafka Connect workers.
To add TLS/SSL to the Confluent Replicator embedded consumer, modify the Replicator JSON properties file.
This example is a subset of configuration properties to add for TLS/SSL encryption and authentication. The assumption here is that client authentication is required by the brokers.
{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
    "src.kafka.ssl.truststore.password":"confluent",
    "src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
    "src.kafka.ssl.keystore.password":"confluent",
    "src.kafka.ssl.key.password":"confluent",
    "src.kafka.security.protocol":"SSL"
    ....
  }
}
See also
To see an example Confluent Replicator configuration, refer to the SSL source authentication demo script. For demos of common security configurations refer to Replicator security demos.
To configure Confluent Replicator for a destination cluster with SSL authentication, modify the Replicator JSON configuration to include the following:
{
  "name":"replicator",
  "config":{
    ....
    "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
    "dest.kafka.ssl.truststore.password":"confluent",
    "dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
    "dest.kafka.ssl.keystore.password":"confluent",
    "dest.kafka.ssl.key.password":"confluent",
    "dest.kafka.security.protocol":"SSL"
    ....
  }
}
Additionally, the following properties are required in the Connect worker:
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent
For more details, see general security configuration for Connect workers.
See also
To see an example Confluent Replicator configuration, see the SSL destination authentication demo script. For demos of common security configurations see: Replicator security demos.
Confluent Control Center¶
Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.
Note
When RBAC is enabled, Control Center cannot be used in conjunction with Kerberos because Control Center cannot support any SASL mechanism other than OAUTHBEARER.
Enable security for the Control Center application as described in the section below. Additionally, configure security for the following components:
- Confluent Metrics Reporter: required on the production cluster being monitored
- Confluent Monitoring Interceptors: optional if you are using Control Center streams monitoring
Enable TLS/SSL for Confluent Control Center in the etc/confluent-control-center/control-center.properties file. The assumption here is that client authentication is required by the brokers.
confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.controlcenter.streams.ssl.keystore.password=test1234
confluent.controlcenter.streams.ssl.key.password=test1234
Confluent Metrics Reporter¶
This section describes how to enable SSL encryption and authentication for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.
To add TLS/SSL for the Confluent Metrics Reporter, add the following to server.properties on the brokers in the Kafka cluster being monitored. The assumption here is that client authentication is required by the brokers.
confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234
Confluent Monitoring Interceptors¶
Confluent Monitoring Interceptors are used for Confluent Control Center streams monitoring. This section describes how to enable security for Confluent Monitoring Interceptors in three places:
- General clients
- Kafka Connect
- Confluent Replicator
Important
The typical use case for Confluent Monitoring Interceptors is to provide monitoring data to a separate monitoring cluster that most likely has different configurations. Interceptor configurations do not inherit configurations for the monitored component. If you wish to use configurations from the monitored component, you must add the appropriate prefix. For example, the option confluent.monitoring.interceptor.security.protocol=SSL, if being used for a producer, must be prefixed with producer. and would appear as producer.confluent.monitoring.interceptor.security.protocol=SSL.
Interceptors for General Clients¶
For Confluent Control Center stream monitoring to work with Kafka clients, you must configure TLS/SSL encryption and authentication for the Confluent Monitoring Interceptors in each client.
- Verify that the client has configured interceptors.
Producer:
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
Consumer:
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
- Configure TLS/SSL encryption and authentication for the interceptor. The assumption here is that client authentication is required by the brokers.
confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
confluent.monitoring.interceptor.security.protocol=SSL
confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=test1234
confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.monitoring.interceptor.ssl.keystore.password=test1234
confluent.monitoring.interceptor.ssl.key.password=test1234
Interceptors for Kafka Connect¶
For Confluent Control Center stream monitoring to work with Kafka Connect, you must configure SSL for the Confluent Monitoring Interceptors in Kafka Connect. The assumption here is that client authentication is required by the brokers. Configure the Connect workers by adding these properties in connect-distributed.properties, depending on whether the connectors are sources or sinks.
Source connector: configure the Confluent Monitoring Interceptors for TLS/SSL encryption and authentication with the producer prefix.
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
producer.confluent.monitoring.interceptor.security.protocol=SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
producer.confluent.monitoring.interceptor.ssl.key.password=test1234
Sink connector: configure the Confluent Monitoring Interceptors for TLS/SSL encryption and authentication with the consumer prefix.
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
consumer.confluent.monitoring.interceptor.security.protocol=SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
consumer.confluent.monitoring.interceptor.ssl.key.password=test1234
Interceptors for Replicator¶
For Confluent Control Center stream monitoring to work with Replicator, you must configure TLS/SSL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for TLS/SSL encryption and authentication:
{
  "name":"replicator",
  "config":{
    ....
    "src.consumer.group.id": "replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
    "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
    "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
    "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
    ....
  }
}
Enable TLS/SSL in a Self-Balancing cluster¶
To enable TLS/SSL encryption in a Self-Balancing cluster, add the following to the server.properties file on the brokers in the Kafka cluster.
confluent.rebalancer.metrics.security.protocol=SSL
confluent.rebalancer.metrics.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
confluent.rebalancer.metrics.ssl.truststore.password=confluent
confluent.rebalancer.metrics.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
confluent.rebalancer.metrics.ssl.keystore.password=confluent
confluent.rebalancer.metrics.ssl.key.password=confluent
Schema Registry¶
Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.
The following is an example subset of schema-registry.properties configuration parameters to add for TLS/SSL encryption and authentication. The assumption here is that client authentication is required by the brokers.
kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
kafkastore.ssl.truststore.password=test1234
kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
kafkastore.ssl.keystore.password=test1234
kafkastore.ssl.key.password=test1234
REST Proxy¶
Securing Confluent REST Proxy with TLS/SSL encryption and authentication requires that you configure security between:
- REST clients and the REST Proxy (HTTPS)
- REST Proxy and the Kafka cluster
Also, refer to the complete list of REST Proxy configuration options.
Configure HTTPS between REST clients and the REST Proxy. The following is an example subset of kafka-rest.properties configuration parameters to configure HTTPS.
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Configure TLS/SSL encryption and authentication between REST Proxy and the Kafka cluster. The following is an example subset of kafka-rest.properties configuration parameters to add for TLS/SSL encryption and authentication. The assumption here is that client authentication is required by the brokers.
client.bootstrap.servers=kafka1:9093
client.security.protocol=SSL
client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
client.ssl.truststore.password=test1234
client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
client.ssl.keystore.password=test1234
client.ssl.key.password=test1234
SSL Logging¶
Enable SSL debug logging at the JVM level by starting the Kafka broker and/or clients with the javax.net.debug system property. For example:
export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties
Tip
These instructions are based on the assumption that you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.
Once you start the broker, you should be able to see the following in the server.log:
with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
To verify that the server’s keystore and truststore are set up correctly, you can run the following command:
openssl s_client -debug -connect localhost:9093 -tls1
Note: TLSv1 should be listed under ssl.enabled.protocols.
In the output of this command you should see the server’s certificate:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
You can find more details on this in the Oracle documentation on debugging SSL/TLS connections.
If the certificate does not show up with the openssl command, or if there are any other error messages, then your keys or certificates are not set up correctly. Review your configurations.
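As a complement to the openssl check, the certificate presented by the SSL listener can also be retrieved from a script. The following is a minimal sketch using Python's standard ssl module; `fetch_broker_certificate` and `looks_like_pem_certificate` are hypothetical helper names, and the host and port in the usage note are the example values from this page. The call performs no certificate validation, and if the broker requires client authentication the handshake may fail before a certificate is returned.

```python
import ssl


def fetch_broker_certificate(host: str, port: int) -> str:
    """Return the PEM-encoded certificate that the listener presents.

    No validation is performed; this only shows what the broker sends.
    """
    return ssl.get_server_certificate((host, port))


def looks_like_pem_certificate(text: str) -> bool:
    """Cheap sanity check that `text` is framed as a PEM certificate."""
    stripped = text.strip()
    return (
        stripped.startswith("-----BEGIN CERTIFICATE-----")
        and stripped.endswith("-----END CERTIFICATE-----")
    )
```

For example, `looks_like_pem_certificate(fetch_broker_certificate("localhost", 9093))` should be true when the listener is reachable and serving a certificate. An exception from the fetch call points to the same kind of key or certificate problem described above.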