最もシンプルな分散タスクスケジューリングフレームワークを実装する

最もシンプルな分散タスクスケジューリングフレームワークを実装する

以前、同社は既存の単一ノード スケジューリングを分散タスク スケジューリングに変換したいと考え、市場で主流のオープン ソース分散タスク スケジューリング フレームワークを研究しました。使うのが面倒に感じました!特に以前は多くのスケジュールタスクが 1 つのクラスに記述されていたため、変換するのはさらに面倒でした。私は怠け者なので、他の人が書いたツールに多くの変更を加えなければならないと感じると、いつも不安を感じます。

[[282425]]

そこで、自分でフレームワークを書きたいと思いました。結局のところ、一般的な企業のタスクスケジューリングでは、大量のタスクを同時にスケジュールし、大きな並行性を持たせることは不可能なので、分散タスクスケジューリングは、すべての分散システムの中で最もシンプルだと思います。分散システムに変換する主な目的は、タスクを複数のノードに分散し、より多くのタスクを同時に処理できるようにすることです。

ある日、会社の受付で荷物を受け取っていたとき、同僚数人(私を含む)が、その荷物が自分たちのものかどうかを最初から最後まで確認しているという現象を目にしました。それが自分たちのものであれば、彼らはそれを奪い、そうでなければ無視するでしょう。すると、私はインスピレーションを受けました。このシナリオを分散配送システムに当てはめると、宅配会社または宅配業者が各荷物を私たちの名前と電話番号に従って仕分けしており、私たちは必要なものだけを取り出せばよいと想定できます。

しかし、別の観点から見ると、私たち一人一人が最初から最後まですべての速達便を見て、一定の合意されたルールに従って、自分の速達便であれば持ち帰り、自分のものでなければ無視して次の速達便に移るということも理解できます。速達をタスクとして考えてみると、たくさんの速達を受け取りにいくと、大勢の人が自分の速達をスムーズに受け取ることができます。では、多数のノードが独自にタスクを取得し、それぞれのタスクを適切に処理できるのでしょうか?

従来の分散タスク スケジューリングには、単一障害点を回避するためにマルチノード クラスターを展開するスケジューリング センターがあり、さらに、スケジューリング センターによって分散されたタスクの実行を担当する多数のエグゼキュータがあります。上記から得たインスピレーションに基づいて、私のアイデアは、集中型のディスパッチ センターを放棄し、各実行ノードが直接パブリックな場所に移動し、合意されたルールに従ってタスクを取得して実行できるようにすることです。設計図は以下のとおりです

タスク DB ライブラリに単一点の問題があるのではないかと疑問に思う人もいるかもしれません。逆に質問したいのですが、他の分散タスク スケジューリング フレームワークにはこの問題はないのでしょうか?単一ポイントのデータベースについては、ビジネス ライブラリのような高可用性ソリューションを個別に検討できますが、これはこの記事の焦点では​​ありません。当然のことながら、私たちの焦点は、単一のタスクが複数のノードによって同時に実行されないように実行ノードの高可用性を確保する方法、実行の途中で単一のノードが突然接続を失った場合にこのタスクをどうするか、その他の複雑な問題にあります。

その後、私たちは、修正されていないコードを使用して、これらの問題を一つずつ解決しました (修正されていないとは、主に、構造を最適化していない実行中のアカウントのコード スタイルを意味します。私を含め、多くの人は、他の人のソース コードを読むときに、迷路にいるかのようにいつも目が回るような感覚を覚えます。非常に骨の折れる作業のようです。おそらく、私はまだそのレベルに達していません)。

集中スケジューリングが省略されているので、タスクスケジューリングと呼ばれている以上、スケジューリングのプロセスが存在する必要があることは明らかです。そうでなければ、複数のノードがタスクを競合する場合に、どのようにして競合を回避できるでしょうか?ここでの私の解決策は、まずタスクのいくつかの状態(実行待ち、実行中、例外あり、完了)を明確にすることです。

各ノードは、すぐに実行される保留中のタスクをチェックするスレッドを開始し、これらのタスクを反復処理して、楽観的ロックを使用してタスクのバージョン番号 (バージョン番号 + 1) とステータス (実行中になる) を更新します。更新が成功した場合、更新はノード自身の遅延キューに配置され、実行を待機します。

