分散遅延メッセージングの実装方法を尋ねられたら、この記事を教えてあげてください。

分散遅延メッセージングの実装方法を尋ねられたら、この記事を教えてあげてください。

背景

RocketMQ のオープン ソース バージョンでは、メッセージ キューの遅延レベルは 18 レベルのみ提供されます。この機能はオープンソース版では特に役に立たないように思えますが、Alibaba Cloud の RocketMQ は 40 日以内であれば第 2 レベルの遅延キューをサポートします。一部の機能は、料金を支払わなければ利用できないことが判明しました。もちろん、オープンソースのメッセージ キューに切り替えることもできます。オープンソース コミュニティでは、RabbitMQ、Kafka など、多くのメッセージ キュー遅延メッセージがサポートされていません。遅延機能は、いくつかの特別な方法でのみ完了できます。なぜ多くの人がこの機能を実装していないのでしょうか?技術が複雑すぎるからでしょうか?次に、遅延メッセージを実装する方法を分析しましょう。

[[285062]]

ローカル遅延

分散メッセージ キューの遅延メッセージを実装する前に、通常、独自のアプリケーションで遅延機能をどのように実装するかについて考えてみましょう。 Java では、次のようにして遅延関数を完了できます。

  • ScheduledThreadPoolExecutor: ScheduledThreadPoolExecutor は ThreadPoolExecutor を継承します。タスクを送信するときは、まずタスクを DelayedWorkQueue の優先キューに送信し、有効期限に従って並べ替えます。この優先キューがヒープ構造です。各タスク送信ソートの複雑さは O(logN) です。次に、タスクを実行するときは、山の一番上から、つまり遅延時間が最も短いタスクから実行します。 ScheduledThreadPoolExecutor の利点の 1 つは、ThreadPoolExecutor を継承しているため、遅延タスクを実行するときにマルチスレッドの並列実行をサポートできることです。
  • タイマー: タイマーも優先キュー構造を使用して作成されますが、スレッド プールを継承しません。比較的独立性が高く、マルチスレッドをサポートしていません。使用できるスレッドは 1 つだけです。

分散メッセージキューの遅延

ローカル遅延を実装するのは比較的簡単で、Java で既成のものを直接使用できます。では、分散メッセージ キューを実装する上での難しさは何でしょうか?

多くの学生は、まず分散メッセージ キューの遅延タスクの実装について考えるでしょう。ローカル セットを直接使用したり、ScheduledThreadPoolExecutor、Timer を使用したりできますか?もちろん、メッセージの量が少ない場合はこれが可能です。しかし、当社の分散メッセージ キューはエンタープライズ レベルのミドルウェアであることが多く、データ量が非常に大きいため、純粋なメモリ ソリューションは絶対に実現可能ではありません。そこで、この問題を解決するための以下の解決策があります。

データベース

データベースは一般的に私たちが簡単に思いつく方法です。通常は次のようなテーブルを作成できます。

  1. 作成する テーブル`delay_message` (
  2. `id` bigint (20) 符号なしNOT   NULL AUTO_INCREMENT、
  3. `excute_time` bigint (16)デフォルト  NULL COMMENT '実行時間、ミリ秒レベル'
  4. `body` varchar (4096) COLLATE utf8mb4_unicode_ciデフォルト  NULL COMMENT 'メッセージ本文'
  5. 主要な キー(`id`)、
  6. キー`time_index` (`excute_time`)
  7. ) エンジン = InnoDBデフォルト文字セット = utf8mb4照合= utf8mb4_unicode_ci;

このテーブルでは、execute_time を使用して実際の実行時間を表し、そのインデックスを作成します。次に、メッセージ サービスで、データベースから実行可能メッセージを定期的にスキャンして実行を開始するスケジュールされたタスクを開始します。具体的なプロセスは以下のとおりです。

データベースを使用する方法は比較的原始的な方法です。遅延メッセージの概念が導入される前は、この方法は通常、注文が期限切れになるまでの分数を決定するなどの機能を実行するために使用されていました。この方法は通常、当社の単一の事業に限定されます。これをエンタープライズレベルのミドルウェアに拡張したい場合、うまくいきません。 BTree の特性上、MySQL でセカンダリ インデックスを維持するためのオーバーヘッドが増加し、書き込み速度が遅くなるため、このソリューションは通常は考慮されません。

ロックスDB/レベルDB

以前、RocketMQ のオープン ソース バージョンでは 18 レベルの遅延メッセージのみが実装されていることを紹介しました。ただし、多くの企業は、いつでもサポートできる RocketMQ に基づく独自の遅延メッセージ セットを構築しています。 Meituan は RocketMQ をカプセル化し、LevelDB を使用して遅延メッセージをカプセル化しました。 Didi のオープンソース DDMQ では、RocketMQ の遅延メッセージ部分をカプセル化するために RocksDB が使用されました。

次の図に示すように、その原理は基本的に Mysql と似ています。

  • ステップ 1: DDMQ がメッセージを送信すると、複数のメッセージ キュー、kafka、rocketMQ などがあるため、メッセージを配信するためのプロキシ レイヤーが存在します。遅延メッセージの場合、メッセージは RockesDB ストレージに送信されます。
  • ステップ 2: スケジュールされたタスクローテーションスキャンを通じてデータを RocketMQ クラスターに転送します。
  • ステップ 3: 消費者が購入します。

