DUICUO

Kafka の原理を説明していただけますか?

もしあなたがたった一人の女性を追いかけ、特に彼女に尽くすなら、あなたを知る人々は間違いなくあなたをいい男と言うでしょう。もしあなたがたくさんの女性に手を出すだけで、そのほとんどを追いかけるなら、あなたを知る人々は間違いなくあなたをクズ野郎と呼ぶでしょう。

[[287183]]

技術分野でも同じことが言えます。特定の技術を深く掘り下げれば、その分野で欠かせない人材になれます。しかし、あらゆる技術を習得しようとして困難に直面した途端に諦めてしまうと、すぐに代替されてしまうでしょう。

今日の中国社会は、まるで触媒のように、機敏な人材を生み出しています。週末、こんな話を聞きました。私の大学に、ある学生がクラスモニターを務めていました。裕福な家庭で育ち、幼い頃から社会に馴染んでいたためか、人との接し方においても成熟していました。彼は指導教員と非常に親密で、しょっちゅう授業を欠席すると、指導教員がメモを書いてくれるほどでした。クラスメイトが彼に60点を付けると、指導教員はこっそりと80点に修正するほどでした。私はこう言いました。「これは社会の触媒作用の産物だ。キャンパスは社会の縮図であり、多くのことがこの縮図の中から変化し始めるのだ」

私たちは、入手したものが合法かコンプライアンスに準拠しているかを一切考慮しません。かつてメンターが私にこう言ったのを覚えています。「善良な人間になりなさい。常に近道を探すな」。この言葉はお茶のようなものです。その真の味は、ゆっくりと丁寧に味わうことで生まれるのです。

さて、技術的な側面に戻りましょう。

Kafka アプリケーションの開発のみ、または本番環境で Kafka を使用する場合、Kafka の内部動作を理解する必要はありません。しかし、Kafka の内部動作を理解することは、Kafka の動作を理解し、迅速な問題診断を行うのに役立ちます。以下の 3 つの質問について検討してみましょう。

  • Kafka はどのようにレプリケーションを実行しますか?
  • Kafka はプロデューサーとコンシューマーからのリクエストをどのように処理しますか?
  • Kafka のストレージの詳細は何ですか?

ご興味がございましたら、ぜひ時間を取ってこの記事をじっくりとお読みください。

クラスターメンバー間の関係

Kafka は ZooKeeper 上で動作することが知られています。ZooKeeper はクラスター構成で動作するため、Kafka もクラスター構成で動作できます。ここで、複数のプロデューサーとコンシューマーがどのように連携するかという疑問が生じますが、ZooKeeper はクラスターメンバー間の関係維持を担います。ご存知のとおり、Kafka クラスターは複数のブローカーで構成され、各ブローカーには `broker.id` が付与されています。各 `broker.id` には識別用の一意の識別子が付与されており、この識別子は設定ファイルで手動で指定することも、自動生成することもできます。

Kafka は、broker.id.generation.enable と reserved.broker.max.id を使用して新しい broker.id を生成できます。

`broker.id.generation.enable` パラメータは、`broker.id` の自動生成を有効にするかどうかを設定します。デフォルトは `true` で、この機能が有効になっていることを意味します。自動生成される `broker.id` のデフォルト値は 1000 です。つまり、デフォルトでは自動生成される `broker.id` は 1001 から始まります。

Kafka は起動時に、ZooKeeper の `/brokers/ids` パスに、現在のブローカーと同じ ID を持つ一時ノードを登録します。Kafka のヘルスチェックはこのノードに依存しています。これらのコンポーネントは、ブローカーがクラスターに参加または離脱すると通知を受け取ります。

  • 同じ ID を持つ別のブローカーを起動しようとすると、エラーが発生します。新しいブローカーは登録を試みますが、ZooKeeper に同じ ID を持つブローカーが既に存在するため失敗します。
  • ブローカーがダウンした場合、パーティション分割が発生した場合、またはガベージコレクションによる長時間の一時停止が発生した場合、ブローカーはZooKeeperから切断されます。この際、ブローカーの起動時に作成されたエフェメラルノードはZooKeeperから削除されます。ブローカーリストを監視しているKafkaコンポーネントには、ブローカーが削除されたことが通知されます。
  • ブローカーがシャットダウンされると、対応するノードも消滅しますが、そのIDはトピックのレプリカリストなどの他のデータ構造に残ります。レプリカリストのレプリケーションについては後ほど説明します。ブローカーが完全にシャットダウンされた後、同じIDで全く新しいブローカーが起動された場合、そのブローカーは直ちにクラスターに参加し、古いブローカーと同じパーティションとトピックを持ちます。

ブローカーコントローラーの役割

Kafka のリバランスについて説明した際に、グループ間の関係を調整する役割を担うグループコーディネーターについて触れました。同様に、ブローカー間にはコントローラーコンポーネントがあり、これは Kafka のコアコンポーネントです。コントローラーの主な機能は、ZooKeeper を利用して Kafka クラスター全体を管理および調整することです。クラスター内の各ブローカーはコントローラーと見なすことができますが、Kafka クラスターの起動後にコントローラーになるブローカーは 1 つだけです。Kafka クラスターは ZooKeeper クラスターに依存しているため、ここでは znode について簡単に説明します。

