Configure mTLS Authentication and RBAC for Kafka Brokers

This configuration shows how to configure Kafka brokers with mutual TLS (mTLS) authentication and role-based access control (RBAC) through the Confluent Metadata Service (MDS). mTLS provides two-way authentication to ensure that traffic between clients and the MDS is secure, and that you can trust content coming from both directions. In this example:

  • Clients communicating with Kafka can use mTLS (encryption plus authentication).
  • Clients communicating with MDS use TLS (encryption) and HTTP Basic authentication.

You can use this walk-through as a template for configuring other combinations of authentication and RBAC. The general pattern is:

  1. Configure properties for authentication, or if this is already configured, skip to the steps to configure the authorizer.
  2. Decide which authorizer to use (in this case, ConfluentServerAuthorizer, which provides RBAC and support for centralized ACLs). Authorization is defined in authorizer.class.name.
  3. Enable MDS.
  4. Add a directory service (such as LDAP).

Terminology

Following is a quick review of terms. To further help put these technologies in context, see the blog post, Apache Kafka Security 101.

Authentication

Authentication is a security mechanism that asks the question “who are you?” You can use different security protocols to authenticate (identify) the principal who is trying to get access to platform resources.

  • PLAINTEXT, in the context of Kafka brokers, means no authentication and no encryption. Data is sent in “plain” or “clear” text.

  • Secure Sockets Layer (SSL), and its newer incarnation Transport Layer Security (TLS), is a protocol for securing encrypted communication between entities. Kafka (like Java) still uses the term SSL in configuration and code. TLS can be configured for encryption only, or encryption and mutual authentication (mTLS). This involves generating certificates and using a certificate authority. mTLS is authentication with TLS. To learn more about TLS, see Protect Data in Motion with TLS Encryption in Confluent Platform and Use TLS Authentication in Confluent Platform.

    Protocol Security Provided Implementation
    SSL encryption TLS
    SSL encryption + authentication mTLS
  • SASL is a general protocol for authentication which can support multiple mechanisms and implementations such as Kerberos, PLAIN, and SCRAM on the same port (for example, 9092). To learn more about SASL, see SASL.

    Protocol Security Provided
    SASL (PLAIN) authentication, no encryption
    SASL (authentication), TLS/SSL (encryption) encryption + authentication

    Tip

Authorization

Authorization is a security mechanism that asks the question “Do you have permissions for xyz (action, resource)?” RBAC determines client privileges and enables granular access controls based on granted roles. RBAC can be used in combination with ACLs.

Overview and Prerequisites

These examples and Walkthrough assume that you have:

As a part of the configuration, and to follow along with the tasks covered here, you will:

Configuration in a Nutshell

The example below shows configurations for authentication with mTLS and authorization with RBAC over MDS in a single etc/kafka/server.properties file, therefore a single broker configuration.

Tip

You need a Kafka configuration file for each broker. For example, to create three brokers, create three configuration files.

Authentication with mTLS uses three ports:

  • One port with mTLS for interbrokers
  • One port with mTLS for external to clients
  • One port for token services (enables impersonation with MDS)

Port 8090 is used for MDS HTTPS connections.

Tip

Technically, only a single port is needed to run a broker.

  • Kafka requires at least one open port for client connections and interbroker communication. This port must have mTLS.
  • A second port with mTLS is shown in the example as an operational convenience, allowing for better traffic control, as it can be configured to listen on different hostnames (for example, one internal, another external).
  • A third port for token services is not strictly mandatory either, but because it enables impersonation with MDS, this is a requirement for full RBAC functionality.

Line by line descriptions are shown at the end of the example.

Note

