本稼働環境での Kafka の実行

このセクションでは、Confluent Platform を使用した本稼働環境に移行する前の重要な考慮事項について説明します。

ハードウェア

標準的な開発パスを踏襲している方であれば、身近なマシンを寄せ集めた小さなクラスターやノート PC で Apache Kafka® に触れたことがあるかと思います。しかし、Kafka を本稼働環境にデプロイする場合には、考慮すべきいくつかの推奨事項があります。厳格なルールはありません。Kafka は、広範なユースケースにわたって、非常に多様なマシンで使用されています。一方で以下の推奨事項は、本稼働環境のクラスターに関する Confluent の経験に基づき、初めて使用される際の参考として示すものです。

メモリー

Kafka はメッセージの格納とキャッシュ処理をファイルシステムに大きく依存しています。すべてのデータはファイルシステムにある永続的ログに即座に書き込まれます。つまり、ディスクへのフラッシュ処理は必ずしも必要ありません。実際、これはカーネルのページキャッシュに送られるというだけのことです。最新の OS では、すべての空きメモリーを積極的にディスクキャッシュに流用するため、メモリーが再要求されたときに、ほとんどパフォーマンスペナルティがありません。さらに、Kafka では非常に慎重にヒープ空間を使用するため、6 GB を超えるヒープサイズの設定は必要ありません。これにより、ファイルシステムのキャッシュは、32 GB のマシンで最大で 28 ~ 30 GB となります。

アクティブリーダーとライターのバッファのために、十分なメモリーが必要です。メモリーのニーズを簡単な計算で見積もることができます。30 秒のバッファを確保する場合、メモリーのニーズを write_throughput * 30 で算出できます。

64 GB の RAM を搭載したマシンが適切な選択ですが、32 GB のマシンが使われることもあります。32 GB 未満では生産性が落ちる可能性があります(小規模マシンが大量に必要になります)。

CPU

多くの Kafka デプロイでは、CPU に対する要件は厳しくありません。そのため、プロセッサーを厳密に設定することは、他のリソースと比較して大きな問題ではありません。注意が必要なのは、 SSL が有効である場合、CPU の要件は、非常に高くなることがあります(詳細は、CPU のタイプと JVM の実装により異なります)。

マルチコアの最新のプロセッサーを選択する必要があります。一般的なクラスターでは、24 コアのマシンを利用します。

CPU の速度を高めるか、コア数を増やすかのどちらかを選ぶ必要があるなら、コア数を増やすようにしてください。複数のコアによってもたらされるコンカレンシー上の優位性の方が、クロック速度がわずかに向上するよりも、はるかに有意義です。

ディスク

注釈

  • 既知の制限」で説明されているように、階層型ストレージ では単一のマウントポイントが必要であるため、JBOD(Just a Bunch Of Disks)はサポートされません。階層型ストレージを使用する場合は、JBOD を使用しないでください。
  • 制限」で説明されているように、Self-Balancing Clusters では単一のマウントポイントが必要であるため、JBOD はサポートされません。Self-Balancing Clusters を使用する場合は、JBOD を使用しないでください。

スループットを最大化するために複数のドライブを使用する必要があります。良好なレイテンシに抑えるため、Kafka データに使用するドライブを、アプリケーションログや他の OS ファイルシステムのアクティビティと共有しないでください。これらのドライブを RAID(Redundant Array of Independent Disks)として単一のボリュームに結合することも、各ドライブを個別のディレクトリとしてフォーマットし、マウントすることもできます。Kafka にはレプリケーションがあるので、RAID に加えて、アプリケーションレベルでも冗長化が提供されます。この選択には、いくつかのトレードオフがあります。

複数のデータディレクトリを構成する場合、ブローカーは、現在格納されているパーティション数が最小であるパスに、新しいパーティションを配置します。各パーティションは、データディレクトリのいずれかに必ず入ります。パーティション間でデータのバランスが十分とれていない場合、ディスク間の負荷の不均衡につながることがあります。

RAID では、下位レベルで負荷のバランスを調整するため、ディスク間での負荷のバランス調整がうまくいく可能性があります(ただし必ずそうなるとは限りません)。

ほとんどのユースケースでは、安心できるオプションとして RAID 10 をお勧めします。その利点としては、向上した読み取り/書き込みパフォーマンス、データ保護(ディスク障害への耐性)、高速な再構築が挙げられます。

RAID の主なデメリットは、利用可能なディスク空間が少なくなることです。もう 1 つのデメリットは、ディスクに障害が発生したときにアレイを再構築するための I/O コストです。この再構築コストは RAID 全般に当てはまり、バージョンによって多少の差異があります。

