動的な構成

ブローカーとトピックの構成には、ブローカーを再起動しないでアップデートできるものがあります。このセクションでは、構成オプションを動的にアップデートする方法、ZooKeeper に暗号化した形式で格納することによりパスワード構成のセキュリティを確保する方法について説明します。

ブローカー構成の動的な変更

ブローカーの構成には、ブローカーを再起動しないでアップデートできるものがあります。各ブローカー構成のアップデートモードについては、「ブローカーの構成」の Dynamic Update Mode オプションを参照してください。

  • read-only: アップデートにブローカーの再起動が必要です。
  • per-broker: ブローカーごとに動的にアップデートできます。
  • cluster-wide: クラスター全体のデフォルトとして動的にアップデートできます。テスト用にブローカーごとの値としてアップデートすることもできます。

ブローカー ID 0 の現在のブローカー構成(たとえば、ログクリーナースレッドの数)を変更するには、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

同時に複数の構成を変更する、または複数の値を持つ構成(たとえば、 監査ログ用のルーター構成)を変更するには、.properties 形式で希望する値が設定されたファイルを作成し、--add-config-file を使用します。new.properties という名前のファイルに新しい構成があるとして、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config-file new.properties

ブローカー ID 0 に対する現在の動的なブローカー構成を表示するには、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe

構成のオーバーライドを削除し、ブローカー ID 0 の静的な構成(デフォルト値)に戻すには(たとえば、ログクリーナースレッドの数)、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads

一部の構成をクラスター全体のデフォルトとして構成し、クラスター全体で一貫性のある値を維持することもできます。クラスターのすべてのブローカーにより、クラスターのデフォルトアップデートが処理されます。たとえば、すべてのブローカーでログクリーナースレッドをアップデートするには、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

現在のクラスター全体の動的なデフォルト構成を表示するには、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

クラスターレベルで構成可能なすべての構成は、個別のブローカーレベルで構成することもできます(テスト用など)。構成値が異なるレベルで定義される場合、次の優先順位が使用されます。

  • ZooKeeper に格納されたブローカーごとの動的な構成
  • ZooKeeper に格納されたクラスター全体の動的なデフォルト構成
  • server.properties の静的なブローカー構成

パスワード構成の動的なアップデート

動的にアップデートされたパスワード構成値は、ZooKeeper に格納される前に暗号化されます。ブローカー構成 password.encoder.secretserver.properties で構成し、パスワード構成の動的なアップデートを有効にします。シークレットは、ブローカーごとに異なります。

パスワードのエンコードに使用するシークレットは、ブローカーのローリング再起動時に切り替わります。ZooKeeper で現在パスワードのエンコードに使用されている古いシークレットは静的なブローカー構成 password.encoder.old.secret で指定し、新しいシークレットは password.encoder.secret で指定する必要があります。ZooKeeper に格納されたすべての動的なパスワード構成は、ブローカーを起動したとき、新しいシークレットで再エンコードされます。

kafka-configs を使用して構成をアップデートしたとき、パスワード構成が変更されない場合でも、動的にアップデートしたすべてのパスワード構成は、変更リクエストごとに指定される必要があります。

アップグレードを実行する場合、ブローカーのパスワードを ZooKeeper に追加し、server.properties から削除して、新しくアップグレードしたバージョンでブローカーを再起動します。パスワードが server.properties に残っている場合、無視され、ZooKeeper からの値が優先されます。動的な構成をサポートせずに古いバージョンにロールバックする場合、パスワードを server.properties に再び追加してからブローカーを再起動します。

ブローカー起動前の ZooKeeper よるパスワード構成のアップデート

ブートストラップでブローカーが起動される前に、kafka-configs により、ZooKeeper を使用して動的なブローカー構成をアップデートできます。これにより、すべてのパスワード構成が暗号化された形式で格納できます。 server.properties のパスワードをクリアする必要はありません。パスワード構成が変更コマンドに含まれる場合は、ブローカー構成 password.encoder.secret も指定する必要があります。追加の暗号化パラメーターも指定する必要があります。パスワードエンコーダー構成は ZooKeeper に保持されません。たとえば、ブローカー 0 のリスナー INTERNAL 用の SSL キーパスワードを格納するには、以下のとおり実行します。

bin/kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config \
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'

構成 listener.name.internal.ssl.key.password は、指定したエンコーダー構成を使用して、暗号化された形式で ZooKeeper に保持されます。エンコーダーシークレットとイテレーションは、ZooKeeper に保持されません。

既存リスナーの SSL キーストアのアップデート

ブローカーでは、SSL キーストアに短い有効期限を構成して、証明書の侵害リスクを抑えることができます。キーストアは、ブローカーを再起動しないで、動的にアップデートできます。構成名の先頭にはリスナーのプレフィックス listener.name.{listenerName}. を付けて、特定リスナーのキーストア構成だけをアップデートする必要があります。次の構成は、単一の変更リクエストによって個別のブローカーレベルでアップデートできます。

  • ssl.keystore.type
  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.key.password

