Spark 独自の分散ストレージ システム - BlockManager

Spark 独自の分散ストレージ システム - BlockManager

全体的なアーキテクチャ

BlockManager は Spark の重要なコンポーネントです。 BlockManager は、Spark の実行プロセスのあらゆる場所に存在します。 BlockManager の原理とメカニズムを理解することによってのみ、Spark をより深く理解することができます。今日は、BlockaManager の基本原理と設計のアイデアを紹介します。

BlockManager は、Spark に組み込まれた、Spark 向けにカスタマイズされたキー値分散ストレージ システムです。

BlockManager は、すべてのドライバーとエグゼキューターを含む、Spark アプリケーション内のすべてのノード上でローカル キャッシュとして実行されます。 BlockManager は、ローカルとリモートに対して一貫したデータ ブロックの取得および設定インターフェイスを提供します。 BlockManager 自体は、メモリ、ディスク、オフヒープなどのさまざまなストレージ方法を使用してこのデータを保存しています。

上記は全体的なアーキテクチャ図です。 BlockManagerMaster には、BlockManagerMasterEndpoint のアクターとすべての BlockManagerSlaveEndpoint の参照があります。これらの参照を通じてスレーブにコマンドを発行できます。

executor ノード上の BlockManagerMaster には、BlockManagerMasterEndpoint の参照と独自の BlockManagerSlaveEndpoint アクターがあります。マスターの参照を通じて自身を登録できます。

マスターとスレーブが正常に通信できるようになると、設計された相互作用プロトコルに従って相互作用できるようになり、分散キャッシュ システム全体が稼働できるようになります。

初期化

sparkEnv が起動するとさまざまなコンポーネントが起動されることがわかっていますが、BlockManager も例外ではありません。これもこのタイミングで開始されます。

起動時には、ドライバー側か実行側かによって異なる起動プロセスが実行されます。

  1. デフレジスタOrLookupEndpoint(
  2. 名前: 文字列、エンドポイントクリエータ: => RpcEndpoint):
  3. Rpcエンドポイントリファレンス = {
  4. if (isDriver) {
  5. logInfo( "登録中" +名前)
  6. rpcEnv.setupEndpoint(名前, エンドポイント作成者)
  7. }それ以外{
  8. RpcUtils.makeDriverRef(名前、conf、rpcEnv)
  9. }
  10. }

上の図は、マスター上で sparkEnv が起動されると、BlockManagerMasterEndpoint が構築され、この Endpoint が rpcEnv に登録され、独自の BlockManager も起動されることを示しています。

上の図は、executor 上で sparkEnv が起動されると、setupEndpointRef メソッドを通じて BlockManagerMaster の参照 BlockManagerMasterRef を取得し、独自の BlockManager も起動することを示しています。

BlockManager は初期化されると、BlockManagerMasterEndpoint に自身を登録します。 BlockManagerMasterEndpoint は registerBlockManager メッセージを送信します。 BlockManagerMasterEndpoint はメッセージを受信し、後で使用するために BlockManagerSlaveEndpoint の参照を独自の blockManagerInfo データ構造に保存します。

分散プロトコル

次の表は、マスターとスレーブが受信するさまざまな種類のメッセージと、メッセージを受信した後に実行される処理を示しています。

  • BlockManagerMasterEndpoint が受信したメッセージ

  • BlockManagerSlaveEndpoint が受信したメッセージ

上記のプロトコルに基づいて、相互作用プロセス全体を明確に推測できると考えています。一般的なプロセスは次のようになります。スレーブの BlockManager は独自の接続でブロックを保存し、この BlockId をマスターの BlockManager に報告します。キャッシュ、シャッフル、またはブロードキャストの後、他のノードが前のステップのブロックを必要とする場合、マスターに移動してデータの場所を取得し、対応するノードに移動して取得します。

ストレージ層

RDD レベルでは、RDD はさまざまなパーティションで構成されており、実行する変換とアクションはパーティションに対して実行されることがわかります。ストレージ モジュール内では、RDD は異なるブロックで構成されていると見なされ、RDD はブロック単位でアクセスされます。本質的には、パーティションとブロックは同等ですが、異なる視点から見られます。 Spark ストレージ モジュール内のデータにアクセスするための最小単位はブロックであり、すべての操作はブロック内で実行されます。