各ノードのスレッドはデータベースにアクセスして実行するタスクを確認するため、実行中のタスクは次回他のノードによって照会されないことは明らかです。このノードのステータスが更新される前に見つかった実行対象のタスクについては、楽観的ロックの試行後に更新が失敗し、タスクはスキップされます。これにより、複数のノードによって同時にタスクが繰り返し実行されるのを回避できます。キーコードは次のとおりです。

  1. パッケージ com.rdpaas.task.scheduler;
  2.  
  3. com.rdpaas.task.common.* をインポートします。
  4. com.rdpaas.task.config.EasyJobConfig をインポートします。
  5. com.rdpaas.task.repository.NodeRepository をインポートします。
  6. com.rdpaas.task.repository.TaskRepository をインポートします。
  7. com.rdpaas.task.strategy.Strategy をインポートします。
  8.  
  9. org.slf4j.Logger をインポートします。
  10. org.slf4j.LoggerFactory をインポートします。
  11. org.springframework.beans.factory.annotation.Autowired をインポートします。
  12. org.springframework.stereotype.Component をインポートします。
  13.  
  14. javax.annotation.PostConstruct をインポートします。
  15.  
  16. java.util.Dateをインポートします
  17. java.util.List をインポートします。
  18. インポート java.util.concurrent.*;
  19.  
  20. /**
  21. * タスクスケジューラ
  22. * @著者 ロンディ
  23. * @日付2019-03-13 21:15
  24. */
  25. @成分
  26. パブリッククラスTaskExecutor {
  27.  
  28. プライベート静的最終 Logger ロガー = LoggerFactory.getLogger(TaskExecutor.class);
  29.  
  30. オートワイヤード
  31. プライベート TaskRepository タスクリポジトリ;
  32.  
  33. オートワイヤード
  34. プライベート NodeRepository nodeRepository;
  35.  
  36. オートワイヤード
  37. プライベートEasyJobConfig設定;/**
  38. * タスクの有効期限遅延キューを作成する
  39. */
  40. プライベートDelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();
  41.  
  42. /**
  43. * 最大で2つのスレッドしか実行されないことが明確にわかるので、システムに組み込まれたツールを使用するだけで済みます。
  44. */
  45. プライベート ExecutorService bossPool = Executors.newFixedThreadPool(2);
  46.  
  47. /**
  48. * ワーカースレッドプールを宣言する
  49. */
  50. プライベート ThreadPoolExecutor ワーカープール;
  51.  
  52.  
  53. @投稿コンストラクト
  54. パブリックvoid init() {
  55. /**
  56. * カスタム スレッド プール、初期スレッド数 corePoolSize、スレッド プール待機キュー サイズ queueSize、初期スレッドにタスクがあり、待機キューがいっぱいの場合
  57. * スレッド数は最大スレッド数 maxSize まで自動的に拡張され、新しく拡張されたスレッドは 60 秒間アイドル状態になった後に自動的にリサイクルされます。カスタムスレッドプールは、Executorsのスレッドツールによるものです。
  58. * それぞれに欠点があり、本番環境での使用には適していません
  59. */
  60. ワーカープール = 新しい ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, 新しい ArrayBlockingQueue<>(config.getQueueSize()));
  61. /**
  62. * 保留中のタスク読み込みスレッドを実行する
  63. */
  64. bossPool.execute (新しいLoader ());
  65. /**
  66. * タスクスケジューリングスレッドを実行する
  67. */
  68. bossPool.execute (新しいBoss ());
  69.  
  70. }
  71.  
  72. クラス Loader は Runnable を実装します {
  73.  
  74. @オーバーライド
  75. パブリックボイド実行(){
  76. のために(;;) {
  77. 試す {
  78. /**
  79. * 指定した時間(秒単位)に開始されるメインタスクのリストを検索します
  80. */
  81. リスト<Task> タスク = taskRepository.listPeddingTasks(config.getFetchDuration());
  82. タスクがnullの場合、tasks.isEmpty() は次のように記述します。
  83. 続く;
  84. }
  85. (タスク task:tasks)の場合{
  86.  
  87. task.setStatus(TaskStatus.DOING);
  88. タスクのNodeIdを設定します。
  89. /**
  90. * 楽観的ロックを使用してステータスを更新します。更新が成功した場合、他のノードは正常に更新されません。実行するタスクのリストを照会する場合
  91. * この期間中に一部のノードがこのタスクを更新したため、バージョンは確認時のバージョンと異なる必要があります。ここで更新してください
  92. * 必ず0を返します
  93. */
  94. int n = taskRepository.updateWithVersion(タスク);
  95. 日付nextStartTime = task.getNextStartTime();
  96. if(n == 0 || nextStartTime == null ) {
  97. 続く;
  98. }
  99. /**
  100. * 遅延オブジェクトにカプセル化し、遅延キューに入れる
  101. */
  102. task = taskRepository.get(task.getId());
  103. DelayItem<タスク> delayItem = new DelayItem<タスク>(nextStartTime.getTime() - new Date ().getTime(), task);
  104. タスクキュー.offer(遅延項目);
  105.  
  106. }
  107. スレッドをスリープ状態にします。
  108. } 例外 e をキャッチします。
  109. logger.error( "タスク リストの取得に失敗しました。原因:{}" , e);
  110. }
  111. }
  112. }
  113.  
  114. }
  115.  
  116. クラスBossはRunnableを実装します{
  117. @オーバーライド
  118. パブリックボイド実行(){
  119. のために(;;) {
  120. 試す {
  121. /**
  122. * 時間切れになると、タスクオブジェクトは遅延キューから取り出され、実行のためにワーカースレッドプールに渡されます。
  123. */
  124. DelayItem<タスク> item = taskQueue.take();
  125. if(item != null && item.getItem() != null ) {
  126. タスク task = item.getItem();
  127. ワーカープールを実行します(新しいワーカー (タスク))。
  128. }
  129.  
  130. } キャッチ (例外 e) {
  131. logger.error( "フェッチタスクが失敗しました。原因:{}" , e);
  132. }
  133. }
  134. }
  135.  
  136. }
  137.  
  138. WorkerクラスはRunnableを実装します{
  139.  
  140. プライベートタスクタスク;
  141.  
  142. パブリックワーカー(タスクタスク) {
  143. this.task = タスク;
  144. }
  145.  
  146. @オーバーライド
  147. パブリックボイド実行(){
  148. logger.info( "タスクの実行を開始します:{}" ,task.getId());
  149. タスク詳細の詳細 = null ;
  150. 試す {
  151. //タスクを開始する
  152. 詳細 = taskRepository.start(タスク);
  153. if(detail == null )戻り値;
  154. //タスクを実行する
  155. タスクの呼び出し
  156. //タスクを完了する
  157. 終了(タスク、詳細);
  158. logger.info( "タスクの実行が完了しました:{}" ,task.getId());
  159. } キャッチ (例外 e) {
  160. logger.error( "タスクを実行:{} エラー、原因:{}" ,task.getId(), e);
  161. 試す {
  162. taskRepository.fail(タスク,詳細,e.getCause().getMessage());
  163. } 例外 e1 をキャッチします。
  164. logger.error( "タスク失敗:{} エラー、原因:{}" ,task.getId(), e);
  165. }
  166. }
  167. }
  168.  
  169. }
  170.  
  171. /**
  172. * サブタスクを完了します。親タスクが失敗した場合、サブタスクは実行されません。
  173. * @param タスク
  174. * @param 詳細
  175. * @例外をスローします
  176. */
  177. private void Finish(Task task,TaskDetail detail) 例外をスローします {
  178.  
  179. // サブクラスタスクがあるかどうか確認する
  180. リスト<タスク> childTasks = taskRepository.getChilds(task.getId());
  181. 子タスクがnullの場合、childTasks.isEmpty() は次のように記述します。
  182. //子タスクがない場合に親タスクを完了する
  183. taskRepository.finish(タスク,詳細);
  184. 戻る;
  185. }それ以外{
  186. (タスク子タスク : 子タスク) {
  187. //タスクを開始する
  188. タスクの詳細の子の詳細 = null ;
  189. 試す {
  190. //サブタスクのステータスを実行中に変更する
  191. childTask.setStatus(TaskStatus.DOING);
  192. 子タスクのノード ID を設定します。
  193. // サブタスクを開始
  194. childDetail = taskRepository.startChild(childTask,detail);
  195. // ステータスを更新するには楽観的ロックを使用します。そうしないと、回復スレッドで同時実行の問題が発生する可能性があります。
  196. int n = taskRepository.updateWithVersion(childTask);
  197. (n > 0) の場合 {
  198. //上記の更新後にバージョンが同期されなくなるのを避けるために、データベースから再度取得します
  199. childTask = taskRepository.get(childTask.getId());
  200. //サブタスクを実行
  201. 子タスクの呼び出し
  202. //サブタスクを完了する
  203. 終了(子タスク、子の詳細);
  204. }
  205. } キャッチ (例外 e) {
  206. logger.error( "子タスクの実行エラー、原因:{}" , e);
  207. 試す {
  208. taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
  209. } キャッチ (例外 e1) {
  210. logger.error( "子タスクエラーが失敗しました。原因:{}" , e);
  211. }
  212. }
  213. }
  214. /**
  215. * サブタスクがある場合は、親タスクを完了する前にサブタスクを完了してください
  216. */
  217. taskRepository.finish(タスク,詳細);
  218.  
  219. }
  220.  
  221. }
  222.  
  223. }

