分散タスクキューCeleryの実践

分散タスクキューCeleryの実践

[[432209]]

最近の仕事でセロリに出会いました。これは、Github で 18,000 個のスターを獲得したオープンソースの分散タスク キューです。主に、アプリケーションで非同期タスクとスケジュールされたタスクを実装するために使用できます。 Python で書かれていますが、プロトコルはどの言語でも実装できます。すでに gocelery、nodecelery、celery-php が存在します。

著者は、Celery と仕事での使用についての理解をまとめるためにこの記事を書きました。この記事の主な内容は次のとおりです。

  • タスクキューとは何ですか?
  • Celery の機能
  • 職場でのセロリ。

タスクキューとは何ですか?

バックエンドの学生はすべて「メッセージキュー」に精通している必要があります。一般的なものには、RabbitMQ、RocketMQ、Kafka などがあります。 「タスク キュー」という言葉については、Celery に触れるまで聞いたことがありませんでした。タスク キューとは何ですか? また、タスク キューとメッセージ キューの関係は何ですか?これらの質問を念頭に置いて、Celery アーキテクチャを見てみましょう。

セロリ

Celery アーキテクチャでは、複数のサーバーが非同期タスク (Async Task) を開始し、そのタスクを Broker キューに送信し、その中で Celery Beat プロセスがスケジュールされたタスクを開始する役割を担っていることがわかります。タスクがブローカーに到着すると、対応する Celery ワーカーに配布され、処理されます。タスクが処理されると、その結果がバックエンドに保存されます。

上記のプロセスでは、Celery はブローカーとバックエンドを実装せず、メッセージ キュー サービスを提供するブローカーとして RabbitMQ を使用し、結果ストレージ サービスを提供するバックエンドとして Redis などの既存のオープン ソース実装を使用します。 Celery は、メッセージ キュー アーキテクチャにおける Producer と Consumer の実装を抽象化したようなもので、メッセージ キュー内の基本単位「メッセージ」をタスク キュー内の「タスク」に抽象化し、非同期タスクやスケジュール タスクの開始、結果の保存などの操作をカプセル化することで、開発者が AMQP、RabbitMQ などの実装の詳細を無視できるようにし、開発の利便性を高めます。

要約すると、タスク キューとしての Celery は、メッセージ キューに基づいてさらにカプセル化されたものであり、その実装はメッセージ キューに依存します。

次に、簡単なアプリケーションを使用して、Celery の機能を理解しましょう。

セロリの働き

アプリケーション開発では、応答速度を確保するために、プロセスに影響を与えない時間のかかる操作は非同期で処理されるのが一般的です。たとえば、ユーザー登録プロセス中は、通常、ユーザーに通知するために電子メールが非同期的に送信されます。 Celery がこの非同期操作をどのように実装するかを見てみましょう。

task.py では、メールを送信するための send_mail メソッドが宣言され、Celery が提供する @app.task デコレータが追加されます。このデコレータを使用すると、send_mail 関数を celery.app.task:Task インスタンス オブジェクトに変換できます。 Task インスタンスは、次の 2 つのコア機能を提供します。

  • キューにメッセージを送信します。
  • メッセージを受信した後にワーカーが実行する必要がある特定の関数を宣言します。
  1. セロリ輸入セロリ
  2.  
  3. アプリ = Celery( 'タスク' 、ブローカー = 'amqp://guest@localhost//' )
  4.  
  5. @app.タスク
  6. def send_mail(電子メール):
  7. print( "メールを送信する" , email)
  8. インポート時間 
  9. 時間.sleep(5)
  10. 戻る  "成功"  

タスクが定義されました。非同期タスクを開始するには、Task の delay メソッドを呼び出して、キューにメッセージを送信します。たとえば、ユーザー登録が完了すると、電子メールを送信する非同期タスクが開始されます。

  1. #ユーザー.py
  2. タスクからsend_mailをインポート
  3.  
  4. def register():
  5. print( "1. データベースにレコードを挿入する" )
  6. print( "2. Celery 経由で非同期にメールを送信する" )
  7. send_mail.delay( "[email protected]" )
  8. print( "3. 登録が成功したことをユーザーに通知します" )
  9.  
  10. __name__ == '__main__'の場合:
  11. 登録する()

