分散遅延メッセージングの実装方法を尋ねられたら、この記事を教えてあげてください。

分散遅延メッセージングの実装方法を尋ねられたら、この記事を教えてあげてください。

背景

RocketMQ のオープン ソース バージョンでは、メッセージ キューの遅延レベルは 18 レベルのみ提供されます。この機能はオープンソース版では特に役に立たないように思えますが、Alibaba Cloud の RocketMQ は 40 日以内であれば第 2 レベルの遅延キューをサポートします。一部の機能は、料金を支払わなければ利用できないことが判明しました。もちろん、オープンソースのメッセージ キューに切り替えることもできます。オープンソース コミュニティでは、RabbitMQ、Kafka など、多くのメッセージ キュー遅延メッセージがサポートされていません。遅延機能は、いくつかの特別な方法でのみ完了できます。なぜ多くの人がこの機能を実装していないのでしょうか?技術が複雑すぎるからでしょうか?次に、遅延メッセージを実装する方法を分析しましょう。

[[285062]]

ローカル遅延

分散メッセージ キューの遅延メッセージを実装する前に、通常、独自のアプリケーションで遅延機能をどのように実装するかについて考えてみましょう。 Java では、次のようにして遅延関数を完了できます。

  • ScheduledThreadPoolExecutor: ScheduledThreadPoolExecutor は ThreadPoolExecutor を継承します。タスクを送信するときは、まずタスクを DelayedWorkQueue の優先キューに送信し、有効期限に従って並べ替えます。この優先キューがヒープ構造です。各タスク送信ソートの複雑さは O(logN) です。次に、タスクを実行するときは、山の一番上から、つまり遅延時間が最も短いタスクから実行します。 ScheduledThreadPoolExecutor の利点の 1 つは、ThreadPoolExecutor を継承しているため、遅延タスクを実行するときにマルチスレッドの並列実行をサポートできることです。
  • タイマー: タイマーも優先キュー構造を使用して作成されますが、スレッド プールを継承しません。比較的独立性が高く、マルチスレッドをサポートしていません。使用できるスレッドは 1 つだけです。

分散メッセージキューの遅延

ローカル遅延を実装するのは比較的簡単で、Java で既成のものを直接使用できます。では、分散メッセージ キューを実装する上での難しさは何でしょうか?

多くの学生は、まず分散メッセージ キューの遅延タスクの実装について考えるでしょう。ローカル セットを直接使用したり、ScheduledThreadPoolExecutor、Timer を使用したりできますか?もちろん、メッセージの量が少ない場合はこれが可能です。しかし、当社の分散メッセージ キューはエンタープライズ レベルのミドルウェアであることが多く、データ量が非常に大きいため、純粋なメモリ ソリューションは絶対に実現可能ではありません。そこで、この問題を解決するための以下の解決策があります。

データベース

データベースは一般的に私たちが簡単に思いつく方法です。通常は次のようなテーブルを作成できます。

  1. 作成する テーブル`delay_message` (
  2. `id` bigint (20) 符号なしNOT   NULL AUTO_INCREMENT、
  3. `excute_time` bigint (16)デフォルト  NULL COMMENT '実行時間、ミリ秒レベル'
  4. `body` varchar (4096) COLLATE utf8mb4_unicode_ciデフォルト  NULL COMMENT 'メッセージ本文'
  5. 主要な キー(`id`)、
  6. キー`time_index` (`excute_time`)
  7. ) エンジン = InnoDBデフォルト文字セット = utf8mb4照合= utf8mb4_unicode_ci;

このテーブルでは、execute_time を使用して実際の実行時間を表し、そのインデックスを作成します。次に、メッセージ サービスで、データベースから実行可能メッセージを定期的にスキャンして実行を開始するスケジュールされたタスクを開始します。具体的なプロセスは以下のとおりです。

データベースを使用する方法は比較的原始的な方法です。遅延メッセージの概念が導入される前は、この方法は通常、注文が期限切れになるまでの分数を決定するなどの機能を実行するために使用されていました。この方法は通常、当社の単一の事業に限定されます。これをエンタープライズレベルのミドルウェアに拡張したい場合、うまくいきません。 BTree の特性上、MySQL でセカンダリ インデックスを維持するためのオーバーヘッドが増加し、書き込み速度が遅くなるため、このソリューションは通常は考慮されません。

ロックスDB/レベルDB

以前、RocketMQ のオープン ソース バージョンでは 18 レベルの遅延メッセージのみが実装されていることを紹介しました。ただし、多くの企業は、いつでもサポートできる RocketMQ に基づく独自の遅延メッセージ セットを構築しています。 Meituan は RocketMQ をカプセル化し、LevelDB を使用して遅延メッセージをカプセル化しました。 Didi のオープンソース DDMQ では、RocketMQ の遅延メッセージ部分をカプセル化するために RocksDB が使用されました。

