DUICUO

ByteDanceのオープンソースデータ統合エンジンBitSailの詳細な分析

1. はじめに

BitSailはByteDanceのオープンソースデータ統合エンジンであり、様々な異種データソース間のデータ同期をサポートし、オフライン、リアルタイム、フル、増分といったシナリオに対応する包括的なデータ統合ソリューションを提供します。現在、ByteDanceとそのVolcano Engineプラットフォームにおいて、複数のクライアントのデータ統合ニーズをサポートしています。ByteDanceの様々な事業分野からの膨大なデータ処理テストにも耐え、優れたパフォーマンスと安定性を実証しています。

10月26日、ByteDanceはBitSailプロジェクトをGitHubで正式にオープンソース化したと発表しました。これにより、より多くの企業と開発者に利便性を提供し、データインフラのコストを削減し、データから効率的に価値を生み出すことが可能になります。この記事では、BitSailのプレゼンテーションと主要な機能について、主に以下の4つの部分に焦点を当てます。

  • ByteDance内部データ統合の背景
  • BitSailテクノロジーの進化
  • BitSailの機能分析
  • 将来の展望

2. バイトダンス社内データ統合の背景

ByteDanceは「データ駆動」というコンセプトを常に重視し、実践してきました。データ駆動の一環として、データプラットフォーム機能の構築は極めて重要です。中でも、データプラットフォーム構築の基盤となるデータ統合は、主に異種データソースのデータ伝送、処理、取り扱いにおける問題を解決します。

BitSailは、ByteDanceが自社開発したデータ統合エンジンDTS(Data Transmission Service)を起源としています。当初はApache Flinkをベースとし、ByteDanceの社内業務に約5年間利用されてきました。現在では、バッチ統合、ストリーミング統合、増分統合の3つの同期モードを備え、分散型水平スケーリングと統合型ストリーム・バッチアーキテクチャをサポートしています。単一のフレームワークで、様々なデータ量やシナリオにわたるデータ統合ニーズに対応できます。さらに、BitSailはプラグインアーキテクチャを採用し、ランタイム分離をサポートすることで、優れた柔軟性を提供し、企業が新しいデータソースを容易に統合できるようにします。

3. ビットセイルの進化

3.1 グローバルデータ統合エンジンの進化の3つの段階

ByteDance のデータ統合エンジン BitSail の進化は、3 つの段階に分けられます。

① 初期段階: 2018年以前、当社には統一されたデータ統合フレームワークが存在せず、各チャネルが個別に実装されていたため、MapReduceやSparkといったビッグデータエンジンも分散していました。データソース間の接続もメッシュ接続であり、開発・運用・保守コストは全体的に比較的高くなっていました。

②成長期: 3つの段階に分けられます。

  • 2018-2019年:Flinkエコシステムの進化に伴い、ますます多くの企業がビッグデータコンピューティングエンジンの第一選択肢としてFlinkを選択するようになりました。ByteDanceも例外ではなく、Flinkの活用を継続的に検討しています。2019年には、異種データソース間でデータを転送するためのFlinkベースの手法を提案し、バッチ処理シナリオの統一を実現しました。
  • 2020年 - 2021年: Flinkのバッチとストリームの統合の改善により、ByteDanceは元のアーキテクチャに大幅なアップグレードを施し、ストリーミングシナリオをカバーし、バッチシナリオとストリームシナリオの統合を完了しました。
  • 2021 - 2022 : Hudi Data Lake Engine と統合し、CDC データのリアルタイム同期問題を解決し、統合レイク ウェアハウス ソリューションを提供します。

③ 成熟段階:2022年以降、フルドメインデータ統合エンジンの全体アーキテクチャは、ByteDance内の様々な事業ラインの実稼働環境で安定化・テストされ、その性能と安定性は十分に保証されています。そのため、チームはその機能をより多くの企業や開発者に提供し、データ構築コストを削減し、データから効率的に価値を創造できるよう努めています。

3.2 BitSailデータ統合エンジン技術アーキテクチャの進化

3.2.1 Flinkに基づく異種データソース転送アーキテクチャ

これは、Flink 1.5 DataSet APIに基づいて実装された異種データソース伝送アーキテクチャであり、バッチ処理シナリオのみをサポートします。フレームワークの核となる考え方は、生の入力層データをBaseInputとして抽象化し、主にソースからデータを取得するために使用します。また、出力層をBaseOutputとして抽象化し、外部システムへのデータ書き込みを担うようにすることです。同時に、フレームワーク層は型システム、自動並列処理、フロー制御、ダーティデータ検出などの基本サービスを提供しており、これらのサービスはすべてのデータソースチャネルに適用されます。