ZooKeeperは、znodeと呼ばれるノードにデータを保存しています。znodeは、Linuxオペレーティングシステムのファイルパスに似たツリー状のファイル構造です。ZooKeeperのルートノードは/です。

データの永続化方法に基づいて、znode はエフェメラルノードと永続ノードに分類されます。永続ノードは ZooKeeper の状態変化によって消滅することはありませんが、エフェメラルノードは ZooKeeper の再起動時に自動的に消滅します。

znodeにはウォッチャーメカニズムが搭載されています。データが変更されると、ZooKeeperはウォッチャーイベントを生成し、クライアントに送信します。ウォッチャーメカニズムはZooKeeperの重要な機能です。ZooKeeperで作成されたノードに基づいて、ノードデータの変更、ノードの削除、子ノードの状態変化などのリスナーイベントをこれらのノードにバインドできます。このイベントメカニズムを通じて、分散ロック、クラスタ管理、その他ZooKeeperベースの機能を実装できます。

コントローラーの選出

Kafka のコントローラー選出の現在のルールは次のとおりです。Kafka クラスターで最初に起動したブローカーは、ZooKeeper に一時ノード「/controller」を作成してコントローラーになります。他のブローカーは起動時にこのノードを作成しようとしますが、既に存在するため、後で「/controller」ノードを作成しようとしたときに「ノードが既に存在します」という例外が発生します。その後、他のブローカーはこのコントローラーに ZooKeeper ウォッチオブジェクトを登録します。「/controller」ノードが変更されると、他のブローカーはノード変更の通知を受け取ります。この方法により、コントローラーは 1 つだけ存在するようになります。しかし、ノードが 1 つしかない場合、単一障害点という問題が必然的に発生します。

コントローラがシャットダウンするか、ZooKeeperから切断されると、ZooKeeper上のエフェメラルノードは消滅します。クラスター内の他のノードがコントローラがオフラインであることを示すウォッチオブジェクトを受信すると、各ブローカーノードは新しいコントローラになろうとします。他のノードの作成ルールは最初のノードと同じです。ZooKeeperにコントローラノードを最初に正常に作成したブローカーが新しいコントローラになります。その後、他のノードはノードが既に存在することを示す例外を受け取り、新しいコントローラノード上に再度ウォッチオブジェクトを作成して変更をリッスンします。

コントローラーの役割

さて、ここまで読んできて、コントロールとは何でしょうか?コントローラーの機能は何でしょうか?あるいは、コントローラーコンポーネントの目的は何でしょうか?ご安心ください。それについては次に説明します。

Kafka は、ステートマシンをシミュレートするマルチスレッド コントローラーとして設計されており、次の機能を実行できます。

  • コントローラーは、部門 (クラスター) 内のブローカー コントローラーのようなもので、部門メンバー (ブローカー) を管理するために使用されます。
  • コントローラーはすべてのブローカーのモニターとして機能し、ブローカーのオンラインおよびオフラインのアクティビティを監視するために使用されます。
  • ブローカーがクラッシュした後、コントローラーは新しいパーティション リーダーを選出できます。
  • コントローラは、ブローカーの新しく選択されたリーダーにメッセージを送信できます。

さらに細かく分けると、以下の5点になります。

  • トピック管理:Kafkaコントローラーは、Kafkaトピックへのパーティションの作成、削除、追加を支援します。つまり、パーティションに対する最高権限を付与します。

つまり、kafka-topics スクリプトを実行すると、バックグラウンド作業のほとんどはコントローラーによって実行されます。

  • パーティションの再割り当て:パーティションの再割り当てとは、主にkafka-reassign-partitionsスクリプトによって提供される既存のトピックパーティションのきめ細かな割り当てを指します。この機能はコントローラーによっても実装されています。
  • 優先リーダー選出: 優先リーダー選出は、一部のブローカーの過負荷を回避するためにリーダーを置き換えるために Kafka によって提供されるソリューションです。
  • クラスター メンバー管理: 主にブローカーの追加、ブローカーのシャットダウン、ブローカーの障害の処理を管理します。
  • データサービス:コントローラーの最後の主要なタスクは、他のブローカーにデータサービスを提供することです。コントローラーは最も包括的なクラスターメタデータ情報を保存し、他のすべてのブローカーはコントローラーから定期的にメタデータ更新リクエストを受信し、メモリ内にキャッシュされたデータを更新します。このデータについては後述します。

コントローラーは、ブローカーがクラスターから離脱したことを検出すると(関連するZooKeeperパスを監視することで)、そのブローカーが管理するパーティションに新しいリーダーが必要であることを示すメッセージを受信します。コントローラーは各パーティションを反復処理し、どのパーティションが新しいリーダーとして適しているかを判断し、新しいリーダーまたは既存のフォロワーを含むすべてのパーティションにリクエストメッセージを送信します。このリクエストメッセージには、新しいリーダーとフォロワーに関する情報が含まれます。新しいリーダーはプロデューサーとコンシューマーからのリクエストの処理を開始し、フォロワーは新しいリーダーからのレプリケーションを実行します。