前述のように、タスクは一度に 1 つのノードによってのみスケジュールされ、実行されることが保証されます。このとき、複数のノードを展開すると、一団がフロントデスクに速達便を受け取りに行き、すべての速達便をスムーズに持ち帰ることができるのと同じように、タスク ライブラリ内のすべてのタスクがスムーズに実行されるはずです。結局のところ、すべての速達便はあなたのものか他人のものかのどちらかであり、あなたの速達便が他人のものにはなりません。

ただし、ここでの宅配便の発送と集荷は少し異なります。宅配便を受け取る人は誰でも、どれが自分のものなのかを見分ける方法を知っています。ここでのスケジュールにはそのような概念はまったくありません。タスクのステータスを更新するために楽観的ロックを使用できる幸運なノードに完全に依存します。一般的に言えば、違いは合意されたルールが必要であるということです。宅配業者の名前と携帯電話番号を見れば、その宅配業者が自分の業者かどうかがわかります。また、タスクをそれ自体で実行する必要があるかどうかを判断するルールを作成し、どのタスクをどのノードで実行する必要があるかを明確にして、不要なロック競合を回避することもできます。

ここでは、負荷分散戦略を参照できます。現時点では、以下のルールを実装したいと考えています。

  • id_hash: タスクの自己増分 ID に従って、残りのノード数を取得します。剰余値は、現在のノードのリアルタイムシリアル番号と一致します。一致した場合は、それを持ち帰って処刑することができます。それ以外の場合は、このタスクを意識的に無視してください。
  • least_count: 最も少ない数のタスクを実行するノードが優先されます
  • 重み: ノードの重みに応じてタスクを実行します
  • デフォルト: デフォルトで先着順、その他のルールはありません