Most configuration attributes show example values in <>, which can be helpful in terms of understanding the type of value expected. Users are expected to replace the example with values matching their own setup. Values displayed without <> can be used as recommended values.

  1 ############################# Broker Settings ##################################
  2 zookeeper.connect=<host-1>:2181,<host-2>:2181,<host-3>:2181
  3 log.dirs=/var/lib/kafka/data
  4 broker.id=1
  5
  6 ############################# Log Retention Policy, Log Basics ##################
  7 log.retention.check.interval.ms=300000
  8 log.retention.hours=168
  9 log.segment.bytes=1073741824
 10 num.io.threads=16
 11 num.network.threads=8
 12 num.partitions=1
 13 num.recovery.threads.per.data.dir=2
 14
 15 ########################### Socket Server Settings #############################
 16 socket.receive.buffer.bytes=102400
 17 socket.request.max.bytes=104857600
 18 socket.send.buffer.bytes=102400
 19
 20 ############################# Internal Topic Settings  #########################
 21 offsets.topic.replication.factor=3
 22 transaction.state.log.min.isr=2
 23 transaction.state.log.replication.factor=3
 24
 25 ######################## Metrics Reporting ########################################
 26 metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
 27 confluent.metrics.reporter.bootstrap.servers=<address>-west-2.compute.internal:9092
 28 confluent.metrics.reporter.topic.replicas=3
 29 confluent.support.customer.id=anonymous
 30
 31 ######################## LISTENERS ######################################
 32 listeners=INTERNAL://:9092,EXTERNAL://:9093,TOKEN://:9094
 33 advertised.listeners=INTERNAL://<localhost>:9092,\
 34                      EXTERNAL://<external-hostname>:9093,\
 35                      TOKEN://<external-hostname>:9094
 36 listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_SSL
 37
 38 inter.broker.listener.name=INTERNAL
 39
 40 ############################ TLS/SSL SETTINGS #####################################
 41 ssl.truststore.location=/var/ssl/private/client.truststore.jks
 42 ssl.truststore.password=<truststore-password>
 43 ssl.keystore.location=/var/ssl/private/kafka.keystore.jks
 44 ssl.keystore.password=<keystore-password>
 45 ssl.key.password=<key-password>
 46 ssl.client.auth=required
 47 ssl.endpoint.identification.algorithm=HTTPS
 48
 49 ##############  TLS/SSL settings for metrics reporting ##############
 50 confluent.metrics.reporter.security.protocol=SSL
 51 confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/client.truststore.jks
 52 confluent.metrics.reporter.ssl.truststore.password=<truststore-password>
 53 confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/kafka.keystore.jks
 54 confluent.metrics.reporter.ssl.keystore.password=<keystore-password>
 55 confluent.metrics.reporter.ssl.key.password=<key-password>
 56
 57 ############################# TLS/SSL LISTENERS #############################
 58 listener.name.internal.ssl.principal.mapping.rules= \
 59         RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
 60         DEFAULT
 61
 62 listener.name.external.ssl.principal.mapping.rules= \
 63         RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
 64         DEFAULT
 65
 66 ############################# TOKEN LISTENER #############################
 67 listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
 68 listener.name.token.oauthbearer.sasl.jaas.config= \
 69     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
 70         publicKeyPath="<path-to-mds-public-key.pem>";
 71 listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
 72 listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
 73
 74 ############################# Authorization Settings #############################
 75 authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
 76 confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
 77 super.users=User:kafka
 78
 79 #############################  MDS Listener - which port to listen on #############################
 80 confluent.metadata.server.listeners=https://0.0.0.0:8090,http://0.0.0.0:8091
 81 confluent.metadata.server.advertised.listeners=https://<localhost>:8090,\
 82                                                http://<localhost>:8091
 83
 84 ############################# TLS/SSL Settings for MDS #############################
 85 confluent.metadata.server.ssl.keystore.location=<path-to-kafka.keystore.jks>
 86 confluent.metadata.server.ssl.keystore.password=<keystore-password>
 87 confluent.metadata.server.ssl.key.password=<key-password>
 88 confluent.metadata.server.ssl.truststore.location=<path-to-client.truststore.jks>
 89 confluent.metadata.server.ssl.truststore.password=<truststore-password>
 90
 91 ############################# MDS Token Service Settings - enable token generation #############################
 92 confluent.metadata.server.token.max.lifetime.ms=3600000
 93 confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
 94 confluent.metadata.server.token.signature.algorithm=RS256
 95 confluent.metadata.server.authentication.method=BEARER
 96
 97 ############################# Identity Provider Settings(LDAP - local OpenLDAP) #############################
 98 ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
 99 ldap.com.sun.jndi.ldap.read.timeout=3000