重要な点として、ネットワーク接続ストレージ(NAS)は推奨しません。NAS は遅いことがあり、平均レイテンシから大きく外れて、大きなレイテンシを示すことがあります。また単一障害点になります。

ネットワーク

分散システムでは、高速で信頼性の高いネットワークが本質的なパフォーマンス関連要素になります。待ち時間が短ければ、ノードの通信が容易になります。また、帯域幅が広ければ、シャードの移動や回復が円滑化されます。大半のクラスターは、近年のデータセンターネットワーク(1 GbE、10 GbE)であれば問題ありません。

ファイルシステム

Kafka を XFS または ext4 で実行する必要があります。

全般の考慮事項

一般に、Kafka には中規模から大規模のマシンが適しています。

  • 小型のマシンは避けてください。1000 ノードのクラスターの管理は困難であり、Kafka を実行する際のオーバーヘッドは小型マシンの方が明らかであるためです。
  • 大型のマシンも推奨できません。リソースの使用量が不均衡になることがよくあるためです。たとえば、すべてのメモリーが使用されているのに CPU にはかなり余裕があるということになります。また、マシンごとに複数ノードを実行する必要がある場合に、実行が複雑になることがあります。

VMware の最適化

Confluent と VMWare では、圧縮を有効にして、ビジネス継続性を維持しつつパフォーマンスへの影響を抑えることを推奨しています。

警告

Confluent Platform では、vMotion およびディスクスナップショットを無効にしてください。これらの機能は、Kafka で使用するとクラスター全体が停止する可能性があるためです。

JVM/Java 仮想マシン

このバージョンの Confluent Platform では Java 8 および Java 11 がサポートされています。セキュリティの観点から見ると、最新バージョンのパッチリリースの使用をお勧めします。無料で提供されている以前のバージョンには、セキュリティ上の脆弱性が見つかっています。

Java 9 と 10 はサポートされていません。

詳細については、 サポートされている Java バージョン に関するセクションを参照してください。

Java の正しいバージョンを個別にインストールしてから、Confluent Platform のインストール処理を開始する必要があります。

推奨する GC の調整(JDK 1.8 u5 での大規模デプロイでテスト済み)は次のようになります。

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

リファレンスとして、LinkedIn の高負荷のクラスター(ピーク時)の 1 つにおける統計情報を示します。

  • 60 ブローカー
  • 5 万パーティション(レプリケーション係数 2)
  • 80 万受信メッセージ/秒
  • 300 MBps インバウンド、1 GBps 以上 アウトバウンド

調整がかなり大胆に見えますが、対象クラスターのすべてのブローカーでは、GC ポーズ時間の 90% は約 21 ミリ秒で、young に対する GC の実行は 1 秒に 1 つ未満です。

本稼働環境の構成オプション

Kafka のデフォルト設定は、特にパフォーマンス関連の設定とオプションは、多くの場合にうまく動作します。ただし、一部のロジスティック構成は、クラスターレイアウトに従って本稼働環境向けに変更する必要があります。

ちなみに

  • Apache Kafka Raft メタデータモード(KRaft)の採用によって、Kafka のアーキテクチャは劇的に単純化されました。その具体的な内容については、「KRaft: Apache Kafka without ZooKeeper」を参照してください。
  • 最新のハードウェアで実施された、クラウドにおける Kafka のパフォーマンスベンチマークテストとその結果については、「Apache Kafka Performance, Latency, Throughput, and Test」を参照してください。

一般構成

zookeeper.connect

ブローカーを登録する ZooKeeper ホストのリストです。ZooKeeper クラスターのすべてのホストにこれを構成することを推奨します。

  • 型: string
  • 重要度: 高
broker.id
ブローカーを識別する整数の ID です。同じ Kafka クラスターのブローカーに同じ ID を使用できません。
  • 型: int
  • 重要度: 高
log.dirs

Kafka ログデータが配置されるディレクトリです。

  • 型: string
  • デフォルト: "/tmp/kafka-logs"
  • 重要度: 高
リスナー

ブローカーがリッスンする URI(プロトコルを含む)のコンマ区切りのリストです。ホスト名に 0.0.0.0 を指定すると、すべてのインターフェイスにバインドされます。空のままにすると、デフォルトインターフェイスにバインドされます。たとえば、PLAINTEXT://myhost:9092 のようになります。

  • 型: string
  • デフォルト : PLAINTEXT://host.name:porthost.name のデフォルト値は空の文字列で、port9092 です。)
  • 重要度: 高
advertised.listeners