次の図に示すように、その原理は基本的に Mysql と似ています。

  • ステップ 1: DDMQ がメッセージを送信すると、複数のメッセージ キュー、kafka、rocketMQ などがあるため、メッセージを配信するためのプロキシ レイヤーが存在します。遅延メッセージの場合、メッセージは RockesDB ストレージに送信されます。
  • ステップ 2: スケジュールされたタスクローテーションスキャンを通じてデータを RocketMQ クラスターに転送します。
  • ステップ 3: 消費者が購入します。

どちらもデータベースであるのに、なぜ RocksDB が MySQL よりも適しているのでしょうか? RocksDB は LSM ツリーを特徴としており、その使用シナリオは大規模な書き込みに適しており、メッセージ キューのシナリオとより一致しています。そのため、Didi と Meituan は、遅延メッセージのカプセル化のためのストレージ メディアとしてこれを選択しました。

3.2 タイムホイール + ディスクストレージ

タイム ホイールについて説明する前に、ローカル遅延を実装するために使用した ScheduledThreadPoolExecutor と Timer に戻りましょう。これらはすべて、優先キューを使用して完了します。優先キューは本質的にヒープ構造です。ヒープ構造を挿入する時間の計算量は O(LogN) です。将来メモリが無限になる可能性がある場合は、遅延されたメッセージを格納するために優先キューを使用します。ただし、メッセージの数が増えるにつれて、メッセージを挿入する効率は低下します。では、メッセージ数が増えてもメッセージ挿入の効率が低下するのを防ぐにはどうすればよいでしょうか?答えはタイムホイールです。

タイムホイールとは何ですか?実際、これを単純に多次元配列と見なすことができます。多くのフレームワークでは、タイマーの代わりにタイムホイールを使用して、いくつかの時間制限付きタスクを実行します。例えば、以前触れたローカルキャッシュ Caffeine に関する記事では、Caffeine は 2 層のタイムホイール、つまり 2 次元配列であると説明されています。 1 次元データは、秒、分、時間、日などのより大きな時間次元を表し、2 次元データは、1 秒内の特定の間隔などのより小さな時間次元を表します。 TimeWhile[i][j]を見つけると、そのデータ構造は実際にはノードを記録するリンクリストであることがわかります。 Caffeine では、タイムホイールを使用して、特定の時間に期限が切れるデータを記録し、処理します。

タイムホイールは配列構造なので、挿入の複雑さはO(1)です。効率の問題を解決した後でも、メモリは依然として無限ではありません。タイムホイールはどのように使うのでしょうか?答えはもちろんディスクです。 QunarのオープンソースQMQでは、タイムホイール+ディスクストレージが実装されています。説明の便宜上、RocketMQ の構造に変換して説明します。実装図は次のとおりです。

  • ステップ 1: プロデューサーは遅延されたメッセージを CommitLog に配信します。このとき、トピックを置き換えるというトリックを使用して、次の効果を実現します。
  • ステップ 2: バックグラウンドで、トピック関連のメッセージを定期的に取得して遅延させる Reput タスクが実行されます。
  • ステップ 3: メッセージが現在のタイム ホイールの範囲内にあるかどうかを判断します。そうでない場合は、手順 4 に進みます。そうである場合は、メッセージをタイム ホイールに直接配信します。
  • ステップ 4: 現在のメッセージが属する scheduleLog を見つけて、そこに書き込みます。 Qunar のデフォルトの区分は 1 期間あたり 1 時間ですが、業務に応じて調整できます。
  • ステップ 5: タイム ホイールは、次の期間のスケジュール ログを定期的にメモリにプリロードします。
  • ステップ 6: 到着したメッセージはトピックに復元され、CommitLog に再度配信されます。配達が成功した場合、dispatchLog がここに記録されます。記録する理由は、タイムホイールがメモリ内にあり、実行がどこに到達したかわからないためです。実行が最後の 1 秒でハングした場合、このタイムホイールより前のすべてのデータを再ロードする必要があります。これは、配信されたメッセージをフィルタリングするために使用されます。

個人的には、Time Wheel + Disk Storage は、前述の RocksDB よりも標準化されていると思います。他のミドルウェアに依存せずに完結できるため、当然可用性も高くなります。もちろん、Alibaba Cloud の RocketMQ の実装方法の観点からは、どちらのソリューションも可能です。

3.3 レディス

コミュニティ内には、遅延メッセージングに Redis を使用している企業も多数あります。 Redis には、順序付きセットである Zest と呼ばれるデータ構造があります。優先キューと同様の機能を実装できます。また、ヒープ構造なので、挿入アルゴリズムの複雑さは依然として O(logN) ですが、Redis は十分に高速なので、この部分は無視できます。 (これについては比較ベンチマークテストはなく、単なる推測です)。学生の中には、Redis は純粋なメモリ キーと値ではないのかと尋ねる人もいるかもしれません。メモリ制限も考慮する必要があります。なぜそれを選ぶのですか?

