[[225288]] 概要 以前、在庫管理システムに参加したことがあります。ビジネスの複雑さのため、それをサポートするために多くのアプリケーションが開発されました。この方法では、複数のアプリケーションが同時に在庫データを変更できるようになります。たとえば、スケジュールされたタスク ドメイン xx.cron と SystemA ドメインおよび SystemB ドメインには複数の JAVA アプリケーションがあり、同じインベントリ データを同時に変更する可能性があります。調整が行われない場合、ダーティデータが表示されます。 JAVA プロセス間のスレッド調整には、DB や Redis などの外部環境を使用できます。以下では、DB を使用して分散ロックを実装する方法について説明します。 デザイン この記事で設計した分散ロックの相互作用モードは次のとおりです。 1. ビジネスフィールドに基づいてtransaction_idを生成し、スレッドセーフな方法でロックリソースを作成する 2. transaction_idに基づいてロックを申請する 3. ロックを解除する ロックリソースを動的に作成する synchronized キーワードを使用する場合は、ロック オブジェクトを指定する必要があります。 同期化(obj) { } プロセス内のスレッドは、obj に基づいて同期できます。ここでの obj はロック オブジェクトとして理解できます。スレッドが同期コード ブロックに入る場合は、まず obj オブジェクトのロックを保持する必要があります。このロックは JAVA に組み込まれたロックであり、作成プロセスはスレッドセーフです。それでは、DB の助けを借りて、ロックを作成するプロセスがスレッドセーフであることをどのように保証できるでしょうか? DB で UNIQUE KEY 機能を使用できます。重複キーが出現すると、UNIQUE KEY の一意性により例外がスローされます。 JAVA では、SQLIntegrityConstraintViolationException です。 - 作成する テーブル分散ロック(
- id BIGINT符号なしプライマリ KEY AUTO_INCREMENT COMMENT '自動増分主キー' ,
- トランザクションID varchar (128) NOT NULL デフォルト '' COMMENT 'トランザクションID' 、
- 最終更新時刻タイムスタンプ デフォルト 現在のタイムスタンプ の上 アップデート 現在のタイムスタンプ ない NULLコメント'***更新時間' 、
- create_time タイムスタンプ デフォルト '0000-00-00 00:00:00' ない NULLコメント「作成時間」 、
- 個性的 キー`idx_transaction_id` (`transaction_id`)
- )
transaction_idはトランザクションIDです。例えば、 倉庫 + バーコード + 販売モデル 特定の倉庫内の特定の販売モデルのバーコード リソースを表す transaction_id を組み立てます。もちろん、バーコードが異なれば、transaction_id も異なります。 2 つのアプリケーションが同じ transaction_id を使用してロック リソースを作成する場合、正常に作成できるのは 1 つのアプリケーションだけです。 distribution_lock レコードが正常に挿入された場合、ロック リソースが正常に作成されたことを意味します。 DB接続プールリストの設計 書き込み操作が頻繁に行われるビジネス システムでは、通常、単一のデータベースへの書き込みの負荷を軽減し、書き込み操作のスループットを向上させるために、データベース シャーディングが実行されます。サブデータベースを使用すると、ビジネスデータは当然各データベースに分散されます。この水平にセグメント化されたマルチデータベースで DB 分散ロックを使用して、DataSouce リストをカスタマイズできます。また、transactionId に応じて対応する Connection を見つけるために getConnection(String transactionId) メソッドを公開します。 実装コードは次のとおりです。 - パッケージ dlock;
- com.alibaba.druid.pool.DruidDataSource をインポートします。
- org.springframework.stereotype.Component をインポートします。
- javax.annotation.PostConstruct をインポートします。
- java.io.FileInputStreamをインポートします。
- java.io.IOException をインポートします。
- java.sql.Connectionをインポートします。
- java.util.ArrayList をインポートします。
- java.util.List をインポートします。
- java.util.Properties をインポートします。
-
- @成分
- パブリッククラス DataSourcePool {
- プライベートリスト dlockDataSources = new ArrayList();
-
- @投稿コンストラクト
- プライベートvoid initDataSourceList()はIOExceptionをスローします{
- プロパティ properties = new Properties();
- FileInputStream fis = 新しい FileInputStream( "db.properties" );
- プロパティをロードします(fis);
-
- 整数lockNum = Integer .valueOf(properties.getProperty( "DLOCK_NUM" ));
- ( int i = 0; i "DLOCK_USER_" + i) ;
- 文字列パスワード= properties.getProperty( "DLOCK_PASS_" + i);
- 整数initSize =整数.valueOf(properties.getProperty( "DLOCK_INIT_SIZE_" + i));
- 整数maxSize = Integer .valueOf(properties.getProperty( "DLOCK_MAX_SIZE_" + i));
- 文字列 url = properties.getProperty( "DLOCK_URL_" + i);
-
- DruidDataSource dataSource = createDataSource(ユーザー、パスワード、initSize、maxSize、url);
- dlockDataSources.add (データソース);
- }
- }
-
- プライベート DruidDataSource createDataSource(String user 、String password 、 Integer initSize、 Integer maxSize、String url) {
- DruidDataSource データソース = 新しい DruidDataSource();
- データソース.setDriverClassName( "com.mysql.jdbc.Driver" );
- dataSource.setUsername(ユーザー);
- dataSource.setPassword(パスワード);
- データソースのURLを設定します。
- dataSource.setInitialSize(initSize);
- データソースの最大サイズを設定します。
-
- データソースを返します。
- }
-
- 公共 接続getConnection(String transactionId) は例外をスローします {
- dlockDataSources.size () が 0 の場合
- 戻る ヌル;
- }
-
- トランザクションIDがnullの場合、トランザクションIDは""です。
- 新しい RuntimeException をスローします ( "transactionId が必要です" );
- }
- int hascode = トランザクションID.hashCode();
- (コードが0の場合) {
- hascode = -hascode;
- }
- dlockDataSources.get( hascode % dlockDataSources.size ()).getConnection()を返します。
- }
- }
まず、initDataSourceList メソッドを記述し、Spring の PostConstruct アノテーションを使用して DataSource リストを初期化します。関連する DB 構成は db.properties から読み取られます。 - DLOCK_NUM=2
- DLOCK_USER_0 = "ユーザー1"
- DLOCK_PASS_0= "パス1"
- DLOCK_INIT_SIZE_0=2
- DLOCK_MAX_SIZE_0=10
- DLOCK_URL_0 = "jdbc:mysql://localhost:3306/test1"
-
-
- DLOCK_USER_1 = "ユーザー1"
- DLOCK_PASS_1= "パス1"
- DLOCK_INIT_SIZE_1=2
- DLOCK_MAX_SIZE_1=10
- DLOCK_URL_1 = "jdbc:mysql://localhost:3306/test2"
DataSource は Alibaba の DruidDataSource を使用します。 次に最も重要なのは、getConnection(String transactionId) メソッドを実装することです。実装原理は非常にシンプルです。 transactionId のハッシュコードを取得し、DataSource の長さを法として計算します。 接続プール リストを設計したら、distributed_lock テーブルにデータを挿入できます。 - パッケージ dlock;
- org.springframework.beans.factory.annotation.Autowired をインポートします。
- org.springframework.stereotype.Component をインポートします。
-
- java.sql.* をインポートします。
-
- @成分
- パブリッククラスDistributedLock {
- オートワイヤード
- プライベート DataSourcePool dataSourcePool;
-
- /**
- * トランザクションIDに基づいてロックリソースを作成する
- */
- パブリックString createLock(String transactionId) は例外をスローします{
- トランザクションIDがnullの場合
- 新しい RuntimeException をスローします ( "transactionId が必要です" );
- }
- 繋がり 接続= null ;
- ステートメント statement = null ;
- 試す {
- 接続= dataSourcePool.getConnection(トランザクションID);
- 接続.setAutoCommit( false );
- ステートメント =接続.createStatement();
- ステートメント.executeUpdate( "INSERT INTOdistributed_lock(transaction_id) VALUES ('" + transactionId + "')" );
- 繋がり。専念();
- トランザクションIDを返します。
- }
- キャッチ(SQLIntegrityConstraintViolationException icv){
- // 説明はすでに生成されています。
- if (接続!= null ) {
- 接続.ロールバック( ) ;
- }
- トランザクションIDを返します。
- }
- キャッチ(例外e){
- if (接続!= null ) {
- 接続.ロールバック( ) ;
- }
- eを投げる;
- }
- ついに {
- if (ステートメント != null ) {
- ステートメント.close () ;
- }
-
- if (接続!= null ) {
- 接続を閉じます。
- }
- }
- }
- }
トランザクションIDに基づいてスレッドをロックする 次に、DB の更新選択機能を使用してスレッドをロックします。複数のスレッドが同じ transactionId に基づいて同時に更新の選択操作を実行する場合、成功できるのは 1 つのスレッドのみであり、更新の選択を正常に実行したスレッドがコミット操作を使用するまで、他のスレッドはブロックされます。そうして初めて、ブロックされたスレッドの 1 つが動作を開始できるようになります。上記の DistributedLock クラスにロック メソッドを作成します。 - パブリックブールロック(String transactionId)は例外をスローします{
- 繋がり 接続= null ;
- 準備されたステートメント 準備されたステートメント = null ;
- 結果セット resultSet = null ;
- 試す {
- 接続= dataSourcePool.getConnection(トランザクションID);
- preparedStatement = connection.prepareStatement ( "SELECT * FROM distribution_lock WHERE transaction_id = ? FOR UPDATE " );
- 準備されたステートメント。setString(1、トランザクションID);
- 結果セット = preparedStatement.executeQuery();
- if (! resultSet.next ()) {
- 接続.ロールバック( ) ;
- 戻る 間違い;
- }
- 戻る 真実;
- } キャッチ (例外 e) {
- if (接続!= null ) {
- 接続.ロールバック( ) ;
- }
- eを投げる;
- }
- ついに {
- 準備されたステートメントがnullの場合
- 準備されたステートメントを閉じます() ;
- }
-
- 結果セットがnullの場合
- 結果セットを閉じます() ;
- }
-
- if (接続!= null ) {
- 接続を閉じます。
- }
- }
- }
ロック解除操作を実装する スレッドがタスクを完了したら、以前にロックされていたスレッドが引き続き動作できるように、手動でロックを解除する必要があります。上記の実装では、実際には、その時点で更新に正常に選択されたスレッドに対応する接続を取得し、コミット操作を実行するだけです。 それでどうやってそれを手に入れるのですか? ThreadLocal を使用できます。まず、DistributedLockクラスで定義します - プライベート ThreadLocal threadLocalConn = new ThreadLocal();
lock メソッドが呼び出されるたびに、Connection は ThreadLocal に配置されます。ロック方法を変更します。 - パブリックブールロック(String transactionId)は例外をスローします{
- 繋がり 接続= null ;
- 準備されたステートメント 準備されたステートメント = null ;
- 結果セット resultSet = null ;
- 試す {
- 接続= dataSourcePool.getConnection(トランザクションID);
- threadLocalConn.set (接続) ;
- preparedStatement = connection.prepareStatement ( "SELECT * FROM distribution_lock WHERE transaction_id = ? FOR UPDATE " );
- 準備されたステートメント。setString(1、トランザクションID);
- 結果セット = preparedStatement.executeQuery();
- if (! resultSet.next ()) {
- 接続.ロールバック( ) ;
- スレッドLocalConnを削除します。
- 戻る 間違い;
- }
- 戻る 真実;
- } キャッチ (例外 e) {
- if (接続!= null ) {
- 接続.ロールバック( ) ;
- スレッドLocalConnを削除します。
- }
- eを投げる;
- }
- ついに {
- 準備されたステートメントがnullの場合
- 準備されたステートメントを閉じます() ;
- }
- 結果セットがnullの場合
- 結果セットを閉じます() ;
- }
-
- if (接続!= null ) {
- 接続を閉じます。
- }
- }
- }
このように、Connection を取得した後、ThreadLocal に設定します。 lock メソッドで例外が発生した場合は、ThreadLocal から削除します。 これらの手順で、ロック解除操作を実装できます。 DistributedLock に unlock メソッドを追加します。 - パブリックvoid unlock()は例外をスローします{
- 繋がり 接続= null ;
- 試す {
- 接続= threadLocalConn.get();
- if (!接続.isClosed()) {
- 繋がり。専念();
- 接続を閉じます。
- スレッドLocalConnを削除します。
- }
- } キャッチ (例外 e) {
- if (接続!= null ) {
- 接続.ロールバック( ) ;
- 接続を閉じます。
- }
- スレッドLocalConnを削除します。
- eを投げる;
- }
- }
欠点 結局のところ、分散ロックを実装するために DB が使用されるため、DB には依然として一定の負荷がかかります。当時、配信に DB を使用することを検討した重要な理由は、私たちのアプリケーションがトラフィック量の少ないバックエンド アプリケーションであったことです。代わりに、在庫データの正確性を確保することが重要でした。ショッピングカートを追加して在庫を占有するなどのフロントエンド在庫システムでは、分散ロックを実装するために DB を使用しないことが最善です。 さらに考える データの複数のコピーをロックするにはどうすればよいですか?たとえば、特定の在庫操作では、実在庫と仮想在庫の両方を変更する必要があります。実在庫と仮想在庫の両方をロックします。実のところ、それほど難しいことではありません。 lock メソッドを参照して、multiLock メソッドを記述し、複数の transactionId 入力パラメータを指定して、for ループで処理します。後で時間ができたらこれを補います。 |