100 ldap.java.naming.provider.url=ldap:<ldap-server-address>
101 # how mds authenticates to ldap server
102 ldap.java.naming.security.principal=<CN=mds,CN=Demo,DC=confluent,DC=io>
103 ldap.java.naming.security.credentials=<password>
104 ldap.java.naming.security.authentication=simple
105 # ldap search mode (GROUPS is default)
106 #ldap.search.mode=GROUPS
107 #ldap.search.mode=USERS
108 # how to search for users
109 ldap.user.search.base=<CN=Demo,DC=confluent,DC=io>
110 # how to search for groups
111 ldap.group.search.base=<CN=Demo,DC=confluent,DC=io>
112 # which attribute in ldap record corresponds to user name
113 ldap.user.name.attribute=sAMAccountName
114 ldap.user.memberof.attribute.pattern=<CN=(.*),CN=Demo,DC=confluent,DC=io>
115 ldap.group.object.class=group
116 ldap.group.name.attribute=sAMAccountName
117 ldap.group.member.attribute.pattern=<CN=(.*),CN=Demo,DC=confluent,DC=io>
118
119 ########################### Enable Swagger #############################
120 confluent.metadata.server.openapi.enable=true

Line Number Callouts for Configuration Sections

Configuring Brokers, Kafka Listeners, Authentication, and Authorization mechanism

  • Lines 32-38: Under “LISTENERS”, define three Kafka listeners and configure HTTPS for the broker, keyed to the security protocols the listeners will use. See Configure Listeners.
  • Lines 41-55: Under “TLS/SSL SETTINGS”, define top-level TLS/SSL settings for Kafka that can be shared and apply to all listeners. To define more granular TLS/SSL settings per entity, you would use a different set of prefixed configs and generate different certificates for each one. The first four lines define encryption for which you need to generate TLS/SSL certificates, and the last three lines specify authentication. See Kafka TLS/SSL Settings.
  • Lines 50-55: Also under “TLS/SSL SETTINGS”, in sub-section “TLS/SSL settings for metrics reporting”, is an example of setting up TLS/SSL for monitoring metrics. See Kafka TLS/SSL Settings.
  • Lines 58-64: Under “TLS/SSL LISTENERS”, apply a regular expression (regex) to the distinguished name (dname) for the principal in the certificate to extract only the first CN (common name) for machine name. See Principal Mapping Rules for TLS/SSL Listeners (Extract a Principal from a Certificate)
  • Lines 67-72: Under “TOKEN LISTENER”, configure another listener on Kafka which will accept tokens for impersonation with MDS using OAUTHBEARER. See Enable Token-based Authentication
  • Lines 75-77: Under “Authorization Settings”, define the authorization mechanism as ConfluentServerAuthorizer, which includes support for RBAC and centralized ACLs. See Configure the Authorizer.

Configuring MDS

  • Lines 80-82: Under “MDS Listener”, define an MDS listener that uses HTTP protocol with TLS/SSL encryption (HTTPS) to talk to clients. See Specify HTTPS Listeners for MDS.
  • Lines 85-89: Under “TLS/SSL Settings for MDS”, define encryption and authentication to map to the MDS HTTPS listener. See TLS/SSL Settings for MDS.
  • Lines 92-95: MDS can provide tokens in exchange for client credentials as well as accept tokens from clients to authenticate. See Enable and Configure MDS Token Generation. Under “MDS Token Service Settings”, enable and configure the token service, set token lifetime, specify public/private key paths on the machine where MDS is running and encryption algorithm. See Enable and Configure MDS Token Generation.
  • Lines 98-117: Under “Identity Provider Settings”, configure LDAP so that RBAC can use it. See Configuring LDAP Settings.