タスクの負荷分散戦略とも言える上記のルールによれば、デフォルトのルールを除いて、残りのルールはノード実行数、ノードシーケンス番号、ノード重みなどのグローバルノード情報を知る必要があることがわかります。そのため、ノードにハートビートを追加し、ハートビートサイクルごとにその情報をデータベースに報告する必要があります。ハートビートのコアコードは次のとおりです。

  1. /**
  2. * ノードハートビート遅延キューを作成する
  3. */
  4. プライベートDelayQueue<DelayItem<Node>> heartBeatQueue = new DelayQueue<>();
  5. /**
  6. * 最大2つのスレッドしか実行されないことが明確にわかるので、システムの組み込みツールを直接使用できます。
  7. */
  8. プライベート ExecutorService bossPool = Executors.newFixedThreadPool(2);
  9.    
  10.  
  11. @投稿コンストラクト
  12. パブリックvoid init() {
  13. /**
  14. * リカバリスレッドスイッチがオンで、ハートビートスイッチもオンの場合
  15. */
  16. config.isRecoverEnable() と config.isHeartBeatEnable() の両方が true の場合、
  17. /**
  18. * ノードをハートビートキューに遅延0で初期化して登録する
  19. */
  20. heartBeatQueue.offer(新しい DelayItem<>(0、新しい Node(config.getNodeId())));
  21. /**
  22. * ハートビートスレッドを実行する
  23. */
  24. bossPool.execute (新しいHeartBeat ());
  25. /**
  26. * 例外回復スレッドを実行する
  27. */
  28. bossPool.execute (新しい回復());
  29. }
  30. }
  31.  
  32. HeartBeatクラスはRunnableを実装します{
  33. @オーバーライド
  34. パブリックボイド実行(){
  35. のために(;;) {
  36. 試す {
  37. /**
  38. * 時間が経過すると、ノード オブジェクトを遅延キューから取り出し、時間とシーケンス番号を更新できます。
  39. * 最後に、ハートビート周期と同じタイムアウト期間を持つ新しいノードオブジェクトを作成し、それを遅延キューに入れて循環ハートビートを形成します。
  40. */
  41. DelayItem<Node> アイテム = heartBeatQueue.take();
  42. if(item != null && item.getItem() != null ) {
  43. ノード node = item.getItem();
  44. handHeartBeat(ノード);
  45. }
  46. heartBeatQueue.offer(新しい DelayItem<>(config.getHeartBeatSeconds() * 1000、新しい Node(config.getNodeId())));
  47. } キャッチ (例外 e) {
  48. logger.error( "タスクのハートビート エラー、原因:{} " 、e);
  49. }
  50. }
  51. }
  52. }
  53.  
  54. /**
  55. * プロセスノードのハートビート
  56. * @param ノード
  57. */
  58. プライベートvoid handHeartBeat(Nodeノード) {
  59. if(ノード == null ) {
  60. 戻る;
  61. }
  62. /**
  63. * まずこのノードがデータベースに存在するかどうかを確認します
  64. * 存在しない場合: まず次のシーケンス番号を見つけ、それをノードオブジェクトに設定し、最後に挿入します
  65. * 存在する場合: nodeIdに従って現在のノードのシーケンス番号と時間を直接更新します
  66. */
  67. ノード currNode = nodeRepository.getByNodeId(node.getNodeId());
  68. (currNode == null )の場合{
  69. ノードの行番号を設定します。
  70. ノードリポジトリ。挿入(ノード);
  71. }それ以外{
  72. nodeRepository.updateHeartBeat(node.getNodeId());
  73. }
  74.  
  75. }

データベースにノード情報が保存されたら、さまざまな高度なタスク取得戦略を実装できます。コードは次のとおりです。

  1. /**
  2. * 抽象戦略インターフェース
  3. * @著者 ロンディ
  4. * @日付2019-03-16 12:36
  5. */
  6. パブリックインターフェース戦略{
  7.  
  8. /**
  9. * デフォルトポリシー
  10. */
  11. 文字列DEFAULT = "default" ;
  12.  
  13. /**
  14. * タスクIDハッシュの残りを自分のノード番号と一致させます
  15. */
  16. 文字列 ID_HASH = "id_hash" ;
  17.  
  18. /**
  19. * 最小実行回数
  20. */
  21. 文字列 LEAST_COUNT = "least_count" ;
  22.  
  23. /**
  24. * ノードの重みによって
  25. */
  26. 文字列 WEIGHT = "重量" ;
  27.  
  28.  
  29. 公共 静的戦略選択(文字列キー) {
  30. スイッチ(キー) {
  31. ケースID_HASH:
  32. 新しいIdHashStrategy()を返します
  33. LEAST_COUNTの場合:
  34. 新しい LeastCountStrategy()を返します
  35. ケース重量:
  36. 新しいWeightStrategy()を返します
  37. デフォルト
  38. 新しい DefaultStrategy()を返します
  39. }
  40. }
  41.  
  42. パブリックブール値 accept(List<Node> ノード、Task タスク、Long myNodeId);
  43.  
  44. }

  1. /**
  2. * タスク ID ハッシュ方式に従って有効なノード数の残りを取得し、その残りに 1 を加えて各ノードのシーケンス番号と一致させます。
  3. * この方法は、タスクIDが自己増加するため、実際にはポーリングと同等です。
  4. * @著者 ロンディ
  5. * @日付2019-03-16
  6. */
  7. パブリッククラスIdHashStrategyはStrategyを実装します{
  8.  
  9. /**
  10. * ここでのノードコレクションは、外部スケジューラによって決定されるため空であってはならず、ノードIDの昇順でソートされます。
  11. */
  12. @オーバーライド
  13. パブリックブール値 accept(List<Node> ノード、タスクタスク、Long myNodeId) {
  14. 整数 サイズ= nodes.size () ;
  15. 長いタスクID = task.getId();
  16. /**
  17. * 自分のノードを見つける
  18. */
  19. ノード myNode = nodes.stream().filter(node ​​-> node.getNodeId() == myNodeId).findFirst().get();
  20. myNode == nullを返しますか? false : (taskId %サイズ) + 1 == myNode.getRownum();
  21. }
  22.  
  23. }

  1. /**
  2. * 最小数のタスクを処理する戦略、つまり、タスクが来るたびに、最小数のタスクを処理したかどうかを確認し、そうであれば、このタスクを消費することができます。
  3. * @著者 ロンディ
  4. * @日付2019-03-16 21:56
  5. */
  6. パブリッククラスLeastCountStrategyはStrategyを実装します{
  7.  
  8. @オーバーライド
  9. パブリックブール値 accept(List<Node> ノード、タスクタスク、Long myNodeId) {
  10.  
  11. /**
  12. * 回数が最も少ないノードを取得します。ここでは、カウントの昇順で並べ替えてから最初の要素を取得するのと比較することができます。
  13. * その後、 trueを返します 
  14. */
  15. オプション<Node> min = nodes.stream()。 min ((o1, o2) -> o1.getCounts().compareTo(o2.getCounts()));
  16.  
  17. 戻る 最小.isPresent()? min .get().getNodeId() == myNodeId : false ;
  18. }
  19.  
  20. }

  1. /**
  2. * 加重配分戦略によると、計画は次のようになります。
  3. * ノード番号 1、2、3、4
  4. * ノードの重み 2,3,3,2
  5. * 次に余り 0,1 を取ります | 2,3,4 | 5,6,7 | 8,9
  6. * シーケンス番号1は、2未満の重みを法として合計を消費することができます。
  7. * シリアル番号2は、2以上2+3未満の重みを法として合計を消費できます。
  8. * シリアル番号3は、2+3以上2+3+3未満の重みを法として合計を消費できます。
  9. * シリアル番号3は、重みを法とした合計を消費することができ、これは2+3+3以上2+3+3+2未満である。
  10. * 概要: このノードが消費できる範囲は、前のノードの重みの合計以上で、自身の重みの合計より小さい重みの合計を法とする重みの合計です。
  11. * もっと良いアルゴリズムのアイデアを持っている偉大な神はいるのだろうか
  12. * @著者 ロンディ
  13. * @日付2019-03-16 23:16
  14. */
  15. パブリッククラス WeightStrategy は Strategy を実装します {
  16.  
  17. @オーバーライド
  18. パブリックブール値 accept(List<Node> ノード、タスクタスク、Long myNodeId) {
  19. ノード myNode = nodes.stream().filter(node ​​-> node.getNodeId() == myNodeId).findFirst().get();
  20. myNode がnull場合
  21. 戻る 間違い;
  22. }
  23. /**
  24. * このノードの前のノードの重みと値を計算する
  25. */
  26. int preWeightSum = nodes.stream().filter(node ​​-> node.getRownum() < myNode.getRownum()).collect(Collectors.summingInt(Node::getWeight));
  27. /**
  28. * すべての重みの合計を計算する
  29. */
  30. 重み合計を nodes.stream() で収集します (Collectors.summingInt(Node::getWeight))。
  31. /**
  32. * 加重合計の残りを計算する
  33. */
  34. int残り = ( int )(task.getId() % weightSum);
  35. 残り >= preWeightSum && 残り < preWeightSum + myNode.getWeight()を返します
  36. }
  37.  
  38. }

