Python分散プロセスで遭遇する可能性のある落とし穴

Python分散プロセスで遭遇する可能性のある落とし穴

  [[266539]]

ちょっとした騒ぎ

Python 3 を使用していますか、それとも Windows システムでプログラミングしていますか?最も重要なのは、プロセスとスレッドについてよくわかっていないということです。それではおめでとうございます。Python 分散プロセスには、掘り下げる穴が待っています。 。 。 (ハハハハ、ちょっと怖がらせてください) 冗談です。ただし、匿名関数がシーケンスでサポートされていないことを知っていれば、この落とし穴にさよならすることができます。さて、これ以上前置きせずに、早速本題に入りましょう。

分散プロセス

ご存知のとおり、プロセスはスレッドよりも安定しており、プロセスは複数のマシンに分散できますが、スレッドは最大で同じマシンの複数の CPU にしか分散できません。 Python の multiprocessing モジュールは複数のプロセスをサポートするだけでなく、そのマネージャー サブモジュールは複数のプロセスを複数のマシンに分散することもサポートします。サービス プロセスはスケジューラとして機能し、ネットワーク通信を利用してタスクを複数の他のプロセスに分散します。マネージャー モジュールは適切にカプセル化されているため、ネットワーク通信の詳細を理解しなくても、分散マルチプロセス プログラムを簡単に作成できます。

コードログ

例えば

同じマシン上でキューを介して通信するマルチプロセス プログラムが既に実行されており、タスクを処理するプロセスが重いため、タスクを送信するプロセスとタスクを処理するプロセスを 2 台のマシンに分散したい場合、分散プロセスを使用してこれをどのように実現すればよいでしょうか。元のキューは引き続き使用できることは既にご存じのとおりであり、マネージャー モジュールを介してネットワーク経由でキューを公開することで、他のマシン上のプロセスがキューにアクセスできるようになります。よし、やってみよう!

task_master.pyを書く

まずはサービスプロセスを見てみましょう。サービス プロセスは、キューを開始し、キューをネットワークに登録し、キューにタスクを書き込む役割を担います。

  1. #! /ユーザー/bin/pytthon
  2. # -*- コーディング:utf-8 -*-
  3. # @時間: 2018/3/3 16:46
  4. # @著者: lichexo
  5. # @ファイル: task_master.py
  6. ランダム、時間、キューをインポートする
  7. multiprocessing.managersからBaseManager をインポートします
  8. # タスクキューを送信:
  9. タスクキュー = キュー.Queue()
  10. # 結果を受信するキュー:
  11. result_queue = キュー.Queue()
  12. # BaseManager から継承された QueueManager:
  13. クラス QueueManager(BaseManager):
  14. 合格
  15. # 両方のキューをネットワークに登録し、呼び出し可能なパラメータをキュー オブジェクトに関連付けます。
  16. QueueManager.register( 'get_task_queue' 、呼び出し可能=lambda: task_queue)
  17. QueueManager.register( 'get_result_queue' 、呼び出し可能=lambda: result_queue)
  18. # ポート5000をバインドし、検証コード「abc」を設定します:
  19. マネージャー = QueueManager(アドレス=( '' , 5000), authkey=b 'abc' )
  20. # キューを開始します:
  21. マネージャー.開始()
  22. # ネットワーク経由でアクセスされたキュー オブジェクトを取得します。
  23. タスク = manager.get_task_queue()
  24. 結果 = manager.get_result_queue()
  25. # いくつかのタスクを入力します:
  26. iが範囲(10)内にある場合:
  27. n = ランダム.randint(0, 10000)
  28. print( 'タスク%dを実行...' % n)
  29. タスクを実行する
  30. # 結果キューから結果を読み取ります:
  31. print( '結果を取得してみてください...' )
  32. iが範囲(10)内にある場合:
  33. r = 結果.get(タイムアウト=10)
  34. print( '結果: %s' % r)
  35. # 閉鎖:
  36. マネージャー.シャットダウン()
  37. print( 'マスター終了。' )

単一のマシン上でマルチプロセス プログラムを作成する場合、作成されたキューを直接使用できることに注意してください。ただし、分散マルチプロセス環境では、Queue へのタスクの追加は元の task_queue で直接行うことはできません。これは、QueueManager のカプセル化をバイパスするため、manager.get_task_queue() を通じて取得された Queue インターフェイスを通じて追加する必要があるためです。次に、別のマシンでタスクプロセスを開始します(このマシンでも開始できます)

