Alibaba Cloud RemoteShuffleService の新機能: AQE とフロー制御

Alibaba Cloud RemoteShuffleService の新機能: AQE とフロー制御

著者: イーチュイ、ミンジ

Alibaba Cloud EMR は 2020 年にリモートシャッフルサービス (RSS) を開始して以来、多くのお客様が Spark ジョブのパフォーマンスと安定性の問題を解決し、ストレージとコンピューティングの分離アーキテクチャの実装を可能にしてきました。誰もがより簡単に利用し、拡張できるようにするために、RSS は 2022 年初頭にオープンソース化され、あらゆる分野の開発者がその構築に貢献することを歓迎します。 RSSの全体的なアーキテクチャについては[1]を参照してください。この記事では、RSS の 2 つの最新の重要な機能、Adaptive Query Execution (AQE) とフロー制御のサポートについて紹介します。

1. RSSはAQEをサポートする

1. AQE の概要

適応型クエリ実行(AQE)はSpark 3の重要な機能です[2]。実行時統計を収集して後続の実行プランを動的に調整することで、オプティマイザーが統計を正確に推定できないために実行プランが適切に生成されない問題を解決します。 AQE には、パーティション結合、スイッチ結合戦略、およびスキュー結合の最適化という 3 つの主要な最適化シナリオがあります。これら 3 つのシナリオはすべて、Shuffle フレームワークの機能に新たな要件を提起します。

パーティションの結合

パーティションのマージの目的は、リデューサーによって処理されるデータの量を適度かつ均一にすることです。このアプローチでは、まず、より多くのパーティションに応じてマッパーにシャッフル書き込みを実行させます。 AQE フレームワークは各パーティションのサイズをカウントします。連続する複数のパーティション内のデータ量が比較的少ない場合、これらのパーティションは 1 つにマージされ、処理のために Reducer に渡されます。手順は以下のとおりです。

上の図からわかるように、最適化された Reducer2 は、元々 Reducer2-4 に属していたデータを読み取る必要があります。 Shuffle フレームワークの要件は、ShuffleReader が範囲パーティションをサポートする必要があることです。

デフgetReader [ K , C ] (
ハンドル: ShuffleHandle
開始パーティション: Int ,
終了パーティション: Int ,
コンテキスト: TaskContext ) : ShuffleReader [ K , C ]

参加戦略の切り替え

結合戦略を切り替える目的は、統計の推定が不正確であるために、オプティマイザーが誤ってブロードキャスト結合ではなく SortMerge 結合または ShuffleHash 結合を選択する問題を修正することです。具体的には、結合する 2 つのテーブルが Shuffle Write を完了した後、AQE フレームワークは実際のサイズをカウントします。小さいテーブルがブロードキャスト結合の条件を満たしていることが判明した場合、小さいテーブルはブロードキャストされ、大きいテーブルのローカルシャッフルデータと結合されます。プロセスは次のとおりです。

結合戦略を切り替えるための最適化は 2 つあります。1. ブロードキャスト結合に書き換える。 2. LocalShuffleReader を介して、大きなテーブルのデータをローカルで直接読み取ります。 Shuffle フレームワークの 2 番目の新しい要件は、ローカル読み取りをサポートすることです。

傾斜結合の最適化

傾斜結合の最適化の目的は、より多くのリデューサーが傾斜パーティションを処理できるようにし、ロングテールを回避することです。具体的には、シャッフル書き込みが完了すると、AQE フレームワークは各パーティションのサイズをカウントし、特定のルールに基づいて偏りがあるかどうかを判断します。その場合、パーティションは複数の分割に分割され、各分割は別のテーブルの対応するパーティションと結合されます。下記の通りです。

パーティション分割は、Shuffle Output のサイズを MapId 順に累積することによって行われ、累積値がしきい値を超えると分割がトリガーされます。 Shuffle フレームワークの新しい要件は、ShuffleReader が範囲 MapId をサポートできることです。包括的なパーティションマージの最適化のための範囲パーティションの要件を考慮して、ShuffleReader のインターフェースは次のように進化しました。

デフgetReader [ K , C ] (
ハンドル: ShuffleHandle
開始マップインデックス: Int
終了マップインデックス: Int ,
開始パーティション: Int ,
終了パーティション: Int ,
コンテキスト: TaskContext
メトリクス: ShuffleReadMetricsReporter ) : ShuffleReader [ K , C ]

2. RSSアーキテクチャのレビュー

