DUICUO

Apache Iceberg にインデックスを導入してクエリパフォーマンスを向上

Apache Iceberg は、タイムトラベル、ACID トランザクション、パーティション進化、スキーマ進化などの強力な機能とオープン エコシステムを提供するオープン ソースの Lakehouse テーブル形式です。

この記事では、インデックスを導入することでクエリ パフォーマンスを向上させる、Iceberg コンポーネントに対する Volcano Engine EMR チームの最適化戦略について説明します。

Iceberg を使用してデータ レイク ウェアハウスを構築する

Volcano Engine E-MapReduce(EMR)は、Volcano Engine Digital Intelligence Platform(VeDI)を基盤とするクラウドネイティブのオープンソース・ビッグデータ・プラットフォーム製品です。Hadoop、Spark、Flink、Hive、Presto、Kafka、StarRocks、Doris、Hudi、Icebergといったエンタープライズグレードのビッグデータ・エコシステム・コンポーネントを提供し、100%のオープンソース互換性を実現しています。企業がエンタープライズグレードのビッグデータ・プラットフォームを迅速に構築し、運用のハードルを下げるのに役立ちます。業界をリードするEMRステートレス・コンセプトに準拠したVolcano Engine EMRは、クラスターレベルの柔軟なスケーリングを可能にします。ビジネス需要がないときにはクラスターを解放し、需要があるときには再起動します。インテリジェントなホットデータとコールドデータの階層型ストレージ機能と組み合わせることで、企業はビッグデータ・インフラストラクチャ分野におけるコストをさらに削減し、効率を向上させることができます。

Volcano Engine EMR製品をベースとして、データレイク・ウェアハウス、準リアルタイム・データウェアハウス、そしてリアルタイム・データウェアハウスを構築できます。例えば、Icebergを用いてデータレイク・ウェアハウスを構築し、ODSからDWDまでの様々なレイヤーをモデリングし、HFDSまたはTOS(Volcano Engineオブジェクトストレージ製品)にデータを保存し、TrinoまたはSparkを用いて分析することが可能です。

クエリ パフォーマンスを高速化して、専用の分散データ ウェアハウス (ClickHouse など) のパフォーマンスにできるだけ近づける方法は、検討して調査する必要がある問題です。

インデックスはクエリパフォーマンスを向上させるための業界で一般的な手法であり、Icebergでもこのアプローチを採用しました。頻繁に使用される列にインデックスを構築し、テーブルスキャン時にこれらのインデックスが一致するデータのみを返すようにすることで、照合対象となるデータの量を削減し、クエリパフォーマンスを大幅に向上させました。

氷山の紹介

Iceberg Indexの機能を紹介する前に、Icebergアーキテクチャについて簡単に説明します。Icebergは、以下に示すように階層化されたメタデータアーキテクチャを採用しています。

Spark、Presto、Flinkなどの複数のエンジンは、階層的なメタデータを使用してデータファイルのリストを検索し、Icebergからデータを読み取ります。例えば、SparkエンジンはSQL文を解析し、IcebergのAPIを呼び出してデータファイルを取得し、タスクに分割します。

マニフェスト ファイルには、データ ファイル内のフィールドの最大値と最小値が記録されます。

 "data_file": { "content": 0, "file_path": "hdfs://emr-cluster/warehouse/hive/db.db/sample/data/ts_day=2020-12-31/category=diamond/00000-0-220aa9a6-4530-499f-9450-da946d667624-00001.parquet", "file_format": "PARQUET", ...... "lower_bounds": { "array": [{ "key": 1, "value": "\u0006\u0000\u0000\u0000" }, { "key": 2, "value": "diamond" }, { "key": 3, "value": "\u0000\u0004Ü Å·\u0005\u0000" }] }, "upper_bounds": { "array": [{ "key": 1, "value": "\u0007\u0000\u0000\u0000" }, { "key": 2, "value": "diamond" }, { "key": 3, "value": "\u0000¨odÆ·\u0005\u0000" }] }, ...... }