task_worker.pyを書く

  1. #! /ユーザー/bin/pytthon
  2. # -*- コーディング:utf-8 -*-
  3. # @時間: 2018/3/3 16:46
  4. # @著者: lichexo
  5. # @ファイル: task_worker.py
  6. インポート時間、システム、キュー
  7. multiprocessing.managersからBaseManager をインポートします
  8. # 同様の QueueManager を作成します。
  9. クラス QueueManager(BaseManager):
  10. 合格
  11. # この QueueManager はネットワークからキューを取得するだけなので、登録時には名前のみが提供されます。
  12. QueueManager.register( 'get_task_queue' )を登録します。
  13. QueueManager.register( 'get_result_queue' )を登録します。
  14. # task_master.py を実行しているマシンであるサーバーに接続します。
  15. サーバーアドレス = '127.0.0.1'  
  16. print( 'サーバー%sに接続...' % server_addr)
  17. # ポートと検証コードが task_master.py で設定されているものとまったく同じであることを確認します。
  18. m = QueueManager(アドレス=(server_addr, 5000), authkey=b 'abc' )
  19. # ネットワークから接続:
  20. m.connect () 関数
  21. # キューオブジェクトを取得します:
  22. タスク = m.get_task_queue()
  23. 結果 = m.get_result_queue()
  24. # タスク キューからタスクを取得し、結果を結果キューに書き込みます。
  25. iが範囲(10)内にある場合:
  26. 試す:
  27. n = タスク.get(タイムアウト=1)
  28. print( 'タスク %d * %d を実行...' % (n, n))
  29. r = '%d * %d = %d' % (n, n, n*n)
  30. 時間.sleep(1)
  31. 結果.put(r)
  32. Queue.Emptyを除く:
  33. print( 'タスクキューは空です。' )
  34. # 処理終了:
  35. print( 'ワーカー終了。' )

タスク プロセスはネットワーク経由でサービス プロセスに接続する必要があるため、サービス プロセスの IP を指定する必要があります。

運用結果

これで、職場で分散プロセスを試すことができます。まず、task_master.py サービス プロセスを開始します。

  1. トレースバック(最新の呼び出しが最後):
  2. ファイル"F:/Python/untitled/xianchengjincheng/master.py" 行 25、 <module>
  3. マネージャー.開始()
  4. ファイル「F:Pythonpystalllibmultiprocessingmanagers.py」 、行 513 開始
  5. 自己._プロセス.開始()
  6. ファイル「F:Pythonpystalllibmultiprocessingprocess.py」 、行 105 開始
  7. self._popen = self._popen(self)
  8. ファイル"F:Pythonpystalllibmultiprocessingcontext.py" 、行 322、 _Popen
  9. Popen(process_obj)を返す
  10. ファイル"F:Pythonpystalllibmultiprocessingpopen_spawn_win32.py" 、65 行目、 __init__
  11. 削減.ダンプ(process_obj、to_child)
  12. ファイル "F:Pythonpystalllibmultiprocessing
  13. ダンプ内のeduction.py、60行目
  14. ForkingPickler(ファイル、プロトコル).dump(obj)
  15. _pickle.PicklingError: <関数<lambda> at 0x00000202D1921E18> をピクルできません: __main__属性検索 <lambda>失敗しました

task_master.py プロセスはタスクを送信した後、結果キューからの結果を待機し始めます。次に、task_worker.py プロセスを開始します。

  1. 接続する サーバー 127.0.0.1...
  2. トレースバック(最新の呼び出しが最後):
  3. ファイル"F:/Python/untitled/xianchengjincheng/work.py" 行 24、 <module>
  4. m.connect () 関数
  5. ファイル「F:Pythonpystalllibmultiprocessingmanagers.py」 、行489  接続する 
  6. conn = クライアント(self._address, authkey=self._authkey)
  7. ファイル「F:Pythonpystalllibmultiprocessingconnection.py」 、行 487、クライアント
  8. c = SocketClient(アドレス)
  9. ファイル"F:Pythonpystalllibmultiprocessingconnection.py"614、 SocketClient
  10. s。接続(アドレス)
  11. ConnectionRefusedError: [WinError 10061] ターゲット マシンが積極的に拒否したため、接続できませんでした。