これは、アウトソーシング会社の出張専門部署のようなもので、全員が別々の拠点で勤務しています。ところが、本社に部長がいましたが、突然辞任してしまいました。会社は外部からの採用は考えておらず、部内から優秀な人材を選抜して部署を率いることに決めました。新リーダーは、部署のメンバーに任命を確認し、誰をマネジメントするかを明記したメッセージを送る必要があります。全員がこのメッセージを把握した後、部署内でそれぞれの業務に取り組むことができます。

コントローラは、クラスタに参加するブローカーを検出すると、ブローカーIDを使用して、新しく参加したブローカーに既存のパーティションのレプリカが含まれているかどうかを確認します。レプリカが含まれている場合、コントローラは新しく参加したブローカーと既存のブローカーの両方にメッセージを送信します。

上記のパーティションのコピーセクションについては後で説明します。

ブローカーコントローラーのデータストレージ

前述のように、ブローカー コントローラーは大量の Kafka クラスター データを保存するためのデータ サービスを提供します。(下の図を参照)

上記で保存される情報は、主に次の 3 つの種類に分類できます。

  • ブローカー内のすべてのパーティション、ブローカーのすべてのパーティションのレプリカ、現在実行中のブローカー、シャットダウン中のブローカーなど、ブローカーに関するすべての情報。
  • リーダー レプリカが誰であるか、ISR コレクション内にどのレプリカがあるかなどの特定のパーティション情報を含むすべてのトピック情報。
  • 運用タスクに関与するすべてのパーティション。これには、現在優先リーダー選出およびパーティション再割り当てが行われているパーティションのリストが含まれます。

Kafka は ZooKeeper に依存しているため、このデータも ZooKeeper に保存されます。コントローラーが初期化されるたびに、対応するメタデータが ZooKeeper から読み込まれ、自身のキャッシュに格納されます。

ブローカーコントローラのフェイルオーバー

前述の通り、ZooKeeper の `/brokers/ids` 以下にノードを作成した最初のブローカーがブローカーコントローラーになります。つまり、ブローカーコントローラーは 1 つしか存在しないため、必然的に単一障害点が発生します。Kafka はこの問題に対処するためにフェイルオーバー機能を提供しています。下の図をご覧ください。

最初に、broker1 がコントローラーとして正常に登録されます。その後、ネットワークのジッターなどにより、broker1 はオフラインになります。ZooKeeper は Watch メカニズムを通じて broker1 の切断を検出します。その後、生き残っているすべてのブローカーがコントローラーになるために競争します。このとき、broker3 が最初に登録に成功します。この時点で、ZooKeeper に保存されているコントローラー情報は、broker1 から broker3 に変更されます。その後、broker3 は ZooKeeper からメタデータ情報を読み取り、自身のキャッシュに初期化します。

注: ZooKeeper はキャッシュ情報を保存しません。ブローカーがキャッシュ情報を保存します。

ブローカーコントローラーの問題

Kafka バージョン 0.11 より前のバージョンでは、コントローラーの設計は非常に複雑でした。前述のように、Kafka コントローラーはステートマシンをシミュレートするマルチスレッドコントローラーとして設計されていましたが、この設計にはいくつかの問題がありました。

  • コントローラーの状態の変更は、異なるリスナーによって同時に実行されるため、複雑な同期が必要となり、エラーが発生しやすく、デバッグが困難になります。
  • 状態の伝播が同期されていません。ブローカーは不確実な時間に複数の状態に存在する可能性があり、これにより不要な追加データ損失が発生する可能性があります。
  • コントローラーはトピックの削除用に追加の I/O スレッドも作成するため、パフォーマンスのオーバーヘッドが発生します。
  • コントローラーのマルチスレッド設計は、共有データへのアクセスも行います。ご存知の通り、共有データへのマルチスレッドアクセスは、スレッド同期において最も厄介な部分です。データセキュリティを保護するために、コントローラーはコード内でReentrantLock同期メカニズムを多用する必要があり、これによりコントローラー全体の処理速度がさらに低下します。

ブローカーコントローラーの内部設計原則

Kafka 0.11以降、Kafkaコントローラーは新しい設計を採用し、マルチスレッドアプローチからイベントキューを使用したシングルスレッドアプローチに変更されました。下の図をご覧ください。

主な変更点は次のとおりです。

最初の改善点は、イベントエグゼキュータスレッドの追加です。図に示すように、イベントキューとコントローラコンテキストの両方がこのスレッドによって処理されます。これまで実行されていたすべての操作は独立したイベントとしてモデル化され、専用のイベントキューに送信され、このスレッドで処理されます。

2つ目の改善点は、これまで同期していたZooKeeper操作をすべて非同期操作に変更したことです。ZooKeeper APIは、同期と非同期の2つの読み取り/書き込みメソッドを提供しています。以前はZooKeeperのコントローラー操作はすべて同期でしたが、今回の非同期化により、テストの結果、効率が10倍向上しました。