BlockManager オブジェクトが作成されると、ブロックにアクセスするための MemoryStore オブジェクトと DiskStore オブジェクトが作成されます。メモリに十分なメモリがある場合は、MemoryStore がストレージとして使用されます。そうでない場合は、ディスクに書き出され、DiskStore を通じて保存されます。

  • DiskStore には DiskBlockManager があり、これは主に論理ブロックとディスク上のブロック間のマッピングを作成および維持するために使用されます。論理ブロックは、BlockId を通じてディスク上のファイルにマップされます。 DiskStore で diskManager.getFile メソッドが呼び出されます。サブフォルダーが存在しない場合は作成されます。フォルダ名は (spark-local-yyyyMMddHHmmss-xxxx、xxxx はランダムな数字) です。すべてのブロックは作成されたフォルダーに保存されます。
  • ブロック ID ハッシュに基づいてファイル パスを計算し、対応するファイルにブロックを格納する必要がある DiskStore と比較して、MemoryStore はブロックを非常に簡単に管理します。MemoryStore はすべてのブロックを管理するために内部でハッシュ マップを維持し、ブロック ID をキーとしてハッシュ マップにブロックを格納します。 MemoryStore からブロックを取得するのは非常に簡単です。ハッシュ マップからブロック ID に対応する値を取得するだけです。

BlockManagerのPUTおよびGETインターフェース

BlockManager は Put インターフェースと Get インターフェースを提供します。これら 2 つの API は基礎となる詳細を保護します。基礎となるレイヤーがどのように実装されているかを見てみましょう。

  • GET 操作がローカルに存在する場合は、直接返されます。ローカルからブロックを取得するには、まずそれが useMemory であるかどうかを判断し、メモリから直接取得します。 useDisk の場合はディスクから取得して返します。そして、useMemory に基づいて、次回の取得のためにメモリにキャッシュするかどうかが決定されます。ローカルに存在しない場合は、他のノードから取得されます。もちろん、メタデータはドライブに保存されます。ブロックのノード位置は、上記の GETlocation プロトコルに従って取得する必要があり、その後、他のノードから取得されます。
  • PUT 操作の前に、マルチスレッドの問題を回避するためにロックが追加されます。保存時には、ストレージ レベルに応じて対応する memoryStore または diskStore が呼び出され、その後、特定のストレージ上でストレージ インターフェイスが呼び出されます。レプリケーションの要件がある場合、データは他のマシンにバックアップされます。

blockManagerとblockTransferServiceの関係

Spark はこれまで 2 つのネットワーク フレームワークを使用してきました。当初、Akka は RPC 呼び出しに使用され、Netty は大容量ファイルの転送に使用されていました。その後、すべてNettyが使用されるようになりました。ここでの大きなファイル転送は、実際には Netty によって行われます。 blockManager を起動すると、blockTransferService サービスが開始されます。このサービスは大きなファイルを転送するために使用されます。対応する特定のクラスは NettyBlockTransferService です。このインスタンスには、BlocakManager への参照もあります。 NettyBlockRpcServer の Netty ハンドラーが起動され、サービスを提供するために BlocakManager への参照も保持されます。 BlocakManager は BlockId に従ってブロックを取得し、それを ManagedBuffer オブジェクトとしてラップします。

リモート エンドからブロックを取得する必要がある場合は、大きなバイト配列を転送するために blockTransferService が必要です。

まず、ドライバーからブロックの実際のストレージ場所を取得し、次に blockTransferService の fetchBlocks メソッドを呼び出して、他の実際のストレージ ノードからデータを取得する必要があります。クライアントはクライアント リソース プールから取得されます。 1対1フェッチの場合は、OneForOneBlockFetcher が使用されます。このフェッチャーはチャンク単位で個別にフェッチします。各チャンクはブロックのデータに対応します。設定に応じて、再試行の最大回数に達するまで再試行が行われ、対応する BlockId を含む OpenBlocks メッセージが送信されます。他のノード サーバーは、BlockId に従って blockManager からデータを取得し、それを送信に使用します。 Nettyのストリーミング伝送方式を採用しており、コールバック関数も存在します。

バックアップ中にブロックが同期的にアップロードされると、他のノード サーバーは、uploadBlock メッセージに含まれる BlockId に基づいて、ローカル BlockManager にコピーを冗長的に保存します。