実際、このシナリオでは、Redis は水平方向にスケーリングするのが非常に簡単です。 1 つの Redis メモリでは不十分な場合は、ニーズに合わせて 2 つ以上を使用できます。 Redis 遅延メッセージの原理図 (元の画像: https://www.cnblogs.com/lylife/p/7881950.html) は次のとおりです。

  • 遅延メッセージ プール: Redis ハッシュ構造。キーはメッセージ ID、値は特定のメッセージです。もちろん、代わりにディスクやデータベースを使用することもできます。ここに、すべてのメッセージの内容が主に保存されます。
  • 遅延キュー: ZSET データ構造、値はメッセージ ID、スコアは実行時間です。ここで、遅延キューを水平方向に拡張して、サポートできるデータの量を増やすことができます。
  • ワーカー スレッド プール: 複数のワーカーがあり、複数のマシンにデプロイしてクラスターを形成できます。クラスター内のすべてのワーカーは、ZK を通じて調整され、遅延キューが割り当てられます。

遅延キュー内のメッセージの有効期限が切れたことをどのように知ることができますか?方法は2つあります:

  • 各ワーカーは、ZSET の最小実行時間を定期的にスキャンし、その時間に達した場合はそれを取得します。この方法は、メッセージが少ない場合に特にリソースを無駄にします。メッセージ数が多い場合、タイミングの悪いラウンドロビントレーニングにより遅延時間が不正確になります。
  • 最初の方法には多くの問題があるため、ここでは Timer からいくつかのアイデアを借用します。 wait-notify により、より良い遅延効果が得られ、リソースが無駄になりません。最初は、ZSET 内の最小の時間を取得し、その後 (実行時間 - 現在の時間) を待機します。こうすることで、リソースを無駄にする必要がなくなります。到着時間になると自動的に応答します。新しいメッセージが現在の ZSET に入り、待機しているメッセージよりも小さい場合は、起動するように直接通知し、小さいメッセージを再度取得して、再度待機する、という動作を繰り返します。

要約する

この記事では、分散遅延メッセージを実装する 3 つの方法を紹介します。独自の遅延メッセージを実装する際に、この情報が何らかのアイデアを提供してくれることを願っています。一般的には、最初の 2 つの方法の方が適していると考えられます。結局のところ、RocketMQ などの大規模なメッセージ キュー ミドルウェアには、シーケンシャル メッセージ、トランザクション メッセージなどの他の統合機能がいくつかあります。遅延メッセージは、独立したコンポーネントとして存在するよりも、分散メッセージ キュー内の機能になる傾向があります。もちろん、一つ一つ紹介していない詳細もいくつかあります。具体的な内容については、QMQ および DDMQ のソースコードを参照してください。

<<:  JVM メモリ管理 - GC アルゴリズムの詳細な説明

>>:  5Gがクラウドネットワークの統合を加速

推薦する

知湖の進歩への道

最近、知乎は上場以来初の四半期報告書を発表した。公開データによると、最新の月間アクティブユーザー数は...

Exabytes VPS の簡単なレビュー、米国データセンター

私は exabytes.com で 1G メモリの VPS を入手しました。exabytes.com...

40日間の防疫努力、曹東マーケティングミニプログラムが九木王のオンライン突破を支援

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス疫病流行下で、九木王はフ...

Raysync: プロフェッショナルな大容量ファイル転送ソリューション

転送速度が遅い、接続が切れる、プライバシーが漏れる、操作が不便... 個人でも企業でも、ファイル(特...

クラウド プラットフォームを「より適切に管理」するにはどうすればよいでしょうか?ファーウェイのクラウド集中運用・保守が企業のイノベーションを加速

デジタル化の波は世界の経済情勢を一変させており、デジタル経済は世界の持続可能な成長の新たな原動力にな...

GoogleとBaiduのもう一つのアイデンティティ:ドメイン名登録業者

この記事を始める前に、まず明確にしておきたいのは、Google は長い間世界トップクラスのドメイン名...

企業はクラウドの回帰を必要としているのでしょうか?

クラウドの復帰とは、特定のワークロードとアプリケーションをパブリック クラウドからオンプレミスのデー...

映画ウェブサイトのプロモーションと優れたユーザーエクスペリエンスを提供するための提案

時代の発展とともに、あらゆる分野における競争はますます激しくなってきています。今日では、企業は製品で...

SEOとUEOにもっと有利になる企業ウェブサイトのホームページデザイン方法

SEO の意味はよく分かっているかもしれませんが、UEO とは何でしょうか? UEO は、ユーザー ...

莫言のノーベル文学賞から、さまざまな交通手段が絶えず出現していることがわかります

A5で「莫言ノーベル文学賞受賞後の国内トップ5の電子商取引企業のイベントマーケティングへの反応」とい...

cheapvpsllc - バレンタインデーに VPS が 50% オフ / 年額 7 ドルから

cheapvpsllc のボスが事前に割引コード valentineday50 を送ってくれました。...

急いで!ソフトウェアの更新だけでなく、httpプロトコルも更新する必要があります

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています携帯電話の...

5Gとエッジコンピューティングが企業の新たな常態への対応にどのように役立つか

コロナウイルス危機への対応として、世界中の組織は、世界が正常に戻るか、少なくとも次の正常に戻るまで待...

mycustomhosting-20 USD/年/512M メモリ/KVM/20G ハードディスク/500G トラフィック

MyCustom Hosting のプロモーション版 VPS をお勧めします。米国とカナダにデータ ...