分散タスクキューCeleryの実践

分散タスクキューCeleryの実践

[[432209]]

最近の仕事でセロリに出会いました。これは、Github で 18,000 個のスターを獲得したオープンソースの分散タスク キューです。主に、アプリケーションで非同期タスクとスケジュールされたタスクを実装するために使用できます。 Python で書かれていますが、プロトコルはどの言語でも実装できます。すでに gocelery、nodecelery、celery-php が存在します。

著者は、Celery と仕事での使用についての理解をまとめるためにこの記事を書きました。この記事の主な内容は次のとおりです。

  • タスクキューとは何ですか?
  • Celery の機能
  • 職場でのセロリ。

タスクキューとは何ですか?

バックエンドの学生はすべて「メッセージキュー」に精通している必要があります。一般的なものには、RabbitMQ、RocketMQ、Kafka などがあります。 「タスク キュー」という言葉については、Celery に触れるまで聞いたことがありませんでした。タスク キューとは何ですか? また、タスク キューとメッセージ キューの関係は何ですか?これらの質問を念頭に置いて、Celery アーキテクチャを見てみましょう。

セロリ

Celery アーキテクチャでは、複数のサーバーが非同期タスク (Async Task) を開始し、そのタスクを Broker キューに送信し、その中で Celery Beat プロセスがスケジュールされたタスクを開始する役割を担っていることがわかります。タスクがブローカーに到着すると、対応する Celery ワーカーに配布され、処理されます。タスクが処理されると、その結果がバックエンドに保存されます。

上記のプロセスでは、Celery はブローカーとバックエンドを実装せず、メッセージ キュー サービスを提供するブローカーとして RabbitMQ を使用し、結果ストレージ サービスを提供するバックエンドとして Redis などの既存のオープン ソース実装を使用します。 Celery は、メッセージ キュー アーキテクチャにおける Producer と Consumer の実装を抽象化したようなもので、メッセージ キュー内の基本単位「メッセージ」をタスク キュー内の「タスク」に抽象化し、非同期タスクやスケジュール タスクの開始、結果の保存などの操作をカプセル化することで、開発者が AMQP、RabbitMQ などの実装の詳細を無視できるようにし、開発の利便性を高めます。

要約すると、タスク キューとしての Celery は、メッセージ キューに基づいてさらにカプセル化されたものであり、その実装はメッセージ キューに依存します。

次に、簡単なアプリケーションを使用して、Celery の機能を理解しましょう。

セロリの働き

アプリケーション開発では、応答速度を確保するために、プロセスに影響を与えない時間のかかる操作は非同期で処理されるのが一般的です。たとえば、ユーザー登録プロセス中は、通常、ユーザーに通知するために電子メールが非同期的に送信されます。 Celery がこの非同期操作をどのように実装するかを見てみましょう。

task.py では、メールを送信するための send_mail メソッドが宣言され、Celery が提供する @app.task デコレータが追加されます。このデコレータを使用すると、send_mail 関数を celery.app.task:Task インスタンス オブジェクトに変換できます。 Task インスタンスは、次の 2 つのコア機能を提供します。

  • キューにメッセージを送信します。
  • メッセージを受信した後にワーカーが実行する必要がある特定の関数を宣言します。
  1. セロリ輸入セロリ
  2.  
  3. アプリ = Celery( 'タスク' 、ブローカー = 'amqp://guest@localhost//' )
  4.  
  5. @app.タスク
  6. def send_mail(電子メール):
  7. print( "メールを送信する" , email)
  8. インポート時間 
  9. 時間.sleep(5)
  10. 戻る  "成功"  

タスクが定義されました。非同期タスクを開始するには、Task の delay メソッドを呼び出して、キューにメッセージを送信します。たとえば、ユーザー登録が完了すると、電子メールを送信する非同期タスクが開始されます。

  1. #ユーザー.py
  2. タスクからsend_mailをインポート
  3.  
  4. def register():
  5. print( "1. データベースにレコードを挿入する" )
  6. print( "2. Celery 経由で非同期にメールを送信する" )
  7. send_mail.delay( "[email protected]" )
  8. print( "3. 登録が成功したことをユーザーに通知します" )
  9.  
  10. __name__ == '__main__'の場合:
  11. 登録する()