次にスケジューリングクラスを変換します

  1. /**
  2. * タスクの戦略を取得する
  3. */
  4. プライベート戦略戦略;
  5.  
  6.  
  7. @投稿コンストラクト
  8. パブリックvoid init() {
  9. /**
  10. * 構成に基づいてタスク戦略を取得するノードを選択します
  11. */
  12. 戦略 = Strategy.choose(config.getNodeStrategy());
  13. /**
  14. * カスタム スレッド プール、初期スレッド数 corePoolSize、スレッド プール待機キュー サイズ queueSize、初期スレッドにタスクがあり、待機キューがいっぱいの場合
  15. * スレッド数は最大スレッド数 maxSize まで自動的に拡張され、新しく拡張されたスレッドは 60 秒間アイドル状態になった後に自動的にリサイクルされます。カスタムスレッドプールは、Executorsのスレッドツールによるものです。
  16. * それぞれに欠点があり、本番環境での使用には適していません
  17. */
  18. ワーカープール = 新しい ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, 新しい ArrayBlockingQueue<>(config.getQueueSize()));
  19. /**
  20. * 保留中のタスク読み込みスレッドを実行する
  21. */
  22. bossPool.execute (新しいLoader ());
  23. /**
  24. * タスクスケジューリングスレッドを実行する
  25. */
  26. bossPool.execute (新しいBoss ());
  27.  
  28. }
  29.  
  30. クラス Loader は Runnable を実装します {
  31.  
  32. @オーバーライド
  33. パブリックボイド実行(){
  34. のために(;;) {
  35. 試す {
  36. /**
  37. * まず利用可能なノードのリストを取得します
  38. */
  39. リスト<Node> ノード = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
  40. ノードがnullの場合、ノードは空です。
  41. 続く;
  42. }
  43. /**
  44. * 指定した時間(秒単位)に開始されるメインタスクのリストを検索します
  45. */
  46. リスト<Task> タスク = taskRepository.listPeddingTasks(config.getFetchDuration());
  47. タスクがnullの場合、tasks.isEmpty() を実行します。
  48. 続く;
  49. }
  50. (タスク task:tasks)の場合{
  51.  
  52. ブール値の accept = strategy.accept(ノード、タスク、config.getNodeId());
  53. /**
  54. * 取るべきではないものを取ってはいけない
  55. */
  56. 受け入れる場合
  57. 続く;
  58. }
  59. task.setStatus(TaskStatus.DOING);
  60. タスクのNodeIdを設定します。
  61. /**
  62. * 楽観的ロックを使用してステータスを更新します。更新が成功した場合、他のノードは正常に更新されません。実行するタスクのリストを照会する場合
  63. * この期間中に一部のノードがこのタスクを更新したため、バージョンは確認時のバージョンと異なる必要があります。ここで更新してください
  64. * 必ず0を返します
  65. */
  66. int n = taskRepository.updateWithVersion(タスク);
  67. 日付nextStartTime = task.getNextStartTime();
  68. if(n == 0 || nextStartTime == null ) {
  69. 続く;
  70. }
  71. /**
  72. * 遅延オブジェクトにカプセル化し、遅延キューに入れる
  73. */
  74. task = taskRepository.get(task.getId());
  75. DelayItem<タスク> delayItem = new DelayItem<タスク>(nextStartTime.getTime() - new Date ().getTime(), task);
  76. タスクキュー.offer(遅延項目);
  77.  
  78. }
  79. スレッドをスリープ状態にします。
  80. } 例外 e をキャッチします。
  81. logger.error( "タスク リストの取得に失敗しました。原因:{}" , e);
  82. }
  83. }
  84. }
  85.  
  86. }

前述のように、さまざまな負荷分散戦略を使用して各ノードが取得するタスクのバランスをとることができ、同じタスクに対するノード間の競合も大幅に軽減されます。

