DUICUO

ByteDanceが自社開発のShuffleフレームワークをオープンソース化 – Cloud Shuffle Service

本日、ByteDance は Cloud Shuffle Service を正式にオープンソース化したと発表しました。

Cloud Shuffle Service(以下、CSS)は、ByteDanceが開発した汎用リモートシャッフルサービスフレームワークです。Spark、FlinkBatch、MapReduceなどのコンピューティングエンジンをサポートし、ネイティブソリューションよりも安定性、パフォーマンス、柔軟性に優れたデータシャッフル機能を提供します。また、ストレージとコンピューティングの分離やオンラインとオフラインの共存といったシナリオにも対応したリモートシャッフルソリューションを提供します。

現在、CSS は GitHub でオープンソース化されており、開発への参加にご興味のある方はどなたでも歓迎いたします。

プロジェクトアドレス:

https://github.com/bytedance/CloudShuffleService

オープンソースの背景

ビッグデータコンピューティングエンジンでは、プルベースのソートシャッフルが一般的なシャッフルソリューションです。例えば、Spark、MapReduce、FlinkBatch(バージョン1.15以降)はすべて、エンジンのデフォルトソリューションとしてソートシャッフルを使用しています。しかし、ソートシャッフルの実装メカニズムには特定の欠陥があり、大規模な本番環境ではシャッフルの問題がジョブの安定性に影響を与えることがよくあります。

Spark の Sort Shuffle を例に挙げます。

上の図に示すように、ソートシャッフルには次の問題があります。

  • 複数の Spill ファイルを 1 つのファイルにマージすると、追加の読み取り/書き込み I/O が消費されます。

m個のMapTaskとn個のReduceTaskがあり、m*n個のネットワーク接続を生成するとします。この数が非常に大きい場合、以下のようになります。

  • 多数のネットワーク リクエストが発生すると、簡単にシャッフル サービスのバックログが発生する可能性があります。
  • シャッフル サービスは大量のランダム読み取りを生成するため、特に HDD クラスターでは IO ボトルネックが発生しやすくなります。
  • Shuffle Serviceはアプリケーションのリソース分離を実現できません。1つのジョブに異常が発生すると、同じShuffle Serviceノード上の他のすべてのジョブに影響が及ぶ可能性があり、問題が拡大しやすくなります。
  • MapTaskによって生成されるシャッフルデータファイルはローカルにのみ保存されます。ディスクに障害が発生するとデータが失われ、FetchFailedの問題も発生します。
  • シャッフル データ ファイルをローカル ディスクに書き込む方法は、コンピューティング ノード上のディスクに依存するため、ストレージとコンピューティングの分離を実現できません。

これらの要因により、ShuffleRead 操作が遅くなったりタイムアウトしたりしやすくなり、FetchFailed エラーが発生して、オンライン ジョブの安定性に深刻な影響を与える可能性があります。また、ShuffleRead が遅いとリソース使用率 (CPU とメモリ) も大幅に低下します。一方、FetchFailed エラーにより、ステージ内の関連タスクが再計算され、多くのリソースが浪費され、クラスター全体のジョブ実行が遅くなります。ストレージとコンピューティングを分離できないアーキテクチャでは、オフライン/オンラインの共同デプロイメント (オンライン ディスク リソースが不十分)/サーバーレス クラウド ネイティブ環境などのシナリオでの要件を満たすのも困難です。ByteDance は Spark を主要なオフライン ビッグ データ処理エンジンとして使用し、毎日 100 万を超えるジョブを実行し、平均 1 日の Shuffle ボリュームは 300 PB を超えています。HDFS の共同デプロイメントとオンライン/オフラインの共同デプロイメントを含むシナリオでは、Spark ジョブの安定性が損なわれることが多く、ビジネスの SLA に影響を及ぼします。

  • HDD ディスク I/O 機能が制限されていたり、ディスクに障害が発生すると、多数の Shuffle FetchFailed エラーが発生し、ジョブの遅延、障害、ステージの再計算が発生し、安定性とリソース使用率に影響を及ぼします。
  • 外部シャッフル サービス (ESS) は、ストレージと計算を分離できないため、ディスク容量が少ないマシンではディスクがいっぱいになり、ジョブの実行に影響が出ることがよくあります。

このような背景から、ByteDanceはSparkのネイティブESSソリューションの問題点を解決するため、独自のCSSを開発しました。1年半前に社内導入されて以来、CSSは現在1,500以上のオンラインノードと1日平均20PBを超えるシャッフルボリュームを誇り、Sparkジョブのシャッフル安定性を大幅に向上させ、ビジネスSLAの遵守に貢献しています。

クラウドシャッフルサービスの紹介

CSSはByteDanceが独自に開発したプッシュベースのシャッフルサービスです。すべてのMapTasksは、同じパーティションのシャッフルデータをPush経由で同じCSS Workerノードに送信し、保存します。ReduceTasksは、CSS Workerを介してノードからパーティションのデータを順次直接読み取ります。ESSのランダム読み取りと比較して、シーケンシャル読み取りのIO効率は大幅に向上します。

CSSアーキテクチャ

Cloud Shuffle Service (CSS) アーキテクチャ図: CSS クラスターはスタンドアロンのシャッフル サービスであり、主に次のコンポーネントで構成されます。

