Dynamic Configurations¶
Some of the broker and topic configurations can be updated without restarting the broker. This section talks about updating these configuration options dynamically and securing password configurations by storing them in encrypted form in ZooKeeper.
Changing Broker Configurations Dynamically¶
Some of the broker configurations can be updated without restarting the broker. See the
Dynamic Update Mode
option in Broker Configurations
for the update mode of each broker configuration.
read-only
: Requires a broker restart for update.per-broker
: May be updated dynamically for each broker.cluster-wide
: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.
To alter the current broker configurations for broker ID 0 (for example, the number of log cleaner threads):
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
To alter several configurations at the same time, or to alter configurations
with complex values (for example, the Router config for Audit Logs),
create a file with the desired values in .properties
format and use
--add-config-file
. If you have the new configurations in a file named new.properties
:
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config-file new.properties
To describe the current dynamic broker configs for broker ID 0:
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
To delete a configuration override and revert to the statically configured or default value for broker ID 0 (for example, the number of log cleaner threads):
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
Some configurations may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
To describe the currently configured dynamic cluster-wide default configurations:
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
All configurations that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a configuration value is defined at different levels, the following order of precedence is used:
- Dynamic per-broker config stored in ZooKeeper
- Dynamic cluster-wide default config stored in ZooKeeper
- Static broker config from
server.properties
Updating Password Configurations Dynamically¶
Password configuration values that are dynamically updated are encrypted before
storing in ZooKeeper. The broker config password.encoder.secret
must be configured
in server.properties
to enable dynamic update of password configurations.
The secret may be different on different brokers.
The secret used for password encoding may be rotated with a rolling restart of brokers.
The old secret used for encoding passwords currently in ZooKeeper must be provided in
the static broker configuration password.encoder.old.secret
and the new secret
must be provided in password.encoder.secret
. All dynamic password configurations
stored in ZooKeeper will be re-encoded with the new secret when the broker starts up.
All dynamically updated password configurations must be provided in every alter
request when updating configurations using kafka-configs
, even if the password
configuration is not being altered.
If performing an upgrade, add the broker passwords to ZooKeeper and remove them
from server.properties
, and then restart the broker in the newly-upgraded version.
If passwords remain in server.properties
, they are ignored, and the value from
ZooKeeper takes higher precedence. If you wish to roll back to an older version
without dynamic configuration support, then you must add the passwords back into
server.properties
before restarting the broker.
Updating Password Configurations in ZooKeeper Before Starting Brokers¶
kafka-configs
enables dynamic broker configurations to be updated using ZooKeeper before
starting brokers for bootstrapping. This enables all password configurations to
be stored in encrypted form, avoiding the need for clear passwords in
server.properties
. The broker configuration password.encoder.secret
must
also be specified if any password configurations are included in the alter command.
Additional encryption parameters may also be specified. Password encoder configurations
are not be persisted in ZooKeeper. For example, to store SSL key password for listener
INTERNAL
on broker 0:
bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config \
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
The configuration listener.name.internal.ssl.key.password
is persisted in ZooKeeper
in encrypted form using the provided encoder configurations. The encoder secret
and iterations are not persisted in ZooKeeper.
Updating SSL Keystore of an Existing Listener¶
Brokers may be configured with SSL key stores with short validity periods to reduce
the risk of compromised certificates. Key stores may be updated dynamically without
restarting the broker. The configuration name must be prefixed with the listener prefix
listener.name.{listenerName}.
so that only the key store configuration of a specific
listener is updated. The following configurations may be updated in a single alter request
at per-broker level:
ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password
If the listener is the inter-broker listener, then the update is allowed only if the new key store is trusted by the trust store configured for that listener. For other listeners, no trust validation is performed on the key store by the broker. Certificates must be signed by the same certificate authority that signed the old certificate to avoid any client authentication failures.
Updating SSL Trust Store of an Existing Listener¶
Broker trust stores may be updated dynamically without restarting the broker to
add or remove certificates. Updated trust store will be used to authenticate new
client connections. The configuration name must be prefixed with the listener prefix
listener.name.{listenerName}.
so that only the trust store configuration of a specific
listener is updated. The following configurations may be updated in a single alter request
at per-broker level:
ssl.truststore.type
ssl.truststore.location
ssl.truststore.password
If the listener is the inter-broker listener, then the update is allowed only if the existing key store for that listener is trusted by the new trust store. For other listeners, no trust validation is performed by the broker before the update. Removal of CA certificates used to sign client certificates from the new trust store can lead to client authentication failures.
To dynamically update SSL listeners so that connections use a new keystore (and expired certificates are updated on brokers without a rolling restart):
kafka-configs --command-config /etc/kafka/client.properties --bootstrap-server hostname:port --entity-type brokers --entity-name <broker-ID> --alter --add-config listener.name.<listener-name>.ssl.keystore.location=<path-to-keystore.jks>
Changing Topic Configurations Dynamically¶
Many config settings in Kafka are static and are wired through the properties file.
However, there are several settings that you can change per topic. These settings
can be changed dynamically using the <path-to-confluent>/bin/kafka-configs
tool without having to restart the brokers. When changed using the kafka-configs
tool, each change is persistent and remains through broker restarts.
Following are some per-topic configurations that you can change.
cleanup.policy¶
Indicates whether old segments in the log are deleted or deduplicated.
- Type: string
- Default
delete
- Importance: Medium
flush.messages¶
The number of messages that can be written to the log before a flush is forced.
- Type: long
- Default:
Long.MAX_VALUE
- Importance: Medium
flush.ms¶
The amount of time the log can have dirty data before a flush is forced.
- Type: long
- Default:
Long.MAX_VALUE
- Importance: Medium
max.message.bytes¶
The maximum size of a message.
- Type: int
- Default:
Integer.MAX_VALUE
- Importance: Medium
min.insync.replicas¶
If the number of in sync replicas drops below this number, Kafka stops accepting
writes with -1 (or all) request.required.acks
.
- Type: int
- Default:
1
- Importance: High
retention.bytes¶
The maximum size a log partition can grow to before old log segments are discarded to free up space. This only applies if the “delete” retention policy is used.
- Type: long
- Default:
Long.MAX_VALUE
- Importance: Low
retention.ms¶
The maximum time a log is retained before old log segments are discarded. This only applies if the “delete” retention policy is used.
- Type: long
- Default:
Long.MAX_VALUE
- Importance: Low
segment.bytes¶
The hard maximum for the size of a segment file in the log.
- Type: int
- Default:
1073741824
- Importance: Low
segment.jitter.ms¶
The maximum random jitter subtracted from the scheduled segment roll time to avoid segment rolling.
- Type: long
- Default:
0L
- Importance: Low
segment.ms¶
The soft maximum on the amount of time before a new log segment is rolled.
- Type: long
- Default:
Long.MAX_VALUE
- Importance: Low
unclean.leader.election.enable¶
Indicates whether unclean leader election is enabled. If it is, then a leader may be moved to a replica that is not in sync with the leader when all in sync replicas are not available, leading to possible data loss.
- Type: boolean
- Default:
false
- Importance: High