環境: SpringBoot2.7.12 この記事では、Spring Integrationが提供する分散ロック機能について紹介します。 1. はじめにSpring Integration は、イベント駆動型アプリケーションを構築するためのフレームワークです。 Spring Integration では、LockRegistry は分散ロックを管理するためのインターフェースです。分散ロックは、分散システム内の複数のノード間で共有リソースへの相互排他的アクセスを保証するために使用される同期メカニズムです。 LockRegistry と関連サブインターフェース (RenewableLockRegistry など) の主な機能は次のとおりです。 - ロックの取得: アプリケーションが共有リソースにアクセスする必要がある場合、LockRegistry を通じてロックを取得できます。
- ロックを解除する: アプリケーションが共有リソースへのアクセスを完了したら、他のアプリケーションがロックを取得できるようにロックを解除する必要があります (最初のポイントで述べたように、直接的なロック解除操作は提供されていませんが、内部で自動的に実行されます)。
- 更新: 必要に応じてロック保持時間を延長するための更新メカニズムを提供します。
一般的な LockRegistry 実装には、データベース、ZooKeeper、Redis に基づくものが含まれます。 パブリック依存関係<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency>
2. データベースベースの分散ロック依存関係の導入<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.30</version> </dependency>
構成spring: datasource: driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false username: root password: xxxooo type: com.zaxxer.hikari.HikariDataSource hikari: minimumIdle: 10 maximumPoolSize: 200 --- spring: integration: jdbc: initialize-schema: always # 基于数据库需要执行初始化脚本schema: classpath:schema-mysql.sql
コアBeanオブジェクトの登録@Bean public DefaultLockRepository defaultLockRepository(DataSource dataSource) { DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource); // 这里根据你的业务需要,配置表前缀,默认:IN_ lockRepository.setPrefix("T_") ; return lockRepository ; } // 注册基于数据库的分布式锁@Bean public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) { return new JdbcLockRegistry(lockRepository) ; }
テストケース@Test public void testLock() throws Exception int len = 10 ; CountDownLatch cdl = new CountDownLatch(len) ; CountDownLatch waiter = new CountDownLatch(len) ; Thread[] ts = new Thread[len] ; for (int i = 0; i < len; i++) { ts[i] = new Thread(() -> { waiter.countDown() ; System.out.println(Thread.currentThread().getName() + " - 准备获取锁") ; try { waiter.await() ; } catch (InterruptedException e1) { e1.printStackTrace(); } // 获取锁Lock lock = registry.obtain("drug_store_key_001") ; lock.lock() ; System.out.println(Thread.currentThread().getName() + " - 获取锁成功") ; try { try { TimeUnit.SECONDS.sleep(2) ; } catch (InterruptedException e) { e.printStackTrace(); } } finally { // 释放锁lock.unlock() ; cdl.countDown() ; System.out.println(Thread.currentThread().getName() + " - 锁释放成功") ; } }, "T - " + i) ; } for (int i = 0; i < len; i++) { ts[i].start() ; } cdl.await() ; }
データベース写真 ロック実装 JdbcLock は java.util.concurrent.locks.Lock を実装するため、ロックは再入などの操作をサポートします。 ロック取得に失敗した後の再試行間隔を設定します。デフォルト値は100msです。 JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository); // 定义锁对象时设置当获取锁失败后重试间隔时间。 jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ; ロックの更新jdbcLockRegistry.renewLock("drug_store_key_001");
3. Redisベースの分散ロック依存関係の導入<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
構成spring: redis: host: localhost port: 6379 password: xxxooo database: 8 lettuce: pool: maxActive: 8 maxIdle: 100 minIdle: 10 maxWait: -1
テストケーステスト コードは上記の JDBC ベースのものと同じです。ロックを呼び出すコードを変更するだけで済みます。 Lock lock = redisLockRegistry.obtain("001") ; ロックの有効期間を設定します。デフォルトは60秒です。 // 第三个参数设置了key的有效期,这里改成10s RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ; 注: Redis キーの有効期間は 10 秒に設定されています。ビジネスの実行が 10 秒を超えると、プログラムはエラーを報告します。 Redis ウォッチドッグ メカニズムはありません。 Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised. at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450) at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83) at java.lang.Thread.run(Thread.java:748) 10 秒経過後にキーが自動的に削除される場合、他のスレッドはすぐにロックを取得できますか?単一ノードの場合、他のスレッドはロックを取得できず、前のスレッドが終了するまで待機する必要があります。これは、ReentrantLock ロックが内部的に維持され、分散ロックを取得する前にローカル ロックを取得する必要があるためです。 private abstract class RedisLock implements Lock { private final ReentrantLock localLock = new ReentrantLock(); public final void lock() { this.localLock.lock(); while (true) { try { if (tryRedisLock(-1L)) { return; } } catch (InterruptedException e) { } catch (Exception e) { this.localLock.unlock(); rethrowAsLockException(e); } } } } 注: データベースベースでもRedisベースでも、まずローカルロックを取得する必要があります。 Spring Cloud Task は、Spring Integration のデータベースベースのロックを使用します。 概要: Spring Integration の分散ロックは、分散システムで信頼性の高い同期を実現する効果的な方法を開発者に提供します。これらのロック実装を適切に選択して使用することで、共有リソースへのアクセスが複数のノード間で調整され、一貫性が保たれ、システム全体の信頼性とパフォーマンスが向上します。 完了! ! ! |