Optional Testing and Troubleshooting

  • Line 120: Under “Enable Swagger”, set confluent.metadata.server.openapi.enable to true if you want to use the Swagger UI to test calls to Kafka. This is not specific to this mTLS configuration with RBAC example.

    Note

    Do not enable the Swagger UI in production. It is limited to HTTP, so will not work with HTTPS or custom certificates. That said, you might find it useful for initial setup and testing.

How to run it

  1. Start ZooKeeper.

    sudo zookeeper-server-start ${CONFLUENT_HOME}/etc/kafka/zookeeper.properties
    
  2. Start Kafka with the etc/kafka/server.properties you just configured.

    kafka-server-start ${CONFLUENT_HOME}/etc/kafka/server.properties
    

To learn more, see Start Confluent Platform and how to install and run Confluent Platform.

You can configure clients like Schema Registry, Control Center, ksqlDB, and Connect to talk to Kafka and MDS over HTTPS in their respective properties files.

../../_images/security-rbac-mtls.png

Deep Dive on mTLS and RBAC configs

This Deep Dive goes into more detail about the same configurations summarized in the nutshell section above.

Kafka Server Properties File

The primary focus of this walkthrough are configs defined in ./etc/kafka/<server.properties>. After you have all configurations set up, you would run the following command to Kafka using your properties file:

kafka-server-start ${CONFLUENT_HOME}/kafka/<server.properties>

Configure Listeners

Define which ports Kafka should listen on, using the format <NAME>://<PORT>.

By default we provide one Kafka listener on 9092.

You need at least this, and you need an advertised listener for clients over the internet.

######################## LISTENERS ######################################
listeners=INTERNAL://:9092,EXTERNAL://:9093,TOKEN://:9094
advertised.listeners=INTERNAL://<localhost>:9092,\
                     EXTERNAL://<external-hostname>:9093,\
                     TOKEN://<external-hostname>:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_SSL

inter.broker.listener.name=INTERNAL

Tip

  • By default, Kafka uses 9092.
  • You can use any free port on your system, and give custom names as desired.
  • Name used to have to match the security protocol, but this is no longer a requirement.

listeners - Local ports available for other cloud machines on the same network. The example defines three ports with descriptive names.

advertised listeners - Keyed by name to listeners, advertised ports are for clients interacting with these servers over the internet. When a client asks how can I talk to you, you provide the host address (advertised) and port.

listener.security.protocol.map - Maps listener name to security protocol

  • INTERNAL:SSL - TLS/SSL authentication
  • EXTERNAL:SSL - TLS/SSL authentication
  • TOKEN:SASL_SSL - SASL authentication with TLS/SSL encryption

When Kafka starts, it grabs the specified ports to listen on. Kafka clients, such as producers and consumers, can reach Kafka on those ports.

Kafka TLS/SSL Settings

To configure SSL:

  1. Create certificates.
  2. Configure TLS/SSL in the properties file.

The TLS/SSL settings shown below are top-level configurations for Kafka that define encryption and authentication, and apply to listeners because they are not prefixed to point to individual listeners.

############################ TLS/SSL SETTINGS #####################################
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.client.auth=required
ssl.endpoint.identification.algorithm=HTTPS

You could configure TLS/SSL separately by generating individual certificates for each listener; but that is not done for this example.

These TLS/SSL settings can also be shared by MDS if you do not configure MDS specific prefixed TLS/SSL settings.

If you wanted to define a setting specific to a listener, you would use the following format for the prefix: listener.name.<name>.<SETTING>=<value>. For example, to specify values for INTERNAL:

listener.name.internal.ssl.truststore.location=/var/ssl/private/client.truststore.jks
listener.name.internal.ssl.truststore.password=<truststore-password>
listener.name.internal.ssl.keystore.location=/var/ssl/private/client.keystore.jks
listener.name.internal.ssl.keystore.password=<keystore-password>
listener.name.internal.ssl.key.password=<key-password>
listener.name.internal.ssl.client.auth=required
listener.name.internal.ssl.endpoint.identification.algorithm=HTTPS