ほら、結果はすべて間違っています。何が悪かったのか分析してみましょう。 。 。

エラー分析

task_master.py のエラー メッセージには、lambda エラーと表示されていることがわかります。これは、シリアル化が匿名関数をサポートしていないため、コードを変更し、QueueManager を使用してキューを再カプセル化してネットワークに配置する必要があるためです。

  1. # 両方のキューをネットワークに登録します。呼び出し可能なパラメータは、Queue オブジェクトに関連付けられています。
  2. QueueManager.register( 'get_task_queue' 、呼び出し可能=return_task_queue)
  3. QueueManager.register( 'get_result_queue' 、呼び出し可能=return_result_queue)

このうち、task_queue と result_queue は、それぞれタスクと結果を格納する 2 つのキューです。プロセス間通信やオブジェクトの交換に使用されます。

分散環境であるため、キューに入れられたデータは、読み取る前にワーカー マシンが処理するのを待つ必要があります。このように、キューは QueueManager でカプセル化され、ネットワークに配置される必要があります。これは上記の 2 行のコードによって実現されます。 return_task_queue のネットワーク呼び出しインターフェースを get_task_queue と名付け、return_result_queue の名前を get_result_queue とすることで、どのキューに対して操作が行われているかを区別しやすくしています。 task.put(n) は task_queue にデータを書き込むもので、タスクを割り当てることと同じです。 result.get() は、ワーカー マシンによって処理された後に返される結果を待機します。

Windows では IP アドレスを入力する必要がありますが、Linux などの他のオペレーティング システムでは入力する必要がないことに注意してください。

  1. # WindowsはIPアドレスを書き込む必要があります
  2. マネージャー = QueueManager(アドレス=( '127.0.0.1' , 5000), authkey=b 'abc' )

修正されたコード

task_master.py を次のように変更します。

  1. #! /ユーザー/bin/pytthon
  2. # -*- コーディング:utf-8 -*-
  3. # @時間: 2018/3/3 16:46
  4. # @著者: lichexo
  5. # @ファイル: task_master.py
  6. #タスクマスター.py
  7. ランダム、時間、キューをインポートする
  8. multiprocessing.managersからBaseManager をインポートします
  9. マルチプロセッシングからfreeze_supportをインポート
  10. task_queue = queue.Queue() # タスクキューを送信します:
  11. result_queue = queue.Queue() # 結果を受信するためのキュー:
  12. class QueueManager(BaseManager): # QueueManager は BaseManager から継承されます:
  13. 合格
  14. # Windowsで実行
  15. return_task_queue() を定義します:
  16. グローバルタスク キュー
  17. return task_queue # 送信タスクキューに戻る
  18. return_result_queue() を定義します:
  19. グローバル結果キュー
  20. return result_queue # 受信結果キューを返します
  21. デフテスト():
  22. # 両方のキューをネットワークに登録します。呼び出し可能なパラメータは、Queue オブジェクトに関連付けられています。プロセス間通信やオブジェクト交換に使用されます。
  23. #QueueManager.register( 'get_task_queue' 、呼び出し可能=lambda: task_queue)
  24. #QueueManager.register( 'get_result_queue' 、呼び出し可能=lambda: result_queue)
  25. QueueManager.register( 'get_task_queue' 、呼び出し可能=return_task_queue)
  26. QueueManager.register( 'get_result_queue' 、呼び出し可能=return_result_queue)
  27. # ポート5000をバインドし、検証コード「abc」を設定します:
  28. #manager = QueueManager(アドレス=( '' , 5000), authkey=b 'abc' )
  29. # WindowsはIPアドレスを書き込む必要があります
  30. マネージャー = QueueManager(アドレス=( '127.0.0.1' , 5000), authkey=b 'abc' )
  31. manager.start() # キューを開始:
  32. # ネットワーク経由でアクセスされたキュー オブジェクトを取得します。
  33. タスク = manager.get_task_queue()
  34. 結果 = manager.get_result_queue()
  35. for i in range(10): # いくつかのタスクを入れます:
  36. n = ランダム.randint(0, 10000)
  37. print( 'タスク%dを実行...' % n)
  38. タスクを実行する
  39. # 結果キューから結果を読み取ります:
  40. print( '結果を取得してみてください...' )
  41. iが範囲(10)内にある場合:
  42. # 例外キャプチャがここに追加されます
  43. 試す:
  44. r = 結果.get(タイムアウト=5)
  45. print( '結果: %s' % r)
  46. キューを除く。空:
  47. print( '結果キューは空です。' )
  48. # 閉鎖:
  49. マネージャー.シャットダウン()
  50. print( 'マスター終了。' )
  51. __name__ == '__main__'の場合:
  52. フリーズサポート()
  53. print( '開始!' )
  54. テスト()