上記のプログラムを実行すると、メッセージは RabbitMQ キューに送信され、そのメッセージ形式は次のようになります。

RabbitMQのタスク

Celery によってカプセル化されたメッセージには、タスク識別子と実行パラメータが含まれていることがわかります。

次に、RabbitMQ からのメッセージを消費するワーカーを起動します。

  1. セロリ -A タスクワーカー--loglevel=info  

Worker が起動すると、次の情報が印刷されます。

ワーカー開始

まずワーカーの構成情報が来ます。次にワーカーによって実行されるタスクのリストが来ます。そして RabbitMQ からメッセージを正常に取得し、対応するタスクを実行します。

上記の例を通じて、タスク キュー フレームワークとしての Celery の働きをさらに理解することができます。 「分散タスクキュー」の「分散」とは、複数のプロデューサーとコンシューマーが存在する可能性があることを意味します。つまり、複数のプロセスがブローカーにタスクを送信し、複数のワーカーがブローカーからタスクを取得して実行します。

上記は単なる簡単な例です。仕事で Celery を使用した実際の経験をいくつか見てみましょう。

セロリの活用

ビジネスシナリオに応じてキューを分割する

私が取り組んでいるプロジェクトでは、Celery を使用して、注文の配置、軌道の解析、アップストリームのプッシュなどの非同期タスクとスケジュールされたタスクを処理します。各タスクのビジネス シナリオに応じて、対応するキューを指定できます。次に例を示します。

  1. DEFAULT_CELERY_ROUTES = {
  2. 'celery_task.pending_create' : { 'キュー' : '作成' },
  3. 'celery_task.multi_create' : { 'キュー' : '作成' },
  4. 'celery_task.pull_tracking' : { 'キュー' : 'プル' },
  5. 'celery_task.pull_branch' : { 'キュー' : 'pull' },
  6. 'celery_task.push_tracking' : { 'キュー' : 'プッシュ' },
  7. 'celery_task.push_weight' : { 'キュー' : 'プッシュ' },
  8. }
  9.  
  10. CELERY_ROUTES = {
  11. デフォルト
  12. }

ビジネス シナリオに応じて、DEFAULT_CELERY_ROUTES 構成の 6 つのタスクに対応するキューを指定します。キューは合計で、作成、プル、プッシュの 3 つあります。ルーティング ルールを有効にするには、CELERY_ROUTES に追加します。この設計の目的は、異なるシナリオが互いに影響を及ぼさないようにすることです。たとえば、解析タスクのブロックは注文配置タスクに影響を与えてはなりません。

キューのさらなる分割

ビジネスシナリオに基づいて大まかな区分を行った後、特定のシナリオではさらに詳細な区分が必要になる場合があります。たとえば、アップストリームにプッシュする場合、1 つのアップストリームのブロックが他のアップストリームへのプッシュに影響するのを避けるために、異なるアップストリームが互いに影響を及ぼさないようにする必要があります。したがって、アップストリームごとに異なるキューを使用する必要があります。例:

  1. CLIENT_CELERY_ROUTES = {
  2. # {0} はクライアントのプレースホルダーで、ClientRouter でフォーマットされます
  3. 'celery_task.push_tracking_retry' : { 'キュー' : 'push_tracking_retry_{0}' },
  4. 'celery_task.push_weight_retry' : { 'キュー' : 'push_weight_retry_{0}' },
  5. }
  6.  
  7. クラスClientRouter(オブジェクト):
  8.  
  9. def route_for_task(self, task, args=None, kwargs=None):
  10. タスク  CLIENT_CELERY_ROUTES:
  11. なしを返す
  12. client_id = kwargs( 'client_id' )
  13. # client_idに基づいてキュー名を取得します
  14. queue_name = CLIENT_CELERY_ROUTES[タスク][ 'キュー' ].format(client_id)
  15. { 'キュー' :キュー名}を返します
  16.  
  17. CELERY_ROUTES = {
  18. 「クライアントルーター」  
  19. デフォルト_CELERY_ROUTES、
  20. }

CLIENT_CELERY_ROUTES では、クライアントに応じて分離する必要があるタスクと対応するキュー名の形式が指定されます。キュー名には、クライアントごとに異なるキュー名を取得するためのプレースホルダーが含まれています。

次に、ルーター ClientRouter が実装され、タスクに対応するキュー名を指定するために使用される router_for_task メソッドが定義されます。タスクが CLIENT_CELERY_ROUTES にある場合、キュー名は kwargs の client_id を使用してフォーマットされ、メッセージが最終的に送信されるキューの名前が取得され、入力パラメータ client_id に基づいて使用する特定のキューが決定され、異なるクライアントが異なるキューを使用することがないように分離されるというロジックであることがわかります。

クライアント ディメンションに基づいてキューを分割することに加えて、分離を実現するために他のディメンションに基づいてキューをさらに分割する必要がある場合は、この方法を参照してルーティング ルールを設計することもできます。

動的キュー

動的キューについてお話しましょう。それらは本質的には予備キューです。その目的は、オンライン環境内の特定のキューにメッセージが蓄積される圧力を軽減し、迅速なサポートを提供することです。構成を通じて、動的キューがサポートする必要があるキューを定義します。たとえば、プッシュ キューに大きな負荷がかかっている場合は、次のように json を構成して、push_tracking タスクと push_weight タスクを準備された動的キューにルーティングできます。

  1. celery_dynamic_router の設定
  2.  
  3. {
  4. 「celery_task.push_tracking」 : {
  5. "ダイナミックキュー" : [1,2],
  6. "ダイナミックパーセンテージ" : 0.7,
  7. },
  8. "celery_task.push_weight" : {
  9. 「ダイナミックキュー」 : [3,4],
  10. "ダイナミックパーセンテージ" : 0.7,
  11. }
  12. }

上記の構成では、celery_task.push_tracking タスクの 70% が動的キュー 1 と 2 にルーティングされ、celery_task.push_weight タスクの 70% が動的キュー 3 と 4 にルーティングされます。

動的キュー ルーター DynamicRouter は、おおよそ次のように実装されます。

  1. クラスDynamicRouter(オブジェクト):
  2.  
  3. def route_for_task(self, task, args=None, kwargs=None):
  4. # 設定を取得する
  5. task_config = get_conf_dict( 'celery_dynamic_router' ).get(task, None)
  6. # タスクが構成にない場合は、直接戻ります
  7. task_configでない場合は:
  8. なしを返す
  9. # タスクに対応する動的キュー構成を取得します
  10. dynamic_queue = task_config.get( 'dynamic_queue' , [])
  11. dynamic_percentage = task_config.get( 'dynamic_percentage' , 0.0)
  12. #一定の割合のタスクを動的キューにルーティングする
  13. random.random() <= dynamic_percentage の場合:
  14. # 使用する動的キューを決定する
  15. キュー名 = router_load_balance(動的キュー、タスク名)
  16. log.data( 'get_router| タスク名:%s、キュー:%s' 、タスク名、キュー名)
  17. { 'キュー' :キュー名}を返します
  18. それ以外
  19. なしを返す

動的に構成されたスケジュールされたタスク

前述のように、Celery は非同期タスクを実装できるだけでなく、Celery Beat を通じてスケジュールされたタスクも実装できます。まずは例を見てみましょう:

  1. celery.schedulesからcrontabをインポートする
  2.  
  3. app.conf.beat_schedule = {
  4. # 30秒ごとにメールを送信する
  5. '30秒ごとにメールを送信' :{
  6. 'タスク' : 'asks.send_mail' ,
  7. 'スケジュール' : 30.0,
  8. 'args' : [ '[email protected]' ]
  9. },
  10. }

上記の設定が完了したら、Celery Beat コマンドを実行します。

セロリビート

つまり、設定に応じて、send_email タスクは 30 秒ごとに実行されます。

上記の例では、コード内でスケジュールされたタスクを構成します。私の作業では、djcelery が提供するデータベース スケジューリング モデルを使用しましたが、これは Django が提供する ORM 機能と組み合わせることで動的に設定でき、より便利です。やり方は次のとおりです。まず、Celery 設定に以下を追加します。

  1. CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'   

DatabaseScheduler の使用を設定し、スケジュールされたタスクの構成テーブルを生成します。

  1. python manage.py 移行

次のテーブルがデータベースに追加されたことがわかります。

  1. |セロリ
  2. |セロリタスクセットメタ |
  3. | djcelery_crontabschedule |
  4. | djcelery_intervalschedule |
  5. |翻訳:
  6. |翻訳者
  7. |タスクの状態
  8. |翻訳:

上記の操作を完了したら、Celery Beat コマンドを実行するだけで、データベースから構成が読み取られ、スケジュールされたタスクが開始されます。これの利点は、タスク サイクルやパラメータを調整するなど、データベース内のレコードを変更することで、スケジュールされたタスクを動的に構成できることです。

上記は、私が仕事でCeleryを使った経験から学んだことです。非同期タスクまたはスケジュールされたタスクを実装する必要がある場合は、Celery の使用を検討してください。

私はテクノロジーと人生を愛するソウギョのCao Nieziです。また次回お会いしましょう!

参照する

メッセージ キューとタスク キューの違い (https://newbedev.com/message-queue-vs-task-queue-difference)

高性能非同期フレームワーク Celery のガイド (https://juejin.cn/post/6844903689103081480)

分散タスク キュー Celery - 詳細なタスク (https://www.cnblogs.com/jmilkfan-fanguiju/p/10589779.html)

<<:  Jiuzhou Cloudは、最も有望なエッジコンピューティング企業として「Golden Edge Award」を受賞しました。

>>:  パブリッククラウドのセキュリティについてお話ししましょう

推薦する

地域情報サイトを簡単に運営する方法

Leshan Onlineの運営を始める前に、私は比較的詳細な調査を行いました。人口数百万人の楽山市...

ウェブマスターネットワークレポート:アリババとUCWebの関係、テンセントQQグループにXSS攻撃の脆弱性が発見される

1. アリババがUCWebを完全買収したとの噂北京時間6月11日朝のニュース、米国のテクノロジーサイ...

テンセントとネットイースは秘密戦争中。ネットイースの元ゲーム幹部、魏建宏が警察に連行される

文/王静5月30日、広州オプティマスプライムネットワークテクノロジー株式会社(以下、「オプティマスプ...

tranzmedia-7 USD/OpenStack クラウド/フランスの高防御 VPS/無制限トラフィック

tranzmediaはインドのIT企業で、2005年に事業を開始したと言われていますが、具体的な情報...

企業がクラウドテクノロジーの新時代を受け入れるべき理由

企業はクラウド コンピューティングの利点だけを考慮するべきではありません。クラウド コンピューティン...

テンセントと手を組んで「行動と否定」、九洲通は情報開示規定に違反したと非難される

昨日(4月29日)、ジョインタウンは2013年度の年次報告書を発表し、テンセントとの数か月にわたる提...

中小企業向けインターネットマーケティングソリューション

電子商取引の発展に伴い、中国のビジネス分野は徐々に「有形」から「無形」へと移行しています。インターネ...

企業の中核的な競争力はどこから来るのか?成功するマーケティングは自分自身を知ることから始まります

ほとんどすべての企業のウェブサイト管理者やSEO担当者は、オリジナルコンテンツの作成や外部リンクの宣...

ソフト記事の書き方 王通記事分析

SEO 担当者の皆さん、実は私たち SEO 担当者にとって、記事を書いたりウェブサイトを更新したりす...

損失が拡大し続ける中、クラウド サービスと CDN の売却の波が差し迫っているのでしょうか?

[[249686]]自然の季節に合わせて、TMT 業界全体が急速に寒い冬に向かっています。プライマリ...

キーワードに焦点を当ててウェブサイトのコンテンツを作成することは無駄ではない

検索エンジンの大幅な拡大に伴い、キーワードの役割はますます重要になってきました。人間の社会生活のペー...

病院のBaidu入札体験の共有

まず、プロモーションを行うには、Baiduプロモーションの基礎知識を理解する必要があります。 1. ...

PieLayer - 年間 15 USD/512 MB RAM/100 GB ハード ドライブ/800 GB データ フロー/G ポート

PieLayerは創業4年の歴史を持ち、現在は米国ロサンゼルス、ニューヨーク、カンザス、ドイツフラン...

ウェブサイトの内部リンクを最適化する際には3つの重要なポイントに注意する

これまで、ウェブサイトの最適化は、通常、プロモーションや外部リンクの公開と呼ばれるオフサイト最適化に...

微博は20.24ドルで取引を終え、新規株式公開から19.06%上昇した。

4月15日、海外メディアの報道によると、新浪微博の株価は取引初日に19%急騰し、終値は20.24ドル...