Also included in this section are TLS/SSL settings for metrics reporting. These are not specific to this mTLS configuration with RBAC example, but included here to show that you can configure unique TLS/SSL settings specific to different services by adding prefixes, such as confluent.metrics.reporter, to ssl.truststore.xxx. To learn more about metrics reporting and SSL, see Monitor Kafka with Metrics Reporter in Confluent Platform and Configure TLS for Control Center on Confluent Platform.

confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=<truststore-password>
confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=<keystore-password>
confluent.metrics.reporter.ssl.key.password=<key-password>

Encryption

These settings (a subset of those shown above) define encryption, for which you need to generate the TLS/SSL certificates.

ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=<keystore-password>
Demo Script for Generating Certificates

A demo script for generating certificates is available here. To adapt the script, change the names of the machine(s) or Macbook(s) to match your deployment.

The script generates a “truststore” that defines which certificates you can trust and a “keystore” that defines who you are. Whenever two entities are talking to each other, they use these certificates to verify identities.

Generate the client truststore.jks and client.keystore.jks, put these files in some location on your machine(s) (local/on-premises or on a cloud platform), and specify the locations in the encryption section.

Another demo script for generating certificates and keys is also provided in Creating TLS/SSL Keys and Certificates.

Authentication

These settings define the authentication:

ssl.key.password=<key-password>
ssl.client.authentication=required
ssl.endpoint.identification.algorithm=HTTPS

Principal Mapping Rules for TLS/SSL Listeners (Extract a Principal from a Certificate)

The TLS/SSL certificate defines a value for the LDAP distinguished name (dname) that is the TLS/SSL user name, in the form CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown.

When you generate certificates, you decide what to put for the value of CN (common name) for the dname.

It is difficult to create ACLs or role bindings from this complex name, so you can use a supported rule to extract only a part of the distinguished name in the certificate to create the ACL or RBAC role binding.

In the Kafka server properties file, under TLS/SSL LISTENERS, apply the given regular expression (regex) to the principal to extract only the first CN for machine name (internal and external in the example) from the dname stored in the certificate.

############################# TLS/SSL LISTENERS #############################
listener.name.internal.ssl.principal.mapping.rules= \
        RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
        DEFAULT

listener.name.external.ssl.principal.mapping.rules= \
        RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
        DEFAULT

How regular expressions (regex) work

By default, the TLS/SSL user name is in the form CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown. This configuration allows a list of rules for mapping the X.500 distinguished name (DN) to short name. The rules are evaluated in order and the first rule that matches a DN is used to map it to a short name. Any later rules in the list are ignored.

The format of ssl.principal.mapping.rules is a list where each rule starts with “RULE:” and contains an expression using the formats below. The default rule returns string representation of the X.500 certificate DN. If the DN matches the pattern, then the replacement command is run over the name.

  • Shorthand character classes are supported using double backslahes, for example, \\d for digits, \\w for word characters, \\s for whitespace, and \\p{L} for Unicode letters.
  • To use a backslash (\) in the pattern or replacement, escape it with an additional backslash (\\). For example, to match a dot (.), use \\..
  • You can force the translated result to be all lowercase or uppercase case by adding an /L or /U to the end of the rule:
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]

Example ssl.principal.mapping.rules values are:

RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT

These rules translate the DN as follows: CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to serviceuser and CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to adminuser@admin.

Enable Token-based Authentication

Configure another listener on Kafka which will accept tokens for impersonation with MDS. In this example, we call the listener “token”, but you can name it anything.

This is for MDS to authenticate to Kafka on behalf of a client.

  • Kafka talks to MDS and gets a token from MDS.
  • Kafka passes the token to the “token” listener.
  • Listener looks at the token, and verifies that it as an authenticated client.
############################# TOKEN LISTENER #############################
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.token.oauthbearer.sasl.jaas.config= \
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        publicKeyPath="<path-to-mds-public-key.pem>";
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler

