Configure SASL/SCRAM authentication in Confluent Platform

SASL/SCRAM Overview

Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication, like PLAIN and DIGEST-MD5. SCRAM provides the following features:

  • The challenge-response mechanism of SASL/SCRAM protects against password sniffing on the network and against dictionary attacks on the password file. SCRAM allows the server to authenticate the client without ever transmitting or storing the client’s password in plain text.
  • Authentication information stored in the authentication database is not sufficient by itself to impersonate the client. The information is salted to prevent a pre-stored dictionary attack if the database is compromised.

For details on how SASL/SCRAM works, see RFC 5802.
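
The two features above can be illustrated with a short Python sketch of the RFC 5802 proof computation. This is a simplified illustration using only the standard library, not Kafka's implementation: the function names are hypothetical, the `auth_message` value is a placeholder for the concatenated handshake messages, and the password is assumed to be ASCII so that no SASLprep normalization is needed.

```python
import hashlib
import hmac
import os


def hi(password: bytes, salt: bytes, iterations: int) -> bytes:
    """RFC 5802 Hi(): PBKDF2 with HMAC-SHA-256 and a 32-byte output."""
    return hashlib.pbkdf2_hmac("sha256", password, salt, iterations)


def xor(a: bytes, b: bytes) -> bytes:
    """Byte-wise XOR of two equal-length byte strings."""
    return bytes(x ^ y for x, y in zip(a, b))


def client_proof(password: bytes, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    """Client side: prove knowledge of the password without sending it."""
    salted = hi(password, salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()
    client_signature = hmac.new(stored_key, auth_message, hashlib.sha256).digest()
    return xor(client_key, client_signature)


def server_verify(stored_key: bytes, auth_message: bytes, proof: bytes) -> bool:
    """Server side: check the proof using only the stored key."""
    client_signature = hmac.new(stored_key, auth_message, hashlib.sha256).digest()
    recovered_client_key = xor(proof, client_signature)
    return hmac.compare_digest(hashlib.sha256(recovered_client_key).digest(), stored_key)


salt = os.urandom(16)
iterations = 4096
# What the server keeps: StoredKey (plus salt and iteration count), never the password.
salted = hi(b"alice-secret", salt, iterations)
stored_key = hashlib.sha256(hmac.new(salted, b"Client Key", hashlib.sha256).digest()).digest()

auth_message = b"client-first,server-first,client-final-without-proof"  # placeholder
proof = client_proof(b"alice-secret", salt, iterations, auth_message)
print(server_verify(stored_key, auth_message, proof))  # True
```

The server never sees the password, and StoredKey alone is not a valid proof. However, anyone who holds StoredKey and also observes a proof on the wire can recover ClientKey, which is one reason to run SCRAM over TLS.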

Confluent Platform clusters support SCRAM-SHA-256 and SCRAM-SHA-512, which can be used with TLS to perform secure authentication. The examples below use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.

The default SCRAM implementation in Kafka stores SCRAM credentials in KRaft or ZooKeeper and is suitable for use in Confluent Platform installations where KRaft or ZooKeeper is on a private network. Because of this, you must create SCRAM credentials for users in KRaft or ZooKeeper.

KRaft-based clusters

If you want Kafka brokers to authenticate to each other using SCRAM, the SCRAM credentials must be created before the brokers are up and running. To create SCRAM credentials for users in KRaft, use the --add-scram option of the kafka-storage command, like this:

kafka-storage format [-h] --config CONFIG \
  --cluster-id CLUSTER_ID \
  [--add-scram SCRAM_CREDENTIAL] \
  [--release-version RELEASE_VERSION] \
  [--ignore-formatted]

where SCRAM_CREDENTIAL looks like one of the following:

  • 'SCRAM-SHA-256=[name=alice,password=alice-secret]'
  • 'SCRAM-SHA-512=[name=alice,iterations=8192,salt="MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=",saltedpassword="mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE="]'

The SCRAM_CREDENTIAL argument is a key-value pair where the key specifies the SCRAM mechanism supported and the value includes a set of key-value pairs used to populate the UserScramCredentialsRecord.

The SCRAM_CREDENTIAL subarguments require a name key and either a password key or a saltedpassword key. If you use a saltedpassword key, you must also supply an iterations key and a salt key. With a password key, the iterations and salt keys are optional: if they are not supplied, the iteration count defaults to 4096 and the salt is randomly generated. The values for salt and saltedpassword are Base64 encodings of binary data.
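
If you prefer to pass a saltedpassword instead of a clear-text password, the value has to be computed beforehand. The following Python sketch shows one way to do that, assuming the salted password is the RFC 5802 Hi() function (PBKDF2 with HMAC) applied to an ASCII password. The helper name `scram_credential` is hypothetical, and you should verify the output against your Kafka version before relying on it.

```python
import base64
import hashlib
import os

# Hash used by each mechanism's PBKDF2 step.
_HASHES = {"SCRAM-SHA-256": "sha256", "SCRAM-SHA-512": "sha512"}


def scram_credential(name, password, mechanism="SCRAM-SHA-256", iterations=4096, salt=None):
    """Return an --add-scram style argument that carries a salted password."""
    if mechanism not in _HASHES:
        raise ValueError(f"unsupported mechanism: {mechanism}")
    if iterations < 4096:
        raise ValueError("iteration count must be at least 4096")
    if salt is None:
        salt = os.urandom(24)  # random salt; the length here is an arbitrary choice
    # RFC 5802 Hi(): PBKDF2-HMAC with output length equal to the hash size.
    salted = hashlib.pbkdf2_hmac(_HASHES[mechanism], password.encode("utf-8"), salt, iterations)
    salt_b64 = base64.b64encode(salt).decode("ascii")
    salted_b64 = base64.b64encode(salted).decode("ascii")
    return (f'{mechanism}=[name={name},iterations={iterations},'
            f'salt="{salt_b64}",saltedpassword="{salted_b64}"]')


# Prints a value suitable for quoting on the command line; the salt is random,
# so the output differs on every run.
print(scram_credential("alice", "alice-secret", iterations=8192))
```

Computing the value offline keeps the clear-text password out of shell history and provisioning scripts on the broker hosts.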

The kafka-storage tool initializes the storage space for each Kafka broker and controller. One of the files created is the bootstrap.checkpoint file, which contains a set of UserScramCredentialsRecord records that are used to bootstrap the cluster. The --add-scram option adds a new ApiMessageAndVersion record to the bootstrap.checkpoint file. The record contains a UserScramCredentialsRecord that is used to store the SCRAM credentials for the specified user. This record is used by Kafka brokers to authenticate other brokers to it using SCRAM. The record is for the server side of each connection; the client side of each connection still needs to know the password.

ZooKeeper-based clusters

To create and manage SCRAM credentials on ZooKeeper-based clusters, add the --bootstrap-server option to the kafka-configs command, specifying the bootstrap server and port, the SCRAM configuration, the entity type, and the entity name. For example, to create SCRAM credentials:

kafka-configs --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
  --entity-type users \
  --entity-name alice

kafka-configs --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
  --entity-type users \
  --entity-name admin

If you want Kafka brokers to authenticate to each other using SCRAM, and you want to create SCRAM credentials before the brokers are up and running, you must create SCRAM credentials for users in ZooKeeper using the --zookeeper option (you cannot use the --bootstrap-server option):

kafka-configs --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
  --entity-type users \
  --entity-name alice

kafka-configs --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
  --entity-type users \
  --entity-name admin

If iterations is not specified, the default iteration count of 4096 is used. A random salt is created, and the SCRAM identity, consisting of the salt, iterations, StoredKey, and ServerKey, is stored in ZooKeeper.
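
As a rough illustration of what such a stored identity contains, the following Python sketch derives the four values for SCRAM-SHA-256 following the RFC 5802 definitions. The function name and dictionary field names are hypothetical and do not reflect the exact storage format; the password is assumed to be ASCII.

```python
import base64
import hashlib
import hmac
import os


def scram_identity(password, iterations=4096, salt=None):
    """Derive the values a server stores for one SCRAM-SHA-256 user."""
    if salt is None:
        salt = os.urandom(16)  # a fresh random salt per credential
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()  # StoredKey = H(ClientKey)
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()

    def encode(raw):
        return base64.b64encode(raw).decode("ascii")

    # Neither the password nor the salted password appears in the result.
    return {
        "salt": encode(salt),
        "iterations": iterations,
        "stored_key": encode(stored_key),
        "server_key": encode(server_key),
    }


print(scram_identity("alice-secret"))
```

StoredKey lets the server verify a client's proof, and ServerKey lets the server prove its own identity back to the client.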

Security Considerations for SASL/SCRAM

  • The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in ZooKeeper. This is suitable for production use in installations where ZooKeeper is secure and on a private network.
  • For cases where you require Kafka brokers to authenticate each other using SCRAM, and you need to create SCRAM credentials before the brokers are up and running, use the --zookeeper option to create SCRAM credentials.
  • Kafka only supports the strong hash functions SHA-256 and SHA-512 with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if ZooKeeper security is compromised.
  • SCRAM should be used only with TLS encryption to prevent interception of SCRAM exchanges. This protects against dictionary or brute force attacks and against impersonation if ZooKeeper is compromised.
  • The default SASL/SCRAM credential store may be overridden using custom callback handlers by configuring sasl.server.callback.handler.class in installations where ZooKeeper is not secure.
  • For more details on security considerations, refer to RFC 5802.

The remainder of this page will show you how to configure SASL/SCRAM for each component in the Confluent Platform.

Brokers

Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.

Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for the Confluent Metrics Reporter (see the Confluent Metrics Reporter section on this page).

JAAS

Note

Use of separate JAAS files is supported, but is not recommended. Instead, use the listener configuration specified in step 5 of Configuration to replace the steps below.

  1. First create the broker’s JAAS configuration file in each Kafka broker’s configuration directory. For this example, it is named kafka_server_jaas.conf.

  2. In each broker’s JAAS file, configure a KafkaServer section. This configuration defines one user (admin). The properties username and password are used by the broker to initiate connections to other brokers. In this example, admin is the user for interbroker communication.

    KafkaServer {
       org.apache.kafka.common.security.scram.ScramLoginModule required
       username="admin"
       password="admin-secret";
    };
    

Configuration

  1. Enable SASL/SCRAM mechanism in the server.properties file of every broker.

    # List of enabled mechanisms, can be more than one
    sasl.enabled.mechanisms=SCRAM-SHA-256
    
    # Specify one of the SASL mechanisms
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
    
  2. If you want to enable SASL for interbroker communication, add the following to the broker properties file (security.inter.broker.protocol defaults to PLAINTEXT). Set the protocol to:

    • SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if TLS/SSL encryption is not enabled

    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.inter.broker.protocol=SASL_SSL
    
  3. Tell the Kafka brokers on which ports to listen for client and interbroker SASL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners. Set the listener to:

    • SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if TLS/SSL encryption is not enabled

    # With TLS/SSL encryption
    listeners=SASL_SSL://kafka1:9093
    advertised.listeners=SASL_SSL://localhost:9093
    
    # Without TLS/SSL encryption
    listeners=SASL_PLAINTEXT://kafka1:9093
    advertised.listeners=SASL_PLAINTEXT://localhost:9093
    
  4. Configure both SASL_SSL and PLAINTEXT ports if:

    • SASL is not enabled for interbroker communication
    • Some clients connecting to the cluster do not use SASL

    Example SASL listeners with TLS/SSL encryption, mixed with PLAINTEXT listeners

    # With TLS/SSL encryption
    listeners=PLAINTEXT://kafka1:9092,SASL_SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
    
    # Without TLS/SSL encryption
    listeners=PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
    
  5. If you are not using a separate JAAS configuration file to configure JAAS, then configure JAAS for the Kafka broker listener as follows:

    listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
       username="admin" \
       password="admin-secret";
    

Run

If using a separate JAAS file, pass the name of the JAAS file as a JVM parameter when you start each Kafka broker:

export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
kafka-server-start etc/kafka/server.properties

Following are some optional settings that you can pass in as a JVM parameter when you start each broker from the command line.

zookeeper.sasl.client

Use to enable SASL authentication to ZooKeeper.

  • Type: Boolean
  • Default: true
  • Usage example: To pass the parameter as a JVM parameter when you start the broker, specify -Dzookeeper.sasl.client=true.

zookeeper.sasl.client.username

For SASL authentication to ZooKeeper, set this system property to change the username.

  • Type: string
  • Default: zookeeper
  • Usage example: To pass the parameter as a JVM parameter when you start the broker, specify -Dzookeeper.sasl.client.username=zk.

zookeeper.sasl.clientconfig

Specifies the context key in the JAAS login file. This is used to change the section name for SASL authentication to ZooKeeper.

  • Type: string
  • Default: Client
  • Usage example: To pass the parameter as a JVM parameter when you start the broker, specify -Dzookeeper.sasl.clientconfig=ZkClient.

Clients

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism. For additional information, see Configure license clients to authenticate to Kafka.

The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.

If you are using the Kafka Streams API, see the Kafka Streams security documentation for how to configure the equivalent SSL and SASL parameters.

  1. Configure the following properties in a client properties file client.properties.

    sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how clients, such as producers and consumers, connect to the Kafka brokers. The properties username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user kafkaclient1.

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="kafkaclient1" \
      password="kafkaclient1-secret";
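
When many clients need their own credentials, the two steps above can be scripted. The following Python sketch renders the contents of a client.properties file for a given user. The function names are hypothetical, the JAAS value is written on a single line, and user names or passwords that would need escaping are rejected rather than handled.

```python
def scram_jaas_config(username, password):
    """Single-line JAAS value for ScramLoginModule."""
    for value in (username, password):
        # Keep the sketch simple: refuse characters that would need escaping.
        if any(ch in value for ch in '"\\\n'):
            raise ValueError("quotes, backslashes, and newlines are not handled here")
    return ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";')


def client_properties(username, password, mechanism="SCRAM-SHA-256", tls=True):
    """Render the SASL/SCRAM client settings as properties-file text."""
    if mechanism not in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
        raise ValueError(f"unsupported mechanism: {mechanism}")
    lines = [
        f"sasl.mechanism={mechanism}",
        # SASL_SSL when TLS/SSL encryption is enabled, otherwise SASL_PLAINTEXT.
        f"security.protocol={'SASL_SSL' if tls else 'SASL_PLAINTEXT'}",
        f"sasl.jaas.config={scram_jaas_config(username, password)}",
    ]
    return "\n".join(lines) + "\n"


print(client_properties("kafkaclient1", "kafkaclient1-secret"), end="")
```

Write the returned text to a file with restrictive permissions, because it contains the password in clear text.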
    

ZooKeeper

ZooKeeper does not support SASL/SCRAM authentication, but it does support another mechanism, SASL/DIGEST-MD5.

For further details on ZooKeeper SASL authentication, see:

  1. Client-Server mutual authentication: between the Kafka Broker (client) and ZooKeeper (server)
  2. Server-Server mutual authentication: between the ZooKeeper nodes within an ensemble

Kafka Connect

This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:

  1. Kafka Connect workers: part of the Kafka Connect API, a worker is really just an advanced client, underneath the covers
  2. Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
  3. Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use TLS/SSL using additional properties

Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for the Confluent Monitoring Interceptors (see Interceptors for Kafka Connect).

Configure all the following properties in connect-distributed.properties.

  1. Configure the Connect workers to use SASL/SCRAM.

    sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how Connect’s producers and consumers can connect to the Kafka Brokers. The properties username and password are used by Connect to configure the user for connections. In this example, Connect workers connect to the broker as user connect.

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="connect" \
      password="connect-secret";
    
  3. For the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:

    • Source connector: configure the same properties adding the producer prefix.

      producer.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      producer.security.protocol=SASL_SSL
      producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="connect" \
        password="connect-secret";
      
    • Sink connector: configure the same properties adding the consumer prefix.

      consumer.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      consumer.security.protocol=SASL_SSL
      consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="connect" \
        password="connect-secret";
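
The worker, producer, and consumer settings above differ only in their prefix, so they can be generated from one base set. The following Python sketch shows the idea. The function name `connect_worker_properties` is hypothetical, and the sketch assumes the same user is used for all three roles, as in the examples above.

```python
def connect_worker_properties(username, password, mechanism="SCRAM-SHA-256", protocol="SASL_SSL"):
    """Return worker-level SASL/SCRAM settings plus producer./consumer. overrides."""
    base = {
        "sasl.mechanism": mechanism,
        "security.protocol": protocol,
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";'
        ),
    }
    props = dict(base)  # unprefixed keys configure the worker itself
    for prefix in ("producer", "consumer"):
        # producer. applies to source connectors, consumer. to sink connectors
        props.update({f"{prefix}.{key}": value for key, value in base.items()})
    return props


for key, value in connect_worker_properties("connect", "connect-secret").items():
    print(f"{key}={value}")
```

Generating the three groups from a single dictionary avoids the common mistake of updating the worker credentials while leaving a stale producer or consumer override behind.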
      

Confluent Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source to destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.

To configure Confluent Replicator security, you must configure the Replicator connector as shown below and additionally configure the Kafka Connect worker that runs it.

Configure Confluent Replicator to use SASL/SCRAM by adding these properties in the Replicator’s JSON configuration file. The JAAS configuration property defines username and password used by Replicator to configure the user for connections. In this example, Replicator connects to the broker as user replicator.

{
  "name":"replicator",
    "config":{
      ....
      "src.kafka.security.protocol" : "SASL_SSL",
      "src.kafka.sasl.mechanism" : "SCRAM-SHA-256",
      "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
      ....
    }
}

To configure Confluent Replicator for a destination cluster with SASL/SCRAM authentication, modify the Replicator JSON configuration to include the following:

{
  "name":"replicator",
    "config":{
      ....
      "dest.kafka.security.protocol" : "SASL_SSL",
      "dest.kafka.sasl.mechanism" : "SCRAM-SHA-256",
      "dest.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
      ....
    }
}
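
Hand-escaping the quotes inside the JAAS strings is error-prone. As one possible approach, the following Python sketch builds the security portion of the Replicator configuration and lets the JSON encoder handle the escaping. The function name is hypothetical, only the security properties shown above are included, and the same user is assumed for both clusters.

```python
import json


def replicator_security_config(username, password, mechanism="SCRAM-SHA-256", protocol="SASL_SSL"):
    """Build the src.kafka.* and dest.kafka.* SASL/SCRAM settings for Replicator."""
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    config = {}
    for prefix in ("src.kafka", "dest.kafka"):
        config[f"{prefix}.security.protocol"] = protocol
        config[f"{prefix}.sasl.mechanism"] = mechanism
        config[f"{prefix}.sasl.jaas.config"] = jaas
    # Other Replicator settings (topics, bootstrap servers, and so on) are omitted here.
    return {"name": "replicator", "config": config}


# json.dumps escapes the embedded double quotes as \" automatically.
print(json.dumps(replicator_security_config("replicator", "replicator-secret"), indent=2))
```

Merge the resulting "config" entries with the rest of your connector configuration before submitting it.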

Additionally the following properties are required in the Connect worker:

sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";
producer.sasl.mechanism=SCRAM-SHA-256
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";

For more information, see the general security configuration for Connect workers in the Kafka Connect section of this page.

Confluent Control Center

Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.

Note

When RBAC is enabled, Control Center cannot be used in conjunction with Kerberos because Control Center cannot support any SASL mechanism other than OAUTHBEARER.

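Enable security for the Control Center application as described in the section below. Additionally, configure security for the Confluent Metrics Reporter and, if you use streams monitoring, the Confluent Monitoring Interceptors, as described in their sections on this page.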

  1. Enable SASL/SCRAM and the security protocol for Control Center in the etc/confluent-control-center/control-center.properties file.

    confluent.controlcenter.streams.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.controlcenter.streams.security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how Control Center can connect to the Kafka Brokers. The properties username and password are used by Control Center to configure connections.

    confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="confluent" \
      password="confluent-secret";
    

Confluent Metrics Reporter

This section describes how to enable SASL/SCRAM for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.

To configure the Confluent Metrics Reporter for SASL/SCRAM, make the following configuration changes in the server.properties file in every broker in the production cluster being monitored.

  1. Verify that the Confluent Metrics Reporter is enabled.

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=kafka1:9093
    
  2. Enable the SASL/SCRAM mechanism for Confluent Metrics Reporter.

    confluent.metrics.reporter.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.metrics.reporter.security.protocol=SASL_SSL
    

Confluent Monitoring Interceptors

Confluent Monitoring Interceptors are used for Confluent Control Center streams monitoring. This section describes how to enable security for Confluent Monitoring Interceptors in three places:

  1. General clients
  2. Kafka Connect
  3. Confluent Replicator

Important

The typical use case for Confluent Monitoring Interceptors is to provide monitoring data to a separate monitoring cluster that most likely has different configurations. Interceptor configurations do not inherit configurations for the monitored component. If you wish to use configurations from the monitored component, you must add the appropriate prefix. For example, the option confluent.monitoring.interceptor.security.protocol=SSL, if being used for a producer, must be prefixed with producer. and would appear as producer.confluent.monitoring.interceptor.security.protocol=SSL.

Interceptors for General Clients

For Confluent Control Center stream monitoring to work with Kafka clients, you must configure SASL/SCRAM for the Confluent Monitoring Interceptors in each client.

  1. Verify that the client has configured interceptors.

    • Producer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      
    • Consumer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      
  2. Configure the SASL mechanism and security protocol for the interceptor.

    confluent.monitoring.interceptor.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.monitoring.interceptor.security.protocol=SASL_SSL
    
  3. Configure the JAAS configuration property with a unique username and password.

    confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="confluent" \
      password="confluent-secret";
    

Interceptors for Kafka Connect

  1. For Confluent Control Center stream monitoring to work with Kafka Connect, you must configure SASL/SCRAM for the Confluent Monitoring Interceptors in Kafka Connect. Configure the Connect workers by adding these properties in connect-distributed.properties, depending on whether the connectors are sources or sinks.

    • Source connector: configure the Confluent Monitoring Interceptors SASL mechanism with the producer prefix.

      producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      producer.confluent.monitoring.interceptor.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
      
    • Sink connector: configure the Confluent Monitoring Interceptors SASL mechanism with the consumer prefix.

      consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      consumer.confluent.monitoring.interceptor.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
      
  2. Configure the JAAS configuration property with a username and password.

    • Source connector: configure the Confluent Monitoring Interceptors JAAS configuration with the producer prefix.

      producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="confluent" \
        password="confluent-secret";
      
    • Sink connector: configure the Confluent Monitoring Interceptors JAAS configuration with the consumer prefix.

      consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="confluent" \
        password="confluent-secret";
      

Interceptors for Replicator

For Confluent Control Center stream monitoring to work with Replicator, you must configure SASL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add.

{
  "name":"replicator",
    "config":{
      ....
      "src.consumer.group.id": "replicator",
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "SCRAM-SHA-256",
      "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
      "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=\"confluent\" \npassword=\"confluent-secret\";",
      ....
    }
}

Schema Registry

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.

Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.

  1. Here is an example subset of schema-registry.properties configuration parameters to add for SASL authentication:

    kafkastore.bootstrap.servers=kafka1:9093
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    kafkastore.security.protocol=SASL_SSL
    kafkastore.sasl.mechanism=SCRAM-SHA-256
    
  2. Configure the JAAS configuration property to describe how Schema Registry can connect to the Kafka Brokers. The properties username and password are used by Schema Registry to configure the user for connections. In this example, Schema Registry connects to the broker as user schemaregistry.

    kafkastore.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="schemaregistry" \
      password="schemaregistry-secret";
    

REST Proxy

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.

Securing Confluent REST Proxy for SASL requires that you configure security between the REST proxy and the Kafka cluster.

For a complete list of all configuration options, refer to SASL Authentication.

  1. Configure the SASL/SCRAM mechanism in kafka-rest.properties.

    Note

    Make sure the bootstrap.servers configuration is set with SASL_PLAINTEXT://host:port (or SASL_SSL://host:port) endpoints, or you’ll accidentally open an SASL connection to a non-SASL port. For more details, see bootstrap.servers in REST Proxy Configuration Options.

    bootstrap.servers=SASL_SSL://kafka1:9093
    client.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    client.security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how the REST Proxy can connect to the Kafka Brokers. The properties username and password are used by the REST Proxy to configure the user for connections. In this example, the REST Proxy connects to the broker as user restproxy.

    client.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="restproxy" \
      password="restproxy-secret";