上記のプログラムを実行すると、メッセージは RabbitMQ キューに送信され、そのメッセージ形式は次のようになります。

RabbitMQのタスク

Celery によってカプセル化されたメッセージには、タスク識別子と実行パラメータが含まれていることがわかります。

次に、RabbitMQ からのメッセージを消費するワーカーを起動します。

  1. セロリ -A タスクワーカー--loglevel=info  

Worker が起動すると、次の情報が印刷されます。

ワーカー開始

まずワーカーの構成情報が来ます。次にワーカーによって実行されるタスクのリストが来ます。そして RabbitMQ からメッセージを正常に取得し、対応するタスクを実行します。

上記の例を通じて、タスク キュー フレームワークとしての Celery の働きをさらに理解することができます。 「分散タスクキュー」の「分散」とは、複数のプロデューサーとコンシューマーが存在する可能性があることを意味します。つまり、複数のプロセスがブローカーにタスクを送信し、複数のワーカーがブローカーからタスクを取得して実行します。

上記は単なる簡単な例です。仕事で Celery を使用した実際の経験をいくつか見てみましょう。

セロリの活用

ビジネスシナリオに応じてキューを分割する

私が取り組んでいるプロジェクトでは、Celery を使用して、注文の配置、軌道の解析、アップストリームのプッシュなどの非同期タスクとスケジュールされたタスクを処理します。各タスクのビジネス シナリオに応じて、対応するキューを指定できます。次に例を示します。

  1. DEFAULT_CELERY_ROUTES = {
  2. 'celery_task.pending_create' : { 'キュー' : '作成' },
  3. 'celery_task.multi_create' : { 'キュー' : '作成' },
  4. 'celery_task.pull_tracking' : { 'キュー' : 'プル' },
  5. 'celery_task.pull_branch' : { 'キュー' : 'pull' },
  6. 'celery_task.push_tracking' : { 'キュー' : 'プッシュ' },
  7. 'celery_task.push_weight' : { 'キュー' : 'プッシュ' },
  8. }
  9.  
  10. CELERY_ROUTES = {
  11. デフォルト
  12. }

ビジネス シナリオに応じて、DEFAULT_CELERY_ROUTES 構成の 6 つのタスクに対応するキューを指定します。キューは合計で、作成、プル、プッシュの 3 つあります。ルーティング ルールを有効にするには、CELERY_ROUTES に追加します。この設計の目的は、異なるシナリオが互いに影響を及ぼさないようにすることです。たとえば、解析タスクのブロックは注文配置タスクに影響を与えてはなりません。

キューのさらなる分割

ビジネスシナリオに基づいて大まかな区分を行った後、特定のシナリオではさらに詳細な区分が必要になる場合があります。たとえば、アップストリームにプッシュする場合、1 つのアップストリームのブロックが他のアップストリームへのプッシュに影響するのを避けるために、異なるアップストリームが互いに影響を及ぼさないようにする必要があります。したがって、アップストリームごとに異なるキューを使用する必要があります。例:

  1. CLIENT_CELERY_ROUTES = {
  2. # {0} はクライアントのプレースホルダーで、ClientRouter でフォーマットされます
  3. 'celery_task.push_tracking_retry' : { 'キュー' : 'push_tracking_retry_{0}' },
  4. 'celery_task.push_weight_retry' : { 'キュー' : 'push_weight_retry_{0}' },
  5. }
  6.  
  7. クラスClientRouter(オブジェクト):
  8.  
  9. def route_for_task(self, task, args=None, kwargs=None):
  10. タスク  CLIENT_CELERY_ROUTES:
  11. なしを返す
  12. client_id = kwargs( 'client_id' )
  13. # client_idに基づいてキュー名を取得します
  14. queue_name = CLIENT_CELERY_ROUTES[タスク][ 'キュー' ].format(client_id)
  15. { 'キュー' :キュー名}を返します
  16.  
  17. CELERY_ROUTES = {
  18. 「クライアントルーター」  
  19. デフォルト_CELERY_ROUTES、
  20. }

CLIENT_CELERY_ROUTES では、クライアントに応じて分離する必要があるタスクと対応するキュー名の形式が指定されます。キュー名には、クライアントごとに異なるキュー名を取得するためのプレースホルダーが含まれています。

