Configure mTLS Authentication and RBAC for Kafka Brokers

This example shows how to configure Kafka brokers with mutual TLS (mTLS) authentication and role-based access control (RBAC) through the Confluent Metadata Service (MDS). mTLS provides two-way authentication to ensure that traffic between clients and the MDS is secure, and that you can trust content coming from both directions. In this example:

  • Clients communicating with Kafka can use mTLS (encryption plus authentication).
  • Clients communicating with MDS use TLS (encryption) and basic authentication.

You can use this walk-through as a template for configuring other combinations of authentication and RBAC. The general pattern is:

  1. Configure properties for authentication, or if this is already configured, skip to the steps to configure the authorizer.
  2. Decide which authorizer to use (in this case, ConfluentServerAuthorizer, which provides RBAC and support for centralized ACLs). Authorization is defined in authorizer.class.name.
  3. Enable MDS.
  4. Add a directory service (such as LDAP).

Terminology

Following is a quick review of terms. To further help put these technologies in context, see the blog post, Apache Kafka Security 101.

Authentication

Authentication is a security mechanism that asks the question “who are you?” You can use different security protocols to authenticate (identify) the principal who is trying to get access to platform resources.

  • PLAINTEXT, in the context of Kafka brokers, means no authentication and no encryption. Data is sent in “plain” or “clear” text.

  • Secure Sockets Layer (SSL), and its newer incarnation Transport Layer Security (TLS), is a protocol for securing encrypted communication between entities. Kafka (like Java) still uses the term SSL in configuration and code. SSL can be configured for encryption only, or encryption and two-way authentication (mTLS). This involves generating certificates and using a certificate authority. mTLS is authentication with SSL. To learn more about SSL, see Encryption with SSL and Encryption and Authentication with SSL.

    Protocol   Security Provided             Implementation
    SSL        encryption                    TLS
    SSL        encryption + authentication   mTLS

  • SASL is a general protocol for authentication which can support multiple mechanisms and implementations such as Kerberos, PLAIN, and SCRAM on the same port (for example, 9092). To learn more about SASL, see Authentication with SASL.

    Protocol                                  Security Provided
    SASL (PLAIN)                              authentication, no encryption
    SASL (authentication), SSL (encryption)   encryption + authentication

    Tip

    • You must use different ports for SASL and mTLS; these cannot run on the same port.
    • The SASL mechanism “PLAIN” is not the same as the “PLAINTEXT” security protocol, which provides neither authentication nor encryption. For more information, see Configuring PLAIN.

Authorization

Authorization is a security mechanism that asks the question “Do you have permissions for xyz (action, resource)?” RBAC determines client privileges and enables granular access controls based on granted roles. RBAC can be used in combination with ACLs.

Overview and Prerequisites

These examples and Walkthrough assume that you have:

As a part of the configuration, and to follow along with the tasks covered here, you will:

Configuration in a Nutshell

The example below shows the configuration for authentication with mTLS and authorization with RBAC over MDS in a single etc/kafka/server.properties file; that is, the configuration for a single broker.

Tip

You need a Kafka configuration file for each broker. For example, to create three brokers, create three configuration files.

Authentication with mTLS uses three ports:

  • One port with mTLS for inter-broker communication
  • One port with mTLS for external clients
  • One port for token services (enables impersonation by MDS)

Port 8090 is used for MDS HTTPS connections.

Tip

Technically, only a single port is needed to run a broker.

  • Kafka requires at least one open port for client connections and inter-broker communication. This port must have mTLS.
  • A second port with mTLS is shown in the example as an operational convenience, allowing for better traffic control, as it can be configured to listen on different hostnames (for example, one internal, another external).
  • A third port for token services is not strictly mandatory either, but since it enables impersonation by MDS, this is a requirement for full RBAC functionality.

Line by line descriptions are shown at the end of the example.

Note

Most configuration attributes show example values in <>, which can be helpful in terms of understanding the type of value expected. Users are expected to replace the example with values matching their own setup. Values displayed without <> can be used as recommended values.

 ############################# Broker Settings ##################################
 zookeeper.connect=<host-1>:2181,<host-2>:2181,<host-3>:2181
 log.dirs=/var/lib/kafka/data
 broker.id=1

 ############################# Group Coordinator Settings #######################
 group.initial.rebalance.delay.ms=0

 ############################# Log Retention Policy, Log Basics ##################
 log.retention.check.interval.ms=300000
 log.retention.hours=168
 log.segment.bytes=1073741824
 num.io.threads=16
 num.network.threads=8
 num.partitions=1
 num.recovery.threads.per.data.dir=2

 ########################### Socket Server Settings #############################
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600
 socket.send.buffer.bytes=102400

 ############################# Internal Topic Settings  #########################
 offsets.topic.replication.factor=3
 transaction.state.log.min.isr=2
 transaction.state.log.replication.factor=3

 ######################## Metrics Reporting ########################################
 metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
 confluent.metrics.reporter.bootstrap.servers=<address>-west-2.compute.internal:9092
 confluent.metrics.reporter.topic.replicas=3
 confluent.support.metrics.enable=true
 confluent.support.customer.id=anonymous

 ######################## LISTENERS ######################################
 listeners=INTERNAL://:9092,EXTERNAL://:9093,TOKEN://:9094
 advertised.listeners=INTERNAL://<localhost>:9092,\
                      EXTERNAL://<external-hostname>:9093,\
                      TOKEN://<external-hostname>:9094
 listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_SSL

 inter.broker.listener.name=INTERNAL

 ############################ SSL SETTINGS #####################################
 ssl.truststore.location=/var/ssl/private/client.truststore.jks
 ssl.truststore.password=<truststore-password>
 ssl.keystore.location=/var/ssl/private/kafka.keystore.jks
 ssl.keystore.password=<keystore-password>
 ssl.key.password=<key-password>
 ssl.client.auth=required
 ssl.endpoint.identification.algorithm=HTTPS

 ##############  SSL settings for metrics reporting ##############
 confluent.metrics.reporter.security.protocol=SSL
 confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/client.truststore.jks
 confluent.metrics.reporter.ssl.truststore.password=<truststore-password>
 confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/kafka.keystore.jks
 confluent.metrics.reporter.ssl.keystore.password=<keystore-password>
 confluent.metrics.reporter.ssl.key.password=<key-password>

 ############################# SSL LISTENERS #############################
 listener.name.internal.ssl.principal.mapping.rules= \
         RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
         DEFAULT

 listener.name.external.ssl.principal.mapping.rules= \
         RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
         DEFAULT

 ############################# TOKEN LISTENER #############################
 listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
 listener.name.token.oauthbearer.sasl.jaas.config= \
     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
         publicKeyPath="<path-to-mds-public-key.pem>";
 listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
 listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler

 ############################# Authorization Settings #############################
 authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
 confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
 super.users=User:kafka

 #############################  MDS Listener - which port to listen on #############################
 confluent.metadata.server.listeners=https://0.0.0.0:8090,http://0.0.0.0:8091
 confluent.metadata.server.advertised.listeners=https://<localhost>:8090,\
                                                http://<localhost>:8091

 ############################# SSL Settings for MDS #############################
 confluent.metadata.server.ssl.keystore.location=<path-to-kafka.keystore.jks>
 confluent.metadata.server.ssl.keystore.password=<keystore-password>
 confluent.metadata.server.ssl.key.password=<key-password>
 confluent.metadata.server.ssl.truststore.location=<path-to-client.truststore.jks>
 confluent.metadata.server.ssl.truststore.password=<truststore-password>

 ############################# MDS Token Service Settings - enable token generation #############################
 confluent.metadata.server.token.max.lifetime.ms=3600000
 confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
 confluent.metadata.server.token.signature.algorithm=RS256
 confluent.metadata.server.authentication.method=BEARER

 ############################# Identity Provider Settings(LDAP - local OpenLDAP) #############################
 ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
 ldap.com.sun.jndi.ldap.read.timeout=3000
 ldap.java.naming.provider.url=ldap://<ldap-server-address>
 # how mds authenticates to ldap server
 ldap.java.naming.security.principal=<CN=mds,CN=Demo,DC=confluent,DC=io>
 ldap.java.naming.security.credentials=<password>
 ldap.java.naming.security.authentication=simple
 # ldap search mode (GROUPS is default)
 #ldap.search.mode=GROUPS
 #ldap.search.mode=USERS
 # how to search for users
 ldap.user.search.base=<CN=Demo,DC=confluent,DC=io>
 # how to search for groups
 ldap.group.search.base=<CN=Demo,DC=confluent,DC=io>
 # which attribute in ldap record corresponds to user name
 ldap.user.name.attribute=sAMAccountName
 ldap.user.memberof.attribute.pattern=<CN=(.*),CN=Demo,DC=confluent,DC=io>
 ldap.group.object.class=group
 ldap.group.name.attribute=sAMAccountName
 ldap.group.member.attribute.pattern=<CN=(.*),CN=Demo,DC=confluent,DC=io>

 ########################### Enable Swagger #############################
 confluent.metadata.server.openapi.enable=true
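
Because every value shown in <> must be replaced before the file is used, a quick scan for leftover placeholders can help. The following is a minimal sketch and not part of Confluent Platform; the helper name find_placeholders and the sample file are hypothetical.

```shell
# Hypothetical helper: print numbered, non-comment lines that still contain a <placeholder>.
find_placeholders() {
  grep -nE '<[^>]+>' "$1" | grep -vE '^[0-9]+:[[:space:]]*#'
}

# Self-contained demo on a throwaway sample file (values are illustrative only).
sample="$(mktemp)"
cat > "$sample" <<'EOF'
# comment with <angle> brackets is ignored
broker.id=1
ssl.truststore.password=<truststore-password>
ssl.client.auth=required
ssl.key.password=<key-password>
EOF

leftovers="$(find_placeholders "$sample")"
echo "$leftovers"
rm -f "$sample"
```

To check a real file, you would pass the path of your own server.properties to find_placeholders instead of the sample.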

Line Number Callouts for Config Sections

Configuring Brokers, Kafka Listeners, Authentication, and Authorization mechanism

  • Lines 36-42: Under LISTENERS, define three Kafka listeners and configure HTTPS for the broker, keyed to the security protocols the listeners will use. See Configure Listeners.
  • Lines 45-51: Under SSL SETTINGS, define top-level SSL settings for Kafka that can be shared and apply to all listeners. To define more granular SSL settings per entity, you would use a different set of prefixed configs and generate different certificates for each one. The first four lines define encryption for which you need to generate SSL certificates, and the last three lines specify authentication. See Kafka SSL Settings.
  • Lines 54-59: Also under SSL SETTINGS, in sub-section SSL settings for metrics reporting, is an example of setting up SSL for monitoring metrics. See Kafka SSL Settings.
  • Lines 62-68: Under SSL LISTENERS, apply a regular expression (regex) to the distinguished name (dname) for the principal in the certificate to extract only the first CN (common name) for machine name. See Principal Mapping Rules for SSL Listeners (Extract a Principal from a Certificate).
  • Lines 71-76: Under TOKEN LISTENER, configure another listener on Kafka which will accept tokens for impersonation by MDS using OAUTHBEARER. See Configure a Listener to Enable Impersonation by MDS on Behalf of a Client (OAUTHBEARER).
  • Lines 79-81: Under Authorization Settings, define the authorization mechanism as ConfluentServerAuthorizer, which includes support for RBAC and centralized ACLs. See Configure the Authorizer.

Configuring MDS

  • Lines 84-86: Under MDS Listener, define an MDS listener that uses HTTP protocol with SSL encryption (HTTPS) to talk to clients. See Specify HTTPS Listeners for MDS.
  • Lines 89-93: Under SSL Settings for MDS, define encryption and authentication to map to the MDS HTTPS listener. See SSL Settings for MDS.
  • Lines 96-99: MDS can provide tokens in exchange for client credentials as well as accept tokens from clients to authenticate. Under MDS Token Service Settings, enable and configure the token service, set the token lifetime, specify the path to the public/private key pair on the machine where MDS is running, and set the signature algorithm. See Enable and Configure MDS Token Generation.
  • Lines 102-121: Under Identity Provider Settings, configure LDAP so that RBAC can use it. See Configuring LDAP Settings.

Optional Testing and Troubleshooting

  • Line 124: Under Enable Swagger, set confluent.metadata.server.openapi.enable to true if you want to use the Swagger UI to test calls to Kafka. This is not specific to this mTLS configuration with RBAC example.

    Note

    Do not enable the Swagger UI in production. It is limited to HTTP, so will not work with HTTPS or custom certificates. That said, you might find it useful for initial setup and testing.

How to run it

  1. Start ZooKeeper.

    sudo <path-to-confluent>/bin/zookeeper-server-start <path-to-confluent>/etc/kafka/zookeeper.properties
    
  2. Start Kafka with the etc/kafka/server.properties you just configured.

    <path-to-confluent>/bin/kafka-server-start <path-to-confluent>/etc/kafka/server.properties
    

To learn more, see Start Confluent Platform and how to install and run Confluent Platform.

You can configure clients like Schema Registry, Control Center, ksqlDB, and Connect to talk to Kafka and MDS over HTTPS in their respective properties files.


Deep Dive on mTLS and RBAC configs

This Deep Dive goes into more detail about the same configurations summarized in the nutshell section above.

Kafka Server Properties File

The primary focus of this walkthrough is the set of configs defined in ./etc/kafka/<server.properties>. After you have all configurations set up, you would run the following command to start Kafka using your properties file:

./bin/kafka-server-start ./etc/kafka/<server.properties>

Configure Listeners

Define which ports Kafka should listen on, using the format <NAME>://<HOST>:<PORT>. The host can be left empty, as in the example below, to bind to the default interface.

By default we provide one Kafka listener on 9092.

You need at least this, and you need an advertised listener for clients over the internet.

######################## LISTENERS ######################################
listeners=INTERNAL://:9092,EXTERNAL://:9093,TOKEN://:9094
advertised.listeners=INTERNAL://<localhost>:9092,\
                     EXTERNAL://<external-hostname>:9093,\
                     TOKEN://<external-hostname>:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_SSL

inter.broker.listener.name=INTERNAL

Tip

  • By default, Kafka uses 9092.
  • You can use any free port on your system, and give custom names as desired.
  • Listener names previously had to match the security protocol, but this is no longer a requirement.

listeners - Local ports available for other cloud machines on the same network. The example defines three ports with descriptive names.

advertised.listeners - Keyed by name to listeners, advertised ports are for clients interacting with these servers over the internet. When a client asks “how can I talk to you?”, the broker provides the advertised host address and port.

listener.security.protocol.map - Maps listener name to security protocol

  • INTERNAL:SSL - SSL authentication
  • EXTERNAL:SSL - SSL authentication
  • TOKEN:SASL_SSL - SASL authentication with SSL encryption

When Kafka starts, it grabs the specified ports to listen on. Kafka clients, such as producers and consumers, can reach Kafka on those ports.
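
To make the relationship between listeners and listener.security.protocol.map concrete, the following minimal sketch walks the listeners value from the example and looks up the protocol mapped to each name. It is only an illustration written with standard shell tools; the helper name protocol_for is hypothetical and nothing here is run by Kafka itself.

```shell
# Values copied from the example above.
listeners='INTERNAL://:9092,EXTERNAL://:9093,TOKEN://:9094'
protocol_map='INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_SSL'

# Hypothetical helper: print the security protocol mapped to a listener name.
protocol_for() {
  printf '%s\n' "$protocol_map" | tr ',' '\n' | awk -F: -v name="$1" '$1 == name { print $2 }'
}

# Report name, port, and protocol for each entry in the listeners list.
summary="$(
  printf '%s\n' "$listeners" | tr ',' '\n' | while IFS= read -r entry; do
    name="${entry%%://*}"   # text before "://"
    port="${entry##*:}"     # text after the last ":"
    printf '%s %s %s\n' "$name" "$port" "$(protocol_for "$name")"
  done
)"
echo "$summary"
```

A listener name that prints with an empty third column would indicate a name that is missing from the protocol map.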

Kafka SSL Settings

Tip

To learn more about configuring SSL, see Encryption with SSL and Encryption and Authentication with SSL.

To configure SSL:

  1. Create certificates.
  2. Configure SSL in the properties file.

The SSL settings shown below are top-level configurations for Kafka that define encryption and authentication, and apply to all listeners because they are not prefixed to point to individual listeners.

############################ SSL SETTINGS #####################################
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.client.auth=required
ssl.endpoint.identification.algorithm=HTTPS

You could configure SSL separately by generating individual certificates for each listener; but that is not done for this example.

These SSL settings can also be shared by MDS if you do not configure MDS specific prefixed SSL settings.

If you wanted to define a setting specific to a listener, you would use the following format for the prefix: listener.name.<name>.<SETTING>=<value>. For example, to specify values for INTERNAL:

listener.name.internal.ssl.truststore.location=/var/ssl/private/client.truststore.jks
listener.name.internal.ssl.truststore.password=<truststore-password>
listener.name.internal.ssl.keystore.location=/var/ssl/private/client.keystore.jks
listener.name.internal.ssl.keystore.password=<keystore-password>
listener.name.internal.ssl.key.password=<key-password>
listener.name.internal.ssl.client.auth=required
listener.name.internal.ssl.endpoint.identification.algorithm=HTTPS
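
The prefix pattern can also be applied mechanically. The sketch below, which assumes nothing beyond standard shell tools, rewrites top-level ssl.* keys into listener-scoped keys; the helper name prefix_for_listener is hypothetical, and the listener name is lowercased in the prefix to match the INTERNAL example above.

```shell
# Hypothetical helper: rewrite top-level ssl.* keys (read from stdin) as listener-scoped keys.
prefix_for_listener() {
  # The prefix uses the lowercased listener name, as in the INTERNAL example above.
  lname="$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]')"
  sed -E "s/^ssl\./listener.name.${lname}.ssl./"
}

scoped="$(prefix_for_listener EXTERNAL <<'EOF'
ssl.client.auth=required
ssl.endpoint.identification.algorithm=HTTPS
EOF
)"
echo "$scoped"
```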

Also included in this section are SSL settings for metrics reporting. These are not specific to this mTLS configuration with RBAC example, but included here to show that you can configure unique SSL settings specific to different services by adding prefixes, such as confluent.metrics.reporter, to ssl.truststore.xxx. To learn more about metrics reporting and SSL, see Confluent Metrics Reporter and Configuring SSL.

confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=<truststore-password>
confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=<keystore-password>
confluent.metrics.reporter.ssl.key.password=<key-password>

Encryption

These settings (a subset of those shown above) define encryption, for which you need to generate the SSL certificates.

ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=<keystore-password>

Demo Script for Generating Certificates

A demo script for generating certificates is available here. To adapt the script, change the names of the machine(s) or Macbook(s) to match your deployment.

The script generates a “truststore” that defines which certificates you can trust and a “keystore” that defines who you are. Whenever two entities are talking to each other, they use these certificates to verify identities.

Generate the client.truststore.jks and client.keystore.jks, put these files in some location on your machine(s) (local/on-premises or on a cloud platform), and specify the locations in the encryption section.

Another demo script for generating certificates and keys is also provided in Creating SSL Keys and Certificates.

Authentication

These settings define the authentication:

ssl.key.password=<key-password>
ssl.client.auth=required
ssl.endpoint.identification.algorithm=HTTPS

Principal Mapping Rules for SSL Listeners (Extract a Principal from a Certificate)

The SSL certificate defines a value for the LDAP distinguished name (dname) that is the SSL user name, in the form CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown.

When you generate certificates, you decide what to put for the value of CN (common name) for the dname.

It is difficult to create ACLs or role bindings from this complex name, so you can use a supported rule to extract only a part of the distinguished name in the certificate to create the ACL or role binding.

In the Kafka server properties file, under SSL LISTENERS, apply the given regular expression (regex) to the principal to extract only the first CN for machine name (internal and external in the example) from the dname stored in the certificate.

############################# SSL LISTENERS #############################
listener.name.internal.ssl.principal.mapping.rules= \
        RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
        DEFAULT

listener.name.external.ssl.principal.mapping.rules= \
        RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
        DEFAULT

How the regex works

By default, the SSL user name is in the form CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown. This configuration allows a list of rules for mapping the X.500 distinguished name (DN) to short name. The rules are evaluated in order and the first rule that matches a DN is used to map it to a short name. Any later rules in the list are ignored.

The format of ssl.principal.mapping.rules is a list where each rule starts with “RULE:” and contains an expression using the formats below. The DEFAULT rule returns the string representation of the X.500 certificate DN. If the DN matches the pattern, the replacement is applied to the name. Rules also support lowercase and uppercase options, which force the translated result to be all lowercase or all uppercase. This is done by adding /L or /U to the end of the rule:

RULE:pattern/replacement/
RULE:pattern/replacement/[LU]

Example ssl.principal.mapping.rules values are:

RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT

These rules translate the DN as follows: CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to serviceuser and CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to adminuser@admin.
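
To see what the rule configured for the internal and external listeners in this example does, you can approximate it with sed. This is only an illustration: Kafka evaluates the rules with Java regular expressions, and sed -E does not support all Java syntax (for example, the non-greedy .*? used in some of the rules above). The helper name map_principal is hypothetical.

```shell
# Hypothetical stand-in for RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L followed by DEFAULT.
map_principal() {
  dn="$1"
  if printf '%s\n' "$dn" | grep -qE '^CN=([a-zA-Z0-9.]*).*$'; then
    # Rule matched: keep the first capture group, then lowercase it (the /L option).
    printf '%s\n' "$dn" | sed -E 's/^CN=([a-zA-Z0-9.]*).*$/\1/' | tr '[:upper:]' '[:lower:]'
  else
    # No rule matched: DEFAULT leaves the distinguished name unchanged.
    printf '%s\n' "$dn"
  fi
}

map_principal 'CN=kafka,OU=QE,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US'
map_principal 'CN=WriteUser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'
map_principal 'OU=QE,CN=kafka'
```

The first two calls print the short names kafka and writeuser. The third DN does not start with CN=, so it falls through to DEFAULT and is printed unchanged.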

Configure a Listener to Enable Impersonation by MDS on Behalf of a Client (OAUTHBEARER)

Configure another listener on Kafka which will accept tokens for impersonation by MDS. In this example, we call the listener “token”, but you can name it anything.

This is for MDS to authenticate to Kafka on behalf of a client.

  • Kafka talks to MDS and gets a token from MDS.
  • Kafka passes the token to the “token” listener.
  • The listener looks at the token and verifies that it represents an authenticated client.

############################# TOKEN LISTENER #############################
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.token.oauthbearer.sasl.jaas.config= \
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        publicKeyPath="<path-to-mds-public-key.pem>";
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler

Configure the Authorizer

The authorizer is running in Kafka.

It implements the authorization mechanism; that is, decides whether a given principal, such as “Bob”, can access something or not.

############################# Authorization Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
super.users=User:kafka

Authorizer Class

The main property here is authorizer.class.name, which defines the authorizer you want to use:

authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

Rule Providers

confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT

Tip

As of Confluent Platform 5.4, ZK_ACL replaces ACL and CONFLUENT replaces RBAC as valid values for the rule providers.

This setting specifies the access rule providers. You can enable one or both:

  • CONFLUENT - Enables role bindings and centralized ACLs. Centralized ACLs are also managed through MDS and stored in Kafka, but they are not part of RBAC.
  • ZK_ACL - Pre RBAC ACLs described in Authorization using ACLs.

Super Users

Define which users are super users.

super.users=User:kafka

The super.users configs define which users have global permissions. If a user has super user permissions, MDS does not need to check any further; the user can do anything. You must include in super.users at least one principal used for inter-broker communication. You can also optionally define separate principals per broker node.

In this example, a single super user kafka is used for all inter-broker communication. Whenever a user called “kafka” authenticates to a broker, the authorizer checks to see if “kafka” has permissions to do some operation. But before doing anything else, it checks to see if “kafka” is in the list of super users. This is the user in the SSL certificate that Kafka is using to talk to other brokers over the internal SSL port.

The user names here were generated from the previously-explained certs-create.sh certificate script, under demo scripts in Kafka SSL Settings. To match this example, when you generate the certificate for Kafka, you would define kafka for CN in the dname: -dname "CN=kafka,OU=QE,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US"

Super Users are also discussed in the documentation on ACLs.
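
If you define separate principals per broker node, the super.users value becomes a semicolon-separated list. The sketch below shows that format and a simple membership check of the kind described above. The principal names kafka1, kafka2, and kafka3 and the helper is_super_user are hypothetical; the real check is performed by the authorizer, not by a script.

```shell
# Illustrative value: one principal per broker node, separated by semicolons.
super_users='User:kafka1;User:kafka2;User:kafka3'

# Hypothetical helper: succeed only if the exact principal appears in the list.
is_super_user() {
  printf '%s\n' "$super_users" | tr ';' '\n' | grep -qxF "$1"
}

for principal in 'User:kafka2' 'User:fry'; do
  if is_super_user "$principal"; then
    echo "$principal is a super user"
  else
    echo "$principal is not a super user"
  fi
done
```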

MDS Configs

The following sections describe the configuration for the Metadata Service (MDS). Some things, like super users, are shared, and MDS runs in the same Java process as the broker. In general, though, the configs above are for Kafka, and everything in this section relates to Confluent MDS, even though you configure it in the same properties file.

MDS allows you to create role bindings on resources and principals, and gives you tokens that you can later use to authenticate to the OAUTHBEARER listener.

Specify HTTPS Listeners for MDS

Specify HTTP protocol with SSL encryption for MDS as follows:

#############################  MDS Listener - which port to listen on #############################
confluent.metadata.server.listeners=https://0.0.0.0:8090,http://0.0.0.0:8091
confluent.metadata.server.advertised.listeners=https://<localhost>:8090,\
                                               http://<localhost>:8091

  • confluent.metadata.server.listeners: MDS will listen on ports 8090 and 8091 per this example.
  • confluent.metadata.server.advertised.listeners: Advertised ports are for clients interacting with MDS over the internet. When a client asks “how can I talk to you?” MDS provides host addresses (advertised) and ports.

SSL Settings for MDS

You must provide these keys and encryption on the machine where MDS is running because you specified HTTPS as one of the MDS listeners. To make the SSL encryption work, specify the appropriate keystore and truststore.

The following configures SSL for how clients talk to MDS over HTTPS.

##### SSL Settings for MDS #####
confluent.metadata.server.ssl.keystore.location=/var/ssl/private/kafka.keystore.jks
confluent.metadata.server.ssl.keystore.password=<keystore-password>
confluent.metadata.server.ssl.key.password=<key-password>
confluent.metadata.server.ssl.truststore.location=/var/ssl/private/client.truststore.jks
confluent.metadata.server.ssl.truststore.password=<truststore-password>

This is configurable on many layers, in that you could generate multiple certificates for various components. For example, using the prefixes, you could have separate certificates for MDS and Kafka, separate certificates for multiple internal and external listeners, and so forth. You could generate another certificate for MDS and name it with “mds” in the names: confluent.metadata.server.ssl.keystore.location=/var/ssl/private/mds.keystore.jks (replacing “kafka” with “mds” to distinguish it as such). In this case, the user would be called “mds” not “kafka” in the certificate, and you would likely add mds to the list of super users.

Enable and Configure MDS Token Generation

MDS can give a token in exchange for a user name and password, and also can accept a token from a user or client and authenticate that principal by the token (which can be used after the first time to continue to authenticate).

The tokens are used to authenticate to the Kafka-configured OAUTHBEARER listener. This example shows how to enable and configure the MDS token service.

############################# MDS Token Service Settings - enable token generation #############################
confluent.metadata.server.token.max.lifetime.ms=3600000
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
confluent.metadata.server.token.signature.algorithm=RS256
confluent.metadata.server.authentication.method=BEARER

  • Set how long the token stays valid (1 hour max, in milliseconds): confluent.metadata.server.token.max.lifetime.ms=3600000
  • Specify the public and private keys for the token (both are saved on the machine where MDS is running).
    • The key pair used to sign the token when MDS issues it: confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
  • Define the signature algorithm: confluent.metadata.server.token.signature.algorithm=RS256
  • After Confluent Platform 5.3, you must specify that you support token authentication on the MDS side: confluent.metadata.server.authentication.method=BEARER

MDS Token and OAUTHBEARER Walkthrough

  • MDS signs the token using the private key, and a client, such as Confluent Control Center, takes that token and passes it on to the OAUTHBEARER listener, which verifies it with the public key.
  • You can use a script to generate keys for the Swagger interface to test authentication (both login via HTTPS user name and password, and token exchange).
  • When a user logs into Control Center using username and password credentials (authentication), Control Center gets a token for that user name, and then uses that token in communication with Kafka to show only the information that user is authorized to see.

On the MDS side:

  1. Make a call to Control Center, user: fry, password: future
  2. Control Center calls MDS, authenticates, user: fry, password: future
  3. MDS returns a signed token for user: fry, password: future
  4. Control Center calls Kafka with a token for user: fry, password: future, and a request for a list of topics

On the Kafka side:

  • Authentication: verify the token's signature and extract the principal. If Kafka can verify the token with the MDS public key, it trusts the token, because it trusts that the token was signed by MDS, and MDS verified the credentials before issuing it.
  • Authorization: as usual, checks if the principal has permissions
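
The exchange above relies on an asymmetric key pair: MDS holds the private key, and the token listener needs only the public key. The following minimal sketch uses openssl to illustrate that relationship with RSA and SHA-256, the primitives named by confluent.metadata.server.token.signature.algorithm=RS256. The file names and the payload are hypothetical, and the payload is a stand-in, not a real MDS token.

```shell
# Work in a throwaway directory; all file names here are illustrative.
workdir="$(mktemp -d)"

# A key pair of the kind referenced by confluent.metadata.server.token.key.path.
openssl genrsa -out "$workdir/tokenKeypair.pem" 2048 2>/dev/null
# The public half, of the kind referenced by publicKeyPath on the token listener.
openssl rsa -in "$workdir/tokenKeypair.pem" -pubout -out "$workdir/public.pem" 2>/dev/null

# Stand-in payload; a real MDS token has its own format.
printf '%s' 'principal=User:fry' > "$workdir/payload.txt"

# Sign with the private key, then verify using only the public key.
openssl dgst -sha256 -sign "$workdir/tokenKeypair.pem" \
  -out "$workdir/payload.sig" "$workdir/payload.txt"
verify_result="$(openssl dgst -sha256 -verify "$workdir/public.pem" \
  -signature "$workdir/payload.sig" "$workdir/payload.txt")"
echo "$verify_result"

rm -rf "$workdir"
```

The point of the design is that the broker never needs the private key: anything that verifies against the public key must have been produced by the holder of the private key.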

Configuring LDAP Settings

LDAP is a protocol that defines the API and contract of a directory service.

You can store records in a directory based on a hierarchy. Each record has many attributes, and the directory defines the structure of the records and how to create groups (distinguished name, common name).

LDAP configurations will vary a great deal, depending on many factors including:

  • your installation
  • the distribution or implementation of LDAP you are using (for example, Microsoft Active Directory, Apache Directory Service, OpenLDAP)
  • which attributes are defined per user
  • LDAP configurations, such as the hierarchy you use, how you store users and groups, and so on

A basic set of LDAP configurations is shown below.

############################# Identity Provider Settings(LDAP - local OpenLDAP) #############################
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.com.sun.jndi.ldap.read.timeout=3000
ldap.java.naming.provider.url=ldap://<ldap-host>
# how mds authenticates to ldap server
ldap.java.naming.security.principal=CN=mds,CN=Demo,DC=confluent,DC=io
ldap.java.naming.security.credentials=<password>
ldap.java.naming.security.authentication=simple
# ldap search mode (GROUPS is default)
#ldap.search.mode=GROUPS
#ldap.search.mode=USERS
# how to search for users
ldap.user.search.base=CN=Demo,DC=confluent,DC=io
# how to search for groups
ldap.group.search.base=CN=Demo,DC=confluent,DC=io
# which attribute in ldap record corresponds to user name
ldap.user.name.attribute=sAMAccountName
ldap.user.memberof.attribute.pattern=CN=(.*),CN=Demo,DC=confluent,DC=io
ldap.group.object.class=group
ldap.group.name.attribute=sAMAccountName
ldap.group.member.attribute.pattern=CN=(.*),CN=Demo,DC=confluent,DC=io

Tip

In the example above, sAMAccountName is specific to Microsoft Active Directory. Modify the above configuration with settings specific to your LDAP.
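
The two attribute patterns in the example are regular expressions whose capture group isolates a short name from a full distinguished name. The sketch below approximates that extraction with sed, on the assumption that the pattern is applied to the whole attribute value (the ^ and $ anchors are added here for sed). The actual matching is done with Java regular expressions, the helper name extract_name is hypothetical, and the DN values passed in are made up.

```shell
# Pattern copied from the example above, with anchors added for sed.
pattern='^CN=(.*),CN=Demo,DC=confluent,DC=io$'

# Hypothetical helper: print the capture group, or the unchanged value if the pattern does not match.
extract_name() {
  printf '%s\n' "$1" | sed -E "s/${pattern}/\1/"
}

extract_name 'CN=Developers,CN=Demo,DC=confluent,DC=io'
extract_name 'CN=fry,CN=Demo,DC=confluent,DC=io'
extract_name 'CN=fry,OU=Other,DC=example,DC=com'
```

The first two calls print Developers and fry; the last value is outside the CN=Demo subtree, so it is printed unchanged.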

To test this, you can run one of the following demos:

Or use this OpenLDAP Docker image, which is the same LDAP used for the RBAC Docker demo.

For more on configuring LDAP, see Confluent LDAP Authorizer and Configuring the LDAP Authorizer.