MySQL を使用して分散ロックを実装することを聞いたことがありますか?

MySQL を使用して分散ロックを実装することを聞いたことがありますか?

概要

以前、在庫管理システムに参加したことがあります。ビジネスの複雑さのため、それをサポートするために多くのアプリケーションが開発されました。この方法では、複数のアプリケーションが同時に在庫データを変更できるようになります。

たとえば、スケジュールされたタスク ドメイン xx.cron と SystemA ドメインおよび SystemB ドメインには複数の JAVA アプリケーションがあり、同じインベントリ データを同時に変更する可能性があります。調整が行われない場合、ダーティデータが表示されます。

JAVA プロセス間のスレッド調整には、DB や Redis などの外部環境を使用できます。以下では、DB を使用して分散ロックを実装する方法について説明します。

[[427766]]

デザイン

この記事で設計した分散ロックの相互作用モードは次のとおりです。

  • ビジネス フィールドに基づいて transaction_id を生成し、スレッドセーフな方法でロック リソースを作成します。
  • transaction_idに基づいてロックを申請する
  • ロックを解除

ロックリソースを動的に作成する

synchronized キーワードを使用する場合は、ロック オブジェクトを指定する必要があります。

  1. 同期(obj) {  
  2. }

プロセス内のスレッドは、obj に基づいて同期できます。ここでの obj はロック オブジェクトとして理解できます。スレッドが同期コード ブロックに入る場合は、まず obj オブジェクトのロックを保持する必要があります。このロックは JAVA に組み込まれたロックであり、作成プロセスはスレッドセーフです。それでは、DB の助けを借りて、ロックを作成するプロセスがスレッドセーフであることをどのように保証するのでしょうか?

DB で UNIQUE KEY 機能を使用できます。重複キーが出現すると、UNIQUE KEY の一意性により例外がスローされます。 JAVA では、SQLIntegrityConstraintViolationException です。

  1. 作成する テーブル分散ロック
  2. id BIGINT符号なしプライマリ  KEY AUTO_INCREMENT COMMENT '自動増分主キー' ,
  3. トランザクションID varchar (128) NOT   NULL  デフォルト  '' COMMENT 'トランザクションID'
  4. 最終更新時刻タイムスタンプ デフォルト 現在のタイムスタンプ の上 アップデート 現在のタイムスタンプ ない  NULL COMMENT '最終更新時間'
  5. create_time タイムスタンプ デフォルト  '0000-00-00 00:00:00'  ない  NULLコメント「作成時間」
  6. 個性的 キー`idx_transaction_id` (`transaction_id`)

transaction_idはトランザクションIDです。例えば、

倉庫 + バーコード + 販売モデル

特定の倉庫内の特定の販売モデルのバーコード リソースを表す transaction_id を組み立てます。もちろん、バーコードが異なれば、transaction_id も異なります。 2 つのアプリケーションが同じ transaction_id を使用してロック リソースを作成する場合、正常に作成できるのは 1 つのアプリケーションだけです。

distribution_lock レコードが正常に挿入された場合、ロック リソースが正常に作成されたことを意味します。

DB接続プールリストの設計

書き込み操作が頻繁に行われるビジネス システムでは、通常、単一のデータベースへの書き込みの負荷を軽減し、書き込み操作のスループットを向上させるために、データベース シャーディングが実行されます。サブデータベースを使用すると、ビジネスデータは当然各データベースに分散されます。

この水平にセグメント化されたマルチデータベースで DB 分散ロックを使用して、DataSouce リストをカスタマイズできます。また、transactionId に応じて対応する Connection を見つけるために getConnection(String transactionId) メソッドを公開します。

