DUICUO

Kafka の設計原則: 読んで、忘れて、また読む?

メッセージキューとは何でしょうか?簡単に言うと、メッセージキューとはメッセージを格納するためのコンテナです。クライアントはメッセージサーバーにメッセージを送信したり、メッセージサーバーからメッセージを取得したりできます。

[[270989]]

Pexelsからの画像

今日は、以下の質問についての私の考えを共有したいと思います。

  • なぜメッセージングシステムが必要なのでしょうか?
  • Kafka のアーキテクチャの背後にある原則は何ですか?
  • Kafka はどのようにメッセージを保存しますか?
  • プロデューサーはどのようにメッセージを送信しますか?
  • コンシューマーはどのようにしてメッセージを消費するのでしょうか?
  • オフセットを保存するにはどうすればいいですか?
  • メッセージング システムではどのような問題が発生する可能性がありますか?

なぜメッセージングシステムが必要なのでしょうか?

ピークシェービング

データベースの処理能力には限界があります。ピーク時には、過剰なリクエストがバックグラウンドで処理され、システムの処理能力を超えるとシステムがクラッシュする可能性があります。

上図に示すように、システムの処理能力は2k/s、MQの処理能力は8k/s、ピーク時のリクエストレートは5k/sです。MQの処理能力はデータベースよりもはるかに高く、ピーク時にはリクエストがMQにキューイングされ、システムは自身の処理能力に基づいて2k/sのレートでこれらのリクエストを処理できます。

ピーク期間が過ぎると、リクエスト数は 100/s のみになり、システムは MQ に蓄積されたリクエストをすぐに消費できます。

上記のリクエストは書き込みリクエストを指していることに注意してください。クエリ リクエストは通常​​、キャッシュを通じて処理されます。

デカップリング

次のシナリオでは、システム S はシステム A、B、C と密結合されています。要件の変更により、システム A はコードを変更し、システム S もシステム A の関連コードを調整する必要があります。

数日後、システム C を削除する必要があり、すぐにシステム S で C 関連のコードを削除します。さらに数日後、システム D を追加する必要があり、システム S で D 関連のコードを追加する必要があります。さらにその数日後、プログラマーが発狂します...

システム間の密結合は、保守と拡張を妨げます。メッセージキュー(MQ)を導入することで、システムAへの変更はA自身のコードを変更するだけで対応でき、システムCへの削除はサブスクライブ解除するだけで解決でき、システムDへの追加は関連メッセージをサブスクライブするだけで実現できます。

メッセージ ミドルウェアを導入することで、すべてのシステムがメッセージ キュー (MQ) と対話できるようになり、システム間の複雑な呼び出し関係を回避できます。

Kafka のアーキテクチャの背後にある原則は何ですか?

Kafka 関連の概念:

  • ブローカー: Kafka クラスターに含まれるサーバー。
  • プロデューサー: メッセージプロデューサー。
  • コンシューマー: メッセージのコンシューマー。
  • コンシューマーグループ: 各コンシューマーはいずれかのコンシューマーグループに属します。各メッセージは、コンシューマーグループ内の1つのコンシューマーによってのみコンシュームできますが、複数のコンシューマーグループによってコンシュームされることは可能です。
  • トピック: メッセージのカテゴリ。各メッセージは特定のトピックに属し、異なるトピックは互いに独立しています。つまり、Kafka はトピックベースです。
  • パーティション: 各トピックは複数のパーティションに分割されます。パーティションは、Kafka によって割り当てられる単位です。Kafka では物理的にはディレクトリのようなもので、そのディレクトリ内のログファイルがパーティションを構成します。
  • レプリカ: パーティションのコピー。パーティションの高可用性を保証します。
  • リーダー: レプリカにおける役割。プロデューサーとコンシューマーはリーダーとのみやり取りします。
  • フォロワー: リーダーからデータを複製するレプリカ内の役割。
  • コントローラー: Kafka クラスター内のサーバーのうちの 1 つ。リーダー選出やさまざまなフェイルオーバー操作に使用されます。
  • Zookeeper: Kafka は Zookeeper を使用してクラスターのメタ情報を保存します。

トピックとログ

メッセージはトピックごとに整理され、各トピックは複数のパーティション (server.properties/num.partitions に対応) に分割できます。