3 つ目の改善点は、リクエスト処理の優先順位付けです。以前の設計では、ブローカーはコントローラーから送信されたすべてのリクエストを公平に処理していました。これはどういう意味でしょうか?公平性だけで十分ではないでしょうか?はい、状況によっては十分です。例えば、ブローカーがプロダクションリクエストをキューイングしているときに、コントローラーが StopReplica リクエストを発行した場合、どうしますか?プロダクションリクエストの処理を続行しますか?プロダクションリクエストはまだ有効ですか?最も合理的なアプローチは、StopReplica リクエストに高い優先度を与え、プリエンプティブ処理を受けられるようにすることです。

インスタンスメカニズム

レプリケーションはKafkaアーキテクチャの中核機能です。ドキュメントでは、Kafkaは分散型、パーティション化、レプリケーションされたコミットログサービスであると説明されています。永続的なメッセージストレージはマスターノードに障害が発生した場合でも高可用性を確保するために不可欠であり、レプリケーションは非常に重要です。レプリケーションメカニズム(バックアップメカニズムとも呼ばれます)は、通常、ネットワークに接続された複数のマシン間で同一のデータのバックアップ/コピーを維持する分散システムを指します。

Kafka はトピックを使用してデータを整理し、各トピックはさらにパーティションに分割されます。パーティションは1つ以上のブローカーにデプロイされ、各パーティションには複数のレプリカが存在します。したがって、レプリカもブローカーに保存され、各ブローカーには数万ものレプリカが保存される可能性があります。次の図はレプリカのレプリケーションを示しています。

上の図では、簡略化のためブローカーを2つだけ描いています。各ブローカーは1つのトピックのメッセージを格納します。ブローカー1では、パーティション0がリーダーであり、パーティションのコピーを担当します。ブローカー1のパーティション0のコピーをブローカー2のトピックAのパーティション0に作成します。同様に、トピックAのパーティション1にも同じロジックが適用されます。

レプリカには、リーダー レプリカとフォロワー レプリカの 2 種類があります。

リーダーダンジョン

パーティションを作成するときに、Kafka はリーダー レプリカと呼ばれるレプリカを選択します。

フォロワーコピー

リーダーレプリカ以外のレプリカは、フォロワーレプリカと総称されます。フォロワーは外部にサービスを提供しません。以下では、リーダーレプリカの仕組みについて説明します。

この画像に関しては以下の点に留意する必要があります。

  • Kafkaでは、フォロワーレプリカは外部サービスを提供しません。つまり、単一のフォロワーレプリカがコンシューマーまたはプロデューサーからのリクエストに応答することはできません。すべてのリクエストはリーダーレプリカによって処理されます。つまり、すべてのリクエストはリーダーレプリカが存在するブローカーに送信する必要があります。フォロワーレプリカはデータ取得のみに使用され、非同期取得と独自のコミットログへの書き込みによってリーダーとの同期を実現します。
  • リーダーレプリカをホストするブローカーがダウンすると、Kafka は ZooKeeper が提供する監視機能を利用してこれをリアルタイムで検知し、新たな選挙ラウンドを開始してフォロワーレプリカの中からリーダーを選出します。ダウンしたブローカーが再起動すると、そのパーティションのレプリカはフォロワーとして再び参加します。

リーダーのもう一つのタスクは、どのフォロワーが自分と同じ状態にあるかを判断することです。リーダーとの一貫性を保つため、フォロワーは新しいメッセージが到着する前に、リーダーからのメッセージをコピーしようとします。整合性を維持するために、フォロワーは、コンシューマーがメッセージを読むために送信するリクエストと同様に、データ取得リクエストをリーダーに送信します。

フォロワーがリーダーにメッセージを送信するプロセスは以下のとおりです。まず、メッセージ1を要求し、次にメッセージ1を受信します。要求1を受信した後、リクエスト2を送信します。フォロワーは、リーダーからメッセージを受信するまで、それ以上メッセージを送信しません。このプロセスは以下のとおりです。

フォロワーレプリカは、応答メッセージを受信するまでメッセージの送信を継続しないことが重要です。リーダーは、各フォロワーが要求した最新のオフセットをチェックすることで、各フォロワーのレプリケーションの進行状況を把握します。フォロワーが10秒以内にメッセージを要求しない場合、または要求を送信したが10秒以内にメッセージを受信しない場合は、同期されていないとみなされます。レプリカがリーダーと同期していない場合、すべてのメッセージを受け取れないため、リーダーがオフラインになった後もレプリカは再びリーダーになることはできません。

逆に、フォロワーが同期したメッセージがリーダーのレプリカ内のメッセージと一致する場合、このフォロワーのレプリカは同期レプリカと呼ばれます。つまり、リーダーがオフラインになった場合、同期レプリカのみがリーダーと呼ばれることができます。

ダンジョンの仕組みについては長々と説明してきましたが、その利点は何でしょうか?

  • 書き込まれたメッセージをすぐに確認できるということは、プロデューサー API を使用してパーティションにメッセージを正常に書き込んだ後、コンシューマーを使用して書き込まれたメッセージをすぐに読み取ることができることを意味します。
  • メッセージングにおいてべき等性を実現するとはどういう意味でしょうか。それは、プロデューサーによって生成されたすべてのメッセージについて、コンシューマーがそれを消費するときに常にそのメッセージが表示されることを意味します。メッセージが欠落する状況は決して発生しません。