実装コードは次のとおりです。

  1. パッケージ dlock;
  2.  
  3. com.alibaba.druid.pool.DruidDataSource をインポートします。
  4. org.springframework.stereotype.Component をインポートします。
  5.  
  6. javax.annotation.PostConstruct をインポートします。
  7. java.io.FileInputStreamをインポートします。
  8. java.io.IOException をインポートします。
  9. java.sql.Connectionをインポートします
  10. java.util.ArrayList をインポートします。
  11. java.util.List をインポートします。
  12. java.util.Properties をインポートします。
  13.  
  14. @成分
  15. パブリッククラス DataSourcePool {
  16. プライベートList<DruidDataSource> dlockDataSources = 新しいArrayList<>();
  17.  
  18. @投稿コンストラクト
  19. プライベートvoid initDataSourceList()はIOExceptionをスローします{
  20. プロパティ properties = new Properties();
  21. FileInputStream fis = 新しい FileInputStream( "db.properties" );
  22. プロパティをロードします(fis);
  23.  
  24. 整数lockNum = Integer .valueOf(properties.getProperty( "DLOCK_NUM" ));
  25. ( int i = 0; i < lockNum; i++) {
  26. 文字列ユーザー= properties.getProperty( "DLOCK_USER_" + i);
  27. 文字列パスワード= properties.getProperty( "DLOCK_PASS_" + i);
  28. 整数initSize =整数.valueOf(properties.getProperty( "DLOCK_INIT_SIZE_" + i));
  29. 整数maxSize = Integer .valueOf(properties.getProperty( "DLOCK_MAX_SIZE_" + i));
  30. 文字列 url = properties.getProperty( "DLOCK_URL_" + i);
  31.  
  32. DruidDataSource dataSource = createDataSource(ユーザーパスワード、initSize、maxSize、url);
  33. dlockDataSources.add (データソース);
  34. }
  35. }
  36.  
  37. プライベート DruidDataSource createDataSource(String user 、String password Integer initSize、 Integer maxSize、String url) {
  38. DruidDataSource データソース = 新しい DruidDataSource();
  39. データソース.setDriverClassName( "com.mysql.jdbc.Driver" );
  40. dataSource.setUsername(ユーザー);
  41. dataSource.setPassword(パスワード);
  42. データソースのURLを設定します。
  43. dataSource.setInitialSize(initSize);
  44. データソースの最大サイズを設定します。
  45.  
  46. データソースを返します
  47. }
  48.  
  49. 公共 接続getConnection(String transactionId) は例外をスローします {
  50. dlockDataSources.size () <= 0 の場合
  51. 戻る ヌル;
  52. }
  53.  
  54. トランザクションIDがnullの場合トランザクションIDは""です。
  55. 新しい RuntimeException をスローします ( "transactionId が必要です" );
  56. }
  57.  
  58. int hascode = トランザクションID.hashCode();
  59. (ハスコード<0)の場合{
  60. hascode = -hascode;
  61. }
  62.  
  63. dlockDataSources.get( hascode % dlockDataSources.size ()).getConnection()を返します
  64. }
  65. }

まず、initDataSourceList メソッドを記述し、Spring の PostConstruct アノテーションを使用して DataSource リストを初期化します。関連する DB 構成は db.properties から読み取られます。

  1. DLOCK_NUM=2
  2.  
  3. DLOCK_USER_0 = "ユーザー1"  
  4. DLOCK_PASS_0= "パス1"  
  5. DLOCK_INIT_SIZE_0=2
  6. DLOCK_MAX_SIZE_0=10
  7. DLOCK_URL_0 = "jdbc:mysql://localhost:3306/test1"  
  8.  
  9. DLOCK_USER_1 = "ユーザー1"  
  10. DLOCK_PASS_1= "パス1"  
  11. DLOCK_INIT_SIZE_1=2
  12. DLOCK_MAX_SIZE_1=10
  13. DLOCK_URL_1 = "jdbc:mysql://localhost:3306/test2"  

DataSource は Alibaba の DruidDataSource を使用します。

次に最も重要なのは、getConnection(String transactionId) メソッドを実装することです。実装原理は非常にシンプルです。 transactionId のハッシュコードを取得し、DataSource の長さを法として計算します。

接続プール リストを設計したら、distributed_lock テーブルにデータを挿入できます。

  1. パッケージ dlock;
  2.  
  3. org.springframework.beans.factory.annotation.Autowired をインポートします。
  4. org.springframework.stereotype.Component をインポートします。
  5.  
  6. java.sql.* をインポートします。
  7.  
  8. @成分
  9. パブリッククラスDistributedLock {
  10.  
  11. オートワイヤード
  12. プライベート DataSourcePool dataSourcePool;
  13.  
  14. /**
  15. * トランザクションIDに基づいてロックリソースを作成する
  16. */
  17. パブリックString createLock(String transactionId) は例外をスローします{
  18. トランザクションIDnullの場合
  19. 新しい RuntimeException をスローします ( "transactionId が必要です" );
  20. }
  21. 繋がり 接続= null ;
  22. ステートメント statement = null ;
  23. 試す {
  24. 接続= dataSourcePool.getConnection(トランザクションID);
  25. 接続.setAutoCommit( false );
  26. ステートメント =接続.createStatement();
  27. ステートメント.executeUpdate( "INSERT INTOdistributed_lock(transaction_id) VALUES ('" + transactionId + "')" );
  28. 繋がり専念();
  29. トランザクションIDを返します
  30. }
  31. キャッチ(SQLIntegrityConstraintViolationException icv){
  32. // 説明はすでに生成されています。
  33. if (接続!= null ) {
  34. 接続.ロールバック( ) ;
  35. }
  36. トランザクションIDを返します
  37. }
  38. キャッチ(例外e){
  39. if (接続!= null ) {
  40. 接続.ロールバック( ) ;
  41. }
  42. eを投げる;
  43. }
  44. ついに {
  45. if (ステートメント != null ) {
  46. ステートメント.close () ;
  47. }
  48.  
  49. if (接続!= null ) {
  50. 接続を閉じます
  51. }
  52. }
  53. }
  54. }