パーティションは順次追加ログであり、順次ディスク書き込みを伴います (順次ディスク書き込みはランダム メモリ書き込みよりも効率的であり、Kafka のスループットを保証します)。

その構造は次のとおりです: server.properties/num.partitions は、server.properties ファイル内の num.partitions 構成項目を表し、以下も同様です。

パーティション内の各レコード (メッセージ) には、Offset、messageSize、Data の 3 つの属性が含まれます。

Offset はメッセージのオフセットを表し、messageSize はメッセージのサイズを表し、Data はメッセージの内容を表します。

パーティションはファイル システム内のファイルとして保存され、その場所は server.properties/log.dirs によって指定され、命名規則は次のとおりです。 -

たとえば、トピック「page_visits」のメッセージは 5 つのパーティションに分割され、そのディレクトリ構造は次のようになります。

パーティションは異なるブローカー上に存在できます。パーティションはセグメント化されており、各セグメントはセグメントファイルです。

セグメントの一般的な構成は次のとおりです。

  1. #サーバープロパティ
  2.  
  3. #セグメントファイルサイズ、デフォルトは1G
  4. ログセグメントバイト数=1024*1024*1024
  5. #ローリング方式で新しいセグメントファイルを生成するための最大期間
  6. ログロール時間=24*7
  7. #セグメント ファイルの最大保持時間。このタイムアウト後には削除されます。
  8. ログ保持時間=24時間*7日

パーティションディレクトリには、データファイルとインデックスファイルが含まれます。次の図は、パーティションのディレクトリ構造を示しています。

インデックスはスパース ストレージを使用します。つまり、すべてのメッセージに対してインデックスを作成するのではなく、インデックス ファイルが多くのスペースを占有することを避けるために、特定のバイト数ごとにインデックスを作成します。

欠点は、インデックスのないオフセットではメッセージの位置を 1 ステップで見つけることができないことです。順次スキャンが必要ですが、スキャン範囲は非常に狭くなります。

インデックスは、相対オフセットと位置の 2 つの部分 (どちらも 4 バイトの数値) で構成されます。

相対オフセットはセグメント ファイル内のオフセットを参照し、位置はデータ ファイル内のメッセージの位置を参照します。

要約すると、Kafka のメッセージ ストレージでは、パーティショニング、シーケンシャル ディスク読み取り/書き込み、セグメンテーション、スパース インデックスを採用して高い効率を実現します。

パーティションとレプリカ

トピックは物理的に複数のパーティションに分割され、それぞれ異なるブローカー上に配置されます。レプリカがない場合、ブローカーがダウンすると、そのすべてのパーティションが利用できなくなります。

各パーティションには複数のレプリカ (server.properties/default.replication.factor に対応) を含めることができ、それらは異なるブローカーに割り当てられます。

リーダーの 1 つは読み取りと書き込みを担当し、プロデューサーとコンシューマーからのリクエストを処理します。他のリーダーはフォロワーとして機能し、リーダーからメッセージを引き出して同期を保ちます。

パーティションとレプリカをブローカーに割り当てるにはどうすればよいでしょうか。手順は次のとおりです。

  • すべてのブローカー (合計で n 個のブローカーがあると仮定) と割り当てるパーティションを並べ替えます。
  • i 番目のパーティションを (i mod n) 番目のブローカーに割り当てます。
  • i番目のパーティションのj番目のレプリカを((i + j)モードn)番目のブローカーに割り当てます。

上記の割り当てルールに従うと、レプリカ数がブローカー数より多い場合、必然的に同じブローカーに2つの同一のレプリカが割り当てられ、冗長性が生じます。したがって、レプリカ数はブローカー数以下である必要があります。

リーダー選挙

Kafka は、ZooKeeper (/brokers/topics/[topic]/partitions/[partition]/state) で ISR (同期レプリカ) を動的に維持します。

ISR 内のすべてのレプリカがリーダーに追いついたため、コントローラは ISR から 1 つを選択してリーダーになります。

具体的な手順は以下のとおりです。

  • コントローラーはZookeeperの/brokers/ids/[brokerId]ノードにウォッチャーを登録します。ブローカーがダウンすると、Zookeeperはウォッチを起動します。
  • コントローラーは、/brokers/ids ノードから利用可能なブローカーを読み取ります。
  • コントローラは、障害が発生したブローカー上のすべてのパーティションを含む set_p を決定します。
  • set_p 内の各パーティションについて、/brokers/topics/[topic]/partitions/[partition]/state ノードから ISR を読み取り、新しいリーダーを決定し、新しいリーダー、ISR、controller_epoch、leader_epoch などの情報を State ノードに書き込みます。
  • RPC 経由で leaderAndISRRequest コマンドを関連するブローカーに送信します。