同期レプリケーションと非同期レプリケーション

レプリケーション メカニズムについて学習しているときに、リーダー レプリカとフォロワー レプリカは同期レプリケーション方式である send-wait メカニズムを使用しているのに、リーダー レプリカと同期するフォロワー レプリカはなぜ非同期操作であると言われるのでしょうか。

動作は次のように行われると考えています。リーダーレプリカと同期した後、フォロワーレプリカはメッセージをローカルログに保存します。この時点で、フォロワーレプリカはリーダーレプリカに応答メッセージを送信し、メッセージの保存に成功したことを通知します。同期レプリケーションでは、リーダーレプリカはすべてのフォロワーレプリカがメッセージの書き込みに成功するまで待機し、プロデューサーに成功メッセージを返します。非同期レプリケーションでは、リーダーレプリカはフォロワーレプリカがメッセージの書き込みに成功したかどうかを気にする必要はありません。リーダーレプリカがメッセージをローカルログに保存すれば、プロデューサーに成功メッセージを返します。以下は、同期レプリケーションと非同期レプリケーションのプロセスです。

同期コピー

  • プロデューサーはリーダーを識別するために ZooKeeper に通知します。
  • プロデューサーはリーダーにメッセージを書きます。
  • リーダーはメッセージを受信すると、そのメッセージをローカル ログに書き込みます。
  • フォロワーはリーダーから情報を取得します。
  • フォロワーはログをローカルマシンに書き込みます。
  • フォロワーは書き込みが成功したことを示すメッセージをリーダーに送信します。
  • リーダーはすべてのフォロワーからのメッセージを受信します。
  • リーダーは書き込みが成功したことを示すメッセージをプロデューサーに送信します。

非同期レプリケーション

同期レプリケーションと同期レプリケーションの違いは、リーダーがローカル ログに書き込んだ後、すべてのフォロワーがレプリケーションを完了するのを待たずに、書き込み成功メッセージをすぐにクライアントに送信することです。

情報セキュリティ

Kafka は、In-Sync Replicas (ISR) のセットを動的に維持します。これは非常に重要な概念です。前述のように、フォロワーレプリカはサービスを提供せず、リーダーレプリカから定期的に非同期的にデータをプルするだけです。このプル操作は、Ctrl + C + Ctrl + V のような、誰もが慣れ親しんでいるコピー操作に相当します。しかし、これは ISR セット内のメッセージ数が常にリーダーレプリカ内のメッセージ数と同じであることを意味するのでしょうか?必ずしもそうではありません。この決定は、ブローカーのパラメータ `replica.lag.time.max.ms` の値に基づいています。このパラメータは、フォロワーレプリカがリーダーレプリカから遅れることができる最大時間間隔を表します。

`replica.lag.time.max.ms` パラメータのデフォルト時間は 10 秒です。フォロワーレプリカがリーダーレプリカより 10 秒以内の遅延をしている場合、フォロワーレプリカがリーダーレプリカより少ないメッセージ数しか保存していなくても、Kafka はリーダーとフォロワーが同期しているとみなします。フォロワーレプリカがリーダーレプリカより 10 秒以上遅延している場合、そのフォロワーレプリカは ISR (In-Sync Replica) から削除されます。フォロワーレプリカが最終的にリーダーレプリカに追いついた場合、ISR に再び追加できます。これは、ISR が静的なセットではなく、動的に調整されるセットであることを示しています。

不公正な党首選挙

ISR(同期レプリカ)は動的に調整できるため、ISRセットが空になる場合があります。リーダーレプリカは常にISRセットに存在するため、空のISRセットは、リーダーレプリカにも障害が発生したことを示します。そのため、Kafkaは新しいリーダーを選出する必要があります。この選出はどのように機能するのでしょうか?ここで考え方を変える必要があります。前述のように、ISRセットにはリーダーと同期しているレプリカを含める必要があります。ISRセットに含まれていないレプリカは、リーダーと同期していないレプリカである必要があります。つまり、ISRリストに含まれていないフォロワーレプリカは、一部のメッセージを失うことになります。ブローカー側のパラメータ「unclean.leader.election.enable」を有効にすると、これらの同期していないレプリカから次のリーダーが選出されます。この選出は、アンクリーンリーダー選出とも呼ばれます。

分散プロジェクトに携わったことがある方なら、CAP定理についてよくご存知でしょう。このアンクリーンリーダー選出は、Kafkaの高可用性を確保するためにデータの一貫性を犠牲にします。

アンクリーンリーダー選出を有効にするかどうかは、実際のビジネスシナリオに基づいて決定できます。データの整合性は可用性よりもはるかに重要であるため、一般的にこのパラメータを有効にすることは推奨されません。

Kafka リクエスト処理フロー

