Alibaba Cloud RemoteShuffleService の新機能: AQE とフロー制御

Alibaba Cloud RemoteShuffleService の新機能: AQE とフロー制御

著者: イーチュイ、ミンジ

Alibaba Cloud EMR は 2020 年にリモートシャッフルサービス (RSS) を開始して以来、多くのお客様が Spark ジョブのパフォーマンスと安定性の問題を解決し、ストレージとコンピューティングの分離アーキテクチャの実装を可能にしてきました。誰もがより簡単に利用し、拡張できるようにするために、RSS は 2022 年初頭にオープンソース化され、あらゆる分野の開発者がその構築に貢献することを歓迎します。 RSSの全体的なアーキテクチャについては[1]を参照してください。この記事では、RSS の 2 つの最新の重要な機能、Adaptive Query Execution (AQE) とフロー制御のサポートについて紹介します。

1. RSSはAQEをサポートする

1. AQE の概要

適応型クエリ実行(AQE)はSpark 3の重要な機能です[2]。実行時統計を収集して後続の実行プランを動的に調整することで、オプティマイザーが統計を正確に推定できないために実行プランが適切に生成されない問題を解決します。 AQE には、パーティション結合、スイッチ結合戦略、およびスキュー結合の最適化という 3 つの主要な最適化シナリオがあります。これら 3 つのシナリオはすべて、Shuffle フレームワークの機能に新たな要件を提起します。

パーティションの結合

パーティションのマージの目的は、リデューサーによって処理されるデータの量を適度かつ均一にすることです。このアプローチでは、まず、より多くのパーティションに応じてマッパーにシャッフル書き込みを実行させます。 AQE フレームワークは各パーティションのサイズをカウントします。連続する複数のパーティション内のデータ量が比較的少ない場合、これらのパーティションは 1 つにマージされ、処理のために Reducer に渡されます。手順は以下のとおりです。

上の図からわかるように、最適化された Reducer2 は、元々 Reducer2-4 に属していたデータを読み取る必要があります。 Shuffle フレームワークの要件は、ShuffleReader が範囲パーティションをサポートする必要があることです。

デフgetReader [ K , C ] (
ハンドル: ShuffleHandle
開始パーティション: Int ,
終了パーティション: Int ,
コンテキスト: TaskContext ) : ShuffleReader [ K , C ]

参加戦略の切り替え

結合戦略を切り替える目的は、統計の推定が不正確であるために、オプティマイザーが誤ってブロードキャスト結合ではなく SortMerge 結合または ShuffleHash 結合を選択する問題を修正することです。具体的には、結合する 2 つのテーブルが Shuffle Write を完了した後、AQE フレームワークは実際のサイズをカウントします。小さいテーブルがブロードキャスト結合の条件を満たしていることが判明した場合、小さいテーブルはブロードキャストされ、大きいテーブルのローカルシャッフルデータと結合されます。プロセスは次のとおりです。

結合戦略を切り替えるための最適化は 2 つあります。1. ブロードキャスト結合に書き換える。 2. LocalShuffleReader を介して、大きなテーブルのデータをローカルで直接読み取ります。 Shuffle フレームワークの 2 番目の新しい要件は、ローカル読み取りをサポートすることです。

傾斜結合の最適化

傾斜結合の最適化の目的は、より多くのリデューサーが傾斜パーティションを処理できるようにし、ロングテールを回避することです。具体的には、シャッフル書き込みが完了すると、AQE フレームワークは各パーティションのサイズをカウントし、特定のルールに基づいて偏りがあるかどうかを判断します。その場合、パーティションは複数の分割に分割され、各分割は別のテーブルの対応するパーティションと結合されます。下記の通りです。

パーティション分割は、Shuffle Output のサイズを MapId 順に累積することによって行われ、累積値がしきい値を超えると分割がトリガーされます。 Shuffle フレームワークの新しい要件は、ShuffleReader が範囲 MapId をサポートできることです。包括的なパーティションマージの最適化のための範囲パーティションの要件を考慮して、ShuffleReader のインターフェースは次のように進化しました。

デフgetReader [ K , C ] (
ハンドル: ShuffleHandle
開始マップインデックス: Int
終了マップインデックス: Int ,
開始パーティション: Int ,
終了パーティション: Int ,
コンテキスト: TaskContext
メトリクス: ShuffleReadMetricsReporter ) : ShuffleReader [ K , C ]

2. RSSアーキテクチャのレビュー

RSS のコア設計は、プッシュ シャッフル + パーティション データ集約です。つまり、異なる Mapper が同じパーティションに属するデータを同じ Worker にプッシュして集約し、Reducer が集約されたファイルを直接読み取ります。下の図の通りです。