トランザクションIDに基づいてスレッドをロックする

次に、DB の更新選択機能を使用してスレッドをロックします。複数のスレッドが同じ transactionId に基づいて同時に更新の選択操作を実行する場合、成功できるのは 1 つのスレッドのみであり、更新の選択を正常に実行したスレッドがコミット操作を使用するまで、他のスレッドはブロックされます。そうして初めて、ブロックされたスレッドの 1 つが動作を開始できるようになります。

上記の DistributedLock クラスにロック メソッドを作成します。

  1. パブリックブールロック(String transactionId)は例外をスローします{
  2. 繋がり 接続= null ;
  3. 準備されたステートメント 準備されたステートメント = null ;
  4. 結果セット resultSet = null ;
  5. 試す {
  6. 接続= dataSourcePool.getConnection(トランザクションID);
  7. preparedStatement = connection.prepareStatement ( "SELECT * FROM distribution_lock WHERE transaction_id = ? FOR UPDATE " );
  8. 準備されたステートメント。setString(1、トランザクションID);
  9. 結果セット = preparedStatement.executeQuery();
  10. if (! resultSet.next ()) {
  11. 接続.ロールバック( ) ;
  12. 戻る 間違い;
  13. }
  14. 戻る 真実;
  15. } キャッチ (例外 e) {
  16. if (接続!= null ) {
  17. 接続.ロールバック( ) ;
  18. }
  19. eを投げる;
  20. }
  21. ついに {
  22. 準備されたステートメントがnull場合
  23. 準備されたステートメントを閉じます() ;
  24. }
  25.  
  26. 結果セットがnull場合
  27. 結果セットを閉じます() ;
  28. }
  29.  
  30. if (接続!= null ) {
  31. 接続を閉じます
  32. }
  33. }
  34. }

ロック解除操作を実装する

スレッドがタスクを完了したら、以前にロックされていたスレッドが引き続き動作できるように、手動でロックを解除する必要があります。上記の実装では、実際には、その時点で更新に正常に選択されたスレッドに対応する接続​​を取得し、コミット操作を実行するだけです。

それで、どうやってそれを手に入れるのでしょうか? ThreadLocal を使用できます。まず、DistributedLockクラスで定義します

  1. プライベート ThreadLocal threadLocalConn = new ThreadLocal<>();

lock メソッドが呼び出されるたびに、Connection は ThreadLocal に配置されます。ロック方法を変更します。

  1. パブリックブールロック(String transactionId)は例外をスローします{
  2. 繋がり 接続= null ;
  3. 準備されたステートメント 準備されたステートメント = null ;
  4. 結果セット resultSet = null ;
  5. 試す {
  6. 接続= dataSourcePool.getConnection(トランザクションID);
  7. threadLocalConn.set (接続) ;
  8. preparedStatement = connection.prepareStatement ( "SELECT * FROM distribution_lock WHERE transaction_id = ? FOR UPDATE " );
  9. 準備されたステートメント。setString(1、トランザクションID);
  10. 結果セット = preparedStatement.executeQuery();
  11. if (! resultSet.next ()) {
  12. 接続.ロールバック( ) ;
  13. スレッドLocalConnを削除します。
  14. 戻る 間違い;
  15. }
  16. 戻る 真実;
  17. } キャッチ (例外 e) {
  18. if (接続!= null ) {
  19. 接続.ロールバック( ) ;
  20. スレッドLocalConnを削除します。
  21. }
  22. eを投げる;
  23. }
  24. ついに {
  25. 準備されたステートメントがnull場合
  26. 準備されたステートメントを閉じます() ;
  27. }
  28.  
  29. 結果セットがnull場合
  30. 結果セットを閉じます() ;
  31. }
  32.  
  33. if (接続!= null ) {
  34. 接続を閉じます
  35. }
  36. }
  37. }

このように、Connection を取得した後、ThreadLocal に設定します。 lock メソッドで例外が発生した場合は、ThreadLocal から削除します。

