Curatorのソースコードから群集効果まで

Curatorのソースコードから群集効果まで

1. はじめに

Curator は、Zookeeper を操作し、分散ロックや選挙などの高度な機能をカプセル化する、Java で記述されたクライアント ツールです。

今日は主に分散ロックの主な原理を分析します。分散ロックの紹介やその他の実装については、興味のある学生は以下の記事をお読みください。

私は数万語を費やして、Redis で分散ロックを実装する困難な道を、単一マシンからマスタースレーブ、複数インスタンスまで歩き回りましたが、多くの問題が発生することがわかりました_Yangyang のブログ - CSDN ブログ

Redisson の再入とロック更新のソース コード分析_Yang Yang のブログ - CSDN ブログ

Curator を使用して分散ロックを取得する場合、Curator は指定されたパスの下に順序付けられた一時ノードを作成します。ノードが最小の場合、ロックが正常に取得されたことを意味します。

次に、準備として、一時ノードが作成されるかどうかを確認します。

2. 準備

まず、Zookeeper クラスターを構築する必要がありますが、もちろん単一のマシンを使用することもできます。

この記事では、「インタビューアー:Zookeeper 選挙の図を描いてもらえますか?」というタイトルで、docker-compose を使って素早く zk クラスターを構築する方法が紹介されています。

pom に依存関係を導入します。

  1. <依存関係>
  2. <グループ ID>org.apache.curator</グループ ID>
  3. <artifactId>キュレーターレシピ</artifactId>
  4. <バージョン>2.12.0</バージョン>
  5. </依存関係>

Curator クライアントの構成項目:

  1. /**
  2. * @著者 qcy
  3. *@作成2022/01/01 22:59:34
  4. */
  5. @構成
  6. パブリッククラス CuratorFrameworkConfig {
  7.  
  8. //zk ノードアドレス
  9. プライベート静的最終文字列 CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183" ;
  10. //接続タイムアウト(単位:ミリ秒)
  11. プライベート静的最終int CONNECTION_TIME_OUT_MS = 10 * 1000;
  12. //セッションタイムアウト(単位:ミリ秒)
  13. プライベート静的最終int SESSION_TIME_OUT_MS = 30 * 1000;
  14. //再試行の初期待機時間(単位:ミリ秒)
  15. プライベート静的最終int BASE_SLEEP_TIME_MS = 2 * 1000;
  16. //再試行の最大回数
  17. プライベート静的最終int MAX_RETRIES = 3;
  18.  
  19. @ビーン
  20. パブリックCuratorFramework getCuratorFramework() {
  21. CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
  22. .connectString(接続文字列)
  23. .connectionTimeoutMs(接続タイムアウトMS)
  24. .sessionTimeoutMs(セッションタイムアウトMS)
  25. .retryPolicy(新しいExponentialBackoffRetry(BASE_SLEEP_TIME_MS、MAX_RETRIES))
  26. 。建てる();
  27. curatorFramework を開始します。
  28. curatorFrameworkを返します
  29. }
  30.      
  31. }

SESSION_TIME_OUT_MS パラメータは、クライアントがロックを取得した後に突然クラッシュした場合でも、zk がその時間内に現在のクライアントによって作成された一時的な順序付きノードを削除できるようにします。

テストコードは次のとおりです。

  1. // 一時的なノードパス、qcy はブロガーの名前の略称です
  2. プライベート静的最終文字列 LOCK_PATH = "/lockqcy" ;
  3.  
  4. @リソース
  5. キュレーターフレームワーク キュレーターフレームワーク;
  6.  
  7. パブリックvoid testCurator() 例外をスローします {
  8. InterProcessMutex interProcessMutex = 新しい InterProcessMutex(curatorFramework, LOCK_PATH);
  9. インタープロセスミューテックスを取得します。
  10.  
  11. 試す {
  12. //シミュレーション業務時間
  13. スレッドをスリープ状態にします(30 * 1000);
  14. } キャッチ (例外 e) {
  15. e.printStackTrace();
  16. ついに
  17. プロセスミューテックスを解放します。
  18. }
  19. }

インターフェースを使用してこのメ​​ソッドを呼び出すときは、Thread.sleep にブレークポイントを設定し、zk コンテナーに入って作成されたノードを観察します。

docker exec -it zk container name/bin/bash を使用してコンテナに対話モードで入り、./bin/zkCli.sh を使用して zk サーバーに接続します。

次にls pathを使用してノードを表示します

これら 3 つのノードは永続ノードです。 get path を使用して、ノードのデータ構造情報を表示できます。

