著者: イーチュイ、ミンジ Alibaba Cloud EMR は 2020 年にリモートシャッフルサービス (RSS) を開始して以来、多くのお客様が Spark ジョブのパフォーマンスと安定性の問題を解決し、ストレージとコンピューティングの分離アーキテクチャの実装を可能にしてきました。誰もがより簡単に利用し、拡張できるようにするために、RSS は 2022 年初頭にオープンソース化され、あらゆる分野の開発者がその構築に貢献することを歓迎します。 RSSの全体的なアーキテクチャについては[1]を参照してください。この記事では、RSS の 2 つの最新の重要な機能、Adaptive Query Execution (AQE) とフロー制御のサポートについて紹介します。 1. RSSはAQEをサポートする1. AQE の概要適応型クエリ実行(AQE)はSpark 3の重要な機能です[2]。実行時統計を収集して後続の実行プランを動的に調整することで、オプティマイザーが統計を正確に推定できないために実行プランが適切に生成されない問題を解決します。 AQE には、パーティション結合、スイッチ結合戦略、およびスキュー結合の最適化という 3 つの主要な最適化シナリオがあります。これら 3 つのシナリオはすべて、Shuffle フレームワークの機能に新たな要件を提起します。 パーティションの結合パーティションのマージの目的は、リデューサーによって処理されるデータの量を適度かつ均一にすることです。このアプローチでは、まず、より多くのパーティションに応じてマッパーにシャッフル書き込みを実行させます。 AQE フレームワークは各パーティションのサイズをカウントします。連続する複数のパーティション内のデータ量が比較的少ない場合、これらのパーティションは 1 つにマージされ、処理のために Reducer に渡されます。手順は以下のとおりです。 上の図からわかるように、最適化された Reducer2 は、元々 Reducer2-4 に属していたデータを読み取る必要があります。 Shuffle フレームワークの要件は、ShuffleReader が範囲パーティションをサポートする必要があることです。 デフgetReader [ K , C ] ( 参加戦略の切り替え結合戦略を切り替える目的は、統計の推定が不正確であるために、オプティマイザーが誤ってブロードキャスト結合ではなく SortMerge 結合または ShuffleHash 結合を選択する問題を修正することです。具体的には、結合する 2 つのテーブルが Shuffle Write を完了した後、AQE フレームワークは実際のサイズをカウントします。小さいテーブルがブロードキャスト結合の条件を満たしていることが判明した場合、小さいテーブルはブロードキャストされ、大きいテーブルのローカルシャッフルデータと結合されます。プロセスは次のとおりです。 結合戦略を切り替えるための最適化は 2 つあります。1. ブロードキャスト結合に書き換える。 2. LocalShuffleReader を介して、大きなテーブルのデータをローカルで直接読み取ります。 Shuffle フレームワークの 2 番目の新しい要件は、ローカル読み取りをサポートすることです。 傾斜結合の最適化傾斜結合の最適化の目的は、より多くのリデューサーが傾斜パーティションを処理できるようにし、ロングテールを回避することです。具体的には、シャッフル書き込みが完了すると、AQE フレームワークは各パーティションのサイズをカウントし、特定のルールに基づいて偏りがあるかどうかを判断します。その場合、パーティションは複数の分割に分割され、各分割は別のテーブルの対応するパーティションと結合されます。下記の通りです。 パーティション分割は、Shuffle Output のサイズを MapId 順に累積することによって行われ、累積値がしきい値を超えると分割がトリガーされます。 Shuffle フレームワークの新しい要件は、ShuffleReader が範囲 MapId をサポートできることです。包括的なパーティションマージの最適化のための範囲パーティションの要件を考慮して、ShuffleReader のインターフェースは次のように進化しました。 デフgetReader [ K , C ] ( 2. RSSアーキテクチャのレビューRSS のコア設計は、プッシュ シャッフル + パーティション データ集約です。つまり、異なる Mapper が同じパーティションに属するデータを同じ Worker にプッシュして集約し、Reducer が集約されたファイルを直接読み取ります。下の図の通りです。 RSS は、コア設計に加えて、複数のレプリカ、フルリンク フォールト トレランス、マスター HA、ディスク フォールト トレランス、アダプティブ プッシャー、ローリング アップグレードなどの機能も実装しています。詳細については[1]を参照。 3. RSSはパーティションのマージをサポートしますパーティションのマージには、範囲パーティションをサポートする Shuffle フレームワークが必要です。 RSS では、各パーティションがファイルに対応しているため、次の図に示すように自然にサポートされます。 4. RSSは参加戦略の切り替えをサポートShuffle フレームワークの Join 戦略切り替えの要件は、LocalShuffleReader をサポートすることです。 RSS のリモート属性により、データは RSS クラスターに保存され、RSS とコンピューティング クラスターが同じ場所に配置されている場合にのみローカルに存在します。したがって、ローカル読み取りはまだサポートされていません (コロケーション シナリオは将来最適化され、サポートされる予定です)。ローカル読み取りはサポートされていませんが、結合書き換えには影響しないことに注意してください。 RSS は、次の図に示すように、結合書き換えの最適化をサポートしています。 5. RSSはJoin Tilt最適化をサポートします3 つの AQE シナリオの中で、Join 傾斜最適化の RSS サポートが最も困難です。 RSS のコア設計はパーティション データ集約であり、Shuffle Read のランダム読み取りをシーケンシャル読み取りに変換して、パフォーマンスと安定性を向上させることを目的としています。複数の Mapper が同時に RSS Worker にデータをプッシュします。 RSS はメモリ内のデータを集約し、それをディスクにフラッシュします。したがって、次の図に示すように、パーティション ファイル内の異なるマッパーのデータは順序が乱れます。 結合スキューの最適化には、Map1-2 からデータを読み取るなど、範囲マップを読み取る必要があります。一般的なアプローチは 2 つあります。
どちらのアプローチにも問題点は明らかです。方法 1 では、大量の冗長ディスク読み取りが発生します。方法 2 は基本的にランダム読み取りに戻り、RSS のコアの利点が失われ、インデックス ファイルの作成は、偏りのないデータであっても普遍的なオーバーヘッドになります (シャッフル書き込みプロセス中に偏りがあるかどうかを正確に予測することは困難です)。 上記の 2 つの問題を解決するために、Active Split + Sort On Read という新しい設計を提案しました。 アクティブスプリット歪んだパーティションのサイズは非常に大きくなる可能性があり、極端な場合にはディスクが直接爆発する可能性があります。非スキューシナリオでも、大きなパーティションが発生する可能性は依然として高くなります。したがって、ディスク負荷分散の観点から、パーティション ファイルのサイズを監視し、アクティブ分割を実行することが非常に重要です (デフォルトのしきい値は 256 m です)。 分割が発生すると、RSS は現在のパーティションのワーカーのペア (プライマリ レプリカとセカンダリ レプリカ) を再割り当てし、後続のデータは新しいワーカーにプッシュされます。実行中の Mapper への Split の影響を回避するために、Soft Split メソッドを提案しました。つまり、Split がトリガーされると、RSS は非同期的に新しい Worker を準備し、Ready 後に Mapper の PartitionLocation 情報をホット更新するため、Mapper の PushData に干渉することはありません。全体的なプロセスを下の図に示します。 既読順に並べ替えランダム読み取りの問題を回避するために、RSS は Sort On Read 戦略を採用しています。具体的には、File Split の最初の Range 読み取りによってソートがトリガーされ (Range 以外の読み取りではトリガーされません)、ソートされたファイルは位置インデックスとともにディスクに書き戻されます。後続の範囲読み取りは、順次読み取りであることが保証されます。下の図の通りです。 複数のサブリデューサーが同じファイル分割のソートを待機することを回避するために、次の図に示すように、各サブリデューサーが分割を読み取る順序を分割します。 ソートの最適化Sort On Read は冗長な読み取りとランダム読み取りを効果的に回避できますが、分割ファイル (256M) をソートする必要があります。このセクションでは、ソートの実装とオーバーヘッドについて説明します。ファイルの並べ替えには、ファイルの読み取り、MapId の並べ替え、ファイルの書き込みという 3 つの手順が含まれます。 RSS のデフォルトのブロック サイズは 256k で、ブロック数は約 1000 なので、ソート処理は非常に高速で、主なコストはファイルの読み取りと書き込みにあります。ソートプロセス全体には、おおよそ 3 つのオプションがあります。
IO の観点から見ると、一見すると、ソリューション 1 では十分なメモリを使用して順次読み取りと書き込みを行うことができません。ソリューション 2 にはランダム読み取りとランダム書き込みがあります。ソリューション 3 にはランダム書き込みがあります。直感的には、ソリューション 1 の方がパフォーマンスが優れています。ただし、PageCache が存在するため、ソリューション 3 でファイルを書き込むと、元のファイルが PageCache にキャッシュされる可能性があります。したがって、次の図に示すように、ソリューション 3 の実際のパフォーマンスは優れています。 同時に、ソリューション 3 は追加のプロセス メモリを占有する必要がないため、RSS はソリューション 3 のアルゴリズムを採用しています。次の図に示すように、ソートおよびインデックス付けを行わない Sort On Read とランダム読み取り方式の比較もテストしました。 全体的なプロセスJoin 傾斜最適化をサポートする RSS の全体的なプロセスを次の図に示します。 2. RSSフロー制御フロー制御の主な目的は、RSS ワーカーのメモリが枯渇するのを防ぐことです。フロー制御には通常 2 つの方法があります。
PushData は非常に高頻度でパフォーマンスが重要な操作であるため、プッシュごとに追加の RPC 操作を実行するとオーバーヘッドが高くなりすぎるため、バック プレッシャー戦略を採用しました。ワーカーの観点から見ると、受信データのソースは 2 つあります。
次の図に示すように、Worker2 は Mapper によってプッシュされた Partition3 のデータと Worker1 によって送信された Partition1 のレプリカ データの両方を受信し、Partition3 のデータを対応するスレーブ レプリカに送信します。 Mapper によってプッシュされたデータは、次の条件が満たされた場合にのみ解放されます。
プライマリ レプリカからプッシュされたデータは、次の条件が満たされた場合にのみメモリから解放されます。
フロー制御戦略を設計する際には、フロー制限(流入するデータ量の削減)だけでなく、フロー漏洩(メモリを適時に解放できる)も考慮する必要があります。具体的には、高水準点 (それぞれメモリ使用量 85% と 95% に相当) に対して 2 つのメモリしきい値を定義し、低水準点 (メモリ使用量 50%) に対しては 1 つのメモリしきい値のみを定義します。高水位しきい値に達すると、フロー制御がトリガーされ、Mapper によってプッシュされたデータの受信が一時停止され、ディスクが強制的にフラッシュされ、フロー排出の目的が達成されます。マッパーからの流入を制限するだけではプライマリレプリカからのフローを制御できないため、2 番目の最高水準点を定義します。このしきい値に達すると、プライマリレプリカによって送信されたデータの受信も同時に中断されます。水位が低水位より低くなると、正常に戻ります。全体的なプロセスを下の図に示します。 3. パフォーマンステストSpark 3.2.0 で AQE を有効にした RSS とネイティブの外部 Shufle サービス (ESS) のパフォーマンスを比較しました。 RSS はコロケーション アプローチを採用しており、追加のマシン リソースを占有しません。また、RSS が使用するメモリは 8 GB であり、これはマシン メモリ (マシン メモリは 352 GB) の 2.3% にすぎません。具体的な環境は以下の通りです。 1.テスト環境ハードウェア: ヘッダー マシン グループ 1x ecs.g5.4xlarge、ワーカー マシン グループ 8x ecs.d2c.24xlarge、96 CPU、352 GB、12x 3700GB HDD。 Spark AQE 関連の構成: spark .sql .adaptive .enabled がtrue RSS 関連の設定: RSS_マスター_メモリ= 2g 2. TPCDS 10Tテストセット10TB TPCDS をテストしました。 E2E の観点から見ると、ESS は 11734 秒、RSS シングルコピー/デュアルコピーはそれぞれ 8971 秒/10110 秒かかり、次の図に示すように、ESS よりも 23.5%/13.8% 高速でした。 RSS が 2 つのレプリカを有効にすると、ネットワーク帯域幅が上限に達することがわかりました。これが、2 つのレプリカが 1 つのレプリカよりも低くなる主な理由です。 各クエリの具体的な時間比較は次のとおりです。 関連リンクGitHub アドレス: https://github.com/alibaba/RemoteShuffleService 参照[1] Alibaba Cloud EMRリモートシャッフルサービスのXiaomiとオープンソースでの実践。 https://developer.aliyun.com/article/857757 [2]適応型クエリ実行:実行時のSpark SQLの高速化。 https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html |
<<: クラウド データ セキュリティのベスト プラクティスを学びましたか?
>>: クラウドコンピューティングが「後半」に突入する中、国内クラウドコンピューティングの活路はどこにあるのでしょうか?
[[433574]] 1. GCとは何かGC (ガベージ コレクション) ガベージ コレクションは、...
[51CTO.com からのオリジナル記事] 「1.0 時代のクラウドは、実行に複雑なコード命令のセ...
キーワードの選択は、SEO 担当者をテストする重要な要素です。キーワードの難易度、キーワードのレイア...
クラウド コンピューティングを水資源に例えると、クラウド ネットワークは蛇口のようなものです。クラウ...
LightCloudは現在、フランス、カナダ、韓国、香港のデータセンターでホスティングサービスを運営...
ウェブマスターがウェブサイトの品質を判断する際に最も重要な要素の 1 つは、ウェブサイトのスナップシ...
ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス1. 企業がWeiboマ...
キーワードは単語だけでなく、単語、フレーズ、単語、さらには文も指します。ウェブサイトを運営している人...
SEOER は 2012 年にウェブサイトのトラフィックをどのように増やすべきでしょうか? ウェブサ...
Siteground からの公式ニュース: 11 月 29 日から 12 月 3 日まで、仮想ホスト...
ユーザーはプレーンテキスト URL アドレスが指すページを直接クリックできず、バックリンク定義の「対...
今日、Baidu の外部リンク ツールを使用して、所有するいくつかの Web サイトの外部リンクを確...
ネットセレブやスターに倣って買い物をすることは、多くの人にとって一般的なショッピングパターンとなって...
海外ではイースタープロモーションがすでに始まっており、仮想ホスト、VPS、専用サーバーなど多くの企業...
検索エンジンは今やより人間化される傾向にあり、例えば、Baidu は現在、ユーザーのブラウザ内の C...