次のセクションでは、バッチ シナリオにおける興味深い機能を紹介し、実際の業務運用で発生するいくつかの問題点についても説明します。

上の画像の左上部分は、Flinkの元のランタイムログです。このログからは、タスクの現在の実行率やタスク完了までの所要時間といったタスクの進捗データや予測データは確認できません。

左下のセクションには、Flink UI から提供されるタスク実行メタデータが表示されています。読み取りカウントと書き込みカウントの両方が 0 であることがわかります。Flink エンジンの観点から見ると、すべてのオペレーターが全体として入力も出力も持っていないため、これは妥当な値です。しかし、ユーザーの観点から見ると、タスク全体の進行状況と現在処理済みのレコード数を確認することは不可能であり、タスクが停止しているのではないかと疑われます。画像の右側は、改善された結果を示しています。ログには、現在処理されているレコード数、リアルタイムの進行状況、そして消費時間が明確に出力されています。この機能は、ByteDance 社内で実装されて以来、様々な事業部門から高い評価を得ています。

具体的な実装については後述します。

まず、Flinkタスクの実行プロセスを確認しましょう。従来のMapReduceやSparkの駆動モデルとは異なり、Flinkはタスク駆動型です。タスクマネージャー(JM)がSplitを作成すると、タスクは継続的に実行され、JMに新しいSplitを常に要求します。すべてのSplitの処理が完了した後にのみ、タスクは終了します。この時点で、完了したタスクの総数をタスクの総数で割ると、進行状況が多少歪んで表示されます。最初はすべてのタスクが実行され、常にSplitをプルしているため、表示される進行状況は0になります。JMがSplitの処理を完了すると、すべてのタスクが同時に終了し、進行状況は突然100%に跳ね上がりますが、その間の進行状況情報にはギャップがあります。

この問題を解決するには、データ駆動型オペレーションの核心に立ち返り、ジョブ全体の実行プロセスを分割という観点から測定する必要があります。図の右側は、Flink UIが提供するAPIを通じて、タスク全体のトポロジ情報を取得し、それをソースレイヤーとオペレーターレイヤーという2つのオペレーターレイヤーに分割して変更する方法を示しています。

ソースレイヤー

ネイティブのSource APIを、具体的には2つの部分で変更しました。まず、Splitを作成した後、Splitの合計数を取得し、Metricsにアップロードします。次に、Source内の各タスクがSplitの処理後にCompletedSplitを報告します。これにより、Flink UIを介して現在完了したSplitの数とSplitの合計数を取得できるようになり、完了したSplitの数をSplitの合計数で割ることでSourceノードの進行状況を測定できます。

オペレータ層

まず、現在のオペレータの上流ノードの出力数と、現在のノードが読み込んだ出力数を確認します。次に、現在のノードが読み込んだ出力数を上流ノードからの出力数で割ることで、現在のオペレータの進行状況を決定します。さらに、勾配制約を実装します。つまり、現在のノードの進行状況は、上流ノードの進行状況以下になる必要があります。

3.2.2 Flinkベースのバッチおよびストリーム処理アーキテクチャ

以下は、バッチデータとストリーミングデータを統合するためのアーキテクチャです。ByteDanceデータプラットフォームチームは、元のアーキテクチャと比較して、以下のアップグレードを実施しました。

  • Flink をバージョン 1.5 から 1.9 にアップグレードし、同時に DataSet API を分析して DataStream API にアップグレードし、統合されたバッチおよびストリーム アーキテクチャをサポートしました。
  • データソースのサポートが拡張されました。従来のオフラインデータソースに加え、メッセージキューなどのリアルタイムデータソースも追加されました。
  • フレームワーク レイヤーが拡張され、Exactly Once、イベント時間の書き込み、自動 DDL などの機能をサポートするようになりました。
  • エンジン層に改良が加えられ、投機的実行やリージョンフェイルオーバーなどの機能が追加されました。
  • クラウドネイティブ アーキテクチャをサポートするために、ランタイム層がさらに拡張されました。

リアルタイム シナリオにおける典型的なリンク、つまり MQ から Hive へのリンクを分析します。

左の図(Shuffle)は現在のコミュニティ実装を表しており、HudiやIcebergなどの多くのデータレイクは書き込みにこの構造を使用しています。この構造は2層の演算子で構成されています。最初の層はデータ処理層で、データの読み書きを担当します。2番目の層は単一ノードコミット層で、単一同時実行性を持ち、主にHiveパーティションの生成やその他のメタデータ操作など、メタデータのコミットを担当します。