task_worker.py を次のように変更します。

  1. #! /ユーザー/bin/pytthon
  2. # -*- コーディング:utf-8 -*-
  3. # @時間: 2018/3/3 16:46
  4. # @著者: lichexo
  5. # @ファイル: task_worker.py
  6. #タスクワーカー.py
  7. インポート時間、システム、キュー
  8. multiprocessing.managersからBaseManager をインポートします
  9. # 同様の QueueManager を作成します。
  10. クラス QueueManager(BaseManager):
  11. 合格
  12. # この QueueManager はネットワークからキューを取得するだけなので、登録時には名前のみが提供されます。
  13. QueueManager.register( 'get_task_queue' )を登録します。
  14. QueueManager.register( 'get_result_queue' )を登録します。
  15. # task_master.py を実行しているマシンであるサーバーに接続します。
  16. サーバーアドレス = '127.0.0.1'  
  17. print( 'サーバー%sに接続...' % server_addr)
  18. # ポートと検証コードが task_master.py で設定されているものとまったく同じであることを確認します。
  19. m = QueueManager(アドレス=(server_addr, 5000), authkey=b 'abc' )
  20. # ネットワークから接続:
  21. m.connect () 関数
  22. # キューオブジェクトを取得します:
  23. タスク = m.get_task_queue()
  24. 結果 = m.get_result_queue()
  25. # タスク キューからタスクを取得し、結果を結果キューに書き込みます。
  26. iが範囲(10)内にある場合:
  27. 試す:
  28. n = タスク.get(タイムアウト=1)
  29. print( 'タスク %d * %d を実行...' % (n, n))
  30. r = '%d * %d = %d' % (n, n, n*n)
  31. 時間.sleep(1)
  32. 結果.put(r)
  33. キューを除く。空:
  34. print( 'タスクキューは空です。' )
  35. # 処理終了:
  36. print( 'ワーカー終了。' )

まずtask_master.pyを実行し、次にtask_worker.pyを実行します。

(1)task_master.pyの実行結果は以下の通りである。

  1. 始める!
  2. タスク 7872 を配置します...
  3. タスク 6931 を配置します...
  4. タスク 1395 を配置します...
  5. タスク 8477 を配置します...
  6. タスク 8300 を配置します...
  7. タスク 1597 を配置します...
  8. タスク 8738 を配置します...
  9. タスク 8627 を配置します...
  10. タスク 1884 を配置します...
  11. タスク 2561 を配置します...
  12. 結果を取得してみてください...
  13. 結果: 7872 * 7872 = 61968384
  14. 結果: 6931 * 6931 = 48038761
  15. 結果: 1395 * 1395 = 1946025
  16. 結果: 8477 * 8477 = 71859529
  17. 結果: 8300 * 8300 = 68890000
  18. 結果: 1597 * 1597 = 2550409
  19. 結果: 8738 * 8738 = 76352644
  20. 結果: 8627 * 8627 = 74425129
  21. 結果: 1884 * 1884 = 3549456
  22. 結果: 2561 * 2561 = 6558721
  23. マスター出口。

(2)task_worker.pyの実行結果は以下の通りである。

  1. 接続する サーバー 127.0.0.1...
  2. タスク 8640 * 8640 を実行します...
  3. タスク 7418 * 7418 を実行します...
  4. タスク 9303 * 9303 を実行します...
  5. タスク 568 * 568 を実行します...
  6. タスク 1633 * 1633 を実行します...
  7. タスク 3583 * 3583 を実行します...
  8. タスク 3293 * 3293 を実行します...
  9. タスク 8975 * 8975 を実行します...
  10. タスク 8189 * 8189 を実行します...
  11. タスク 731 * 731 を実行します...
  12. 作業員の退出。

知識補足