RSS は、コア設計に加えて、複数のレプリカ、フルリンク フォールト トレランス、マスター HA、ディスク フォールト トレランス、アダプティブ プッシャー、ローリング アップグレードなどの機能も実装しています。詳細については[1]を参照。

3. RSSはパーティションのマージをサポートします

パーティションのマージには、範囲パーティションをサポートする Shuffle フレームワークが必要です。 RSS では、各パーティションがファイルに対応しているため、次の図に示すように自然にサポートされます。

4. RSSは参加戦略の切り替えをサポート

Shuffle フレームワークの Join 戦略切り替えの要件は、LocalShuffleReader をサポートすることです。 RSS のリモート属性により、データは RSS クラスターに保存され、RSS とコンピューティング クラスターが同じ場所に配置されている場合にのみローカルに存在します。したがって、ローカル読み取りはまだサポートされていません (コロケーション シナリオは将来最適化され、サポートされる予定です)。ローカル読み取りはサポートされていませんが、結合書き換えには影響しないことに注意してください。 RSS は、次の図に示すように、結合書き換えの最適化をサポートしています。

5. RSSはJoin Tilt最適化をサポートします

3 つの AQE シナリオの中で、Join 傾斜最適化の RSS サポートが最も困難です。 RSS のコア設計はパーティション データ集約であり、Shuffle Read のランダム読み取りをシーケンシャル読み取りに変換して、パフォーマンスと安定性を向上させることを目的としています。複数の Mapper が同時に RSS Worker にデータをプッシュします。 RSS はメモリ内のデータを集約し、それをディスクにフラッシュします。したがって、次の図に示すように、パーティション ファイル内の異なるマッパーのデータは順序が乱れます。

結合スキューの最適化には、Map1-2 からデータを読み取るなど、範囲マップを読み取る必要があります。一般的なアプローチは 2 つあります。

  • ファイル全体を読み取り、範囲外のデータを破棄します。
  • インデックス ファイルをインポートし、各ブロックの場所と MapId を記録し、範囲内のデータのみを読み取ります。

どちらのアプローチにも問題点は明らかです。方法 1 では、大量の冗長ディスク読み取りが発生します。方法 2 は基本的にランダム読み取りに戻り、RSS のコアの利点が失われ、インデックス ファイルの作成は、偏りのないデータであっても普遍的なオーバーヘッドになります (シャッフル書き込みプロセス中に偏りがあるかどうかを正確に予測することは困難です)。

上記の 2 つの問題を解決するために、Active Split + Sort On Read という新しい設計を提案しました。

アクティブスプリット

歪んだパーティションのサイズは非常に大きくなる可能性があり、極端な場合にはディスクが直接爆発する可能性があります。非スキューシナリオでも、大きなパーティションが発生する可能性は依然として高くなります。したがって、ディスク負荷分散の観点から、パーティション ファイルのサイズを監視し、アクティブ分割を実行することが非常に重要です (デフォルトのしきい値は 256 m です)。

分割が発生すると、RSS は現在のパーティションのワーカーのペア (プライマリ レプリカとセカンダリ レプリカ) を再割り当てし、後続のデータは新しいワーカーにプッシュされます。実行中の Mapper への Split の影響を回避するために、Soft Split メソッドを提案しました。つまり、Split がトリガーされると、RSS は非同期的に新しい Worker を準備し、Ready 後に Mapper の PartitionLocation 情報をホット更新するため、Mapper の PushData に干渉することはありません。全体的なプロセスを下の図に示します。

既読順に並べ替え

ランダム読み取りの問題を回避するために、RSS は Sort On Read 戦略を採用しています。具体的には、File Split の最初の Range 読み取りによってソートがトリガーされ (Range 以外の読み取りではトリガーされません)、ソートされたファイルは位置インデックスとともにディスクに書き戻されます。後続の範囲読み取りは、順次読み取りであることが保証されます。下の図の通りです。

複数のサブリデューサーが同じファイル分割のソートを待機することを回避するために、次の図に示すように、各サブリデューサーが分割を読み取る順序を分割します。

ソートの最適化

