Configure Metadata Service (MDS) in Confluent Platform¶
The Confluent Platform Metadata Service (MDS) manages a variety of metadata about your Confluent Platform installation. Specifically, the MDS:
- Hosts the cluster registry that enables you to keep track of which clusters you have installed.
- Serves as the system of record for cross-cluster authorization data (including RBAC, and centralized ACLs), and can be used for token-based authentication.
- Provides a convenient way to manage audit log configurations across multiple clusters.
- Provides authentication services for Confluent Platform components (note that client authentication is not supported).
You can set up the MDS internally within a Kafka cluster that serves other functions, and manage permissions in the same way that a database stores permissions for users logging into the database itself. The MDS also serves as the store for role bindings and other authorization metadata for users. For the Kafka cluster hosting MDS, you must configure MDS on each Kafka broker, and you should keep these configurations synchronized across nodes.
You can also set up MDS on a dedicated Kafka cluster that services multiple worker Kafka clusters, so that security information is isolated from client data. For role-based access control (RBAC), the MDS offers a single, centralized configuration context that, once set up for a cluster, saves administrators the complex and time-consuming task of defining and assigning roles for each resource individually. The MDS can enforce the rules for RBAC, centralized audit logs, centralized ACLs, and the cluster registry on its host Kafka cluster and across multiple secondary clusters (such as Kafka, Connect, and Schema Registry). As a result, a single Kafka cluster hosting MDS can manage and secure multiple secondary Kafka, Connect, Schema Registry, and ksqlDB clusters.
The MDS listens for HTTP requests on the default port 8090. MDS maintains a local cache of authorization data that is persisted to an internal Kafka topic named _confluent-metadata-auth.
Because MDS runs on a Kafka broker, you can optionally integrate it with LDAP to provide authentication and refreshable bearer tokens for impersonation. Note that impersonation is restricted to Confluent components. For details about configuring LDAP integration with RBAC, see Configure LDAP Authentication.
This topic includes the following configuration tasks:
- Configure a primary Kafka cluster to host the MDS and role binding
- Configure a secondary Kafka cluster managed by the MDS of the primary Kafka cluster
Prerequisites¶
You must download self-managed Confluent Platform for your environment.
Active Directory (LDAP service) must be configured. The configurations in this topic are based on Microsoft Active Directory (AD). You must update these configurations to match your LDAP service.
Note
Nested LDAP groups are not supported.
Brokers running MDS must be configured with a separate listener for inter-broker communication. To access this listener, a user must be configured with ZooKeeper-based ACLs (not centralized ACLs) for authorization. If required, you can configure these users as super.users, but they cannot rely on access to resources using role-based or group-based access. The broker user must be configured as a super user or granted access using ACLs.

Tip

To avoid broker listener or inter-broker communication issues after you have migrated from ZooKeeper-based ACLs to centralized ACLs, you may want to include broker users in the list of super.users to ensure access.

Brokers accept requests on the inter-broker listener port before the metadata for RBAC authorization has been initialized. Requests on other ports are accepted only after the required metadata, including any available LDAP metadata, has been initialized. Broker initialization completes only after all relevant metadata has been obtained and cached. When starting multiple brokers in an MDS cluster that uses the default replication factor of 3 for the metadata topic, at least three brokers must be started simultaneously so that initialization can complete on the brokers. Note that there is a timeout/retry limit for this initialization, which you can set with confluent.authorizer.init.timeout.ms. For details, refer to Configure Confluent Server Authorizer in Confluent Platform.

REST Proxy services that integrate with AD/LDAP using MDS use the user login name as the user principal for authorization decisions. By default, this is also the principal used by brokers for users authenticating with SASL/GSSAPI (Kerberos). If your broker configuration overrides principal.builder.class or sasl.kerberos.principal.to.local.rules to create a different principal, the user principal used by brokers may differ from the principal used by other Confluent Platform components. In this case, configure ACLs and role bindings for your customized principal for broker resources.
Important
The user ID specified in group role bindings is case-sensitive and must match the case in the AD record. Likewise, when logging in as a super user, the login ID is case-sensitive and must match the case specified for the user ID in role bindings.
Create a PEM key pair¶
You must create a PEM (Privacy-Enhanced Mail) key pair for use by the
token service. This key pair is added to your server.properties
file in
the next section.
Important
MDS only loads the PKCS#1 PEM key format, which you can recognize
by the RSA keyword in the header and footer of the key. The
openssl genrsa
command uses PKCS#1 as the default output format up to OpenSSL 1.1.1;
OpenSSL 3.0 and later emit PKCS#8 by default, so on those versions you
may need to convert the key or pass the -traditional option.
Create a PEM key.
Note
The PEM key length depends on the encryption method you are using (for example, AES-256 or RSA-4096); base the number of bits on the requirements of that method.
This example creates a 2048-bit RSA private key and stores it in
<path-to-tokenKeypair.pem>
(replace the path in <>
to reflect your setup).

mkdir <path-to-tokenKeypair.pem> && openssl genrsa -out <path-to-tokenKeypair.pem> 2048
Extract the public key.
openssl rsa -in <path-to-tokenKeypair.pem> -outform PEM -pubout -out <path-to-public.pem>
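As a quick sanity check, you can verify that the generated private key is internally consistent and that the extracted public key has the expected PEM header. This is a sketch using a temporary directory rather than the paths above:

```shell
# Sketch: generate the token key pair and verify it (temporary paths).
dir=$(mktemp -d)
openssl genrsa -out "$dir/tokenKeypair.pem" 2048
openssl rsa -in "$dir/tokenKeypair.pem" -outform PEM -pubout -out "$dir/public.pem"
# Confirm the private key is internally consistent (prints "RSA key ok")
openssl rsa -in "$dir/tokenKeypair.pem" -check -noout
# Confirm the public key file carries the expected PEM header
grep -q "BEGIN PUBLIC KEY" "$dir/public.pem" && echo "key pair OK"
```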
Note
Only use OpenSSL to create the PEM key files. Do not use Keytool; it is not a valid option and will result in an error during startup.
Create a PKCS#8 key¶
MDS supports unencrypted PKCS#8 keys, but this section covers encrypted PKCS#8 keys.
A private key can be encrypted with any of several supported algorithms. When the token endpoint is configured for MDS, MDS loads the key pair at startup: it reads the encrypted key-pair file, reads the passphrase, and decrypts the key pair with the provided passphrase.
When MDS decrypts the key with the given passphrase, the configured security provider runs the decryption logic. The default security provider included with Java supports a few encryption algorithms; key pairs encrypted with those algorithms need no further configuration.
For encryption algorithms that require a specific security provider to be
configured, the security provider must be configured for the Kafka process
inside which MDS is running. For example, if the keypair is encrypted with
AES-256, a compatible security provider, such as the Bouncy Castle FIPS
security provider, must be configured. For the Bouncy Castle FIPS provider,
you must specify the following configuration option in the Kafka server.properties
file:
security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator
The Bouncy Castle FIPS provider supports two types of keystore formats: PKCS12 and BCFKS.
Note that when the Bouncy Castle FIPS provider is configured on an SSL-enabled Kafka cluster, additional steps are required to configure the provider.
For more information, see:
Default provider¶
To generate a PKCS#8-format encrypted keypair that works with the default security
provider, run the following openssl
commands. The first command generates a
private key in PKCS#8 format. The second command prompts for a password.
$ openssl genpkey -algorithm RSA -out private_key.pem
$ openssl pkcs8 -topk8 -in private_key.pem \
-out pkcs8KeypairEncrypted_PBE-SHA1-3DES.pem -v1 PBE-SHA1-3DES
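To confirm the encrypted key is in the format MDS expects, you can check the PEM header and verify that your passphrase decrypts the key. This sketch supplies the passphrase non-interactively with -passout/-passin purely for illustration (in practice the second command above prompts for it), and the passphrase value is a placeholder:

```shell
# Sketch: encrypt a key with a default-provider algorithm and round-trip it.
dir=$(mktemp -d)
openssl genpkey -algorithm RSA -out "$dir/private_key.pem"
openssl pkcs8 -topk8 -in "$dir/private_key.pem" \
  -out "$dir/pkcs8KeypairEncrypted_PBE-SHA1-3DES.pem" \
  -v1 PBE-SHA1-3DES -passout pass:changeit
# Encrypted PKCS#8 keys carry this header
grep -q "BEGIN ENCRYPTED PRIVATE KEY" "$dir/pkcs8KeypairEncrypted_PBE-SHA1-3DES.pem"
# The same passphrase you give MDS (confluent.metadata.server.token.key.passphrase)
# must be able to decrypt the key
openssl pkcs8 -in "$dir/pkcs8KeypairEncrypted_PBE-SHA1-3DES.pem" \
  -passin pass:changeit -out "$dir/decrypted.pem"
grep -q "PRIVATE KEY" "$dir/decrypted.pem" && echo "round trip OK"
```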
Bouncy Castle FIPS provider¶
To generate a PKCS#8-format encrypted keypair that works with the Bouncy Castle
FIPS provider, run the following openssl
commands. The first command generates
a private key in PKCS#8 format. The second command prompts for a password.
$ openssl genpkey -algorithm RSA -out private_key.pem
$ openssl pkcs8 -topk8 -in private_key.pem \
-out pvtkey-pkcs8-aes256.pem -v2 aes256
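You can verify the AES-256-encrypted key offline in the same way. Note that openssl itself can decrypt AES-256 PKCS#8 keys without Bouncy Castle; the FIPS provider is only needed by the Kafka/MDS process at runtime. The passphrase flags and value below are illustrative:

```shell
# Sketch: AES-256-encrypted PKCS#8 key round trip (placeholder passphrase).
dir=$(mktemp -d)
openssl genpkey -algorithm RSA -out "$dir/private_key.pem"
openssl pkcs8 -topk8 -in "$dir/private_key.pem" \
  -out "$dir/pvtkey-pkcs8-aes256.pem" -v2 aes256 -passout pass:changeit
grep -q "BEGIN ENCRYPTED PRIVATE KEY" "$dir/pvtkey-pkcs8-aes256.pem"
# Round trip: decrypting with the same passphrase must succeed
openssl pkcs8 -in "$dir/pvtkey-pkcs8-aes256.pem" -passin pass:changeit \
  -out "$dir/decrypted.pem"
grep -q "PRIVATE KEY" "$dir/decrypted.pem" && echo "aes256 round trip OK"
```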
To see a sample configuration for the Bouncy Castle FIPS provider, expand the following section:
Sample configuration for Bouncy Castle FIPS provider
advertised.listeners=INTERNAL://REDACTED.us-west-2.compute.internal:9092,BROKER://REDACTED.us-west-2.compute.internal:9091,CUSTOM://REDACTED.us-west-2.compute.amazonaws.com:9093,TOKEN://REDACTED.us-west-2.compute.amazonaws.com:9094
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
broker.id=1
confluent.ansible.managed=true
confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL
confluent.balancer.topic.replication.factor=3
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=schema-registry:password
confluent.license.topic=_confluent-command
confluent.license.topic.replication.factor=3
confluent.metadata.server.advertised.listeners=https://REDACTED.us-west-2.compute.internal:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.listeners=https://0.0.0.0:8090
confluent.metadata.server.ssl.key.password=REDACTED
confluent.metadata.server.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.metadata.server.ssl.keystore.password=REDACTED
confluent.metadata.server.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.metadata.server.ssl.truststore.password=REDACTED
confluent.metadata.server.ssl.truststore.type=BCFKS
confluent.metadata.server.ssl.keystore.type=BCFKS
confluent.metadata.server.token.key.path=/var/ssl/private/encrypted_aes256_tokenKeypair.pem
confluent.metadata.server.token.key.passphrase=REDACTED
security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator
#confluent.metadata.server.security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator
confluent.metadata.server.token.max.lifetime.ms=3600000
confluent.metadata.server.token.signature.algorithm=RS256
confluent.metadata.topic.replication.factor=3
confluent.metrics.reporter.bootstrap.servers=REDACTED.us-west-2.compute.internal:9091,REDACTED.us-west-2.compute.internal:9091,REDACTED.us-west-2.compute.internal:9091
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.key.password=REDACTED
confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.metrics.reporter.ssl.keystore.password=REDACTED
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.metrics.reporter.ssl.truststore.password=REDACTED
confluent.metrics.reporter.ssl.keystore.type=BCFKS
confluent.metrics.reporter.ssl.truststore.type=BCFKS
confluent.metrics.reporter.topic.replicas=3
confluent.schema.registry.url=https://REDACTED.us-west-2.compute.internal:8081
confluent.security.event.logger.exporter.kafka.topic.replicas=3
confluent.ssl.key.password=REDACTED
confluent.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.ssl.keystore.password=REDACTED
confluent.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.ssl.truststore.password=REDACTED
confluent.support.customer.id=anonymous
confluent.support.metrics.enable=true
group.initial.rebalance.delay.ms=3000
inter.broker.listener.name=BROKER
kafka.rest.bootstrap.servers=REDACTED.us-west-2.compute.internal:9092,REDACTED.us-west-2.compute.internal:9092,REDACTED.us-west-2.compute.internal:9092
kafka.rest.client.security.protocol=SASL_SSL
kafka.rest.client.ssl.key.password=REDACTED
kafka.rest.client.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
kafka.rest.client.ssl.keystore.password=REDACTED
kafka.rest.client.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
kafka.rest.client.ssl.truststore.password=REDACTED
kafka.rest.client.ssl.keystore.type=BCFKS
kafka.rest.client.ssl.truststore.type=BCFKS
kafka.rest.confluent.metadata.basic.auth.user.info=pkcs8-7-5-x-74-test-cluster-main.kafka_erp:Confluent1!
kafka.rest.confluent.metadata.bootstrap.server.urls=https://REDACTED.us-west-2.compute.internal:8090,https://REDACTED.us-west-2.compute.internal:8090,https://REDACTED.us-west-2.compute.internal:8090
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
kafka.rest.confluent.metadata.ssl.truststore.password=REDACTED
kafka.rest.enable=true
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.public.key.path=/var/ssl/private/public.pem
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
ldap.com.sun.jndi.ldap.read.timeout=3000
ldap.group.member.attribute.pattern=uid=(.*),OU=rbac,DC=confluent,DC=io
ldap.group.name.attribute=cn
ldap.group.search.base=OU=rbac,DC=confluent,DC=io
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.java.naming.provider.url=ldap://ip-10-0-242-18.us-west-2.compute.internal:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=Confluent1!
ldap.java.naming.security.principal=uid=mds,OU=rbac,DC=confluent,DC=io
ldap.user.memberof.attribute.pattern=cn=(.*),OU=rbac,DC=confluent,DC=io
ldap.user.name.attribute=uid
ldap.user.object.class=account
ldap.user.search.base=OU=rbac,DC=confluent,DC=io
listener.name.broker.ssl.client.auth=required
listener.name.broker.ssl.key.password=REDACTED
listener.name.broker.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.broker.ssl.keystore.password=REDACTED
listener.name.broker.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.broker.ssl.truststore.password=REDACTED
listener.name.custom.ssl.client.auth=required
listener.name.custom.ssl.key.password=REDACTED
listener.name.custom.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.custom.ssl.keystore.password=REDACTED
listener.name.custom.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.custom.ssl.truststore.password=REDACTED
listener.name.internal.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath="/var/ssl/private/public.pem";
listener.name.internal.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.internal.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.internal.principal.builder.class=io.confluent.kafka.security.authenticator.OAuthKafkaPrincipalBuilder
listener.name.internal.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.internal.ssl.client.auth=required
listener.name.internal.ssl.key.password=REDACTED
listener.name.internal.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore.pk12
listener.name.internal.ssl.keystore.password=REDACTED
listener.name.internal.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore.pk12
listener.name.internal.ssl.truststore.password=REDACTED
listener.name.token.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath="/var/ssl/private/public.pem";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.token.principal.builder.class=io.confluent.kafka.security.authenticator.OAuthKafkaPrincipalBuilder
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.token.ssl.client.auth=required
listener.name.token.ssl.key.password=REDACTED
listener.name.token.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.token.ssl.keystore.password=REDACTED
listener.name.token.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.token.ssl.truststore.password=REDACTED
listener.security.protocol.map=INTERNAL:SASL_SSL,BROKER:SSL,CUSTOM:SSL,TOKEN:SASL_SSL
listeners=INTERNAL://:9092,BROKER://:9091,CUSTOM://:9093,TOKEN://:9094
log.dirs=/var/lib/kafka/data
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
num.io.threads=16
num.network.threads=8
num.partitions=1
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
sasl.enabled.mechanisms=OAUTHBEARER
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.key.password=REDACTED
ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
ssl.keystore.password=REDACTED
ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
ssl.truststore.password=REDACTED
super.users=User:mds;User:C=US,ST=Ca,L=PaloAlto,O=CONFLUENT,OU=TEST,CN=kafka_broker
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.connect=REDACTED.us-west-2.compute.internal:2182,REDACTED.us-west-2.compute.internal:2182,REDACTED.us-west-2.compute.internal:2182
zookeeper.connection.timeout.ms=18000
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore.pk12
zookeeper.ssl.keystore.password=REDACTED
zookeeper.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore.pk12
zookeeper.ssl.truststore.password=REDACTED
ssl.keystore.type=BCFKS
ssl.truststore.type=BCFKS
listener.name.internal.ssl.keystore.type=PKCS12
listener.name.internal.ssl.truststore.type=PKCS12
listener.name.broker.ssl.keystore.type=BCFKS
listener.name.broker.ssl.truststore.type=BCFKS
listener.name.custom.ssl.keystore.type=BCFKS
listener.name.custom.ssl.truststore.type=BCFKS
listener.name.token.ssl.keystore.type=BCFKS
listener.name.token.ssl.truststore.type=BCFKS
zookeeper.ssl.keystore.type=PKCS12
zookeeper.ssl.truststore.type=PKCS12
confluent.ssl.keystore.type=BCFKS
confluent.ssl.truststore.type=BCFKS

Troubleshoot¶
If you encounter configuration issues, review the following common errors and solutions.
Pad block corrupted / error finalizing cipher¶
Most likely, the specified password is incorrect. Verify that the password is correct and try again. Here’s an example stacktrace:
Example stacktrace for pad block corrupted / error finalizing cipher
[2023-05-22 20:22:49,321] ERROR Unable to load token keyPair from config token.key.path with path:/Users/ragrawal/testkeys/tokenKeyPairPkcs8EncV2Des3BlahPass.pem (io.confluent.tokenapi.jwt.JwsProvider)
java.io.IOException: Unable to decrypt private key:
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:98)
    at io.confluent.tokenapi.jwt.JwsProvider.tryLoadKeyPair(JwsProvider.java:206)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4853)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4861)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.confluent.tokenapi.jwt.JwsProvider.loadKeys(JwsProvider.java:141)
    at io.confluent.tokenapi.jwt.JwsProvider.configure(JwsProvider.java:82)
    at io.confluent.tokenapi.jwt.JwtProvider.configure(JwtProvider.java:50)
    at io.confluent.rbacapi.app.RbacApiApplication.setupResources(RbacApiApplication.java:354)
    at io.confluent.rest.Application.configureHandler(Application.java:301)
    at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:201)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
    at io.confluent.http.server.KafkaHttpServerImpl.doStart(KafkaHttpServerImpl.java:111)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.bouncycastle.pkcs.PKCSException: unable to read encrypted data: Error finalising cipher
    at org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo.decryptPrivateKeyInfo(Unknown Source)
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:95)
    ... 18 more
Caused by: org.bouncycastle.crypto.InvalidCipherTextException: Error finalising cipher
    at org.bouncycastle.jcajce.io.CipherInputStream.finaliseCipher(Unknown Source)
    at org.bouncycastle.jcajce.io.CipherInputStream.nextChunk(Unknown Source)
    at org.bouncycastle.jcajce.io.CipherInputStream.read(Unknown Source)
    at org.bouncycastle.util.io.Streams.pipeAll(Unknown Source)
    at org.bouncycastle.util.io.Streams.readAll(Unknown Source)
    ... 20 more
Caused by: javax.crypto.BadPaddingException: Error finalising cipher data: pad block corrupted
    at org.bouncycastle.jcajce.provider.BaseCipher.engineDoFinal(Unknown Source)
    at java.base/javax.crypto.Cipher.doFinal(Cipher.java:2089)
    ... 25 more

Unsupported PEM object type: class org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo¶
This error occurs when an encrypted key is provided, but no passphrase (or an empty string) is supplied.
Here’s an example stacktrace:
Example stacktrace for unsupported PEM object type

[2023-05-22 20:19:28,316] ERROR Unable to load token keyPair from config token.key.path with path:/Users/ragrawal/testkeys/tokenKeyPairPkcs8EncV2Des3BlahPass.pem (io.confluent.tokenapi.jwt.JwsProvider)
java.io.IOException: Unsupported PEM object type: class org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:70)
    at io.confluent.tokenapi.jwt.JwsProvider.tryLoadKeyPair(JwsProvider.java:209)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4853)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4861)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.confluent.tokenapi.jwt.JwsProvider.loadKeys(JwsProvider.java:141)
    at io.confluent.tokenapi.jwt.JwsProvider.configure(JwsProvider.java:82)
    at io.confluent.tokenapi.jwt.JwtProvider.configure(JwtProvider.java:50)
    at io.confluent.rbacapi.app.RbacApiApplication.setupResources(RbacApiApplication.java:354)
    at io.confluent.rest.Application.configureHandler(Application.java:301)
    at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:201)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
    at io.confluent.http.server.KafkaHttpServerImpl.doStart(KafkaHttpServerImpl.java:111)
    at java.base/java.lang.Thread.run(Thread.java:833)

Configure a primary Kafka cluster to host the MDS and role binding¶
Tip
You can store passwords and other configuration data securely using the Confluent CLI confluent secret commands. For more information, refer to Manage Secrets in Confluent Platform.
Note
You must complete and run the MDS configuration steps described here on all available brokers in your cluster. The configuration example here is for a cluster with a standalone broker. To view an example of a multiple broker (inter-broker) configuration, refer to Configure mTLS Authentication and RBAC for Kafka Brokers.
The following sections describe how to configure a primary Kafka cluster to host the MDS:
- Configure the Confluent Server Authorizer
- Configure the LDAP identity provider
- Configure the MDS server
- Configure the token listener
- Full primary Kafka cluster MDS configuration
If you encounter issues configuring token authentication, refer to Token authentication.
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
Configure the Confluent Server Authorizer¶
An authorizer is a server plugin used by Apache Kafka® to authorize operations. More specifically, an authorizer controls whether or not to authorize an operation based on the principal and the resource being accessed.
The Confluent Server Authorizer supports proprietary role-based access control (RBAC) authorization for LDAP-based users and groups, as well as setting ACLs. It also supports pluggable authorization and group providers, enabling ACL and RBAC providers to be loaded at runtime.
Add the following configuration for Confluent Authorizer to your Kafka properties file (/etc/kafka/server.properties). Any content in brackets (<>) must be customized for your environment.
############################# Confluent Server Authorizer Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# Specify a list of Access Rule Providers to retain ACLs that have already been enabled and to enable RBAC
confluent.authorizer.access.rule.providers=KRAFT_ACL,CONFLUENT
# Specify when bootstrapping Confluent Platform and to assign a SystemAdmin
super.users=<User:admin;User:mds>
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
The following sections describe the configuration properties used to specify the Confluent Server Authorizer settings.
authorizer.class.name¶
Defines the authorizer to use and serves as an entry point for Confluent proprietary functionality. In this MDS configuration, it turns “on” the Confluent Server Authorizer.
confluent.authorizer.access.rule.providers¶
List of enabled access rule providers. The Kafka Authorizer uses access rule providers to determine which rules to apply in access decisions. Supported access rule providers are:

- KRAFT_ACL: Uses ACLs stored in the metadata topic to generate a set of access rule objects.
- CONFLUENT: Uses RBAC role bindings and centralized ACLs stored in Kafka to generate a set of access rule objects.
- ZK_ACL: Default. Uses ACLs stored in ZooKeeper to generate a set of access rule objects.

Use KRAFT_ACL or ZK_ACL depending on whether the cluster is running in KRaft or ZooKeeper mode.
super.users¶
In the MDS configuration, assign the super.users attribute to define users who have full access to all resources within the Metadata Service (MDS) cluster. The primary use of super.users is to bootstrap Confluent Platform and assign a SystemAdmin.

On MDS clusters, a super user can create role bindings for all other clusters. Permissions granted by super.users apply only to the broker where the attribute is specified, not to other brokers, clusters, or Confluent Platform components. No authorization is enforced on users defined in super.users, so we strongly recommend limiting this attribute to a small number of users (for example, the one or two users responsible for bootstrapping).
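The super.users value is a semicolon-separated list of principals, each prefixed with a principal type such as User:. The following sketch splits a hypothetical value to show the individual principals:

```shell
# Sketch: super.users is a semicolon-separated principal list (hypothetical value).
super_users='User:admin;User:mds'
# Print one principal per line
printf '%s\n' "$super_users" | tr ';' '\n'
```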
Note
Bootstrapping Confluent Platform means that when you bring up a cluster for the very first time, you link to or rely on another server or cluster that already includes the correct host and other configuration details needed to start up, rather than entering all of the startup attributes each time you start a new cluster. Bootstrapping saves time and resources while providing a reliable cluster startup experience.
Configure the LDAP identity provider¶
The basic LDAP configuration for MDS is described below. This configuration shows the LDAP context to identify LDAP users and groups to the MDS. Be aware that nested LDAP groups are not supported.
Be prepared to provide the following information, which you will need to specify in your LDAP configuration:
- The hostname (LDAP server URL, for example, LDAPSERVER.EXAMPLE.COM), port (for example, 389), and any other security mechanisms (such as TLS)
- The full DN (distinguished name) of LDAP users
- If you have a complex LDAP directory tree, consider providing search filters so that MDS can narrow down LDAP search results
Note
After configuring LDAP, but before configuring MDS, connect to and query your LDAP server to verify your LDAP connection information. An LDAP browsing tool (for example, JXplorer) works well for this.
Note
If you enable LDAP authentication for Kafka clients by adding the LDAP callback handler (not shown in this configuration):

- Specify ldap.user.password.attribute only if your LDAP server does not support simple bind.
- If you define this property (io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler), LDAP performs the user search and returns the password to Kafka, and Kafka performs the password check.
- The LDAP server returns the user's hashed password, so Kafka cannot authenticate the user unless the user's properties file also uses the hashed password.
Add the following configuration for your identity provider (LDAP) to your Kafka properties file (/etc/kafka/server.properties). Any content in brackets (<>) must be customized for your environment.
############################# Identity Provider Settings (LDAP) #############################
# Search groups for group-based authorization.
ldap.group.name.attribute=<sAMAccountName>
ldap.group.object.class=group
ldap.group.member.attribute=member
ldap.group.member.attribute.pattern=CN=(.*),DC=rbac,DC=confluent,DC=io
ldap.group.search.base=CN=Users,DC=rbac,DC=confluent,DC=io
# Limit the scope of searches to subtrees off of base
ldap.user.search.scope=2
# Enable filters to limit search to only those groups needed
ldap.group.search.filter=(|(CN=<specific group>)(CN=<specific group>))

# Kafka authenticates to the directory service with the bind user.
ldap.java.naming.provider.url=ldap://<hostname>:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=<password>
ldap.java.naming.security.principal=<mds-user-DN>

# Locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute=<sAMAccountName>
ldap.user.object.class=user
ldap.user.search.base=<user-search-base-DN>
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
The following sections provide details about the baseline LDAP configuration options for user and group-based authorization. For more details about LDAP configuration, see Configure LDAP Group-Based Authorization for MDS and Configure LDAP Authentication.
ldap.group.name.attribute¶
Contains the name of the group in a group entry obtained using an LDAP search.
You can specify a regex pattern to extract the group name used in ACLs from this
attribute by configuring ldap.group.name.attribute.pattern
. The
<sAMAccountName>
is specific to Microsoft Active Directory and is the login
name used to support clients and servers running various versions of Windows OS.
Modify the value used if your LDAP configuration differs. The default for this
configuration option is cn
(common name).
ldap.group.object.class¶
Specifies the LDAP object class value that defines groups in the directory service.
Specify group
to search groups for group-based authorization. Note that a
group
has many applications, but is essentially a list of zero or more digital
identities.
ldap.group.member.attribute¶
The name of the attribute that contains the members of the group in a group
entry obtained using an LDAP search. The default is member
. You can specify
a regular expression (regex) pattern to extract the user principals from this
attribute by configuring ldap.group.member.attribute.pattern
.
ldap.group.member.attribute.pattern¶
A Java regular expression pattern that extracts the user principals of group
members from group member entries obtained from the LDAP attribute specified
using ldap.group.member.attribute
. By default the full value of the attribute
is used.
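To make the extraction concrete, the following sketch applies the example pattern used elsewhere in this topic (CN=(.*),DC=rbac,DC=confluent,DC=io) to a hypothetical member entry; the entry value CN=alice,... is illustrative only:

```shell
# Apply the example group member attribute pattern to a hypothetical member entry.
# The first capture group becomes the user principal (here: alice).
echo 'CN=alice,DC=rbac,DC=confluent,DC=io' \
  | sed -E 's/^CN=(.*),DC=rbac,DC=confluent,DC=io$/\1/'
```

MDS evaluates the Java regex itself; the sed command above only illustrates which part of the attribute value the capture group selects.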
ldap.group.search.base¶
Specifies the LDAP search base for group-based searches, limiting searches to
the value specified. The default is ou=groups
.
ldap.group.search.scope¶
The LDAP search scope for group-based search. The value of 2
opens the search
to include all the subtrees off the specified base, which is often too vast a
space to search, and can result in timeouts. When specified, you also should specify
ldap.group.search.filter
. The default value is 1
.
ldap.group.search.filter¶
The LDAP search filter for group-based search. Enables filters to limit search
to only those groups needed. It is recommended that you list all the groups
that will be used for searching. This is typically required because the LDAP
trees in large organizations tend to be so large that searching them in full
results in timeouts. For instance, after you add a scope of 2
in
ldap.group.search.scope
to search all subtrees, you need to narrow the groups
that are included in the search. You can include any number of groups in this
search filter.
The following sections provide details about the baseline LDAP configuration options that Kafka uses to authenticate to the directory service with the bind user.
ldap.java.naming.provider.url¶
This option defines the URL to use for connections to the LDAP server. The
default hostname is localhost
; the default port is 389
. You must
specify this option for the MDS configuration. To configure multiple LDAP
servers for high availability, supply a space-separated list of LDAP URLs.
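For example, a highly available configuration might list two LDAP servers separated by a space (the hostnames here are placeholders):

```properties
ldap.java.naming.provider.url=ldap://<ldap-host-1>:389 ldap://<ldap-host-2>:389
```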
ldap.java.naming.security.authentication¶
If password authentication is enabled on your LDAP server, you can configure
the user principal and password so that brokers can authenticate with the LDAP
server using simple authentication. If you do not want to authenticate with the
LDAP server, specify none
. The recommended value to get your MDS
configuration up and running is simple
, which is a PLAINTEXT authentication
protocol and provides no security. For production instances, you should specify
a more secure SASL mechanism supported by your LDAP server (such as GSSAPI).
ldap.java.naming.security.credentials¶
Specifies the security credentials (password) of the principal performing the LDAP search.
ldap.java.naming.security.principal¶
Specifies the security principal: the LDAP distinguished name (DN) of the user performing the LDAP search. In this configuration, specify the MDS user by its DN, which is a sequence of relative distinguished names (RDNs) separated by commas.
The following sections provide details for the options used to locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute¶
This attribute identifies the user principal in a user entry obtained using an
LDAP search. You can specify a regex pattern to extract the user
principal from this attribute by configuring ldap.user.name.attribute.pattern.
The <sAMAccountName>
is specific to Active Directory and is
the login name used to support clients and servers running various versions of
Windows OS. Modify this configuration if your LDAP configuration differs. The
default for this option is cn
(common name).
ldap.user.object.class¶
Specifies the LDAP object class value that defines users in the directory service.
Specify user
to search for user-based authorization.
ldap.user.search.base¶
Use to specify the LDAP search base for a user-based search. The default value
is ou=users
.
Configure the MDS server¶
This section describes the options to create a key pair for MDS and configure MDS on the broker. You must update paths to the key files to match your setup.
Note
Confluent Server supports two servlet applications: REST Proxy and MDS.
- If MDS is configured, then its listener configuration takes precedence over the REST Proxy listener configurations.
- If MDS is disabled, then the REST Proxy listener configurations take precedence.
Add the following configuration for MDS server settings to your Kafka properties file
(/etc/kafka/server.properties
). Any content in brackets (<>
) must be
customized for your environment.
############################# MDS Server Settings #############################
# Enable MDS controllers for KRaft mode.
confluent.metadata.server.kraft.controller.enabled=true
# Bind the Metadata Service HTTP service to port 8090.
confluent.metadata.server.listeners=http://0.0.0.0:8090
# The key pair used to sign tokens when the token service issues them
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
# Supported authentication methods
confluent.metadata.server.authentication.method=BEARER
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
The following sections describe the configuration options used to specify the MDS server settings.
confluent.metadata.server.kraft.controller.enabled¶
When using KRaft mode, enable this flag for MDS controller
configurations only. This flag is not supported for MDS broker configurations. The
default value is false
.
confluent.metadata.server.listeners¶
Use to bind the HTTP (or HTTPS) service to a port.
The default value (8090
) is specified here.
confluent.metadata.server.token.key.path¶
The location of the PEM-encoded public/private key pair to be used for signing and verifying tokens. The key pair is used to sign tokens when the token service issues a token, and to verify tokens when the token service authenticates a token. The key pair is generated using the RSA algorithm.
The token service supports:
- PKCS#8 PEM-encoded private keys
- RS256 signatures
confluent.metadata.server.token.key.passphrase¶
The passphrase for the private key. Specify this option if the private key is encrypted. If the private key is not encrypted, do not specify this option. If this option is specified, the value must not be an empty string.
confluent.metadata.server.authentication.method¶
Use to specify that you support token authentication on the MDS side. Use BEARER
to indicate that bearer token authentication is enabled for the configuration.
Configure the token listener¶
MDS can issue a token in exchange for a username and password. MDS can also accept a token from a user or client and authenticate that principal by the token, which can then be reused for subsequent authentication.
The tokens are used to authenticate to the Kafka-configured OAUTHBEARER listener. This section shows how to enable and configure the MDS token service.
- Be sure to use the same public keys across components and brokers.
- Do not use a unique key for each MDS service or component connecting to OAUTH.
- Be aware that after setting up MDS, the OAUTH listener does not display specific errors related to the MDS decryption keys (tokenKeypair.pem and public.pem) when they differ.
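A minimal sketch of generating such a key pair with OpenSSL, assuming the file names tokenKeypair.pem and public.pem used in this topic and a 2048-bit RSA key (the key size is an assumption, not a requirement stated here):

```shell
# Generate an RSA private key for the MDS token service
openssl genrsa -out tokenKeypair.pem 2048
# Extract the matching public key that brokers use to verify token signatures
openssl rsa -in tokenKeypair.pem -outform PEM -pubout -out public.pem
# If your OpenSSL writes a PKCS#1 key ("BEGIN RSA PRIVATE KEY"), convert it to
# the PKCS#8 form the token service expects:
#   openssl pkcs8 -topk8 -nocrypt -in tokenKeypair.pem -out tokenKeypair-pkcs8.pem
```

Distribute the same public.pem to every component and broker, per the guidance above.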
Add the following configuration for token listener settings to your Kafka properties file
(/etc/kafka/server.properties
). Any content in brackets (<>
) must be
customized for your environment.
############################# Token Listener Settings #############################
# Add named listener TOKEN to existing listeners and advertised.listeners
advertised.listeners=<advertised.listeners>,TOKEN://localhost:9092
listeners=<listeners>,TOKEN://:9092
# Set the SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication.
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
# Set the SASL callback handler for verifying authentication token signatures
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
# Configure the public key used to verify RBAC Metadata Service signatures
# Note: username, password and metadataServerUrls must be set if used for inter-broker communication
listener.name.token.oauthbearer.sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  publicKeyPath="<path-to-public.pem>";
# Add a protocol mapping for the newly-added named listener TOKEN
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
The following sections describe the configuration options used to specify the token listener settings.
advertised.listeners¶
Comma-separated list of URIs and listener names for other brokers and clients to
use. In IaaS environments, this may need to be different from the interface to
which the broker binds. If this is not set, the value for listeners
will be used.
listeners¶
Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. Each listener must include the hostname and the port.
listener.name.rbac.oauthbearer.sasl.login.callback.handler.class¶
Use to configure and enable Kafka API calls (for example, produce, consume) so they can understand and authenticate MDS JSON web tokens and passed usernames and passwords. For details, refer to Use SASL/OAUTHBEARER Authentication between Confluent Server Brokers and Kafka Clients in Confluent Platform.
listener.name.rbac.oauthbearer.sasl.server.callback.handler.class¶
Use to configure and enable Kafka API calls (for example, produce, consume) so they can understand and authenticate MDS JSON web tokens and passed usernames and passwords. For details, refer to Use SASL/OAUTHBEARER Authentication between Confluent Server Brokers and Kafka Clients in Confluent Platform.
listener.name.rbac.oauthbearer.sasl.jaas.config¶
Used by listener.name.rbac.oauthbearer.sasl.login.callback.handler.class
to
verify incoming JSON Web Tokens (JWT).
listener.security.protocol.map¶
This is a Kafka broker configuration option that defines key/value pairs for the
security protocol to use, per listener name. Map between listener names and
security protocols. This must be defined for the same security protocol to be
usable in more than one port or IP. For example, internal and external traffic
can be separated even if TLS is required for both. More precisely, the user could
define listeners with names INTERNAL and EXTERNAL and this property as:
INTERNAL:SSL,EXTERNAL:SSL
. As shown, key and value are separated by a colon and
map entries are separated by commas. Each listener name should only appear once
in the map. Different security (TLS and SASL) settings can be configured for each
listener by adding a normalized prefix (the listener name is lowercase) to the
configuration name.
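Combining the INTERNAL/EXTERNAL example above with the TOKEN listener added earlier, the map might look like this (the listener names are illustrative):

```properties
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_PLAINTEXT
```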
listener.name.rbac.sasl.enabled.mechanisms¶
A comma-separated list of SASL mechanisms enabled on the RBAC listener.
Important
For custom Kafka clients (Java or librdkafka), do not use the RBAC token listener for external client communications. Dependencies required for this authentication are not included in the Kafka client libraries. With RBAC enabled, token services are intended for internal communication between Confluent Platform components only (for example, a Schema Registry licensed client), and not for long-running service principals or client authentication. For long-lived or client use cases, use one of the supported authentication methods, such as SASL or mTLS (mutual TLS). For details, see Authentication Methods Overview.
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
Full primary Kafka cluster MDS configuration¶
This example shows the full configuration for the primary Kafka cluster that is hosting MDS and role binding:
############################# Confluent Server Authorizer Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# Specify a list of Access Rule Providers to retain ACLs that have already been enabled and to enable RBAC
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT

# Specify when bootstrapping Confluent Platform and to assign a SystemAdmin
super.users=<User:admin;User:mds>

############################# Identity Provider Settings (LDAP) #############################
# Search groups for group-based authorization.
ldap.group.name.attribute=<sAMAccountName>
ldap.group.object.class=group
ldap.group.member.attribute=member
ldap.group.member.attribute.pattern=CN=(.*),DC=rbac,DC=confluent,DC=io
ldap.group.search.base=CN=Users,DC=rbac,DC=confluent,DC=io
# Limit the scope of searches to subtrees off of the base
ldap.user.search.scope=2
# Enable filters to limit the search to only those groups needed
ldap.group.search.filter=(|(CN=<specific group>)(CN=<specific group>))

# Kafka authenticates to the directory service with the bind user.
ldap.java.naming.provider.url=ldap://<hostname>:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=<password>
ldap.java.naming.security.principal=<mds-user-DN>

# Locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute=<sAMAccountName>
ldap.user.object.class=user
ldap.user.search.base=<user-search-base-DN>

############################# MDS Server Settings #############################
# Bind the Metadata Service HTTP service to port 8090.
confluent.metadata.server.listeners=http://0.0.0.0:8090
# The key pair used to sign tokens when the token service issues them
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
# Supported authentication methods
confluent.metadata.server.authentication.method=BEARER

############################# Token Listener Settings #############################
# Add named listener TOKEN to existing listeners and advertised.listeners
advertised.listeners=<advertised.listeners>,TOKEN://localhost:9092
listeners=<listeners>,TOKEN://:9092
# Set the SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication.
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
# Set the SASL callback handler for verifying authentication token signatures
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
# Configure the public key used to verify RBAC Metadata Service signatures
# Note: username, password and metadataServerUrls must be set if used for inter-broker communication
listener.name.token.oauthbearer.sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  publicKeyPath="<path-to-public.pem>";
# Add a protocol mapping for the newly-added named listener TOKEN
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
Troubleshooting MDS configuration issues¶
The following sections provide guidance to help you troubleshoot issues you may encounter in your MDS configuration.
Token authentication¶
Token login and authentication to MDS using OAUTHBEARER can fail without obvious
exceptions or errors appearing in either server.log
or metadata-service.log
.
Such failures may occur because the public and private key files are corrupt,
or because the token was generated using different public and private key files.
To troubleshoot token authentication:
1. Delete both tokenKeypair.pem and public.pem from the folders configured in server.properties and regenerate them. The newly-generated key files are placed in the location specified by confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>.
2. After regenerating tokenKeypair.pem and public.pem, restart the broker server where MDS is running.
3. On the client machine, delete the local CLI cache (~/.confluent/config.json), which cached the token for the super user after the super user logged in.
4. Log in again using the CLI.
MDS REST client configurations¶
If a component client (such as Schema Registry, ksqlDB, Confluent Control Center, or Connect) configured to communicate with MDS includes an incorrect username or password, it can enter an endless loop of authentication attempts, which can produce a continuous stream of exceptions in your REST client exception log and degrade performance. For example:
[2021-01-25 05:11:58,330] ERROR [pool-17-thread-1] Error while refreshing active metadata server urls, retrying (io.confluent.security.auth.client.rest.RestClient)
io.confluent.security.auth.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.security.auth.client.rest.RestClient$HTTPRequestSender.lambda$submit$0(RestClient.java:353)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
To control the number of authentication retry attempts, include the following options in your REST client MDS configuration:
confluent.metadata.server.urls.max.retries
confluent.metadata.server.urls.fail.on.401
For details about these configuration options, refer to REST client configurations.
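For illustration only, these options might appear in a REST client MDS configuration as follows (the values are placeholders, and the comments are based on the option names; see the REST client configuration reference for exact semantics and defaults):

```properties
# Cap the number of retry attempts against the MDS URLs
confluent.metadata.server.urls.max.retries=<max-retries>
# Fail immediately on a 401 response instead of retrying
confluent.metadata.server.urls.fail.on.401=true
```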
Configure a secondary Kafka cluster managed by the MDS of the primary Kafka cluster¶
The following sections describe how to configure a secondary Kafka cluster managed by the MDS of the primary Kafka cluster.
- Configure the Confluent Server Authorizer
- Configure the MDS
- Configure the token listener
- Full secondary Kafka cluster MDS configuration
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
Configure the Confluent Server Authorizer¶
See Configure the Confluent Server Authorizer for details about the Confluent Server Authorizer and the settings recommended here.
Add the following MDS configuration to your Kafka properties file
(/etc/kafka/server.properties
). Any content in brackets (<>
) must be
customized for your environment.
############################# Confluent Server Authorizer Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
Configure the MDS¶
This section configures Kafka so that it can talk to the MDS cluster and
“consume” the role bindings. In the configuration example below, SASL_PLAINTEXT/PLAIN
is used, but you should use whatever security mechanism is required by the
Kafka broker running MDS. Also, you should specify the port exposed by the
other broker. For more information, see Metadata Service Configuration Settings.
############################# MDS Settings #############################
confluent.metadata.bootstrap.servers=<kafka-with-mds-hostname-1>:<port>,<kafka-with-mds-hostname-2>:<port>,...
confluent.metadata.security.protocol=SASL_PLAINTEXT
confluent.metadata.sasl.mechanism=PLAIN
confluent.metadata.sasl.jaas.config= \
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<broker-username>" \
  password="<broker-password>";
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
The following sections describe the configuration options used to specify the MDS settings.
confluent.metadata.bootstrap.servers¶
The Kafka hostname and port of the cluster hosting MDS. Used to determine how a broker
connects to the MDS. For the default KRaft mode, include the hostname and port for
MDS controller node configurations only. This option is required for managed
brokers, and optional for MDS brokers, which default to the inter-broker values.
An example value looks like ec2-22-222-22-222.compute-1.amazonaws.com:9092
.
confluent.metadata.security.protocol¶
Defines the security protocol to use when connecting to an external MDS, and provides managed brokers the ability to consume RBAC data.
confluent.metadata.sasl.mechanism¶
Defines the SASL mechanism to use when connecting to an external MDS, and provides managed brokers the ability to consume RBAC data.
confluent.metadata.sasl.jaas.config¶
Defines the JAAS configuration for managed clusters to connect to and consume the role binding data so that they can locally enforce RBAC on direct Kafka API calls to themselves.
Configure the token listener¶
This section of the MDS configuration enables the listener with the OAUTHBEARER SASL mechanism, which is used for impersonation. For more information, see Configure Clients for SASL/OAUTHBEARER authentication in Confluent Platform. For details about token listener settings in an MDS configuration, see Configure the token listener.
Any differences between the token listener settings used here and those described above in Configure the token listener are detailed below.
############################# Token-based Listener Settings #############################
listeners=<listeners>,TOKEN://:9092
advertised.listeners=<advertised.listeners>,TOKEN://<hostname>:9092
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.oauthbearer.sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  publicKeyPath="<path-to-public.pem>";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.token.oauthbearer.sasl.jaas.config¶
The oauthbearer.sasl.jaas.config
of the defined LISTENER_NAME
.
Here, TOKEN
is the name of the listener. In other words,
listener.name.token.oauthbearer.sasl.jaas.config
is an instance of
listener.name.<LISTENER_NAME>.oauthbearer.sasl.jaas.config
.
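To make the naming pattern concrete: if the listener were named RBAC instead of TOKEN, the same option would be written as follows (illustrative):

```properties
listener.name.rbac.oauthbearer.sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  publicKeyPath="<path-to-public.pem>";
```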
Important
When all sections of the MDS configuration are complete, Start Confluent Platform.
Full secondary Kafka cluster MDS configuration¶
This example shows the full configuration for the secondary Kafka cluster that is managed by the MDS of the primary Kafka cluster:
############################# Confluent Server Authorizer Settings #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT

############################# MDS Settings #############################
confluent.metadata.bootstrap.servers=<kafka-with-mds-host-1>:<port>,<kafka-with-mds-host-2>:<port>,...
confluent.metadata.security.protocol=SASL_PLAINTEXT
confluent.metadata.sasl.mechanism=PLAIN
confluent.metadata.sasl.jaas.config= \
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<broker-username>" \
  password="<broker-password>";

############################# Token-based Listener Settings #############################
listeners=<listeners>,TOKEN://:9092
advertised.listeners=<advertised.listeners>,TOKEN://<hostname>:9092
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.oauthbearer.sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  publicKeyPath="<path-to-public.pem>";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
Next steps¶
RBAC:
- Role-Based Access Control for Confluent Platform Quick Start
- Configure RBAC for Control Center on Confluent Platform
- Deploy Secure ksqlDB with RBAC in Confluent Platform
- Configure Role-Based Access Control for Schema Registry in Confluent Platform
- Kafka Connect and RBAC
- Role-Based Access Control (RBAC)
Centralized ACLs:
Centralized audit logs:
Cluster registry: