[[422916]] Python 中国語コミュニティ (ID: python-china) 1. レイとは何かオフライン コンピューティング用の Hadoop (マップ リデュース)、ストリーミング コンピューティング用の spark、strom、Flink などの分散コンピューティング フレームワークについては、誰もがよく知っている必要があります。比較的言えば、これらのコンピューティング フレームワークはすべて他のビッグ データ コンポーネントに依存しており、インストールと展開が比較的複雑です。 Pythonでは、以前から共有されているCeleryが分散コンピューティングを提供できます。今日は、もう 1 つのオープン ソース分散コンピューティング フレームワークである Ray についてお話ししたいと思います。 Ray は、カリフォルニア大学バークレー校 RISELab によって新たに立ち上げられた高性能分散実行フレームワークです。 Spark よりもコンピューティング パフォーマンスが優れており、導入と変換も簡単です。また、機械学習やディープラーニングの分散トレーニングもサポートしており、主流のディープラーニングフレームワーク(pytorch、tensorflow、kerasなど)もサポートしています。 - https://github.com/ray-project/ray
2. レイアーキテクチャRayのアーキテクチャは、最初に公開された論文「Ray: 新興AIアプリケーションのための分散フレームワーク」に記載されています。 上の図からわかるように、Ray には主に次のものが含まれます。 - ノード: 主にヘッドとワーカーからなるノード。ヘッドはマスターと考えることができ、ワーカーはタスクを実行するユニットです。
- 各ノードには独自のローカルスケジューラがある
- オブジェクトストア: ノード間の通信を可能にするメモリ内オブジェクトストア
- スケジューラ: スケジューラは 2 つあります。各ノードにはローカル スケジューラがあります。タスクを送信すると、ローカル スケジューラは、タスクを他のワーカーに配布して実行するためにグローバル スケジューラに送信する必要があるかどうかを判断します。
- GCS: グローバル状態制御は、Ray 内のさまざまなオブジェクトの状態情報を記録し、メタデータとして考えることができ、Ray のフォールト トレランスの保証となります。
Ray は、分散トレーニングを含むあらゆる分散コンピューティング タスクに適しています。著者は最近、これを多数の時系列予測モデルのトレーニングとオンライン予測に使用しました。 Ray は現在、ハイパーパラメータ調整の Ray tune、勾配降下法の Ray SGD、推論サービス RaySERVE、分散データ Dataset、分散強化学習 RLlib をサポートしています。他にも次のようなサードパーティ ライブラリがあります。 3. 使いやすい3.1 インストールと展開 - pip インストール --upgrade pip
- # レイをインストール
- pip インストールray ==1.6.0
- # ImportError: 'attr.validators' から名前 'deep_mapping' をインポートできません
- # pip インストールattr == 19.1.0
3.2 単一マシン使用 - 簡単な例: Ray は @ray.remote デコレータを使用して、関数を分散呼び出し可能なタスクに変換します。関数名.remote メソッドを通じてタスクを送信し、ray.get メソッドを通じてタスクの戻り値を取得します。シングルクリックのケースは、マルチスレッドの非同期実行方法に似ています。
- インポート時間
- レイをインポート
- ray.init( num_cpus = 4 ) # このシステムに 4 つの CPU があることを指定します。
- レイ・リモート
- def do_some_work(x):
- time.sleep(1) # これを実行する必要がある作業に置き換えます。
- xを返す
- 開始=時間.time()
- 結果= ray .get([do_some_work.remote(x) の x が範囲(4)])
- print(" duration =", time.time() - 開始)
- print("結果= ", 結果)
- #期間= 1 .0107324123382568
- #結果= [0, 1, 2, 3]
リモートによって返されるオブジェクトの ID は、ObjectRef(7f10737098927148ffffffff0100000001000000) のようになります。実際の値を取得するには、ray.get を使用する必要があります。 ray.getはブロッキング呼び出しであり、使用できないことに注意してください[ray.get(do_some_work.remote(x)) for x in range(4)] - 小さなタスクの使用に注意してください。 Ray 分散コンピューティングでは、スケジュール設定、プロセス間通信、タスク ステータスの更新などのスケジュール設定時に余分な時間が必要になるため、小さすぎるタスクは避ける必要があることに注意してください。小さなタスクを組み合わせることができる
- レイ・リモート
- tiny_work(x)を定義します。
- time.sleep(0.0001) # これを実行する必要がある作業に置き換えます。
- xを返す
- 開始=時間.time()
- result_ids = [tiny_work.remote(x) の場合、x は範囲(100000)]
- 結果= ray .get(result_ids)
- print(" duration =", time.time() - 開始)
- ray.put ray.put() はオブジェクトをオブジェクト ストレージに格納し、分散マシンで呼び出すことができるオブジェクト ID を返します。操作は非同期です。 ray.get() を通じて取得できます。
- num =レイ.put(10)
- ray.get(数値)
- ray.wait タスクが複数の結果を返す場合、ray.get() はすべての結果が完了するまで待機してから、後続の操作を実行します。複数の結果の実行にかかる時間が異なる場合、最も時間のかかるタスクに欠点があります。
このとき、ray.wait() メソッドを使用できます。 ray.wait() は完了したタスクと未完了のタスクの結果を返します。完了した結果は後続の操作を続行できます。 - ランダムにインポート
- レイ・リモート
- def do_some_work(x):
- time.sleep(random.uniform(0, 4)) # これを実行する必要がある作業に置き換えます。
- 戻る
- def process_incremental(合計、結果):
- time.sleep(1) # これを何らかの処理コードに置き換えます。
- 合計 + 結果を返す
- 開始=時間.time()
- result_ids = [do_some_work.remote(x) の範囲(4)]
- 合計= 0
- len(result_ids):の間
- done_id、 result_ids = ray .wait(result_ids) です。
- 合計= process_incremental (合計、ray.get(done_id[0]))
- print(" duration = ", time.time() - start, " \ nresult = ", sum) 出力
- #期間= 5 .270821809768677
- #結果= 6
2.3 クラスタの展開 Ray のアーキテクチャはマスター スレーブ モデルに従います。ヘッド ノードはマスターと見なすことができ、他のノードはワーカーです。クラスターをデプロイするときは、まず ray start --head を使用してヘッドノードを起動し、他のマシンが順番にワーカーを起動する必要があります。関係を決定するには、ヘッド ノードのアドレス (ray start --address 10.8.xx.3:6379) を指定する必要があることに注意してください。 サービスをシャットダウンするには、各マシンでray.stopを実行する必要があります。 - # ヘッドノードを起動します。
- #ray start --head --num-cpus = < NUM_CPUS > --num - gpus = <NUM_GPUS>
- レイスタート --head --node-ip-address 10.8.xx.3 --port = 6379
- # 非ヘッドノードを起動します。
- #ray start --address = <アドレス> --num - cpus = < CPUの数> --num - gpus = <NUM_GPUS>
- ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir ={一時パス}
- タスクの送信 どのワーカー マシンでもタスクを送信できます。まず、init を介してヘッドノードに接続し、リモート接続することができます。
- レイをインポート
- レイ.init(10.8.xx.3:6379)
3. さまざまなタスクの例- タスクの依存関係 タスク間には依存関係があります。 Ray は Spark と同様に、DAG グラフを生成することで依存関係を決定し、並列実行できるタスクを決定します。下の図に示すように、ゼロは並列に実行できます。
- numpyをnpとしてインポートする
- # 2 つのリモート関数を定義します。これらの関数を呼び出すとタスクが作成される
- # リモートで実行されます。
- レイ・リモート
- def multiply(x, y):
- np.dot(x, y) を返す
- レイ・リモート
- ゼロを定義します(サイズ):
- np.zeros(サイズ)を返す
- # 2 つのタスクを並行して開始します。これらはすぐに先物を返すので、
- # タスクはバックグラウンドで実行されます。
- x_id =ゼロ.remote((100, 100))
- y_id =ゼロ.remote((100, 100))
- # 3 番目のタスクを開始します。これは最初の2つまでは予定されていません
- # 個のタスクが完了しました。
- z_id = .remote(x_id, y_id)を掛け算する
- # 結果を取得します。 3 番目のタスクが完了するまでブロックされます。
- z =レイ.get(z_id)
- 印刷(z)
- 上記のタスクはすべてステートレスです (依存関係を除く)。つまり、タスク間に関係はありません。 Ray は、Actor としてステートフル タスクもサポートします。通常、Python クラスに @ray.remote を追加すると、ray は各クラスの内部状態のさまざまな状態を追跡します。
- レイ・リモート
- クラス Counter(オブジェクト):
- __init__(self)を定義します。
- 自分.n = 0
- def増分(自己):
- 自己.n += 1
- def read(self):
- 自己.nを返す
- カウンター= [Counter.remote() の i が範囲(4)]
- # 連続実行により各カウンターのカウントが増加する可能性があります
- [カウンター内の c の c.increment.remote()]
- futures = [c.read.remote() はカウンター内の c に対して]
- 印刷(ray.get(futures))
- # [1, 1, 1, 1]
- # [11、11、11、11]
- Map-Reduce タスクは、実際には他の分散タスクと同じです。主に各種集計操作。 Map-Reduceの一般的な操作は次のとおりです。
- - 単語数カウントの例については、https://github.com/ray-project/ray/blob/master/doc/examples/streaming/streaming.py を参照してください。
以下に簡単な例を示します。 - レイ・リモート
- 定義マップ(obj, f):
- f(obj) を返す
- レイ・リモート
- def sum_results(*要素):
- np.sum(要素)を返す
- 項目=リスト(範囲(100))
- map_func =ラムダi: i*2
- remote_elements = [map.remote(i, map_func) アイテム内の i の場合]
- #シンプルリデュース
- remote_final_sum = sum_results .remote(*remote_elements)
- 結果= ray .get(remote_final_sum)
- #ツリーを減らす
- 中間結果= [sum_results.remote(
- *remote_elements[i * 20: (i + 1) * 20]) iが範囲内(5)]
- remote_final_sum = sum_results .remote(*intermediate_results)
- 結果= ray .get(remote_final_sum)
- モデルのトレーニングについては、pytorch の公式サイトで「Best Practices: Ray with PyTorch」が提供されています。これは主にトレーニング/テスト データをダウンロードし、複数のモデルをトレーニングするものです (あまり実用的ではないようです)。複数のモデルをトレーニングすることで、パラメータの融合を実行できます。
https://docs.ray.io/en/latest/using-ray-with-pytorch.html を参照してください。 4. 結論この記事では、効率的な Python 分散コンピューティング フレームワーク Ray を紹介します。お役に立てば幸いです。要約すると: - Ray は、カリフォルニア大学バークレー校 RISELab によって新たに立ち上げられた高性能分散実行フレームワークです。 Spark も Berkeley によって生産されています。
- Rayアーキテクチャの重要なポイント: 2つのスケジューラ、ヘッドノードとワーカーノード、コンピューティングのフォールトトレランスを確保するためのGCSグローバル状態制御
- Rayアプリケーションはシンプルです: @ray.remoteがタスクを分散タスクに変換し、x.remoteがタスクを送信し、get/waitが結果を取得します
- クラスターは存在しません: レイ開始
- Rayは、依存型DAG、ステートフルアクター、ディープラーニングサポートなど、複数のタスクをサポートします。
- 継続的に拡充されるライブラリ: RaySERVE、RaySGD、RayTune、Ray data、rllib
|