ブローカー間リスナーの場合は、対象リスナー向けに構成されたトラストストアで、新しいキーストアが信頼されている場合にのみ、アップデートが許可されます。他のリスナーの場合、キーストアでは、信頼の検証がブローカーによって実行されません。証明書は、古い証明書を署名した認証機関と同じ認証機関で署名する必要があります。これによりクライアント認証のエラーが回避されます。

既存リスナーの SSL トラストストアのアップデート

ブローカーのトラストストアは、証明書の追加や削除のためにブローカーを再起動することなく、動的にアップデートできます。アップデートされたトラストストアは、新しいクライアント接続を認証するために使用されます。構成名の先頭にはリスナーのプレフィックス listener.name.{listenerName}. を付けて、特定リスナーのトラストストア構成だけをアップデートする必要があります。次の構成は、単一の変更リクエストによって個別のブローカーレベルでアップデートできます。

  • ssl.truststore.type
  • ssl.truststore.location
  • ssl.truststore.password

ブローカー間リスナーの場合は、新しいトラストストアで、対象リスナーの既存のキーストアが信頼されている場合にのみ、アップデートが許可されます。他のリスナーの場合、アップデートの前に、信頼の検証がブローカーによって実行されません。クライアント証明書の署名に使用した CA 証明書を新しいトラストストアから削除すると、クライアント認証のエラーが発生することがあります。

接続で新しいキーストアが使用される(また、ブローカーで期限切れの証明書が、ローリング再起動を実行することなく更新される)ように SSL リスナーを動的に更新するには:

kafka-configs --command-config /etc/kafka/client.properties --bootstrap-server hostname:port --entity-type brokers --entity-name <broker-ID> --alter --add-config listener.name.<listener-name>.ssl.keystore.location=<path-to-keystore.jks>

トピック構成の動的な変更

Kafka の多くの構成設定は静的で、プロパティファイルを介して行われます。ただし、トピックごとに変更できる設定がいくつかあります。これらの設定は、<path-to-confluent>/bin/kafka-configs ツールを使用して、ブローカーを再起動することなく動的に変更できます。kafka-configs ツールを使用して変更した場合、それぞれの変更は永続的で、ブローカーを再起動しても維持されます。

変更可能なトピックごとの構成をいくつか次に示します。

cleanup.policy

ログの古いセグメントを削除するのか、重複排除するのかを示します。

  • 型: string
  • デフォルト : delete
  • 重要度: 中

flush.messages

強制的にフラッシュする前に、ログに書き込むことができるメッセージの数です。

  • 型: long
  • デフォルト : Long.MAX_VALUE
  • 重要度: 中

flush.ms

強制的にフラッシュする前に、ログでダーティデータを保持できる時間です。

  • 型: long
  • デフォルト : Long.MAX_VALUE
  • 重要度: 中

max.message.bytes

メッセージの最大サイズです。

  • 型: int
  • デフォルト : Integer.MAX_VALUE
  • 重要度: 中

min.insync.replicas

同期レプリカの数がこの数値より小さくなると、Kafka は -1(つまりすべて)が request.required.acks の場合に書き込みの判断を停止します。

  • 型: int
  • デフォルト : 1
  • 重要度: 高

retention.bytes

古いログセグメントを破棄してスペースを解放するまでに、ログパーティションを拡張できる最大サイズです。これが適用されるのは、"delete" 保持ポリシーが使用されている場合のみです。

  • 型: long
  • デフォルト : Long.MAX_VALUE
  • 重要度: 低

retention.ms

古いログセグメントを破棄するまでに、ログを保持する最大時間です。これが適用されるのは、"delete" 保持ポリシーが使用されている場合のみです。

  • 型: long
  • デフォルト : Long.MAX_VALUE
  • 重要度: 低

segment.bytes

ログ内にあるセグメントファイルのハードリミットサイズです。

  • 型: int
  • デフォルト: 1073741824
  • 重要度: 低

segment.jitter.ms

セグメントのローリングを回避するために、スケジュール設定されたセグメントロール時間から減算する最大ランダムジッターです。

  • 型: long
  • デフォルト : 0L
  • 重要度: 低

segment.ms

新しいログセグメントがロールされるまでの、ソフトリミット時間です。

  • 型: long
  • デフォルト : Long.MAX_VALUE
  • 重要度: 低

unclean.leader.election.enable

クリーンではないリーダーの選出を有効にするかどうかを示します。有効である場合、すべての同期レプリカが利用できないときに、リーダーと同期していないレプリカにリーダーを移動できますが、これはデータの紛失につながる可能性があります。

  • 型: boolean
  • デフォルト : false
  • 重要度: 高