Sort On Read は冗長な読み取りとランダム読み取りを効果的に回避できますが、分割ファイル (256M) をソートする必要があります。このセクションでは、ソートの実装とオーバーヘッドについて説明します。ファイルの並べ替えには、ファイルの読み取り、MapId の並べ替え、ファイルの書き込みという 3 つの手順が含まれます。 RSS のデフォルトのブロック サイズは 256k で、ブロック数は約 1000 なので、ソート処理は非常に高速で、主なコストはファイルの読み取りと書き込みにあります。ソートプロセス全体には、おおよそ 3 つのオプションがあります。

  • ファイル サイズのメモリを事前に割り当て、ファイル全体を読み取り、MapId を解析して並べ替え、ブロックを MapId 順にディスクに書き戻します。
  • メモリを割り当てず、各ブロックの位置をシークし、MapId を解析してソートし、元のファイルのブロックを MapId の順序で新しいファイルに転送します。
  • 小さなメモリ ブロック (256k など) を割り当て、ファイル全体を順番に読み取り、MapId を解析して並べ替え、元のファイルのブロックを MapId の順序で新しいファイルに転送します。

IO の観点から見ると、一見すると、ソリューション 1 では十分なメモリを使用して順次読み取りと書き込みを行うことができません。ソリューション 2 にはランダム読み取りとランダム書き込みがあります。ソリューション 3 にはランダム書き込みがあります。直感的には、ソリューション 1 の方がパフォーマンスが優れています。ただし、PageCache が存在するため、ソリューション 3 でファイルを書き込むと、元のファイルが PageCache にキャッシュされる可能性があります。したがって、次の図に示すように、ソリューション 3 の実際のパフォーマンスは優れています。

同時に、ソリューション 3 は追加のプロセス メモリを占有する必要がないため、RSS はソリューション 3 のアルゴリズムを採用しています。次の図に示すように、ソートおよびインデックス付けを行わない Sort On Read とランダム読み取り方式の比較もテストしました。

全体的なプロセス

Join 傾斜最適化をサポートする RSS の全体的なプロセスを次の図に示します。

2. RSSフロー制御

フロー制御の主な目的は、RSS ワーカーのメモリが枯渇するのを防ぐことです。フロー制御には通常 2 つの方法があります。

  • クライアントは各 PushData の前にワーカー用のメモリを予約し、予約が成功した後にのみプッシュがトリガーされます。
  • 作業者側のバックプレッシャー。

PushData は非常に高頻度でパフォーマンスが重要な操作であるため、プッシュごとに追加の RPC 操作を実行するとオーバーヘッドが高くなりすぎるため、バック プレッシャー戦略を採用しました。ワーカーの観点から見ると、受信データのソースは 2 つあります。

  • クライアントからプッシュされたデータ
  • プライマリレプリカから送信されたデータ

次の図に示すように、Worker2 は Mapper によってプッシュされた Partition3 のデータと Worker1 によって送信された Partition1 のレプリカ データの両方を受信し、Partition3 のデータを対応するスレーブ レプリカに送信します。

Mapper によってプッシュされたデータは、次の条件が満たされた場合にのみ解放されます。

  • レプリケーションが正常に実行されました
  • データはディスクに正常に書き込まれました

プライマリ レプリカからプッシュされたデータは、次の条件が満たされた場合にのみメモリから解放されます。

  • データはディスクに正常に書き込まれました

フロー制御戦略を設計する際には、フロー制限(流入するデータ量の削減)だけでなく、フロー漏洩(メモリを適時に解放できる)も考慮する必要があります。具体的には、高水準点 (それぞれメモリ使用量 85% と 95% に相当) に対して 2 つのメモリしきい値を定義し、低水準点 (メモリ使用量 50%) に対しては 1 つのメモリしきい値のみを定義します。高水位しきい値に達すると、フロー制御がトリガーされ、Mapper によってプッシュされたデータの受信が一時停止され、ディスクが強制的にフラッシュされ、フロー排出の目的が達成されます。マッパーからの流入を制限するだけではプライマリレプリカからのフローを制御できないため、2 番目の最高水準点を定義します。このしきい値に達すると、プライマリレプリカによって送信されたデータの受信も同時に中断されます。水位が低水位より低くなると、正常に戻ります。全体的なプロセスを下の図に示します。

3. パフォーマンステスト

Spark 3.2.0 で AQE を有効にした RSS とネイティブの外部 Shufle サービス (ESS) のパフォーマンスを比較しました。 RSS はコロケーション アプローチを採用しており、追加のマシン リソースを占有しません。また、RSS が使用するメモリは 8 GB であり、これはマシン メモリ (マシン メモリは 352 GB) の 2.3% にすぎません。具体的な環境は以下の通りです。

1.テスト環境

ハードウェア:

ヘッダー マシン グループ 1x ecs.g5.4xlarge、ワーカー マシン グループ 8x ecs.d2c.24xlarge、96 CPU、352 GB、12x 3700GB HDD。