ISR が空の場合、レプリカ (必ずしも ISR メンバーではない) がリーダーとして選択されます。すべてのレプリカがダウンしている場合は、任意のレプリカが復活し、リーダーとして選択されます。

ISR(レプリカリスト)では、すべてのフォロワーがリーダーに「追いついた」状態です。「追いついた」とは、完全に同じ状態になるという意味ではありません。これは、server.properties/replica.lag.time.max.ms によって設定されます。

これは、リーダーがフォロワーのメッセージ同期を待機する最大時間を示します。このタイムアウトが発生した場合、リーダーはフォロワーをISRから削除します。設定項目「replica.lag.max.messages」は削除されました。

レプリカ同期

Kafka は「プル モード」を使用してメッセージを同期します。このモードでは、フォロワーがリーダーからデータを一括してプルして同期します。

具体的な信頼性はプロデューサーによって決定されます (構成項目 producer.properties/acks に基づきます)。

Kafka 0.9 では、プロデューサーの設定である request.required.acks=-1 が acks=all に置き換えられましたが、この古い設定はドキュメントに残っています。

バージョン 0.9 では、プロデューサー構成オプション request.required.acks=-1 は acks=all に置き換えられましたが、古い構成オプションも引き続き文書化されています。

PS: 最新のドキュメント 2.2.x request.required.acks は存在しなくなりました。

Acks=-1 の場合、ISR が min.insync.replicas で指定された数より小さいと、NotEnoughReplicas または NotEnoughReplicasAfterAppend 例外がスローされます。

プロデューサーはどのようにメッセージを送信しますか?

プロデューサーはまずメッセージを ProducerRecord インスタンスにカプセル化します。

メッセージルーティング:

  • メッセージを送信するときにパーティションが指定されている場合は、それが直接使用されます。
  • キーが指定されている場合、そのキーがハッシュ化されてパーティションが選択されます。このハッシュ(つまり、パーティション分割メカニズム)は、producer.properties/partitioner.class で指定されたクラスによって実装されます。このクラスは Partitioner インターフェースを実装する必要があります。
  • 何も指定されていない場合は、ラウンドロビンを使用してパーティションが選択されます。

メッセージはすぐに送信されるのではなく、シリアル化されてから、前述のハッシュ関数であるパー​​ティショナーに送信されます。パーティショナーがターゲットパーティションを決定した後、メッセージはメモリバッファ(送信キュー)に送信されます。

プロデューサーの別のワーカー スレッド (つまり、送信スレッド) は、準備されたメッセージをバッファーからリアルタイムで抽出し、それらをバッチにカプセル化して、対応するブローカーに送信する役割を担います。

プロセスはおおよそ次のようになります。

画像は123archuから

コンシューマーはどのようにしてメッセージを消費するのでしょうか?

各コンシューマーは論理コンシューマーグループに割り当てられます。パーティションは同じコンシューマーグループ内の1つのコンシューマーによってのみ使用されますが、異なるコンシューマーグループによって使用されることは可能です。

トピック内のパーティション数が p で、コンシューマー グループ内でこのトピックをサブスクライブしているコンシューマー数が c の場合、次のようになります。

  1. p < c の場合: c - p の消費者はアイドル状態になり、無駄が生じます。
  2. p > c: 1人の消費者が複数のパーティションに対応する
  3. p = c: 1つの消費者は1つのパーティションに対応する

リソースの不均衡を避けるため、コンシューマーとパーティションの数は適切に割り当てる必要があります。理想的には、パーティションの数はコンシューマーの数の整数倍である必要があります。

① コンシューマーにパーティションを割り当てる方法

生成プロセスでは、ブローカーはパーティションを割り当てる必要があります。同様に、消費プロセスでも、パーティションをコンシューマーに割り当てる必要があります。

ブローカーがコントローラーを選択するのと同様に、コンシューマーもブローカーからコーディネーターを選択してパーティションを割り当てます。