ChunkFetch にも Stream に似た概念があります。 ChunkFetch のオブジェクトは「メモリ内の Iterator[ManagedBuffer]」、つまり Buffer のグループであり、各 Buffer は chunkIndex に対応し、Iterator[ManagedBuffer] 全体は StreamID によって識別されます。クライアントからの各 ChunkFetch リクエストは、(streamId、chunkIndex) で構成される一意の StreamChunkId です。サーバーは StreamChunkId に基づいてそれをバッファーとして取得し、クライアントに返します。 Stream か ChunkFetch かに関係なく、StreamID とリソース間のマッピング セットは、サーバーのメモリ、つまり StreamManager クラスで管理する必要があります。StreamManager クラスは、ChunkFetch と Stream の 2 つの操作にそれぞれ応答するための getChunk と openStream という 2 つのインターフェイスを提供し、バッファー セットを登録するためのサーバーの ChunkFetch 用の registerStream インターフェイスを提供します。たとえば、BlockManager 内の BlockID のセットに対応する Iterator[ManagedBuffer] を StreamManager に登録して、リモート ブロック フェッチ操作をサポートできます。

ExternalShuffleService (このノード上のすべてのシャッフル マップ出力を他のコンピューティング ノードに提供する別のシャッフル サービス プロセス) の場合、リモート Executor 用の OpenBlocks RPC インターフェイスが提供されます。つまり、要求された appid、executorid、blockid (appid+executor はローカル ディレクトリのセットに対応し、blockid は展開されます) に従って、ローカル ディスクからメモリに FileSegmentManagedBuffer のセットをロードし、ロードされた streamId をクライアントに返して、後続の ChunkFetch 操作をサポートします。

パーティションとブロックの関係

RDD 操作はパーティションに基づいていることは誰もが知っています。各タスクは、パーティション上のステージ内の計算クロージャを表します。タスクは複数のエグゼキュータで実行されるようにスケジュールされます。では、どこでブロックになるのでしょうか?変換プロセスを確認するために、Spark 2.11 のソース コードを標準として採用してみましょう。

RDD がエグゼキュータにディスパッチされると、getOrCompute メソッドが呼び出されます。

  1. SparkEnv.get.blockManager.getOrElseUpdate(blockId、storageLevel、elementClassTag、() => {
  2. readCachedBlock = false  
  3. computeOrReadCheckpoint(パーティション、コンテキスト)
  4. })

BlockManager 内に Block が存在する場合は、BlockManager から取得されます。存在しない場合は、ブロックが計算され、次回簡単に使用できるように BlockManager に保存されて保持されます。

もちろん、取得する場合は、まずローカルの BlockManager から取得します。ローカルで利用できない場合は、リモートから取得されます。まず、メタデータ ブロックの場所がドライバーから取得され、次に実際のノードから取得されます。

そうでない場合は、計算されてから、ストレージ レベルに応じて、コンピューティング ノードのローカル BlockManager のメモリまたはディスクに保存されます。

このように、RDD の変換とアクションはブロック データと結びついています。抽象的には操作はパーティション レベルで実行されますが、パーティションは最終的にブロックにマップされます。したがって、実際には、私たちのすべての操作はブロックの処理とアクセスです。

SparkにおけるblockManagerの役割

BlockManager は非常に重要な Spark コンポーネントです。 BlockManager がいかに重要であるかを示すために、いくつかの例を見てみましょう。

  • スパークシャッフルプロセスは常にBlockManagerをデータ転送ステーションとして使用します。
  • Sparkブロードキャストが複数のエグゼキューターにタスクをスケジュールする場合、broadCastによって使用される基礎となるデータストレージ層
  • Spark ストリーミングの ReceiverInputDStream によって受信されたデータも、最初に BlockManager に配置され、次に計算の次のステップのために BlockRdd にカプセル化されます。
  • RDD をキャッシュする場合、cacheManager はデータを blockmanager にも配置し、計算チェーンの依存関係を切断します。後続のタスクを実行すると、最初から計算する必要なく、cacheManager から cacherdd を直接取得できます。

Spark キャッシュと Spark ブロードキャスト タスク

Spark Cache と Spark Broadcast がタスクをスケジュールするときに BlockManager を使用する方法を示す 2 つの例を示します。

スパークキャッシュ