ブローカーの主な役割は、クライアント、パーティションレプリカ、コントローラーからパーティションリーダーに送信されるリクエストを処理することです。これらのリクエストは通常​​、リクエスト/レスポンスベースで、リクエスト/レスポンス方式に初めて触れたのはHTTPリクエストだったのではないでしょうか。実際、HTTPリクエストは同期または非同期にすることができます。通常のHTTPリクエストは通常​​同期であり、同期リクエストの最大の特徴は、リクエストの送信→サーバーによる処理の待機→レスポンスの受信というプロセスの間、クライアントブラウザは何もできないことです。非同期リクエストの最大の特徴は、リクエストがイベントによってトリガーされ→サーバーが処理し(ブラウザは他の処理を実行できます)→処理が完了することです。

つまり、同期リクエストは順番に処理されるのに対し、非同期リクエストでは複数の実行スレッドの作成が必要であり、各スレッドの実行順序が異なるため、非同期リクエストの実行方法は不確定であると言えます。

ここで注意すべき点は、例として HTTP リクエストのみを使用しているのに対し、Kafka は通信にソケットに基づく TCP を使用していることです。

では、これら 2 つの方法の欠点は何でしょうか?

同期処理の最大の欠点は、スループットの低さとリソース利用率の極端に低いこと、これは賢いあなたならすぐにお分かりいただけると思います。リクエストは順番にしか処理できないため、各リクエストは前のリクエストが完了するまで待たなければなりません。この方法は、リクエストの送信頻度が非常に低いシステムにのみ適しています。

非同期メソッドの欠点は、リクエストごとにスレッドを作成するとコストが非常にかかり、場合によってはサービス全体に負担がかかってしまう可能性があることです。

レスポンシブモデル

では、Kafka は同期操作と非同期操作のどちらを使用するのでしょうか?どちらも使用しません。Kafka はリアクティブ(Reactor)モデルを使用します。リアクティブモデルとは何でしょうか?簡単に言うと、Reactor パターンはイベント駆動型アーキテクチャの実装であり、特に複数のクライアントが同時にサーバーにリクエストを送信するシナリオの処理に適しています(下の図を参照)。

Kafka のブローカーには、プロセッサのように動作する SocketServer コンポーネントがあります。SocketServer は TCP ソケット接続を使用してクライアントからのリクエストを受け入れます。すべてのリクエストメッセージには、以下の情報を含むヘッダーが含まれます。

  • リクエストタイプ(APIキーなど)
  • リクエストバージョン(ブローカーは異なるクライアントバージョンを処理し、クライアントバージョンに応じて異なる応答を返すことができます)
  • 相関 ID --- 要求メッセージを識別するために使用される一意の番号。応答メッセージとエラー ログにも表示されます (問題の診断用)。
  • クライアントID --- リクエストを送信したクライアントを識別するために使用されます

ブローカーは、リッスンする各ポートでアクセプタスレッドを実行します。このスレッドは接続を作成し、それをプロセッサ(ネットワークスレッドプール)に渡します。プロセッサの数はnum.network.threadsで設定できます。デフォルト値は3で、各ブローカーは起動時にクライアントから送信されたリクエストを処理するために3つのスレッドを作成します。

アクセプタスレッドは、ラウンドロビン方式を用いて、受信したリクエストをネットワークスレッドプールに公平に送信します。そのため、実際には、これらのスレッドがリクエストキューに割り当てられる確率は通常均等です。その後、レスポンスキューからレスポンスメッセージを取得し、クライアントに送信します。プロセッサネットワークスレッドプールにおけるリクエスト・レスポンス処理は比較的複雑です。以下は、ネットワークスレッドプールにおける処理のフローチャートです。

プロセッサネットワークスレッドプールは、クライアントや他のブローカーからメッセージを受信した後、リクエストキューに配置します。ネットワークスレッドプールはマルチスレッドメカニズムを使用しているため、リクエストキューは共有キューであることに注意してください。つまり、リクエストキュー内のメッセージは複数のスレッド間で共有されます。IOスレッドプールはメッセージを処理し、メッセージの種類に基づいて適切なアクションを決定します。例えば、PRODUCEリクエストはログに書き込まれ、FETCHリクエストはディスクまたはページキャッシュから読み込まれます。つまり、IOスレッドプールは実際に決定を下し、リクエストを処理するコンポーネントです。IOスレッドプールは処理を完了すると、メッセージをレスポンスキューに配置するか、パーガトリーキューに配置するかを決定します。パーガトリーキューについては後で説明しますが、ここではレスポンスキューについて説明します。リアクティブモデルではリクエストがどこに送信されるかは考慮されないため、レスポンスキューは各スレッドに固有です。したがって、レスポンスを返すタスクは各スレッドに委任されるため、キューを共有する必要はありません。

注: IOスレッドプールは、ブローカー側のパラメータ num.io.threads で設定できます。デフォルトのスレッド数は8です。つまり、各ブローカーは起動後に自動的に8つのIO処理スレッドを作成します。

リクエストの種類

以下に一般的なリクエストの種類を示します。

生産依頼

簡単に言うと、設定によって書き込み成功の定義が異なります。「acks = 1」の場合、リーダーがメッセージを受信した時点で書き込み成功とみなされます。「acks = 0」の場合、戻り値に関係なく、リーダーがメッセージを送信した時点で書き込み成功とみなされます。「acks = all」の場合、リーダーはすべてのレプリカからメッセージを受信して​​から初めて書き込み成功とみなされます。

