[[424485]] 1. 分散ロックとは何ですか?- 分散ロックは、次のように読みます。最初に「分散」、次に「ロック」です。
分散: ここでの分散とは、CAP 理論、分散ストレージ、分散トランザクション、分散ロックなど、多くのテクノロジーと理論を含む分散システムを指します。 分散システムとは、ネットワークを介して通信し、共通のタスクを完了するために作業を調整するコンピュータ ノードのグループで構成されるシステムです。 分散システムの出現により、単一のコンピューターでは実行できないコンピューティングおよびストレージ タスクを、安価で一般的なマシンを使用して実行できるようになりました。目標は、より多くのマシンを使用して、より多くのデータを処理することです。 - ロック:そう、そう思うんだ。 Java が学習する最初のロックはおそらく同期ロックです。
セケリーナの一日を説明するJavaエントリーレベルの面接の質問 ロックの使用シナリオの観点から、次の 3 種類のロックを見てみましょう。 スレッド ロック: synchronized はメソッドまたはコード ブロックで使用されます。私たちはそれを「スレッドロック」と呼んでいます。スレッド ロックの実装は、実際にはスレッド間でメモリを共有することによって実現されます。簡単に言えば、アイドルやロックなどの状態を持つメモリ内の整数です。たとえば、 synchronized は、オブジェクト ヘッダーの Mark Word 内のロック ステータス フラグです。ほとんどの Lock 実装クラスには、ステータス フラグとして volatile int state と呼ばれる共有変数があります。 プロセス ロック: 同じオペレーティング システム内の複数のプロセスが共有リソースにアクセスするように制御する場合、プロセスは独立しているため、各プロセスは他のプロセスのリソースにアクセスできず、同期などのスレッド ロックを通じてプロセス ロックを実装することはできません。たとえば、同じ Linux サーバーに複数の Java プロジェクトをデプロイすると、それらのプロジェクトが同時にサーバー上の同じデータにアクセスしたり操作したりする可能性があるため、プロセス ロックが必要になります。一般的に、プロセスの排他制御を実現するには、「ファイル ロック」を使用します。 分散ロック: ユーザー数の増加に伴い、多くのサーバーを追加しました。もともと、スケジュールされた時間に顧客にメールを送信するタスクがありました。制御されていない場合は、各マシンが指定された時間にタスクを 1 回実行し、顧客は N 通の電子メールを受信します。これには、分散ロックによる相互排他が必要です。 文章による説明: 分散ロックは、分散システム間または異なるシステム間の共有リソースへのアクセスを制御するロック実装です。リソースが異なるシステム間または同じシステムの異なるホスト間で共有される場合、一貫性を確保するために相互の干渉を防ぐために相互排他制御が必要になることがよくあります。 分散ロックとは何かがわかったので、次のステップは適切なテクノロジーを選択することです。 2. 分散ロックのやり方分散ロックを実装するには、通常、すべてのクラスター マシンで操作できる外部システムを選択し、各マシンがこの外部システムにアクセスしてロックを申請します。 この外部システムが機能するには、通常、次の要件を満たす必要があります。 - 相互排他: 一度にロックを保持できるのは 1 つのクライアントだけです。
- デッドロックの防止: ロックを保持している間にクライアントがクラッシュし、ロックを積極的に解除しない場合でも、他のクライアントがロックをロックすることができます。したがって、ロックには通常有効期限があります。
- 排他性: ベルを結んだ人が、それを解くことができる必要があります。ドアのロックとロック解除には同じクライアントを使用する必要があります。ロックには鍵が 1 つしか設定できません。クライアント自身のロックは他人によってロック解除することはできませんし、もちろん他人のロックを開けることもできません。
- フォールト トレランス: 外部システムは「脆弱」になりすぎてはいけません。クライアントが外部システムをロックおよびロック解除する前に、外部システムの正常な動作が保証される必要があります。
類似点はこんな感じだと思います: 多くのベンダーが倉庫を借りたいと考えていますが、同時に貸し出せるのは 1 つのベンダーのみであり、キーも 1 つしかありません。また、一定の「リース期間」を設け、その期間満了後に倉庫を回収しなければなりません。もちろん、最も重要なことは、倉庫のドアが破壊されないことです。破壊されなければ、ロックできません。これは分散ロックではないですか? 私は本当にテクノロジーと人生を愛するプログラマーだと感じています〜〜 実際、ロックは基本的に重複した操作(データの一貫性)を防ぐために使用されます。クエリなどのべき等操作では、この作業は必要ありません。 結論に直接: 分散ロックを実装する方法は一般的に 3 つあります。1. データベース オプティミスティック ロック。 2. Redis ベースの分散ロック。 3. ZooKeeper ベースの分散ロック。 しかし、より良いパフォーマンスを追求するために、通常は Redis または Zookeeper を使用することを選択します。 データベースのオブジェクト ロックのパフォーマンスが低いのはなぜかと質問したい学生もいるはずです。 主キーの重複防止やバージョン番号の制御を含むデータベースの楽観的ロックを使用します。しかし、どちらの方法にも長所と短所があります。 主キー競合戦略を使用して重複を防ぐと、同時実行性が非常に高い場合、特にアプリケーション データ テーブルと主キー競合テーブルが同じデータベース内にある場合に、データベースのパフォーマンスに影響が出ます。もう 1 つの方法は、MySQL データベースで主キーの競合防止を使用することです。これにより、同時実行性が高い場合にテーブル ロックが発生する可能性があります。より良い方法は、重複を防ぐためにプログラム内で主キーを生成することです。 バージョン番号戦略の使用 この戦略は、MySQL の MVCC メカニズムに由来しています。この戦略を使用することには何ら問題はありません。唯一の問題は、データ テーブルに大きな影響を与えることです。各テーブルにバージョン番号フィールドを設計し、その都度判定を行う判定SQLを記述する必要があります。 パート3: コーディング 3. Redis ベースの分散ロック実際、Redis の公式サイトでは、すでに実装が公開されています: https://redis.io/topics/distlock では、さまざまな書籍やブログでさまざまな手段を使って Redis で分散ロックを実装しており、より標準化され、より安全な Redlock を使用して実装することが推奨されていると述べています。順を追って見ていきましょう 全員がRedis 2.6.12以降のバージョンを使用していると想定しているため、setnx、expireなどについては説明しません。ロックするコマンドを直接設定します。 - セット キー値[有効期限 EX 秒|PX ミリ秒] [NX|XX]
例えば: - リソース名をランダム値に設定NX PX 30000
SETコマンドの動作は一連のパラメータによって変更できる。 - EX second : キーの有効期限を second 秒に設定します。 SET キー値 EX 秒は、SETEX キー秒値と同じ効果があります。
- PX ミリ秒: キーの有効期限をミリ秒に設定します。 SET キー値 PX ミリ秒は、PSETEX キー ミリ秒値と同じ効果があります。
- NX: キーが存在しない場合にのみ設定します。 SET キー値 NX は、SETNX キー値と同じ効果があります。
- XX: キーがすでに存在する場合にのみ設定します。
この命令は、キー resource_name が存在しない場合は、そのようなキーを作成し、値を my_random_value に設定し、有効期限を 30000 ミリ秒に設定することを意味します。 これは 2 つのことを行いますが、Redis はシングルスレッドであるため、この命令は中断されず、アトミック操作になります。 Redis が分散ロックを実装するための主な手順は次のとおりです。 - ロック タグとしてキーを指定し、Redis に保存し、値として一意の識別子を指定します。
- 値はキーが存在しない場合にのみ設定でき、これにより、相互排他機能が満たされ、同時に 1 つのクライアント プロセスのみがロックを取得することが保証されます。
- システム異常によりキーが削除されるのを防ぐために有効期限を設定し、デッドロック防止機能を満たします。
- 業務処理が完了したら、キーをクリアしてロックを解除する必要があります。鍵をクリアするときは、その価値を検証する必要があり、ベルを結んだ人が解く必要があります。
ランダムな値を設定するということは、キーの値がロック解除時に保存した乱数と同じかどうかを判断することを意味します。同じであれば、それはあなたのロックなので、直接削除してロックを解除できます。 もちろん、これら 2 つの操作はアトミックである必要があるため、Redis は Lua スクリプトを提供します (Redis サーバーは、処理中に他の要求によって Lua スクリプトが中断されないように、単一のスレッドで Lua スクリプトをアトミックに実行します)。 - redis.call( "get" ,KEYS[1]) == ARGV[1]の場合
- redis.call( "del" ,KEYS[1])を返す
- それ以外
- 0を返す
- 終わり
質問:まず、次の 2 つの質問について考えてみましょう。 ロックを取得する場合、適切な有効期限はどれくらいですか? 適切な時間を見積もるのは簡単ではありません。たとえば、リソースを操作するのに最も時間がかかる時間は 10 秒ですが、期限が 5 秒で切れるように設定すると、ロックが早期に期限切れになるリスクがあります。まずこの質問に注目し、Javaer がコード内で Redis ロックをどのように使用するかを見てみましょう。 フォールトトレランスを確保するにはどうすればよいでしょうか? Redis がクラッシュしたらどうすればいいですか?マスタースレーブやクラスタを使うように言われるかもしれませんが、そのような極端な状況もあるでしょう。マスターノードをロックするとクラッシュします。この時点ではスレーブノードに時間的に同期されていません。マスター/スレーブ切り替え後も、ロックは失われます。 この2つの質問について考えてみましょう Redisson実装コードredisson は、Redis の公式分散ロック コンポーネントです。 GitHub アドレス: https://github.com/redisson/redisson Redisson は、Redis をベースに実装された Java インメモリ データ グリッドです。一連の分散 Java 共通オブジェクトを提供するだけでなく、再入可能ロック、フェア ロック、インターロック (MultiLock)、レッド ロック (RedLock)、読み取り/書き込みロック (ReadWriteLock) などを実装し、多くの分散サービスも提供します。 Redisson は、Redis を使用するための最もシンプルで便利な方法を提供します。 Redisson の目的は、Redis に対するユーザーの関心の分離を促進し、ユーザーがビジネス ロジックの処理に集中できるようにすることです。 Redisson は現在非常に強力であり、github wiki も非常に詳細です。分散ロックの概要については、「分散ロックとシンクロナイザー」を参照してください。 Redisson は、シングルポイント モード、マスター スレーブ モード、センチネル モード、クラスター モードをサポートしていますが、構成は異なります。シングルポイントモードでの使い方を見てみましょう。コードは非常にシンプルで、パッケージ化されています。そのまま使用してください。詳細なデモは github: starfish-learn-redisson に掲載してあるので、ここでは手順を追って説明しません。 - RLock ロック = redisson.getLock( "myLock" );
RLock はさまざまなロック方法を提供します。このインターフェースメソッドを解釈してみましょう。 注: コードのバージョンは 3.16.2 です。 JDKのLockインターフェースとReddsionの非同期ロックインターフェースRLockAsyncを継承していることがわかります(今のところこれについては学習しません) RLock - パブリックインターフェースRLockはLock、RLockAsyncを拡張します{
-
- /**
- * ロック名を取得する
- */
- 文字列 getName();
-
- /**
- ※これは端末ロック操作と呼ばれ、ロックが中断される可能性があることを意味します。 AとBが同時にこのメソッドを呼び出し、Aがロックを取得し、Bがロックを取得しなかった場合、スレッドBは
- * Thread.currentThread().interrupt();メソッドは実際にスレッドを中断する
- */
- void lockInterruptibly(longleasingTime, TimeUnit unit) は InterruptedException をスローします。
-
- /**
- * これは最も一般的に使用されるはずです。ロックを取得してみてください
- * waitTimeout ロックの取得を試行する際の最大待機時間。この値を超えるとロック取得に失敗したとみなされます。
- * リースタイム: ロック保持時間。ロックがこの時間を超えると自動的に無効になります(ロックの有効期間内に業務を処理できるように、業務処理時間よりも大きい値を設定する必要があります)
- */
- boolean tryLock(long waitTime, longleasingTime, TimeUnit unit) は InterruptedException をスローします。
-
- /**
- * ロックの有効期間はleaseTimeに設定されており、期限が切れると自動的に無効になります
- * リースタイムが-1に設定されている場合、アクティブな有効期限がないことを意味します
- */
- void lock(long リース時間、TimeUnit 単位);
-
- /**
- *ロックの状態に関係なくロックを解除します
- */
- ブール型の強制ロック解除();
-
- /**
- * 別のスレッドによってロックされていないか確認する
- */
- ブール値 isLocked();
-
- /**
- * 現在のスレッドがロックを保持しているかどうかを確認する
- */
- ブール値 isHeldByCurrentThread();
-
- /**
- * これは明らかです。指定されたスレッドがロックを保持しているかどうかを確認します
- */
- ブール型 isHeldByThread(long threadId);
-
- /**
- * 現在のスレッドがロックを保持している回数を返します
- */
- 保持カウントを取得します。
-
- /**
- * ロックの残り時間を返します
- * @戻る 時間 ミリ秒単位
- * ロックが存在しない場合は -2 。
- * ロックは存在するが、関連付けられている有効期限がない場合は -1 になります。
- */
- 長いremainTimeToLive();
-
- }
デモ とても簡単です。 Redisson はパッケージ化されており、非常に使いやすいです。マスタースレーブ、センチネル、またはクラスターを使用する場合、唯一の違いは構成です。 原理 ソースコードを読むためのヒント: ソースコードを自分のリポジトリにフォークし、ローカルでプルして読み、注釈を付けてから自分のリポジトリに送信するのが最適です。これは、後で表示するときにも便利です。面倒なことが嫌なら、私のJstarfish/redissonを直接見ることもできます RLockのクラス関係を初めて見る ソース コードをたどると、RedissonLock は RLock の直接実装であり、ロックおよびロック解除操作のコア クラスでもあることがわかります。 ロック 主なロック方法は次の 2 つです。違いは非常に単純で、1 つには待機時間があり、もう 1 つには待機時間がありません。そのため、複雑な方を選択して見ていきます (ソース コードには他のほとんどの方法が含まれています)。 - boolean tryLock(long waitTime, longleasingTime, TimeUnit unit) は InterruptedException をスローします。
- void lock(long リース時間、TimeUnit 単位);
RedissonLock.tryLock - @オーバーライド
- パブリックブール型 tryLock(long waitTime, longleasingTime, TimeUnit unit) は InterruptedException をスローします {
- // ロックを待つ最大時間を取得します
- 長い時間= unit.toMillis(waitTime);
- 長い現在の= System.currentTimeMillis();
- //現在のスレッド ID を取得します (ロックを再入力できるかどうかを判断するキー)
- 長いスレッド ID = Thread.currentThread().getId();
- // [コアポイント 1] ロックの取得を試みます。戻り値がnullの場合、ロックが取得されたことを意味します。返される ttl は、キーの残りの存続時間です。
- Long ttl = tryAcquire(waitTime、leaseTime、unit、threadId);
- ttl == null の場合
- 戻る 真実;
- }
- // 許容可能な待機時間 = ロック取得の最大許容待機時間 - 上記の操作プロセスを完了するまでの時間
- 時間-= System.currentTimeMillis() -現在の値;
- もし (時間<= 0 ) {
- // 待つことができないので、失敗を直接返す
- 取得に失敗しました(待機時間、ユニット、スレッドID);
- 戻る 間違い;
- }
-
- 現在の= System.currentTimeMillis();
- /**
- * [コアポイント2]
- * ロック解除メッセージ redisson_lock__channel:{$ KEY } をサブスクライブし、await メソッドを通じてロックが解除されるのをブロックして待機します。これにより、無効なロックの適用とリソースの浪費の問題が解決されます。
- * 情報量に基づいて、ロックが他のリソースによって占有されている場合、現在のスレッドは Redis チャネルを介してロック解放イベントをサブスクライブします。ロックが解除されると、待機中のスレッドに完了を通知するメッセージを送信します。
- * this.awaitがfalseを返す場合、待機時間がロック取得の最大待機時間を超えたことを意味し、サブスクリプションをキャンセルし、ロック取得失敗を返します。
- * this.await がtrueを返すと、ループに入り、ロックを取得しようとします。
- */
- RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
- //await メソッドは CountDownLatch を使用してブロッキングを実装し、subscribe の非同期実行の結果を取得します (Netty の Future を使用)
- if (!subscribeFuture.await(時間、TimeUnit.MILLISECONDS)) {
- subscribeFuture.cancel( false )の場合
- subscribeFuture.onComplete((res, e) -> {
- e == nullの場合{
- 購読を解除します(subscribeFuture、スレッドID);
- }
- });
- }
- 取得に失敗しました(待機時間、ユニット、スレッドID);
- 戻る 間違い;
- }
-
- // ttl が空ではない場合、そのようなキーがすでに存在し、ブロックして待機することしかできないことを示します。
- 試す {
- 時間-= System.currentTimeMillis() -現在の値;
- もし (時間<= 0 ) {
- 取得に失敗しました(待機時間、ユニット、スレッドID);
- 戻る 間違い;
- }
-
- // 無限ループを作成し、ロックの取得を試行し続けます
- (真)の間{
- 長いcurrentTime = System.currentTimeMillis();
- ttl = tryAcquire(waitTime、leaseTime、unit、threadId);
- ttl == null の場合
- 戻る 真実;
- }
-
- 時間-= System.currentTimeMillis() - 現在の時間;
- もし (時間<= 0 ) {
- 取得に失敗しました(待機時間、ユニット、スレッドID);
- 戻る 間違い;
- }
-
- 現在の時刻 = System.currentTimeMillis();
-
- /**
- * [ポイント3] ロックTTLに応じてブロッキング待機時間を調整します。
- * 1. ラッチは実際にはセマフォです。 tryAcquire メソッドを呼び出すと、while ループ内での頻繁なロック要求を回避するために、現在のスレッドが一定期間ブロックされます。
- * 他のスレッドが占有しているロックを解放すると、ロック解除メッセージをブロードキャストし、リスナーはロック解除メッセージを受信し、セマフォを解放します。これにより、最終的にここでブロックされているスレッドが起動します。
- * 2. セマフォの解放メソッドは、サブスクリプションロック解除メッセージのリスナーメッセージ処理メソッド org.redisson.pubsub.LockPubSub#onMessage によって呼び出されます。
- */
- //セマフォ メソッドを呼び出して、ロック待機時間とリース時間のうち短い方の間、スレッドをブロックします。
- ttl >= 0 && ttl <時間 の場合{
- subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- }それ以外{
- subscribeFuture.getNow().getLatch().tryAcquire(時間、TimeUnit.MILLISECONDS);
- }
-
- 時間-= System.currentTimeMillis() - 現在の時間;
- もし (時間<= 0 ) {
- 取得に失敗しました(待機時間、ユニット、スレッドID);
- 戻る 間違い;
- }
- }
- ついに
- // ロックを取得するか、割り込み例外をスローし、redisson_lock__channel:{$ KEY } を登録解除し、ロック解除イベントに注意を払わなくなります。
- 購読を解除します(subscribeFuture、スレッドID);
- }
- }
次に、コメントに記載されている 3 つの主要なポイントを見てみましょう。 コアポイント 1 - ロックを試行: RedissonLock.tryAcquireAsync - プライベート <T> RFuture<Long> tryAcquireAsync(long waitTime, longleasingTime, TimeUnit unit, long threadId) {
- RFuture<Long> ttlRemainingFuture;
- // リースタイム != -1 は期限が切れていないことを意味します
- リースタイムが -1 の場合
- // 本質は、ロックされたLuaスクリプトを非同期に実行することです
- ttlRemainingFuture = tryLockInnerAsync(waitTime、leaseTime、unit、threadId、RedisCommands.EVAL_LONG);
- }それ以外{
- // それ以外の場合は期限が切れており、パラメータは新しい時間に変更されます(更新後)
- ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
- TimeUnit.MILLISECONDS、threadId、RedisCommands.EVAL_LONG);
- }
- ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
- e != null の場合
- 戻る;
- }
-
- // ロックを取得しました
- 残り時間 == null の場合
- リースタイムが -1 の場合
- 内部ロックリース時間 = unit.toMillis(リース時間);
- }それ以外{
- // 更新
- スケジュール有効期限更新(スレッドID)
- }
- }
- });
- ttlRemainingFutureを返します。
- }
ロック Lua スクリプトの非同期実行: RedissonLock.tryLockInnerAsync - <T> RFuture<T> tryLockInnerAsync(long waitTime、longleasingTime、TimeUnit unit、long threadId、RedisStrictCommand<T> command) {
- evalWriteAsync(getRawName(), LongCodec.INSTANCE, コマンド,を返します。
- // 1. キャッシュにキーが存在しない場合は、hincrby コマンド (hincrbyキーUUID+threadId 1) を実行し、再エントリ回数を 1 に設定します。
- // 次に、pexpire コマンドを使用してロックの有効期限 (つまり、ロックのリース時間) を設定します。
- // ロックが正常に取得されたことを示すヌル値 nil を返します
- 「(redis.call('exists', KEYS[1]) == 0) の場合」 +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "nil を返す; " +
- 「終了;」 +
- //キーがすでに存在し、値が一致する場合、ロックは現在のスレッドによって保持されていることを意味します。次にhincrbyコマンドを実行し、再入回数を1増やし、有効期限を設定します。
- 「(redis.call('hexists', KEYS[1], ARGV[2]) == 1) の場合」 +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "nil を返す; " +
- 「終了;」 +
- //キーがすでに存在するが、値が一致しない場合は、ロックがすでに別のスレッドによって保持されていることを意味します。 pttl コマンドを使用して、ロックの残りの存続時間を取得して返します。この時点で、ロックの取得は失敗します。
- 「redis.call('pttl', KEYS[1]);を返します。」 、
- Collections.singletonList(getRawName())、unit.toMillis(leaseTime)、getLockName(threadId));
- }
- KEYS[1]はCollections.singletonList(getName())であり、分散ロックのキーを表す。
- ARGV[1]はinternalLockLeaseTimeで、ロックのリース時間(ロックを保持する実効時間)であり、デフォルト値は30秒です。
- ARGV[2]はgetLockName(threadId)であり、ロックを取得する際に設定される一意の値、つまりUUID+threadIdである。
ウォッチドッグの更新: RedissonBaseLock.scheduleExpirationRenewal - // スレッドIDに基づいてスケジュールと更新のタイミングを決める
- 保護されたvoidスケジュール有効期限更新(長いスレッドID) {
- //スレッドの再エントリ回数を記録するための新しいExpirationEntryを作成する
- ExpirationEntry エントリ = 新しい ExpirationEntry();
- 有効期限エントリ oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
- 古いエントリがnullの場合
- // 現在のスレッドは現在ロックに再入中です
- 古いエントリにスレッドIDを追加します。
- }それ以外{
- // 現在のスレッドが初めてロックされる
- エントリ.addThreadId(スレッドId);
- // 初めてExpirationEntryを作成するときは、更新メソッドをトリガーして更新タスクハンドルを記録する必要があります
- 有効期限を更新します。
- }
- }
-
- // 更新を処理する
- プライベートvoid renewExpiration() {
- // entryName に基づいて ExpirationEntry インスタンスを取得します。空の場合は、通常ロック解除時にトリガーされる cancelExpirationRenewal() メソッドが削除されたことを意味します。
- 有効期限エントリ ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- ee == nullの場合{
- 戻る;
- }
-
- // 新しいスケジュールされたタスクを作成します。これはウォッチドッグの実装です。io.netty.util.Timeout は、タイムホイールと組み合わせて Netty によって使用されるスケジュールされたタスクインスタンスです。
- タイムアウトタスク = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @オーバーライド
- パブリックvoid run(Timeout timeout) 例外をスローします {
- // これは外部のロジックを繰り返すためです。
- 有効期限エントリ ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- ent == nullの場合{
- 戻る;
- }
- // ExpirationEntry の最初のスレッド ID を取得します。空の場合は、保持されているスレッドの再エントリ カウントをクリアするために cancelExpirationRenewal() メソッドが呼び出されたことを意味します。これは通常、ロックが解放された場合に発生します。
- ロングスレッドId = ent.getFirstThreadId();
- スレッドIDがnullの場合
- 戻る;
- }
- // 更新コマンドをRedisに非同期で送信する
- RFuture<ブール値> future = renewExpirationAsync(threadId);
- future.onComplete((res, e) -> {
- // 例外をスローし、更新に失敗し、ログのみを印刷してタスクを直接終了します
- e != null の場合
- log.error( "ロック " + getRawName() + " の有効期限を更新できません" , e);
- EXPIRATION_RENEWAL_MAP.remove(getEntryName());
- 戻る;
- }
- //更新が成功したことを証明するためにtrueを返し、次に更新メソッドを再帰的に呼び出します (それ自体を再スケジュールします)。更新が失敗した場合は、対応するロックがもう存在しないことを意味するため、再帰せずに直接戻ります。
- もし(res){
- // スケジュールを再設定する
- 有効期限を更新します。
- }それ以外{
- 有効期限更新をキャンセルします( null );
- }
- });
- }// ここでの実行頻度は、leaseTime の 3 分の 1 をミリ秒に変換したものです。更新ロジックはleaseTimeの初期値が-1の場合にのみ開始されるため、ここでの実行頻度はlockWatchdogTimeoutの3分の1になります。
- }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
- // ExpirationEntryインスタンスはスケジュールされたタスクインスタンスを保持します
- タスクのタイムアウトを設定します。
- }
コアポイント2 - サブスクリプションロック解除メッセージ: RedissonLock.subscribe - 保護された最終的な LockPubSub pubSub;
-
- パブリックRedissonLock(CommandAsyncExecutor commandExecutor, 文字列名) {
- super(コマンドエグゼキュータ、名前);
- this.commandExecutor = コマンドExecutor;
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- //コンストラクタでpubSubを初期化します。これらの get メソッドに従うと、すべてがコンストラクター内で初期化されていることがわかります。あるだろう
- // プライベート最終AsyncSemaphore[] locks = new AsyncSemaphore[50];このコードはセマフォのセットを初期化します
- this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
- }
-
- 保護された RFuture<RedissonLockEntry> subscribe(long threadId) {
- pubSub.subscribe(getEntryName(), getChannelName());を返します。
- }
-
- // LockPubSub に entryName -> RedissonLockEntry のハッシュ マップを登録します。 RedissonLockEntry インスタンスには、RPromise<RedissonLockEntry> の結果、セマフォ スタイルのロック、およびサブスクリプション メソッドの再入カウンターが格納されます。
- パブリックRFuture<E> subscribe(String entryName, String channelName) {
- AsyncSemaphore セマフォ = service.getSemaphore(新しい ChannelName(channelName));
- RPromise<E> newPromise = 新しい RedissonPromise<>();
- セマフォ.取得(() -> {
- (!newPromise.setUncancellable())の場合{
- セマフォを解放します。
- 戻る;
- }
-
- E エントリ = entrys.get(entryName);
- エントリがnullの場合
- エントリを取得します。
- セマフォを解放します。
- entry.getPromise().onComplete(新しい TransferListener<E>(newPromise));
- 戻る;
- }
-
- E 値 = createEntry(newPromise);
- 値を取得します。
-
- E 古い値 = エントリ.putIfAbsent(エントリ名、値);
- (古い値 != null ) の場合 {
- 古い値を取得します。
- セマフォを解放します。
- oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
- 戻る;
- }
-
- RedisPubSubListener<Object> リスナー = createListener(channelName, value);
- service.subscribe(LongCodec.INSTANCE、チャネル名、セマフォ、リスナー);
- });
-
- newPromiseを返します。
- }
核心ポイント3は比較的単純なので、ここでは説明しません。 ロック解除 RedissonLock.ロック解除() - @オーバーライド
- パブリックボイドロック解除() {
- 試す {
- // 操作のロックを解除するための現在の呼び出しのスレッド ID を取得します
- 取得(unlockAsync(Thread.currentThread().getId()));
- } キャッチ (RedisException e) {
- // IllegalMonitorStateException は通常、スレッド A がロックされ、スレッド B がロック解除され、内部判定のスレッド ステータスが不一致な場合にスローされます。
- if (e.getCause() インスタンスの IllegalMonitorStateException) {
- (IllegalMonitorStateException) e.getCause() をスローします。
- }それ以外{
- eを投げる;
- }
- }
- }
RedissonBaseLock.unlockAsync - @オーバーライド
- パブリックRFuture<Void> unlockAsync(long threadId) {
- // 結果を構築する RedissonPromise
- RPromise<Void> 結果 = new RedissonPromise<>();
- // 返された RFuture の結果がtrueの場合、ロック解除が成功したことを意味します。 NULLが返された場合、スレッドIDが異常であり、ロックとロック解除のクライアントスレッドが同じスレッドではないことを意味します。
- RFuture<ブール値> future = unlockInnerAsync(threadId);
-
- future.onComplete((opStatus, e) -> {
- //ウォッチドッグの更新タスクをキャンセルする
- 有効期限の更新をキャンセルします(スレッドID);
-
- e != null の場合
- 結果.tryFailure(e);
- 戻る;
- }
-
- opStatus == nullの場合
- IllegalMonitorStateException 原因 = new IllegalMonitorStateException( "ロックのロックを解除しようとしましたが、ノード ID によって現在のスレッドによってロックされていません: "
- + id + " スレッドID: " + threadId);
- 結果.tryFailure(原因);
- 戻る;
- }
-
- 結果.trySuccess( null );
- });
-
- 結果を返します。
- }
RedissonLock.unlockInnerAsync - // 実際の内部ロック解除方法、ロック解除 Lua スクリプトを実行する
- 保護された RFuture<Boolean> unlockInnerAsync(long threadId) {
- evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,を返します。
- // 分散ロックが存在するが、値が一致しない場合は、ロックが他のスレッドによって占有されており、ロックを解放する権利がないことを意味し、直接 null 値を返します (ベルを結んだ人がそれを解く必要があります)
- 「(redis.call('hexists', KEYS[1], ARGV[3]) == 0) の場合」 +
- "nil を返します。" +
- 「終了;」 +
- //値が一致する場合、現在のスレッドは分散ロックを保持しているため、再入回数は1回減少します。
- "ローカルカウンタ = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- //再入カウントから 1 を引いた値が 0 より大きい場合、分散ロックが再入されたことを意味し、有効期限の更新のみ可能で、削除はできません。
- 「もし(カウンタ>0)ならば」 +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "0を返す; " +
- 「その他」 +
- //再入回数から1を引いた値が0の場合、このKEYを削除し、ロック解除メッセージを公開して1を返すことができます。
- "redis.call('del', KEYS[1]); " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "1を返す; " +
- 「終了;」 +
- "nil を返します。" 、
- //これらの5つのパラメータは、それぞれKEYS[1]、KEYS[2]、ARGV[1]、ARGV[2]、ARGV[3]に対応します。
- Arrays.asList(getRawName(), getChannelName())、LockPubSub.UNLOCK_MESSAGE、internalLockLeaseTime、getLockName(threadId));
- }
私はコードのほんの一部をリストしただけなので、まだ自分でやらなければならないことがたくさんあります ソース コードから、Redisson が最初に提起された問題「有効期限をどのくらいに設定すればよいか」を解決するのに役立つことがわかります。 Redisson はウォッチドッグを提供します。ロックが取得されるたびに、短いタイムアウトのみが設定されます。同時に、タイムアウトが期限切れになるたびに、ロック タイムアウトを更新するスレッドが開始されます。ロックを解放しながらスレッドを終了します。 しかし、ノードがハングしてロックが失われる問題は解決されません。次へ〜 4. レッドロック上記で紹介した分散ロックには、極端なケースではまだ欠陥があります。 クライアントが長時間ブロックされ、ロックが失敗する クライアント 1 はロックを取得しますが、ネットワークの問題または GC により長時間ブロックされ、ビジネス プログラムが実行される前にロックが期限切れになります。このとき、クライアント 2 も正常にロックを取得できるため、スレッド セーフティの問題が発生する可能性があります。 Redis サーバーのクロックドリフト Redis サーバーのマシン時間が進むと、キーは早期にタイムアウトになります。たとえば、クライアント 1 がロックを取得した後、キーはまだ期限切れになっていませんが、Redis サーバーの時刻がクライアントの時刻より 2 分早いため、キーの期限が早く切れてしまいます。このとき、クライアント 1 がロックを解除していない場合、複数のクライアントが同時に同じロックを保持する可能性があります。 単一ポイントインスタンスのセキュリティ問題 Redis がスタンドアロン モードの場合、クラッシュすると、すべてのクライアントがロックを取得できなくなります。マスター/スレーブ モードになっているが、Redis のマスター/スレーブ同期は非同期であるとします。 Redis マスターがクラッシュした場合、スレーブはこの時点でロックに同期していないため、マシン B は再度ロックを適用するときに再度ロックを適用することになり、これも問題になります。 これらの問題を解決するために、Redis の作者は RedLock アルゴリズムを提案し、Redission にも RedLock を実装しました。 Redlock アルゴリズムは、Redis の公式 Web サイトで次のように紹介されています。 アルゴリズムの分散バージョンでは、完全に独立した N 個の Redis マスター ノードがあり、レプリケーションやその他の暗黙的な分散調整メカニズムは使用しないと想定しています。以前、単一の Redis インスタンスでロックを安全に取得および解放する方法について説明しました。このメソッドを使用して、(N) インスタンスごとにロックが取得および解放されることを保証します。この例では、N = 5 に設定しています。これは適切な設定であるため、これらのインスタンスを 5 台のマシンまたは仮想マシンで実行して、同時に障害が発生しないようにする必要があります。ロックを取得するには、クライアントは次の操作を実行する必要があります。 現在の Unix 時間をミリ秒単位で取得します。 同じキーと一意の値 (UUID など) を使用して、5 つのインスタンスから順番にロックを取得してみます。 Redis からロックを要求する場合、クライアントは Reids インスタンスからロックを取得しようとする最大待機時間を設定する必要があります (この時間を超えると、次のインスタンスがすぐに要求されます)。このタイムアウトはロックの有効期限よりも短くする必要があります。たとえば、ロックの自動有効期限が 10 秒の場合、タイムアウトは 5 ~ 50 ミリ秒にする必要があります。これにより、サーバー側の Redis がすでにクラッシュしているにもかかわらず、クライアントが応答結果を待機している状況を回避できます。サーバーが指定された時間内に応答しない場合、クライアントはできるだけ早く別の Redis インスタンスからロックを取得しようとする必要があります。 クライアントは、現在の時刻からロックが開始された時刻 (手順 1 で記録された時刻) を引いて、ロックを取得するのに費やされた時間を取得します。ロックが Redis ノードの過半数 (N/2+1、ここでは 3 つのノード) から取得され、使用された合計時間がロックの有効期限よりも短い場合にのみ、ロックが正常に取得されたと見なされます。 ロックが取得された場合、キーの実際の有効時間 = 有効時間 (ロック取得時に設定されたキーの自動タイムアウト時間) - ロック取得に費やされた合計時間 (各 Redis インスタンスのクエリに費やされた合計時間の合計) (手順 3 で計算された結果)。 何らかの理由でロックの取得に失敗した場合 (つまり、少なくとも「N/2+1」個の Redis インスタンスでロックが取得されないか、「ロックを取得する合計時間」が「有効時間」を超えている場合)、クライアントはすべての Redis インスタンスのロックを解除する必要があります (一部の Redis インスタンスがまったくロックに成功しない場合でも)。これにより、一部のノードがロックを取得できなくなりますが、クライアントは応答を受信せず、一定期間ロックを再取得できなくなります。 総括する: クライアントが複数の Redis インスタンスのロック申請を行う場合、大多数のノードが正常にロックされていることを確認する必要があります。 フォールトトレランスの問題を解決します。いくつかのインスタンスは異常ですが、残りは正常にロックできます。 ほとんどのノードをロックするのにかかる合計時間は、ロックに設定された有効期限よりも短くなります。 マルチインスタンス操作では、ネットワーク遅延、パケット損失、タイムアウトなどの問題が発生する可能性があります。したがって、ほとんどのノードが正常にロックされたとしても、ロックの累積時間がロック有効期限を超えると、一部のノードのロックが期限切れになっている可能性があり、これは意味がありません。 ロックを解除するには、すべてのノードにロック解除要求を送信する必要があります。 一部のノードが正常にロックされたが、例外のためにほとんどのノードのロックに失敗した場合は、すべてのノードを解放し、すべてのノードの一貫性を維持する必要があります。 RedLockに関しては、分散化業界の大物であるAntirez氏とMartin氏も議論を交わした。ご興味がございましたら、ぜひご覧になってみてください。 - 設定 config1 = 新しい Config();
- config1.useSingleServer().setAddress( "127.0.0.1:6379" );
- RedissonClient redissonClient1 = Redisson.create (config1);
-
- 設定 config2 = 新しい Config();
- config2.useSingleServer().setAddress( "127.0.0.1:5378" );
- RedissonClient redissonClient2 = Redisson.create (config2);
-
- 設定 config3 = 新しい Config();
- config3.useSingleServer().setAddress( "127.0.0.1:5379" );
- RedissonClient redissonClient3 = Redisson.create (config3);
-
- /**
- * 複数のRLockオブジェクトを取得する
- */
- RLock lock1 = redissonClient1.getLock(lockKey);
- RLock lock2 = redissonClient2.getLock(lockKey);
- RLock lock3 = redissonClient3.getLock(lockKey);
-
- /**
- * 複数のRLockオブジェクトに基づいてRedissonRedLockを構築する(ここに核心的な違いがある)
- */
- ロック1、ロック2、ロック3を設定します。
-
- 試す {
- /**
- * 4. ロックを取得してみる
- *ロックを取得しようとするための最大待ち時間をwheittimeします。この値を超えた場合、ロックの取得が失敗したと見なされます。
- * LEASETIME:ロック保持時間。ロックがこの時間を超えた場合、自動的に無効になります(ロックの有効期間内にビジネスを処理できるように、値をビジネス処理時間よりも大きく設定する必要があります)
- */
- Boolean Res = RedLock.TryLock(100、10、TimeUnit.Seconds);
- もし(res){
- //ロックを正常に取得し、ここでビジネスを処理します
- }
- } キャッチ (例外 e) {
- 新しいruntimeexception( "aquire lock fail" );
- }ついに{
- //何があっても、最終的にロックを解除する必要があります
- redlock.unlock();
- }
最もコアの変更は、複数のRlockを構築する必要があることです。レッドロックアルゴリズムは複数の独立したRedis環境で構築されているため(それらを区別するために、redissionノードと呼ばれる)、複数のRlockを構築し、複数のRlockに基づいてredissonredlockを構築する必要があることです。リリースノードは、単一、マスター/サルブ、センチネル、またはクラスターのいずれかです。これは、過去のように、1つのクラスター、1つのセンチネルクラスター、または1つのマスタースレーブアーキテクチャだけを構築できなくなることを意味しますが、RedissOnredLockのためにいくつかの追加の独立したredissionノードを構築する必要があります。 RedissonMultilock.TryLock - @オーバーライド
- パブリックブール型 tryLock(long waitTime, longleasingTime, TimeUnit unit) は InterruptedException をスローします {
- // 試す {
- // return tryLockasync(waittime、leasetime、unit).get();
- //} catch(executionexception e){
- //新しいIllegalStateException(e);
- // }
- long newLeasetime = -1;
- リースタイムが -1 の場合
- if(waittime == -1){
- newLeaseTime = unit.Tomillis(LEASETIME);
- }それ以外{
- newLeaseTime = unit.Tomillis(waittime)*2;
- }
- }
-
- long time = system.currenttimemillis();
- Long Remaintime = -1;
- if(waittime!= -1){
- Remaintime = Unit.Tomillis(waittime);
- }
- long lockwaittime = calclockwaittime(Remaintime);
-
- //ロックに失敗する可能性のあるノードの数を制限する(n-(n/2+1))
- int failedlockslimit = failedlockslimit();
- List <Rlock> acchiredLocks = new ArrayList <>( locks。size ());
- //すべてのノードをトラバースし、evalコマンドを介してluaロックを実行します
- for (listiterator <rlock> iterator = locks.listiterator(); iterator.hasnext();){
- rlock lock = iterator.next ();
- ブールロックアッキー。
- 試す {
- //ノードをロックしてみてください
- if(waittime == -1 && leasetime == -1){
- lockquired = lock.trylock();
- }それ以外{
- long awaittime =数学。 min (lockwaittime、remaintime);
- lockquired = lock.trylock(awaittime、newLeasetime、timeUnit.milliseconds);
- }
- } catch(redisResponsetimeOutexception e){
- //このタイプの例外がスローされている場合、ロックが成功するのを防ぐために応答が失敗するため、すべてのノードをロック解除する必要があります
- lockinner(arrays.aslist(lock));
- lockquired = false ;
- } キャッチ (例外 e) {
- lockquired = false ;
- }
-
- if(lockAcquired){
- quickiedlocks.add (lock);
- }それ以外{
- /*
- *ロックを適用できなかったノードがロックに失敗しないノードの数の制限に達したかどうかを計算します(n-(n/2+1))
- *到達した場合、最終的なロックアプリケーションが失敗したと考えられており、後続のノードから継続する必要はありません
- * RedLockアルゴリズムでは、最終的なロックアプリケーションが成功すると見なされる前に、少なくともN/2+1ノードを正常にロックする必要があるため
- */
- if( locks。size () - acchiedlocks。size ()== failedlockslimit()) {
- 壊す;
- }
-
- if(failedlockslimit == 0){
- Unlockinner(achociredlocks);
- if(waittime == -1){
- 戻る 間違い;
- }
- failedlockslimit = failedlockslimit();
- acturedlocks.clear();
- // iteratorをリセットします
- while(iterator.hasprevious()){
- iterator.previous();
- }
- }それ以外{
- FailedLockSlimit-
- }
- }
- //各ノードからロックを取得するために現在消費されている合計時間を計算します。最大待機時間に等しい場合、最終的なロックアプリケーションが失敗し、 falseが返されると見なされます
- if(Regrantime!= -1){
- Remaintime - = System.CurrentTimemillis() - Time ;
- time = system.currenttimemillis();
- if(Regrantime <= 0){
- Unlockinner(achociredlocks);
- 戻る 間違い;
- }
- }
- }
-
- リースタイムが -1 の場合
- quickiedlocks.stream()
- .map(l->(redissonlock)l)
- .map(l-> l.expireasync(unit.tomillis(leasetime)、timeunit.milliseconds)))
- 。
- }
-
- 戻る 真実;
- }
参照とありがとう 「Redis- Redisを使用した分散ロック」 「Redisson-分散ロックと同期」 Redisについてゆっくりと話して、分散ロックとRedissonソースコード分析を実装する Redissonでの分散ロックの実装を理解する |