しかし、まだ問題が残っています。ノードがタスクを取得して実行中に更新し、実行が途中まで進んでいるが完了しておらず、例外も発生していない場合、さまざまな理由でこのノードが突然ハングすると、このタスクが再度実行される機会はなくなります。これは、ウンチもせずにトイレを占拠するという伝説の行為です。

この問題は、最終的に一貫性のあるシステムの一般的な方法、つまり異常回復スレッドを使用することで解決できます。このシナリオでは、指定されたハートビート タイムアウト期間 (たとえば、デフォルトの 3 つのハートビート サイクル) 内にハートビート時間を更新していないノードに属する未完了のタスクをチェックし、これらのタスクのステータスを実行保留中に復元し、次の実行時間を現在の時間に変更するだけで済みます。コアコードは次のとおりです。

  1. クラスRecoverはRunnableを実装します{
  2. @オーバーライド
  3. パブリックボイド実行(){
  4. のために(;;) {
  5. 試す {
  6. /**
  7. * 復元する必要があるタスクを見つけます。ここで、復元する必要があるタスクは、完了しておらず、実行ノードが 3 つ以上あるタスクとして定義されます。
  8. * ハートビート周期はハートビート時間を更新しません。これらのタスクはノードの実行により時間内に完了しなかったため、
  9. * タスクを再度スケジュールするには、ステータスを実行保留に戻し、次の実行時間を現在の時間に変更するだけです。
  10. */
  11. リスト<タスク> タスク = taskRepository.listRecoverTasks(config.getHeartBeatSeconds() * 3);
  12. タスクがnullの場合、tasks.isEmpty() を実行します。
  13. 戻る;
  14. }
  15. /**
  16. * まず利用可能なノードのリストを取得します
  17. */
  18. リスト<Node> ノード = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
  19. ノードがnullの場合、ノードは空です。
  20. 戻る;
  21. }
  22. 長い maxNodeId = nodes.get( nodes.size () - 1).getNodeId();
  23. for (タスク タスク: タスク) {
  24. /**
  25. * 各ノードにはリカバリ スレッドがあります。不要な競合を避けるために、利用可能なノードからタスクが属するノードに最も近いノードを検索します。
  26. */
  27. 長い currNodeId = chooseNodeId(ノード、maxNodeId、task.getNodeId());
  28. 長い myNodeId = config.getNodeId();
  29. /**
  30. * 現在のノードを処理する必要がない場合は、直接スキップします
  31. */
  32. currNodeId != myNodeId の場合 {
  33. 続く;
  34. }
  35. /**
  36. * タスクのステータスを保留に、ノードを現在のノードに直接変更します
  37. */
  38. task.setStatus(TaskStatus.PENDING);
  39. タスクに次の開始時刻を設定します(新しい日付());
  40. タスクのNodeIdを設定します。
  41. taskRepository.updateWithVersion(タスク);
  42. }
  43. スレッドスリープ(config.getRecoverSeconds() * 1000);
  44. } キャッチ (例外 e) {
  45. logger.error( "次のタスクの取得に失敗しました。原因:{}" , e);
  46. }
  47. }
  48. }
  49.  
  50. }
  51. /**
  52. * 次のノードを選択
  53. * @param ノード
  54. * @param 最大ノードID
  55. * @param ノードID
  56. * @戻る 
  57. */
  58. プライベート long chooseNodeId(List<Node> ノード、long maxNodeId、long nodeId) {
  59. ノードIDがmaxNodeIdより大きい場合
  60. nodes.get(0).getNodeId()を返します
  61. }
  62. 戻り値: nodes.stream().filter(node ​​-> node.getNodeId() > nodeId).findFirst().get().getNodeId();
  63. }

前述のように、同じタスクに対する各ノードの異常回復スレッド間の不要な競合を回避するために、各異常タスクは、そのタスクが属するノード ID の次の正常ノードによってのみ回復できます。これにより、タスクが完了する前にノードがクラッシュした場合でも、一定期間後に自動的に復元できるようになります。

一般的に、上記は最適化を考慮せずに、優れたタスク スケジューリング フレームワークとして機能するはずです。もしそれが全てだと思ったら、ごめんなさいとしか言​​えません。まだありますよ、ハハハ!先ほど、特に Spring アノテーションを使用してスケジュールを記述することに慣れている人にとって、他のタスク スケジューリングを使用することの不便さが嫌いだと述べました。 1 つのクラスに @Scheduled アノテーション付きの n 個のスケジューリング メソッドを記述する可能性があり、これにより変換がさらに面倒になります。分散タスク スケジューリングに直接統合するために、次の方法を実現したいと考えています。

  1. /**
  2. * テストスケジュール機能
  3. * @著者 ロンディ
  4. * @日付2019-03-17 16:54
  5. */
  6. @成分
  7. パブリッククラス SchedulerTest {
  8.  
  9. @Scheduled(cron = "0/10 * * * * ?" )
  10. パブリックvoid test1()はInterruptedExceptionをスローします{
  11. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
  12. スレッド.sleep(2000);
  13. System.out.println ( "現在の時刻1:" +sdf.format(new Date ())) ;
  14. }
  15.  
  16. @Scheduled(cron = "0/20 * * * * ?" 、親 = "test1" )
  17. パブリックvoid test2()はInterruptedExceptionをスローします{
  18. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
  19. スレッド.sleep(2000);
  20. System.out.println ( "現在の時刻2:" +sdf.format(new Date ())) ;
  21. }
  22.  
  23. @Scheduled(cron = "0/10 * * * * ?" 、親 = "test2" )
  24. パブリックvoid test3()はInterruptedExceptionをスローします{
  25. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
  26. スレッド.sleep(2000);
  27. System.out.println ( "現在の時刻 3:" +sdf.format(new Date ())) ;
  28. }
  29.  
  30. @Scheduled(cron = "0/10 * * * * ?" 、親 = "test3" )
  31. パブリックvoid test4() は InterruptedException をスローします {
  32. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
  33. スレッド.sleep(2000);
  34. System.out.println ( "現在の時刻4:" + sdf.format(new Date ()));
  35. }
  36.  
  37. }