どちらもデータベースであるのに、なぜ RocksDB が MySQL よりも適しているのでしょうか? RocksDB は LSM ツリーを特徴としており、その使用シナリオは大規模な書き込みに適しており、メッセージ キューのシナリオとより一致しています。そのため、Didi と Meituan は、遅延メッセージのカプセル化のためのストレージ メディアとしてこれを選択しました。

3.2 タイムホイール + ディスクストレージ

タイム ホイールについて説明する前に、ローカル遅延を実装するために使用した ScheduledThreadPoolExecutor と Timer に戻りましょう。これらはすべて、優先キューを使用して完了します。優先キューは本質的にヒープ構造です。ヒープ構造を挿入する時間の計算量は O(LogN) です。将来メモリが無限になる可能性がある場合は、遅延されたメッセージを格納するために優先キューを使用します。ただし、メッセージの数が増えるにつれて、メッセージを挿入する効率は低下します。では、メッセージ数が増えてもメッセージ挿入の効率が低下するのを防ぐにはどうすればよいでしょうか?答えはタイムホイールです。

タイムホイールとは何ですか?実際、これを単純に多次元配列と見なすことができます。多くのフレームワークでは、タイマーの代わりにタイムホイールを使用して、いくつかの時間制限付きタスクを実行します。例えば、以前触れたローカルキャッシュ Caffeine に関する記事では、Caffeine は 2 層のタイムホイール、つまり 2 次元配列であると説明されています。 1 次元データは、秒、分、時間、日などのより大きな時間次元を表し、2 次元データは、1 秒内の特定の間隔などのより小さな時間次元を表します。 TimeWhile[i][j]を見つけると、そのデータ構造は実際にはノードを記録するリンクリストであることがわかります。 Caffeine では、タイムホイールを使用して、特定の時間に期限が切れるデータを記録し、処理します。

タイムホイールは配列構造なので、挿入の複雑さはO(1)です。効率の問題を解決した後でも、メモリは依然として無限ではありません。タイムホイールはどのように使うのでしょうか?答えはもちろんディスクです。 QunarのオープンソースQMQでは、タイムホイール+ディスクストレージが実装されています。説明の便宜上、RocketMQ の構造に変換して説明します。実装図は次のとおりです。

  • ステップ 1: プロデューサーは遅延されたメッセージを CommitLog に配信します。このとき、トピックを置き換えるというトリックを使用して、次の効果を実現します。
  • ステップ 2: バックグラウンドで、トピック関連のメッセージを定期的に取得して遅延させる Reput タスクが実行されます。
  • ステップ 3: メッセージが現在のタイム ホイールの範囲内にあるかどうかを判断します。そうでない場合は、手順 4 に進みます。そうである場合は、メッセージをタイム ホイールに直接配信します。
  • ステップ 4: 現在のメッセージが属する scheduleLog を見つけて、そこに書き込みます。 Qunar のデフォルトの区分は 1 期間あたり 1 時間ですが、業務に応じて調整できます。
  • ステップ 5: タイム ホイールは、次の期間のスケジュール ログを定期的にメモリにプリロードします。
  • ステップ 6: 到着したメッセージはトピックに復元され、CommitLog に再度配信されます。配達が成功した場合、dispatchLog がここに記録されます。記録する理由は、タイムホイールがメモリ内にあり、実行がどこに到達したかわからないためです。実行が最後の 1 秒でハングした場合、このタイムホイールより前のすべてのデータを再ロードする必要があります。これは、配信されたメッセージをフィルタリングするために使用されます。

個人的には、Time Wheel + Disk Storage は、前述の RocksDB よりも標準化されていると思います。他のミドルウェアに依存せずに完結できるため、当然可用性も高くなります。もちろん、Alibaba Cloud の RocketMQ の実装方法の観点からは、どちらのソリューションも可能です。

3.3 レディス

コミュニティ内には、遅延メッセージングに Redis を使用している企業も多数あります。 Redis には、順序付きセットである Zest と呼ばれるデータ構造があります。優先キューと同様の機能を実装できます。また、ヒープ構造なので、挿入アルゴリズムの複雑さは依然として O(logN) ですが、Redis は十分に高速なので、この部分は無視できます。 (これについては比較ベンチマークテストはなく、単なる推測です)。学生の中には、Redis は純粋なメモリ キーと値ではないのかと尋ねる人もいるかもしれません。メモリ制限も考慮する必要があります。なぜそれを選ぶのですか?

