新しく引き継いだプロジェクトでは、アカウントの不均衡により問題が発生する場合があります。前の技術担当の上司は、退職前に「調べたが原因がわからなかった、忙しくて解決できなかったのでフレームワークが原因かもしれない」と説明していましたが… プロジェクトが完了したので、このような問題を解決する必要があります。すべての会計処理ロジックを整理した後、ようやく原因がわかりました。データベース内のホット アカウントに対する同時操作が原因でした。今回は、分散システムにおける Redis ベースの分散ロックについて説明します。ちなみに、問題の原因と解決策も分析してみましょう。 原因分析システムの同時実行性は高くなく、ホットアカウントもありますが、それほど深刻ではありません。問題の根本は、同時実行性を人為的に生み出すシステム アーキテクチャ設計にあります。シナリオは次のとおりです。販売者がバッチでデータをインポートすると、システムが前処理を実行し、口座残高を増減します。 この時点で、別のスケジュールされたタスクもアカウントをスキャンして更新します。さらに、同じアカウントに対する操作がさまざまなシステムに分散され、ホットアカウントが出現します。 この問題の解決策として、アーキテクチャの観点から、会計システムを抽出して 1 つのシステムに集中させることが考えられます。すべてのデータベース トランザクションと実行順序は、会計システムによって調整および処理されます。技術的な観点から見ると、ホット スポット アカウントはロック メカニズムによってロックされる可能性があります。 この記事では、分散ロックに基づいてホットスポット アカウントを実装する方法について詳しく説明します。 ロック分析Java のマルチスレッド環境では、通常、使用できるロックの種類がいくつかあります。 - 一般的に使用される JVM メモリ モデル レベルのロックは、synchronized、Lock などです。
- 楽観的ロック、悲観的ロックなどのデータベース ロック。
JVM メモリ レベルのロックは、複数のスレッドがグローバル変数にアクセスしたり変更したりするなど、単一のサービス内のスレッドのセキュリティを確保できます。しかし、システムがクラスターに展開されている場合、JVM レベルでのローカル ロックは無力になります。 悲観的ロックと楽観的ロック上記のケースのように、ホット アカウントは分散システム内の共有リソースであり、通常はデータベース ロックまたは分散ロックを使用して解決します。 データベース ロックは、楽観的ロックと悲観的ロックに分けられます。 悲観的ロックは、データベース (MySQL の InnoDB) によって提供される排他ロックに基づいて実装されます。トランザクションを実行すると、MySQL は select ... for update ステートメントを通じてクエリ結果セット内の各データ行に排他ロックを追加し、他のスレッドはレコードの更新および削除操作をブロックします。これにより、共有リソースの順次実行(変更)が実現されます。 楽観的ロックは悲観的ロックに相対的です。楽観的ロックでは、通常の状況ではデータが競合を引き起こさないと想定されるため、データが更新のために送信されると、データが競合しているかどうかを正式にチェックします。競合が発生した場合は、例外メッセージがユーザーに返され、ユーザーは対処方法を決定できます。楽観的ロックは、書き込みよりも読み取りが多いシナリオに適しており、プログラムのスループットを向上させることができます。楽観的ロックを実装する場合、通常は記録ステータスまたはバージョンの追加に基づいて実装されます。 悲観的なロック失敗シナリオプロジェクトでは悲観的ロックが使用されましたが、失敗しました。これは、悲観的ロックを使用する場合によくある誤解でもあります。以下で分析してみましょう。 悲観的ロックを使用する通常のプロセス: - 更新のために select ... を通じてレコードをロックします。
プロセスにおけるよくある間違い: - 更新のために select ... を通じてレコードをロックします。
間違ったプロセスでは、たとえば、サービス A と B によって照会された残高が両方とも 100 で、A が 50 を差し引き、B が 40 を差し引き、その後 A がレコードをロックし、データベースを 50 に更新します。 A がロックを解除した後、B はレコードをロックし、データベースを 60 に更新します。明らかに、後者は前者の更新を上書きします。解決策は、ロックの範囲を拡大し、新しい残高を計算する前にロックを進めることです。 通常、悲観的ロックはデータベースに大きな負担をかけます。実際には、シナリオに応じて楽観的ロックまたは分散ロックを使用して実装されるのが一般的です。 それでは、本題に入り、Redis に基づく分散ロックの実装について説明しましょう。 Redis 分散ロックの実践演習ここでは、Spring Boot、Redis、Lua スクリプトを例に、分散ロックの実装を説明します。プロセスを簡素化するために、この例の Redis は分散ロックとデータベースの両方の機能を想定しています。 シーン構築クラスター環境で、同じアカウントの金額を操作するための基本的な手順は次のとおりです。 以下は、初期のロック解除と非同期処理から最終的な分散ロックを段階的に推論したものです。 基本的な統合とクラス構築ロック解除処理のための基本的なビジネス環境を準備します。 まず、Spring Boot プロジェクトに関連する依存関係を導入します。 - <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </依存関係>
- <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </依存関係>
アカウントはエンティティ クラス UserAccount に対応します。 - 公共 クラスUserAccount {
-
-
- プライベート文字列ユーザーID;
-
- プライベート 整数量;
-
-
- 公共 void addAmount( int金額) {
- this .amount = this .amount + amount;
- }
-
- }
スレッド実装クラス AccountOperationThread を作成します。 - 公共 クラスAccountOperationThreadは Runnableを実装します{
-
- プライベート ファイナル 静的Logger ロガー = LoggerFactory.getLogger(AccountOperationThread.class ) ;
-
- プライベート 静的 最終的なLong RELEASE_SUCCESS = 1L;
-
- プライベート文字列ユーザーID;
-
- プライベートRedis テンプレート <Object, Object> redis テンプレート;
-
- パブリックアカウント操作スレッド(文字列userId、RedisTemplate<Object、Object> redisTemplate) {
- this .userId = ユーザーId;
- これは.redisTemplate = redisTemplate; です。
- }
-
- @オーバーライド
- 公共 void実行() {
- ロックなし();
- }
-
-
-
- プライベート void noLock() {
- 試す{
- ランダム random = new Random();
-
- TimeUnit.MILLISECONDS.sleep(random.nextInt( 100 ) +1 );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- }
-
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
-
- ユーザーアカウントに金額を追加します( 1 );
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
-
- redisTemplate.opsForValue() を設定します(userId、userAccount);
- }
- }
RedisTemplate のインスタンス化は Spring Boot に渡されます。 - @構成
- 公共 クラスRedisConfig {
-
- @ビーン
- パブリックRedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
- Redis テンプレートを新規作成します。
- redisTemplate.setConnectionFactory(redisConnectionFactory);
- Jackson2JsonRedisSerializer<オブジェクト> jackson2JsonRedisSerializer =
- 新しいJackson2JsonRedisSerializer<>(Object. class );
- オブジェクトマッパー objectMapper =新しいオブジェクトマッパー();
- objectMapper.setVisibility(PropertyAccessor.ALL、JsonAutoDetect.Visibility.ANY);
- オブジェクトマッパーのデフォルトタイピングを有効にします(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
-
- redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
- redisTemplate.setKeySerializer(新しいStringRedisSerializer());
- redisTemplate.afterPropertiesSet();
- redisTemplateを返します。
- }
- }
最後に、マルチスレッド操作をトリガーする TestController を準備します。 - @レストコントローラ
- 公共 クラスTestController {
-
- プライベート ファイナル 静的Logger ロガー = LoggerFactory.getLogger(TestController. class );
-
- プライベート 静的ExecutorService executorService = Executors.newFixedThreadPool( 10 );
-
- オートワイヤード
- プライベートRedis テンプレート <Object, Object> redis テンプレート;
-
- @GetMapping ( "/テスト" )
- パブリック文字列test()はInterruptedExceptionをスローします{
-
- redisTemplate.opsForValue().set( "user_001" 、新しいUserAccount( "user_001" 、 0 ));
-
- ( int i = 0 ; i < 10 ; i++) {
- logger.info( "スレッド i を作成=" + i);
- executorService.execute(新しいAccountOperationThread( "user_001" 、 redisTemplate));
- }
-
-
- TimeUnit.MILLISECONDS.sleep( 1000 );
-
- ユーザーアカウント userAccount = (UserAccount) redisTemplate.opsForValue().get( "user_001" );
- logger.info( "ユーザーID: " + userAccount.getUserId() + "金額: " + userAccount.getAmount());
- 戻る "成功" ;
- }
- }
上記のプログラムを実行します。通常、スレッドは 10 個あり、各スレッドが 1 を加算するので、結果は 10 になるはずです。しかし、複数回実行すると、結果が大きく異なり、基本的に 10 より小さくなることがわかります。 - [プール- 1 -スレッド- 5 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 5 : ユーザーID: user_001 量: 1
- [プール- 1 -スレッド- 4 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 4 : ユーザーID: user_001 量: 1
- [プール- 1 -スレッド- 3 ] csredis.thread.AccountOperationThread : プール- 1 -スレッド- 3 : ユーザーID : user_001 量 : 1
- [プール- 1 -スレッド- 1 ] csredis.thread.AccountOperationThread : プール- 1 -スレッド- 1 : ユーザーID : user_001 量 : 1
- [プール- 1 -スレッド- 1 ] csredis.thread.AccountOperationThread : プール- 1 -スレッド- 1 : ユーザーID : user_001 量 : 2
- [プール1 -スレッド2 ] csredis.thread.AccountOperationThread: プール1 -スレッド2 : ユーザー ID: user_001 量: 2
- [プール- 1 -スレッド- 5 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 5 : ユーザーID: user_001 量: 2
- [プール- 1 -スレッド- 4 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 4 : ユーザーID: user_001 量: 3
- [プール- 1 -スレッド- 1 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 1 : ユーザーID: user_001 量: 4
- [プール- 1 -スレッド- 3 ] csredis.thread.AccountOperationThread: プール- 1 -スレッド- 3 : ユーザーID: user_001 量: 5
- [nio- 8080 -exec- 1 ] csredis.controller.TestController: ユーザーID: user_001 量: 5
上記のログを例にとると、最初の 4 つのスレッドはすべて値を 1 に変更しました。つまり、次の 3 つのスレッドが以前の変更を上書きし、最終結果が 10 ではなく 5 になります。これは明らかに問題があります。 Redis 同期ロックの実装上記の状況では、同じ JVM 内でスレッド ロックを行うことで解決できます。ただし、分散環境では JVM レベルのロックは実装できないため、ここでは Redis 同期ロックを使用できます。 基本的な考え方: 最初のスレッドが入ると、Redis にレコードが入力されます。後続のスレッドがリクエストを送信すると、Redis にレコードが存在するかどうかが判断されます。存在する場合は、ロック状態にあり、待機または戻ることを意味します。存在しない場合は、後続の業務処理に進みます。 -
-
- プライベート void redisLock() {
- ランダム random = new Random();
- 試す{
- TimeUnit.MILLISECONDS.sleep(random.nextInt( 1000 ) +1 );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- }
- (真)の間{
- オブジェクト lock = redisTemplate.opsForValue().get(userId + ":syn" );
- ロック == nullの場合
-
- logger.info(Thread.currentThread().getName() + ":ロックを取得" );
- redisTemplate.opsForValue().set(userId + ":syn" , "lock" );
- 壊す;
- }
- 試す{
-
- TimeUnit.MILLISECONDS.sleep( 500 );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- }
- }
- 試す{
-
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
- ユーザーアカウントがnullの場合
-
- ユーザーアカウントに金額を追加します( 1 );
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
-
- redisTemplate.opsForValue() を設定します(userId、userAccount);
- }
- 最後に
-
- redisTemplate.delete(userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
while コード ブロックでは、まず対応するユーザー ID が Redis に存在するかどうかを判断します。そうでない場合は設定してロックしてください。そうであれば、ループを終了して待機を続けます。 上記のコードはロック機能を実装しているように見えますが、プログラムを実行すると、ロックされていないかのように並行性の問題があることがわかります。その理由は、取得とロックの操作がアトミックではないからです。たとえば、2 つのスレッドがロックが両方とも null であることを検出すると、両方のスレッドがロックしますが、同時実行の問題は依然として存在します。 Redis アトミック同期ロック上記の問題に対処するために、ロックの取得とロックのプロセスをアトミックに処理することができます。 spring-boot-data-redis によって提供されるアトミック API に基づいて、次のことを実現できます。 -
-
-
-
- ブール値setIfAbsent(K var1, V var2);
上記のメソッドのアトミック操作は、Redis setnx コマンドのカプセル化です。次の例では、Redis で setnx を使用します。 - redis> SETNX mykey "こんにちは"
- (整数) 1
- redis> SETNX mykey "ワールド"
- (整数) 0
- redis> GET mykey
- "こんにちは"
mykey が初めて設定されるときに、存在しない場合は、設定が成功したことを示す 1 を返します。 2 回目に mykey が設定され、すでに存在する場合は、設定が失敗したことを示す 0 が返されます。 mykey に対応する値を再度照会すると、それが最初に設定された値のままであることがわかります。つまり、redis の setnx は、一意のキーが 1 つのサービスによってのみ正常に設定されることを保証します。 上記の API と基本原理を理解した後、次のスレッドの実装コードを見てみましょう。 -
-
- プライベート voidアトミック性RedisLock() {
-
- (!redisTemplate.opsForValue().setIfAbsent(userId + " :syn" , "lock" )) {
- 試す{
-
- TimeUnit.MILLISECONDS.sleep( 100 );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得" );
- 試す{
-
- ユーザーアカウント userAccount = (ユーザーアカウント) redisTemplate.opsForValue().get(userId);
- ユーザーアカウントがnullの場合
-
- ユーザーアカウントに金額を追加します( 1 );
- logger.info(Thread.currentThread().getName() + " : ユーザーID : " + userId + " 金額 : " + userAccount.getAmount());
-
- redisTemplate.opsForValue() を設定します(userId、userAccount);
- }
- 最後に
-
- redisTemplate.delete(userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
コードを再度実行すると、結果が正しいことがわかります。これは、分散スレッドを正常にロックできることを意味します。 Redis 分散ロック デッドロック上記コードの実行結果は問題ありませんが、アプリケーションが異常終了し、最終的にロックを解除するメソッドを実行する時間がない場合、他のスレッドはロックを取得できなくなります。 このとき、setIfAbsent のオーバーロード メソッドを使用できます。 - ブール型setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
この方法に基づいて、ロックの有効期限を設定できます。こうすることで、ロックを取得したスレッドがクラッシュした場合でも、Redis 内のデータの有効期限が切れた後は他のスレッドが正常にロックを取得できるようになります。 サンプルコードは次のとおりです。 - プライベート voidアトミック性およびExRedisLock() {
- 試す{
-
- while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn" 、
- System.currentTimeMillis() + 5000 , 5000 , TimeUnit.MILLISECONDS)) {
-
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep( 1000 );
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得--------" );
-
- スレッド.currentThread().interrupt();
-
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- 最後に
-
- (!Thread.currentThread().isInterrupted())の場合{
- redisTemplate.delete(userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除" );
- }
- }
- }
ビジネスタイムアウトとデーモンスレッド上記の Redis タイムアウトを追加すると問題は解決するように見えますが、新たな問題が発生します。 たとえば、通常の状況では、スレッド A は 5 秒以内にビジネス処理を完了できますが、場合によっては 5 秒以上かかることがあります。タイムアウトを 5 秒に設定すると、スレッド A はロックを取得しますが、ビジネス ロジックの処理には 6 秒かかります。この時点では、スレッド A はまだ通常のビジネス ロジックを実行しており、スレッド B がロックを取得しています。スレッドAが処理を終了すると、スレッドBのロックを解除することができます。 上記のシナリオには 2 つの問題があります。 - まず、スレッド A とスレッド B が同時に実行される可能性があり、その結果、同時実行性の問題が発生します。
- 次に、スレッド A がスレッド B のロックを解除し、一連の悪循環を引き起こす可能性があります。
もちろん、Redis で値を設定することで、ロックがスレッド A に属しているか、スレッド B に属しているかを判別できます。しかし、注意深く分析すると、この問題の本質は、スレッド A がビジネス ロジックを実行するのにかかる時間がロック タイムアウト期間を超えていることにあることがわかります。 その場合、解決策は 2 つあります。 - まず、ロックが解除される前にビジネス コードを実行できるように、タイムアウトを十分に長く設定します。
- 次に、ロックにデーモン スレッドを追加して、期限が切れそうでまだ解放されていないロックの時間を延長します。
最初の方法では、ほとんどの場合、銀行全体のビジネス ロジックの時間消費とタイムアウト期間の設定が必要になります。 2 番目の方法は、次のデーモン スレッド メソッドを使用して、ロック タイムアウトを動的に増やすことです。 - 公共 クラスDaemonThreadはRunnableを実装します{
- プライベート ファイナル 静的Logger ロガー = LoggerFactory.getLogger(DaemonThread.class ) ;
-
-
- プライベート 不安定な ブール値デーモン = true ;
-
- プライベート文字列 lockKey;
-
- プライベートRedis テンプレート <Object, Object> redis テンプレート;
-
- パブリックDaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
- .lockKey = lockKey ;
- これは.redisTemplate = redisTemplate; です。
- }
-
- @オーバーライド
- 公共 void実行() {
- 試す{
- while (デーモン) {
- 長い時間 = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
-
- (時間 < 1000 )の場合{
- logger.info( "デーモンプロセス: " + Thread.currentThread().getName() + "ロック時間を 5000 ミリ秒延長" );
- redisTemplate.expire(lockKey, 5000 , TimeUnit.MILLISECONDS);
- }
- TimeUnit.MILLISECONDS.sleep( 300 );
- }
- logger.info( "デーモン: " + Thread.currentThread().getName() + "閉じられました" );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-
- 公共 void stop() {
- デーモン = false ;
- }
- }
上記のスレッドは、300 ミリ秒ごとに Redis のロック タイムアウトを取得します。 1秒未満の場合は5秒延長されます。メイン スレッドがシャットダウンを呼び出すと、デーモン スレッドもシャットダウンされます。 メインスレッド内の関連コード実装: - プライベート voidデーモンRedisLock() {
-
- デーモンスレッド デーモンスレッド = null ;
-
- 文字列 uuid = UUID.randomUUID().toString();
- 文字列値 = Thread.currentThread().getId() + ":" + uuid;
- 試す{
- while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn" , value, 5000 , TimeUnit.MILLISECONDS)) {
-
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep( 1000 );
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得----" );
-
- daemonThread =新しいDaemonThread(userId + ":syn" , redisTemplate);
- スレッド thread = new Thread(daemonThread);
- スレッドを開始します。
-
- TimeUnit.MILLISECONDS.sleep( 10000 );
- }キャッチ(InterruptedException e) {
- e.printStackTrace();
- 最後に
-
- 文字列結果 = (文字列) redisTemplate.opsForValue().get(userId + ":syn" );
- if (値.equals(結果)) {
- redisTemplate.delete(userId + ":syn" );
- logger.info(Thread.currentThread().getName() + ":ロックを解除-----" );
- }
-
- デーモンスレッドがnullの場合
- デーモンスレッドを停止します。
- }
- }
- }
ロックを取得した後、デーモン スレッドが開始され、最終的に閉じられます。 Luaスクリプトに基づく実装上記のロジックでは、spring-boot-data-redis が提供するアトミック操作に基づいて、ロックの判断と実行のアトミック性を保証します。 Spring Boot 以外のプロジェクトでは、Lua スクリプトに基づいて実装できます。 まず、ロックとロック解除用の Lua スクリプトと対応するDefaultRedisScript オブジェクトを定義し、 RedisConfig 構成クラスに次のインスタンス化コードを追加します。 - @構成
- 公共 クラスRedisConfig {
-
-
- プライベート 静的 最終文字列 LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
- " その後 redis.call('expire',KEYS[1],ARGV[2]) " +
- " 1 を返す " +
- " そうでなければ 0 を返して終了 " ;
- プライベート 静的 最終文字列 UNLOCK_SCRIPT = "redis.call('get', KEYS[1]) == ARGV[1] の場合、redis.call を返します" +
- 「('del', KEYS[1]) そうでなければ0を返す 終了」 ;
-
-
-
- @ビーン
- パブリックDefaultRedisScript<ブール値> lockRedisScript() {
- DefaultRedisScript<ブール値> defaultRedisScript =新しいDefaultRedisScript<>();
- defaultRedisScript.setResultType( Boolean.class );
- デフォルトのRedisScript.setScriptText(LOCK_SCRIPT);
- defaultRedisScriptを返します。
- }
-
- @ビーン
- パブリックDefaultRedisScript<Long>RedisScriptのロック解除() {
- DefaultRedisScript<Long> defaultRedisScript =新しいDefaultRedisScript<>();
- defaultRedisScript.setResultType( Long.class );
- デフォルトのRedisScript.setScriptText(UNLOCK_SCRIPT);
- defaultRedisScriptを返します。
- }
- }
次に、 AccountOperationThread クラスに新しい構築メソッドを作成し、上記の 2 つのオブジェクトをクラスに渡します (デモのこの部分は省略されています)。その後、 RedisTemplate に基づいて呼び出すことができます。変更されたコードは次のとおりです。 - プライベート void deamonRedisLockWithLua() {
-
- デーモンスレッド デーモンスレッド = null ;
-
- 文字列 uuid = UUID.randomUUID().toString();
- 文字列値 = Thread.currentThread().getId() + ":" + uuid;
- 試す{
- while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn" ), value, 5 )) {
-
- logger.info(Thread.currentThread().getName() + ": ループ内でロックを取得しようとしています" );
- TimeUnit.MILLISECONDS.sleep( 1000 );
- }
- logger.info(Thread.currentThread().getName() + ":ロックを取得----" );
-
- daemonThread =新しいDaemonThread(userId + ":syn" , redisTemplate);
- スレッド thread = new Thread(daemonThread);
- スレッドを開始します。
-
- TimeUnit.MILLISECONDS.sleep( 10000 );
- }キャッチ(InterruptedException e) {
- logger.error( "例外" , e);
- 最後に
-
-
- 長い結果 = redisTemplate.execute(unlockRedisScript、Collections.singletonList(userId + ":syn" )、値);
- logger.info( "redis のロック解除:{}" 、 RELEASE_SUCCESS.equals(result));
- (RELEASE_SUCCESS.equals(結果))の場合{
- デーモンスレッドがnullの場合
-
- デーモンスレッドを停止します。
- logger.info(Thread.currentThread().getName() + ":ロックを解除---" );
- }
- }
- }
- }
while ループでのロックと finally でのロックの解放は、どちらも Lua スクリプトに基づいて実装されています。 Redisロックのその他の要素上記の例に加えて、Redis 分散ロックを使用する場合は、次の状況と解決策も考慮できます。 Redisのロックは再入可能ではないスレッドがロックを保持したまま再度ロックを要求する場合、ロックがスレッドによる複数のロックをサポートしていれば、ロックは再入可能になります。再入不可能なロックが再度ロックされると、ロックがすでに保持されているため、ロックは失敗します。 Redis はロックの再入力をカウントし、ロック時に 1 を加算し、ロック解除時に 1 を減算し、カウントが 0 に戻るとロックを解除します。 再入可能ロックは効率的ですが、コードの複雑さが増すため、ここでは例を挙げません。 ロックが解除されるのを待っています一部のビジネス シナリオでは、システムがロックされていることが判明すると、直接戻ります。しかし、シナリオによっては、クライアントはロックを取得する前にロックが解放されるのを待つ必要があります。上記の例は後者に属します。ロックの解放を待つための解決策も 2 つあります。 - クライアント ポーリング: ロックが取得されない場合は、しばらく待ってから成功するまで再試行します。上記の例はこのように実装されています。この方法の欠点も明らかです。同時実行量が多い場合、サーバーのリソース消費量が増加し、サーバーの効率に影響します。
- Redis のサブスクリプションとパブリッシュ機能を使用します。ロックの取得に失敗した場合はロック解除メッセージをサブスクライブし、ロックの取得と解除に成功した場合は解除メッセージを送信します。
クラスタ内のアクティブ/スタンバイの切り替えとスプリットブレインマスターとスレーブの同期を含む Redis クラスターの展開モードでは、マスター ノードがハングアップすると、スレーブ ノードがマスター ノードに昇格されます。クライアント A がマスター ノードを正常にロックしたが、コマンドがスレーブ ノードにまだ同期されていない場合、マスター ノードはクラッシュし、スレーブ ノードがマスター ノードに昇格されます。新しいマスター ノードにはロックされたデータは含まれません。この場合、クライアント B はセッションを正常にロックし、同時実行シナリオが発生する可能性があります。 クラスターでスプリットブレインが発生すると、Redis マスターノードはスレーブノードおよびセンチネルクラスターとは異なるネットワークパーティションに存在します。センチネル クラスターはマスターの存在を感知できないため、スレーブ ノードをマスター ノードに昇格します。この時点では、2 つの異なるマスター ノードが存在します。これにより、同時実行の問題も発生します。 Redis Cluster の展開方法も同様です。 まとめ本番環境で問題が発生し、原因をトラブルシューティングし、解決策を見つけ、最終的にRedisの分散に基づいて詳細な調査を行うことが、学習プロセスです。 同時に、インタビューを受けたり、分散共有リソースの解決方法を尋ねられたりするときはいつでも、「Redis に基づいて分散ロックを実装する」と口走りますが、この記事の学習を通じて、Redis の分散ロックは万能ではなく、使用の過程でタイムアウト、デッドロック、ロックの誤解、クラスター リーダーの選出/ブレイン スプリットなどの問題にも注意する必要があることがわかります。 Redis は高性能であることで知られていますが、分散ロックを実装するプロセスにはまだいくつかの問題があります。したがって、Redis ベースの分散ロックは同時実行の問題を大幅に軽減できますが、同時実行を完全に防止するには、データベース レベルから開始する必要があります。 |