今日は、Redis のウォッチドッグ メカニズムについて見ていきます。結局のところ、分散ロックを実装するために Redis を使用する人は依然として多くいます。 Redis が分散ロックをどのように実装しているかを確認し、次に Redis のウォッチドッグ メカニズムを分析します。このメカニズムがないと、分散ロックに Redis を使用する多くの友人がデッドロックを引き起こすことがよくあります。 Redisは分散ロックを実装するRedisは分散ロックを実装しており、最も重要なのは以下の条件である。 ロックを取得- 相互排他: 1つのスレッドだけがロックを取得できることを保証する
- 非ブロッキング: 一度試して、成功した場合は true を返し、失敗した場合は false を返します。
ロックを解除- 手動リリース
- タイムアウト解除: ロック取得時にタイムアウトを追加する
上記のコード: @Resource private RedisTemplate redisTemplate; public static final String UNLOCK_LUA; /** * 释放锁脚本,原子操作*/ static { StringBuilder sb = new StringBuilder(); sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call(\"del\",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); UNLOCK_LUA = sb.toString(); } /** * 获取分布式锁,原子操作* @param lockKey * @param requestId 唯一ID, 可以使用UUID.randomUUID().toString(); * @param expire * @param timeUnit * @return */ public boolean tryLock(String lockKey, String requestId, long expire, TimeUnit timeUnit) { try{ RedisCallback<Boolean> callback = (connection) -> { return connection.set(lockKey.getBytes(Charset.forName("UTF-8")), requestId.getBytes(Charset.forName("UTF-8")), Expiration.seconds(timeUnit.toSeconds(expire)), RedisStringCommands.SetOption.SET_IF_ABSENT); }; return (Boolean)redisTemplate.execute(callback); } catch (Exception e) { log.error("redis lock error.", e); } return false; } /** * 释放锁* @param lockKey * @param requestId 唯一ID * @return */ public boolean releaseLock(String lockKey, String requestId) { RedisCallback<Boolean> callback = (connection) -> { return connection.eval(UNLOCK_LUA.getBytes(), ReturnType.BOOLEAN ,1, lockKey.getBytes(Charset.forName("UTF-8")), requestId.getBytes(Charset.forName("UTF-8"))); }; return (Boolean)redisTemplate.execute(callback); } /** * 获取Redis锁的value值* @param lockKey * @return */ public String get(String lockKey) { try { RedisCallback<String> callback = (connection) -> { return new String(connection.get(lockKey.getBytes()), Charset.forName("UTF-8")); }; return (String)redisTemplate.execute(callback); } catch (Exception e) { log.error("get redis occurred an exception", e); } return null; } この実装方法は、Redis を直接使用して分散ロックを独自に実装するのと同じですが、それを実装するためのフレームワーク、つまり Redission があります。ウォッチドッグ メカニズムは、Redission が提供する自動拡張メカニズムであり、Redission が提供する分散ロックを自動的に更新できるようにします。 なぜウォッチドッグ機構が必要なのでしょうか?分散ロックは期限切れにならないように設定することはできません。これは、分散環境でロックを取得した後にノードがクラッシュすることで発生するデッドロックを回避するためです。したがって、分散ロックには有効期限を設定する必要があります。ただし、これによりスレッドはロックを取得しますが、ロックの有効期限が到来してもプログラムの実行が終了していないため、ロックがタイムアウトして解放されます。すると、他のスレッドがロックを取得し、問題が発生する可能性があります。 したがって、ウォッチドッグ メカニズムの自動更新によってこの問題はうまく解決されます。 Redisson はすでにこの分散ロックを実装しています。必要なのはそれを呼ぶことだけです。それでは、Redisson のソース コードを見て、ウォッチドッグ メカニズムがどのように実装されているかを見てみましょう。 ロックを試みるRedissonLock クラスの下で: public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); } - waitTime: ロックを取得するための最大待機時間 (渡されない場合のデフォルトは -1)
- リースタイム: ロックが自動的に解除される時間 (渡されない場合はデフォルト -1)
- 単位: 時間の単位(待機時間や自動ロック解除時間の単位)
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); } 上記のコードセグメントの主な内容はウォッチドッグメカニズムに関するもので、実際にはtryAcquireと考えるべきです。 最終着陸はtryAcquireAsyncです //如果获取锁失败,返回的结果是这个key的剩余有效期RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //上面获取锁回调成功之后,执行这代码块的内容ttlRemainingFuture.onComplete((ttlRemaining, e) -> { //不存在异常if (e == null) { //剩余有效期为null if (ttlRemaining == null) { //这个函数是解决最长等待有效期的问题this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;
tryLockInnerAsync を呼び出します。ロックの取得に失敗した場合、返される結果はキーの残りの有効期間になります。ロックの取得が成功した場合は null が返されます。 ロックが正常に取得された後、検出時に例外がなく、ロックが正常に取得された場合 (ttlRemaining == null)。 次に、this.scheduleExpirationRenewal(threadId); を実行します。ウォッチドッグメカニズムを起動します。 ウォッチドッグ メカニズムによって提供されるデフォルトのタイムアウトは、30 * 1000 ミリ秒、つまり 30 秒です。 スレッドがロックを取得し、プログラムを実行してロックを解放するまでの時間がロックの自動解放時間 (つまり、ウォッチドッグ メカニズムによって提供される 30 秒のタイムアウト時間) よりも長い場合、Redission は Redis 内のターゲット ロックのタイムアウト時間を自動的に延長します。 Redis でウォッチドッグ メカニズムを開始する場合、ロックを取得するときに、leaseTime (自動ロック解放時間) を定義する必要はありません。 ただし、Redisson は、分散ロックに関する当社独自の定義とは異なります。ロックの自動解放時間を定義すると、lock メソッドまたは tryLock メソッドに関係なく、ウォッチドッグ メカニズムを有効にすることはできません。 分散ロックのウォッチドッグメカニズムを理解しましたか? |