|
メッセージキューとは何でしょうか?簡単に言うと、メッセージキューとはメッセージを格納するためのコンテナです。クライアントはメッセージサーバーにメッセージを送信したり、メッセージサーバーからメッセージを取得したりできます。
Pexelsからの画像 今日は、以下の質問についての私の考えを共有したいと思います。
なぜメッセージングシステムが必要なのでしょうか? ピークシェービング データベースの処理能力には限界があります。ピーク時には、過剰なリクエストがバックグラウンドで処理され、システムの処理能力を超えるとシステムがクラッシュする可能性があります。 上図に示すように、システムの処理能力は2k/s、MQの処理能力は8k/s、ピーク時のリクエストレートは5k/sです。MQの処理能力はデータベースよりもはるかに高く、ピーク時にはリクエストがMQにキューイングされ、システムは自身の処理能力に基づいて2k/sのレートでこれらのリクエストを処理できます。 ピーク期間が過ぎると、リクエスト数は 100/s のみになり、システムは MQ に蓄積されたリクエストをすぐに消費できます。 上記のリクエストは書き込みリクエストを指していることに注意してください。クエリ リクエストは通常、キャッシュを通じて処理されます。 デカップリング 次のシナリオでは、システム S はシステム A、B、C と密結合されています。要件の変更により、システム A はコードを変更し、システム S もシステム A の関連コードを調整する必要があります。 数日後、システム C を削除する必要があり、すぐにシステム S で C 関連のコードを削除します。さらに数日後、システム D を追加する必要があり、システム S で D 関連のコードを追加する必要があります。さらにその数日後、プログラマーが発狂します... システム間の密結合は、保守と拡張を妨げます。メッセージキュー(MQ)を導入することで、システムAへの変更はA自身のコードを変更するだけで対応でき、システムCへの削除はサブスクライブ解除するだけで解決でき、システムDへの追加は関連メッセージをサブスクライブするだけで実現できます。 メッセージ ミドルウェアを導入することで、すべてのシステムがメッセージ キュー (MQ) と対話できるようになり、システム間の複雑な呼び出し関係を回避できます。 Kafka のアーキテクチャの背後にある原則は何ですか? Kafka 関連の概念:
トピックとログ メッセージはトピックごとに整理され、各トピックは複数のパーティション (server.properties/num.partitions に対応) に分割できます。 パーティションは順次追加ログであり、順次ディスク書き込みを伴います (順次ディスク書き込みはランダム メモリ書き込みよりも効率的であり、Kafka のスループットを保証します)。 その構造は次のとおりです: server.properties/num.partitions は、server.properties ファイル内の num.partitions 構成項目を表し、以下も同様です。 パーティション内の各レコード (メッセージ) には、Offset、messageSize、Data の 3 つの属性が含まれます。 Offset はメッセージのオフセットを表し、messageSize はメッセージのサイズを表し、Data はメッセージの内容を表します。 パーティションはファイル システム内のファイルとして保存され、その場所は server.properties/log.dirs によって指定され、命名規則は次のとおりです。 たとえば、トピック「page_visits」のメッセージは 5 つのパーティションに分割され、そのディレクトリ構造は次のようになります。 パーティションは異なるブローカー上に存在できます。パーティションはセグメント化されており、各セグメントはセグメントファイルです。 セグメントの一般的な構成は次のとおりです。
パーティションディレクトリには、データファイルとインデックスファイルが含まれます。次の図は、パーティションのディレクトリ構造を示しています。 インデックスはスパース ストレージを使用します。つまり、すべてのメッセージに対してインデックスを作成するのではなく、インデックス ファイルが多くのスペースを占有することを避けるために、特定のバイト数ごとにインデックスを作成します。 欠点は、インデックスのないオフセットではメッセージの位置を 1 ステップで見つけることができないことです。順次スキャンが必要ですが、スキャン範囲は非常に狭くなります。 インデックスは、相対オフセットと位置の 2 つの部分 (どちらも 4 バイトの数値) で構成されます。 相対オフセットはセグメント ファイル内のオフセットを参照し、位置はデータ ファイル内のメッセージの位置を参照します。 要約すると、Kafka のメッセージ ストレージでは、パーティショニング、シーケンシャル ディスク読み取り/書き込み、セグメンテーション、スパース インデックスを採用して高い効率を実現します。 パーティションとレプリカ トピックは物理的に複数のパーティションに分割され、それぞれ異なるブローカー上に配置されます。レプリカがない場合、ブローカーがダウンすると、そのすべてのパーティションが利用できなくなります。 各パーティションには複数のレプリカ (server.properties/default.replication.factor に対応) を含めることができ、それらは異なるブローカーに割り当てられます。 リーダーの 1 つは読み取りと書き込みを担当し、プロデューサーとコンシューマーからのリクエストを処理します。他のリーダーはフォロワーとして機能し、リーダーからメッセージを引き出して同期を保ちます。 パーティションとレプリカをブローカーに割り当てるにはどうすればよいでしょうか。手順は次のとおりです。
上記の割り当てルールに従うと、レプリカ数がブローカー数より多い場合、必然的に同じブローカーに2つの同一のレプリカが割り当てられ、冗長性が生じます。したがって、レプリカ数はブローカー数以下である必要があります。 リーダー選挙 Kafka は、ZooKeeper (/brokers/topics/[topic]/partitions/[partition]/state) で ISR (同期レプリカ) を動的に維持します。 ISR 内のすべてのレプリカがリーダーに追いついたため、コントローラは ISR から 1 つを選択してリーダーになります。 具体的な手順は以下のとおりです。
ISR が空の場合、レプリカ (必ずしも ISR メンバーではない) がリーダーとして選択されます。すべてのレプリカがダウンしている場合は、任意のレプリカが復活し、リーダーとして選択されます。 ISR(レプリカリスト)では、すべてのフォロワーがリーダーに「追いついた」状態です。「追いついた」とは、完全に同じ状態になるという意味ではありません。これは、server.properties/replica.lag.time.max.ms によって設定されます。 これは、リーダーがフォロワーのメッセージ同期を待機する最大時間を示します。このタイムアウトが発生した場合、リーダーはフォロワーをISRから削除します。設定項目「replica.lag.max.messages」は削除されました。 レプリカ同期 Kafka は「プル モード」を使用してメッセージを同期します。このモードでは、フォロワーがリーダーからデータを一括してプルして同期します。 具体的な信頼性はプロデューサーによって決定されます (構成項目 producer.properties/acks に基づきます)。 Kafka 0.9 では、プロデューサーの設定である request.required.acks=-1 が acks=all に置き換えられましたが、この古い設定はドキュメントに残っています。 バージョン 0.9 では、プロデューサー構成オプション request.required.acks=-1 は acks=all に置き換えられましたが、古い構成オプションも引き続き文書化されています。 PS: 最新のドキュメント 2.2.x request.required.acks は存在しなくなりました。 Acks=-1 の場合、ISR が min.insync.replicas で指定された数より小さいと、NotEnoughReplicas または NotEnoughReplicasAfterAppend 例外がスローされます。 プロデューサーはどのようにメッセージを送信しますか? プロデューサーはまずメッセージを ProducerRecord インスタンスにカプセル化します。 メッセージルーティング:
メッセージはすぐに送信されるのではなく、シリアル化されてから、前述のハッシュ関数であるパーティショナーに送信されます。パーティショナーがターゲットパーティションを決定した後、メッセージはメモリバッファ(送信キュー)に送信されます。 プロデューサーの別のワーカー スレッド (つまり、送信スレッド) は、準備されたメッセージをバッファーからリアルタイムで抽出し、それらをバッチにカプセル化して、対応するブローカーに送信する役割を担います。 プロセスはおおよそ次のようになります。 画像は123archuから コンシューマーはどのようにしてメッセージを消費するのでしょうか? 各コンシューマーは論理コンシューマーグループに割り当てられます。パーティションは同じコンシューマーグループ内の1つのコンシューマーによってのみ使用されますが、異なるコンシューマーグループによって使用されることは可能です。 トピック内のパーティション数が p で、コンシューマー グループ内でこのトピックをサブスクライブしているコンシューマー数が c の場合、次のようになります。
リソースの不均衡を避けるため、コンシューマーとパーティションの数は適切に割り当てる必要があります。理想的には、パーティションの数はコンシューマーの数の整数倍である必要があります。 ① コンシューマーにパーティションを割り当てる方法 生成プロセスでは、ブローカーはパーティションを割り当てる必要があります。同様に、消費プロセスでも、パーティションをコンシューマーに割り当てる必要があります。 ブローカーがコントローラーを選択するのと同様に、コンシューマーもブローカーからコーディネーターを選択してパーティションを割り当てます。 コンシューマーの追加、コンシューマーの削減 (アクティブまたはパッシブ)、パーティションの追加など、パーティションまたはコンシューマーの数が変更されると、再バランスが実行されます。 プロセスは次のとおりです。
②コンシューマーフェッチメッセージ 消費者は「プル モデル」を使用してメッセージを消費し、独自の消費行動を決定できます。 ConsumerはPoll(duration)を呼び出してサーバーからメッセージを取得します。メッセージ取得の具体的な動作は、以下の設定項目によって決まります。
パーティション内では、各メッセージにオフセットが設定されます。新しいメッセージはパーティションの末尾(最新のセグメントファイルの末尾)に書き込まれます。各パーティション内のメッセージは順番に消費されますが、異なるパーティション間での消費順序は予測できません。 コンシューマーが複数のパーティションを消費する場合、各パーティション間の消費順序は不確実ですが、各パーティション内の消費は順次行われます。 異なるコンシューマー グループの複数のコンシューマーが同じパーティションを消費する場合、各コンシューマー間の消費は互いに影響せず、各コンシューマーには独自のオフセットが存在します。 コンシューマーAとコンシューマーBは異なるコンシューマーグループに属しています。コンシューマーAはオフセット9を読み取り、コンシューマーBはオフセット11を読み取ります。この値は次に読み取る位置を示します。 つまり、コンシューマー A はオフセット 0 から 8 のメッセージを読み取り、コンシューマー B はオフセット 0 から 10 のメッセージを読み取ります。 再バランスが発生する可能性があるため、オフセット = 9 から次に読み取られるコンシューマーは、必ずしもコンシューマー A のままであるとは限りません。 オフセットを保存するにはどうすればいいですか? コンシューマーがパーティションを消費する場合、現在の消費位置を記録するためにオフセットを保存する必要があります。 オフセットは、コンシューマーのcommitSync()またはcommitAsync()メソッドを呼び出すことで、自動コミットまたは手動コミットのいずれかに設定できます。関連する設定は次のとおりです。
オフセットは__consumeroffsetsという名前のトピックに保存されます。メッセージを書き込むためのキーはGroupId、Topic、Partitionで構成され、値はオフセットです。 通常、各キーのオフセットはメモリにキャッシュされるため、クエリ実行中にパーティションをトラバースする必要はありません。キャッシュが存在しない場合は、最初のクエリでパーティションをトラバースしてキャッシュを確立し、その後クエリが結果を返します。 __consumeroffsets 内のパーティションの数は、次のサーバー構成によって決まります。
オフセットが格納されるパーティション、つまり __consumeroffsets のパーティション分割メカニズムは、次のように表すことができます。
`groupMetadataTopicPartitionCount` は、上記で設定されたパーティションの数です。パーティションは同じコンシューマーグループ内の 1 人のコンシューマーによってのみ消費されるため、`GroupId` を使用して、コンシューマーがオファーを消費するパーティションを示すことができます。 メッセージング システムではどのような問題が発生する可能性がありますか? Kafka は 3 つのメッセージ配信セマンティクスをサポートしています。
データの取得 -> コミットオフセット -> ビジネス処理
データ取得 -> 業務処理 -> コミットオフセット。
① メッセージが繰り返し消費されないようにするにはどうすればよいでしょうか?(メッセージの冪等性) 更新操作は本質的に冪等です。挿入操作では、各メッセージに一意のIDを割り当て、処理前に前回の処理状況を確認できます。このIDはRedisに保存するか、データベースへの書き込みの場合は主キー制約を使用できます。 ② 確実なメッセージ伝送を確保するには?(メッセージ損失の問題) Kafka アーキテクチャによれば、メッセージはコンシューマー、プロデューサー、サーバーの 3 つの場所で失われる可能性があります。 コンシューマー側でのデータ損失:server.properties/enable.auto.commit が True に設定されている場合、Kafka はメッセージを処理する前にオフセットをコミットします。この時点で例外が発生すると、メッセージは失われます。 したがって、自動オフセットコミットをオフにし、処理後に手動でオフセットをコミットすることで、メッセージが失われないようにすることができます。ただし、オフセットコミットが失敗すると、重複した消費が発生する可能性があります。この場合、冪等性を確保すれば十分です。 Kafka メッセージの損失: ブローカーが誤ってダウンし、レプリカが 1 つしかない場合、ブローカー上のメッセージは失われます。 レプリカ数が1より大きい場合、新しいフォロワーが新しいリーダーとして選択されます。フォロワーに同期されていないメッセージが残っている場合、それらのメッセージは失われます。 上記の問題は、次のように設定することで回避できます。
プロデューサーがメッセージを失った場合: プロデューサー側で acks=all を設定し、書き込みが成功したと判断する前にすべての ISR がメッセージを同期していることを確認します。 ③ メッセージの順序を確実にするにはどうすればいいですか? Kafka では、パーティション上のメッセージは順序付けされます。順番に消費する必要があるメッセージを同じパーティションに送信し、単一のコンシューマーで消費することができます。 上記は私がKafkaを学習する際にまとめたものです。誤りや矛盾点などございましたら、お気軽にご指摘ください。 参考文献:
著者: lbzhello プロフィール: Javaプログラマー、メールアドレス: [email protected] |