[[385390]] 1 はじめに みなさんこんにちは。私はボス・ウーです。 Celery の公式説明によれば、Celery は分散共有中間キューやスケジュールされたタスクなどに適用できる非常に優れた分散キューです。 2 バージョンの違い Celery には多くのバージョンがあり、それらの違いはかなり大きくなります。たとえば、最新の Celery 6.0 は Celery 4.0 よりもはるかに安定性が低くなります。したがって、異なるバージョンを使用すると、システムから提供されるフィードバックが期待どおりにならない可能性があります。 3 サービス Windows でハングしている Celery サービスが不安定になる場合があります (現時点では Unix ではこの状況は確認されていません)。たとえば、スケジュールされたタスクを実行すると、一定時間が経過すると Celery が一時停止状態になり、指定された時間にタスクを実行できなくなります。 これらのタスクは実行キューに追加されるだけで(Redis に蓄積され)、蓄積されたタスクは Celery サービスを手動で再起動した後にのみ解放して実行できます。 この場合、まず、スケジュールされたタスクは指定された時点で正常に実行されません。第二に、これらのタスクが他の時間に実行されると、データが時間内に更新されず、時間ノードが混乱する可能性があります。ビジネスニーズが満たされないだけでなく、逆効果にもなります。 4 ハートビートを設定する Windows の Celery のこの問題を解決するには、Celery タスク キューのハートビート時間を設定し、1 分ごとまたは 5 分ごとに Redis データベースにデータを送信するなどして、キューが常にアクティブになるようにします。この方法では、コンピューターがシャットダウンされず、ネットワークが開いたままになっている限り (リモート Redis の場合)、Celery タスク キュー サービスは中断状態になりません。 5. 例 私は物事を説明するときにいつも例を使うのが好きです。以前、あるプラットフォームのマーチャントバックエンドからデータを収集していたとき、そのウェブサイトを使用する際にそのウェブサイトのクッキーを自動的に取得するために、 私は Pyppeteer を使用して自動ログイン スクリプトを作成しました。これは通常どおり Celery キューに残っており、すぐにサービスを開始します。 スクリプトは次のようになります(実際の疑似コードに非常に近い、選択の余地なし、命が大切) - # -*- コーディング: utf-8 -*-
- db.redisCurdからRedisQueueをインポート
- 非同期インポート
- ランダムにインポート
- tkinterをインポートする
- pyppeteer.launcherからlaunchをインポート
- platLogin.configからUSERNAME、 PASSWORD 、LOGIN_URL をインポートします。
-
- クラスLogin():
- def __init__(self, ショップID):
- self.shopId = ショップID
- self.RedisQueue = RedisQueue( "クッキー" )
-
- スクリーンサイズを定義します。
- tk = tkinter.Tk()
- 幅 = tk.winfo_screenwidth()
- 高さ = tk.winfo_screenheight()
- tk.終了()
- 戻り値: { 'width' :幅、 'height' :高さ}
-
- 非同期定義ログイン(自分自身、ユーザー名、パスワード、URL):
- ブラウザ = 起動を待つ(
- {
- 'headless' : False 、
- 'dumpio' :本当
- },
- args=[ '--no-sandbox' , '--disable-infobars' , '--user-data-dir=./userData' ],
- )
- page = await browser.newPage() # 新しいブラウザページを開始する
-
- 試す:
- page.setViewport(viewport=self.screen_size()) を待機します。
- await page.setJavaScriptEnabled(enabled= True ) # jsを有効にする
- ページを待つ.setUserAgent(
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML、Gecko に類似) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
- )
- 自己ページ評価を待機します(ページ)
- await page.goto (URL)を待ちます
- asyncio.sleep(2) を待つ
- # ユーザー名とパスワードを入力してください
- page.evaluate(f 'document.querySelector("#userName").value=""' )を待機します。
- await page.type( '#userName' , username, { 'delay' : self.input_time_random() - 50}) # delayは入力を制限する時間です
- page.evaluate( 'document.querySelector("#passWord").value=""' )を待ちます。
- page.type( '#passWord' 、パスワード、 { 'delay' : self.input_time_random()})を待機します。
- ページを待つ.waitFor(6000)
-
- loginImgVcode = page.waitForSelector( '#checkCode' )を待機します
- loginImgVcode.screenshot({ 'path' : './loginImg.png' })を待ちます。
- ページを待つ.waitFor(6000)
-
- res = use_cjy( "./loginImg.png" )
- pic_str = res.get( "pic_str" ) の場合、 res.get( "err_str" ) == "OK" それ以外 「1234」
-
- ページを待つ.waitFor(6000)
- page.type( '#checkWord' , pic_str, { 'delay' : self.input_time_random() - 50})を待機します。
- ページを待つ.waitFor(6000)
-
- page.click( '#subMit' )を待ちます
- ページを待つ.waitFor(6000)
- asyncio.sleep(2) を待つ
- self.get_cookie(ページ) を待機します。
- ページを待つ.waitFor(3000)
- self.page_close(ブラウザ)を待つ
- 戻り値: { 'code' : 200, 'msg' : 'ログイン成功' }
- を除外する:
- 戻り値: { 'code' : -1, 'msg' : 'エラー' }
-
- ついに:
- ページを待つ.waitFor(3000)
- self.page_close(ブラウザ)を待つ
-
- # ログイン後にクッキーを取得する
- 非同期def get_cookie(self, page):
- cookies_list = page.cookies() を待つ
- クッキー = ''
- cookies_list内のcookieの場合:
- str_cookie = '{0}={1}; '
- str_cookie = str_cookie.format(cookie.get( '名前' ), cookie.get( '値' ))
- クッキー += str_cookie
- # クッキーをクッキープールに入れる
- self.RedisQueue.put_hash(self.shopId、クッキー)
- クッキーを返す
-
- 非同期def page_evaluate(self, page):
- page.evaluate( '' '() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }' '' )を待ちます。
- page.evaluate( '' '() =>{ window.navigator.chrome = { ランタイム: {}, }; }' '' )を待機します。
- ページを待つ.evaluate(
- '' '() =>{ Object.defineProperty(navigator, ' languages ', { get: () => [' en-US ', ' en '] }); }' '' )
- ページを待つ.evaluate(
- '' '() =>{ Object.defineProperty(navigator, ' plugins ', { get: () => [1, 2, 3, 4, 5,6], }); }' '' )
- ページを待つ.waitFor(3000)
-
- 非同期定義page_close(self, browser):
- _pageについては、 browser.pages() を待機します:
- _page.close ()を待つ
- ブラウザを待機します。近い()
-
- 定義: input_time_random(self):
- random.randint(100, 151)を返します。
-
- def run(self, ユーザー名=ユーザー名,パスワード=パスワード, url=ログインURL):
- ループ = asyncio.get_event_loop()
- i_future = asyncio.ensure_future(self.login(ユーザー名、パスワード、URL))
- loop.run_until_complete(i_future)
- i_future.result()を返す
-
-
- __name__ == '__main__'の場合:
- Z = ログイン(ショップID = "001" )
- Z.run()
Celeryタスクファイルは次のようになります - # -*- コーディング: utf-8 -*-
- __future__からabsolute_import をインポートする
- インポートOS
- インポートシステム
- インポート時間
- db.redisCurdからRedisQueueをインポート
- send_msg.weinxinからSend_msg をインポート
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(ベースディレクトリ)
- logger.loggerからlog_vをインポートします
- セロリのインポートタスク
- from platLogin.login import Login # ログインクラス
- セロリ輸入セロリ
-
- ランダムキュー = RedisQueue( "クッキー" )
-
- celery_app = Celery( 'タスク' )
- celery_app.config_from_object( 'celeryConfig' )
-
- S = Send_msg()
-
- dl_dict = {
- 'デモ' : {
- 'クッキー' : '' ,
- 'loginClass' : 'ログイン' 、
- }
- }
-
- # todo これらは3つの実行状態です
- クラスtask_status(タスク):
- on_success を定義します(self、retval、task_id、args、kwargs):
- log_v.info( 'タスク情報 -> id:{}, arg:{}, 成功しました.....完了しました'.format (task_id, args))
-
- on_failure を定義します(self、exc、task_id、args、kwargs、einfo):
- log_v.error( 'タスクID:{}、引数:{}、失敗しました! エラー: {}' .format(task_id, args, exc))
-
- on_retry を定義します(self、exc、task_id、args、kwargs、einfo):
- log_v.warning( 'タスクID:{}、引数:{}、再試行!情報: {}' .format(task_id, args, exc))
-
-
- # todoポーリング オブジェクトとしてハッシュキーを選択するだけです。 Celery は Windows 10 システムでは安定しない可能性があり、接続が切断されることがあります。
- @celery_app.task(ベース=タスクステータス)
- def get_cookie_status(platName= "demo" ):
- 試す:
- # log_v.debug(f '[+] Poll {platName} タイマーが開始されました.....完了' )
- ランダムキュー.get_hash(platName).decode()
- log_v.debug(f '[+] {platName} のポーリングが正常に完了しました..... 完了' )
- 戻る 「ERP 投票成功」
- を除外する:
- 戻る 「ERP ポーリングに失敗しました」
-
-
- @celery_app.task(ベース=タスクステータス)
- def set_plat_cookie(platName= "demo" 、shopId=None):
- log_v.debug(f "[+] {platName} がログイン中です" )
- コア = eval(dl_dict[platName][ 'loginClass' ])(shopId=shopId)
- 結果 = core.run()
- 結果を返す
Celery 構成ファイルは次のようになります。 - __future__からabsolute_import をインポートする
- 日時をインポート
- kombuインポートExchange、キューから
- celery.schedulesからcrontabをインポートする
- urllibからインポート解析
-
- BROKER_URL = f'redis://root:{parse.quote("不規則なパスワード")}@host:6379/15'
-
- # タスクをインポートします(tasks.py など)
- CELERY_IMPORTS = ( 'monitor.tasks' ,)
-
- #列化されたタスクペイロードのデフォルトのシリアル化方法
- CELERY_TASK_SERIALIZER = 'json'
-
- # 結果のシリアル化方法
- CELERY_RESULT_SERIALIZER = 'json'
- CELERY_ACCEPT_CONTENT = [ 'json' ]
-
- CELERY_TIMEZONE = 'Asia/Shanghai' # タイムゾーンを指定します。指定しない場合は、デフォルトは「UTC」です。
- # CELERY_TIMEZONE = 'UTC'
-
- セレリービートスケジュール = {
- '60秒ごとに追加' :{
- 'タスク' : 'tasks.get_cookie_status' 、
- 'schedule' : datetime.timedelta(minutes=1), # 1分ごとに実行
- 'args' : () # タスク関数のパラメータ
- },
- }
サービスを開始する - セロリ -A タスク ビート -l INFO
- セロリ -A タスクワーカー -l INFO -c 2
2 つのスレッドでコンシューマー キュー サービスを開始し、スケジュールされたタスクを有効にします。現在のプラットフォームの Cookie が利用できないことが判明した場合は、Celery にシグナルを送信します (つまり、前の set_plat_cookie メソッドを呼び出します)。コンシューマーがこのタスクを取得すると、自動化されたスクリプトが実行され、Cookie が取得されて Redis に保存されます。使用する際には、Redis から取得し、プラットフォームのデータを正常に要求することができます。 アイドル時間中、Celery の get_cookie_status メソッドは 1 分ごとに Redis にデータを要求します。これは私たちが設定した 1 分間のハートビートです。 この方法では、Celery がバックグラウンドで起動されているかどうかに関係なく、誤った停止やスタック状態は発生せず、すべてが正常になります。 6 結論 Windows の Celery のこの問題を解決するために、この記事では、Celery タスク キューのハートビート時間を設定し、1 分ごとに 1 回または 5 分ごとに 1 回、Redis データベースにデータを送信するなどして、キューが常にアクティブであることを確認します。この方法では、コンピューターがシャットダウンされておらず、ネットワークが妨げられていない限り (リモート Redis の場合)、Celery タスク キュー サービスは中断または停止しているようには見えません。 |