ノードの ephemeralOwner 値が 0 の場合、つまりノードの一時所有者のセッション ID が 0 の場合、そのノードは永続ノードであることを意味します。

ブレークポイント Thread.sleep に到達したとき、lockqcy の下に一時ノードが作成されたことがわかりました。

この時点で準備は完了です。次に、interProcessMutex.acquireとreleaseのプロセスを分析してみましょう。

3. ソースコード分析

Curatorは、次のような複数のタイプのロックをサポートしています。

  • InterProcessMutex、再入可能ロック、排他ロック
  • InterProcessReadWriteLock、読み取り書き込みロック
  • InterProcessSemaphoreMutex、非再入可能排他ロック

今日は主に InterProcessMutex のロックとロック解除のプロセスを分析します。まずロックのプロセスを見てみましょう

ロック

  1. パブリックvoid acquire() は例外をスローします {
  2. 内部ロックが-1の場合、 nullの場合
  3. throw new IOException( "ロックを取得中に接続が失われました: " + basePath);
  4. }
  5. }

これはブロッキング ロックの取得です。ロックを取得できない場合は、ブロックが継続されます。したがって、internalLock メソッドの場合、タイムアウトは -1 に設定され、時間単位は null に設定されます。

  1. private boolean internalLock(long time , TimeUnit unit) は例外をスローします {
  2. スレッド currentThread = Thread.currentThread();
  3. //スレッドのLockData情報がマップから取得できるかどうかによって、スレッドがすでにロックを保持しているかどうかを判断します。
  4. ロックデータ lockData = threadData.get(currentThread);
  5. ロックデータ != null の場合{
  6. //再入可能、ロック成功に直接戻る
  7. ロックデータ.ロックカウント.増加および取得();
  8. 戻る 真実;
  9. }
  10. //ロック
  11. 文字列 lockPath = internals.attemptLock( time , unit, getLockNodeBytes());
  12. lockPath != null の場合{
  13. //ロックに成功しました。マップに保存します
  14. LockData newLockData = 新しい LockData(currentThread、lockPath);
  15. threadData.put(現在のスレッド、新しいロックデータ);
  16. 戻る 真実;
  17. }
  18.  
  19. 戻る 間違い;
  20. }

このうち、threadData はマップであり、キーはスレッド オブジェクト、値はスレッドにバインドされたロック データです。

LockDataは、ロックスレッドowningThread、再入カウントlockCount、およびロックパスlockPathを格納します。例:

  1. /lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-ロック-0000000005
  1. プライベート最終 ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  2.  
  3. プライベート静的クラスLockData {
  4. 最終スレッド所有スレッド;
  5. 最終的な文字列 lockPath;
  6. 最終的なAtomicInteger lockCount = 新しいAtomicInteger(1);
  7.  
  8. プライベートLockData(スレッド所有スレッド、文字列lockPath) {
  9. this.owningThread = 所有しているスレッド;
  10. ロックパスをロックします。
  11. }
  12. }

internals.attemptLockメソッドを入力する

  1. 文字列 attemptLock(long time , TimeUnit unit, byte[] lockNodeBytes) は例外をスローします {
  2. //開始時間
  3. 最終的な長いstartMillis = System.currentTimeMillis();
  4. //タイムアウト期間をミリ秒に変換します
  5. 最終的な Long millisToWait = (unit != null ) ?ユニット.toMillis(時間) : null ;
  6. //ノードデータ、ここはnull  
  7. 最終的なbyte[] localLockNodeBytes = (revocable.get() != null ) ?新しいバイト[0]: lockNodeBytes;
  8. //再試行回数
  9. 試行回数 = 0;
  10. //パスをロック
  11. 文字列 ourPath = null ;
  12. //ロックを取得するかどうか
  13. ブール値hasTheLock = false ;
  14. //完了しましたか?
  15. ブール値 isDone = false ;
  16.  
  17. while (!isDone) {
  18. 完了 = true ;
  19.  
  20. 試す {
  21. // 一時的に順序付けられたノードを作成し、ノードのパスを返します
  22. // 内部的にクライアントを呼び出します。作成().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
  23. ourPath = driver.createsTheLock(クライアント、パス、localLockNodeBytes);
  24. //返されたノードパスに基づいて、ロックが取得されているかどうかを判断します
  25. ロックがある = internalLockLoop(startMillis、millisToWait、ourPath);
  26. } キャッチ (KeeperException.NoNodeException e) {
  27. //セッションが期限切れになると、ドライバーは一時的に順序付けられたノードを見つけられなくなり、NoNodeException をスローすることがあります。
  28. //ここで再試行
  29. (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++、System.currentTimeMillis() - startMillis、RetryLoop.getDefaultRetrySleeper())) {
  30. isDone = false ;
  31. }それ以外{
  32. eを投げる;
  33. }
  34. }
  35. }
  36. //ロックを取得し、呼び出し元がマップに記録するためのノード パスを返します。
  37. ロックがある場合
  38. ourPathを返します
  39. }
  40.  
  41. 戻る ヌル;
  42. }

