|
Kafka は、LinkedIn のアクティビティ ストリームと運用データ処理パイプラインの基盤として LinkedIn によって最初に開発されたメッセージング システムです。
Pexelsからの画像 現在、多種多様な企業で、様々なデータパイプラインやメッセージングシステムとして利用されています。アクティビティストリームデータは、ほぼすべてのウェブサイトがウェブサイトの利用状況を報告する際に使用するデータの中で、最も一般的なものです。 アクティビティ データには、ページ ビュー、閲覧したコンテンツの情報、検索アクティビティが含まれます。 このデータを処理する一般的な方法は、まずさまざまなアクティビティをログの形式でファイルに書き込み、次にこれらのファイルに対して定期的に統計分析を実行することです。 運用データとは、サーバーのパフォーマンスデータ(CPU、IO使用率、リクエスト時間、サービスログなど)を指します。運用データを収集する方法は数多くあります。 近年、アクティビティおよび運用データの処理は、Web サイト ソフトウェア製品の機能の重要な要素となっており、それをサポートするにはやや複雑なインフラストラクチャが必要になっています。 Kafka の基本概念 Kafka は、次の主な設計目標を持つ、分散型のパブリッシュ/サブスクライブ ベースのメッセージング システムです。
生産者と消費者 Kafka には、基本的に 2 種類のクライアントがあります。
さらに、データ統合用のKafka Connect APIやストリーミング処理用のKafka Streamsといった高度なクライアントも存在します。ただし、これらの高度なクライアントは、基盤レベルでは依然としてプロデューサーAPIとコンシューマーAPIに基づいており、上位レベルでカプセル化されているだけです。 これは簡単に理解できます。プロデューサー (パブリッシャーとも呼ばれます) はメッセージを作成し、コンシューマー (サブスクライバーとも呼ばれます) はメッセージを消費または読み取る責任があります。 トピックとパーティション Kafka では、メッセージはトピック別に分類され、各トピックはデータベース内のテーブルに似た「メッセージ キュー」に対応します。 しかし、類似のメッセージをすべて「中央」のキューに詰め込むと、必然的にスケーラビリティが不足します。プロデューサー/コンシューマーの数が増えた場合でも、メッセージ数が増えた場合でも、システムのパフォーマンスやストレージ容量が枯渇する可能性があります。 これを説明するために実際の例を使用しましょう。都市 A で生産された特定の製品を高速道路で都市 B に輸送する必要があります。 そのため、片側一車線の高速道路では、「A市の貨物量が増加した」場合でも、「C市もB市に貨物を輸送する必要がある」場合でも、「処理能力不足」という問題に直面することになります。 そこで、ここでは「より多くのレーンを構築できるようにする」ことに似たパーティションの概念を導入し、トピックを水平に拡張します。 ブローカーとクラスター Kafka サーバー (ブローカーとも呼ばれる) は、プロデューサーから送信されたメッセージを受け入れてディスクに保存します。また、ブローカーはコンシューマーのパーティション メッセージをプルする要求に応答し、現在コミットされているメッセージを返します。 特定のマシンハードウェアを使用することで、単一のブローカーで毎秒数万のパーティションと数百万のメッセージを処理できます。(最近では数百万は当たり前ですが、実際に調べてみたところ、クラスター構成ではスループットがかなり高くなるようです。) クラスターは複数のブローカーで構成されます。クラスター内のブローカーの1つがクラスターコントローラーとなり、ブローカーへのパーティションの割り当てやブローカーの障害監視など、クラスターの管理を担当します。 クラスター内では、パーティションはブローカー (そのパーティションのリーダーとも呼ばれます) によって管理されます。 もちろん、冗長性を実現するためにパーティションを複数のブローカーにコピーすることができるため、ブローカーに障害が発生した場合、そのパーティションを別のブローカーに再割り当てして引き継ぐことができます。 以下の画像は例です。 Kafka の重要な特性の一つはログの保持です。トピックに対して、ログを特定の期間保持したり、特定のサイズのログを保持したりするなど、メッセージ保持ポリシーを設定できます。 これらの制限を超えると、古いメッセージは削除されます。また、特定のトピックごとに個別のメッセージ有効期限ポリシーを設定することも可能で、さまざまなアプリケーションに合わせてカスタマイズできます。 マルチクラスター ビジネスが成長するにつれて、通常は次のような理由から複数のクラスターが必要になることがよくあります。
複数のデータセンターを構築する場合、相互運用性を実現することがしばしば必要になります。例えば、ユーザーがプロファイルを変更した場合、どのデータセンターで処理されるかに関係なく、以降のリクエストにその更新を反映させる必要があります。あるいは、複数のデータセンターからのデータを中央制御センターに集約し、データ分析を行う必要がある場合もあります。 上記のパーティションレプリケーション冗長化メカニズムは、同一のKafkaクラスタ内でのみ適用されます。複数のKafkaクラスタ間でメッセージを同期するには、Kafkaが提供するMirrorMakerツールを使用できます。 本質的に、MirrorMaker はキューで接続された Kafka のコンシューマーとプロデューサーです。あるクラスターからメッセージをコンシュームし、別のクラスターにメッセージを生成します。 Kafka の設計と実装 上記ではKafkaの基本概念をいくつか説明しましたが、成熟した「メッセージキュー」ミドルウェアとして、Kafkaには検討に値する興味深い設計が数多くあります。以下に、そのいくつかを簡単に挙げます。 Kafka はファイル システムに保存されます。 はい、まずKafkaのメッセージはファイルシステムに保存されることを知っておく必要があります。Kafkaはメッセージの保存とキャッシュにファイルシステムに大きく依存しており、「ディスクは遅い」という理由でこの設計に懐疑的な人が多いです。 実際には、ディスクの速度は使用方法に応じて、人々が期待するよりもはるかに速くなったり遅くなったりしますが、適切なディスク アーキテクチャ設計により、ディスク速度をネットワーク速度と同じくらい速くすることができます。 最新のオペレーティング システムでは、ディスク アクセスを高速化するために、ディスクの読み取りおよび書き込み操作の最適化が実装されています。 例えば、先読み操作では、大きなディスクブロックを事前にメモリに読み込みます。後書き操作では、多数の小さな論理書き込み操作を1つの大きな物理書き込み操作に結合します。 さらに、オペレーティング システムはメイン メモリ内の残りのすべての空きメモリ領域をディスク キャッシュとして使用し、すべてのディスク読み取りおよび書き込み操作は統合ディスク キャッシュを通過します (ディスク キャッシュをバイパスする直接 I/O を除く)。 これらの最適化機能を考慮すると、シーケンシャルディスクアクセスの場合、ランダムメモリアクセスよりも高速になる場合があり、ネットワーク速度に匹敵することさえあります。 前述のトピックは、実際にはコンシューマーとプロデューサー向けの論理的な概念です。物理的には、トピックはパーティションとして保存されます。各パーティションは最終的にディレクトリに対応し、すべてのメッセージとインデックスファイルが格納されます。 デフォルトでは、トピックの作成時にパーティションの数が指定されていない場合、各トピックは 1 つのパーティションのみを作成します。 例えば、パーティション数を指定せずに「test」という名前のトピックを作成すると、デフォルトで「test-0」という名前のフォルダが作成されます。ここでの命名規則は以下のとおりです。 パーティションにパブリッシュされたメッセージはすべて、そのパーティションのデータファイルの末尾に追加されます。このシーケンシャルなディスク書き込み操作により、Kafka は非常に効率的に動作します(シーケンシャルなディスク書き込みはランダムなメモリ書き込みよりも効率的であることが検証されており、これは Kafka の高スループットを保証する非常に重要な要素です)。 ブローカーに送信された各メッセージは、パーティションルールに基づいてパーティションに保存されます。パーティションルールが適切に設定されていれば、すべてのメッセージを複数のパーティションに均等に分散できます。 Kafka の基礎となるストレージ設計 現在、Kafka クラスターにブローカーが 1 つしかなく、それぞれ 1 個と 2 個のパーティションを持つ「Topic1」と「Topic2」という名前の 2 つのトピックを作成するとします。 すると、ルート ディレクトリに次の 3 つのフォルダーが作成されます。
Kafka のファイル ストレージでは、1 つのトピックに複数の異なるパーティションを含めることができ、各パーティションはディレクトリになります。 各ディレクトリはさらに、同じサイズの複数のセグメントファイルに分割されます。各セグメントファイルはインデックスファイルとデータファイルで構成され、これらは常にペアで存在します。拡張子「.index」と「.log」は、それぞれセグメントインデックスファイルとデータファイルを表します。 ここで、各セグメントのサイズを 500 MB に設定し、プロデューサーを起動して大量のデータを topic1 に書き込むと、topic1-0 フォルダーには次のようなファイルが生成されます。
セグメントは、Kafka における最小のストレージ単位です。セグメントファイルの命名規則:パーティション内の最初のセグメントは番号 0 から始まり、後続の各セグメントファイルの名前は、前のセグメントファイルの最後のメッセージのオフセット値になります。 最大値は64ビット長で、19桁の数字です。数字が存在しない場合は、0が埋め込まれます。例としては、000000000000000368769.index、00000000000000368769.logなどがあります。 上記のセグメント ファイルのペアを例にして、インデックス ファイルとデータ ファイル間の対応関係を示します。 インデックス ファイル内のメタデータ <3, 497> を例にとると、これはデータ ファイル内の 3 番目のメッセージ (グローバル パーティション内の 368769 + 3 = 368772 番目のメッセージ) を表し、メッセージの物理オフセット アドレスは 497 です。 このインデックスファイルは0から始まるわけではなく、毎回1ずつ増加するわけでもありません。これは、Kafkaがスパースインデックスストレージ方式を採用しており、一定バイト数のデータごとにインデックスを作成するためです。 これにより、インデックス ファイルのサイズが縮小され、インデックスをメモリにマップできるようになり、クエリ プロセスに多くの時間を追加することなく、クエリ中のディスク I/O オーバーヘッドが削減されます。 ファイル名は前のセグメントの最後のメッセージのオフセットであるため、特定のオフセットを持つメッセージを検索する必要がある場合は、すべてのセグメントのファイル名に対してバイナリ検索を実行することで、そのメッセージが属するセグメントを見つけることができます。 次に、インデックス ファイル内でメッセージの物理的な場所を見つけることで、メッセージを取得できます。 メッセージはパーティションのセグメント データ ファイル内で順次読み書きされ、消費後に削除されないため (削除ポリシーは期限切れのセグメント ファイル用)、これが、Kafka の高パフォーマンスにとって順次ディスク I/O ストレージが非常に重要である重要な理由です。 Kafka はどのようにしてメッセージのオフセットを正確に決定するのでしょうか。これは、Kafka が標準のデータ ストレージ構造を定義しており、パーティション内の各メッセージに次の 3 つの属性が含まれているためです。
プロデューサーデザインの概要 メッセージを送信する前に、いくつかの質問を自問してみましょう。すべてのメッセージが重要であり、損失は許容できないのでしょうか?時折のメッセージの重複は許容できるのでしょうか?メッセージの遅延や書き込みスループットについて懸念があるのでしょうか? 例えば、クレジットカード取引処理システムでは、取引が発生するとKafkaにメッセージを送信します。別のサービスがメッセージを読み取り、ルールエンジンに基づいて取引が承認されたかどうかを確認し、結果をKafkaに返します。 この種のビジネスでは、メッセージの損失や重複は許されません。トランザクション量が多いため、スループットは可能な限り高くする必要がありますが、レイテンシは多少高くても構いません。 例えば、ウェブページ上のユーザーのクリックデータを収集する必要がある場合、このようなシナリオでは、メッセージの損失や重複が多少あっても許容されます。レイテンシは、ユーザーエクスペリエンスに影響を与えない限り重要ではなく、スループットはリアルタイムのユーザー数に基づいて決定されます。 ビジネスオペレーションによって、必要な記述方法と設定は異なります。ここでは具体的な方法については触れませんが、まずはプロデューサーがメッセージを書き込む基本的なプロセスを見てみましょう。 プロセスは次のとおりです。
消費者向けデザインの概要 ①消費者と消費者団体 次のシナリオを考えてみましょう: Kafka からメッセージを読み取り、チェックを実行し、最後に結果データを生成します。 これを実行するためにコンシューマー インスタンスを作成することもできますが、プロデューサーがメッセージを書き込む速度がコンシューマーがメッセージを読み取る速度よりも速い場合はどうなるでしょうか。 時間が経つにつれて、メッセージのバックログはますます深刻になります。このシナリオでは、水平スケーリングのために複数のコンシューマーを追加する必要があります。 Kafka のコンシューマーはコンシューマーグループに属します。複数のコンシューマーがコンシューマーグループを形成してトピックを消費する場合、各コンシューマーは異なるパーティションからメッセージを受信します。 4 つのパーティションを持つトピック T1 と、コンシューマー C1 を 1 つだけ持つコンシューマー グループ G1 があるとします。 次に示すように、コンシューマー C1 はこれら 4 つのパーティションからメッセージを受信します。 新しいコンシューマー C2 をコンシューマー グループ G1 に追加すると、以下に示すように、各コンシューマーは 2 つのパーティションからメッセージを受信します。 コンシューマーの数が 4 に増えると、以下に示すように、各コンシューマーは 1 つのパーティションのメッセージを受信します。 ただし、このコンシューマー グループにコンシューマーを追加し続けると、残りのコンシューマーはアイドル状態になり、メッセージを受信しなくなります。 結論として、消費者グループ内の消費者数を増やすことで水平的に拡大し、購買力を高めることができます。 そのため、テーマを作成するときは、多数のパーティションを使用して、コンシューマーの負荷が高いときにコンシューマーを追加してパフォーマンスを向上させることをお勧めします。 さらに、余分なコンシューマーはアイドル状態になり、何の役にも立たないため、コンシューマーの数はゾーンの数を超えてはなりません。 Kafka の主な機能の 1 つは、メッセージを 1 回書き込むだけで、任意の数のアプリケーションで読み取ることができることです。 つまり、各アプリケーションはメッセージ全体を読み取ることができます。各アプリケーションがメッセージ全体を読み取ることができるようにするには、アプリケーションごとに異なるコンシューマーグループが必要です。 上記の例では、2 人のコンシューマーを持つ新しいコンシューマー グループ G2 を追加すると、次のようになります。 このシナリオでは、コンシューマー グループ G1 とコンシューマー グループ G2 の両方がトピック T1 からのすべてのメッセージを受信でき、論理的には異なるアプリケーションに属します。 結論: アプリケーションがすべてのメッセージを読み取る必要がある場合は、そのアプリケーションのコンシューマー グループを設定します。アプリケーションの消費容量が不十分な場合は、このコンシューマー グループにコンシューマーを追加することを検討してください。 ② 消費者層と地域の再調整 ご覧のとおり、新しいコンシューマーがコンシューマー グループに参加すると、以前は他のコンシューマーによって処理されていた 1 つ以上のパーティションを消費します。 さらに、コンシューマーがコンシューマー グループを離れると (再起動、クラッシュなどにより)、そのコンシューマーが消費していたパーティションは他のパーティションに割り当てられます。 この現象はリバランスと呼ばれます。リバランスはKafkaの非常に重要な特性であり、高可用性と水平スケーラビリティを保証します。 ただし、再バランス調整期間中はすべてのコンシューマーがメッセージを消費できないため、コンシューマー グループ全体が一時的に使用できなくなることに注意してください。 さらに、パーティションの再バランス調整により、既存のコンシューマーの状態が期限切れになる可能性があり、コンシューマーは状態の更新を余儀なくされます。その結果、その間コンシューマーのパフォーマンスが低下します。再バランス調整を安全に実行する方法と、再バランス調整を可能な限り回避する方法について後ほど説明します。 コンシューマーは、グループ コーディネーターとして機能するブローカーにハートビートを定期的に送信することで、コンシューマー グループ内で存続します。 このブローカーは固定ではなく、コンシューマーグループごとに異なる場合があります。コンシューマーがメッセージをプルまたはコミットすると、ハートビートが送信されます。 コンシューマーが一定期間ハートビートを送信しない場合、そのセッションは期限切れとなり、グループ コーディネーターはコンシューマーがダウンしていると見なし、再バランス調整をトリガーします。 ご覧のとおり、コンシューマーがクラッシュしてからセッションが期限切れになるまでの間には一定の期間があり、その間はコンシューマーのパーティションはメッセージを消費できません。 通常、正常なシャットダウンを実行できます。これにより、コンシューマーはグループ コーディネータに離脱メッセージを送信し、グループ コーディネータはセッションの有効期限が切れるのを待たずにすぐに再バランスをとることができます。 バージョン0.10.1では、Kafkaはハートビートのメカニズムを変更し、ハートビートの送信とメッセージのフェッチを分離しました。これにより、ハートビートの送信頻度はフェッチ頻度の影響を受けなくなりました。 さらに、Kafka の最新バージョンでは、コンシューマーがメッセージをプルせずに生存できる時間を設定できます。この設定はライブロックを回避するのに役立ちます。ライブロックは、アプリケーションに障害がないにもかかわらず、何らかの理由でメッセージを消費できない場合に発生します。 ③パーティションと消費モデル 前述の通り、Kafkaトピック内のメッセージは複数のパーティションに分散して保存されます。コンシューマーグループがメッセージを消費する際には、異なるパーティションからメッセージを取得する必要があります。では、トピック内のメッセージの順序は最終的にどのように再構築されるのでしょうか? 答えは「方法はありません」です。Kafka は、全体のメッセージ順序に関係なく、パーティション内のメッセージ順序のみを保証します。 次の疑問は、パーティション内のメッセージが複数回(異なるコンシューマーグループによって)消費される場合、消費されたメッセージはいつパーティションから削除されるのか、そしてパーティションはどのようにしてコンシューマーグループの現在の消費状況を把握するのか、ということです。 メッセージが消費されたかどうかに関わらず、パーティションは有効期限が切れない限りメッセージを削除しません。例えば、保持期間が2日間に設定されている場合、どのグループでもメッセージを発行から2日間は使用できますが、2日を過ぎるとメッセージは自動的に削除されます。 パーティションは各コンシューマーグループのオフセットを格納し、グループがデータを消費した位置を記録します。下の図をご覧ください。 ④ なぜKafkaはプルモデルなのでしょうか? コンシューマーはブローカーからデータをプルする必要がありますか、それともブローカーはコンシューマーにデータをプッシュする必要がありますか? メッセージング システムとして、Kafka はプロデューサーがブローカーにメッセージをプッシュし、コンシューマーがブローカーからメッセージをプルするという従来のアプローチに従います。 FacebookのScribeやClouderaのFlumeなど、ログ記録中心のシステムの中にはプッシュモデルを採用しているものもあります。実際、プッシュモデルとプルモデルにはそれぞれ長所と短所があります。 プッシュ モードでは、メッセージの送信レートがブローカーによって決定されるため、消費レートの異なるコンシューマーに適応することが困難です。 プッシュ モードの目的は、できるだけ早くメッセージを配信することですが、これにより、コンシューマーがメッセージを時間内に処理できなくなる可能性が簡単にあり、通常はサービス拒否やネットワーク輻輳として現れます。 一方、プル モードでは、コンシューマーの消費能力に基づいて適切な速度でメッセージを消費できます。 Kafka には、Pull モードの方が適しています。Pull モードはブローカーの設計を簡素化し、コンシューマーはメッセージの消費速度を自律的に制御できます。 一方、コンシューマーは、バッチで消費するか、一度に 1 つのアイテムを消費するかなど、独自の消費方法を制御できます。また、異なる送信セマンティクスを実現するために、異なる送信方法を選択することもできます。 Kafka はどのようにして信頼性を保証するのでしょうか? 信頼性について議論する際、私たちは常に「保証」という言葉を使います。信頼性の保証は基礎であり、私たちはこの基盤の上にアプリケーションを構築します。 たとえば、リレーショナル データベースの信頼性の保証は ACID であり、これは原子性、一貫性、独立性、永続性を表します。 Kafka は次の 4 つの点に基づいて信頼性を保証します。
ここでの書き込み操作は、ファイルシステムキャッシュへの書き込みのみであり、必ずしもディスクにフラッシュされるとは限りません。プロデューサーは、パーティションのプライマリレプリカへの書き込みが完了するまで待ってから戻る、または同期中のすべてのレプリカへの書き込みが完了するまで待ってから戻るなど、さまざまなタイミングで確認を待つことができます。
これらの基本的な保証を用いて、信頼性の高いシステムを構築します。この時点で、次の質問を検討する必要があります。アプリケーションには実際にどの程度の信頼性が必要なのでしょうか? 信頼性は無料ではありません。システムの可用性、スループット、レイテンシ、ハードウェア価格と密接に関連しており、望むものすべてを手に入れることはできません。そのため、多くの場合、トレードオフが必要となり、盲目的に信頼性を追求することは現実的ではありません。 Kafkaを設定しましょう 上記の説明で、Kafka の概要はご理解いただけたかと思います。では、実際にローカルにインストールして、実際に使ってみましょう。 ステップ1: Kafkaをダウンロードする 以下は、Homebrew がインストールされた Mac OS を使用する例です。次のコードを実行します。
Kafka は Zookeeper に依存しているため、ダウンロード プロセス中に自動的にダウンロードされます。 ステップ2: サービスを開始する 始める前に、まず Kafka のリスニング アドレスとポートを localhost:9092 に変更する必要があります。
次に、下の画像のように変更します。 Zookeeper と Kafka を順番に起動します。
次に、次のステートメントを実行して、「test」という名前のトピックを作成します。
次のコマンドを使用してトピックのリストを表示できます。
ステップ3: メッセージを送信する 次に、新しいコンソールを作成し、次のコマンドを実行して、作成したトピックに従うコンシューマーを作成します。
コンソールを使用して、作成したトピックにメッセージを追加し、作成したコンシューマー ウィンドウを確認します。
正しいメッセージは、コンシューマー ウィンドウを通じて確認できます。 参考文献:
|