Kafka プロデューサー¶
Confluent Platform には、Apache Kafka® に同梱されている Java プロデューサーが含まれています。
このセクションでは、プロデューサーの仕組みの大まかな概要を説明し、調整に使用する構成設定を紹介します。
Confluent Developer の無料の「Apache Kafka 101」コースで、プロデューサー API について学習できます。
さまざまな言語で記述されたプロデューサーの例については、これらのガイド を参照してください。Confluent Cloud の使用など、その他の例については、「Apache Kafka® のサンプルコード」を参照してください。
概念¶
Kafka プロデューサーでは、グループの調整を行う必要がないため、プロデューサーの概念はコンシューマーよりもはるかにシンプルです。プロデューサーの パーティショナー が、各メッセージをトピックのパーティションにマッピングし、プロデューサーはそのパーティションのリーダーに生成リクエストを送信します。Kafka に付属のパーティショナーでは、空でない同じキーを持つメッセージはすべて同じパーティションに送信されることが保証されます。
重要
ProducerRecord の作成時にパーティションフィールドを明示的に設定すると、このセクションに記載されているデフォルトの動作はオーバーライドされます。
キーが指定されている場合、パーティショナーは murmur2 アルゴリズムでキーのハッシュ値を計算し、その値をパーティション数で除算します。その結果、同じキーは必ず同じパーティションに割り当てられます。キーが指定されていない場合、動作は、Confluent Platform のバージョンによって異なります。
- Confluent Platform バージョン 5.4.x 以降では、バッチ処理を認識してパーティションが割り当てられます。1 つのバッチに十分なレコードがなく、まだブローカーに送信されていない場合、前のレコードと同じパーティションが選択されます。新しく作成されたバッチに対するパーティションは、ランダムに割り当てられます。詳細については、『KIP-480: Sticky Partitioner 』および関連する Confluent のブログ記事 を参照してください。
- 5.4.x より前のバージョンの Confluent Platform では、パーティションは、ランダムに選択されたパーティションを起点としてラウンドロビン方式で割り当てられます。
Kafka クラスターの各パーティションには、複数のブローカーとともに、1 つのリーダーと一連のレプリカがあります。パーティションへの書き込みはすべて、パーティションリーダーを介して行う必要があります。レプリカは、リーダーからフェッチすることで同期を維持します。リーダーがシャットダウンされるか障害が発生した場合、同期レプリカの中から次のリーダーが選出されます。プロデューサーの構成に応じて、パーティションリーダーへの各生成リクエストは、正常な書き込みの確認応答がレプリカから返されるまで、保留される可能性があります。これにより、全体のスループットがある程度犠牲になりますが、プロデューサーがメッセージの持続性をある程度コントロールできます。
プロデューサーの確認応答の設定にかかわらず、パーティションリーダーに書き込まれたメッセージは、すぐにコンシューマーから読み取ることができるわけではありません。すべての同期レプリカが書き込みについて確認応答を行うと、メッセージは コミット済み と見なされ、読み取りが可能になります。これにより、メッセージが読み取られた後、ブローカーで障害が発生しても、メッセージが失われないようにすることができます。これはつまり、リーダーのみが確認応答を行ったメッセージ( acks=1
の場合)は、レプリカがそのメッセージをコピーする前にパーティションリーダーで障害が発生した場合、失われる可能性があるということです。ただしこれは、スループットにあまり大きな影響を与えずに、多くの場合に持続性を確保するうえで、実際上、合理的な妥協点となる場合が多いでしょう。
プロデューサーで細心の注意が必要な部分の多くは、バッチ処理や圧縮による高いスループットの実現と、前述のようにメッセージデリバリーの保証に関する部分です。次のセクションでは、プロデューサーの動作を調整するための一般的な設定について説明します。
Kafka プロデューサーの構成¶
すべての構成設定については、「プロデューサーの構成」に一覧が掲載されています。主な構成設定と、その設定によりプロデューサーの動作にどのような影響があるかを以下にまとめました。
コアの構成: プロデューサーが Kafka クラスターを見つけられるように、bootstrap.servers
プロパティを設定する必要があります。必須ではありませんが、client.id
を必ず設定することをお勧めします。この設定をしておくと、ブローカーに対するリクエストと、そのリクエストを行ったクライアントインスタンスの関連付けが容易になります。これらの設定は、Java、C/C++、Python、Go、.NET クライアントのいずれでも同じです。
メッセージの持続性: Kafka に書き込まれるメッセージの持続性を acks
の設定で制御できます。デフォルト値の 1
では、書き込みの成功を示す明示的な確認応答がパーティションリーダーから得られる必要があります。Kafka で最も高い保証が得られる設定は acks=all
です。パーティションリーダーが書き込みを受け付けただけでなく、すべての同期レプリカへのレプリケーションが成功したことも保証されます。値 0
を使用するとスループットを最大化できますが、メッセージがブローカーのログに正常に書き込まれたことはまったく保証されなくなります。この場合、ブローカーから応答が送信されなくなるためです。そのため、メッセージのオフセットの確認もできなくなります。これは、C/C++、Python、Go、.NET クライアントでは、トピック単位での構成になります。ただし、C/C++ では default_topic_conf
サブ構成、Python、Go、.NET. では default.topic.config
サブ構成を使用すると、グローバルに適用できます。
メッセージの順序: 一般に、メッセージはプロデューサークライアントが受信したのと同じ順番でブローカーに書き込まれます。ただし、retries
にデフォルトの 0
よりも大きな値を設定して、メッセージの再試行を有効にした場合、メッセージの順序が変わることがあります。これは、後続のメッセージの書き込みが成功した後に再試行が行われる場合があるためです。再試行を有効にし、順序が変更されないようにするには、max.in.flight.requests.per.connection
に 1
を設定して、ブローカーに対して一度に 1 つのリクエストのみが送信されるようにします。再試行が有効でない場合、ブローカーは受信した順序を維持して書き込みを行いますが、個々のメッセージで送信エラーが発生した場合にギャップが生じる可能性があります。
バッチ処理と圧縮: Kafka プロデューサーは、送信メッセージをバッチにまとめることで、スループットの向上を図ります。Java クライアントでは、batch.size
を使用して、各メッセージバッチのバイト数の最大サイズを設定できます。バッチが満杯になるまでの時間を増やすためには、linger.ms
を使用して、プロデューサーの送信までの待機時間を長くします。圧縮を有効にするには、compression.type
の設定を使用します。圧縮はメッセージのバッチ全体に対して行われるため、通常はバッチのサイズが大きいほど圧縮率は高くなります。C/C++、Python、Go、.NET クライアントでは、batch.num.messages
を使用することで、1 つのバッチに含まれるメッセージ数の上限を設定できます。
Snappy 圧縮を使用する場合は、/tmp
ディレクトリへの書き込みのアクセス権限が必要です。/tmp
ディレクトリに noexec
が設定されていて書き込みアクセス権限がない場合は、書き込みアクセス権限があるディレクトリパスを Snappy に設定する必要があります。
-Dorg.xerial.snappy.tempdir=/path/to/newtmp
キューイング制限: buffer.memory
を使用すると、未送信メッセージを収集するために Java クライアントで使用されるメモリーの合計容量を制限できます。この上限に達すると、プロデューサーでそれ以降の送信がブロックされ、max.block.ms
が経過すると例外がスローされます。また、レコードが無期限にキューに残ることがないように、request.timeout.ms
を使用してタイムアウトを設定することができます。メッセージを正常に送信できるようになる前に、このタイムアウトの時間が経過した場合、そのメッセージはキューから削除され、例外がスローされます。C/C++、Python、Go、.NET クライアントにも同様の設定があります。queue.buffering.max.messages
を使用すると、特定の時点で(送信、再試行、配信レポート用に)キューに挿入できるメッセージの総数を制限できます。queue.buffering.max.ms
では、バッチが満杯になるまでのクライアントの待機時間に上限を設定できます。この時間が経過すると、ブローカーにバッチが送信されます。
コードの例¶
サンプルの Kafka プロデューサーアプリケーションを細かく分けて説明するステップバイステップのチュートリアルについては、「How to build your first Apache KafkaProducer application」を参照してください。
Java を含む、さまざまなプログラミング言語での Kafka クライアントの基本的なコード例については、「Apache Kafka® のサンプルコード」を参照してください。これらのコード例にはすべて、オンプレミスまたは Confluent Cloud で実行される Kafka クラスターに接続できるプロデューサーとコンシューマーが含まれています。また、Schema Registry を使用して Avro データを生成および消費する方法の例も示されています。