[[335642]] 序文日々の開発では、ロックが必要な状況に必然的に遭遇します。たとえば、製品在庫を減算するには、まずデータベースから在庫を取得し、在庫決定を実行してから在庫を減算する必要があります。この一連の操作は明らかにアトミック性に準拠していません。コード ブロックがロックされていない場合、同時実行による過剰販売の問題が発生する可能性が高くなります。システムがモノリシック アーキテクチャである場合は、ローカル ロックを使用することで問題を解決できます。分散アーキテクチャの場合は、分散ロックが必要です。 プランSETNXコマンドとEXPIREコマンドの使用 - SETNXキー値
- EXPIRE キー秒数
- DELキー
- (setnx("item_1_lock", 1)) の場合 {
- 有効期限が切れます("item_1_lock", 30);
- 試す {
- ... ロジック
- } キャッチ {
- ...
- ついに
- del("item_1_lock");
- }
- }
この方法は問題を解決するように見えますが、SETNX および EXPIRE 操作は非アトミックであるため、一定のリスクがあります。 SETNX が成功した後にエラーが発生した場合、ロックにタイムアウト期間がないため、EXPIRE は実行されず、デッドロックが発生します。 この場合、Lua スクリプトを使用して操作のアトミック性を維持し、SETNX 操作と EXPIRE 操作の両方が成功または失敗することを確認できます。 - if (redis.call('setnx', KEYS[1], ARGV[1]) < 1 )
- 0 を返します。
- 終わり;
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- 1 を返します。
この方法により、競合するロックの原子性の問題を暫定的に解決しました。他の機能はまだ実装されていませんが、デッドロックは発生しないはずです🤪🤪🤪。 Redis 2.6.12以降ではSETコマンドを柔軟に使用できます - キー値の設定 NX EX 30
- DELキー
- if (set("item_1_lock", 1, "NX", "EX", 30)) {
- 試す {
- ... ロジック
- } キャッチ {
- ...
- ついに
- del("item_1_lock");
- }
- }
改良された方法は、Lua スクリプトを使用せずに SETNX および EXPIRE の原子性問題を解決します。では、よく考えてみましょう。 A がロックを取得し、コード ブロックに正常に入力してロジックを実行したが、さまざまな理由によりタイムアウトになり、ロックが自動的に解放された場合。その後、B はロックを正常に取得し、コード ブロックに入ってロジックを実行します。ただし、A がロジック実行を完了した後にロックを解除すると、B が取得したばかりのロックも解除されます。それは自分の鍵を使って他人のドアを開けるようなもので、許されることではありません。 この問題を解決するには、SET 時にロック フラグを設定し、DEL 時に現在のロックが自分のロックであるかどうかを確認します。 - 文字列値= UUID .randomUUID().toString().replaceAll("-", "");
- if (set("item_1_lock", 値, "NX", "EX", 30)) {
- 試す {
- ... ロジック
- } キャッチ {
- ...
- ついに
- ... lua スクリプトはアトミック性を保証します
- }
- }
- redis.call('get', KEYS[1]) == ARGV[1]の場合
- その後、redis.call('del', KEYS[1]) を返します。
- それ以外の場合は0を返す
- 終わり
この時点で、競合するロックの原子性の問題と、誤ってロックを削除する問題がようやく解決されました。ただし、ロックは通常、再入、循環待機、タイムアウト時の自動更新などの機能もサポートする必要があります。次に、これらの問題を解決するために非常に便利なパッケージの使用方法を学びます。 Redissonを使い始めるRedission のロックは、再入可能およびタイムアウトの自動更新機能を実装しており、これらはすべてカプセル化されています。上記のいくつかの機能ポイントを簡単に実装するには、必要に応じて API を呼び出すだけです。詳細な機能については、Redissonのドキュメントを参照してください。 プロジェクトにRedissonをインストールする - <依存関係>
- <グループID> org.redisson</グループID>
- <artifactId>再配布</artifactId>
- <バージョン> 3.13.2</バージョン>
- </依存関係>
- 実装 'org.redisson:redisson:3.13.2'
Maven または Gradle を使用してビルドします。最新バージョンは3.13.2です。必要なバージョンは、Redisson でも見つかります。 単純な試み - RedissonClient redissonClient = Redisson .create();
- RLockロック= redissonClient .getLock("lock");
- ブール値res = lock .lock();
- もし(res){
- 試す {
- ... ロジック
- ついに
- ロックを解除します。
- }
- }
Redisson は基礎となるロジックをすべてカプセル化します。具体的な実装については気にする必要はありません。ほんの数行のコードで完璧なロックを使用できます。次に、ソースコードを簡単にいじってみましょう🤔🤔🤔。 ロック - private void lock(longleasingTime, TimeUnit unit, boolean interruptibly) は InterruptedException をスローします {
- 長いthreadId =スレッド.currentThread().getId();
- Long ttl = tryAcquire (leaseTime、unit、threadId);
- ttl == nullの場合{
- 戻る;
- }
- RFuture < RedissonLockEntry > 将来=サブスクライブ(スレッド ID);
- if (割り込み可能) {
- コマンドExecutor.syncSubscriptionInterrupted(将来);
- } それ以外 {
- コマンドExecutor.syncSubscription(将来);
- }
- 試す {
- (真)の間{
- ttl = tryAcquire (リース時間、ユニット、スレッド ID);
- ttl == nullの場合{
- 壊す;
- }
- ttl > = 0 の場合
- 試す {
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } キャッチ (InterruptedException e) {
- if (割り込み可能) {
- eを投げる;
- }
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- }
- } それ以外 {
- if (割り込み可能) {
- 将来。getNow()。getLatch()。取得();
- } それ以外 {
- future.getNow().getLatch().acquireUninterruptibly();
- }
- }
- }
- ついに
- 購読を解除します(将来、スレッドID);
- }
- }
ロックを取得 - プライベート< T > RFuture < Long > tryAcquireAsync(long リースタイム、TimeUnit ユニット、long スレッド ID) {
- リースタイムが -1 の場合
- tryLockInnerAsync(leaseTime、ユニット、スレッドId、RedisCommands.EVAL_LONG) を返します。
- }
- RFuture <長い> ttlRemainingFuture = tryLockInnerAsync (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- ttlRemainingFuture.onComplete((ttlRemaining, e) - > {
- e != null の場合
- 戻る;
- }
- 残り時間== null の場合
- スケジュール有効期限更新(スレッドID)
- }
- });
- ttlRemainingFuture を返します。
- }
- < T > RFuture < T > tryLockInnerAsync(long リースタイム、TimeUnit ユニット、long スレッド ID、RedisStrictCommand < T >コマンド) {
- 内部ロックリース時間= unit .toMillis(リース時間);
- evalWriteAsync(getName(), LongCodec.INSTANCE, コマンド, を返します。
- 「(redis.call('exists', KEYS[1]) == 0) の場合」 +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "nil を返す; " +
- 「終了;」+
- 「(redis.call('hexists', KEYS[1], ARGV[2]) == 1) の場合」 +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "nil を返す; " +
- 「終了;」+
- "redis.call('pttl', KEYS[1]); を返します",
- Collections.singletonList(getName())、internalLockLeaseTime、getLockName(threadId));
- }
ロックの削除 - パブリック RFuture < Void > unlockAsync(long threadId) {
- RPromise <無効> 結果=新しいRedissonPromise < Void > ();
- RFuture <ブール値> 将来= unlockInnerAsync (スレッドID);
- future.onComplete((opStatus, e) - > {
- 有効期限の更新をキャンセルします(スレッドID);
- e != null の場合
- 結果.tryFailure(e);
- 戻る;
- }
- opStatusが null の場合
- IllegalMonitorStateException原因= new IllegalMonitorStateException("ロックを解除しようとしましたが、ノード ID によって現在のスレッドによってロックされていません: "
- + id + " スレッドID: " + threadId);
- 結果.tryFailure(原因);
- 戻る;
- }
- 結果.trySuccess(null);
- });
- 結果を返します。
- }
- 保護された RFuture <ブール値> unlockInnerAsync(long threadId) {
- evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, を返します。
- 「(redis.call('hexists', KEYS[1], ARGV[3]) == 0) の場合」 +
- "nil を返します。" +
- 「終了;」+
- "ローカルカウンタ= redis .call('hincrby', KEYS[1], ARGV[3], -1); " +
- 「もし(カウンタ> 0)ならば」+
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "0を返す; " +
- 「その他」+
- "redis.call('del', KEYS[1]); " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "1を返す; " +
- 「終了;」+
- "nil を返す;",
- Arrays.asList(getName(), getChannelName())、LockPubSub.UNLOCK_MESSAGE、internalLockLeaseTime、getLockName(threadId));
- }
要約する同時実行性の問題を解決するために Redis を分散ロックとして使用するには、まだいくつかの困難があり、注意が必要な点が多数あります。システムの規模を正しく評価し、特定の技術を使用するためだけにシステムを使用するべきではありません。同時実行の問題を完全に解決するには、データベース レベルで作業する必要があります。 |