.. _kafka_adding_security: Adding Security to a Running Cluster ------------------------------------ This topic describes how to add security (SSL or SASL) to a running cluster. .. _migrate-brokers-and-clients: Migrating Brokers and Clients ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You can secure a running cluster using one or more of the supported protocols. This is done in phases: #. Incrementally restart the cluster nodes to open additional secured port(s). #. Restart clients using the secured rather than ``PLAINTEXT`` port (assuming you are securing the client-broker connection). #. Incrementally restart the cluster again to enable broker-to-broker security (if this is required). #. A final incremental restart to close the ``PLAINTEXT`` port. The specific steps for configuring security protocols are described in the respective sections for :ref:`SSL ` and :ref:`SASL `. Follow these steps to enable security for your desired protocol(s). The security implementation lets you configure different protocols for both broker-client and broker-broker communication. These must be enabled in separate restarts. A ``PLAINTEXT`` port must be left open throughout so brokers and/or clients can continue to communicate. When performing an incremental restart, take into consideration the recommendations for doing :ref:`rolling restarts ` to avoid downtime for end users. For example, if you want to encrypt both broker-client and broker-broker communication with SSL: #. In the first incremental restart, open an SSL port on each node: .. codewithvars:: bash listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092 .. note:: In Kafka 1.1 and later, you can update some broker configurations without restarting the broker by adding or removing listeners :ref:`dynamically `. When adding a new listener, provide the security configuration of the listener using the listener prefix ``listener.name.{listenerName}``. If the new listener uses SASL, then provide the JAAS configuration property ``sasl.jaas.config`` with the listener and mechanism prefix. For more details, refer to :ref:`jaas-config`. #. Then restart the clients, changing their configuration to point at the newly-opened, secured port: .. codewithvars:: bash bootstrap.servers=[broker1:9092,...] security.protocol=SSL ...etc For more details, refer to :ref:`kafka_ssl_encryption`. #. In the second incremental server restart, instruct |ak-tm| to use SSL as the broker-broker protocol (which will use the same SSL port): .. codewithvars:: bash listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092 security.inter.broker.protocol=SSL #. In the final restart, secure the cluster by closing the ``PLAINTEXT`` port: .. codewithvars:: bash listeners=SSL://broker1:9092 security.inter.broker.protocol=SSL Alternatively, you might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. If you want to use SSL encryption throughout (i.e. for broker-broker and broker-client communication), but also want to add SASL authentication to the broker-client connection: #. Open two additional ports during the first restart: .. codewithvars:: bash listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093 #. Again, restart the clients, changing their configuration to point at the newly-opened, SASL and SSL secured port: .. codewithvars:: bash bootstrap.servers=[broker1:9093,...] security.protocol=SASL_SSL ...etc For more details, refer to :ref:`kafka_sasl_auth`. #. The second server restart would switch the cluster to use encrypted broker-broker communication using the SSL port you previously opened on port 9092: .. codewithvars:: bash listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093 security.inter.broker.protocol=SSL #. The final restart secures the cluster by closing the ``PLAINTEXT`` port: .. codewithvars:: bash listeners=SSL://broker1:9092,SASL_SSL://broker1:9093 security.inter.broker.protocol=SSL .. _zookeeper_adding_security: Migrating |zk| ~~~~~~~~~~~~~~ If you are running a version of |ak| that does not support security or you have security disabled and you want to make the cluster secure, then you must perform the following steps to enable |zk| authentication with minimal disruption to your operations: .. note:: Migrating |zk| security when the |ak| cluster is not running (no controller node in |zk|) results in a controller node being created, but populated with a null value. In this scenario, leader election may not work properly in subsequent starts of the |ak| cluster. #. In ``zookeeper.properties``, add the authentication provider to enable |zk| security: .. codewithvars:: bash authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl #. Export the |zk| JAAS file before restarting |zk|: .. codewithvars:: bash KAFKA_OPTS=-Djava.security.auth.login.config=/zookeeper_server_jaas.conf The content of ``zookeeper_server_jaas.conf`` should look like the following: .. codewithvars:: json Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_super="adminsecret" user_bob="bobsecret"; }; #. Perform |zk| rolling restarts with the JAAS login file shown above; this enables brokers to authenticate. #. Create the JAAS file for the brokers to authenticate with |zk|. Add the Client information to the broker JAAS configuration and then export the JAAS file: .. codewithvars:: bash export KAFKA_OPTS=-Djava.security.auth.login.config=/kafka_server_jaas.conf The content of the broker JAAS file (``kafka_server_jaas.conf``) should look like the following: .. codewithvars:: json Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="bob" password="bobsecret"; }; #. Perform a rolling restart of brokers, this time setting the configuration parameter ``zookeeper.set.acl`` to true, which enables the use of secure ACLs when creating znodes. #. Execute the ZkSecurityMigrator tool using the script: ``bin/zookeeper-security-migration`` with ``zookeeper.acl`` set to ``secure``. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. If you wish to validate that security has been enabled between the broker and |zk|: #. Export the broker JAAS configuration: .. codewithvars:: bash export KAFKA_OPTS=-Djava.security.auth.login.config=/kafka_server_jaas.conf #. Create a new topic named ``test`` using the ``kafka-topic`` command: .. codewithvars:: bash bin/kafka-topics --zookeepr : --create --topic test --partitions 2 --replication-factor 2 #. Log in to the ``zookeeper-shell`` and check the ACL of the newly-created znode, which should have the ACL enabled: .. codewithvars:: bash bin/zookeeper-shell : [zk: localhost:12181(CONNECTED) 9] getAcl /config/topics/test 'world,'anyone : r 'sasl,'bob : cdrwa If you want to turn off authentication in a secure cluster: #. Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting ``zookeeper.set.acl`` to ``false``. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes. #. Run the ZkSecurityMigrator tool using the script ``bin/zookeeper-security-migration`` with ``zookeeper.acl`` set to ``unsecure``. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. #. Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file. Here is an example of how to run the migration tool: .. codewithvars:: bash bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connect=localhost:2181 Run this command to see the full list of parameters: .. codewithvars:: bash bin/zookeeper-security-migration --help Migrating the |zk| ensemble ~~~~~~~~~~~~~~~~~~~~~~~~~~~ You must also enable authentication on the |zk| ensemble. This requires that you perform a rolling restart of the ensemble and set a few properties. Refer to the |zk| documentation for more detail: - `Apache ZooKeeper documentation `_ - `Apache ZooKeeper wiki `_