これらの手順で、ロック解除操作を実装できます。 DistributedLock に unlock メソッドを追加します。

  1. パブリックvoid unlock()は例外をスローします{
  2. 繋がり 接続= null ;
  3. 試す {
  4. 接続= threadLocalConn.get();
  5. if (!接続.isClosed()) {
  6. 繋がり専念();
  7. 接続を閉じます
  8. スレッドLocalConnを削除します。
  9. }
  10. } キャッチ (例外 e) {
  11. if (接続!= null ) {
  12. 接続.ロールバック( ) ;
  13. 接続を閉じます
  14. }
  15. スレッドLocalConnを削除します。
  16. eを投げる;
  17. }
  18. }

欠点

結局のところ、分散ロックを実装するために DB が使用されるため、DB には依然として一定の負荷がかかります。当時、配信に DB を使用することを検討した重要な理由は、私たちのアプリケーションがトラフィック量の少ないバックエンド アプリケーションであったことです。代わりに、在庫データの正確性を確保することが重要でした。在庫を占有するためにショッピング カートを追加するなどのフロントエンド在庫システムの場合、分散ロックを実装するために DB を使用しないことが最善です。

さらに考える

データの複数のコピーをロックしたい場合はどうすればよいでしょうか?たとえば、在庫操作では、実在庫と仮想在庫の両方を変更する必要があります。実在庫と仮想在庫の両方をロックします。実のところ、それほど難しいことではありません。 lock メソッドを参照して、multiLock メソッドを記述し、複数の transactionId 入力パラメータを指定して、for ループで処理します。後で時間ができたらこれを補います。

<<:  クラウドへの移行によって企業が経費を削減する方法

>>:  学習ノート - 分散型デジタル華容路(第2部)

推薦する

Kafka の適用可能なシナリオをネットワーク全体で最も包括的に図解で解説します。

メッセージングシステムメッセージング システムは、データ プロデューサーの分離や未処理メッセージのキ...

Henghost: US CN2 GIAラインVPSの簡単なレビュー。データからHenghostがいかに優れているかがわかります。

ヘンゴーストはどうですか? (恒創科技) 恒創は良いですか?香港と日本のデータセンターをテストした後...

百度のインデックスデータ減少の理由分析

2012年8月31日、インデックスされたSEOブログの数は203に達し、その後9月1日から今日まで、...

Baidu のアップデートで古いウェブサイトを安定させる方法

Baidu が更新されるたびに、ウェブサイトの一部が崩壊し、多くの古いウェブマスターは Baidu ...

Gラインデスクトップクラウドサービスプラットフォームの実践に関する簡単な説明

01 デスクトップクラウドテクノロジー入門従来のオフィス端末の欠点デジタル経済の発展に伴い、データセ...

ウェブサイトを効率的にインデックスする方法 [写真]

1-1: スタート前のポイントSEO は、検索エンジンの出現とともに生まれた統計分析として捉えるべき...

Baidu SEO ウェブサイトのトップ 10 の問題と解決策

(1)ウェブサイトリンク理由: ウェブサイトに外部リンクが不足しているか、外部リンクが徐々に減少して...

A5トピック:プロのオンラインショッピングの悪いレビューが横行し、タオバオの商店主が打撃を受ける

Taobao には数百万のオンライン ストアがあり、各ストアはこのオンライン取引プラットフォームで生...

SaaS の洞察: 中国の SaaS の過去と現在

[[398891]] SaaS は 20 世紀後半に誕生し、「古い」エンタープライズ アプリケーショ...

ユーザーの購買意欲に影響を与えるいくつかの要因がコンバージョン率を向上させる可能性がある

ウェブサイトがオンラインになってから最初の訪問者が到着し、その後一定量のトラフィックが確保された後、...

エッジとクラウド、あるいはエッジとクラウド: 今後の方向性は?

エッジ コンピューティングにより、データ処理がデータのソース、および結果として生じるアクションや決定...

テンセントの電子商取引が「トレンドに従わない」理由:エコシステムが再構築される可能性がある

深圳から辛元偉が報告「今年、当社のオンライン販売は初めて予想を下回る成長率を示しており、残念ながら8...

ウェブサイトのランキングを上げる鍵は、最適化のボトルネックを打破することです

SEO最適化を行うスタッフが最も望んでいるのは、ウェブサイトのランキングが継続的に上昇することです。...

SEOは早急に考え方を変える

部外者にとって、SEO は何年も変わっていないように見えます。近年、なぜ SEO のコストがどんどん...

個人ウェブマスターはウェブサイト記事の呪縛から抜け出さなければならない

多くの人が次のような質問をしているのをよく見かけます。「ウェブサイトは毎日何件の記事を更新すべきか?...