次に、ルーター ClientRouter が実装され、タスクに対応するキュー名を指定するために使用される router_for_task メソッドが定義されます。タスクが CLIENT_CELERY_ROUTES にある場合、キュー名は kwargs の client_id を使用してフォーマットされ、メッセージが最終的に送信されるキューの名前が取得され、入力パラメータ client_id に基づいて使用する特定のキューが決定され、異なるクライアントが異なるキューを使用することがないように分離されるというロジックであることがわかります。

クライアント ディメンションに基づいてキューを分割することに加えて、分離を実現するために他のディメンションに基づいてキューをさらに分割する必要がある場合は、この方法を参照してルーティング ルールを設計することもできます。

動的キュー

動的キューについてお話しましょう。それらは本質的には予備キューです。その目的は、オンライン環境内の特定のキューにメッセージが蓄積される圧力を軽減し、迅速なサポートを提供することです。構成を通じて、動的キューがサポートする必要があるキューを定義します。たとえば、プッシュ キューに大きな負荷がかかっている場合は、次のように json を構成して、push_tracking タスクと push_weight タスクを準備された動的キューにルーティングできます。

  1. celery_dynamic_router の設定
  2.  
  3. {
  4. 「celery_task.push_tracking」 : {
  5. "ダイナミックキュー" : [1,2],
  6. "ダイナミックパーセンテージ" : 0.7,
  7. },
  8. "celery_task.push_weight" : {
  9. 「ダイナミックキュー」 : [3,4],
  10. "ダイナミックパーセンテージ" : 0.7,
  11. }
  12. }

上記の構成では、celery_task.push_tracking タスクの 70% が動的キュー 1 と 2 にルーティングされ、celery_task.push_weight タスクの 70% が動的キュー 3 と 4 にルーティングされます。

動的キュー ルーター DynamicRouter は、おおよそ次のように実装されます。

  1. クラスDynamicRouter(オブジェクト):
  2.  
  3. def route_for_task(self, task, args=None, kwargs=None):
  4. # 設定を取得する
  5. task_config = get_conf_dict( 'celery_dynamic_router' ).get(task, None)
  6. # タスクが構成にない場合は、直接戻ります
  7. task_configでない場合は:
  8. なしを返す
  9. # タスクに対応する動的キュー構成を取得します
  10. dynamic_queue = task_config.get( 'dynamic_queue' , [])
  11. dynamic_percentage = task_config.get( 'dynamic_percentage' , 0.0)
  12. #一定の割合のタスクを動的キューにルーティングする
  13. random.random() <= dynamic_percentage の場合:
  14. # 使用する動的キューを決定する
  15. キュー名 = router_load_balance(動的キュー、タスク名)
  16. log.data( 'get_router| タスク名:%s、キュー:%s' 、タスク名、キュー名)
  17. { 'キュー' :キュー名}を返します
  18. それ以外
  19. なしを返す

動的に構成されたスケジュールされたタスク

前述のように、Celery は非同期タスクを実装できるだけでなく、Celery Beat を通じてスケジュールされたタスクも実装できます。まずは例を見てみましょう:

  1. celery.schedulesからcrontabをインポートする
  2.  
  3. app.conf.beat_schedule = {
  4. # 30秒ごとにメールを送信する
  5. '30秒ごとにメールを送信' :{
  6. 'タスク' : 'asks.send_mail' ,
  7. 'スケジュール' : 30.0,
  8. 'args' : [ '[email protected]' ]
  9. },
  10. }

上記の設定が完了したら、Celery Beat コマンドを実行します。

セロリビート

つまり、設定に応じて、send_email タスクは 30 秒ごとに実行されます。

上記の例では、コード内でスケジュールされたタスクを構成します。私の作業では、djcelery が提供するデータベース スケジューリング モデルを使用しましたが、これは Django が提供する ORM 機能と組み合わせることで動的に設定でき、より便利です。やり方は次のとおりです。まず、Celery 設定に以下を追加します。

  1. CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'   

DatabaseScheduler の使用を設定し、スケジュールされたタスクの構成テーブルを生成します。

  1. python manage.py 移行

