DUICUO

Kafka ソースコード分析と原理の図解説明: ブローカー側

[[277321]]

まず、Kafka でトピックを作成する方法から始めましょう。

  1. kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  

その中には次のようなパラメータがあります。

  • --zookeeper: ZooKeeperのアドレス
  • --replication-factor: レプリケーション係数
  • --partitions: パーティションの数(デフォルトは1)
  • --topic: トピック名

II. パーティションとは何ですか?

トピックは複数のパーティションを持つことができ、各パーティションには異なるメッセージが含まれます。パーティションの数が多いほどスループットは向上しますが、パーティションの数が多いほど良いとは限りません。一般的に、パーティションの数はKafkaクラスター内のマシン数を超えないようにしてください。パーティションの数が増えると、メモリとファイルハンドルの消費量が増えます。一般的なパーティション数は3~10です。例えば、クラスターに3台のマシンがあり、2つのパーティションを持つ「test」という名前のトピックを作成する場合、設定は次のようになります。


パーティションは、順序付けされた不変のレコード集合であり、ログファイルに継続的に追加されます。パーティション内の各メッセージにはIDが割り当てられ、これがオフセットとなります。オフセットは、パーティション内のレコードをマークするために使用されます。ここでは、公式ウェブサイトの図を使用しますが、私の図はあまり良くありません。


2.1 プロデューサーとパーティションの関係