コンシューマーの追加、コンシューマーの削減 (アクティブまたはパッシブ)、パーティションの追加など、パーティションまたはコンシューマーの数が変更されると、再バランスが実行されます。

プロセスは次のとおりです。

  • コンシューマーはコーディネーターにJoinGroupRequestを送信します。他のコンシューマーがハートビートリクエストを送信すると、コーディネーターはリバランスが必要であることを通知します。他のコンシューマーもJoinGroupRequestを送信します。
  • コーディネーターはコンシューマーの中からリーダーを選択し、他のコンシューマーはフォロワーになります。その後、コーディネーターは各コンシューマーにこの情報を通知します。リーダーには、フォロワーのメタデータも提供します。
  • コンシューマー リーダーは、コンシューマー メタデータに基づいてパーティションを再割り当てします。
  • コンシューマはコーディネーターにSyncGroupRequestを送信します。リーダーのSyncGroupRequestには割り当て情報が含まれます。コーディネーターは、リーダーの情報を含む割り当て情報をコンシューマに通知するパケットで応答します。

②コンシューマーフェッチメッセージ

消費者は「プル モデル」を使用してメッセージを消費し、独自の消費行動を決定できます。

ConsumerはPoll(duration)を呼び出してサーバーからメッセージを取得します。メッセージ取得の具体的な動作は、以下の設定項目によって決まります。

  1. #消費者プロパティ
  2.  
  3. #コンシューマーは最大いくつのレコードをポーリングできますか?
  4. 最大ポーリングレコード数= 500
  5.  
  6. #コンシューマー ポーリング中にパーティションによって返されるデータの最大量
  7. 最大パーティションフェッチバイト数=1048576
  8.  
  9. #コンシューマー最大ポーリング間隔
  10. #この値を超えると、サーバーはコンシューマーが失敗したと見なします。
  11. #そして、このコンシューマーを対応するコンシューマーグループから追い出します。   
  12. 最大ポーリング間隔(ミリ秒) = 300000

パーティション内では、各メッセージにオフセットが設定されます。新しいメッセージはパーティションの末尾(最新のセグメントファイルの末尾)に書き込まれます。各パーティション内のメッセージは順番に消費されますが、異なるパーティション間での消費順序は予測できません。

コンシューマーが複数のパーティションを消費する場合、各パーティション間の消費順序は不確実ですが、各パーティション内の消費は順次行われます。

異なるコンシューマー グループの複数のコンシューマーが同じパーティションを消費する場合、各コンシューマー間の消費は互いに影響せず、各コンシューマーには独自のオフセットが存在します。

コンシューマーAとコンシューマーBは異なるコンシューマーグループに属しています。コンシューマーAはオフセット9を読み取り、コンシューマーBはオフセット11を読み取ります。この値は次に読み取る位置を示します。

つまり、コンシューマー A はオフセット 0 から 8 のメッセージを読み取り、コンシューマー B はオフセット 0 から 10 のメッセージを読み取ります。

再バランスが発生する可能性があるため、オフセット = 9 から次に読み取られるコンシューマーは、必ずしもコンシューマー A のままであるとは限りません。

オフセットを保存するにはどうすればいいですか?

コンシューマーがパーティションを消費する場合、現在の消費位置を記録するためにオフセットを保存する必要があります。

オフセットは、コンシューマーのcommitSync()またはcommitAsync()メソッドを呼び出すことで、自動コミットまたは手動コミットのいずれかに設定できます。関連する設定は次のとおりです。

  1. #オフセットを自動的にコミットするかどうか
  2. enable.auto.commit = true  
  3.  
  4. # 自動コミット間隔。enable.auto.commit = trueの場合に有効です
  5. 自動コミット間隔(ミリ秒) =5000

オフセットは__consumeroffsetsという名前のトピックに保存されます。メッセージを書き込むためのキーはGroupId、Topic、Partitionで構成され、値はオフセットです。

通常、各キーのオフセットはメモリにキャッシュされるため、クエリ実行中にパーティションをトラバースする必要はありません。キャッシュが存在しない場合は、最初のクエリでパーティションをトラバースしてキャッシュを確立し、その後クエリが結果を返します。

__consumeroffsets 内のパーティションの数は、次のサーバー構成によって決まります。

  1. オフセット.トピック.番号.パーティション=50