次のテーブルがデータベースに追加されたことがわかります。

  1. |セロリ
  2. |セロリタスクセットメタ |
  3. | djcelery_crontabschedule |
  4. | djcelery_intervalschedule |
  5. |翻訳:
  6. |翻訳者
  7. |タスクの状態
  8. |翻訳:

上記の操作を完了したら、Celery Beat コマンドを実行するだけで、データベースから構成が読み取られ、スケジュールされたタスクが開始されます。これの利点は、タスク サイクルやパラメータを調整するなど、データベース内のレコードを変更することで、スケジュールされたタスクを動的に構成できることです。

上記は、私が仕事でCeleryを使った経験から学んだことです。非同期タスクまたはスケジュールされたタスクを実装する必要がある場合は、Celery の使用を検討してください。

私はテクノロジーと人生を愛するソウギョのCao Nieziです。また次回お会いしましょう!

参照する

メッセージ キューとタスク キューの違い (https://newbedev.com/message-queue-vs-task-queue-difference)

高性能非同期フレームワーク Celery のガイド (https://juejin.cn/post/6844903689103081480)

分散タスク キュー Celery - 詳細なタスク (https://www.cnblogs.com/jmilkfan-fanguiju/p/10589779.html)

<<:  Jiuzhou Cloudは、最も有望なエッジコンピューティング企業として「Golden Edge Award」を受賞しました。

>>:  パブリッククラウドのセキュリティについてお話ししましょう

推薦する

#Alipay# ionswitch-$15/年/KVM/512M メモリ/5gSSD/500g トラフィック/シアトル

年に設立された ionswitch は、独自の BGP ネットワークを運用し、独自の AS 番号を持...

ブランド向けの新しいマーケティング手法 10 選!

新しいブランドが新たな成長機会をもたらしています。最近、私は多くの業界サミットに出席しました。ブラン...

サーバーの安定性がランキングに与える影響

サーバーの安定性はウェブサイトのランキングに直接影響します。これはSEOの重要な部分であり、SEOに...

インタラクティブなWeChatマーケティングは瞬時に実行可能

現在、電子商取引とモバイルインターネットはホットな産業です。私たちの生活や仕事がインターネットでます...

検索エンジンのランキングに影響を与える要因は何ですか?

検索エンジンにおけるウェブサイトのランキングは、多くの要因によって決まります。すべてをまとめることは...

屋台グループに潜入した後、私は一連の実用的なガイドラインをまとめました

ある日、家族計画局が出産の誘発を担当し、都市管理部隊が屋台の開発を始めると誰が想像したでしょうか。本...

dedipath: $198/2*e5-2620v2/512G メモリ/複数のディスク スロット/1Gbps 帯域幅/無制限トラフィック、ロサンゼルスおよびニューヨークのデータ センター

Dedipath の米国サーバーでは特別プロモーションを実施しており、西海岸のロサンゼルスと東海岸の...

2020 年、クラウド コンピューティングは上位 4 社による戦いになるのでしょうか、それとも勝者総取りになるのでしょうか。

最近、Googleの親会社であるAlphabetが初めてGoogleのクラウドコンピューティング事業...

2018 年のクラウド コンピューティング市場を振り返ると、変化は需要よりも大きいのでしょうか?

今年、徐々に成熟し、導入に向けて順調に進んでいるように見えるクラウドコンピューティングも、実は内部的...

クラウドコンピューティング戦争:ハードウェアベースの企業は古すぎるために失敗している

これは歴史に残る戦争だ。それは参加者の生存に関わるだけでなく、人類の技術発展の将来の方向性にも大きな...

brainhost 1年間無料ホスティング

Brainhost は信頼できるホスティング会社で、現在 1 年間の無料ホスティングを提供しています...

SEOキーワード選択戦略とテクニックの完全ガイド

選択は SEO 作業全体を効果的に実行できるかどうかに関係しており、キーワードの選択が適切でない場合...

Amazon Auroraについて

Amazon Aurora は、ハイエンドの商用データベースのパフォーマンスと可用性とオープンソース...

コンテナ対仮想マシン: 未来のテクノロジーをめぐる戦い

導入:コンテナと仮想マシンの違いを理解することは、多くの人にとって、特に IT 分野に深く関わってい...

世界トップのPaaSプロバイダー

トップ PaaS プロバイダーはクラウド コンピューティングで重要な役割を果たしており、クラウドの導...