Spark AQE 関連の構成:

 spark .sql .adaptive .enabled がtrue
spark .sql .adaptive .coalescePartitions .enabled true
spark .sql .adaptive .coalescePartitions .initialPartitionNum 1000
spark .sql .adaptive .skewJoin .enabled true
spark .sql .adaptive .localShuffleReader .enabled がfalse

RSS 関連の設定:

 RSS_マスター_メモリ= 2g
RSS_WORKER_MEMORY = 1g
RSS_WORKER_OFFHEAP_MEMORY = 7g

2. TPCDS 10Tテストセット

10TB TPCDS をテストしました。 E2E の観点から見ると、ESS は 11734 秒、RSS シングルコピー/デュアルコピーはそれぞれ 8971 秒/10110 秒かかり、次の図に示すように、ESS よりも 23.5%/13.8% 高速でした。 RSS が 2 つのレプリカを有効にすると、ネットワーク帯域幅が上限に達することがわかりました。これが、2 つのレプリカが 1 つのレプリカよりも低くなる主な理由です。

各クエリの具体的な時間比較は次のとおりです。

関連リンク

GitHub アドレス: https://github.com/alibaba/RemoteShuffleService

参照

[1] Alibaba Cloud EMRリモートシャッフルサービスのXiaomiとオープンソースでの実践。 https://developer.aliyun.com/article/857757

[2]適応型クエリ実行:実行時のSpark SQLの高速化。 https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

<<:  クラウド データ セキュリティのベスト プラクティスを学びましたか?

>>:  クラウドコンピューティングが「後半」に突入する中、国内クラウドコンピューティングの活路はどこにあるのでしょうか?

推薦する

BAT31 PR: さまざまな特徴と強みが国内のインターネットを非常に活発にしている

インターネット企業は、ネットユーザーが毎日利用し、そのサービスにはより高い広報能力が求められる百度、...

6つの主要なマーケティングの方向性を明確にし、医療マーケティングにおける回り道を避ける

医療マーケティングはオンラインマーケティングの最も一般的なタイプですが、その効果は特に良くありません...

ピンドゥオドゥオの「内部論理」は変化した

明らかな認識の一つは、私たちがよく知っているPinduoduoが変化しているということです。 Pin...

ウェブマスターの観点から見た 360 Search、Baidu、Google の比較

360 Search は数日間「ひっそりと」オンラインになっており、当初のシンプルなページから現在の...

百度百科事典のイメージ構築

百度百科事典はさまざまな価値に満ちた金鉱だということはわかっています。例えば、高品質な外部リンク、安...

巨木観察03: 熱帯雨林の風システム: ゼロから数億まで、実はあなたもできる

観測対象の概要みなさんこんにちは。私は曹源兄弟です。今日はCool Grassrootsが第03回大...

gigsgigscloud: フィリピン VPS/フィリピン クラウド サーバー、CMI 直通回線、月額 4.2 ドル、1G メモリ/1 コア/10gSSD/500G トラフィック

gigsgigscloudは、フィリピンVPS/フィリピンクラウドサーバー事業を新たに開始しました。...

海外メディアは、Weiboが4月17日に上場すると報じた。

【TechWeb Report】昨日、米国の金融ウェブサイトiposcoopによると、Weiboは米...

ウェブサイトの基本的な最適化にはどのような詳細が含まれますか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますウェブサイ...

フレンドリーリンクを作成して自社のPRを向上させる新しいウェブサイトの作り方

今日はフレンドリーリンクの作り方についてお話します。ウェブサイトが開設されてからわずか1週間後、Ba...

SEOにおけるnofollowタグの属性と機能の包括的な分析

この記事では、nofollow タグの使い方と書き方を 5 つの知識ポイントから総合的に分析します。...

クラウド セキュリティ: 2023 年の 5 つの予測

COVID-19 パンデミックの間、企業の働き方は根本的に変化し、パブリック クラウド環境とプライベ...

BaiduとGoogleの最適化の違いの詳細な説明

多くの人が私に尋ねます。最適化の面で Google と Baidu の違いは何ですか? 私だけではな...

キーワード密度とアンカーポイントを正しく制御し理解する方法について簡単に説明します。

みなさんこんにちは。私はハルビンバーチャルリアリティデザインです。最近、キーワードのランキングを研究...

Baidu SEO と Google SEO には違いがありますか?ザックは、検索エンジンは良いウェブサイトを好むと言った。

百度がネガティブなニュースに登場するたびに、多くの人がグーグルを懐かしみ、グーグルが来てくれたら最高...