Windows で分散キュー Celery のハートビート ポーリングを設定する手順を説明します。

Windows で分散キュー Celery のハートビート ポーリングを設定する手順を説明します。

[[385390]]

1 はじめに

みなさんこんにちは。私はボス・ウーです。 Celery の公式説明によれば、Celery は分散共有中間キューやスケジュールされたタスクなどに適用できる非常に優れた分散キューです。

2 バージョンの違い

Celery には多くのバージョンがあり、それらの違いはかなり大きくなります。たとえば、最新の Celery 6.0 は Celery 4.0 よりもはるかに安定性が低くなります。したがって、異なるバージョンを使用すると、システムから提供されるフィードバックが期待どおりにならない可能性があります。

3 サービス

Windows でハングしている Celery サービスが不安定になる場合があります (現時点では Unix ではこの状況は確認されていません)。たとえば、スケジュールされたタスクを実行すると、一定時間が経過すると Celery が一時停止状態になり、指定された時間にタスクを実行できなくなります。

これらのタスクは実行キューに追加されるだけで(Redis に蓄積され)、蓄積されたタスクは Celery サービスを手動で再起動した後にのみ解放して実行できます。

この場合、まず、スケジュールされたタスクは指定された時点で正常に実行されません。第二に、これらのタスクが他の時間に実行されると、データが時間内に更新されず、時間ノードが混乱する可能性があります。ビジネスニーズが満たされないだけでなく、逆効果にもなります。

4 ハートビートを設定する

Windows の Celery のこの問題を解決するには、Celery タスク キューのハートビート時間を設定し、1 分ごとまたは 5 分ごとに Redis データベースにデータを送信するなどして、キューが常にアクティブになるようにします。この方法では、コンピューターがシャットダウンされず、ネットワークが開いたままになっている限り (リモート Redis の場合)、Celery タスク キュー サービスは中断状態になりません。

5. 例

私は物事を説明するときにいつも例を使うのが好きです。以前、あるプラットフォームのマーチャントバックエンドからデータを収集していたとき、そのウェブサイトを使用する際にそのウェブサイトのクッキーを自動的に取得するために、

私は Pyppeteer を使用して自動ログイン スクリプトを作成しました。これは通常どおり Celery キューに残っており、すぐにサービスを開始します。