次に、先ほど作成した一時的な順序付きノードが internalLockLoop で使用され、ロックが取得されたかどうかが判断されます。

  1. private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) は例外をスローします {
  2. //ロックを取得するかどうか
  3. ブール値 haveTheLock = false ;
  4. ブール型doDelete = false ;
  5. 試す {
  6. (revocable.get() != null ) の場合 {
  7. //今回はここには入りません
  8. client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
  9. }
  10. //ロックの取得を試行し続ける
  11. ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) の間 {
  12. //basePath(ここではlockqcy)の下のすべての一時的に順序付けられたノードを返し、サフィックスに従って昇順に並べます
  13. リスト<String> children = getSortedChildren();
  14. //現在のスレッドによって作成された一時的な順序付きノードの名前を取得します。ここでは /_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005 です。
  15. 文字列sequenceNodeName = ourPath.substring ( basePath.length () + 1);
  16. //ソート後に現在のノードが 1 位であるかどうかを判断します。そもそもロックが取得されていることを意味する
  17. 述語結果 predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
  18. if (述語結果.getsTheLock()) {
  19. //ロックを取得したらループを終了する
  20. ロックを有効にする = true ;
  21. }それ以外{
  22. //これはロックが取得されなかったことを意味します
  23. //現在のノードより小さいインデックスを持つ前のノードを取得します
  24. 文字列 previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
  25.  
  26. 同期された(これ){
  27. 試す {
  28. //前のノードが存在しない場合は、NoNodeExceptionを直接スローし、catchで処理せず、次のラウンドでロックの取得を続行します
  29. // 前のノードが存在する場合は、そのノードの解放イベントをリッスンするリスナーを設定します
  30. client.getData().usingWatcher(watcher).forPath(previousSequencePath);
  31. (ミリストゥウェイト!= null )の場合{
  32. millisToWait -= (System.currentTimeMillis() - startMillis);
  33. startMillis = System.currentTimeMillis();
  34. //タイムアウトしたか判断する
  35. (ミリ秒待機時間 <= 0)の場合{
  36. //ロックタイムアウトを取得し、作成した一時的な順序付きノードを削除します
  37. 削除する = true ;
  38. 壊す;
  39. }
  40. //タイムアウトがない場合は、millisToWait で待機します
  41. 待機(ミリ秒単位の待機)。
  42. }それ以外{
  43. // 無期限にブロックして待機し、前のノードが削除された場合にのみウェイクアップ操作がトリガーされます。
  44. 待って();
  45. }
  46. } キャッチ (KeeperException.NoNodeException e) {
  47. //前のノードが存在しない場合は、NoNodeExceptionを直接スローし、catchで処理せず、次のラウンドでロックの取得を続行します
  48. }
  49. }
  50. }
  51. }
  52. } キャッチ (例外 e) {
  53. ThreadUtils.checkInterrupted(e);
  54. 削除する = true ;
  55. eを投げる;
  56. ついに
  57. 削除する場合
  58. // 先ほど作成した一時的な順序付きノードを削除します
  59. パスを削除します。
  60. }
  61. }
  62. haveTheLock を返します
  63. }

ロックが取得されたかどうかを判断するためのコアロジックは、getsTheLockにあります。

  1. パブリックPredicateResultsはLockを取得します(CuratorFrameworkクライアント、List<String> children、StringsequenceNodeName、 int maxLeases)例外をスローします{
  2. // すべての子ノードがソートされた後の現在のノードのインデックス位置を取得します
  3. int ourIndex = children.indexOf(sequenceNodeName);
  4. // 現在のノードが子ノードにあるかどうかを判定する
  5. シーケンスノード名、インデックスを検証します。
  6. //InterProcessMutexの構築メソッドはmaxLeasesを1に初期化します
  7. //getsTheLock をtrue にするには、ourIndex は 0 である必要があります。つまり、ロックが取得されたことを示すには、現在のノードが basePath の下の最小のノードである必要があります。
  8. ブール値 getsTheLock = ourIndex < maxLeases;
  9. //ロックを取得できない場合は、前のノードの名前が返され、そのノードのモニターが設定されます
  10. 文字列 pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
  11.  
  12. 新しい PredicateResults(pathToWatch、getsTheLock)を返します
  13. }
  14.  
  15. 静的voidのvalidateOurIndex(StringのsequenceNodeName、 intのourIndex)はKeeperExceptionをスローします{
  16. (ourIndex < 0)の場合{
  17. //接続が失われると一時ノードが削除される可能性があるため、これは安全対策です
  18. 新しい KeeperException.NoNodeException( "シーケンシャル パスが見つかりません: " + sequenceNodeName);をスローします。
  19. }
  20. }