rdd を計算するときは、まず RDD ID とパーティション インデックスに従ってブロック ID (rdd_xx_xx) を構築し、BlockManager から対応するブロックを取り出します。ブロックが存在する場合、この RDD は以前に計算され、BlockManager に保存されているため、再計算せずに取り出すことができます。ブロックが存在しない場合は、計算を行い、doPutIterator関数を介してノード上のBlockManagerにブロックを保存し、ブロック情報をドライバーに報告し、次回同じRDDが使用される場合、対応するブロックを分散ストレージから直接取得できます。

ソースコードを見てみましょう

  1. 最終的な定義イテレータ(分割: パーティション、コンテキスト: TaskContext): Iterator[T] = {
  2. ストレージレベルがストレージレベル.NONE ではない場合
  3. getOrCompute(分割、コンテキスト)
  4. }それ以外{
  5. computeOrReadCheckpoint(分割、コンテキスト)
  6. }
  7. }

ストレージ レベルが NONE でない場合、getOrCompute が呼び出されます。私たちはすでにこれを見てきました。実際には、SparkEnv.get.blockManager.getOrElseUpdate メソッドを呼び出します。 BlockManager にブロックが存在する場合は、BlockManager から取得されます。そうでない場合、ブロックは計算され、次回の使用のために BlockManager に保存され、保持されます。

BlockManager はデータを保存した後、次のコードを呼び出してブロック情報をドライバーに報告します。

  1. プライベートdef tryToReportBlockStatus(
  2. ブロックID: ブロックID、
  3. ステータス: ブロックステータス、
  4. droppedMemorySize: Long = 0L): ブール値 = {
  5. val ストレージレベル = status.storageLevel
  6. val inMemSize = Math.最大(status.memSize、droppedMemorySize)
  7. val onDiskSize = status.diskSize
  8. master.updateBlockInfo(ブロックマネージャID、ブロックID、ストレージレベル、メモリサイズ、ディスクサイズ)
  9. }

実際には、マスターは masterEndpoint の参照に UpdateBlockInfo メッセージを送信し、マスターはこの blockId に対応する場所をドライバーに配置します。

同様に、ブロックが計算されている場合は、ドライバーから位置情報が取得されます。

  1. プライベートdef getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  2. val locs = Random.shuffle(master.getLocations(blockId))
  3. val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
  4. 優先Locs ++ その他のLocs
  5. }

スパークブロードキャストタスク

複数のタスクにタスクをスケジュールするプロセスはコードが多すぎるため、ここでは投稿しません。プロセスについてのみお話しします。

  • DAGScheduler が submitMissingTasks メソッドを使用してタスクを送信すると、タスクが Broadcast タイプとしてラップされます。Broadcast タイプは、TorrentBroadcastFactory を使用して TorrentBroadcast タイプを作成し、p2p プロトコルを使用することで、マスターへの負荷を軽減します。これはwriteBlocksを呼び出して、blockManager.putSingleを介してtaskBinaryをBlockManagerキャッシュに格納します。
  • ShuffleMapTask または ResultTask を呼び出すと、runTask メソッドが呼び出されます。このメソッドは実際には Broadcast の value メソッドを呼び出し、最終的には BlockManager の getLocalBytes または getRemoteBytes メソッドを呼び出します。

SparkストリーミングにおけるblockManagerの応用

  • ReceiverTracker が起動すると、ジョブが実行され、各エグゼキュータで ReceiverSupervisorImpl が起動され、次に特定のデータ レシーバーが起動されます。 SocketInputDStream の場合は、SocketReceiver を起動します。
  • データを受信した後、Receiver はまずそれを BlockGenerator にキャッシュします。一定のサイズに達すると、BlockManagerBasedBlockHandler の storeBlock メソッドを呼び出して、BlockManager に永続化します。次に、データ情報を ReceiverTracker に報告し、最終的に ReceivedBlockTracker の timeToAllocatedBlocks にまとめます。
  • ReceiverInputDStream が計算を行うと、receivedBlockTracker は最終的に BlockManager の保存場所に対応する時間に基づいて BlockManager 内のメタデータを取得し、計算用のデータを取得します。

blockManager のテスト

簡単なテストをしてみましょう。 2 つのコードの違いは、一方はキャッシュし、もう一方はキャッシュしないことです。

  1. val ファイル = sc.textFile( "/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" )  
  2. ファイル数()  
  3. ファイル数()