このアーキテクチャの利点は、明確な全体トポロジ(データ処理フロー)と明確に定義されたオペレータ関数です。しかし、大きな欠点もあります。単一の同時実行ノードを追加すると、タスク全体がシャッフル接続になってしまうのです。シャッフル接続の本質的な弱点は、タスクフェイルオーバーが発生した際に、グローバルな再起動を直接実行してしまうことです。

右の図(パイプライン化)は、変更されたデータ処理フローを示しています。データ書き込み部分は変更されていませんが、変更は後続のコミット部分にあります。この設計は、タスクのフォールトトレランス中にグローバルな再起動が発生しないように、元のパイプラインアーキテクチャを維持することを目的としています。元の単一の同時コミットノードは廃止され、すべてのメタデータコミットはJM(Jack Manager)で処理されるようになりました。タスクとJM間の通信は、Aggregate Managerを介して行われます。このアーキテクチャへの切り替えにより、大容量データを扱うシナリオにおける安定性が大幅に向上しました。

3.2.3 Flinkベースのレイクウェアアーキテクチャ

レイク ウェアハウス統合アーキテクチャを導入する目的は、CDC データのほぼリアルタイムの同期を解決することです。

右の図は元のアーキテクチャを示しており、処理フローには 3 つのモジュールが含まれています。

  • バッチ プル タスク: CDC からすべてのデータをプルし、ベース イメージとして Hive に書き込むために使用されます。
  • リアルタイム タスク: CDC から変更ログを取得し、増分データとしてリアルタイムで HDFS に書き込みます。
  • オフライン スケジューリング タスク: 定期的にマージを実行し、完全データと増分データを結合して新しい完全データを作成します。

前述のアーキテクチャは非常に複雑で、FlinkやSparkといった複数のコンピューティングエンジンに依存しています。リアルタイム性能に関しては、T+1レイテンシしか達成できず、最速でも1時間程度にとどまるため、準リアルタイムの分析シナリオを効果的にサポートすることはできません。効率性に関しては、各パーティションがフルイメージであるためストレージオーバーヘッドが比較的大きく、計算コストも高く、マージごとにグローバルシャッフルが必要になります。

右の図はアップグレードされたアーキテクチャを示しています。主なアップグレードは次のとおりです。

  • Flink 1.9をFlink 1.11にアップグレードし、Hudiデータレイクエンジンを統合することで、CDCのほぼリアルタイムなデータ同期をサポートしました。これは、Hudiエンジンが完全なインデックスメカニズムと高性能なUpsertを備えているためです。
  • 全体的な書き込み効率と安定性を向上させるために、Hudi エンジンにいくつかの基本的な改善も行われました。

最終的な実装では、全体的なレイテンシが10分未満でほぼリアルタイムの書き込みを実現し、元のアーキテクチャと比較して70%以上のパフォーマンス向上を実現しました。これにより、フルドメインデータ統合アーキテクチャの統合が完了し、単一のシステムであらゆる同期シナリオをカバーできるようになりました。

3.3 建築の進化における実践経験の共有

次のセクションでは、実際の進化の過程でのいくつかの考え、問題点、および改善策を紹介します。

テーブルタイプの選択

データレイクは、CopyOnWrite(COW)テーブルやMergeOnRead(MOR)テーブルなど、様々なテーブル形式をサポートしています。COWテーブルは読み取りパフォーマンスに優れていますが、書き込み増幅(write acceleration)につながる可能性があります。一方、MORテーブルは書き込みパフォーマンスに優れていますが、読み取り増幅(read acceleration)につながる可能性があります。テーブル形式の選択は、最終的には具体的なビジネスニーズによって異なります。

私たちのビジネスシナリオでは、CDCデータのほぼリアルタイム同期が求められます。CDCデータの主な特徴は、大量のランダム更新が発生することです。このシナリオでコピーオンライト(COW)テーブルを選択すると、深刻な書き込み増幅の問題が発生するため、Modified Origin(MOR)テーブルを選択しました。上の図は、MORテーブルのクエリと書き込みプロセスを示しています。最初の要素は列指向のベースイメージファイル(ベースファイル)で、2番目の要素は行指向の増分ログファイル(ログファイル)です。

クエリを実行するたびに、ログファイルとベースファイルをマージする必要があります。MORテーブルにおけるリードアンプリフィケーションの問題を解決するために、通常はコンパクションサービスを構築し、定期的なスケジュールに基づいてログファイルとベースファイルをマージし、新しいベースファイルを生成します。