RSS のコア設計は、プッシュ シャッフル + パーティション データ集約です。つまり、異なる Mapper が同じパーティションに属するデータを同じ Worker にプッシュして集約し、Reducer が集約されたファイルを直接読み取ります。下の図の通りです。

RSS は、コア設計に加えて、複数のレプリカ、フルリンク フォールト トレランス、マスター HA、ディスク フォールト トレランス、アダプティブ プッシャー、ローリング アップグレードなどの機能も実装しています。詳細については[1]を参照。

3. RSSはパーティションのマージをサポートします

パーティションのマージには、範囲パーティションをサポートする Shuffle フレームワークが必要です。 RSS では、各パーティションがファイルに対応しているため、次の図に示すように自然にサポートされます。

4. RSSは参加戦略の切り替えをサポート

Shuffle フレームワークの Join 戦略切り替えの要件は、LocalShuffleReader をサポートすることです。 RSS のリモート属性により、データは RSS クラスターに保存され、RSS とコンピューティング クラスターが同じ場所に配置されている場合にのみローカルに存在します。したがって、ローカル読み取りはまだサポートされていません (コロケーション シナリオは将来最適化され、サポートされる予定です)。ローカル読み取りはサポートされていませんが、結合書き換えには影響しないことに注意してください。 RSS は、次の図に示すように、結合書き換えの最適化をサポートしています。

5. RSSはJoin Tilt最適化をサポートします

3 つの AQE シナリオの中で、Join 傾斜最適化の RSS サポートが最も困難です。 RSS のコア設計はパーティション データ集約であり、Shuffle Read のランダム読み取りをシーケンシャル読み取りに変換して、パフォーマンスと安定性を向上させることを目的としています。複数の Mapper が同時に RSS Worker にデータをプッシュします。 RSS はメモリ内のデータを集約し、それをディスクにフラッシュします。したがって、次の図に示すように、パーティション ファイル内の異なるマッパーのデータは順序が乱れます。

結合スキューの最適化には、Map1-2 からデータを読み取るなど、範囲マップを読み取る必要があります。一般的なアプローチは 2 つあります。

  • ファイル全体を読み取り、範囲外のデータを破棄します。
  • インデックス ファイルをインポートし、各ブロックの場所と MapId を記録し、範囲内のデータのみを読み取ります。

どちらのアプローチにも問題点は明らかです。方法 1 では、大量の冗長ディスク読み取りが発生します。方法 2 は基本的にランダム読み取りに戻り、RSS のコアの利点が失われ、インデックス ファイルの作成は、偏りのないデータであっても普遍的なオーバーヘッドになります (シャッフル書き込みプロセス中に偏りがあるかどうかを正確に予測することは困難です)。

上記の 2 つの問題を解決するために、Active Split + Sort On Read という新しい設計を提案しました。

アクティブスプリット

歪んだパーティションのサイズは非常に大きくなる可能性があり、極端な場合にはディスクが直接爆発する可能性があります。非スキューシナリオでも、大きなパーティションが発生する可能性は依然として高くなります。したがって、ディスク負荷分散の観点から、パーティション ファイルのサイズを監視し、アクティブ分割を実行することが非常に重要です (デフォルトのしきい値は 256 m です)。

分割が発生すると、RSS は現在のパーティションのワーカーのペア (プライマリ レプリカとセカンダリ レプリカ) を再割り当てし、後続のデータは新しいワーカーにプッシュされます。実行中の Mapper への Split の影響を回避するために、Soft Split メソッドを提案しました。つまり、Split がトリガーされると、RSS は非同期的に新しい Worker を準備し、Ready 後に Mapper の PartitionLocation 情報をホット更新するため、Mapper の PushData に干渉することはありません。全体的なプロセスを下の図に示します。

既読順に並べ替え

ランダム読み取りの問題を回避するために、RSS は Sort On Read 戦略を採用しています。具体的には、File Split の最初の Range 読み取りによってソートがトリガーされ (Range 以外の読み取りではトリガーされません)、ソートされたファイルは位置インデックスとともにディスクに書き戻されます。後続の範囲読み取りは、順次読み取りであることが保証されます。下の図の通りです。

複数のサブリデューサーが同じファイル分割のソートを待機することを回避するために、次の図に示すように、各サブリデューサーが分割を読み取る順序を分割します。

ソートの最適化