ログから、最初のコード セグメントでは、両方のジョブが HDFS からファイルを 2 回読み取っていることがわかります。

  1. val ファイル = sc.textFile( "/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" ).cache()
  2. ファイル数()
  3. ファイル数()

以下のログがあります

  1. MemoryStore: ブロックrdd_1_0は次のように保存されます 価値観 メモリ(推定サイズ1354.9 MB、空き4.9 GB)
  2. BlockManager: ブロック rdd_1_0 がローカルに見つかりました

ファイルを初めて読み取った後、そのファイルが blockManager にキャッシュされていることがわかりました。次のジョブが実行されると、ローカル BlockManager は hdfs ファイルを読み取らずにブロックが取得されたことを直接検出しました。

Spark UI では、キャッシュ ブロックも見つかりました。これらはすべてメモリにキャッシュされています。

<<:  IoTデバイスは爆発的に増加し、クラウドコンピューティングは「フォグコンピューティング」へと移行している

>>:  分散アーキテクチャにおける「負荷分散」について 1 つの記事で学ぶ

推薦する

エンタープライズウェブサイトの構築と最適化に関する実務経験

インターネットの発展により、オンライン情報はあらゆる方法で私たちの生活に浸透し、私たちの生活に利便性...

床塗料のキーワードランキングを向上させる6つの戦略

現在でも、バックリンクは、Baidu や Google などの検索エンジンがウェブサイトの人気度を測...

企業サイトはキーワード密度を合理的に改善することで、最高の最適化効果を達成できます。

かつてはキーワード密度が非常に重要で、ウェブサイトのキーワードランキングに影響を与えるほどだったこと...

#11.11# ZJiNet: 香港アリババ専用回線(独立)サーバー、55%オフ、最低412元、さまざまなハイエンド構成、10MアリババCN2帯域幅

ZJiNet は今年のダブルイレブンの独立サーバープロモーション活動に新しいコンテンツを追加しました...

2018年の国内エンタープライズSaaS市場の6つのハイライト、関係者は見逃せない

7月19日、International Data Corporation(IDC)は「2017年下半...

クラウド ネイティブでの観測可能なデータ収集の実践については、この記事をお読みください。

この記事は、GOPS 2022 上海での Yu Tao 教授の講演に基づいてまとめられています。さら...

Leica Cloud: クラウド サーバーが 20% オフ、最低 38 元、香港 CN2 GIA、米国 CN2 GIA、韓国 CN2、日本 CN2、帯域幅 20M から

国内のサーバープロバイダーであるLeica Cloud(lcayun.com)は、付加価値通信ライセ...

takewyn - ウクライナ VPS+専用サーバー、1Gbps 帯域幅、無料 Windows

ウクライナのホスティング会社である Takewyn は、ウクライナの仮想ホスティング、ウクライナの ...

ecvps 完全マネージド VPS (DA パネル付き) 30 ドル/年、2.58 ドル/月

ecvpsは中国人が開設したVPSです。数年にわたって利用されており、非常に安定しています。コミュニ...

PyTorch はどのようにしてデータ並列トレーニングを高速化するのでしょうか?分散型チートが明らかに

[[333298]]現在、チップのパフォーマンスの向上は限られているため、分散トレーニングは超大規模...

#専用# usdedicated: $77/E3-1271v3/16g/256gSSD/10G 防御/ロサンゼルス

アメリカのホスティング会社 usdedicated は、ロサンゼルスのデータセンターでサーバープロモ...

お問い合わせ: (米国/ドイツ)、299 ユーロ、2 * EPYC 7282/256g メモリ/10Gbps 帯域幅、無制限トラフィック、DDOS 保護内蔵

20年以上運営しているドイツの老舗企業Contaboは、ドイツと米国セントルイスにすでにデータセンタ...

MySQL分散アーキテクチャの拡張と縮小の予備設計

MySQL 分散アーキテクチャのスケールアップとスケールダウンは非常に興味深いトピックです。厳密に言...

より小さなコンテナを構築する方法

コンテナの操作は、多くのユーザーや開発者にとって日常的なタスクです。コンテナ開発者は、コンテナイメー...

ナショナルデーのブランドマーケティングポスターを包括的にレビューし、これらのブランドがどのようにレバレッジマーケティングを活用しているかを確認します。

月収10万元の起業の夢を実現するミニプログラム起業支援プランあっという間に建国記念日の祝日が終わり、...