Dynamic Kafka Configurations

Some of the Apache Kafka® broker and topic configurations can be updated without restarting the broker. This section talks about updating these configuration options dynamically and securing password configurations by storing them in encrypted form in ZooKeeper.

Changing Broker Configurations Dynamically

Some of the broker configurations can be updated without restarting the broker. See the Dynamic Update Mode option in Broker Configurations for the update mode of each broker configuration.

  • read-only: Requires a broker restart for update.
  • per-broker: May be updated dynamically for each broker.
  • cluster-wide: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.

To alter the current broker configurations for broker ID 0 (for example, the number of log cleaner threads):

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

To alter several configurations at the same time, or to alter configurations with complex values (for example, the Router config for Audit Logs), create a file with the desired values in .properties format and use --add-config-file. If you have the new configurations in a file named new.properties:

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config-file new.properties

To describe the current dynamic broker configs for broker ID 0:

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe

To delete a configuration override and revert to the statically configured or default value for broker ID 0 (for example, the number of log cleaner threads):

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads

Some configurations may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

To describe the currently configured dynamic cluster-wide default configurations:

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

All configurations that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a configuration value is defined at different levels, the following order of precedence is used:

  • Dynamic per-broker config stored in ZooKeeper
  • Dynamic cluster-wide default config stored in ZooKeeper
  • Static broker config from server.properties

Updating Password Configurations Dynamically

Password configuration values that are dynamically updated are encrypted before storing in ZooKeeper. The broker config password.encoder.secret must be configured in server.properties to enable dynamic update of password configurations. The secret may be different on different brokers.

The secret used for password encoding may be rotated with a rolling restart of brokers. The old secret used for encoding passwords currently in ZooKeeper must be provided in the static broker configuration password.encoder.old.secret and the new secret must be provided in password.encoder.secret. All dynamic password configurations stored in ZooKeeper will be re-encoded with the new secret when the broker starts up.

All dynamically updated password configurations must be provided in every alter request when updating configurations using kafka-configs, even if the password configuration is not being altered.

If performing an upgrade, add the broker passwords to ZooKeeper and remove them from server.properties, and then restart the broker in the newly-upgraded version. If passwords remain in server.properties, they are ignored, and the value from ZooKeeper takes higher precedence. If you wish to roll back to an older version without dynamic configuration support, then you must add the passwords back into server.properties before restarting the broker.

Updating Password Configurations in ZooKeeper Before Starting Brokers

kafka-configs enables dynamic broker configurations to be updated using ZooKeeper before starting brokers for bootstrapping. This enables all password configurations to be stored in encrypted form, avoiding the need for clear passwords in server.properties. The broker configuration password.encoder.secret must also be specified if any password configurations are included in the alter command. Additional encryption parameters may also be specified. Password encoder configurations are not be persisted in ZooKeeper. For example, to store SSL key password for listener INTERNAL on broker 0:

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config \
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'

The configuration listener.name.internal.ssl.key.password is persisted in ZooKeeper in encrypted form using the provided encoder configurations. The encoder secret and iterations are not persisted in ZooKeeper.

Updating SSL Keystore of an Existing Listener

Brokers may be configured with SSL key stores with short validity periods to reduce the risk of compromised certificates. Key stores may be updated dynamically without restarting the broker. The configuration name must be prefixed with the listener prefix listener.name.{listenerName}. so that only the key store configuration of a specific listener is updated. The following configurations may be updated in a single alter request at per-broker level:

  • ssl.keystore.type
  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.key.password

If the listener is the inter-broker listener, then the update is allowed only if the new key store is trusted by the trust store configured for that listener. For other listeners, no trust validation is performed on the key store by the broker. Certificates must be signed by the same certificate authority that signed the old certificate to avoid any client authentication failures.

Updating SSL Trust Store of an Existing Listener

Broker trust stores may be updated dynamically without restarting the broker to add or remove certificates. Updated trust store will be used to authenticate new client connections. The configuration name must be prefixed with the listener prefix listener.name.{listenerName}. so that only the trust store configuration of a specific listener is updated. The following configurations may be updated in a single alter request at per-broker level:

  • ssl.truststore.type
  • ssl.truststore.location
  • ssl.truststore.password

If the listener is the inter-broker listener, then the update is allowed only if the existing key store for that listener is trusted by the new trust store. For other listeners, no trust validation is performed by the broker before the update. Removal of CA certificates used to sign client certificates from the new trust store can lead to client authentication failures.

To dynamically update SSL listeners so that connections use a new keystore (and expired certificates are updated on brokers without a rolling restart):

kafka-configs --command-config /etc/kafka/client.properties --bootstrap-server hostname:port --entity-type brokers --entity-name <broker-ID> --alter --add-config listener.name.<listener-name>.ssl.keystore.location=<path-to-keystore.jks>

Changing Topic Configurations Dynamically

Many config settings in Kafka are static and are wired through the properties file. However, there are several settings that you can change per topic. These settings can be changed dynamically using the <path-to-confluent>/bin/kafka-configs tool without having to restart the brokers. When changed using the kafka-configs tool, each change is persistent and remains through broker restarts.

Following are some per-topic configurations that you can change.

cleanup.policy

Indicates whether old segments in the log are deleted or deduplicated.

  • Type: list
  • Default delete
  • Importance: Medium

flush.messages

The number of messages that can be written to the log before a flush is forced.

  • Type: long
  • Default: Long.MAX_VALUE
  • Importance: Medium

flush.ms

The amount of time the log can have dirty data before a flush is forced.

  • Type: long
  • Default: Long.MAX_VALUE
  • Importance: Medium

max.message.bytes

The maximum size of a message.

  • Type: int
  • Default: Integer.MAX_VALUE
  • Importance: Medium

min.insync.replicas

If the number of in sync replicas drops below this number, Kafka stops accepting writes with -1 (or all) request.required.acks.

  • Type: int
  • Default: 1
  • Importance: High

retention.bytes

The maximum size a log partition can grow to before old log segments are discarded to free up space. This only applies if the “delete” retention policy is used.

  • Type: long
  • Default: Long.MAX_VALUE
  • Importance: Low

retention.ms

The maximum time a log is retained before old log segments are discarded. This only applies if the “delete” retention policy is used.

  • Type: long
  • Default: Long.MAX_VALUE
  • Importance: Low

segment.bytes

The hard maximum for the size of a segment file in the log.

  • Type: int
  • Default: 1073741824
  • Importance: Low

segment.jitter.ms

The maximum random jitter subtracted from the scheduled segment roll time to avoid segment rolling.

  • Type: long
  • Default: 0L
  • Importance: Low

segment.ms

The soft maximum on the amount of time before a new log segment is rolled.

  • Type: long
  • Default: Long.MAX_VALUE
  • Importance: Low

unclean.leader.election.enable

Indicates whether unclean leader election is enabled. If it is, then a leader may be moved to a replica that is not in sync with the leader when all in sync replicas are not available, leading to possible data loss.

  • Type: boolean
  • Default: false
  • Importance: High