このシンプルなマスター/ワーカー モデルの用途は何でしょうか?実際、これはシンプルですが、真に分散されたコンピューティングです。コードを少し変更して複数のワーカーを起動することで、タスクを数台または数十台のマシンに分散できます。たとえば、n*n を計算するコードを電子メールの送信に置き換えると、電子メール キューの非同期送信を実現できます。

Queue オブジェクトはどこに保存されますか? task_worker.py には Queue を作成するコードがないため、Queue オブジェクトは task_master.py プロセスに保存されることに注意してください。

キューにネットワーク経由でアクセスできるのは、QueueManager を介しているためです。 QueueManager は複数のキューを管理するため、各キューのネットワーク呼び出しインターフェースには get_task_queue などの名前を付ける必要があります。 task_worker に登録された QueueManager の名前は、task_manager の名前と同じである必要があります。上記の例と比較すると、Queue オブジェクトがネットワーク経由で別のプロセスから渡されることがわかります。唯一の違いは、ここでの送信とネットワーク通信は QueueManager によって完了することです。

authkey の用途は何ですか?これは、他のマシンによって悪意を持って干渉されることなく、2 台のマシンが正常に通信できることを保証するためです。 task_worker.py の authkey が task_master.py の authkey と一致しない場合、接続は確実に失敗します。

<<:  ビッグデータのクラウド移行における5つの重要なポイント

>>:  警察におけるクラウドファーストの変革

推薦する

短期間で新しいウェブサイトの Baidu インデックスと重みを取得する方法

みなさんこんにちは。私は湖南省出身のキネスです。今回は、編集者が新しいウェブサイトを最適化し、短期間...

百度、ウェブサイト開発者向けに端末適応サービスを開始

携帯電話ユーザーにより良い閲覧体験を提供するために、ウェブサイト開発者は通常、さまざまな端末デバイス...

アリババクラウドが「ゼロカーボンクラウド」プランを発表

5月28日、2021 Alibaba Cloud Summitにおいて、Alibaba Cloud ...

WeChatミニプログラムを宣伝するには?定着率を向上させるにはどうすればいいでしょうか?

ミニプログラムは、10年前のタオバオストアのように人気が出てきています。企業がこれを実行しなければ、...

Ceph による分散システム障害検出

ノード障害検出は分散システムでは避けられない問題です。クラスターはノードの生存を感知し、適切な調整を...

クラウドのダウンタイムイベントの影響を軽減するにはどうすればよいでしょうか?

クラウド コンピューティングは本質的に信頼性が低いわけではありませんが、あらゆる形態の IT と同様...

クラウド アプリケーション コンテナの左向き監視アプローチ

弾力性のあるコンテナ化された環境では、非効率的なコードは非常にコストがかかります。左監視アプローチと...

Canalys:米国のクラウドインフラ支出は第1四半期に29%増加

Canalysの最新データによると、米国市場におけるクラウドインフラサービス支出は2021年第1四半...

フレンドリーリンクを交換する際にウェブマスターが見落としがちな問題は何ですか?

フレンドリー リンクの交換は、ほとんどの Web サイト最適化担当者が行う必要がある作業です。Web...

greencloudvps の香港 VPS の簡単なレビュー

greencloudvps (Green Cloud または Green Cloud vps と呼ん...

1枚の写真で数十億のAmazon Web Services IoT接続のパワーを解き放つ

「東樹西軒」は人気を博し、そのプロジェクトはあまりにも大規模で、一般の人々には何の関係もないようです...

国産CPUをベースにしたクラウドプラットフォーム上でコンテナ管理プラットフォームを構築するには? (パート1)

「ZTE事件」が拡大し続ける中、中国国民は自主管理可能な国産技術に大きな注目を寄せている。私の部署の...

alpharacks-VPSは年間わずか3.99ドルで、Banwagongの伝説を引き継ぐ

私たちの愛する BandwagonHost VPS が安価な IPV4 を利用できなくなって以来、年...

ウェブサイトの最適化は、経験だけでなく、より良い結果を得るために定説を取り除くことも重要です。

ウェブサイトの運用や最適化の初心者であれば、経験があれば回り道が少なくなるため、ぜひ指導を受けたいと...

陳天橋氏は5年ぶりに演説し、なぜ変革と撤退を望んだのかを説明した。

Huxiu Note: 陳天橋がついに口を開いた。最近のインタビューで、彼は4年間の「海外視察の旅」...