上記の目標を達成するには、Spring の起動後にカスタム アノテーション (spring と同じ名前) もロードする必要があります。コードは次のとおりです

  1. /**
  2. * スプリングコンテナが起動したら、カスタムアノテーションをロードします
  3. * @著者 ロンディ
  4. * @日付2019-03-15 21:07
  5. */
  6. @成分
  7. パブリッククラスContextRefreshedListenerはApplicationListener<ContextRefreshedEvent>を実装します。
  8.  
  9. オートワイヤード
  10. プライベート タスク エグゼキュータ タスク エグゼキュータ;
  11.  
  12. /**
  13. * タスクが挿入された後にメソッド名/タスク名とデータベースID間のマッピングを保存し、サブタスクの追加を処理するために使用されます。
  14. */
  15. プライベート Map<String,Long> taskIdMap = 新しい HashMap<>();
  16.  
  17. @オーバーライド
  18. パブリックvoid onApplicationEvent(ContextRefreshedEventイベント) {
  19. /**
  20. * ルート コンテナーが Spring コンテナーであるかどうかを判別して、2 回の呼び出しを防止します (MVC の読み込みでも 1 回トリガーされます)
  21. */
  22. イベントのgetApplicationContext().getParent()がnull場合
  23. /**
  24. * スケジュールスイッチがオンになっているかどうかを確認する
  25. * 有効にした場合: スケジュール注釈を読み込み、スケジュール管理にスケジュールを追加します
  26. */
  27. アプリケーションコンテキストコンテキスト = event.getApplicationContext();
  28. Map<String,Object> beans = context.getBeansWithAnnotation(org.springframework.scheduling.annotation.EnableScheduling.class);
  29. (豆がnullの場合){
  30. 戻る;
  31. }
  32. /**
  33. * メソッド名とスケジュールアノテーションによって変更されたメソッド間のマッピングを保存するために使用されます
  34. */
  35. Map<String,Method> methodMap = new HashMap<>();
  36. /**
  37. * Service、ControllerなどはすべてComponentを含んでいるため、直接的または間接的にComponentで注釈が付けられたすべてのクラスを検索します。
  38. * スプリングコンテナによって管理されるクラスは、コンポーネントによって直接的または間接的に変更される必要があります。
  39. */
  40. マップ <String,Object> allBeans = context.getBeansWithAnnotation(org.springframework.stereotype.Component.class);
  41. <Map.Entry<String,Object>>を設定します。entries = allBeans.entrySet();
  42. /**
  43. * Beanと内部のメソッドを走査して、Scheduledアノテーションによって変更されたメソッドを見つけ、タスクをタスクスケジューラに配置します。
  44. */
  45. (Map.Entryエントリ:エントリ){
  46. オブジェクト obj = entry.getValue();
  47. クラス clazz = obj.getClass();
  48. メソッド[] methods = clazz.getMethods();
  49. for (メソッド m: メソッド) {
  50. if(m.isAnnotationPresent(Scheduled.class)) {
  51. メソッドMap.put(clazz.getName() + Delimiters.DOT + m.getName(),m);
  52. }
  53. }
  54. }
  55. /**
  56. * スケジュールされた注釈を処理する
  57. */
  58. handleSheduledAnn(メソッドマップ);
  59. /**
  60. * taskIdMapはSpringの起動後に一度だけ使用されるため、ここで直接クリアできます。
  61. */
  62. タスクIDマップをクリアします。
  63. }
  64. }
  65.  
  66. /**
  67. * メソッドマップ内のすべてのメソッドをループする
  68. * @param メソッドマップ
  69. */
  70. プライベート void handleSheduledAnn(Map<String,Method> methodMap) {
  71. メソッドマップがnullの場合|| メソッドマップが空の場合 () {
  72. 戻る;
  73. }
  74. <Map.Entry<String,Method>>を設定します。entries = methodMap.entrySet();
  75. /**
  76. * Beanと内部のメソッドを走査して、Scheduledアノテーションによって変更されたメソッドを見つけ、タスクをタスクスケジューラに配置します。
  77. */
  78. ( Map.Entry<String,Method> エントリ:エントリ){
  79. メソッド m = entry.getValue();
  80. 試す {
  81. メソッドマップのmを処理します。
  82. } キャッチ (例外 e) {
  83. e.printStackTrace();
  84. 続く;
  85. }
  86. }
  87. }
  88.  
  89. /**
  90. * 親タスクと子タスクを再帰的に追加する
  91. * @param メソッドマップ
  92. * @param m
  93. * @例外をスローします
  94. */
  95. private void handleSheduledAnn(Map<String,Method> methodMap,Method m) は例外をスローします {
  96. クラス<?> clazz = m.getDeclaringClass();
  97. 文字列= m.getName();
  98. スケジュールされた sAnn = m.getAnnotation(Scheduled.class);
  99. 文字列 cron = sAnn.cron();
  100. 文字列親 = sAnn.parent();
  101. /**
  102. * 親が空の場合、このメソッドによって表されるタスクはルートタスクであり、タスクスケジューラに追加され、グローバルマップに保存されることを意味します。
  103. * 親が空でない場合はサブタスクであることを意味し、サブタスクは親タスクのIDを知る必要があります。
  104. * まず、親によって表されるフルネームまたはメソッド名を使用します(親タスクメソッドと子タスクメソッドは、同じクラス内のメソッド名で直接使用できます。
  105. * それ以外の場合は、クラスのフルネームを指定する必要があります) taskIdMapから親タスクIDを取得します
  106. * 親タスクIDが見つからない場合は、まず親メソッドのフルネームに従ってmethodMapで親タスクのメソッドオブジェクトを見つけ、このメソッドを再帰的に呼び出します。
  107. * 親タスクIDが見つかった場合は、サブタスクを追加します
  108. */
  109. if(StringUtils.isEmpty(親)) {
  110. if(!taskIdMap.containsKey(clazz.getName() + Delimiters.DOT +名前)) {
  111. Long taskId = taskExecutor.addTask( name 、 cron 、 new Invocation(clazz、 name 、 new Class[]{}、 new Object[]{}));
  112. taskIdMap.put(clazz.getName() + Delimiters.DOT +名前、taskId);
  113. }
  114. }それ以外{
  115. 文字列 parentMethodName = parent.lastIndexOf(Delimiters.DOT) == -1 ? clazz.getName() + Delimiters.DOT + 親 : 親;
  116. 長い親タスク ID = taskIdMap.get(親メソッド名);
  117. 親タスクIDnullの場合
  118. メソッド parentMethod = methodMap.get(parentMethodName);
  119. handleSheduledAnn(メソッドマップ、親メソッド);
  120. /**
  121. * 再帰的に戻るときは必ず親タスクIDを更新してください
  122. */
  123. 親タスク ID = taskIdMap.get(親メソッド名);
  124. }
  125. if(parentTaskId != null && !taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name )) {
  126. Long taskId = taskExecutor.addChildTask(parentTaskId、 name 、cron、新しいInvocation(clazz、 name 、新しいClass[]{}、新しいObject[]{}));
  127. taskIdMap.put(clazz.getName() + Delimiters.DOT +名前、taskId);
  128. }
  129.  
  130. }
  131.  
  132.  
  133. }
  134. }