図に示されているシナリオでは、プロデューサーはメッセージキュー(MQ)をどのパーティションに割り当てるのでしょうか?これは、前のセクションで説明したパラメータ「partitioner.class」に関連しています。デフォルトのパーティショナーは、次のように処理します。キーが存在する場合、murmur2アルゴリズムを使用してキーのハッシュ値を計算し、パーティションの合計数を法としてパーティション番号を決定します。キーが存在しない場合は、ラウンドロビンプロセス(org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)を実行します。もちろん、「org.apache.kafka.clients.producer.Partitioner」インターフェースを実装することで、パーティション戦略をカスタマイズすることもできます。

  1. /**
  2. *指定されたレコードパーティションを計算します。
  3. *
  4. * @param topic トピック 
  5. * @param keyキー  または いいえの場合はnull  
  6. * @param keyBytes シリアル化されたキー  または いいえの場合はnull  
  7. * @param valueパーティションする または ヌル 
  8. * @param valueBytesパーティションシリアル化さた値 または ヌル 
  9. * @param cluster現在のクラスタメタデータ
  10. /
  11. 公共  intパーティション(文字列トピック、オブジェクトキー、byte[] keyBytes、オブジェクト値、byte[] valueBytes、クラスタークラスター) {
  12. List<PartitionInfo> パーティション = cluster.partitionsForTopic(topic);
  13. int numPartitions = パーティション。size ( );
  14. キーバイト == null場合
  15. int nextValue = nextValue(トピック);
  16. リスト<PartitionInfo> 利用可能なパーティション = cluster.availablePartitionsForTopic(topic);
  17. 利用可能なパーティションのサイズが0より大きい場合
  18. int part = Utils.toPositive(nextValue) % availablePartitions. size ();
  19. availablePartitions.get(part).partition()を返します
  20. }それ以外{
  21. // 利用可能なパーティションがない場合は、利用できないパーティションを指定してください
  22. Utils.toPositive(nextValue) % numPartitions を返します
  23. }
  24. }それ以外{
  25. // keyBytes をハッシュしてパーティションを選択する
  26. Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions を返します
  27. }
  28. }

2.2 消費者側とパーティションの関係

まず、コンシューマー グループの正式な定義を見てみましょう。コンシューマーは自身にコンシューマー グループ名を付け、トピックに公開される各レコードは、サブスクライブしている各コンシューマー グループ内の 1 つのコンシューマー インスタンスに配信されます。

翻訳: コンシューマーはコンシューマー グループ名を使用して自身を識別し、トピックのメッセージは、そのトピックにサブスクライブしているコンシューマー グループのコンシューマー インスタンスに送信されます。

コンシューマーグループは、高いスケーラビリティとフォールトトレラント性を備えたコンシューマーメカニズムを実装するために使用されます。コンシューマーに障害が発生した場合、または新しいコンシューマーが追加された場合、コンシューマーグループはリバランスを実行します。リバランスメカニズムについては、コンシューマーのセクションで詳しく説明するため、このセクションでは説明しません。それでは、上の図に従ってコンシューマー側を描き続けましょう。


これは最良のシナリオを表しており、2つのパーティションが1つのグループ内の2つのコンシューマーに対応しています。では、コンシューマーグループのコンシューマー数がパーティションの数よりも多い場合、あるいは少ない場合はどうなるでしょうか?

コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合、余分なコンシューマーはメッセージを消費できないため無駄になります。

コンシューマーグループ内のコンシューマー数がパーティション数より少ない場合、対応するコンシューマーパーティション割り当て戦略が存在します。1つはRange(デフォルト)、もう1つはRound Robin(ラウンドロビン)です。もちろん、カスタム戦略も使用できます。基本的に、基本的なコンセプトは同じです。つまり、各コンシューマーは負荷分散された作業を実行できます。詳細は、ここでは説明せず、コンシューマーのセクションで説明します。

推奨事項: パーティションの数をコンシューマーの数の整数倍になるように構成します。

III. コピーとISR設計

3.1 コピーとは何ですか?

トピックを作成する際に、レプリカの数を設定する「--replication-factor」というパラメータがあります。Kafkaは、高いシステム可用性を維持するために、複数の同一バックアップを使用します。これらのバックアップはKafkaではレプリカと呼ばれます。レプリカは以下の3つのカテゴリに分類されます。

  • リーダー レプリカ: プロデューサーからの読み取りおよび書き込み要求に応答します。
  • フォロワー レプリカ: リーダー レプリカのデータをバックアップし、プロデューサーからの読み取り/書き込み要求に応答しません。
  • ISR レプリカ セット: 1 つのリーダー レプリカとすべてのフォロワー レプリカが含まれます (フォロワー レプリカが含まれない場合もあります)。

Kafka は、すべてのレプリカを Kafka クラスター内のすべてのブローカーに均等に分散し、1 つのレプリカをリーダ​​ーレプリカ、残りのレプリカをフォロワーレプリカとして選択します。リーダーレプリカをホストするブローカーがダウンした場合、フォロワーレプリカの 1 つがリーダーレプリカになります。リーダーレプリカはプロデューサーからの読み取りおよび書き込みリクエストを受け取りますが、フォロワーレプリカはリーダーレプリカにデータを要求するだけで、読み取りおよび書き込みリクエスト自体は受け取りません。


3.2 レプリカ同期メカニズム

前述のように、ISR は同期されたレプリカのセットを動的に維持し、リーダー レプリカは常に ISR セットに含まれます。ISR 内のレプリカのみがリーダー レプリカとして選出される資格があります。プロデューサーの ack パラメータが all (-1) に設定されている場合、プロデューサーによって書き込まれたメッセージがコミットされたと見なされるには、ISR 内のすべてのレプリカが受信する必要があります。もちろん、前のセクションで述べたように、望ましい効果を得るには、ack パラメータをブローカーの min.insync.replicas パラメータ (デフォルトは 1) と組み合わせて使用​​する必要があります。このパラメータは、ISR 内の何個のレプリカが正常に書き込まれたと見なされるかを制御します。ISR 内のレプリカの数が min.insync.replicas より少ない場合、クライアントは例外 org.apache.kafka.common.errors.NotEnoughReplicasException: 同期中のレプリカの数が必須の数より少ないため、メッセージが拒否されます を返します。

レプリカ同期のメカニズムを理解するには、まずいくつかの用語を学ぶ必要があります。

  • ハイ・ウォーターマーク: レプリカのハイ・ウォーターマーク。略称はHWです。HW以下のメッセージは「バックアップ」とみなされ、HWは次のメッセージを指します。リーダーレプリカのHW値によって、コンシューマーがポーリングできるメッセージ数が決まります。コンシューマーはHW値未満のメッセージのみを消費できます。
  • LEO: ログ終了オフセット、次のメッセージのオフセット。つまり、LEOが指す位置は、まだメッセージが存在しないことを示しています。
  • リモートLEO: 厳密に言えば、これはコレクションです。リーダーレプリカが存在するブローカーは、対応するパーティション情報を格納するためのパーティションオブジェクトをメモリ内に保持します。このパーティションはレプリカリストを保持し、そのパーティションのすべてのレプリカオブジェクトを格納します。このリストに含まれる、リーダーレプリカ以外のレプリカオブジェクトのLEOは、リモートLEOと呼ばれます。

以下は、Hu Xiのブログを参考にした実用的な例です。ここでは、レプリケーション係数が2の単一パーティションを取り上げています。つまり、リーダーレプリカとフォロワーレプリカがそれぞれ1つずつ存在し、ISRにはこれら2つのレプリカセットが含まれています。まず、プロデューサーがメッセージを送信した際に、リーダー/フォロワーブローカー上のレプリカオブジェクトに何が起こるか、そしてパーティションのハードウェアがどのように更新されるかを見てみましょう。まず、初期状態です。


この時点で、プロデューサーはこのトピックパーティションにメッセージを送信します。現在の状態は次の図のようになります。


上の図に示すように、プロデューサーがメッセージを正常に送信すると (acks=1 と仮定し、リーダーは書き込みが成功すると戻ります)、フォロワーは新しい FECTH 要求を送信し、引き続き fetchOffset = 0 でデータを要求します。前回とは異なり、今回は読み取るデータがあるため、処理フロー全体を次の図に示します。


明らかに、リーダーとフォロワーの両方が0シフトしたメッセージを保存しましたが、両側のHW値は更新されていません。次のFETCHリクエスト処理で更新する必要があります(下図参照)。


簡単に言うと、2 回目の FETCH 要求では、フォロワーは fetchOffset = 1 で FETCH 要求を送信します。これは、 fetchOffset = 0 のメッセージがすでにフォロワーのローカル ログに正常に書き込まれているため、今回は fetchOffset = 1 でデータを要求します。 FETCH 要求を受信した後、リーダー ブローカーは最初に他のレプリカの LEO 値を更新します。つまり、リモート LEO を 1 に更新し、次にパーティション HW 値を 1 に更新します。具体的な更新ルールについては、上記の説明を参照してください。 これを行った後、現在のパーティション HW 値 (1) を FETCH 応答にカプセル化してフォロワーに送信します。 FETCH 応答を受信した後、フォロワー ブローカーはそこから現在のパーティション HW 値 1 を抽出し、それを自身の LEO 値と比較して、自身の HW 値を 1 に更新します。この時点で、HW および LEO 更新サイクル全体が終了します。

3.3 ISRメンテナンス

バージョン0.9.0.0以降では、レプリカをISRセットに含めるかどうかを判断する際に使用されるパラメータは「replica.lag.time.max.ms」のみとなります。このパラメータのデフォルト値は10秒です。つまり、フォロワーレプリカがリーダーレプリカへの応答に10秒以上かかる場合、Kafkaはこのレプリカが遅延しすぎていると判断され、同期レプリカリストから削除されます。

IV. ログ設計

Kafka の各トピックは互いに分離されています。各トピックは 1 つ以上のパーティションを持つことができ、各パーティションにはメッセージデータを記録するログファイルがあります。


この図は、8つのパーティションを持つ「demo-topic」というトピックを示しています。各パーティションには、「[topic-partition]」という名前のメッセージログファイルが含まれています。これらのパーティションログファイルには、同じプレフィックスを持つもののファイルタイプが異なる複数のファイルがあります。例えば、図に示されている3つのファイル、「(0000000000000000000.index, 0000000000000000000.timestamp, 0000000000000000000.log)」です。これは「LogSegment」と呼ばれます。

4.1 ログセグメント

テスト環境からの具体的な例として、ALC.ASSET.EQUITY.SUBJECT.CHANGE というトピックのパーティション 0 のログ ファイルを見てみましょう。


各 LogSegment には、同一のファイル名を持つファイルのセットが含まれます。ファイル名は20桁に固定されています。例えば、00000000000000000000000 のようなファイル名は、現在の LogSegment の最初のメッセージのオフセットが 0 であることを示します。00000000000000000097 のようなファイル名は、現在の LogSegment の最初のメッセージのオフセットが 97 であることを示します。ログファイルには様々なファイル拡張子がありますが、ここでは .index、.timestamp、.log ファイルに注目してください。

  • .index: オフセットインデックスファイル
  • .timeindex: 時間インデックスファイル
  • .log: ログファイル
  • .snapshot: スナップショットファイル
  • .swap: ログ圧縮後に作成される一時ファイル。

4.2 インデックスとログファイル

Kafka には 2 種類のインデックス ファイルがあります。1 つ目は .index 拡張子で終わるオフセット インデックス ファイル、2 つ目は .timeindex 拡張子で終わるタイムスタンプ インデックス ファイルです。

kafka-run-class.sh を使用して、オフセット インデックス ファイルの内容を表示できます。


ご覧のとおり、各行には offset: xxx と position: xxxx という形式が含まれています。これら2つは直接関連していません。

  • オフセット: 相対オフセット
  • 位置: 物理アドレス

では、最初の行の offset: 12 position: 4423 は何を意味するのでしょうか?これは、オフセットが0から12までのメッセージの物理アドレスが0から4423の間であることを意味します。

同様に、2 行目の offset: 24 position: 8773 の意味も推測できます。これは、オフセットが 13 から 24 のメッセージの物理アドレスが 4424-8773 であることを意味します。

`kafka-run-class.sh` を使って `.log` ファイルの内容を調べ、`baseOffset` と `position` の値に注目してください。上記の内容と一致しているかどうかを確認してください。

[[277322]]

4.3 オフセットを使った検索方法

上記の例を使用して、オフセット 60 のメッセージをクエリするにはどうすればよいでしょうか?

まず、オフセットに基づいて対応する LogSegment を見つけます。この場合、000000000000000000000 です。

バイナリサーチを使用して、オフセット以下の最大のインデックスが見つかりました。ここで、オフセットは24、位置は8773です。

ファイル 000000000000000000000.log を開き、オフセット=60 のメッセージが見つかるまで、位置 8773 から順番にスキャンを開始します。