では、internalLockLoop で待機しているスレッドはいつ起動できるのでしょうか?

internalLockLoopメソッドでは、

  1. client.getData().usingWatcher(watcher).forPath(前のシーケンスパス);

前のノードにリスナーが設定されます。ノードが削除されると、ウォッチャー内のコールバックがトリガーされます。

  1. プライベートファイナルウォッチャーウォッチャー = 新しいウォッチャー() {
  2. //コールバックメソッド
  3. @オーバーライド
  4. パブリックvoid プロセス(WatchedEvent イベント) {
  5. 通知からWatcher();
  6. }
  7. };
  8.  
  9. プライベート同期voidnotifyFromWatcher() {
  10. //LockInternalsインスタンスを待機しているすべてのスレッドを起動します
  11. すべて通知します();
  12. }

この時点で、ロック プロセスは基本的に分析されており、その概要は次のとおりです。

まず、一時的な順序付きノードを作成します

ノードが basePath 下の最小のノードである場合、ロックが取得されてマップに格納され、次回直接再入力されることを意味します。

ノードが最小のノードでない場合は、前のノードのリスナーを設定して待機します。前のノードが削除されると、通知スレッドに通知されます。

ロック解除

ロック解除のロジックは比較的単純で、リリースメソッドに直接進むだけです

  1. パブリックvoid release() は例外をスローします {
  2. スレッド currentThread = Thread.currentThread();
  3. ロックデータ lockData = threadData.get(currentThread);
  4. ロックデータ== nullの場合
  5. throw new IllegalMonitorStateException( "ロックを所有していません: " + basePath);
  6. }
  7.  
  8. int newLockCount = lockData.lockCount.decrementAndGet();
  9. //再突入回数を直接減らす
  10. (新しいロックカウント > 0) の場合 {
  11. 戻る;
  12. }
  13. (新しいロックカウント < 0) の場合 {
  14. throw new IllegalMonitorStateException( "ロックのロック数が負になりました: " + basePath);
  15. }
  16.  
  17. //ここでは再入回数が0であることを意味します
  18. 試す {
  19. //ロックを解除する
  20. 内部的にロックを解除します(lockData.lockPath);
  21. ついに
  22. //マップから削除
  23. スレッドデータを削除します。(現在のスレッド)
  24. }
  25. }
  26.  
  27. void releaseLock(String lockPath) 例外をスローします {
  28. 取り消し可能に設定します( null ) ;
  29. //内部的に保証を使用して、バックグラウンドでノードの削除を試行し続けます
  30. パスを削除します(パスをロックします);
  31. }

再入回数が 0 より大きい場合は、再入回数を減らします。 0 に減少すると、zk が呼び出されてノードが削除されます。これは、Redisson 再入可能ロックの解放と一致します。

4. 群集効果

ここでは、分散ロックシナリオで群れ効果を実装するためにZookeeperを使用する方法について説明します。

群集効果とは何か

まず第一に、羊の群れは非常に散在した組織であり、目的もなく、管理も不十分であるため、一般的には飼い主が群れを管理するのを助ける牧羊犬が必要になります。

ある時点で、羊の一匹が前方にもっとおいしい草があることに気づいて動き始めると、周囲の状況に関係なく、残りの羊も突進するようになります。

したがって、群集効果とは、一人の人の合理的な行動が他の人を盲目的に追随させ、非合理的な群集行動を引き起こす状況を指します。

Zookeeper の herd 効果とは、znode が変更された後、回避できたはずの大量の監視通知がトリガーされ、クラスター リソースが無駄になるという事実を指します。

ロックを取得できない場合は進化を待つ

しばらく寝る

スレッドがロックの取得に失敗した場合、しばらくスリープしてから再度ロックの取得を試行することができます。

しかし、この方法は非常に非効率的です。

スリープ時間が短いと、ポーリングが頻繁に実行され、リソースが浪費されます。

スリープ時間が長いと、ロックが解除されたのに取得できないという困った状況になります。

したがって、ここでの最適化のポイントは、アクティブ ポーリングを非同期通知にどのように変更するかということです。

ロックされたノードを監視する

すべてのクライアントがロックを取得したい場合、同じ名前のノードのみを作成します。