Hudiリアルタイムライティングの問題点

図に示すように、これはネイティブHudiリアルタイム書き込みのフローチャートです。

まず、Hudiデータにアクセスすると、インデクサーとして機能するFlink Stateに入ります。Hudiは、Bloom Indexなど、多くのインデックス作成メカニズムを提供しています。しかし、Bloom Indexには欠点があります。誤検知が発生し、ファイル全体を走査するフォールバックが発生する可能性があり、効率に影響を及ぼします。Flink Stateの利点は、増分更新をサポートし、読み取りパフォーマンスが比較的高いことです。Flink Stateを通過すると、レコードがUpsertレコードかInsertレコードかを確認し、ファイルIDを割り当てることができます。

次に、このファイルIDを用いてKeyByレイヤーを実行し、同じファイルIDのデータを同じタスクに割り当てます。タスクは各ファイルIDをローカルにキャッシュし、キャッシュの上限に達するとデータをHoodieクライアントにフラッシュします。Hoodieクライアントは主に、増分ログデータをブロック単位で書き込み、ミニバッチでHDFSにフラッシュする役割を担います。

次に、単一の同時コミットノードを接続します。最新バージョンはコーディネーターに基づいています。すべてのオペレータがチェックポイントを完了すると、書き込みが成功したことを示すメタデータを送信するコミットが行われます。チェックポイント作成中は、タスクのキャッシュとHoodieクライアントのキャッシュも更新し、同時にHDFSに書き込みます。通常は、主にMORテーブルの読み取り増幅の問題に対処するために、コンパクションオペレータも接続します。

このアーキテクチャでは、実際の運用環境では次の問題が発生します。

(1)データ量が多い場合、 Flink Stateの拡大がより顕著になり、タスクの速度やチェックポイントの成功率に影響を与えます。

(2) Compaction演算子に関して言えば、Flinkのストリーミングタスクのリソースは永続的ですが、Compaction自体は周期的なスケジューリングです。同時実行性が高く設定されていると、多くの場合、より多くのリソースが無駄に消費されます。

(3) Flinkは、スロット共有など、リソース全体の利用率を向上させるための多くのリソース最適化戦略を提供しています。しかし、コンパクションは実際のデータ読み取り・書き込みオペレータとリソースを競合するため、リソース競合の問題が発生します。コンパクション自体もI/O負荷が高く、CPUを集中的に使用する操作であるため、増分ログと完全ログを継続的に読み取り、完全なデータセットを出力する必要があります。

上記の問題に対処するために、Hudi の書き込みプロセスを最適化しました。

まず、CDCから変更ログを収集し、メッセージキューに送信します。次に、メッセージキューから変更ログを消費し、以下の3つの最適化を実行します。

(1) 従来のFlink Stateは廃止され、ハッシュインデックスに置き換えられました。ハッシュインデックスの利点は、外部ストレージに依存しないことです。Hoodie Recordが到着すると、対応するBucketを知るために必要なのは簡単なハッシュ処理だけです。

(2)圧縮サービスはオフラインタスク化され、リソースの無駄やリソースの競合の問題を解決するために定期的にスケジュールされます。

(3) タスクキャッシュとHudiキャッシュを統合したのは、タスクキャッシュはチェックポイントが作成されるたびに更新する必要があり、HudiキャッシュはHDFSに書き込む必要があるためです。キャッシュされたデータの量が多いと、チェックポイント全体の時間が長くなります。

最適化後、安定性の面では数百万 QPS をサポートでき、エンドツーエンドのチェックポイントのレイテンシは 1 分以内に制御され、チェックポイントの成功率は 99% に達します。

4. BitSailの機能分析

現在の当社の技術アーキテクチャは比較的成熟しており、ByteDance社内の様々な事業部門で検証済みであるため、一定レベルのデータの安定性と効率性が確保されています。そのため、私たちは蓄積してきた経験を共有することで、より多くの企業や開発者に利便性を提供し、データインフラのコストを削減し、データから高効率な価値を創造できるようにしたいと考えています。この目標を達成するために、2つの主要な機能の構築に取り組む必要があります。

4.1 低コストの共同建設能力

データ統合には明確なネットワーク効果があり、各ユーザーはそれぞれ異なるデータ統合シナリオに直面しています。そのため、データ統合の機能とエコシステムを向上させるには、全員の参加が不可欠です。そのためには、共同構築コストの問題を解決し、誰もが低コストでプロジェクト全体の共同構築と反復作業に参加できるようにする必要があります。