オフセットが格納されるパーティション、つまり __consumeroffsets のパーティション分割メカニズムは、次のように表すことができます。

  1. groupId.hashCode() モード groupMetadataTopicPartitionCount

`groupMetadataTopicPartitionCount` は、上記で設定されたパーティションの数です。パーティションは同じコンシューマーグループ内の 1 人のコンシューマーによってのみ消費されるため、`GroupId` を使用して、コンシューマーがオファーを消費するパーティションを示すことができます。

メッセージング システムではどのような問題が発生する可能性がありますか?

Kafka は 3 つのメッセージ配信セマンティクスをサポートしています。

  • 最大 1 回: メッセージは失われる可能性がありますが、重複することはありません。

データの取得 -> コミットオフセット -> ビジネス処理

  • 少なくとも 1 回: メッセージは失われませんが、重複する可能性があります。

データ取得 -> 業務処理 -> コミットオフセット。

  • 正確に 1 回: メッセージは、損失や重複なく 1 回だけ消費されます (バージョン 0.11 で実装されていますが、Kafka などのダウンストリーム システムにのみ適用されます)。

① メッセージが繰り返し消費されないようにするにはどうすればよいでしょうか?(メッセージの冪等性)

更新操作は本質的に冪等です。挿入操作では、各メッセージに一意のIDを割り当て、処理前に前回の処理状況を確認できます。このIDはRedisに保存するか、データベースへの書き込みの場合は主キー制約を使用できます。

② 確実なメッセージ伝送を確保するには?(メッセージ損失の問題)

Kafka アーキテクチャによれば、メッセージはコンシューマー、プロデューサー、サーバーの 3 つの場所で失われる可能性があります。

コンシューマー側でのデータ損失:server.properties/enable.auto.commit が True に設定されている場合、Kafka はメッセージを処理する前にオフセットをコミットします。この時点で例外が発生すると、メッセージは失われます。

したがって、自動オフセットコミットをオフにし、処理後に手動でオフセットをコミットすることで、メッセージが失われないようにすることができます。ただし、オフセットコミットが失敗すると、重複した消費が発生する可能性があります。この場合、冪等性を確保すれば十分です。

Kafka メッセージの損失: ブローカーが誤ってダウンし、レプリカが 1 つしかない場合、ブローカー上のメッセージは失われます。

レプリカ数が1より大きい場合、新しいフォロワーが新しいリーダーとして選択されます。フォロワーに同期されていないメッセージが残っている場合、それらのメッセージは失われます。

上記の問題は、次のように設定することで回避できます。

  • トピックの replication.factor パラメータを設定します。この値は 1 より大きい必要があり、各パーティションには少なくとも 2 つのレプリカが必要です。
  • Kafkaサーバーで、`min.insync.replicas`パラメータを設定します。この値は1より大きくする必要があります。これにより、リーダーは少なくとも1つのフォロワーがまだ通信中であり、遅延していないことを認識している必要があります。これにより、リーダーに障害が発生しても、フォロワーが確実に存在するようになります。
  • プロデューサー側で acks=all を設定する場合、書き込みが成功したとみなされる前に、各データがすべてのレプリカに書き込まれる必要があります。
  • プロデューサー側で retries=MAX (非常に大きな値、つまり無限の再試行) を設定すると、書き込み操作が失敗すると無限の再試行が必要になり、ここで問題が発生します。

プロデューサーがメッセージを失った場合: プロデューサー側で acks=all を設定し、書き込みが成功したと判断する前にすべての ISR がメッセージを同期していることを確認します。

③ メッセージの順序を確実にするにはどうすればいいですか?

Kafka では、パーティション上のメッセージは順序付けされます。順番に消費する必要があるメッセージを同じパーティションに送信し、単一のコンシューマーで消費することができます。

上記は私がKafkaを学習する際にまとめたものです。誤りや矛盾点などございましたら、お気軽にご指摘ください。

参考文献:

  • Kafka 学習ノート: 主要概念のまとめ
  • 高度なJava
  • Kafka ログストレージ分析
  • Kafka プロデューサーパラメータ設定とチューニングの提案 - 商用環境向け実践シリーズ
  • 衝撃的!これがカフカのすべてか!
  • Kafkaの設定
  • カフカ 2.3.0 API
  • Kafka コンシューマーの設定とコミット方法の詳細な説明

著者: lbzhello

プロフィール: Javaプログラマー、メールアドレス: [email protected]