メッセージがパーティションのリーダーに書き込まれた後、acks 構成値が all の場合、これらの要求は、リーダー レプリカがフォロワー レプリカがすべてメッセージをコピーしたことを検出するまで Purgatory バッファーに保存され、その時点で応答がクライアントに送信されます。

リクエストを取得

ブローカーがリクエストを受信する方法は、プロダクションリクエストを処理する方法と似ています。クライアントは、トピックパーティション内の特定のオフセットにあるメッセージを要求するリクエストをブローカーに送信します。指定されたオフセットが存在する場合、Kafkaはゼロレプリケーション技術を使用してクライアントにメッセージを送信します。Kafkaはバッファを経由せずにファイルからネットワークチャネルに直接メッセージを送信するため、パフォーマンスが向上します。

クライアントは、要求するデータ量の上限と下限を設定できます。上限とは、クライアントが十分なメッセージを受信するために割り当てるメモリ空間を指します。この制限は非常に重要です。上限が大きすぎると、クライアントのメモリが枯渇する可能性があります。下限とは、送信前に十分なデータパケットを蓄積することを意味します。これは、プロジェクトマネージャーがプログラマーに10個のバグを割り当てることに似ています。プログラマーはバグを修正するたびにプロジェクトマネージャーに報告します。バグが修正される場合もあれば、修正されない場合もあり、コミュニケーションコストと時間コストが増加します。したがって、下限は、プログラマーが報告前に10個のバグをすべて修正することを意味します。(下の図を参照)

如图你可以看到,在拉取消息---> 消息之间是有一个等待消息积累这么一个过程的,这个消息积累你可以把它想象成超时时间,不过超时会跑出异常,消息积累超时后会响应回执。延迟时间可以通过replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大延迟时间。

元数据请求

生产请求和响应请求都必须发送给领导者副本,如果broker 收到一个针对某个特定分区的请求,而该请求的首领在另外一个broker 中,那么发送请求的客户端会收到非分区首领的错误响应;如果针对某个分区的请求被发送到不含有领导者的broker 上,也会出现同样的错误。Kafka 客户端需要把请求和响应发送到正确的broker 上。这不是废话么?我怎么知道要往哪发送?

事实上,客户端会使用一种元数据请求,这种请求会包含客户端感兴趣的主题列表,服务端的响应消息指明了主题的分区,领导者副本和跟随者副本。元数据请求可以发送给任意一个broker,因为所有的broker 都会缓存这些信息。

一般情况下,客户端会把这些信息缓存,并直接向目标broker 发送生产请求和相应请求,这些缓存需要隔一段时间就进行刷新,使用metadata.max.age.ms 参数来配置,从而知道元数据是否发生了变更。比如,新的broker 加入后,会触发重平衡,部分副本会移动到新的broker 上。这时候,如果客户端收到不是首领的错误,客户端在发送请求之前刷新元数据缓存。

Kafka 重平衡流程

我在真的,关于Kafka 入门看这一篇就够了中关于消费者描述的时候大致说了一下消费者组和重平衡之间的关系,实际上,归纳为一点就是让组内所有的消费者实例就消费哪些主题分区达成一致。

我们知道,一个消费者组中是要有一个群组协调者(Coordinator)的,而重平衡的流程就是由Coordinator 的帮助下来完成的。

这里需要先声明一下重平衡发生的条件:

  • 消费者订阅的任何主题发生变化
  • 消费者数量发生变化
  • 分区数量发生变化
  • 如果你订阅了一个还尚未创建的主题,那么重平衡在该主题创建时发生。如果你订阅的主题发生删除那么也会发生重平衡
  • 消费者被群组协调器认为是DEAD 状态,这可能是由于消费者崩溃或者长时间处于运行状态下发生的,这意味着在配置合理时间的范围内,消费者没有向群组协调器发送任何心跳,这也会导致重平衡的发生。

在了解重平衡之前,你需要知道这两个角色

群组协调器(Coordinator):群组协调器是一个能够从消费者群组中收到所有消费者发送心跳消息的broker。在最早期的版本中,元数据信息是保存在ZooKeeper 中的,但是目前元数据信息存储到了broker 中。每个消费者组都应该和群组中的群组协调器同步。当所有的决策要在应用程序节点中进行时,群组协调器可以满足JoinGroup 请求并提供有关消费者组的元数据信息,例如分配和偏移量。群组协调器还有权知道所有消费者的心跳,消费者群组中还有一个角色就是领导者,注意把它和领导者副本和kafka controller 进行区分。领导者是群组中负责决策的角色,所以如果领导者掉线了,群组协调器有权把所有消费者踢出组。因此,消费者群组的一个很重要的行为是选举领导者,并与协调器读取和写入有关分配和分区的元数据信息。

消费者领导者:每个消费者群组中都有一个领导者。如果消费者停止发送心跳了,协调者会触发重平衡。

在了解重平衡之前,你需要知道状态机是什么