CSSワーカー

CSSワーカーは起動後、ZooKeeperノードにノード情報を登録します。プッシュとフェッチという2つのサービスリクエストを提供します。プッシュサービスは、MapTasksからのデータプッシュリクエストを受け取り、同じパーティションから同じファイルにデータを書き込みます。フェッチサービスは、ReduceTasksからのデータフェッチリクエストを受け取り、対応するパーティションデータファイルを読み取り、それを返します。CSSワーカーは、シャッフルデータのクリーンアップも担当します。ドライバーがUnregisterShuffleリクエストを発行して、ZooKeeper内の対応するShuffleIdを持つZnodeを削除すると、またはアプリケーションが終了してZooKeeper内のApplicationIdを持つZnodeを削除すると、CSSワーカーは関連するイベントを監視し、シャッフルデータをクリーンアップします。

CSSマスター

ジョブ開始後、Sparkドライバー内でCSSマスターが起動されます。CSSマスターはZooKeeperからCSSワーカーノードのリストを取得し、後続のMapTaskによって生成される各パーティションにn個(デフォルトは2個)のCSSワーカーノードのレプリカを割り当てます。また、ReduceTaskがPartitionIdを持つCSSワーカーノードを取得できるように、これらのノードのメタ情報を管理します。同時に、RegisterShuffle/UnregisterShuffleプロセス中に、ApplicationId/ShuffleIdを持つ対応するZnodeがZooKeeperに作成されます。CSSワーカーはDeleteイベントを監視し、シャッフルデータをクリーンアップします。

動物園飼育員

前述の通り、CSS Workerノードの情報やShuffleIdなどを格納するために使用されます。

CSSの機能

マルチエンジンサポート

Spark (2.x & 3.x) のサポートに加えて、CSS は他のエンジンとも統合できます。現在、ByteDance では、CSS は MapReduce/FlinkBatch エンジンとも統合されています。

パーティショングループはサポートします

単一のパーティションが小さすぎる場合にプッシュ効率が低下する問題に対処するために、実際には複数の連続したパーティションが、プッシュ用に大きなパーティション グループに結合されます。

非常に効率的で統合されたメモリ管理

ESSと同様に、MapTaskのCSSバッファはパーティションのすべてのデータをまとめて保存します。データはスピル前にpartitionIdでソートされ、partitionGroupのディメンションに従ってプッシュされます。同時に、CSSバッファはSparkのUnifiedMemoryManagerメモリ管理システムに完全に統合されており、メモリ関連のパラメータはSparkによって統一的に管理されます。

フォールトトレランス

プッシュ失敗: スピルをトリガーしてパーティショングループデータをプッシュする場合、各プッシュは 4MB(1 バッチ)です。プッシュバッチが失敗しても、以前に正常にプッシュされたデータには影響しません。単にノードを再割り当てし、失敗したデータとプッシュされていない残りのデータをプッシュし続けます。後続の ReduceTasks は、古いノードと新しいノードの両方から完全なパーティションデータを読み取ります。マルチレプリカストレージ: ReduceTasks は、バッチレベルで CSS ワーカーからパーティションデータを取得します。CSS ワーカーに障害が発生し(ネットワークの問題やディスク障害など)、バッチを取得できない場合は、別のレプリカノードを選択して、そのバッチと後続のバッチの読み取りを続行できます。データ重複排除: ジョブが投機的仮定で有効になっている場合、複数の AttempTasks が同時に実行されます。データ読み取り中は重複排除が必要です。バッチをプッシュすると、バッチデータにヘッダー情報が追加されます。ヘッダー情報には、MapId、AttempId、BatchId などが含まれます。ReduceTask はこの ID 情報を使用して、読み取り時にデータを重複排除できます。

適応型クエリ実行(AQE)適応

CSSは、Reduceタスク数の動的調整、SkewJoin最適化、Join戦略最適化など、AQE関連機能を完全にサポートしています。SkewJoinに関しては、CSSはさらなる適応と最適化を経て、複数のReduceTasksによってSkew Partitionデータが繰り返し読み取られる問題を解決し、パフォーマンスを大幅に向上させました。

CSSパフォーマンステスト

専用のラベルコンピューティングリソースを使用した1TB TPC-DSベンチマークテストにおいて、CSSとオープンソースのESSを比較しました。全体として、エンドツーエンドのパフォーマンスは約15%向上し、一部のクエリでは30%を超えるパフォーマンス向上が見られました。また、1TB TPC-DSベンチマークテストにおいて、CSSとオンラインの混合デプロイメントリソースキュー(ESSは比較的安定性が低い)を比較したところ、エンドツーエンドのパフォーマンスが全体で約4倍向上しました。

CSS 1TB テストではクエリ パフォーマンスが 30% 以上向上しました。

今後の計画

CSS には現在オープンソース化された機能があり、今後さらに多くの機能と最適化が徐々にリリースされる予定です。

  • MapReduce/FlinkBatch エンジンをサポートします。
  • CSS クラスターは、CSS ワーカーのステータスと負荷情報を管理するための ClusterManager サービス ロールを追加し、CSS ワーカーの割り当て機能を現在の CSS マスターから ClusterManager に移動します。
  • 異機種マシン (異なるディスク機能など)/負荷などに基づく CSS ワーカー割り当て戦略。