znode が存在する場合、これらのクライアントはそこにリスナーを設定します。 znode が削除されると、ロックを待機しているすべてのクライアントに通知され、これらのクライアントは再度ロックを取得しようとします。

ここでは非同期通知のために監視メカニズムが使用されていますが、クライアントの数が特に多い場合はパフォーマンスが低下します。

znode が削除されると、その時点で多数のクライアントに通知を送信する必要があります。この期間中、zk に送信された他の通常のリクエストは遅延またはブロックされる可能性があります。

これにより、単一のポイントの変更 (znode の削除) が包括的な影響 (多数のクライアントへの通知) を引き起こす、集団効果が発生します。

したがって、ここでの最適化のポイントは、znode の監視の数をいかに減らすかであり、最良のケースは 1 つだけです。

前の順序のノードを見る

最初に basePath が指定されている場合、ロックを取得しようとするクライアントは、このパスの下に一時的な順序付きノードを直接作成できます。

作成されたノードが最小ノードである場合、ロックが取得されたことを意味します。最小のノードでない場合は、前のノードに対してのみリスナーを設定し、前のノードの削除動作のみをリッスンします。

この方法では、前のノードが削除されると、すべてのクライアントではなく、次のノードによって表されるクライアントにのみ通知が送信されるため、集団効果が回避されます。

群集効果を回避しながら、現在のロックは公平なロックになります。つまり、スレッドが過度に不足するのを避けるために、ロックは適用された順序で取得されます。

5. 追記

この記事では、ソースコードの観点から Curator を使用して分散ロックを取得するプロセスについて説明し、次にロックを待つ進化プロセスの観点から、分散ロック シナリオで群集効果を回避するための Zookeeper のソリューションを分析します。

これは Zookeeper シリーズの 2 番目の記事です。監視原理と ZAB プロトコルの分析に関する記事も今後公開される予定です。

<<:  Cloud Custodian を使用してクラウド ガバナンスをコードとして実装する方法

>>:  クラウドレジリエンスへのアプローチ - システムおよびカオステスト

推薦する

ブラインドボックスビジネスを台無しにしたのは誰ですか?

人々は未知のものに対して好奇心を抱くことが多いため、多くの企業が消費者の好奇心を捉えてブラインドボッ...

企業のホームページ構築が低価格のホームページ構築会社を選ばない理由

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますこれまで、...

なぜJikeとPanguは発展の機会を失ったのでしょうか?

少し前に Pangu と Jikesou が合併するという噂を耳にしたことがあるでしょうか。まだ結論...

ウェブサイトは情報システムセキュリティレベル保護期限修正通知ソリューションを受け取りました

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています2018年...

エッジコンピューティングがデジタルビジネスを推進する 5 つの方法

すべての業界で新しい常識が生まれています。IT インフラストラクチャがデジタルに対応していなければ、...

ウェブサイトにインデックスが多ければ多いほど本当に良いのでしょうか?

掲載数が多いほど、ウェブサイトの品質のより良い尺度となるのでしょうか? 以前は、私たち個々のウェブマ...

おすすめ: budgetvm-E3/E5 シリーズ サーバー 25% オフ プロモーション

budgetvm は、2007 年以降、主にサーバーレンタル事業、続いて格安 VPS 事業を展開し、...

ウェブマスターは数年にわたってウェブサイト構築の本当の意味を教えてくれました

私は現在、メインサイトを5つ持っていて、各サイトの基本IPは1日あたり約1000IPです。これを見て...

ウェブマスター: サイトの起動が遅い問題を解決する方法

ウェブサイトを構築する上で最も厄介なことは何でしょうか? そう、ウェブサイトが開けないことです。ウェ...

競合他社よりもウェブサイトを上位にランク付けする方法

これまで、競合サイトを凌駕する方法に関する記事をインターネットでたくさん目にしたことがあるかもしれま...

1API - .red ドメインを 3.45 ドルで登録

1api は .red ドメイン名の新年プロモーションを実施しており、初年度の登録料は 3.45 ド...

Kubernetes 1.17ではボリュームスナップショットとプラグイン管理の簡素化が実現

[51CTO.com クイック翻訳] Kubernetes は、コンテナのデプロイ、スケーリング、管...

クラウド ストレージ アーキテクチャ フレームワーク設計は、アプリケーション ベースのサービス モデルをどのように実現するのでしょうか?

導入クラウド ネイティブの開発によりソフトウェア定義が推進され、その結果、コンピューティング、ネット...

この記事を読めば、分散ロックのさまざまな実装を理解できるようになります。

序文今日は分散ロックについてお話します。インターネット上には関連コンテンツが多数存在しますが、それら...