Configure the Authorizer

The authorizer is running in Kafka.

It implements the authorization mechanism; that is, decides whether a given principal, such as “Bob”, can access something or not.

############################# Authorization Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
super.users=User:kafka

Authorizer Class

The main property here is authorizer.class.name which defines the authorizer you want to use:

Rule Providers

confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT

Tip

As of release Confluent Platform. 5.4, ZK_ACL replaces ACL and CONFLUENT replaces RBAC for valid values for the rule providers.

This setting specifies ACL rule providers. You can enable one or both:

Super Users

Define which users are super users.

super.users=User:kafka

The super.users configs define which users have global permissions. If a user has super user permissions, MDS does not need to check any further, the user can do anything. You must include in super.users at least one principal used for interbroker communication. Also, you can optionally define separate principals per broker node.

In this example, a single super user kafka is used for all interbroker communication. Whenever a user called “kafka” authenticates to a broker, the authorizer checks to see if “kafka” has permissions to do some operation. But before doing anything else, it checks to see if “kafka” is in the list of super users. This is the user in the TLS/SSL certificate that Kafka is using to talk to other brokers over the internal TLS/SSL port.

The user names here were generated from the previously-explained certs-create.sh certificate script, under demo scripts in Kafka TLS/SSL Settings. To match this example, when you generate the certificate for Kafka, you would define kafka for CN in the dname: -dname "CN=kafka,OU=QE,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US"

Super users are also discussed in the documentation on ACLs.

MDS Configs

The following sections describe the configuration for the Metadata Service (MDS). Some things are shared, like super users, and run in the same Java process, but generally the above configs are for Kafka, everything in this section relates to Confluent MDS, even though you configure it in the same properties file.

MDS allows you to create role bindings on resources and principals, and give you tokens that you can later use to authenticate to the OAUTHBEARER listener.

Specify HTTPS Listeners for MDS

Specify HTTP protocol with TLS/SSL encryption for MDS as follows:

#############################  MDS Listener - which port to listen on #############################
confluent.metadata.server.listeners=https://0.0.0.0:8090,http://0.0.0.0:8091
confluent.metadata.server.advertised.listeners=https://<localhost>:8090,\
                                               http://<localhost>:8091
  • confluent.metadata.server.listeners: MDS will listen on ports 8090 and 8091 per this example.
  • confluent.metadata.server.advertised.listeners: Advertised ports are for clients interacting with MDS over the internet. When a client asks “how can I talk to you?” MDS provides host addresses (advertised) and ports.

TLS/SSL Settings for MDS

You must provide these keys and encryption on the machine where MDS is running because you specified HTTPS as one of the MDS listeners. To make the SSL encryption work, specify the appropriate keystore and truststore.

The following configures TLS/SSL for how clients talk to MDS over HTTPS.

##### TLS/SSL Settings for MDS #####
confluent.metadata.server.ssl.keystore.location=/var/ssl/private/client.truststore.jks
confluent.metadata.server.ssl.keystore.password=<keystore-password>
confluent.metadata.server.ssl.key.password=<key-password>
confluent.metadata.server.ssl.truststore.location=/var/ssl/private/kafka.keystore.jks
confluent.metadata.server.ssl.truststore.password=<truststore-password>

This is configurable on many layers, in that you could generate multiple certificates for various components. For example, using the prefixes, you could have separate certificates for MDS and Kafka, separate certificates for multiple internal and external listeners, and so forth. You could generate another certificate for MDS and name it with “mds” in the names: confluent.metadata.server.ssl.truststore.location=/var/ssl/private/mds.keystore.jks (replacing “kafka” with “mds” to distinguish it as such). In this case, the user would be called “mds” not “kafka” in the certificate, and you would likely add mds to the list of super users.

Enable and Configure MDS Token Generation

MDS can give a token in exchange for a user name and password, and also can accept a token from a user or client and authenticate that principal by the token (which can be used after the first time to continue to authenticate).

The tokens are used to authenticate to the Kafka-configured OAUTHBEARER listener. This example shows how to enable and configure the MDS token service.