Kafka 设计了一套消费者组状态机(State Machine) ,来帮助协调者完成整个重平衡流程。消费者状态机主要有五种状态它们分别是Empty、Dead、PreparingRebalance、CompletingRebalance 和Stable。

了解了这些状态的含义之后,下面我们用几条路径来表示一下消费者状态的轮转

1.消费者组一开始处于Empty 状态,当重平衡开启后,它会被置于PreparingRebalance状态等待新消费者的加入,一旦有新的消费者加入后,消费者群组就会处于CompletingRebalance 状态等待分配,只要有新的消费者加入群组或者离开,就会触发重平衡,消费者的状态处于PreparingRebalance 状态。等待分配机制指定好后完成分配,那么它的流程图是这样的。

2.在上图的基础上,当消费者群组都到达Stable 状态后,一旦有新的消费者加入/离开/心跳过期,那么触发重平衡,消费者群组的状态重新处于PreparingRebalance 状态。那么它的流程图是这样的。

3.在上图的基础上,消费者群组处于PreparingRebalance 状态后,很不幸,没人玩儿了,所有消费者都离开了,这时候还可能会保留有消费者消费的位移数据,一旦位移数据过期或者被刷新,那么消费者群组就处于Dead 状态了。它的流程图是这样的:

4.在上图的基础上,我们分析了消费者的重平衡,在PreparingRebalance或者CompletingRebalance 或者Stable 任意一种状态下发生位移主题分区Leader 发生变更,群组会直接处于Dead 状态,它的所有路径如下:

这里面需要注意两点:

一般出现Required xx expired offsets in xxx milliseconds 就表明Kafka 很可能就把该组的位移数据删除了

只有Empty 状态下的组,才会执行过期位移删除的操作。

重平衡流程

上面我们了解到了消费者群组状态的转化过程,下面我们真正开始介绍Rebalance 的过程。重平衡过程可以从两个方面去看:消费者端和协调者端,首先我们先看一下消费者端

从消费者看重平衡

从消费者看重平衡有两个步骤:分别是消费者加入组和等待领导者分配方案。这两个步骤后分别对应的请求是JoinGroup 和SyncGroup。

新的消费者加入群组时,这个消费者会向协调器发送JoinGroup 请求。在该请求中,每个消费者成员都需要将自己消费的topic 进行提交,我们上面描述群组协调器中说过,这么做的目的就是为了让协调器收集足够的元数据信息,来选取消费者组的领导者。通常情况下,第一个发送JoinGroup 请求的消费者会自动称为领导者。领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。如图:

在所有的消费者都加入进来并把元数据信息提交给领导者后,领导者做出分配方案并发送SyncGroup请求给协调者,协调者负责下发群组中的消费策略。下图描述了SyncGroup 请求的过程。

当所有成员都成功接收到分配方案后,消费者组进入到Stable 状态,即开始正常的消费工作。

从协调者来看重平衡

从协调者角度来看重平衡主要有下面这几种触发条件,

新成员加入组

组成员主动离开

组成员崩溃离开

组成员提交位移

我们分别来描述一下,先从新成员加入组开始

新成员入组

我们讨论的场景消费者集群状态处于Stable 等待分配的过程,这时候如果有新的成员加入组的话,重平衡的过程。

从这个角度来看,协调者的过程和消费者类似,只是刚刚从消费者的角度去看,现在从领导者的角度去看

组成员离开

组成员离开消费者群组指的是消费者实例调用close() 方法主动通知协调者它要退出。这里又会有一个新的请求出现LeaveGroup()请求。如下图所示:

组成员崩溃

组成员崩溃是指消费者实例出现严重故障,宕机或者一段时间未响应,协调者接收不到消费者的心跳,就会被认为是组成员崩溃,崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数session.timeout.ms 控制的。如下图所示:

重平衡时提交位移

这个过程我们就不再用图形来表示了,大致描述一下就是消费者发送JoinGroup 请求后,群组中的消费者必须在指定的时间范围内提交各自的位移,然后再开启正常的JoinGroup/SyncGroup 请求发送。

記事参照:

  • 《Kafka 权威指南》
  • https://blog.csdn.net/u013256816/article/details/80546337
  • https://learning.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch05.html#kafka_internals
  • https://www.cnblogs.com/kevingrace/p/9021508.html
  • https://www.cnblogs.com/huxi2b/p/6980045.html
  • 《极客时间-Kafka核心技术与实战》
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Redesign
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
  • kafka 分区和副本以及kafaka 执行流程,以及消息的高可用
  • Http中的同步请求和异步请求
  • Reactor模式详解
  • https://kafka.apache.org/documentation/
  • https://www.linkedin.com/pulse/partitions-rebalance-kafka-raghunandan-gupta
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design

[編集者のおすすめ]

  1. 如何验证Kafka 系统的可靠性?这下我终于懂了
  2. Zookeeper的选举算法和脑裂问题深度讲解
  3. 开源Apache Cassandra、Kafka、Spark和ES何时该用,何时不该用?
  4. 每秒几十亿实时处理,阿里巴巴超大规模Flink 集群运维揭秘
  5. 每天处理千亿级日志量,Kafka是如何做到的?