クライアントで使用するために ZooKeeper にパブリッシュするリスナーです。IaaS 環境では、場合によりブローカーがバインドするインターフェイスと異なるものにする必要があります。これが設定されていない場合、listeners の値が使用されます。

  • 型: string
  • デフォルト : listeners
  • 重要度: 高
num.partitions

自動作成されたトピック用のログパーティションのデフォルト数です。トピックに対してパーティションが多い方が望ましいため、これを増やす必要があります。パーティションが多いトピックでは、データのバランス調整がうまくいき、コンシューマーの並列処理に役に立ちます。キー付きデータでは、トピックのパーティション数を変更しないてください。

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1,…]
  • 重要度: 中
  • Dynamic Update Mode: 読み取り専用

レプリケーション構成

default.replication.factor

自動作成されたトピックに適用されるデフォルトのレプリケーション係数です。少なくとも 2 に設定する必要があります。

  • 型: int
  • デフォルト: 1
  • 重要度: 中
min.insync.replicas

required.acks=-1 (つまりすべて)による生成リクエストをコミットするために、ISR で必要なレプリカの最小数です。

  • 型: int
  • デフォルト: 1
  • 重要度: 中
unclean.leader.election.enable

最終手段として、ISR セットにないレプリカをリーダーに選出できるようにするかどうかを示します。ただしこの場合、データ喪失の可能性があります。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中

ファイルディスクリプタと mmap

Kafka は、クライアントと通信するために、非常に多数のファイルとソケットを使用します。これらすべてで、かなり多数の利用可能なファイルディスクリプタが必要になります。

多くの Linux の最新ディストリビューションでは、プロセスごとに利用できるファイルディスクリプタは 1,024 個だけです。これは Kafka では少なすぎます。

ファイルディスクリプタ数を少なくとも 100,000 に増やす必要があります。この処理は難しく、特定の OS やディストリビューションで大きく異なります。使用する OS のドキュメントを参照して、使用可能なファイルディスクリプタ数をどのように変更するのが最適であるかを確認してください。

いくつかの推奨事項を示します。

現在の mmap 数を算出するには、Kafka データディレクトリにある .index ファイルを数えます。.index ファイルは、メモリマップファイルの大半に相当しています。手順を示します。

  1. 次のコマンドで .index ファイルを数えます。

    find . -name '*index' | wc -l
    
  2. セッションに vm.max_map_count を設定します。これによりメモリマップファイルの現在の数を算出します。mmap の制限の最小値( vm.max_map_count)はオープンファイルの ulimit の数です。

    重要

    vm.max_map_count.index ファイルの数より十分大きな値に設定し、ブローカーセグメントの拡大に対応できるようにする必要があります。

    sysctl -w vm.max_map_count=262144
    
  3. vm.max_map_count の設定を再起動後にも維持するには、次のコマンドを使用します。

    echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
    sysctl -p
    

マルチノード構成

本稼働環境では、複数のブローカーが必要です。起動中に、ブローカーはクラスターのメンバーになるように、ZooKeeper にそれ自体を登録します。

Apache Kafka® プロパティファイル(/etc/kafka/server.properties)に移動し、次のとおりカスタマイズします。

  • 同じ ZooKeeper アンサンブルに接続します。これにはすべてのノードで、zookeeper.connect を同じ値に設定します。localhost のすべてのインスタンスをノードのホスト名または FQDN(完全修飾ドメイン名)に置き換えます。たとえば、ホスト名が zookeeper である場合、以下のとおり設定します。

    zookeeper.connect=zookeeper:2181
    
  • クラスター内の各ノードに対するブローカー ID を次のいずれかの方法で構成します。

    • ブローカー ID を動的に生成 : broker.id.generation.enable=true を追加し、broker.id の行をコメントアウトします。以下に例を示します。

      ############################# Server Basics #############################
      
      # The ID of the broker. This must be set to a unique integer for each broker.
      #broker.id=0
      broker.id.generation.enable=true
      
    • ブローカー ID を手動で設定 : 各ノードの broker.id に一意の値を設定します。

  • 他のブローカーやクライアントがこのブローカーと通信する方法を、listeners または場合により advertised.listeners を使用して構成します。

    • listeners: リッスン対象の URI とリスナー名のコンマ区切りのリスト。
    • advertised.listeners: 他のブローカーやクライアントで使用する URI とリスナー名のコンマ区切りのリスト。advertised.listeners パラメーターでは、ブローカーがローカルと外部の両方のホストからアクセスできるアドレスをアドバタイズすることを保証します。

    詳細については「本稼働環境の構成オプション」を参照してください。

  • 環境のセキュリティを構成します。