Use SASL/SCRAM Authentication in Confluent Platform
Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username and password authentication, such as PLAIN and DIGEST-MD5. SCRAM provides the following features:
The challenge-response mechanism of SASL/SCRAM protects against password sniffing on the network and against dictionary attacks on the password file. SCRAM allows the server to authenticate the client without ever transmitting or storing the client’s password in plain text.
Authentication information stored in the authentication database is not sufficient by itself to impersonate the client. The information is salted to prevent a pre-stored dictionary attack if the database is compromised.
For details on how SASL/SCRAM works, see RFC 5802.
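The two properties above can be illustrated with a minimal sketch of the RFC 5802 proof computation. It is not the Kafka implementation: the function names are hypothetical, SASLprep normalization of the password is skipped, and the auth message is treated as an opaque byte string rather than being assembled from real client and server messages.

```python
import hashlib
import hmac


def salted_password(password: str, salt: bytes, iterations: int) -> bytes:
    # RFC 5802 Hi() is PBKDF2 with HMAC as the PRF (SASLprep is skipped in this sketch).
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)


def stored_key(salted: bytes) -> bytes:
    # The server keeps H(ClientKey), never the password or ClientKey itself.
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    return hashlib.sha256(client_key).digest()


def client_proof(salted: bytes, auth_message: bytes) -> bytes:
    # ClientProof := ClientKey XOR HMAC(StoredKey, AuthMessage)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    signature = hmac.new(stored_key(salted), auth_message, hashlib.sha256).digest()
    return bytes(a ^ b for a, b in zip(client_key, signature))


def server_verifies(stored: bytes, auth_message: bytes, proof: bytes) -> bool:
    # The server recovers ClientKey from the proof and compares its hash to StoredKey.
    signature = hmac.new(stored, auth_message, hashlib.sha256).digest()
    recovered = bytes(a ^ b for a, b in zip(proof, signature))
    return hmac.compare_digest(hashlib.sha256(recovered).digest(), stored)
```

Because the proof depends on the per-session auth message, a captured proof cannot be replayed in another session, and the stored key alone does not let an attacker compute a valid proof.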
Confluent Platform clusters support SCRAM-SHA-256 and SCRAM-SHA-512, which can be used with TLS to perform secure authentication. The following examples use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.
The SCRAM implementation in a Confluent Platform cluster stores SCRAM credentials in KRaft and is suitable for use in Confluent Platform installations where KRaft is on a private network. Because of this, you must create SCRAM credentials for users in KRaft.
If you are migrating from a ZooKeeper-based cluster to KRaft, SCRAM credentials stored in ZooKeeper are automatically migrated to KRaft during the metadata migration process. For more information, see Migrate from ZooKeeper to KRaft on Confluent Platform.
KRaft-based Confluent Platform clusters
KRaft-backed Confluent Platform clusters can’t use SCRAM for controller-to-controller authentication. However, Confluent Server brokers can use SCRAM to authenticate to controllers and other brokers. You must create the SCRAM credentials before the brokers are up and running.
kafka-storage.sh tool
Use the kafka-storage.sh tool to create a cluster ID, initialize storage, and format log directories for each KRaft controller.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Before running the tool, make sure the controller’s server.properties file contains at least the following configuration:
# SAMPLE server.properties for kafka-storage.sh tool
process.roles=controller
node.id=10
controller.quorum.voters=10@kraftcontroller:9090
log.dirs=/tmp/kraftLogs
listeners=CONTROLLER://:9090
controller.listener.names=CONTROLLER
A broker can authenticate with any of the KRaft controllers, so you must run kafka-storage.sh on each controller in your cluster. Running the tool ensures each controller has what it needs to write to the metadata log.
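As a rough pre-flight check, the following sketch parses a server.properties file and reports which of the minimum keys from the sample above are missing. The helper names are hypothetical, and the parser handles only simple key=value lines (no line continuations).

```python
REQUIRED_KEYS = (
    "process.roles",
    "node.id",
    "controller.quorum.voters",
    "log.dirs",
    "listeners",
    "controller.listener.names",
)


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_controller_keys(text: str) -> list:
    """Return the required keys that are absent or empty, in REQUIRED_KEYS order."""
    props = parse_properties(text)
    return [key for key in REQUIRED_KEYS if not props.get(key)]
```

An empty list means the file has every key shown in the sample; it does not validate the values themselves.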
Create SCRAM credentials
Use the --add-scram option of kafka-storage.sh to create SCRAM credentials on each controller node. You must create these credentials before the brokers start. When kafka-storage.sh generates a cluster UUID, it does not add SCRAM credentials automatically. If the credentials are not created on every controller, one controller might know the SCRAM credentials while the others do not.
Note
When it runs, the kafka-storage.sh tool creates a bootstrap.checkpoint file that contains UserScramCredentialsRecord records used to bootstrap the cluster. The --add-scram option adds a new ApiMessageAndVersion record that stores the SCRAM credentials for the specified user. Confluent Server brokers use this record to authenticate other brokers using SCRAM. The server side of each connection uses UserScramCredentialsRecord; the client side of each connection still needs to know the password.
Pass the --add-scram option to the format command of the kafka-storage tool, using the following syntax:
kafka-storage format [-h] --config CONFIG \
--cluster-id CLUSTER_ID \
--add-scram SCRAM_CREDENTIAL \
--release-version RELEASE_VERSION \
--ignore-formatted
where SCRAM_CREDENTIAL looks like one of the following:
'SCRAM-SHA-256=[name=alice,password=alice-secret]'
'SCRAM-SHA-512=[name=alice,iterations=8192,salt="MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=",saltedpassword="mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE="]'
The SCRAM_CREDENTIAL argument is a key-value pair. The key specifies the supported SCRAM mechanism, and the value contains another set of key-value pairs for populating the UserScramCredentialsRecord.
The SCRAM_CREDENTIAL subarguments require a name key and either a password key or a saltedpassword key. If you use a saltedpassword key, you must also supply an iterations key and a salt key. The values for salt and saltedpassword are Base64 encodings of binary data. With a password key, the iterations and salt keys are optional. When these keys aren’t supplied, the iteration count defaults to 4096 and the salt is randomly generated.
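If you prefer to pass a saltedpassword rather than a clear-text password, the value can be derived offline. The following is a minimal sketch, assuming the salted password is the RFC 5802 Hi() function (PBKDF2 with HMAC) applied to the UTF-8 bytes of the password. The function name scram_credential is hypothetical and not part of any Confluent tool.

```python
import base64
import hashlib
import os

# Map each supported SCRAM mechanism to its hashlib digest name.
DIGESTS = {"SCRAM-SHA-256": "sha256", "SCRAM-SHA-512": "sha512"}


def scram_credential(name, password, mechanism="SCRAM-SHA-256", iterations=4096, salt=None):
    """Build a SCRAM_CREDENTIAL string that carries a precomputed salted password."""
    if mechanism not in DIGESTS:
        raise ValueError(f"unsupported mechanism: {mechanism}")
    if iterations < 4096:
        raise ValueError("iteration count must be at least 4096")
    if salt is None:
        salt = os.urandom(24)  # random salt, as the tool would otherwise generate
    # SaltedPassword := Hi(password, salt, iterations), that is, PBKDF2-HMAC.
    salted = hashlib.pbkdf2_hmac(DIGESTS[mechanism], password.encode("utf-8"), salt, iterations)
    salt_b64 = base64.b64encode(salt).decode("ascii")
    salted_b64 = base64.b64encode(salted).decode("ascii")
    return (
        f"{mechanism}=[name={name},iterations={iterations},"
        f'salt="{salt_b64}",saltedpassword="{salted_b64}"]'
    )
```

The returned string has the same shape as the second example above and can be passed as the --add-scram argument. Treat the output as sensitive: the salted password is a password equivalent for offline guessing purposes.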
Manage SCRAM credentials
To manage existing SCRAM credentials, use the kafka-configs command. This example adds a user to an existing SCRAM setup:
kafka-configs --bootstrap-server localhost:9092 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
--entity-type users \
--entity-name alice
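When credentials are managed from a script, building the argument list programmatically avoids shell quoting mistakes around the brackets and commas. The sketch below only assembles the arguments shown in the example above. The helper names are hypothetical, and it assumes kafka-configs is on the PATH when the list is eventually executed.

```python
def scram_add_config(mechanisms: dict) -> str:
    """Render the --add-config value from {mechanism: {key: value}} mappings."""
    parts = []
    for mechanism, settings in mechanisms.items():
        body = ",".join(f"{key}={value}" for key, value in settings.items())
        parts.append(f"{mechanism}=[{body}]")
    return ",".join(parts)


def kafka_configs_alter(bootstrap_server: str, user: str, mechanisms: dict) -> list:
    """Return the argv list for adding SCRAM credentials to one user."""
    return [
        "kafka-configs",
        "--bootstrap-server", bootstrap_server,
        "--alter",
        "--add-config", scram_add_config(mechanisms),
        "--entity-type", "users",
        "--entity-name", user,
    ]
```

The resulting list can be handed to subprocess.run(argv, check=True), which passes each argument to the tool without shell interpretation.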
Security considerations for SASL/SCRAM
Review the following security considerations before deploying SASL/SCRAM in Confluent Platform.
If you require Confluent Server brokers to authenticate each other using SCRAM, you must create SCRAM credentials before the brokers are up and running.
Confluent Platform supports only the strong hash functions SHA-256 and SHA-512, with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if KRaft security is compromised.
SCRAM should be used only with TLS encryption to prevent interception of SCRAM exchanges. This protects against dictionary or brute force attacks and against impersonation if KRaft is compromised.
The default SASL/SCRAM credential store can be overridden using custom callback handlers by configuring sasl.server.callback.handler.class in installations where KRaft is not secure.
For more details on security considerations, refer to RFC 5802.
The remainder of this page shows how to configure SASL/SCRAM for each component in Confluent Platform.
Configure Confluent Server brokers
Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.
Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for Confluent Metrics Reporter.
JAAS
Configure the Java Authentication and Authorization Service (JAAS) settings for Confluent Server brokers.
Note
Use of separate JAAS files is supported, but is not recommended. Instead, use the listener-level sasl.jaas.config setting described at the end of Configure broker properties for SASL/SCRAM to replace the following steps.
Create the broker’s JAAS configuration file in each Confluent Server broker’s configuration directory. For this example, it is named kafka_server_jaas.conf.
In each broker’s JAAS file, configure a KafkaServer section. This configuration defines one user (admin). The properties username and password are used by the broker to initiate connections to other brokers. In this example, admin is the user for interbroker communication.
KafkaServer {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="admin-secret";
};
Configure broker properties for SASL/SCRAM
Enable the SASL/SCRAM mechanism in the server.properties file of every broker.
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=SCRAM-SHA-256
# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
If you want to enable SASL for interbroker communication, add the following to the broker properties file (it defaults to PLAINTEXT). Set the protocol to:
SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
SASL_PLAINTEXT: if TLS/SSL encryption is not enabled
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.inter.broker.protocol=SASL_SSL
Tell the Kafka brokers on which ports to listen for client and interbroker SASL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners. Set the listener to:
SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
SASL_PLAINTEXT: if TLS/SSL encryption is not enabled
# With TLS/SSL encryption
listeners=SASL_SSL://kafka1:9093
advertised.listeners=SASL_SSL://localhost:9093
# Without TLS/SSL encryption
listeners=SASL_PLAINTEXT://kafka1:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
Configure both SASL_SSL and PLAINTEXT ports if:
SASL is not enabled for interbroker communication
Some clients connecting to the cluster do not use SASL
Example SASL listeners, with and without TLS/SSL encryption, mixed with PLAINTEXT listeners:
# With TLS/SSL encryption
listeners=PLAINTEXT://kafka1:9092,SASL_SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
# Without TLS/SSL encryption
listeners=PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
If you are not using a separate JAAS configuration file to configure JAAS, then configure JAAS for the Kafka broker listener as follows:
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
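The property key above embeds the listener name and the SASL mechanism, both in lowercase. As a hedged illustration, the following sketch renders that line from its parts. The helper names are hypothetical, and the sketch rejects values containing quotes or backslashes rather than attempting to escape them.

```python
def scram_jaas_config(username: str, password: str) -> str:
    """Render a single-line ScramLoginModule JAAS entry."""
    for value in (username, password):
        if '"' in value or "\\" in value:
            # Escaping rules differ between JAAS and properties files; keep the sketch simple.
            raise ValueError("quotes and backslashes are not handled by this sketch")
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )


def listener_jaas_property(listener: str, mechanism: str, username: str, password: str) -> str:
    """Render listener.name.<listener>.<mechanism>.sasl.jaas.config=<JAAS entry>."""
    key = f"listener.name.{listener.lower()}.{mechanism.lower()}.sasl.jaas.config"
    return f"{key}={scram_jaas_config(username, password)}"
```

Calling listener_jaas_property("SASL_SSL", "SCRAM-SHA-256", "admin", "admin-secret") reproduces the line shown above. Switching to SCRAM-SHA-512 changes only the mechanism segment of the key.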
Configure Kafka clients
Configure Kafka clients to use SASL/SCRAM authentication when connecting to Confluent Server brokers.
Important
If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism. For additional information, see Configure license clients to authenticate to Kafka.
The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.
If you are using the Kafka Streams API, see the Kafka Streams documentation for how to configure the equivalent SSL and SASL parameters.
Configure the following properties in a client properties file, client.properties.
sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_SSL
Configure the JAAS configuration property to describe how clients such as producers and consumers can connect to the Kafka brokers. The properties username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user kafkaclient1.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="kafkaclient1" \
   password="kafkaclient1-secret";
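Before handing a configuration to a client, it can help to confirm that the three SCRAM-related settings are mutually consistent. The following is a small sketch of such a check. The function name is hypothetical, and it assumes the properties have already been loaded into a dictionary.

```python
SCRAM_MECHANISMS = {"SCRAM-SHA-256", "SCRAM-SHA-512"}
SASL_PROTOCOLS = {"SASL_SSL", "SASL_PLAINTEXT"}


def scram_client_problems(props: dict) -> list:
    """Return human-readable problems found in a client configuration."""
    problems = []
    if props.get("security.protocol") not in SASL_PROTOCOLS:
        problems.append("security.protocol must be SASL_SSL or SASL_PLAINTEXT")
    if props.get("sasl.mechanism") not in SCRAM_MECHANISMS:
        problems.append("sasl.mechanism must be SCRAM-SHA-256 or SCRAM-SHA-512")
    if "ScramLoginModule" not in props.get("sasl.jaas.config", ""):
        problems.append("sasl.jaas.config must reference ScramLoginModule")
    return problems
```

An empty list means the settings agree with each other. It does not prove that the credentials exist on the cluster.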
Configure Kafka Connect
This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:
Kafka Connect workers: part of the Kafka Connect API. Under the covers, a worker is an advanced client.
Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use TLS/SSL using additional properties
Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for:
Configure all the following properties in connect-distributed.properties.
Configure the Connect workers to use SASL/SCRAM.
sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_SSL
Configure the JAAS configuration property to describe how Connect’s producers and consumers can connect to the Kafka brokers. The properties username and password are used by Connect to configure the user for connections. In this example, Connect workers connect to the broker as user connect.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="connect" \
   password="connect-secret";
For the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:
Source connector: configure the same properties adding the producer prefix.
producer.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="connect" \
   password="connect-secret";
Sink connector: configure the same properties adding the consumer prefix.
consumer.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="connect" \
   password="connect-secret";
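Because the producer and consumer overrides repeat the worker settings under a prefix, they can be generated from a single source of truth. The sketch below shows one way to do that. The helper names are hypothetical, and it assumes the worker, producer, and consumer all use the same credentials, as in the examples above.

```python
def with_prefix(props: dict, prefix: str) -> dict:
    """Return a copy of props with every key prefixed by '<prefix>.'."""
    return {f"{prefix}.{key}": value for key, value in props.items()}


def connect_worker_properties(worker: dict) -> dict:
    """Combine worker settings with matching producer.* and consumer.* overrides."""
    merged = dict(worker)
    merged.update(with_prefix(worker, "producer"))
    merged.update(with_prefix(worker, "consumer"))
    return merged
```

Given the three worker properties (sasl.mechanism, security.protocol, and sasl.jaas.config), the result contains nine entries: the originals plus their producer. and consumer. counterparts.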
Configure Confluent Replicator
Confluent Replicator is a type of Kafka source connector that replicates data from a source Kafka cluster to a destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.
Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.
To configure Confluent Replicator security, you must configure the Replicator connector as shown below and additionally you must configure:
Configure Confluent Replicator to use SASL/SCRAM for the source cluster by adding these src.kafka properties to the Replicator’s JSON configuration file. The JAAS configuration property defines the username and password that Replicator uses for connections. In this example, Replicator connects to the broker as user replicator.
{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.security.protocol" : "SASL_SSL",
    "src.kafka.sasl.mechanism" : "SCRAM-SHA-256",
    "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    ....
  }
}
To configure Confluent Replicator for a destination cluster with SASL/SCRAM authentication, modify the Replicator JSON configuration to include the following:
{
  "name":"replicator",
  "config":{
    ....
    "dest.kafka.security.protocol" : "SASL_SSL",
    "dest.kafka.sasl.mechanism" : "SCRAM-SHA-256",
    "dest.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    ....
  }
}
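The JAAS value contains double quotes, which must be escaped inside a JSON string. Generating the JSON with a serializer handles that escaping automatically. The following sketch covers only the security-related keys shown above. The function name is hypothetical, and a real Replicator configuration needs the additional settings elided by the .... lines.

```python
import json


def replicator_security_config(username, password, mechanism="SCRAM-SHA-256", protocol="SASL_SSL"):
    """Return JSON text with SASL/SCRAM settings for both source and destination clusters."""
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    config = {}
    for side in ("src.kafka", "dest.kafka"):
        config[f"{side}.security.protocol"] = protocol
        config[f"{side}.sasl.mechanism"] = mechanism
        config[f"{side}.sasl.jaas.config"] = jaas
    # json.dumps escapes the inner double quotes of the JAAS entry.
    return json.dumps({"name": "replicator", "config": config}, indent=2)
```

This sketch assumes both clusters accept the same user. If they differ, build the two sides from separate credentials.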
Additionally, the following properties are required in the Connect worker:
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";
producer.sasl.mechanism=SCRAM-SHA-256
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";
For more information, see the general security configuration for Connect workers in Configure Kafka Connect.
Tip
To see an example Confluent Replicator configuration, see the SASL destination authentication demo script.
For demos of common security configurations see: Replicator security demos.
To learn more about Replicator ACLs, see ACLs Overview in the Replicator documentation.
Configure Confluent Control Center
Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.
Note
When RBAC is enabled, Control Center cannot be used in conjunction with Kerberos because Control Center cannot support any SASL mechanism other than OAUTHBEARER.
Enable security for the Control Center application as described in the section below. Additionally, configure security for the following components:
Confluent Metrics Reporter: required on the production cluster being monitored
Enable SASL/SCRAM and the security protocol for Control Center in the etc/confluent-control-center/control-center.properties file.
confluent.controlcenter.streams.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
confluent.controlcenter.streams.security.protocol=SASL_SSL
Configure the JAAS configuration property to describe how Control Center can connect to the Kafka brokers. The properties username and password are used by Control Center to configure connections.
confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="confluent" \
   password="confluent-secret";
Configure Confluent Metrics Reporter
Enable SASL/SCRAM for Confluent Metrics Reporter, the metrics component used by Confluent Control Center and Auto Data Balancer.
To configure the Confluent Metrics Reporter for SASL/SCRAM, make the following configuration changes in the server.properties file in every broker in the production cluster being monitored.
Verify that the Confluent Metrics Reporter is enabled.
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=kafka1:9093
Enable the SASL/SCRAM mechanism for Confluent Metrics Reporter.
confluent.metrics.reporter.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
confluent.metrics.reporter.security.protocol=SASL_SSL
Configure Schema Registry
Configure Schema Registry to use SASL/SCRAM authentication.
Important
If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.
Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.
Here is an example subset of schema-registry.properties configuration parameters to add for SASL authentication:
kafkastore.bootstrap.servers=kafka1:9093
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
kafkastore.security.protocol=SASL_SSL
kafkastore.sasl.mechanism=SCRAM-SHA-256
Configure the JAAS configuration property to describe how Schema Registry can connect to the Kafka brokers. The properties username and password are used by Schema Registry to configure the user for connections. In this example, Schema Registry connects to the broker as user schemaregistry.
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="schemaregistry" \
   password="schemaregistry-secret";
Configure REST Proxy
Configure REST Proxy to use SASL/SCRAM authentication.
Important
If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.
Securing Confluent REST Proxy for SASL requires that you configure security between REST Proxy and the Confluent Platform cluster.
For a complete list of all configuration options, refer to SASL Authentication.
Configure the SASL/SCRAM mechanism in kafka-rest.properties.
Note
Make sure the bootstrap.servers configuration is set with SASL_PLAINTEXT://host:port (or SASL_SSL://host:port) endpoints, or you’ll accidentally open a SASL connection to a non-SASL port. For more details, see bootstrap.servers in Standalone REST Proxy Configuration Options for Confluent Platform.
bootstrap.servers=SASL_SSL://kafka1:9093
client.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
client.security.protocol=SASL_SSL
Configure the JAAS configuration property to describe how the REST Proxy can connect to the Kafka brokers. The properties username and password are used by the REST Proxy to configure the user for connections. In this example, the REST Proxy connects to the broker as user restproxy.
client.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="restproxy" \
   password="restproxy-secret";
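The bootstrap.servers requirement in the note above is easy to get wrong when a list mixes endpoints. As a rough guard, the following sketch flags any endpoint that does not carry a SASL_PLAINTEXT:// or SASL_SSL:// scheme. The function name is hypothetical and not part of REST Proxy.

```python
SASL_SCHEMES = ("SASL_PLAINTEXT", "SASL_SSL")


def non_sasl_endpoints(bootstrap_servers: str) -> list:
    """Return the endpoints whose scheme is missing or is not a SASL protocol."""
    flagged = []
    for endpoint in bootstrap_servers.split(","):
        endpoint = endpoint.strip()
        scheme, separator, _ = endpoint.partition("://")
        if not separator or scheme not in SASL_SCHEMES:
            flagged.append(endpoint)
    return flagged
```

For the value used in this section, SASL_SSL://kafka1:9093, the function returns an empty list. An entry such as PLAINTEXT://kafka1:9092 or a bare kafka1:9092 would be flagged.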