BitSail では、2 つのアプローチを通じてこの機能を進化させています。

4.1.1 モジュール分解

エンジン層、データソース層、基本フレームワーク層を含むすべてのモジュールが、単一の巨大なJARファイルにバンドルされていました。そのため、モジュール間の結合度が高く、データ処理フローが不明確でした。この問題に対処するため、システムを機能モジュールごとに分割し、基本フレームワークとデータソースをエンジンから分離しました。さらに、ダーティデータ検出、スキーマ同期、モニタリングなど、環境によって実装方法が異なるさまざまなユーザー環境に対応できるよう、技術コンポーネントはプラガブル設計を採用しました。

4.1.2 インターフェースの抽象化

このフレームワークはFlink APIに深く結びついているため、ユーザーはFlinkエンジンの内部構造を深く理解する必要があり、コネクタ統合コストが全体的に高くなります。この問題に対処するため、エンジンから独立した新しい読み取り/書き込みインターフェースを抽象化しました。ユーザーは新しいインターフェースを開発するだけで済みます。内部的には、抽象インターフェースとエンジンインターフェース間の変換に新しい抽象化レイヤーが使用されています。この変換はユーザーからは見えないため、ユーザーは基盤となるエンジンの詳細を理解する必要はありません。

4.2 アーキテクチャの互換性

企業によってビッグデータコンポーネントやデータソースのバージョンは異なり、バージョン間の互換性の問題が発生する可能性があります。そのため、異なる環境間で迅速なインストール、展開、検証を実現するためには、アーキテクチャの互換性を向上させる必要があります。この機能を構築するために、私たちは2つのアプローチを採用しています。

4.2.1 マルチエンジンアーキテクチャ

現在のアーキテクチャはFlinkエンジンに深く依存しており、ユースケースが限られています。例えば、Sparkエンジンやその他のエンジンを使用しているお客様もいらっしゃいます。Flinkエンジンへの依存度が高いため、シンプルなシナリオや小規模なデータセットでは、リソースの無駄が顕著になります。

この問題に対処するため、エンジン層に複数のエンジンエントリポイントを用意しました。既存のFlinkエンジンをベースに、Sparkエンジンやローカルエンジンも含めた拡張を行う予定です。実装面では、実行環境を抽象化し、様々なエンジンが抽象クラスを実装できるようにしました。同時に、ローカル実行方式も検討しています。これは、Flinkジョブなどの処理を開始することなく、スレッドを使用して小規模なデータセットをローカルで処理することで、全体的なリソース利用効率を向上させるものです。

4.2.2 依存関係の分離

現在、システムには外部環境に存在しない内部依存関係がいくつか存在し、ビッグデータ基盤も社内バージョンにバインドされています。そこで、以下の3つの側面で最適化を行いました。

  • 社内の依存関係を排除し、オープンソースの汎用ソリューションを採用して、さまざまなビジネス シナリオに対応します。
  • ビッグデータインフラストラクチャでは、固定のインフラストラクチャに縛られることなく、提供された依存関係が使用されます。インフラストラクチャは実行時に外部で指定され、互換性のないシナリオはMavenプロファイルとMaven Shadesによって分離されます。
  • データソースの複数バージョンやバージョン非互換性といった問題に対処するため、動的ロード戦略を採用し、データソースを独立したコンポーネントとして扱います。必要なデータソースのみが毎回ロードされるため、分離という目標が達成されます。

5. 今後の展望

BitSailは、データが価値ある場所へスムーズに流れることを願っており、皆様との協力を通してデータ統合機能とエコシステムの向上に努めてまいります。さらに、以下の3つの分野における取り組みをさらに深化させていきます。

① マルチエンジンアーキテクチャ:ローカルエンジンの実装を検討し、ローカル実行をサポートし、シンプルで小規模なデータシナリオでのリソース利用率を向上させます。インテリジェントなエンジン選択戦略を実装し、シンプルなシナリオではローカルエンジンを使用し、複雑なシナリオではビッグデータエンジンの機能を再利用します。

② 一般的な能力構築:新しいインターフェースを推進し、エンジンの詳細をユーザーから保護し、コネクタ開発コストを削減します。

Connector の多言語ソリューションをご覧ください。

③ストリーミングデータレイク:パフォーマンス面では数千万QPSを安定的にサポートする統合CDCデータ取り込みソリューション。

データ レイク プラットフォーム機能の構築に関しては、バッチ、ストリーミング、増分使用シナリオを包括的にカバーします。

✳️この記事は、DataFun ボランティアの Zhong Xiaohua のおかげでまとめられました。