Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Configuring GSSAPI

SASL/GSSAPI Overview

SASL/GSSAPI is for organizations using Kerberos (for example, by using Active Directory). You don’t need to install a new server just for Apache Kafka®. Ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).

If you don’t already have a Kerberos server, your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (Ubuntu, Red Hat). Note that if you are using Oracle Java, you must download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security. If you run your own Kerberos server, you must create the principals yourself using the following commands:

sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
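
Because every broker needs its own principal and keytab, it can help to generate the two commands per host rather than type them by hand. The following is a minimal sketch, not an official tool: the function name and the one-keytab-per-hostname naming scheme are assumptions made for illustration, and the hostnames and realm are placeholders.

```python
import shlex


def kadmin_commands(hostname, realm, keytab_dir="/etc/security/keytabs"):
    """Return the addprinc and ktadd commands for one broker host."""
    principal = f"kafka/{hostname}@{realm}"
    # Assumption: one keytab file per broker, named after the hostname.
    keytab = f"{keytab_dir}/{hostname}.keytab"
    queries = [
        f"addprinc -randkey {principal}",
        f"ktadd -k {keytab} {principal}",
    ]
    # shlex.quote wraps each kadmin query in single quotes for the shell.
    return [f"sudo /usr/sbin/kadmin.local -q {shlex.quote(q)}" for q in queries]


for host in ["kafka1.hostname.com", "kafka2.hostname.com"]:
    for command in kadmin_commands(host, "EXAMPLE.COM"):
        print(command)
```

The script only prints the commands; review them before running them on the KDC host.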

It is a Kerberos requirement that all your hosts can be resolved with their fully qualified domain names (FQDNs).
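
A quick way to spot a host that does not report a fully qualified name is to inspect what the local resolver returns. The sketch below is only a rough check under stated assumptions: the helper names are hypothetical, and "has at least two dot-separated labels" is a heuristic, not a full DNS validation.

```python
import socket


def looks_fully_qualified(name):
    """True if the name has at least two non-empty dot-separated labels."""
    labels = name.rstrip(".").split(".")
    return len(labels) >= 2 and all(labels)


def check_local_host():
    """Return this host's resolved name and whether it looks like an FQDN."""
    fqdn = socket.getfqdn()
    return fqdn, looks_fully_qualified(fqdn)
```

Calling check_local_host() on each broker host returns the resolved name together with a boolean, which you can compare against the hostname used in that broker's principal.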

The remainder of this page will show you how to configure SASL/GSSAPI for each component in the Confluent Platform.

GSSAPI Logging

To enable SASL/GSSAPI debug output, you can set the sun.security.krb5.debug system property to true. For example:

export KAFKA_OPTS=-Dsun.security.krb5.debug=true
bin/kafka-server-start etc/kafka/server.properties

Tip

These instructions assume you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.

Brokers

Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.

Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for the Confluent Metrics Reporter.

JAAS

  1. First, create the broker’s JAAS configuration file in each Kafka broker’s config directory. This example calls it kafka_server_jaas.conf. This is the only way to configure JAAS for brokers; there is no broker configuration property sasl.jaas.config.

  2. In each broker’s JAAS file, configure a KafkaServer section with a unique principal and keytab, i.e., secret key, for each broker. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.

    // Specifies a unique keytab and principal name for each broker
    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka_server.keytab"
        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    };
    
  3. If the broker authenticates to ZooKeeper using SASL, you must also configure a Client section. This section also allows the brokers to set ACLs on ZooKeeper nodes, which locks down these nodes so that only the brokers can modify them.

    // ZooKeeper client authentication
    Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka_server.keytab"
        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    };
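
Since the KafkaServer and Client sections differ between brokers only in the principal and keytab, the file can be rendered from a template. The following is a sketch under assumptions: the template and function names are hypothetical, and it assumes the broker reuses the same principal and keytab for the ZooKeeper Client section, as in the examples above.

```python
JAAS_TEMPLATE = """\
KafkaServer {{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="{keytab}"
    principal="{principal}";
}};

Client {{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="{keytab}"
    principal="{principal}";
}};
"""


def render_broker_jaas(hostname, realm, keytab):
    """Render a broker JAAS file with KafkaServer and Client sections."""
    principal = f"kafka/{hostname}@{realm}"
    return JAAS_TEMPLATE.format(keytab=keytab, principal=principal)


print(render_broker_jaas(
    "kafka1.hostname.com",
    "EXAMPLE.COM",
    "/etc/security/keytabs/kafka_server.keytab",
))
```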
    

Note

By default, ZooKeeper uses the fully qualified principal for authorization. If you are defining ZooKeeper ACLs in the broker configuration using the zookeeper.set.acl parameter, use identical principals (which should not include hostnames) across all Kafka brokers. If you do not use identical principals, then you must set both the kerberos.removeHostFromPrincipal and kerberos.removeRealmFromPrincipal parameters to true in the ZooKeeper server configuration file. This configuration ensures that all brokers are authorized in the same way, and that the first part of the principal is the same across all Kafka brokers.
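
To see why the two ZooKeeper parameters make brokers with different principals look identical, consider how the principal is shortened. The function below is an illustrative approximation of that shortening, not ZooKeeper's own code; its name and arguments are assumptions.

```python
def zookeeper_auth_id(principal, remove_host=False, remove_realm=False):
    """Approximate the identity ZooKeeper derives from a Kerberos principal."""
    name, _, realm = principal.partition("@")
    primary, _, _ = name.partition("/")
    if remove_host:
        name = primary  # drop the "/hostname" part
    if remove_realm or not realm:
        return name     # drop the "@REALM" part
    return f"{name}@{realm}"


for broker in ["kafka/kafka1.hostname.com@EXAMPLE.COM",
               "kafka/kafka2.hostname.com@EXAMPLE.COM"]:
    print(zookeeper_auth_id(broker, remove_host=True, remove_realm=True))
```

With both options enabled, the two example brokers map to the same identity, which is what allows a znode ACL set by one broker to authorize the others.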

Configuration

  1. Enable GSSAPI mechanism in the server.properties file of every broker.

    # List of enabled mechanisms, can be more than one
    sasl.enabled.mechanisms=GSSAPI
    
    # Specify one of the SASL mechanisms
    sasl.mechanism.inter.broker.protocol=GSSAPI
    
  2. If you want to enable SASL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT). Set the protocol to:

    • SASL_SSL: if SSL encryption is enabled (SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if SSL encryption is not enabled

    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.inter.broker.protocol=SASL_SSL
    
  3. Tell the Kafka brokers on which ports to listen for client and inter-broker SASL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners. The advertised.listeners value must be an address that clients can reach; it cannot be the 0.0.0.0 wildcard address. Set the listener to:

    • SASL_SSL: if SSL encryption is enabled (SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if SSL encryption is not enabled

    # With SSL encryption
    listeners=SASL_SSL://0.0.0.0:9093
    advertised.listeners=SASL_SSL://kafka1:9093
    
    # Without SSL encryption
    listeners=SASL_PLAINTEXT://0.0.0.0:9093
    advertised.listeners=SASL_PLAINTEXT://kafka1:9093
    
  4. Configure both SASL_SSL and PLAINTEXT ports if:

    • SASL is not enabled for inter-broker communication
    • Some clients connecting to the cluster do not use SASL

    Example SASL listeners with SSL encryption, mixed with PLAINTEXT listeners

    # With SSL encryption
    listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://kafka1:9092,SASL_SSL://kafka1:9093
    
    # Without SSL encryption
    listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093
    
  5. Configure a service name that matches the primary name of the brokers configured in the broker JAAS file. In the earlier JAAS file examples, with principal="kafka/kafka1.hostname.com@EXAMPLE.COM";, the primary is “kafka”.

sasl.kerberos.service.name=kafka
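
The broker settings above depend on each other: GSSAPI must be enabled, the inter-broker protocol needs a matching listener, and the service name must equal the primary of the JAAS principal. The sketch below checks those three relationships. It is not part of Kafka; the function names are hypothetical, and it assumes listener names are the security protocol names, as in the examples on this page.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props


def principal_primary(principal):
    """Return the part of a Kerberos principal before the first '/' or '@'."""
    return principal.split("@", 1)[0].split("/", 1)[0]


def check_broker_sasl(props, jaas_principal):
    """Return a list of problems found in the broker's GSSAPI settings."""
    problems = []
    enabled = props.get("sasl.enabled.mechanisms", "")
    if "GSSAPI" not in [m.strip() for m in enabled.split(",")]:
        problems.append("GSSAPI is not listed in sasl.enabled.mechanisms")
    # Kafka defaults the inter-broker protocol to PLAINTEXT when unset.
    inter_broker = props.get("security.inter.broker.protocol", "PLAINTEXT")
    listeners = props.get("listeners", "")
    protocols = [item.split("://", 1)[0].strip()
                 for item in listeners.split(",") if item.strip()]
    if inter_broker not in protocols:
        problems.append(f"no {inter_broker} listener for inter-broker traffic")
    if props.get("sasl.kerberos.service.name") != principal_primary(jaas_principal):
        problems.append("sasl.kerberos.service.name does not match the principal's primary")
    return problems
```

For example, check_broker_sasl(parse_properties(open("etc/kafka/server.properties").read()), "kafka/kafka1.hostname.com@EXAMPLE.COM") returns an empty list when the three settings agree.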

Run

When you start each Kafka broker, pass the name of the JAAS file as a JVM parameter:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
bin/kafka-server-start etc/kafka/server.properties

Here are some optional settings that you can pass in as a JVM parameter when you start each broker from the command line.

-Dzookeeper.sasl.clientconfig
For SASL authentication to ZooKeeper, set this system property to change the name of the JAAS section that the broker uses.

  • Default: Client
  • Usage example: pass the parameter as a JVM parameter when you start the broker, e.g. -Dzookeeper.sasl.clientconfig=ZkClient

-Dzookeeper.sasl.client.username
For SASL authentication to ZooKeeper, set this system property to change the service name.

  • Default: zookeeper
  • Usage example: pass the parameter as a JVM parameter when you start the broker, e.g. -Dzookeeper.sasl.client.username=zk

-Djava.security.krb5.conf
Optionally specify the path to the krb5.conf file (see JDK’s Kerberos Requirements for more details)
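
When several JVM parameters are needed, they all go into the single KAFKA_OPTS variable, separated by spaces. The sketch below assembles such a value; the helper name is hypothetical and /etc/krb5.conf is an assumed location for the Kerberos configuration file.

```python
def build_kafka_opts(system_properties):
    """Join JVM system properties into a single KAFKA_OPTS value."""
    return " ".join(f"-D{key}={value}" for key, value in system_properties.items())


opts = build_kafka_opts({
    "java.security.auth.login.config": "/etc/kafka/kafka_server_jaas.conf",
    "java.security.krb5.conf": "/etc/krb5.conf",  # assumed path
    "sun.security.krb5.debug": "true",            # optional debug output
})
print(f'export KAFKA_OPTS="{opts}"')
```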

Clients

The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.

If you are using the Kafka Streams API, refer to the Kafka Streams security documentation to configure the equivalent SSL and SASL parameters.

  1. Configure the following properties in a client properties file, for example client.properties.

    sasl.mechanism=GSSAPI
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.

    sasl.kerberos.service.name=kafka
    
  3. Configure the JAAS configuration property with a unique principal (usually the same name as the user running the client) and keytab (the secret key) for each client.

    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true \
        keyTab="/etc/security/keytabs/kafka_client.keytab" \
        principal="kafkaclient1@EXAMPLE.COM";
    
  4. For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with useTicketCache=true.

    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useTicketCache=true;
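
The keytab and ticket-cache variants differ only in the options passed to Krb5LoginModule. As a rough sketch, the helpers below build either value and a matching client.properties body; the function names are hypothetical and the service name kafka is taken from the examples on this page.

```python
def krb5_jaas_config(principal=None, keytab=None):
    """Build a sasl.jaas.config value for Krb5LoginModule.

    With both principal and keytab, log in from the keytab; with neither,
    reuse the ticket cache populated by kinit.
    """
    if (principal is None) != (keytab is None):
        raise ValueError("pass both principal and keytab, or neither")
    if keytab is None:
        options = ["useTicketCache=true"]
    else:
        options = ["useKeyTab=true", "storeKey=true",
                   f'keyTab="{keytab}"', f'principal="{principal}"']
    return ("com.sun.security.auth.module.Krb5LoginModule required "
            + " ".join(options) + ";")


def client_properties(jaas_config, protocol="SASL_SSL"):
    """Return the text of a client.properties file for SASL/GSSAPI."""
    lines = [
        "sasl.mechanism=GSSAPI",
        f"security.protocol={protocol}",
        "sasl.kerberos.service.name=kafka",
        f"sasl.jaas.config={jaas_config}",
    ]
    return "\n".join(lines) + "\n"


print(client_properties(krb5_jaas_config(
    "kafkaclient1@EXAMPLE.COM", "/etc/security/keytabs/kafka_client.keytab")))
print(client_properties(krb5_jaas_config(), protocol="SASL_PLAINTEXT"))
```

The single-line form of sasl.jaas.config is equivalent to the backslash-continued form shown in the steps above.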
    

ZooKeeper

This section describes how to configure ZooKeeper so that brokers can use SASL/GSSAPI to authenticate to it.

For further details on ZooKeeper SASL authentication:

  1. Client-Server mutual authentication: between the Kafka broker (client) and ZooKeeper (server)
  2. Server-Server mutual authentication: between the ZooKeeper nodes within an ensemble

JAAS

  1. Create a JAAS file for each ZooKeeper node. This example calls it zookeeper_jaas.conf.
  2. In each ZooKeeper node’s JAAS file, configure a Server section with a unique principal and keytab (the secret key) for each node. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the ZooKeeper node.

    // Specifies a unique keytab and principal name for each ZooKeeper node
    Server {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/path/to/server/keytab"
        storeKey=true
        useTicketCache=false
        principal="zookeeper/yourzkhostname@EXAMPLE.COM";
    };

You must make a corresponding Client section in each broker’s JAAS file.

Configuration

  1. Enable ZooKeeper ACLs so that only brokers can modify the znodes that hold Kafka metadata. The znodes remain world-readable. While the data stored in ZooKeeper is not sensitive, inappropriate manipulation of znodes can cause cluster disruption.

    In the etc/kafka/server.properties file, enable ZooKeeper ACLs.

    zookeeper.set.acl=true
    
  2. It is recommended to limit access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper if the new consumer and new producer are used).

Run

When you start ZooKeeper, pass the name of its JAAS file as a JVM parameter:

export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
bin/zookeeper-server-start etc/kafka/zookeeper.properties

Kafka Connect

This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:

  1. Kafka Connect workers: part of the Kafka Connect API, a worker is really just an advanced client under the covers
  2. Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
  3. Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use SSL using additional properties

Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for the Confluent Monitoring Interceptors.

Configure all the following properties in connect-distributed.properties.

  1. Configure the Connect workers to use SASL/GSSAPI.

    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property with a unique principal (usually the same name as the user running the worker) and keytab (the secret key) for each worker.

    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="connect@EXAMPLE.COM";
    
  3. For the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:

    • Source connector: configure the same properties adding the producer prefix.

      producer.sasl.mechanism=GSSAPI
      producer.sasl.kerberos.service.name=kafka
      # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      producer.security.protocol=SASL_SSL
      producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
         useKeyTab=true \
         storeKey=true \
         keyTab="/etc/security/keytabs/kafka_client.keytab" \
         principal="connect@EXAMPLE.COM";
      
    • Sink connector: configure the same properties adding the consumer prefix.

      consumer.sasl.mechanism=GSSAPI
      consumer.sasl.kerberos.service.name=kafka
      # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      consumer.security.protocol=SASL_SSL
      consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
         useKeyTab=true \
         storeKey=true \
         keyTab="/etc/security/keytabs/kafka_client.keytab" \
         principal="connect@EXAMPLE.COM";
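
The producer and consumer blocks above repeat the worker's four SASL properties under a prefix. The following sketch makes that relationship explicit by deriving the prefixed entries from the worker settings; the helper name is hypothetical, and it assumes the connectors reuse the worker's principal and keytab, as in the examples.

```python
def with_prefix(props, prefix):
    """Return a copy of props with every key prefixed."""
    return {f"{prefix}{key}": value for key, value in props.items()}


worker_sasl = {
    "sasl.mechanism": "GSSAPI",
    "sasl.kerberos.service.name": "kafka",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": (
        "com.sun.security.auth.module.Krb5LoginModule required "
        "useKeyTab=true storeKey=true "
        'keyTab="/etc/security/keytabs/kafka_client.keytab" '
        'principal="connect@EXAMPLE.COM";'
    ),
}

# Worker settings, plus overrides for source (producer) and sink (consumer) connectors.
connect_props = {
    **worker_sasl,
    **with_prefix(worker_sasl, "producer."),
    **with_prefix(worker_sasl, "consumer."),
}

for key, value in connect_props.items():
    print(f"{key}={value}")
```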
      

Confluent Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source to a destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.

To configure Confluent Replicator security, you must configure the Replicator connector as shown below and additionally you must configure security for the Kafka Connect workers that run it.

Configure Confluent Replicator to use SASL/GSSAPI by adding these properties in the Replicator’s JSON configuration file.

{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.security.protocol" : "SASL_SSL",
    "src.kafka.sasl.mechanism" : "GSSAPI",
    "src.kafka.sasl.kerberos.service.name" : "kafka",
    "src.kafka.sasl.jaas.config" : "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/security/keytabs/kafka_client.keytab\" principal=\"replicator@EXAMPLE.COM\";",
    ....
  }
}
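
The JAAS value contains double quotes, which must be escaped inside a JSON string. One way to avoid escaping mistakes is to let a JSON library produce the file. This sketch covers only the four security properties shown above; the remaining connector settings elided by "...." are not filled in and would need to be added.

```python
import json

jaas = (
    "com.sun.security.auth.module.Krb5LoginModule required "
    "useKeyTab=true storeKey=true "
    'keyTab="/etc/security/keytabs/kafka_client.keytab" '
    'principal="replicator@EXAMPLE.COM";'
)

connector = {
    "name": "replicator",
    "config": {
        # Only the security-related subset; other Replicator settings omitted.
        "src.kafka.security.protocol": "SASL_SSL",
        "src.kafka.sasl.mechanism": "GSSAPI",
        "src.kafka.sasl.kerberos.service.name": "kafka",
        "src.kafka.sasl.jaas.config": jaas,
    },
}

# json.dumps escapes the embedded double quotes as \" automatically.
payload = json.dumps(connector, indent=2)
print(payload)
```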

Confluent Control Center

Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.

Enable security for the Control Center application as described in the section below. Additionally, configure security for the Confluent Metrics Reporter and the Confluent Monitoring Interceptors.

  1. Enable SASL/GSSAPI and the security protocol for Control Center in the etc/confluent-control-center/control-center.properties file.

    confluent.controlcenter.streams.sasl.mechanism=GSSAPI
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.controlcenter.streams.security.protocol=SASL_SSL
    
  2. Since you are using GSSAPI, configure a Kerberos service name that matches the primary name of the Kafka server configured in the broker JAAS file.

    confluent.controlcenter.streams.sasl.kerberos.service.name=kafka
    
  3. Configure the JAAS configuration property with a unique principal (usually the same name as the user running Confluent Control Center) and keytab (the secret key).

    confluent.controlcenter.streams.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="controlcenter@EXAMPLE.COM";
    

Confluent Metrics Reporter

This section describes how to configure your brokers to enable security for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.

To configure the Confluent Metrics Reporter for SASL/GSSAPI, make the following configuration changes in the server.properties file in every broker in the production cluster being monitored.

  1. Verify that the Confluent Metrics Reporter is enabled.

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=kafka1:9093
    
  2. Enable the SASL/GSSAPI mechanism for Confluent Metrics Reporter.

    confluent.metrics.reporter.sasl.mechanism=GSSAPI
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.metrics.reporter.security.protocol=SASL_SSL
    
  3. Since you are using GSSAPI, configure a Kerberos service name that matches the primary name of the Kafka server configured in the broker JAAS file.

    confluent.metrics.reporter.sasl.kerberos.service.name=kafka
    
  4. Configure the JAAS configuration property with a unique principal and keytab, i.e., secret key.

    confluent.metrics.reporter.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="client@EXAMPLE.COM";
    

Confluent Monitoring Interceptors

Confluent Monitoring Interceptors are used for Confluent Control Center streams monitoring. This section describes how to enable security for Confluent Monitoring Interceptors in three places:

  1. General clients
  2. Kafka Connect
  3. Confluent Replicator

Interceptors for General Clients

For Confluent Control Center streams monitoring to work with Kafka clients, you must configure SASL/GSSAPI for the Confluent Monitoring Interceptors in each client.

  1. Verify that the client has configured interceptors.

    • Producer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      
    • Consumer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      
  2. Configure the SASL mechanism and security protocol for the interceptor.

    confluent.monitoring.interceptor.sasl.mechanism=GSSAPI
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.monitoring.interceptor.security.protocol=SASL_SSL
    
  3. Configure the Kerberos service name for the Confluent Monitoring Interceptors (confluent.monitoring.interceptor.sasl.kerberos.service.name) to match the broker’s Kerberos service name (sasl.kerberos.service.name).

    confluent.monitoring.interceptor.sasl.kerberos.service.name=kafka
    
  4. Configure the JAAS configuration property with a unique principal and keytab (the secret key).

    confluent.monitoring.interceptor.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="client@EXAMPLE.COM";
    

Interceptors for Kafka Connect

For Confluent Control Center streams monitoring to work with Kafka Connect, you must configure SASL/GSSAPI for the Confluent Monitoring Interceptors in Kafka Connect.

  1. Configure the Connect workers by adding these properties in connect-distributed.properties, depending on whether the connectors are sources or sinks.

    • Source connector: configure the Confluent Monitoring Interceptors SASL mechanism with the producer prefix.

      producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      producer.confluent.monitoring.interceptor.sasl.mechanism=GSSAPI
      # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
      producer.confluent.monitoring.interceptor.sasl.kerberos.service.name=kafka
      
    • Sink connector: configure the Confluent Monitoring Interceptors SASL mechanism with the consumer prefix.

      consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      consumer.confluent.monitoring.interceptor.sasl.mechanism=GSSAPI
      # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
      consumer.confluent.monitoring.interceptor.sasl.kerberos.service.name=kafka
      
  2. Configure the Kerberos service name for the Confluent Monitoring Interceptors (confluent.monitoring.interceptor.sasl.kerberos.service.name) to match the broker’s Kerberos service name (sasl.kerberos.service.name). Use the correct configuration parameter prefix for source and sink connectors.

    • Source connector: configure the Confluent Monitoring Interceptor to use the Kerberos service name with the producer prefix.

      producer.confluent.monitoring.interceptor.sasl.kerberos.service.name=kafka
      
    • Sink connector: configure the Confluent Monitoring Interceptor to use the Kerberos service name with the consumer prefix.

      consumer.confluent.monitoring.interceptor.sasl.kerberos.service.name=kafka
      
  3. Configure the JAAS configuration property with a unique principal (usually the same name as the user running the Connect worker) and keytab (the secret key). Use the correct configuration parameter prefix for source and sink connectors.

    • Source connector: configure the Confluent Monitoring Interceptors JAAS configuration with the producer prefix.

      producer.confluent.monitoring.interceptor.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
         useKeyTab=true \
         storeKey=true \
         keyTab="/etc/security/keytabs/kafka_client.keytab" \
         principal="connect@EXAMPLE.COM";
      
    • Sink connector: configure the Confluent Monitoring Interceptors JAAS configuration with the consumer prefix.

      consumer.confluent.monitoring.interceptor.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
         useKeyTab=true \
         storeKey=true \
         keyTab="/etc/security/keytabs/kafka_client.keytab" \
         principal="connect@EXAMPLE.COM";
    

Interceptors for Replicator

For Confluent Control Center streams monitoring to work with Replicator, you must configure SASL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add.

{
  "name":"replicator",
  "config":{
    ....
    "src.consumer.group.id": "replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "GSSAPI",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.consumer.confluent.monitoring.interceptor.sasl.kerberos.service.name": "kafka",
    "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config" : "com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/kafka_client.keytab\" \nprincipal=\"replicator@EXAMPLE.COM\";",
    ....
  }
}

Schema Registry

Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.

  1. Here is an example subset of schema-registry.properties configuration parameters to add for SASL authentication:

    kafkastore.bootstrap.servers=SASL_SSL://kafka1:9093
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    kafkastore.security.protocol=SASL_SSL
    kafkastore.sasl.mechanism=GSSAPI
    
  2. Since you are using GSSAPI, configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.

    kafkastore.sasl.kerberos.service.name=kafka
    
  3. Configure the JAAS configuration property with a unique principal (usually the same name as the user running Schema Registry) and keytab (the secret key).

    kafkastore.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="schemaregistry@EXAMPLE.COM";
    

REST Proxy

Securing Confluent REST Proxy for SASL requires that you configure security between the REST proxy and the Kafka cluster.

You may also refer to the complete list of REST Proxy SASL configuration options.

  1. Here is an example subset of kafka-rest.properties configuration parameters to add for SASL/GSSAPI authentication:

    client.bootstrap.servers=kafka1:9093
    client.sasl.mechanism=GSSAPI
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    client.security.protocol=SASL_SSL
    
  2. Configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.

    client.sasl.kerberos.service.name=kafka
    
  3. Configure the JAAS configuration property with a unique principal (usually the same name as the user running the REST Proxy) and keytab (the secret key).

    client.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
       useKeyTab=true \
       storeKey=true \
       keyTab="/etc/security/keytabs/kafka_client.keytab" \
       principal="restproxy@EXAMPLE.COM";