上記のコードは、Spring の初期化が完了した後に独自のカスタム タスク スケジューリングをロードするアノテーションを完了し、Spring のスケジューリング スイッチ @EnableScheduling によっても制御され、Spring または Springboot へのシームレスな統合を実現し、私のような怠け者の要件を満たします。

実は、このフレームワークを書くのに余暇を 5 日ほど費やしました。隠れた落とし穴がいくつかあると思いますが、明らかな落とし穴は自分で解決しました。オープンソース化の目的は、議論を刺激し、大多数のプログラマーに新しいアイデアを提供することです。皆様のお役に立てれば幸いです。同時に、皆さんがバグを見つけて一緒に改善してくれることを願っています。無視してください、マスター。

<<:  産業インターネット参入の背景:Xunzhong Sharesは5G時代に新たな道をどのように切り開くのか?

>>:  SalesEasy CRMを使用して、大量の販売リードに迅速かつ効率的に対応します。

推薦する

最適化、プロモーション、入札の違いは何ですか?

昨日の朝、コンサルタントとして働いている姉がQQで私を追加してきて、「最適化、プロモーション、入札の...

タイム・ウェルス・ネットワークの周瑜氏:タオバオ・ウィトキーには勝ち目がない

最近、IT茶室のネットユーザーが、タオバオが第三次産業電子商取引プラットフォームの拡大に力を入れ始め...

ウェブサイトのランキング低下を時間内に改善する方法

はじめに: ウェブサイトのプロモーションと運用の過程で、不適切な運用が原因で、検索エンジンでのウェブ...

百度の25回目のアルゴリズムアップグレードが医療業界に与える影響とその意味

6月28日の事件以来、百度はユーザーエクスペリエンスとネットワーク環境の向上、そしてユーザーへのより...

SEOを学ぶ初心者は、基本的なコード知識と基本的なSEOの考え方を組み合わせる必要があります。

ご存知のとおり、電子商取引の活発な発展に伴い、多くの企業がインターネット上の競争に参戦しています。S...

Kubernetes デプロイメントを管理するための 15 個のツール

[51CTO.com クイック翻訳] Kubernetes は、コンテナ化されたアプリケーションを大...

hostodo: KVM NVMe シリーズ VPS は年間 16.99 ドルから、有料パネル DirectAdmin は生涯無料

hostodo から送られてきたプロモーションメールによると、KVM NVMe シリーズの VPS ...

Dockerの軽量仮想化、イメージ、コンテナの詳細な説明

仮想化技術とは何ですか?サーバーの場合、リソースはほとんどの場合アイドル状態であり、十分に活用されて...

雲突騰超融合が数千万ドル相当の新たな受注を獲得

ソフトウェア定義は、IT インフラストラクチャの変革の前兆となっています。ソフトウェア定義ネットワー...

ユニークなウェブサイトを作成する方法

インターネットの誕生以来、さまざまなウェブサイトが雨後の筍のように出現してきましたが、ウェブサイトの...

SEO 担当者が知っておくべき 6 つの「スパイダー トラップ」

SEO に携わる人なら誰でも、Web ページがユーザーによって検索されるためには適切なキーワードを選...

APPランキング操作調査:大手企業は毎月50万~60万元を費やして分配

過去2年間、Xu Huaizhe氏とLiu Xiong氏は、外部から見ると非常に謎めいたビジネス、つ...

BAT の春節紅包のレビュー: To C から To B まで!

春節の連休が過ぎ、2019年の春節紅包戦も終わりを迎えました。今年のBAT紅包戦略を振り返ると、新た...

従来の企業は、インターネット マーケティングのために信頼できる代理店をどのように選択するのでしょうか?

月給5,000~50,000のこれらのプロジェクトはあなたの将来ですインターネットマーケティングに注...

Linux の仮想メモリと物理メモリを理解する

[[286810]]仮想メモリ:理解の第一段階1. 各プロセスにはそれぞれ独立した 4G のメモリ空...