スクリプトは次のようになります(実際の疑似コードに非常に近い、選択の余地なし、命が大切)

  1. # -*- コーディング: utf-8 -*-
  2. db.redisCurdからRedisQueueをインポート
  3. 非同期インポート
  4. ランダムにインポート
  5. tkinterをインポートする
  6. pyppeteer.launcherからlaunchをインポート
  7. platLogin.configからUSERNAME、 PASSWORD 、LOGIN_URL をインポートします。
  8.  
  9. クラスLogin():
  10. def __init__(self, ショップID):
  11. self.shopId = ショップID
  12. self.RedisQueue = RedisQueue( "クッキー" )
  13.  
  14. スクリーンサイズを定義します。
  15. tk = tkinter.Tk()
  16. 幅 = tk.winfo_screenwidth()
  17. 高さ = tk.winfo_screenheight()
  18. tk.終了()
  19. 戻り値: { 'width' :幅、 'height' :高さ}
  20.  
  21. 非同期定義ログイン(自分自身、ユーザー名、パスワード、URL):
  22. ブラウザ = 起動を待つ(
  23. {
  24. 'headless' : False
  25. 'dumpio' :本当 
  26. },
  27. args=[ '--no-sandbox' , '--disable-infobars' , '--user-data-dir=./userData' ],
  28. page = await browser.newPage() # 新しいブラウザページを開始する
  29.  
  30. 試す:
  31. page.setViewport(viewport=self.screen_size()) を待機します。
  32. await page.setJavaScriptEnabled(enabled= True ) # jsを有効にする
  33. ページを待つ.setUserAgent(
  34. 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML、Gecko に類似) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'  
  35. 自己ページ評価を待機します(ページ)
  36. await page.goto (URL)を待ちます
  37. asyncio.sleep(2) を待つ
  38. # ユーザー名とパスワードを入力してください
  39. page.evaluate(f 'document.querySelector("#userName").value=""' )を待機します。
  40. await page.type( '#userName' , username, { 'delay' : self.input_time_random() - 50}) # delayは入力を制限する時間です
  41. page.evaluate( 'document.querySelector("#passWord").value=""' )を待ちます。
  42. page.type( '#passWord' パスワード、 { 'delay' : self.input_time_random()})を待機します。
  43. ページを待つ.waitFor(6000)
  44.  
  45. loginImgVcode = page.waitForSelector( '#checkCode' )を待機します
  46. loginImgVcode.screenshot({ 'path' : './loginImg.png' })を待ちます。
  47. ページを待つ.waitFor(6000)
  48.  
  49. res = use_cjy( "./loginImg.png" )
  50. pic_str = res.get( "pic_str" ) の場合、 res.get( "err_str" ) == "OK"  それ以外  「1234」  
  51.  
  52. ページを待つ.waitFor(6000)
  53. page.type( '#checkWord' , pic_str, { 'delay' : self.input_time_random() - 50})を待機します。
  54. ページを待つ.waitFor(6000)
  55.  
  56. page.click( '#subMit' )を待ちます
  57. ページを待つ.waitFor(6000)
  58. asyncio.sleep(2) を待つ
  59. self.get_cookie(ページ) を待機します。
  60. ページを待つ.waitFor(3000)
  61. self.page_close(ブラウザ)を待つ
  62. 戻り値: { 'code' : 200, 'msg' : 'ログイン成功' }
  63. を除外する
  64. 戻り値: { 'code' : -1, 'msg' : 'エラー' }
  65.  
  66. ついに:
  67. ページを待つ.waitFor(3000)
  68. self.page_close(ブラウザ)を待つ
  69.  
  70. # ログイン後にクッキーを取得する
  71. 非同期def get_cookie(self, page):
  72. cookies_list = page.cookies() を待つ
  73. クッキー = ''  
  74. cookies_list内のcookieの場合:
  75. str_cookie = '{0}={1}; '  
  76. str_cookie = str_cookie.format(cookie.get( '名前' ), cookie.get( '値' ))
  77. クッキー += str_cookie
  78. # クッキーをクッキープールに入れる
  79. self.RedisQueue.put_hash(self.shopId、クッキー)
  80. クッキーを返す
  81.  
  82. 非同期def page_evaluate(self, page):
  83. page.evaluate( '' '() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }' '' )を待ちます。
  84. page.evaluate( '' '() =>{ window.navigator.chrome = { ランタイム: {}, }; }' '' )を待機します。
  85. ページを待つ.evaluate(
  86. '' '() =>{ Object.defineProperty(navigator, ' languages ​​', { get: () => [' en-US ', ' en '] }); }' '' )
  87. ページを待つ.evaluate(
  88. '' '() =>{ Object.defineProperty(navigator, ' plugins ', { get: () => [1, 2, 3, 4, 5,6], }); }' '' )
  89. ページを待つ.waitFor(3000)
  90.  
  91. 非同期定義page_close(self, browser):
  92. _pageについては、 browser.pages() を待機します:
  93. _page.close ()を待つ
  94. ブラウザを待機します。近い()
  95.  
  96. 定義: input_time_random(self):
  97. random.randint(100, 151)を返します
  98.  
  99. def run(self, ユーザー名=ユーザー名,パスワード=パスワード, url=ログインURL):
  100. ループ = asyncio.get_event_loop()
  101. i_future = asyncio.ensure_future(self.login(ユーザー名、パスワード、URL))
  102. loop.run_until_complete(i_future)
  103. i_future.result()を返す
  104.  
  105.  
  106. __name__ == '__main__'の場合:
  107. Z = ログイン(ショップID = "001" )
  108. Z.run()

Celeryタスクファイルは次のようになります

  1. # -*- コーディング: utf-8 -*-
  2. __future__からabsolute_import をインポートする
  3. インポートOS
  4. インポートシステム
  5. インポート時間 
  6. db.redisCurdからRedisQueueをインポート
  7. send_msg.weinxinからSend_msg をインポート
  8. base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  9. sys.path.append(ベースディレクトリ)
  10. logger.loggerからlog_vをインポートします
  11. セロリインポートタスク
  12. from platLogin.login import Login # ログインクラス
  13. セロリ輸入セロリ
  14.  
  15. ランダムキュー = RedisQueue( "クッキー" )
  16.  
  17. celery_app = Celery( 'タスク' )
  18. celery_app.config_from_object( 'celeryConfig' )
  19.  
  20. S = Send_msg()
  21.  
  22. dl_dict = {
  23. 'デモ' : {
  24. 'クッキー' : '' ,
  25. 'loginClass' : 'ログイン'
  26. }
  27. }
  28.  
  29. # todo これらは3つの実行状態です
  30. クラスtask_status(タスク):
  31. on_success を定義します(self、retval、task_id、args、kwargs):
  32. log_v.info( 'タスク情報 -> id:{}, arg:{}, 成功しました.....完了しました'.format (task_id, args))
  33.  
  34. on_failure を定義します(self、exc、task_id、args、kwargs、einfo):
  35. log_v.error( 'タスクID:{}、引数:{}、失敗しました! エラー: {}' .format(task_id, args, exc))
  36.  
  37. on_retry を定義します(self、exc、task_id、args、kwargs、einfo):
  38. log_v.warning( 'タスクID:{}、引数:{}、再試行!情報: {}' .format(task_id, args, exc))
  39.  
  40.  
  41. # todoポーリング オブジェクトとしてハッシュキーを選択するだけです。 Celery は Windows 10 システムでは安定しない可能性があり、接続が切断されることがあります。
  42. @celery_app.task(ベース=タスクステータス)
  43. def get_cookie_status(platName= "demo" ):
  44. 試す:
  45. # log_v.debug(f '[+] Poll {platName} タイマーが開始されました.....完了' )
  46. ランダムキュー.get_hash(platName).decode()
  47. log_v.debug(f '[+] {platName} のポーリングが正常に完了しました..... 完了' )
  48. 戻る  「ERP 投票成功」  
  49. を除外する
  50. 戻る  「ERP ポーリングに失敗しました」  
  51.  
  52.  
  53. @celery_app.task(ベース=タスクステータス)
  54. def set_plat_cookie(platName= "demo" 、shopId=None):
  55. log_v.debug(f "[+] {platName} がログイン中です" )
  56. コア = eval(dl_dict[platName][ 'loginClass' ])(shopId=shopId)
  57. 結果 = core.run()
  58. 結果を返す

Celery 構成ファイルは次のようになります。

  1. __future__からabsolute_import をインポートする
  2. 日時をインポート
  3. kombuインポートExchange、キューから
  4. celery.schedulesからcrontabをインポートする
  5. urllibからインポート解析
  6.  
  7. BROKER_URL = f'redis://root:{parse.quote("不規則なパスワード")}@host:6379/15'  
  8.  
  9. # タスクをインポートします(tasks.py など)
  10. CELERY_IMPORTS = ( 'monitor.tasks' ,)
  11.  
  12. #列化されたタスクペイロードのデフォルトのシリアル化方法
  13. CELERY_TASK_SERIALIZER = 'json'  
  14.  
  15. # 結果のシリアル化方法
  16. CELERY_RESULT_SERIALIZER = 'json'  
  17. CELERY_ACCEPT_CONTENT = [ 'json' ]
  18.  
  19. CELERY_TIMEZONE = 'Asia/Shanghai' # タイムゾーンを指定します。指定しない場合は、デフォルトは「UTC」です。  
  20. # CELERY_TIMEZONE = 'UTC'  
  21.  
  22. セレリービートスケジュール = {
  23. '60秒ごとに追加' :{
  24. 'タスク' : 'tasks.get_cookie_status'
  25. 'schedule' : datetime.timedelta(minutes=1), # 1分ごとに実行
  26. 'args' : () # タスク関数のパラメータ
  27. },
  28. }

サービスを開始する

  1. セロリ -A タスク ビート -l INFO
  2. セロリ -A タスクワーカー -l INFO -c 2

2 つのスレッドでコンシューマー キュー サービスを開始し、スケジュールされたタスクを有効にします。現在のプラットフォームの Cookie が利用できないことが判明した場合は、Celery にシグナルを送信します (つまり、前の set_plat_cookie メソッドを呼び出します)。コンシューマーがこのタスクを取得すると、自動化されたスクリプトが実行され、Cookie が取得されて Redis に保存されます。使用する際には、Redis から取得し、プラットフォームのデータを正常に要求することができます。

アイドル時間中、Celery の get_cookie_status メソッドは 1 分ごとに Redis にデータを要求します。これは私たちが設定した 1 分間のハートビートです。

この方法では、Celery がバックグラウンドで起動されているかどうかに関係なく、誤った停止やスタック状態は発生せず、すべてが正常になります。

6 結論

Windows の Celery のこの問題を解決するために、この記事では、Celery タスク キューのハートビート時間を設定し、1 分ごとに 1 回または 5 分ごとに 1 回、Redis データベースにデータを送信するなどして、キューが常にアクティブであることを確認します。この方法では、コンピューターがシャットダウンされておらず、ネットワークが妨げられていない限り (リモート Redis の場合)、Celery タスク キュー サービスは中断または停止しているようには見えません。

<<:  ファーウェイがGood Vision Cloud Serviceを正式に開始、包括的なマシンビジョンの時代を先導

>>:  ファーウェイとパートナーが共同でOSS/BSS統合ラボを構築し、クラウドネットワーク産業の発展を加速

推薦する

百度が動けば世界が変わる

国内のインターネット大手の一つである百度の動きは常に何百万もの人々の関心を集めている。百度の広告収入...

#格安 VPS# 料金: $4.49/4G メモリ/2 コア (AMD EPYC 7282)/100g SSD/16T トラフィック、日本/シンガポール/米国/英国/ドイツ/オーストラリア

2003年に設立されたドイツの老舗データセンターであるContaboは、本日、VPS事業にローエンド...

Kafka の原則: 図解された Kafka アーキテクチャの原則

[[392023]]これは[Code Brother]によるKafkaシリーズの2番目の記事です。 ...

大規模ウェブサイトの 301 リダイレクトの実施方法に関する個人的な経験分析

最近、SEOディレクターグループで、友人のウェブサイトが次のような問題に遭遇したのを見ました。ウェブ...

チケット販売サイトEventbriteの売上高が10億ドルを突破

世界で最も人気のあるオンラインチケット販売サイトであるEventbriteは本日、10億ドル相当のチ...

ウェブサイトの最適化において高品質の外部リンクを取得するためのチャネルについての簡単な説明

ウェブサイトの最適化のプロセスにおいて、一部のウェブマスターにとって頭の痛いことが 1 つあります。...

Appleの声明:iCloudデータ暗号化ベンダーはアクセスできない

「AppleのiCloud中国ユーザーのデータが中国電信のクラウドストレージに移された」というニュー...

インデックスが適切に行われたウェブサイトが突然インデックスされなくなった理由の分析

実際、SEO 作業では、いくつかの問題に遭遇することがよくあります。これらの問題が単純であるかどうか...

ジャック・マーのWeChatに対する恐怖と莱王への推進の背後にあるもの

LaiWang は以前から Taobao と Tmall で強力に宣伝されており、現在は Taoba...

Chengwaiquanはマルチメディアチャネルを統合してターゲット広告とマーケティングを支援します

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますモバイルソ...

Hostus 香港 VPS/256M メモリ/10g ハードディスク/500g トラフィック/ソフトレイヤー/1000M ポート

Hostus は創業から 20 年になりますが、おそらく経営者は今日これほど人気が​​出るとは思って...

ツールの観点からプログラマーを見る: 優秀なプログラマーはIE6を使うべきではない

新しいソフトウェアの初期のユーザーの多くはプログラマーです。その理由は何でしょうか? プログラマーは...

Pythonを使用して分散トランザクションTCCを簡単に完了する、乳母レベルのチュートリアル

分散トランザクションとは何ですか?銀行間送金業務は、典型的な分散型トランザクションのシナリオです。 ...

ビッグデータ、人工知能、クラウドコンピューティングの統合と応用

概要: データ処理の段階的な発展を分析することにより、ビッグデータと人工知能技術の発展動向を分析しま...

Baiduを簡単に信頼せず、ウェブサイトの説明を簡単に変更しないでください

【ガイド】昨晩と今朝早くのBaiduの大きなアップデートから、私は教訓を得ました:Baiduを簡単に...