[[431030]] 新しく引き継いだプロジェクトでは、アカウントの不均衡により問題が発生する場合があります。前の技術担当の上司は、退職前に「調べたが原因がわからなかった、忙しくて解決できなかったのでフレームワークが原因かもしれない」と説明していましたが… プロジェクトが完了したので、このような問題を解決する必要があります。すべての会計処理ロジックを整理した後、ようやく原因がわかりました。データベース内のホット アカウントに対する同時操作が原因でした。今回は、分散システムにおける Redis ベースの分散ロックについて説明します。ちなみに、問題の原因と解決策も分析してみましょう。 原因分析システムの同時実行性は高くなく、ホットアカウントもありますが、それほど深刻ではありません。問題の根本は、同時実行性を人為的に生み出すシステム アーキテクチャ設計にあります。シナリオは次のとおりです。販売者がバッチでデータをインポートすると、システムが前処理を実行し、口座残高を増減します。 この時点で、別のスケジュールされたタスクもアカウントをスキャンして更新します。さらに、同じアカウントに対する操作がさまざまなシステムに分散され、ホットアカウントが出現します。 この問題の解決策として、アーキテクチャの観点から、会計システムを抽出して 1 つのシステムに集中させることが考えられます。すべてのデータベース トランザクションと実行順序は、会計システムによって調整および処理されます。技術的な観点から見ると、ホット スポット アカウントはロック メカニズムによってロックされる可能性があります。 この記事では、分散ロックに基づいてホットスポット アカウントを実装する方法について詳しく説明します。 ロック分析Java のマルチスレッド環境では、通常、使用できるロックの種類がいくつかあります。 - 一般的に使用される JVM メモリ モデル レベルのロックは、synchronized、Lock などです。
- 楽観的ロック、悲観的ロックなどのデータベース ロック。
- 分散ロック。
JVM メモリ レベルのロックは、複数のスレッドがグローバル変数にアクセスしたり変更したりするなど、単一のサービス内のスレッドのセキュリティを確保できます。しかし、システムがクラスターに展開されている場合、JVM レベルでのローカル ロックは無力になります。 悲観的ロックと楽観的ロック上記のケースのように、ホット アカウントは分散システム内の共有リソースであり、通常はデータベース ロックまたは分散ロックを使用して解決します。 データベース ロックは、楽観的ロックと悲観的ロックに分けられます。 悲観的ロックは、データベース (MySQL の InnoDB) によって提供される排他ロックに基づいて実装されます。トランザクションを実行すると、MySQL は select ... for update ステートメントを通じてクエリ結果セット内の各データ行に排他ロックを追加し、他のスレッドはレコードの更新および削除操作をブロックします。これにより、共有リソースの順次実行(変更)が可能になります。 楽観的ロックは悲観的ロックに相対的です。楽観的ロックでは、通常の状況ではデータが競合を引き起こさないと想定されるため、データが更新のために送信されると、データが競合しているかどうかを正式にチェックします。競合が発生した場合は、例外メッセージがユーザーに返され、ユーザーは対処方法を決定できます。楽観的ロックは、書き込みよりも読み取りが多いシナリオに適しており、プログラムのスループットを向上させることができます。楽観的ロックを実装する場合、通常は記録ステータスまたはバージョンの追加に基づいて実装されます。 悲観的なロック失敗シナリオプロジェクトでは悲観的ロックが使用されましたが、失敗しました。これは、悲観的ロックを使用する場合によくある誤解でもあります。以下で分析してみましょう。 悲観的ロックを使用する通常のプロセス: - 更新のために select ... を通じてレコードをロックします。
- 新しい残高を計算し、金額を変更して保存します。
- 実行が完了し、ロックが解放されます。
プロセスにおけるよくある間違い: - 口座残高を確認し、新しい残高を計算します。
- 更新のために select ... を通じてレコードをロックします。
- 金額を変更して保存します。
- 実行が完了し、ロックが解放されます。
間違ったプロセスでは、たとえば、サービス A と B によって照会された残高が両方とも 100 で、A が 50 を差し引き、B が 40 を差し引き、その後 A がレコードをロックし、データベースを 50 に更新します。 A がロックを解除した後、B はレコードをロックし、データベースを 60 に更新します。明らかに、後者は前者の更新を上書きします。解決策は、ロックの範囲を拡大し、新しい残高を計算する前にロックを進めることです。 通常、悲観的ロックはデータベースに大きな負担をかけます。実際には、シナリオに応じて楽観的ロックまたは分散ロックを使用して実装されるのが一般的です。 それでは、本題に入り、Redis に基づく分散ロックの実装について説明しましょう。 Redis 分散ロックの実践演習ここでは、Spring Boot、Redis、Lua スクリプトを例に、分散ロックの実装を説明します。プロセスを簡素化するために、この例の Redis は分散ロックとデータベースの両方の機能を想定しています。 シーン構築クラスター環境で、同じアカウントの金額を操作するための基本的な手順は次のとおりです。 - データベースからユーザーの金額を読み取ります。
- プログラム修正量;
- 次に、最新の金額をデータベースに保存します。
- 以下は、初期のロック解除と非同期処理から最終的な分散ロックを段階的に推論したものです。
基本的な統合とクラス構築ロック解除処理のための基本的なビジネス環境を準備します。 まず、Spring Boot プロジェクトに関連する依存関係を導入します。 - <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </依存関係>
- <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </依存関係>
アカウントはエンティティ クラス UserAccount に対応します。 - パブリッククラスUserAccount {
-
- //ユーザーID
- プライベート文字列ユーザーID;
- //口座残高
- プライベートint金額;
-
- //口座金額を追加
- パブリックvoid addAmount( int金額) {
- this.amount = this.amount + amount;
- }
- // コンストラクタとゲッター/セッターを省略
- }
スレッド実装クラス AccountOperationThread を作成します。 - パブリッククラス AccountOperationThread は Runnable を実装します {
-
- プライベート最終静的Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);
-
- プライベート静的最終Long RELEASE_SUCCESS = 1L;
-
- プライベート文字列ユーザーID;
-
- プライベート Redis テンプレート <Object, Object> redis テンプレート;
-
- パブリックアカウント操作スレッド(文字列userId、RedisTemplate<Object、Object> redisTemplate) {
- this.userId = ユーザーID;
- redisテンプレートをコピーします。
- }
-
- @オーバーライド
- パブリックボイド実行(){
- ロックなし();
- }
-
- /**
- * ロック解除
- */
- プライベートボイドnoLock() {
- 試す {
- ランダム random = new Random();
- //ビジネス処理のスレッドをシミュレートする
- TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- //シミュレーションデータベースからユーザーアカウントを取得する
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
- // 金額 +1
- ユーザーアカウント.addAmount(1);
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
- //データベースへの保存をシミュレートする
- redisTemplate.opsForValue() 。 (userId、userAccount)を設定します。
- }
- }
RedisTemplate のインスタンス化は Spring Boot に渡されます。 - @構成
- パブリッククラスRedisConfig {
-
- @ビーン
- パブリックRedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
- Redis テンプレートを新規作成します。
- redisTemplate.setConnectionFactory(redisConnectionFactory);
- Jackson2JsonRedisSerializer<オブジェクト> jackson2JsonRedisSerializer =
- 新しい Jackson2JsonRedisSerializer<>(Object.class);
- オブジェクトマッパー objectMapper = 新しいオブジェクトマッパー();
- objectMapper.setVisibility(PropertyAccessor. ALL 、 JsonAutoDetect.Visibility. ANY );
- オブジェクトマッパーのデフォルトタイピングを有効にします(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
- // 値とキーのシリアル化ルールを設定する
- redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
- redisTemplate.setKeySerializer(新しい StringRedisSerializer());
- redisTemplate.afterPropertiesSet();
- redisTemplateを返します。
- }
- }
最後に、マルチスレッド操作をトリガーする TestController を準備します。 - @レストコントローラ
- パブリッククラスTestController{
-
- プライベート最終静的Logger ロガー = LoggerFactory.getLogger(TestController.class);
-
- プライベート静的ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- オートワイヤード
- プライベート Redis テンプレート <Object, Object> redis テンプレート;
-
- @GetMapping( "/テスト" )
- パブリック文字列test()はInterruptedExceptionをスローします{
- // ユーザー user_001 を Redis に初期化します。アカウント残高は 0 です
- redisTemplate.opsForValue() 。 ( "user_001" 、新しい UserAccount( "user_001" 、 0)を設定します);
- // 同期テスト用に 10 個のスレッドを開始し、各スレッドはアカウントに 1 元を追加します
- ( int i = 0; i < 10; i++) {
- logger.info( "スレッド i を作成=" + i);
- executorService.execute (新しい AccountOperationThread( " user_001" 、redisTemplate));
- }
-
- // メインスレッドは1秒間スリープし、スレッドの実行が完了するまで待機します
- TimeUnit.MILLISECONDS.sleep(1000);
- // Redis で user_001 アカウントをクエリする
- ユーザーアカウント userAccount = (UserAccount) redisTemplate.opsForValue().get( "user_001" );
- logger.info( "ユーザーID: " + userAccount.getUserId() + "金額: " + userAccount.getAmount());
- 戻る "成功" ;
- }
- }
上記のプログラムを実行します。通常、スレッドは 10 個あり、各スレッドが 1 を加算するので、結果は 10 になるはずです。しかし、複数回実行すると、結果が大きく異なり、基本的に 10 より小さくなることがわかります。 - [pool-1-thread-5] csredis.thread.AccountOperationThread: pool-1-thread-5:ユーザーID: user_001 量: 1
- [pool-1-thread-4] csredis.thread.AccountOperationThread: pool-1-thread-4:ユーザーID: user_001 量: 1
- [pool-1-thread-3] csredis.thread.AccountOperationThread: pool-1-thread-3:ユーザーID: user_001 量: 1
- [pool-1-thread-1] csredis.thread.AccountOperationThread: pool-1-thread-1:ユーザーID: user_001 量: 1
- [pool-1-thread-1] csredis.thread.AccountOperationThread: pool-1-thread-1:ユーザーID: user_001 量: 2
- [pool-1-thread-2] csredis.thread.AccountOperationThread: pool-1-thread-2:ユーザーID: user_001 量: 2
- [pool-1-thread-5] csredis.thread.AccountOperationThread: pool-1-thread-5:ユーザーID: user_001 量: 2
- [pool-1-thread-4] csredis.thread.AccountOperationThread: pool-1-thread-4:ユーザーID: user_001 量: 3
- [pool-1-thread-1] csredis.thread.AccountOperationThread: pool-1-thread-1:ユーザーID: user_001 量: 4
- [pool-1-thread-3] csredis.thread.AccountOperationThread: pool-1-thread-3:ユーザーID: user_001 量: 5
- [nio-8080- exec -1] csredis.controller.TestController:ユーザーID: user_001 量: 5
上記のログを例にとると、最初の 4 つのスレッドはすべて値を 1 に変更しました。つまり、次の 3 つのスレッドが以前の変更を上書きし、最終結果が 10 ではなく 5 になります。これは明らかに問題があります。 Redis 同期ロックの実装上記の状況では、同じ JVM 内でスレッド ロックを行うことで解決できます。ただし、分散環境では JVM レベルのロックは実装できないため、ここでは Redis 同期ロックを使用できます。 基本的な考え方: 最初のスレッドが入ると、Redis にレコードが入力されます。後続のスレッドがリクエストを送信すると、Redis にレコードが存在するかどうかが判断されます。存在する場合は、ロック状態にあり、待機または戻ることを意味します。存在しない場合は、後続の業務処理に進みます。 - /**
- * 1. リソースを取得するときに、リソースがロックされているかどうかを判断します。
- * 2. ロックされていない場合はプリエンプションが成功し、ロックが追加されます。そうでない場合は、ロックが解除されるまで待機します。
- * 3. 業務が完了したらロックを解除し、他のスレッドに渡します。
- * <p>
- * このソリューションでは、スレッドの取得とロックのプロセスがアトミック操作ではないため、同期の問題は解決されません。スレッド A がロックを取得する可能性がありますが、スレッド B もロックを取得する前にロックを取得します。
- */
- プライベートvoid redisLock() {
- ランダム random = new Random();
- 試す {
- TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- (真)の間{
- オブジェクト lock = redisTemplate.opsForValue().get(userId + ":syn" );
- ロック == nullの場合
- // ロックを取得 -> ロック -> ループから抜け出す
- logger.info(Thread.currentThread().getName() + ":ロックを取得" );
- redisTemplate.opsForValue() 。 (userId + ":syn" 、 "lock"を設定します);
- 壊す;
- }
- 試す {
- // ロックの取得を再試行するために 500 ミリ秒待機します
- TimeUnit.MILLISECONDS.sleep(500);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- }
- 試す {
- //シミュレーションデータベースからユーザーアカウントを取得する
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
- ユーザーアカウントがnullの場合
- //金額を設定する
- ユーザーアカウント.addAmount(1);
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
- //データベースへの保存をシミュレートする
- redisTemplate.opsForValue() 。 (userId、userAccount)を設定します。
- }
- ついに
- //ロックを解除する
- redisTemplate.delete (userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
while コード ブロックでは、まず対応するユーザー ID が Redis に存在するかどうかを判断します。そうでない場合は設定してロックしてください。そうであれば、ループを終了して待機を続けます。 上記のコードはロック機能を実装しているように見えますが、プログラムを実行すると、ロックされていないかのように並行性の問題があることがわかります。その理由は、取得とロックの操作がアトミックではないからです。たとえば、2 つのスレッドがロックが両方とも null であることを検出すると、両方のスレッドがロックしますが、同時実行の問題は依然として存在します。 Redis アトミック同期ロック上記の問題に対処するために、ロックの取得とロックのプロセスをアトミックに処理することができます。 spring-boot-data-redis によって提供されるアトミック API に基づいて、次のことを実現できます。 - // このメソッドはredis命令を使用します: SETNXキー値
- // 1.キーが存在しない場合は、正常に設定されていれば値が返され、setIfAbsent はtrueを返します。
- // 2.キーが存在する場合、設定は失敗してnullを返し、setIfAbsent はfalseを返します。
- // 3.アトミック操作;
- ブール値setIfAbsent(K var1, V var2);
上記のメソッドのアトミック操作は、Redis setnx コマンドのカプセル化です。次の例では、Redis で setnx を使用します。 - redis> SETNX mykey "こんにちは"
- (整数) 1
- redis> SETNX mykey "ワールド"
- (整数) 0
- redis> GET mykey
- "こんにちは"
mykey が初めて設定されるときに、存在しない場合は、設定が成功したことを示す 1 を返します。 2 回目に mykey が設定され、すでに存在する場合は、設定が失敗したことを示す 0 が返されます。 mykey に対応する値を再度照会すると、それが最初に設定された値のままであることがわかります。つまり、redis の setnx は、一意のキーが 1 つのサービスによってのみ正常に設定されることを保証します。 上記の API と基本原理を理解した後、次のスレッドの実装コードを見てみましょう。 - /**
- * 1. アトミック操作ロック
- * 2. 競合するスレッドはループで再試行してロックを取得します
- * 3. 業務終了後はロックを解除する
- */
- プライベートvoidアトミック性RedisLock() {
- //Spring Data Redis でサポートされているアトミック操作
- (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn" , "lock" )) {
- 試す {
- // ロックの取得を再試行するために 100 ミリ秒待機します
- TimeUnit.MILLISECONDS.sleep(100);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得" );
- 試す {
- //シミュレーションデータベースからユーザーアカウントを取得する
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
- ユーザーアカウントがnullの場合
- //金額を設定する
- ユーザーアカウントに金額を追加します(1);
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
- //データベースへの保存をシミュレートする
- redisTemplate.opsForValue() 。 (userId、userAccount)を設定します。
- }
- ついに
- //ロックを解除する
- redisTemplate.delete (userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
コードを再度実行すると、結果が正しいことがわかります。これは、分散スレッドを正常にロックできることを意味します。 Redis 分散ロック デッドロック上記コードの実行結果は問題ありませんが、アプリケーションが異常終了し、最終的にロックを解除するメソッドを実行する時間がない場合、他のスレッドはロックを取得できなくなります。 このとき、setIfAbsent のオーバーロード メソッドを使用できます。 - ブール型setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
この方法に基づいて、ロックの有効期限を設定できます。こうすることで、ロックを取得したスレッドがクラッシュした場合でも、Redis 内のデータの有効期限が切れた後は他のスレッドが正常にロックを取得できるようになります。 サンプルコードは次のとおりです。 - プライベートvoidアトミック性およびExRedisLock() {
- 試す {
- //Spring Data Redis でサポートされているアトミック操作。有効期限を 5 秒に設定します。
- while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn" 、
- System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
- // ロックの取得を再試行するために 100 ミリ秒待機します
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得--------" );
- // ここでアプリケーションがクラッシュし、プロセスが終了し、最終的に実行できなくなります。
- スレッド.currentThread().interrupt();
- // ビジネス ロジック...
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- ついに
- //ロックを解除する
- (!Thread.currentThread().isInterrupted())の場合{
- redisTemplate.delete (userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
- }
ビジネスタイムアウトとデーモンスレッド上記の Redis タイムアウトを追加すると問題は解決するように見えますが、新たな問題が発生します。 たとえば、通常の状況では、スレッド A は 5 秒以内にビジネス処理を完了できますが、場合によっては 5 秒以上かかることがあります。タイムアウトを 5 秒に設定すると、スレッド A はロックを取得しますが、ビジネス ロジックの処理には 6 秒かかります。この時点では、スレッド A はまだ通常のビジネス ロジックを実行しており、スレッド B がロックを取得しています。スレッドAが処理を終了すると、スレッドBのロックを解除することができます。 上記のシナリオには 2 つの問題があります。 - まず、スレッド A とスレッド B が同時に実行される可能性があり、その結果、同時実行性の問題が発生します。
- 次に、スレッド A がスレッド B のロックを解除し、一連の悪循環を引き起こす可能性があります。
もちろん、Redis で値を設定することで、ロックがスレッド A に属しているか、スレッド B に属しているかを判別できます。しかし、注意深く分析すると、この問題の本質は、スレッド A がビジネス ロジックを実行するのにかかる時間がロック タイムアウト期間を超えていることにあることがわかります。 その場合、解決策は 2 つあります。 - まず、ロックが解除される前にビジネス コードを実行できるように、タイムアウトを十分に長く設定します。
- 次に、ロックにデーモン スレッドを追加して、期限が切れそうでまだ解放されていないロックの時間を延長します。
最初の方法では、ほとんどの場合、銀行全体のビジネス ロジックの時間消費とタイムアウト期間の設定が必要になります。 2 番目の方法は、次のデーモン スレッド メソッドを使用して、ロック タイムアウトを動的に増やすことです。 - パブリッククラスDaemonThreadはRunnableを実装します{
- プライベート最終静的Logger logger = LoggerFactory.getLogger(DaemonThread.class);
-
- // デーモン化が必要かどうか。メインスレッドが閉じられると、デーモンスレッドは終了します。
- プライベート揮発性ブールデーモン = true ;
- // ガーディアンロック
- プライベート文字列 lockKey;
-
- プライベート Redis テンプレート <Object, Object> redis テンプレート;
-
- パブリックDaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
- ロックキーを初期化します。
- redisテンプレートをコピーします。
- }
-
- @オーバーライド
- パブリックボイド実行(){
- 試す {
- while (デーモン) {
- 長い時間= redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
- // 残りの有効期間が1秒未満の場合は、寿命を延長します
- もし (時間< 1000 ) {
- logger.info( "デーモンプロセス: " + Thread.currentThread().getName() + "ロック時間を 5000 ミリ秒延長" );
- redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
- }
- TimeUnit.MILLISECONDS.sleep(300);
- }
- logger.info( "デーモン: " + Thread.currentThread().getName() + "閉じられました" );
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- // メインスレッドはendをアクティブに呼び出します
- パブリックボイドストップ(){
- デーモン = false ;
- }
- }
上記のスレッドは、300 ミリ秒ごとに Redis のロック タイムアウトを取得します。 1秒未満の場合は5秒延長されます。メイン スレッドがシャットダウンを呼び出すと、デーモン スレッドもシャットダウンされます。 メインスレッド内の関連コード実装: - プライベートvoid deamonRedisLock() {
- //ガーディアンスレッド
- デーモンスレッド デーモンスレッド = null ;
- //Spring Data Redis でサポートされているアトミック操作。有効期限を 5 秒に設定します。
- 文字列 uuid = UUID.randomUUID().toString();
- 文字列値 = Thread.currentThread().getId() + ":" + uuid;
- 試す {
- while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn" , value, 5000, TimeUnit.MILLISECONDS)) {
- // ロックの取得を再試行するために 100 ミリ秒待機します
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得----" );
- // デーモンスレッドを開始する
- daemonThread = 新しい DaemonThread(userId + ":syn" , redisTemplate);
- スレッド thread = new Thread(daemonThread);
- スレッドを開始します。
- // ビジネス ロジックは 10 秒間実行されます...
- TimeUnit.MILLISECONDS.sleep(10000);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- ついに
- //ロックを解除します。ここでもアトミック操作が必要です。これについては、今後 Redis + Lua で説明します。
- 文字列結果 = (文字列) redisTemplate.opsForValue().get(userId + ":syn" );
- if (値.equals(結果)) {
- redisTemplate.delete (userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除-----" );
- }
- //デーモンスレッドを閉じる
- デーモンスレッドがnullの場合
- デーモンスレッドを停止します。
- }
- }
- }
ロックを取得した後、デーモン スレッドが開始され、最終的に閉じられます。 Luaスクリプトに基づく実装上記のロジックでは、spring-boot-data-redis が提供するアトミック操作に基づいて、ロックの判断と実行のアトミック性を保証します。 Spring Boot 以外のプロジェクトでは、Lua スクリプトに基づいて実装できます。 まず、ロックとロック解除用の Lua スクリプトと対応する DefaultRedisScript オブジェクトを定義し、RedisConfig 構成クラスに次のインスタンス化コードを追加します。 - @構成
- パブリッククラスRedisConfig {
-
- //スクリプトをロックする
- プライベート静的最終文字列LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
- " その後 redis.call('expire',KEYS[1],ARGV[2]) " +
- " 1 を返す " +
- " そうでなければ 0 を返して終了 " ;
- プライベート静的最終文字列UNLOCK_SCRIPT = "redis.call('get', KEYS[1]) == ARGV[1]の場合、redis.callを返す" +
- 「('del', KEYS[1]) そうでなければ0を返す 終了」 ;
-
- // ... 一部のコードは省略
-
- @ビーン
- パブリックDefaultRedisScript<ブール値> lockRedisScript() {
- DefaultRedisScript<ブール値> defaultRedisScript = 新しい DefaultRedisScript<>();
- デフォルトのRedisScript.setResultType(Boolean.class);
- デフォルトのRedisScript.setScriptText(LOCK_SCRIPT);
- defaultRedisScriptを返します。
- }
-
- @ビーン
- パブリックDefaultRedisScript<Long>RedisScriptのロック解除() {
- DefaultRedisScript<Long> defaultRedisScript = 新しい DefaultRedisScript<>();
- デフォルトのRedisScript.setResultType(Long.class);
- デフォルトのRedisScript.setScriptText(UNLOCK_SCRIPT);
- defaultRedisScriptを返します。
- }
- }
次に、AccountOperationThread クラスに新しい構築メソッドを作成し、上記の 2 つのオブジェクトをクラスに渡します (デモのこの部分は省略されています)。その後、RedisTemplate に基づいて呼び出すことができます。変更されたコードは次のとおりです。 - プライベートvoid deamonRedisLockWithLua() {
- //ガーディアンスレッド
- デーモンスレッド デーモンスレッド = null ;
- //Spring Data Redis でサポートされているアトミック操作。有効期限を 5 秒に設定します。
- 文字列 uuid = UUID.randomUUID().toString();
- 文字列値 = Thread.currentThread().getId() + ":" + uuid;
- 試す {
- while (! redisTemplate.execute (lockRedisScript, Collections.singletonList(userId + ":syn" ), value, 5)) {
- // ロックの取得を再試行するために 1000 ミリ秒待機します
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得----" );
- // デーモンスレッドを開始する
- daemonThread = 新しい DaemonThread(userId + ":syn" , redisTemplate);
- スレッド thread = new Thread(daemonThread);
- スレッドを開始します。
- // ビジネス ロジックは 10 秒間実行されます...
- TimeUnit.MILLISECONDS.sleep(10000);
- } キャッチ (InterruptedException e) {
- logger.error( "例外" , e);
- ついに
- //Luaスクリプトを使用する: 最初にロックが設定されているかどうかを確認し、削除を実行します
- //キーが存在し、現在の値が期待値と等しい場合は、キーを削除します。キーが存在し、現在の値が期待値と異なる場合は 0 を返します。
- 長い結果 = redisTemplate。実行(unlockRedisScript、Collections.singletonList(userId + ":syn" )、値);
- logger.info( "redis のロック解除:{}" 、 RELEASE_SUCCESS.equals(result));
- (RELEASE_SUCCESS.equals(結果)) の場合 {
- デーモンスレッドがnullの場合
- //デーモンスレッドを閉じる
- デーモンスレッドを停止します。
- logger.info(Thread.currentThread().getName() + ":ロックを解除---" );
- }
- }
- }
- }
while ループでのロックと finally でのロックの解放は、どちらも Lua スクリプトに基づいて実装されています。 Redisロックのその他の要素上記の例に加えて、Redis 分散ロックを使用する場合は、次の状況と解決策も考慮できます。 Redisのロックは再入可能ではない スレッドがロックを保持したまま再度ロックを要求する場合、ロックがスレッドによる複数のロックをサポートしていれば、ロックは再入可能になります。再入不可能なロックが再度ロックされると、ロックがすでに保持されているため、ロックは失敗します。 Redis はロックの再入力をカウントし、ロック時に 1 を加算し、ロック解除時に 1 を減算し、カウントが 0 に戻るとロックを解除します。 再入可能ロックは効率的ですが、コードの複雑さが増すため、ここでは例を挙げません。 ロックが解除されるのを待っています 一部のビジネス シナリオでは、システムがロックされていることが判明すると、直接戻ります。しかし、シナリオによっては、クライアントはロックを取得する前にロックが解放されるのを待つ必要があります。上記の例は後者に属します。ロックの解放を待つための解決策も 2 つあります。 クライアント ポーリング: ロックが取得されない場合は、しばらく待ってから成功するまで再試行します。上記の例はこのように実装されています。この方法の欠点も明らかです。同時実行量が多い場合、サーバーのリソース消費量が増加し、サーバーの効率に影響します。 Redis のサブスクリプションとパブリッシュ機能を使用します。ロックの取得に失敗した場合はロック解除メッセージをサブスクライブし、ロックの取得と解除に成功した場合は解除メッセージを送信します。 クラスタ内のアクティブ/スタンバイの切り替えとスプリットブレイン マスターとスレーブの同期を含む Redis クラスターの展開モードでは、マスター ノードがハングアップすると、スレーブ ノードがマスター ノードに昇格されます。クライアント A がマスター ノードを正常にロックしたが、コマンドがスレーブ ノードにまだ同期されていない場合、マスター ノードはクラッシュし、スレーブ ノードがマスター ノードに昇格されます。新しいマスター ノードにはロックされたデータは含まれません。この場合、クライアント B はセッションを正常にロックし、同時実行シナリオが発生する可能性があります。 クラスターでスプリットブレインが発生すると、Redis マスターノードはスレーブノードおよびセンチネルクラスターとは異なるネットワークパーティションに存在します。センチネル クラスターはマスターの存在を感知できないため、スレーブ ノードをマスター ノードに昇格します。この時点では、2 つの異なるマスター ノードが存在します。これにより、同時実行の問題も発生します。 Redis Cluster の展開方法も同様です。 まとめ本番環境で問題が発生し、原因をトラブルシューティングし、解決策を見つけ、最終的にRedisの分散に基づいて詳細な調査を行うことが、学習プロセスです。 同時に、インタビューを受けたり、分散共有リソースの解決方法を尋ねられたりするときはいつでも、「Redis に基づいて分散ロックを実装する」と口走りますが、この記事の学習を通じて、Redis の分散ロックは万能ではなく、使用の過程でタイムアウト、デッドロック、ロックの誤解、クラスター リーダーの選出/ブレイン スプリットなどの問題にも注意する必要があることがわかります。 Redis は高性能であることで知られていますが、分散ロックを実装するプロセスにはまだいくつかの問題があります。したがって、Redis ベースの分散ロックは同時実行の問題を大幅に軽減できますが、同時実行を完全に防止するには、データベース レベルから開始する必要があります。 ソースコードアドレス: https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock 参考記事: https://jinzhihong.github.io/2019/08/12/%E6%B7%B1%E5%85%A5%E6%B5%85%E5%87%BA-Redis-%E5% 88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%8E%9F%E7%90%86%E4%B8%8E%E5%AE%9E%E7%8E%B0-%E4%B8%80/ https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/ |