Sort On Read は冗長な読み取りとランダム読み取りを効果的に回避できますが、分割ファイル (256M) をソートする必要があります。このセクションでは、ソートの実装とオーバーヘッドについて説明します。ファイルの並べ替えには、ファイルの読み取り、MapId の並べ替え、ファイルの書き込みという 3 つの手順が含まれます。 RSS のデフォルトのブロック サイズは 256k で、ブロック数は約 1000 なので、ソート処理は非常に高速で、主なコストはファイルの読み取りと書き込みにあります。ソートプロセス全体には、おおよそ 3 つのオプションがあります。

  • ファイル サイズのメモリを事前に割り当て、ファイル全体を読み取り、MapId を解析して並べ替え、ブロックを MapId 順にディスクに書き戻します。
  • メモリを割り当てず、各ブロックの位置をシークし、MapId を解析してソートし、元のファイルのブロックを MapId の順序で新しいファイルに転送します。
  • 小さなメモリ ブロック (256k など) を割り当て、ファイル全体を順番に読み取り、MapId を解析して並べ替え、元のファイルのブロックを MapId の順序で新しいファイルに転送します。

IO の観点から見ると、一見すると、ソリューション 1 では十分なメモリを使用して順次読み取りと書き込みを行うことができません。ソリューション 2 にはランダム読み取りとランダム書き込みがあります。ソリューション 3 にはランダム書き込みがあります。直感的には、ソリューション 1 の方がパフォーマンスが優れています。ただし、PageCache が存在するため、ソリューション 3 でファイルを書き込むと、元のファイルが PageCache にキャッシュされる可能性があります。したがって、次の図に示すように、ソリューション 3 の実際のパフォーマンスは優れています。

同時に、ソリューション 3 は追加のプロセス メモリを占有する必要がないため、RSS はソリューション 3 のアルゴリズムを採用しています。次の図に示すように、ソートおよびインデックス付けを行わない Sort On Read とランダム読み取り方式の比較もテストしました。

全体的なプロセス

Join 傾斜最適化をサポートする RSS の全体的なプロセスを次の図に示します。

2. RSSフロー制御

フロー制御の主な目的は、RSS ワーカーのメモリが枯渇するのを防ぐことです。フロー制御には通常 2 つの方法があります。

  • クライアントは各 PushData の前にワーカー用のメモリを予約し、予約が成功した後にのみプッシュがトリガーされます。
  • 作業者側のバックプレッシャー。

PushData は非常に高頻度でパフォーマンスが重要な操作であるため、プッシュごとに追加の RPC 操作を実行するとオーバーヘッドが高くなりすぎるため、バック プレッシャー戦略を採用しました。ワーカーの観点から見ると、受信データのソースは 2 つあります。

  • クライアントからプッシュされたデータ
  • プライマリレプリカから送信されたデータ

次の図に示すように、Worker2 は Mapper によってプッシュされた Partition3 のデータと Worker1 によって送信された Partition1 のレプリカ データの両方を受信し、Partition3 のデータを対応するスレーブ レプリカに送信します。

Mapper によってプッシュされたデータは、次の条件が満たされた場合にのみ解放されます。

  • レプリケーションが正常に実行されました
  • データはディスクに正常に書き込まれました

プライマリ レプリカからプッシュされたデータは、次の条件が満たされた場合にのみメモリから解放されます。

  • データはディスクに正常に書き込まれました

フロー制御戦略を設計する際には、フロー制限(流入するデータ量の削減)だけでなく、フロー漏洩(メモリを適時に解放できる)も考慮する必要があります。具体的には、高水準点 (それぞれメモリ使用量 85% と 95% に相当) に対して 2 つのメモリしきい値を定義し、低水準点 (メモリ使用量 50%) に対しては 1 つのメモリしきい値のみを定義します。高水位しきい値に達すると、フロー制御がトリガーされ、Mapper によってプッシュされたデータの受信が一時停止され、ディスクが強制的にフラッシュされ、フロー排出の目的が達成されます。マッパーからの流入を制限するだけではプライマリレプリカからのフローを制御できないため、2 番目の最高水準点を定義します。このしきい値に達すると、プライマリレプリカによって送信されたデータの受信も同時に中断されます。水位が低水位より低くなると、正常に戻ります。全体的なプロセスを下の図に示します。

3. パフォーマンステスト

Spark 3.2.0 で AQE を有効にした RSS とネイティブの外部 Shufle サービス (ESS) のパフォーマンスを比較しました。 RSS はコロケーション アプローチを採用しており、追加のマシン リソースを占有しません。また、RSS が使用するメモリは 8 GB であり、これはマシン メモリ (マシン メモリは 352 GB) の 2.3% にすぎません。具体的な環境は以下の通りです。

1.テスト環境

ハードウェア:

ヘッダー マシン グループ 1x ecs.g5.4xlarge、ワーカー マシン グループ 8x ecs.d2c.24xlarge、96 CPU、352 GB、12x 3700GB HDD。

Spark AQE 関連の構成:

 spark .sql .adaptive .enabled がtrue
spark .sql .adaptive .coalescePartitions .enabled true
spark .sql .adaptive .coalescePartitions .initialPartitionNum 1000
spark .sql .adaptive .skewJoin .enabled true
spark .sql .adaptive .localShuffleReader .enabled がfalse

RSS 関連の設定:

 RSS_マスター_メモリ= 2g
RSS_WORKER_MEMORY = 1g
RSS_WORKER_OFFHEAP_MEMORY = 7g

2. TPCDS 10Tテストセット

10TB TPCDS をテストしました。 E2E の観点から見ると、ESS は 11734 秒、RSS シングルコピー/デュアルコピーはそれぞれ 8971 秒/10110 秒かかり、次の図に示すように、ESS よりも 23.5%/13.8% 高速でした。 RSS が 2 つのレプリカを有効にすると、ネットワーク帯域幅が上限に達することがわかりました。これが、2 つのレプリカが 1 つのレプリカよりも低くなる主な理由です。

各クエリの具体的な時間比較は次のとおりです。

関連リンク

GitHub アドレス: https://github.com/alibaba/RemoteShuffleService

参照

[1] Alibaba Cloud EMRリモートシャッフルサービスのXiaomiとオープンソースでの実践。 https://developer.aliyun.com/article/857757

[2]適応型クエリ実行:実行時のSpark SQLの高速化。 https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

<<:  クラウド データ セキュリティのベスト プラクティスを学びましたか?

>>:  クラウドコンピューティングが「後半」に突入する中、国内クラウドコンピューティングの活路はどこにあるのでしょうか?

推薦する

JVM 世代別ガベージコレクションメカニズムとガベージコレクションアルゴリズム

[[433574]] 1. GCとは何かGC (ガベージ コレクション) ガベージ コレクションは、...

スーパーコンピュータ + デジタルネイティブオペレーティングシステム、Alibaba Cloud 2.0を理解する

[51CTO.com からのオリジナル記事] 「1.0 時代のクラウドは、実行に複雑なコード命令のセ...

これら4つのポイントをマスターすれば、キーワードの選択に悩む必要はありません

キーワードの選択は、SEO 担当者をテストする重要な要素です。キーワードの難易度、キーワードのレイア...

クラウドにも独自のネットワークが必要です。 SDNとVPCの存在

クラウド コンピューティングを水資源に例えると、クラウド ネットワークは蛇口のようなものです。クラウ...

LightCloud - 国慶節 12% オフ/高防御 VPS+CN2 ネットワーク+ゴールデン シールド ファイアウォール「言葉では言い表せない」

LightCloudは現在、フランス、カナダ、韓国、香港のデータセンターでホスティングサービスを運営...

タイトルを変更してウェブサイトのスナップショットを更新する方法の実践的な分析

ウェブマスターがウェブサイトの品質を判断する際に最も重要な要素の 1 つは、ウェブサイトのスナップシ...

Weiboマーケティングをビジネスに活用する方法

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス1. 企業がWeiboマ...

ウェブサイトのキーワードの選び方

キーワードは単語だけでなく、単語、フレーズ、単語、さらには文も指します。ウェブサイトを運営している人...

2012年にウェブサイトのトラフィックを増やす方法

SEOER は 2012 年にウェブサイトのトラフィックをどのように増やすべきでしょうか? ウェブサ...

Baidu はプレーンテキストリンクをクロールします Lee が SEO 外部リンク構築ルールについて語ります

ユーザーはプレーンテキスト URL アドレスが指すページを直接クリックできず、バックリンク定義の「対...

Baiduのテキストリンクに関する概要と推測

今日、Baidu の外部リンク ツールを使用して、所有するいくつかの Web サイトの外部リンクを確...

ブルーオーシャンからレッドオーシャンへ、ソーシャルコンテンツ電子商取引の未来はどこにあるのでしょうか?

ネットセレブやスターに倣って買い物をすることは、多くの人にとって一般的なショッピングパターンとなって...

海外イースターホスト商品プロモーション概要

海外ではイースタープロモーションがすでに始まっており、仮想ホスト、VPS、専用サーバーなど多くの企業...

ウェブサイトの最適化で検索エンジンに対応する方法

検索エンジンは今やより人間化される傾向にあり、例えば、Baidu は現在、ユーザーのブラウザ内の C...