全体的なアーキテクチャ BlockManager は Spark の重要なコンポーネントです。 BlockManager は、Spark の実行プロセスのあらゆる場所に存在します。 BlockManager の原理とメカニズムを理解することによってのみ、Spark をより深く理解することができます。今日は、BlockaManager の基本原理と設計のアイデアを紹介します。 BlockManager は、Spark に組み込まれた、Spark 向けにカスタマイズされたキー値分散ストレージ システムです。 BlockManager は、すべてのドライバーとエグゼキューターを含む、Spark アプリケーション内のすべてのノード上でローカル キャッシュとして実行されます。 BlockManager は、ローカルとリモートに対して一貫したデータ ブロックの取得および設定インターフェイスを提供します。 BlockManager 自体は、メモリ、ディスク、オフヒープなどのさまざまなストレージ方法を使用してこのデータを保存しています。 上記は全体的なアーキテクチャ図です。 BlockManagerMaster には、BlockManagerMasterEndpoint のアクターとすべての BlockManagerSlaveEndpoint の参照があります。これらの参照を通じてスレーブにコマンドを発行できます。 executor ノード上の BlockManagerMaster には、BlockManagerMasterEndpoint の参照と独自の BlockManagerSlaveEndpoint アクターがあります。マスターの参照を通じて自身を登録できます。 マスターとスレーブが正常に通信できるようになると、設計された相互作用プロトコルに従って相互作用できるようになり、分散キャッシュ システム全体が稼働できるようになります。 初期化 sparkEnv が起動するとさまざまなコンポーネントが起動されることがわかっていますが、BlockManager も例外ではありません。これもこのタイミングで開始されます。 起動時には、ドライバー側か実行側かによって異なる起動プロセスが実行されます。
上の図は、マスター上で sparkEnv が起動されると、BlockManagerMasterEndpoint が構築され、この Endpoint が rpcEnv に登録され、独自の BlockManager も起動されることを示しています。 上の図は、executor 上で sparkEnv が起動されると、setupEndpointRef メソッドを通じて BlockManagerMaster の参照 BlockManagerMasterRef を取得し、独自の BlockManager も起動することを示しています。 BlockManager は初期化されると、BlockManagerMasterEndpoint に自身を登録します。 BlockManagerMasterEndpoint は registerBlockManager メッセージを送信します。 BlockManagerMasterEndpoint はメッセージを受信し、後で使用するために BlockManagerSlaveEndpoint の参照を独自の blockManagerInfo データ構造に保存します。 分散プロトコル 次の表は、マスターとスレーブが受信するさまざまな種類のメッセージと、メッセージを受信した後に実行される処理を示しています。
上記のプロトコルに基づいて、相互作用プロセス全体を明確に推測できると考えています。一般的なプロセスは次のようになります。スレーブの BlockManager は独自の接続でブロックを保存し、この BlockId をマスターの BlockManager に報告します。キャッシュ、シャッフル、またはブロードキャストの後、他のノードが前のステップのブロックを必要とする場合、マスターに移動してデータの場所を取得し、対応するノードに移動して取得します。 ストレージ層 RDD レベルでは、RDD はさまざまなパーティションで構成されており、実行する変換とアクションはパーティションに対して実行されることがわかります。ストレージ モジュール内では、RDD は異なるブロックで構成されていると見なされ、RDD はブロック単位でアクセスされます。本質的には、パーティションとブロックは同等ですが、異なる視点から見られます。 Spark ストレージ モジュール内のデータにアクセスするための最小単位はブロックであり、すべての操作はブロック内で実行されます。 BlockManager オブジェクトが作成されると、ブロックにアクセスするための MemoryStore オブジェクトと DiskStore オブジェクトが作成されます。メモリに十分なメモリがある場合は、MemoryStore がストレージとして使用されます。そうでない場合は、ディスクに書き出され、DiskStore を通じて保存されます。
BlockManagerのPUTおよびGETインターフェース BlockManager は Put インターフェースと Get インターフェースを提供します。これら 2 つの API は基礎となる詳細を保護します。基礎となるレイヤーがどのように実装されているかを見てみましょう。
blockManagerとblockTransferServiceの関係 Spark はこれまで 2 つのネットワーク フレームワークを使用してきました。当初、Akka は RPC 呼び出しに使用され、Netty は大容量ファイルの転送に使用されていました。その後、すべてNettyが使用されるようになりました。ここでの大きなファイル転送は、実際には Netty によって行われます。 blockManager を起動すると、blockTransferService サービスが開始されます。このサービスは大きなファイルを転送するために使用されます。対応する特定のクラスは NettyBlockTransferService です。このインスタンスには、BlocakManager への参照もあります。 NettyBlockRpcServer の Netty ハンドラーが起動され、サービスを提供するために BlocakManager への参照も保持されます。 BlocakManager は BlockId に従ってブロックを取得し、それを ManagedBuffer オブジェクトとしてラップします。 リモート エンドからブロックを取得する必要がある場合は、大きなバイト配列を転送するために blockTransferService が必要です。 まず、ドライバーからブロックの実際のストレージ場所を取得し、次に blockTransferService の fetchBlocks メソッドを呼び出して、他の実際のストレージ ノードからデータを取得する必要があります。クライアントはクライアント リソース プールから取得されます。 1対1フェッチの場合は、OneForOneBlockFetcher が使用されます。このフェッチャーはチャンク単位で個別にフェッチします。各チャンクはブロックのデータに対応します。設定に応じて、再試行の最大回数に達するまで再試行が行われ、対応する BlockId を含む OpenBlocks メッセージが送信されます。他のノード サーバーは、BlockId に従って blockManager からデータを取得し、それを送信に使用します。 Nettyのストリーミング伝送方式を採用しており、コールバック関数も存在します。 バックアップ中にブロックが同期的にアップロードされると、他のノード サーバーは、uploadBlock メッセージに含まれる BlockId に基づいて、ローカル BlockManager にコピーを冗長的に保存します。 ChunkFetch にも Stream に似た概念があります。 ChunkFetch のオブジェクトは「メモリ内の Iterator[ManagedBuffer]」、つまり Buffer のグループであり、各 Buffer は chunkIndex に対応し、Iterator[ManagedBuffer] 全体は StreamID によって識別されます。クライアントからの各 ChunkFetch リクエストは、(streamId、chunkIndex) で構成される一意の StreamChunkId です。サーバーは StreamChunkId に基づいてそれをバッファーとして取得し、クライアントに返します。 Stream か ChunkFetch かに関係なく、StreamID とリソース間のマッピング セットは、サーバーのメモリ、つまり StreamManager クラスで管理する必要があります。StreamManager クラスは、ChunkFetch と Stream の 2 つの操作にそれぞれ応答するための getChunk と openStream という 2 つのインターフェイスを提供し、バッファー セットを登録するためのサーバーの ChunkFetch 用の registerStream インターフェイスを提供します。たとえば、BlockManager 内の BlockID のセットに対応する Iterator[ManagedBuffer] を StreamManager に登録して、リモート ブロック フェッチ操作をサポートできます。 ExternalShuffleService (このノード上のすべてのシャッフル マップ出力を他のコンピューティング ノードに提供する別のシャッフル サービス プロセス) の場合、リモート Executor 用の OpenBlocks RPC インターフェイスが提供されます。つまり、要求された appid、executorid、blockid (appid+executor はローカル ディレクトリのセットに対応し、blockid は展開されます) に従って、ローカル ディスクからメモリに FileSegmentManagedBuffer のセットをロードし、ロードされた streamId をクライアントに返して、後続の ChunkFetch 操作をサポートします。 パーティションとブロックの関係 RDD 操作はパーティションに基づいていることは誰もが知っています。各タスクは、パーティション上のステージ内の計算クロージャを表します。タスクは複数のエグゼキュータで実行されるようにスケジュールされます。では、どこでブロックになるのでしょうか?変換プロセスを確認するために、Spark 2.11 のソース コードを標準として採用してみましょう。 RDD がエグゼキュータにディスパッチされると、getOrCompute メソッドが呼び出されます。
BlockManager 内に Block が存在する場合は、BlockManager から取得されます。存在しない場合は、ブロックが計算され、次回簡単に使用できるように BlockManager に保存されて保持されます。 もちろん、取得する場合は、まずローカルの BlockManager から取得します。ローカルで利用できない場合は、リモートから取得されます。まず、メタデータ ブロックの場所がドライバーから取得され、次に実際のノードから取得されます。 そうでない場合は、計算されてから、ストレージ レベルに応じて、コンピューティング ノードのローカル BlockManager のメモリまたはディスクに保存されます。 このように、RDD の変換とアクションはブロック データと結びついています。抽象的には操作はパーティション レベルで実行されますが、パーティションは最終的にブロックにマップされます。したがって、実際には、私たちのすべての操作はブロックの処理とアクセスです。 SparkにおけるblockManagerの役割 BlockManager は非常に重要な Spark コンポーネントです。 BlockManager がいかに重要であるかを示すために、いくつかの例を見てみましょう。
Spark キャッシュと Spark ブロードキャスト タスク Spark Cache と Spark Broadcast がタスクをスケジュールするときに BlockManager を使用する方法を示す 2 つの例を示します。 スパークキャッシュ rdd を計算するときは、まず RDD ID とパーティション インデックスに従ってブロック ID (rdd_xx_xx) を構築し、BlockManager から対応するブロックを取り出します。ブロックが存在する場合、この RDD は以前に計算され、BlockManager に保存されているため、再計算せずに取り出すことができます。ブロックが存在しない場合は、計算を行い、doPutIterator関数を介してノード上のBlockManagerにブロックを保存し、ブロック情報をドライバーに報告し、次回同じRDDが使用される場合、対応するブロックを分散ストレージから直接取得できます。 ソースコードを見てみましょう
ストレージ レベルが NONE でない場合、getOrCompute が呼び出されます。私たちはすでにこれを見てきました。実際には、SparkEnv.get.blockManager.getOrElseUpdate メソッドを呼び出します。 BlockManager にブロックが存在する場合は、BlockManager から取得されます。そうでない場合、ブロックは計算され、次回の使用のために BlockManager に保存され、保持されます。 BlockManager はデータを保存した後、次のコードを呼び出してブロック情報をドライバーに報告します。
実際には、マスターは masterEndpoint の参照に UpdateBlockInfo メッセージを送信し、マスターはこの blockId に対応する場所をドライバーに配置します。 同様に、ブロックが計算されている場合は、ドライバーから位置情報が取得されます。
スパークブロードキャストタスク 複数のタスクにタスクをスケジュールするプロセスはコードが多すぎるため、ここでは投稿しません。プロセスについてのみお話しします。
SparkストリーミングにおけるblockManagerの応用
blockManager のテスト 簡単なテストをしてみましょう。 2 つのコードの違いは、一方はキャッシュし、もう一方はキャッシュしないことです。
ログから、最初のコード セグメントでは、両方のジョブが HDFS からファイルを 2 回読み取っていることがわかります。
以下のログがあります
ファイルを初めて読み取った後、そのファイルが blockManager にキャッシュされていることがわかりました。次のジョブが実行されると、ローカル BlockManager は hdfs ファイルを読み取らずにブロックが取得されたことを直接検出しました。 Spark UI では、キャッシュ ブロックも見つかりました。これらはすべてメモリにキャッシュされています。 |
<<: IoTデバイスは爆発的に増加し、クラウドコンピューティングは「フォグコンピューティング」へと移行している
>>: 分散アーキテクチャにおける「負荷分散」について 1 つの記事で学ぶ
インターネットの発展により、オンライン情報はあらゆる方法で私たちの生活に浸透し、私たちの生活に利便性...
現在でも、バックリンクは、Baidu や Google などの検索エンジンがウェブサイトの人気度を測...
かつてはキーワード密度が非常に重要で、ウェブサイトのキーワードランキングに影響を与えるほどだったこと...
ZJiNet は今年のダブルイレブンの独立サーバープロモーション活動に新しいコンテンツを追加しました...
7月19日、International Data Corporation(IDC)は「2017年下半...
この記事は、GOPS 2022 上海での Yu Tao 教授の講演に基づいてまとめられています。さら...
国内のサーバープロバイダーであるLeica Cloud(lcayun.com)は、付加価値通信ライセ...
ウクライナのホスティング会社である Takewyn は、ウクライナの仮想ホスティング、ウクライナの ...
ecvpsは中国人が開設したVPSです。数年にわたって利用されており、非常に安定しています。コミュニ...
[[333298]]現在、チップのパフォーマンスの向上は限られているため、分散トレーニングは超大規模...
アメリカのホスティング会社 usdedicated は、ロサンゼルスのデータセンターでサーバープロモ...
20年以上運営しているドイツの老舗企業Contaboは、ドイツと米国セントルイスにすでにデータセンタ...
MySQL 分散アーキテクチャのスケールアップとスケールダウンは非常に興味深いトピックです。厳密に言...
コンテナの操作は、多くのユーザーや開発者にとって日常的なタスクです。コンテナ開発者は、コンテナイメー...
月収10万元の起業の夢を実現するミニプログラム起業支援プランあっという間に建国記念日の祝日が終わり、...