実際、このシナリオでは、Redis は水平方向にスケーリングするのが非常に簡単です。 1 つの Redis メモリでは不十分な場合は、ニーズに合わせて 2 つ以上を使用できます。 Redis 遅延メッセージの原理図 (元の画像: https://www.cnblogs.com/lylife/p/7881950.html) は次のとおりです。

  • 遅延メッセージ プール: Redis ハッシュ構造。キーはメッセージ ID、値は特定のメッセージです。もちろん、代わりにディスクやデータベースを使用することもできます。ここに、すべてのメッセージの内容が主に保存されます。
  • 遅延キュー: ZSET データ構造、値はメッセージ ID、スコアは実行時間です。ここで、遅延キューを水平方向に拡張して、サポートできるデータの量を増やすことができます。
  • ワーカー スレッド プール: 複数のワーカーがあり、複数のマシンにデプロイしてクラスターを形成できます。クラスター内のすべてのワーカーは、ZK を通じて調整され、遅延キューが割り当てられます。

遅延キュー内のメッセージの有効期限が切れたことをどのように知ることができますか?方法は2つあります:

  • 各ワーカーは、ZSET の最小実行時間を定期的にスキャンし、その時間に達した場合はそれを取得します。この方法は、メッセージが少ない場合に特にリソースを無駄にします。メッセージ数が多い場合、タイミングの悪いラウンドロビントレーニングにより遅延時間が不正確になります。
  • 最初の方法には多くの問題があるため、ここでは Timer からいくつかのアイデアを借用します。 wait-notify により、より良い遅延効果が得られ、リソースが無駄になりません。最初は、ZSET 内の最小の時間を取得し、その後 (実行時間 - 現在の時間) を待機します。こうすることで、リソースを無駄にする必要がなくなります。到着時間になると自動的に応答します。新しいメッセージが現在の ZSET に入り、待機しているメッセージよりも小さい場合は、起動するように直接通知し、小さいメッセージを再度取得して、再度待機する、という動作を繰り返します。

要約する

この記事では、分散遅延メッセージを実装する 3 つの方法を紹介します。独自の遅延メッセージを実装する際に、この情報が何らかのアイデアを提供してくれることを願っています。一般的には、最初の 2 つの方法の方が適していると考えられます。結局のところ、RocketMQ などの大規模なメッセージ キュー ミドルウェアには、シーケンシャル メッセージ、トランザクション メッセージなどの他の統合機能がいくつかあります。遅延メッセージは、独立したコンポーネントとして存在するよりも、分散メッセージ キュー内の機能になる傾向があります。もちろん、一つ一つ紹介していない詳細もいくつかあります。具体的な内容については、QMQ および DDMQ のソースコードを参照してください。

<<:  JVM メモリ管理 - GC アルゴリズムの詳細な説明

>>:  5Gがクラウドネットワークの統合を加速

推薦する

クラウドコンピューティングについて1つの記事で学ぶ: 世界は1台のコンピュータ

[[259522]]クラウド コンピューティングは、2008 年の世界的金融危機以来、世界で最もホッ...

実用情報 | 360 Gamesが「2016年中国モバイルゲーム業界動向レポート」を発表 模倣による反撃の時代は終わりに近づいている

モバイルゲーム市場は今や大手企業の「遊び場」となっており、中小のチームが反撃の機会を得るのはますます...

アンクルオペレーションズ: モバイル業界のすべての CP とチャネル パートナーの皆様に、楽しい中秋節をお過ごしください!

 この台風の間、私体存在する南部の美しい海辺の都市、厦門は、オペレーション・アンクルの願い:モバイル...

推奨: CorgiTech-750m/vmware/30g ハードディスク/1T トラフィック/月額 7 ドル

CorgiTech は、Host Cat が推奨する VPS 業者です。使用してみれば、その価値がわ...

Java 仮想マシンはどのようにしてスレッド同期を実行しますか?

同期の原理を紹介したいのですが、どこから始めればよいかわかりません。インターネットで外国人が書いた記...

ケース分析: nofollow タグを使用する際のバランスの取り方

nofollow タグが SEO 担当者の視野に入ってから 1 年以上経ちました。過去 1 年ほどの...

SEM をメインに、SEO を補助的に使う?

現在、SEO は死んだ、あるいは継続する理由がないというのが一般的な見解です。本当にそうなのでしょう...

WeChat StoresがTaobaoに挑戦:電子商取引へのアクセスには基準はないが、2万元のデポジットが必要

概要:WeChatパブリックプラットフォームは昨日、「WeChatストア」を正式に開始しました。We...

大規模ウェブサイトのランキングを向上させる方法を教えます

検索エンジン最適化に関して、誰もが最も気にするのは、おそらくウェブサイトのランキングです。最適化はラ...

さまざまなエッジ クラスタ管理ソリューションの比較と選択

[[429682]]この記事は、Double_Dong&Huazi が執筆した WeChat...

エッジコンピューティングによってもたらされる新たなセキュリティリスクを排除する方法

現在、エッジコンピューティングの適用範囲はますます広がっています。エッジ コンピューティング アプリ...

#blackfriday# hostgator - 80% オフ、無制限のウェブサイト構築、仮想ホスティング/VPS/サーバー

アメリカの有名ホスティングブランド、Hostgatorのブラックフライデープロモーションが始まりまし...

アジアのクラウドプロバイダーが暗号通貨マイニングマルウェアの標的に

アジアのクラウドコンピューティングサービスプロバイダーは現在、暗号通貨のマイニングに使用されるコンピ...