############################# MDS Token Service Settings - enable token generation #############################
confluent.metadata.server.token.max.lifetime.ms=3600000
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
confluent.metadata.server.token.signature.algorithm=RS256
confluent.metadata.server.authentication.method=BEARER
  • Set time for how long the token stays valid (1 hour max, in milliseconds): confluent.metadata.server.token.max.lifetime.ms=3600000)
  • Specify public, private keys for the token (which are both saved on the machine where MDS is running).
    • The key to encrypt the token (when you issue you a token): confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>:
  • Define the encryption algorithm: confluent.metadata.server.token.signature.algorithm=RS256
  • After Confluent Platform 5.3, you must specify that you support token authentication on the MDS side: confluent.metadata.server.authentication.method=BEARER
MDS Token and OAUTHBEARER Walkthrough
  • MDS encrypts the token using the private key, and a client, like Confluent Control Center for example, takes that token and passes it on to the OAUTHBEARER listener, who decrypts it with the public key.
  • You can use a script to generate keys for the Swagger interface to test authentication (both login via HTTPs user name and password, and token exchange).
  • When a user logs into Control Center, using username and password credentials (authentication), Control Center gets a token by user name, and then uses that token in communication with Kafka to show only the information that user is authorized to see.

On the MDS side,

  1. User/client makes a call to Control Center, user: fry, password: future
  2. Control Center calls MDS, authenticates, user: fry, password: future
  3. MDS returns an encrypted token for user: fry, password: future
  4. Control Center calls Kafka with a token for user: fry, password: future, and a request for a list of topics

On the Kafka side:

  • Authentication: decrypt a token, and extract the principal (if Kafka can decrypt a token, it trusts the token, because it trusts that it was encrypted by MDS, and while doing the encryption, MDS actually verified the credentials.)
  • Authorization: as usual, checks if the principal has permissions

Configuring LDAP Settings

LDAP is a protocol that defines the API and contract of a directory service.

You can store records in a directory based on a hierarchy and each record has many attributes, and structure of the records, how to create groups (distinguished name, common name)

LDAP configurations will vary a great deal, depending on many factors including:

  • your installation
  • the distribution or implementation of LDAP you are using (for example, Microsoft Active Directory, Apache Directory Service, Open LDAP)
  • which attributes are defined per user
  • LDAP configurations, such as the hierarchy you use, how you store users and groups, and so on

A basic set of LDAP configurations are shown below.

############################# Identity Provider Settings(LDAP - local OpenLDAP) #############################
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.com.sun.jndi.ldap.read.timeout=3000
ldap.java.naming.provider.url=ldap://<ldap-host>
# how mds authenticates to ldap server
ldap.java.naming.security.principal=CN=mds,CN=Demo,DC=confluent,DC=io
ldap.java.naming.security.credentials=<password>
ldap.java.naming.security.authentication=simple
# ldap search mode (GROUPS is default)
#ldap.search.mode=GROUPS
#ldap.search.mode=USERS
# how to search for users
ldap.user.search.base=CN=Demo,DC=confluent,DC=io
# how to search for groups
ldap.group.search.base=CN=Demo,DC=confluent,DC=io
# which attribute in ldap record corresponds to user name
ldap.user.name.attribute=sAMAccountName
ldap.user.memberof.attribute.pattern=CN=(.*),CN=Demo,DC=confluent,DC=io
ldap.group.object.class=group
ldap.group.name.attribute=sAMAccountName
ldap.group.member.attribute.pattern=CN=(.*),CN=Demo,DC=confluent,DC=io

Tip

In the example above, sAMAccountName is specific to Microsoft Active Directory. Modify the above configuration with settings specific to your LDAP.

To test this, you can run one of the following demos:

Or use this Open LDAP Docker image , which is the same LDAP used for the RBAC Docker demo.

For more on configuring LDAP, see Configure Confluent Server Authorizer in Confluent Platform and Configure LDAP Group-Based Authorization in Confluent Platform.