[[266539]] ちょっとした騒ぎ Python 3 を使用していますか、それとも Windows システムでプログラミングしていますか?最も重要なのは、プロセスとスレッドについてよくわかっていないということです。それではおめでとうございます。Python 分散プロセスには、掘り下げる穴が待っています。 。 。 (ハハハハ、ちょっと怖がらせてください) 冗談です。ただし、匿名関数がシーケンスでサポートされていないことを知っていれば、この落とし穴にさよならすることができます。さて、これ以上前置きせずに、早速本題に入りましょう。 分散プロセス ご存知のとおり、プロセスはスレッドよりも安定しており、プロセスは複数のマシンに分散できますが、スレッドは最大で同じマシンの複数の CPU にしか分散できません。 Python の multiprocessing モジュールは複数のプロセスをサポートするだけでなく、そのマネージャー サブモジュールは複数のプロセスを複数のマシンに分散することもサポートします。サービス プロセスはスケジューラとして機能し、ネットワーク通信を利用してタスクを複数の他のプロセスに分散します。マネージャー モジュールは適切にカプセル化されているため、ネットワーク通信の詳細を理解しなくても、分散マルチプロセス プログラムを簡単に作成できます。 コードログ 例えば 同じマシン上でキューを介して通信するマルチプロセス プログラムが既に実行されており、タスクを処理するプロセスが重いため、タスクを送信するプロセスとタスクを処理するプロセスを 2 台のマシンに分散したい場合、分散プロセスを使用してこれをどのように実現すればよいでしょうか。元のキューは引き続き使用できることは既にご存じのとおりであり、マネージャー モジュールを介してネットワーク経由でキューを公開することで、他のマシン上のプロセスがキューにアクセスできるようになります。よし、やってみよう! task_master.pyを書く まずはサービスプロセスを見てみましょう。サービス プロセスは、キューを開始し、キューをネットワークに登録し、キューにタスクを書き込む役割を担います。 - #! /ユーザー/bin/pytthon
- # -*- コーディング:utf-8 -*-
- # @時間: 2018/3/3 16:46
- # @著者: lichexo
- # @ファイル: task_master.py
- ランダム、時間、キューをインポートする
- multiprocessing.managersからBaseManager をインポートします
- # タスクキューを送信:
- タスクキュー = キュー.Queue()
- # 結果を受信するキュー:
- result_queue = キュー.Queue()
- # BaseManager から継承された QueueManager:
- クラス QueueManager(BaseManager):
- 合格
- # 両方のキューをネットワークに登録し、呼び出し可能なパラメータをキュー オブジェクトに関連付けます。
- QueueManager.register( 'get_task_queue' 、呼び出し可能=lambda: task_queue)
- QueueManager.register( 'get_result_queue' 、呼び出し可能=lambda: result_queue)
- # ポート5000をバインドし、検証コード「abc」を設定します:
- マネージャー = QueueManager(アドレス=( '' , 5000), authkey=b 'abc' )
- # キューを開始します:
- マネージャー.開始()
- # ネットワーク経由でアクセスされたキュー オブジェクトを取得します。
- タスク = manager.get_task_queue()
- 結果 = manager.get_result_queue()
- # いくつかのタスクを入力します:
- iが範囲(10)内にある場合:
- n = ランダム.randint(0, 10000)
- print( 'タスク%dを実行...' % n)
- タスクを実行する
- # 結果キューから結果を読み取ります:
- print( '結果を取得してみてください...' )
- iが範囲(10)内にある場合:
- r = 結果.get(タイムアウト=10)
- print( '結果: %s' % r)
- # 閉鎖:
- マネージャー.シャットダウン()
- print( 'マスター終了。' )
単一のマシン上でマルチプロセス プログラムを作成する場合、作成されたキューを直接使用できることに注意してください。ただし、分散マルチプロセス環境では、Queue へのタスクの追加は元の task_queue で直接行うことはできません。これは、QueueManager のカプセル化をバイパスするため、manager.get_task_queue() を通じて取得された Queue インターフェイスを通じて追加する必要があるためです。次に、別のマシンでタスクプロセスを開始します(このマシンでも開始できます) task_worker.pyを書く - #! /ユーザー/bin/pytthon
- # -*- コーディング:utf-8 -*-
- # @時間: 2018/3/3 16:46
- # @著者: lichexo
- # @ファイル: task_worker.py
- インポート時間、システム、キュー
- multiprocessing.managersからBaseManager をインポートします
- # 同様の QueueManager を作成します。
- クラス QueueManager(BaseManager):
- 合格
- # この QueueManager はネットワークからキューを取得するだけなので、登録時には名前のみが提供されます。
- QueueManager.register( 'get_task_queue' )を登録します。
- QueueManager.register( 'get_result_queue' )を登録します。
- # task_master.py を実行しているマシンであるサーバーに接続します。
- サーバーアドレス = '127.0.0.1'
- print( 'サーバー%sに接続...' % server_addr)
- # ポートと検証コードが task_master.py で設定されているものとまったく同じであることを確認します。
- m = QueueManager(アドレス=(server_addr, 5000), authkey=b 'abc' )
- # ネットワークから接続:
- m.connect () 関数
- # キューオブジェクトを取得します:
- タスク = m.get_task_queue()
- 結果 = m.get_result_queue()
- # タスク キューからタスクを取得し、結果を結果キューに書き込みます。
- iが範囲(10)内にある場合:
- 試す:
- n = タスク.get(タイムアウト=1)
- print( 'タスク %d * %d を実行...' % (n, n))
- r = '%d * %d = %d' % (n, n, n*n)
- 時間.sleep(1)
- 結果.put(r)
- Queue.Emptyを除く:
- print( 'タスクキューは空です。' )
- # 処理終了:
- print( 'ワーカー終了。' )
タスク プロセスはネットワーク経由でサービス プロセスに接続する必要があるため、サービス プロセスの IP を指定する必要があります。 運用結果 これで、職場で分散プロセスを試すことができます。まず、task_master.py サービス プロセスを開始します。 - トレースバック(最新の呼び出しが最後):
- ファイル"F:/Python/untitled/xianchengjincheng/master.py" 、行 25、 <module>
- マネージャー.開始()
- ファイル「F:Pythonpystalllibmultiprocessingmanagers.py」 、行 513 、開始
- 自己._プロセス.開始()
- ファイル「F:Pythonpystalllibmultiprocessingprocess.py」 、行 105 、開始
- self._popen = self._popen(self)
- ファイル"F:Pythonpystalllibmultiprocessingcontext.py" 、行 322、 _Popen内
- Popen(process_obj)を返す
- ファイル"F:Pythonpystalllibmultiprocessingpopen_spawn_win32.py" 、65 行目、 __init__内
- 削減.ダンプ(process_obj、to_child)
- ファイル "F:Pythonpystalllibmultiprocessing
- ダンプ内のeduction.py、60行目
- ForkingPickler(ファイル、プロトコル).dump(obj)
- _pickle.PicklingError: <関数<lambda> at 0x00000202D1921E18> をピクルできません: __main__の属性検索 <lambda>に失敗しました
task_master.py プロセスはタスクを送信した後、結果キューからの結果を待機し始めます。次に、task_worker.py プロセスを開始します。 - 接続する サーバー 127.0.0.1へ...
- トレースバック(最新の呼び出しが最後):
- ファイル"F:/Python/untitled/xianchengjincheng/work.py" 、行 24、 <module>
- m.connect () 関数
- ファイル「F:Pythonpystalllibmultiprocessingmanagers.py」 、行489 、 接続する
- conn = クライアント(self._address, authkey=self._authkey)
- ファイル「F:Pythonpystalllibmultiprocessingconnection.py」 、行 487、クライアント内
- c = SocketClient(アドレス)
- ファイル"F:Pythonpystalllibmultiprocessingconnection.py" 、行614、 SocketClient
- s。接続(アドレス)
- ConnectionRefusedError: [WinError 10061] ターゲット マシンが積極的に拒否したため、接続できませんでした。
ほら、結果はすべて間違っています。何が悪かったのか分析してみましょう。 。 。 エラー分析 task_master.py のエラー メッセージには、lambda エラーと表示されていることがわかります。これは、シリアル化が匿名関数をサポートしていないため、コードを変更し、QueueManager を使用してキューを再カプセル化してネットワークに配置する必要があるためです。 - # 両方のキューをネットワークに登録します。呼び出し可能なパラメータは、Queue オブジェクトに関連付けられています。
- QueueManager.register( 'get_task_queue' 、呼び出し可能=return_task_queue)
- QueueManager.register( 'get_result_queue' 、呼び出し可能=return_result_queue)
このうち、task_queue と result_queue は、それぞれタスクと結果を格納する 2 つのキューです。プロセス間通信やオブジェクトの交換に使用されます。 分散環境であるため、キューに入れられたデータは、読み取る前にワーカー マシンが処理するのを待つ必要があります。このように、キューは QueueManager でカプセル化され、ネットワークに配置される必要があります。これは上記の 2 行のコードによって実現されます。 return_task_queue のネットワーク呼び出しインターフェースを get_task_queue と名付け、return_result_queue の名前を get_result_queue とすることで、どのキューに対して操作が行われているかを区別しやすくしています。 task.put(n) は task_queue にデータを書き込むもので、タスクを割り当てることと同じです。 result.get() は、ワーカー マシンによって処理された後に返される結果を待機します。 Windows では IP アドレスを入力する必要がありますが、Linux などの他のオペレーティング システムでは入力する必要がないことに注意してください。 - # WindowsはIPアドレスを書き込む必要があります
- マネージャー = QueueManager(アドレス=( '127.0.0.1' , 5000), authkey=b 'abc' )
修正されたコード task_master.py を次のように変更します。 - #! /ユーザー/bin/pytthon
- # -*- コーディング:utf-8 -*-
- # @時間: 2018/3/3 16:46
- # @著者: lichexo
- # @ファイル: task_master.py
- #タスクマスター.py
- ランダム、時間、キューをインポートする
- multiprocessing.managersからBaseManager をインポートします
- マルチプロセッシングからfreeze_supportをインポート
- task_queue = queue.Queue() # タスクキューを送信します:
- result_queue = queue.Queue() # 結果を受信するためのキュー:
- class QueueManager(BaseManager): # QueueManager は BaseManager から継承されます:
- 合格
- # Windowsで実行
- return_task_queue() を定義します:
- グローバルタスク キュー
- return task_queue # 送信タスクキューに戻る
- return_result_queue() を定義します:
- グローバル結果キュー
- return result_queue # 受信結果キューを返します
- デフテスト():
- # 両方のキューをネットワークに登録します。呼び出し可能なパラメータは、Queue オブジェクトに関連付けられています。プロセス間通信やオブジェクト交換に使用されます。
- #QueueManager.register( 'get_task_queue' 、呼び出し可能=lambda: task_queue)
- #QueueManager.register( 'get_result_queue' 、呼び出し可能=lambda: result_queue)
- QueueManager.register( 'get_task_queue' 、呼び出し可能=return_task_queue)
- QueueManager.register( 'get_result_queue' 、呼び出し可能=return_result_queue)
- # ポート5000をバインドし、検証コード「abc」を設定します:
- #manager = QueueManager(アドレス=( '' , 5000), authkey=b 'abc' )
- # WindowsはIPアドレスを書き込む必要があります
- マネージャー = QueueManager(アドレス=( '127.0.0.1' , 5000), authkey=b 'abc' )
- manager.start() # キューを開始:
- # ネットワーク経由でアクセスされたキュー オブジェクトを取得します。
- タスク = manager.get_task_queue()
- 結果 = manager.get_result_queue()
- for i in range(10): # いくつかのタスクを入れます:
- n = ランダム.randint(0, 10000)
- print( 'タスク%dを実行...' % n)
- タスクを実行する
- # 結果キューから結果を読み取ります:
- print( '結果を取得してみてください...' )
- iが範囲(10)内にある場合:
- # 例外キャプチャがここに追加されます
- 試す:
- r = 結果.get(タイムアウト=5)
- print( '結果: %s' % r)
- キューを除く。空:
- print( '結果キューは空です。' )
- # 閉鎖:
- マネージャー.シャットダウン()
- print( 'マスター終了。' )
- __name__ == '__main__'の場合:
- フリーズサポート()
- print( '開始!' )
- テスト()
task_worker.py を次のように変更します。 - #! /ユーザー/bin/pytthon
- # -*- コーディング:utf-8 -*-
- # @時間: 2018/3/3 16:46
- # @著者: lichexo
- # @ファイル: task_worker.py
- #タスクワーカー.py
- インポート時間、システム、キュー
- multiprocessing.managersからBaseManager をインポートします
- # 同様の QueueManager を作成します。
- クラス QueueManager(BaseManager):
- 合格
- # この QueueManager はネットワークからキューを取得するだけなので、登録時には名前のみが提供されます。
- QueueManager.register( 'get_task_queue' )を登録します。
- QueueManager.register( 'get_result_queue' )を登録します。
- # task_master.py を実行しているマシンであるサーバーに接続します。
- サーバーアドレス = '127.0.0.1'
- print( 'サーバー%sに接続...' % server_addr)
- # ポートと検証コードが task_master.py で設定されているものとまったく同じであることを確認します。
- m = QueueManager(アドレス=(server_addr, 5000), authkey=b 'abc' )
- # ネットワークから接続:
- m.connect () 関数
- # キューオブジェクトを取得します:
- タスク = m.get_task_queue()
- 結果 = m.get_result_queue()
- # タスク キューからタスクを取得し、結果を結果キューに書き込みます。
- iが範囲(10)内にある場合:
- 試す:
- n = タスク.get(タイムアウト=1)
- print( 'タスク %d * %d を実行...' % (n, n))
- r = '%d * %d = %d' % (n, n, n*n)
- 時間.sleep(1)
- 結果.put(r)
- キューを除く。空:
- print( 'タスクキューは空です。' )
- # 処理終了:
- print( 'ワーカー終了。' )
まずtask_master.pyを実行し、次にtask_worker.pyを実行します。 (1)task_master.pyの実行結果は以下の通りである。 - 始める!
- タスク 7872 を配置します...
- タスク 6931 を配置します...
- タスク 1395 を配置します...
- タスク 8477 を配置します...
- タスク 8300 を配置します...
- タスク 1597 を配置します...
- タスク 8738 を配置します...
- タスク 8627 を配置します...
- タスク 1884 を配置します...
- タスク 2561 を配置します...
- 結果を取得してみてください...
- 結果: 7872 * 7872 = 61968384
- 結果: 6931 * 6931 = 48038761
- 結果: 1395 * 1395 = 1946025
- 結果: 8477 * 8477 = 71859529
- 結果: 8300 * 8300 = 68890000
- 結果: 1597 * 1597 = 2550409
- 結果: 8738 * 8738 = 76352644
- 結果: 8627 * 8627 = 74425129
- 結果: 1884 * 1884 = 3549456
- 結果: 2561 * 2561 = 6558721
- マスター出口。
(2)task_worker.pyの実行結果は以下の通りである。 - 接続する サーバー 127.0.0.1へ...
- タスク 8640 * 8640 を実行します...
- タスク 7418 * 7418 を実行します...
- タスク 9303 * 9303 を実行します...
- タスク 568 * 568 を実行します...
- タスク 1633 * 1633 を実行します...
- タスク 3583 * 3583 を実行します...
- タスク 3293 * 3293 を実行します...
- タスク 8975 * 8975 を実行します...
- タスク 8189 * 8189 を実行します...
- タスク 731 * 731 を実行します...
- 作業員の退出。
知識補足 このシンプルなマスター/ワーカー モデルの用途は何でしょうか?実際、これはシンプルですが、真に分散されたコンピューティングです。コードを少し変更して複数のワーカーを起動することで、タスクを数台または数十台のマシンに分散できます。たとえば、n*n を計算するコードを電子メールの送信に置き換えると、電子メール キューの非同期送信を実現できます。 Queue オブジェクトはどこに保存されますか? task_worker.py には Queue を作成するコードがないため、Queue オブジェクトは task_master.py プロセスに保存されることに注意してください。 キューにネットワーク経由でアクセスできるのは、QueueManager を介しているためです。 QueueManager は複数のキューを管理するため、各キューのネットワーク呼び出しインターフェースには get_task_queue などの名前を付ける必要があります。 task_worker に登録された QueueManager の名前は、task_manager の名前と同じである必要があります。上記の例と比較すると、Queue オブジェクトがネットワーク経由で別のプロセスから渡されることがわかります。唯一の違いは、ここでの送信とネットワーク通信は QueueManager によって完了することです。 authkey の用途は何ですか?これは、他のマシンによって悪意を持って干渉されることなく、2 台のマシンが正常に通信できることを保証するためです。 task_worker.py の authkey が task_master.py の authkey と一致しない場合、接続は確実に失敗します。 |