この情報を使用すると、データ ファイル レベルで予備フィルタリングを実行し、基準を満たさないデータ ファイルを除外して、読み取られるデータの量を削減できます。

インデックス実装の必要性

Iceberg はすでにデータ ファイル レベルのフィルタリングを提供しているのに、なぜインデックスを導入する必要があるのでしょうか。次の例でこれを示します。左側の 2 つのテーブルはデータ ファイルの内容を表し、右側のテーブルは対応するマニフェスト ファイルを表しています。

SELECT * FROM table WHERE age > 50場合、最小最大統計を使用すると、データ ファイル 1 に条件を満たすデータがないため、データ ファイル 1 は計算に含まれないことが簡単にわかります。

しかし、 name = 'LiLy' AND age > 30のような多次元分析では、 nameと ` ageの最小最大統計を用いてそれぞれname = 'LiLy'age > 30という条件を判定すると、データファイル1とデータファイル2の両方が条件を満たしていることがわかります。しかし、データファイル1とデータファイル2のデータを詳しく分析すると、条件に一致するデータは存在しないため、最小最大フィルタリング効果は理想的ではありません。したがって、適切なインデックスを導入することで、データのスキップ確率を高め、クエリのパフォーマンスを向上させることができます。

1. まず、インデックスの種類を調べます。

インデックスの種類には、ブルームフィルタ、リボンフィルタ、ディクショナリインデックス、ビットマップなど、様々なものがあります。多次元分析シナリオのニーズを満たすため、高カーディナリティシナリオに適しており、=、<、>、IN、BETWEENなどの演算を用いた多次元分析をサポートする[Range-Encoded BitMap](https://www.featurebase.com/blog/range-encoded-bitmaps)(Base-2、ビットスライスインデックス)を選択しました。

例えば、上記の「name」列と「age」列のインデックス情報を計算してみましょう。「name」は文字列型であるため、インデックス情報を計算する前に辞書エンコードする必要があります。Range-Encoded技術を使用することで、データのバイナリ情報と対応する「pos」情報に基づいてインデックスデータが生成されます。インデックスデータを分析するとname = 'LiLy'かつage > 30という条件を同時に満たすデータは同じ行に存在しないことがわかります。このデータは、Range-Encodedデータの積和演算によって除外できるため、データファイル1は計算に関与する必要はありません。

つまり、BitMap の交差および結合操作により、複雑なフィルタリング条件下でもより多くのデータ ファイルをフィルタリングできるようになります。

2. 次に、インデックスの粒度を調べます。

IcebergのMin-Maxもファイルレベルインデックスの一種です。ファイルレベルインデックスは、フィルター条件を満たさないデータファイルを除外します。ファイルレベルインデックスは様々なファイルタイプに適用できますが、粒度は比較的粗いです。データファイル内のレコードが1つでも条件を満たすと、そのファイル内のすべてのデータが読み込まれ、計算に使用されるため、SQLクエリのパフォーマンスに影響します。

ParquetおよびORCファイル形式では、ファイルチャンク(行グループまたはストライプ)の概念が提供されており、行グループ/ストライプの粒度でデータをフィルタリングできます。(説明を簡単にするために、行グループとストライプの両方をまとめて「スプリット」と呼びます。)

例えば、SQL文「 SELECT * FROM table WHERE col_1> v1 AND col_2 = v2では、col_1フィールドとcol_2フィールドのインデックス情報が既に構築されています。これで、このインデックスがSQL文に適用されます。

SQL 文を解析した後、条件を満たすデータファイルのリストが分割され、多数の分割リストが生成されます。インデックスを使用して、各分割内のデータが条件を満たしているかどうかが分析され、満たしていない場合はスキップされます。上図に示すように、データファイルリストを分割した後、数万の分割が得られます。split1 にインデックスデータを適用すると、split1 には条件col_1> v1 AND col_2 = v2同時に満たすデータがないため、split1 のデータは計算に含まれません。最終的に、少数の分割リストのみが取得され、10% を超えるデータフィルタリング率と、クエリパフォーマンスの大幅な向上が実現されました。

したがって、行グループ/ストライプ レベルできめ細かいインデックスを使用すると、ほとんどのデータをフィルター処理できます。

きめ細かなインデックス実装ロジック

Iceberg メタデータのマニフェスト ファイルは、最小値と最大値などの統計情報の提供に加えて、分割関連の情報も提供します: "split_offsets":{"array":[4,...]} 。これにより、行グループ/ストライプ レベルでのきめ細かいインデックスの実装が大幅に容易になります。

  1. インデックスを構築するためのAPIを提供します

Icebergはインデックス構築用のAPIを提供しており、エンジンはこのAPIを呼び出すことでインデックス構築機能を実装できます。Spark 3.3以降では、インデックス付きSQL文が既に提供されています。IcebergのSparkモジュールで、Sparkが提供するインデックスインターフェースを実装するだけで済みます。

  1. インデックスの構築

メインタスクに影響を与えない非同期インデックス構築を採用しています。また、追加されたデータのみをインデックス化する増分インデックス構築機能も提供しています。TableScanを呼び出してデータを読み取り、データファイルの分割オフセットに従ってデータを分割し、インデックスを構築し、インデックスデータと対応するメタデータを保存します。ファイルサイズが小さくなるのを避けるため、インデックスデータはマージされます。

  1. インデックスファイルの保存

インデックスファイルはバイナリ形式の[puffin]https://iceberg.apache.org/puffin-spec/形式を使用します。 Magic Blob₁ Blob₂ ... Blobₙ Footer

フッターには各BLOBのメタデータ情報が保存されます。インデックスが正常に構築されると、次のようなファイルが生成されます。

インデックスの利点

範囲エンコードされたビットマップは多次元分析シナリオに適しており、その有効性は範囲が狭い場合に特に顕著です。以下では、Sparkエンジンに基づくパフォーマンステストを紹介します。

  1. 1TB の SSB テスト データを作成し、インデックスの構築前と構築後に次のテスト ケースをテストします。
 Q1: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 Q2: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue = 2141624 Q3: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue >=10304000 Q4: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 103800 Q5: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice > 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 93800 Q6: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND lo_ordtotalprice < 93909 Q7: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND lo_ordtotalprice < 91003562 AND lo_revenue >=904300 AND lo_revenue <= 9904300

左のグラフは、インデックスありとなしの7つのSQL文の実行時間を示しています。右のグラフは、インデックス使用後の7つのSQL文のデータ分割回数を示しています。データ分割回数が少ないほど、インデックスのパフォーマンスが向上することは明らかです。最悪のケースでは、すべての分割がパラメータを用いて計算され、インデックスを構築しない場合と同等の効果が得られます。

  1. SSBベンチマークテストの使用

SSBが提供するテストシナリオは、Range-Encodedの有利なシナリオと完全には一致しないため、インデックス使用の効果は顕著ではありません。ただし、インデックスを使用しない場合よりも悪くなることはありません。下の左の図に示すように、SQL文の実行時間はインデックス構築前後であり、インデックス構築のメリットは明らかではありません。右の図では、すべての分割が計算に関係していることがわかります。

要約

上記の紹介に基づいて、Iceberg のインデックス実装のいくつかの特徴をまとめると次のようになります。

  • きめ細かなインデックス レベル: RowGroup/Stripe レベルのインデックスを提供します。これにより、データのクエリ範囲をより正確に特定し、不要なデータ入力を削減して、クエリ パフォーマンスを向上させることができます。
  • インデックスは実行側で動作します。クエリ タスクは複数の実行側に割り当てられ、各実行側ではそのノード上の RowGroup/Stripe データが一致するかどうかを確認するだけで済みます。
  • 複数のエンジンと互換性があります: インデックスが構築されると、複数のエンジンで使用できます。
  • メインのビジネス プロセスに影響を与えないように、非同期のインデックス構築を提供します。
  • ストレージフットプリントが小さく、高カーディナリティと低カーディナリティの両方のシナリオに適しています。範囲クエリと等価性クエリをサポートしています。範囲が狭いほど、メリットは大きくなります。