導入ビジネス規模の継続的な拡大と技術アーキテクチャの進化により、分散システムは、高同時実行性と大量データ処理をサポートする重要なインフラストラクチャになりました。分散環境では、各ノードは比較的独立しており、タスクを同時に実行できるため、システム全体のパフォーマンスと可用性が大幅に向上します。共有リソースにアクセスして変更する場合、データの一貫性と正確性を確保するために、複数のノード間での同時操作を調整する技術的な手段、つまり分散ロックが必要です。 従来のスタンドアロン環境では、ローカル ロックを通じてプロセス内でクリティカル セクション リソースへの相互排他的アクセスを簡単に実現できます。ただし、単一マシンのロックはネットワーク境界を越えることができず、異なるノード間の同時制御を保証できないため、この方法は分散システムには適用できなくなりました。このコンテキストで分散ロックが作成されました。分散システム内の複数のノード間の共同作業を可能にするロック メカニズムです。これらは、同時実行の競合から共有リソースを保護し、複雑な分散シナリオでのデータ操作の秩序性と一貫性を確保することを目的としています。 在庫控除WMS システムにおける受注入庫と出荷の在庫操作を例に考えてみましょう。 CREATE TABLE `tb_inventory` ( `id` BIGINT NOT NULL AUTO_INCREMENT, `account_id` BIGINT NOT NULL DEFAULT 0 COMMENT '帐套ID', `sku` VARCHAR(128) NOT NULL DEFAULT '' COMMENT '商品sku编码', `warehouse_code` VARCHAR(16) NOT NULL DEFAULT '' COMMENT '库存编码', `available_inventory` INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '可用库存', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `deleted` TINYINT UNSIGNED NULL DEFAULT 0 COMMENT '0-未删除1/null-已删除', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY uk_warehouse_code (customer_no, warehouse_code, sku, deleted) ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COMMENT = '库存表'; 在庫テーブルは例として使用されているため、実際のビジネス参照値はありません。
在庫管理に関しては、よくある間違いがいくつかあります。 1. メモリ内の在庫が十分であるかどうかを判断し、控除を完了するメモリ内に在庫があるかどうかを直接判定し、差し引いた値を計算し、データベースを更新します。同時実行の場合、インベントリは上書きされます。 /** * 确认订单出库* * @param customerNo * @param orderNo */ @Transactional(rollbackFor = Exception.class) @Override public void confirmOrder(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 忽略订单信息校验等,,, // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 剩余库存Integer remainInventory = availableInventory - qty; // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); updateInventory.setAvailableInventory(remainInventory); tbInventoryMapper.updateInventory(updateInventory); } SQLで直接在庫更新を実行する <update id="updateInventory"> UPDATE tb_inventory SET available_inventory = #{availableInventory} WHERE sku = #{sku} AND customer_no = #{customerNo} AND warehouse_code = #{warehouseCode} AND deleted = 0 </update>
在庫 SKU の在庫がマイナスになりました: 写真 2. メモリ内に在庫が十分にあるかどうかを判断し、SQLで在庫減算を実行します。 InnoDB ストレージ エンジンでは、UPDATE では通常行ロックが適用されるため、値の上書きを回避するための操作が SQL に追加されますが、それでも在庫数量がマイナスになる可能性があります。在庫が十分かどうかのチェックはメモリ内で実行されるため、すべての同時状況で在庫が読み取られます。 /** * 确认订单出库* * @param customerNo * @param orderNo */ @Transactional(rollbackFor = Exception.class) @Override public void confirmOrder(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 忽略订单信息校验等,,, // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); } SQL での在庫減額 <update id="updateInventory"> UPDATE tb_inventory SET available_inventory = available_inventory - #{diffInventory} WHERE sku = #{sku} AND customer_no = #{customerNo} AND warehouse_code = #{warehouseCode} AND deleted = 0 </update>
在庫 SKU の在庫がマイナスになりました: 写真 在庫操作方法の同期を使用するsynchronized は、マルチ同時実行環境で複数のスレッドが在庫操作メソッドに同時にアクセスすることを防ぐことができますが、メソッドが終了すると synchronized の効果は無効になります。この時点ではトランザクションがコミットされない可能性があり、その結果、他のスレッドがロックを取得した後に古い在庫データを読み取ることになります。控除を実行すると、誤った在庫控除が発生する可能性があります。 /** * 确认订单出库* * @param customerNo * @param orderNo */ @Transactional(rollbackFor = Exception.class) @Override public synchronized void confirmOrder(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 忽略订单信息校验等,,, // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); } 在庫 SKU の在庫がマイナスになりました: 写真 上記のエラー事例から、在庫操作がアトミックではなく、在庫操作が失敗することがわかります。以下では、単一システムと分散システムという 2 つの観点からデータの一貫性と正確性を確保する方法について説明します。 スタンドアロンシステムスタンドアロン システムでは、データとビジネス ロジックが 1 つのプロセスに集中します。共有リソースへの同時アクセスが発生する場合、データの正確性と一貫性を維持するために、ロック メカニズムとデータベース トランザクション管理 (行ロック) に依存する必要があります。 ロック メカニズムについては、synchronized または Lock のどちらを使用する場合でも、データベース トランザクションがロックの制御範囲内にあることを確認する必要があります。 上記のエラーの場合、トランザクション外でロックを適用することができます。つまり、在庫操作メソッドの上位層(サービス層など)にロックを配置します。 @Service public class OrderServiceImpl implements IOrderService { private IOrderManager orderManager; /** * 确认订单出库* * @param customerNo * @param orderNo */ @Override public synchronized void confirmOrder(String customerNo, String orderNo) { orderManager.confirmOrder(customerNo, orderNo); } @Autowired public void setOrderManager(IOrderManager orderManager) { this.orderManager = orderManager; } } この時点で在庫を操作すると、在庫不足のため在庫操作が失敗します。 写真 この方法はデータの一貫性と正確性を実現できますが、トランザクションの粒度は可能な限り小さくする必要があるため、推奨されません。 推奨される方法は、ロックされた制御スコープ内でトランザクションをコミットすることです。トランザクションを手動でコミットします。トランザクションを手動で管理するには、TransactionTemplate を使用するか、コード内で PlatformTransactionManager の getTransaction メソッドと commit メソッドを直接呼び出します。 @Autowired private PlatformTransactionManager transactionManager; /** * 确认订单出库* * @param customerNo * @param orderNo */ @Override public synchronized void confirmOrder(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); // 忽略订单信息校验等,,, // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); // 提交事务transactionManager.commit(status); } この時点で、在庫操作を再度実行しようとすると、在庫不足のため在庫操作は失敗します。 写真 上記の同期ロックの実装には、同期ロジックをより細かく制御できる Lock を使用して実装するのが最適です。 @Autowired private PlatformTransactionManager transactionManager; private final Lock orderLock = new ReentrantLock(); /** * 确认订单出库* * @param customerNo * @param orderNo */ @Override public void confirmOrder(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); try { // 尝试获取锁,最多等待timeout时间if (orderLock.tryLock(1, TimeUnit.SECONDS)) { // 成功获取到锁,执行确认订单的逻辑TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); try { // 忽略订单信息校验等,,, // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); // 提交事务transactionManager.commit(status); }catch (Exception e){ // 回滚事务transactionManager.rollback(status); // 处理异常e.printStackTrace(); }finally { // 释放锁orderLock.unlock(); } } else { // 获取锁超时System.out.println("Failed to confirm order within the timeout period: " +orderNo); // 处理超时情况,比如记录日志、通知用户等} } catch (InterruptedException e) { // 如果在等待锁的过程中线程被中断,处理中断异常Thread.currentThread().interrupt(); // ... 处理中断逻辑... } } スタンドアロン システムでは、上記の方法によりデータの一貫性と正確性が保証されます。しかし、実際のビジネスでは、アプリケーションは通常、複数のサーバーに展開されます。この場合、上記の解決策は保証できず、解決するには分散ロックが必要になります。 分散ロックの実装スタンドアロン システムでは、ロックは複数のスレッドによる共有リソースへの同時アクセスを制御するために使用される基本的な同期メカニズムです。分散システムにアップグレードすると、サービスが複数のノードに分散されたため、単一マシン環境で最初に使用されていたロック メカニズムでは、複数のノード間でリソース アクセスを直接調整することができませんでした。そこで、このとき、拡張されたロックの概念として分散ロックが誕生しました。分散ロックは、複数のノード、プロセス、またはサービスにわたる同期プリミティブであり、分散システム内の共有リソースへのアクセスを調整して、ノードが異なる物理マシンまたは仮想マシンに分散されている場合でも、常に 1 つのノードだけが排他的に操作を実行できるようにします。 分散ロックの基本要素1. 相互排他: これは分散ロックの最も基本的な要件であり、常に 1 つのクライアント (プロセス、スレッド、またはサービス インスタンス) のみがロックを保持して使用できるため、共有リソースが複数のクライアントによって同時に変更されないことが保証されます。 2. 永続性: 分散ロックには、ある程度の永続性が必要です。サービスが再起動されたり、ネットワークが一時的に切断されたりしても、ロック状態は維持されます。 3. 再入可能性: スタンドアロン環境の再入可能ロックと同様に、分散ロックでは、同じクライアントがロックを保持したままブロックされずに再度ロックを要求できるようにサポートする必要があります。これは、再帰呼び出しや複数のリソース アクセスを伴う操作にとって非常に重要です。 4. 公平性: シナリオによっては、ロックの割り当ては特定の公平性の原則に従う必要があります。つまり、最も長く待機していたクライアントが、ロックが解放されたときに優先的にロックを取得します。すべての分散ロック実装で公平性を考慮する必要はありませんが、一部の高性能または高同時実行システムでは公平性が非常に重要です。 5. フォールト トレランス: 分散ロック サービスには、ある程度のフォールト トレランスが必要です。つまり、一部のサービス ノードに障害が発生しても、ロック機能の正しい動作が保証され、デッドロックやデータの不整合を防ぐことができます。これは通常、Raft や Paxos などのコンセンサス プロトコルや、ZooKeeper、etcd などに基づく分散調整サービスの使用など、サービス冗長性とレプリケーション メカニズムを通じて実現されます。 一般的な分散ロックソリューションデータベースベースの実装1. データベース悲観的ロック悲観的ロックは、同時アクセスによって発生するデータ競合が標準であると想定して、予防的な戦略で同時実行競合を処理します。したがって、データにアクセスする前に、ロックが解除されるまで他のトランザクションが同じデータにアクセスできないように、ロックを積極的に取得して保持します。 SELECT ... FOR UPDATE SQL ステートメントを使用すると、クエリ フェーズ中に関連する行をロックして、データへの排他的アクセスを実現できます。ただし、この操作は一意のキーに対してのみ実行する必要があることに注意することが重要です。そうしないと、ロックの範囲が大幅に拡大し、テーブル ロックのリスクが潜在的に増加し、システムの同時パフォーマンスと効率に影響する可能性があります。 最も一般的なアプローチは、ビジネス データに対して直接 SELECT ... FOR UPDATE を使用することです。次に例を示します。 <select id="selectSkuInventoryForUpdate" resultType="com.springboot.mybatis.entity.TbInventoryDO"> SELECT * FROM tb_inventory WHERE sku = #{sku} AND customer_no = #{customerNo} AND warehouse_code = #{warehouseCode} AND deleted = 0 FOR UPDATE </select> トランザクションでは、更新を実行する前に SELECT ... FOR UPDATE を使用します。 /** * 使用SELECT... FOR UPDATE 实现分布式锁,扣减库存* @param customerNo * @param orderNo */ @Transactional(rollbackFor = Exception.class) @Override public void confirmOrderWithLock(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); } ただし、この実装方法では、特にデータ量が多く同時実行性が高い場合に、ビジネス テーブルにロック圧力がかかりやすくなります。したがって、別の方法としては、ビジネス データ テーブルで直接 SELECT FOR UPDATE を使用する代わりに、専用のロック テーブルを維持するという方法があります。このアプローチは、いくつかのシナリオでロック管理を簡素化するのに役立ち、ビジネス データ テーブルに対するロックの負荷をある程度軽減できます。 (実は実装方法はRedisで実装されている分散ロックと似ていますが、データベースを使って実装されています)。実装プロセスは次のとおりです。 データベースは悲観的ロックプロセスを実装する 1. ロック テーブルを作成する: まず、lock_key (ロックする必要があるビジネス リソースを識別するために使用)、lock_holder (ユーザー ID やトランザクション ID など、ロックを保持しているクライアント識別子)、acquire_time (ロックが取得された時刻) などのフィールドを含む lock_table などのロック テーブルを作成します。 CREATE TABLE `tb_lock` ( id BIGINT AUTO_INCREMENT PRIMARY KEY, lock_key VARCHAR(255) NOT NULL DEFAULT '' COMMENT '锁的业务编码。对应业务表的唯一键', lock_holder VARCHAR(32) NOT NULL DEFAULT '' COMMENT '持有锁的客户端标识', acquire_time DATETIME NOT NULL COMMENT '获取锁的时间', create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '创建时间', update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', deleted TINYINT UNSIGNED DEFAULT '0' NULL COMMENT '0-未删除1/null-已删除', UNIQUE KEY uk_lock (lock_key, deleted) ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COMMENT = 'Lock表'; - ロック レコードの挿入: クライアントがロックを取得しようとする場合、lock_table にレコードを挿入しようとします。ここで、lock_key は、製品 SKU など、保護する必要があるビジネス リソースに対応します。挿入操作は通常、INSERT INTO ... ON DUPLICATE KEY UPDATE などのステートメントを通じて実装され、同じロック キーが存在する場合はレコードが更新され、そうでない場合は新しいレコードが挿入されます。これはロックを取得することと同じです。
<insert id="insertLock"> INSERT INTO tb_lock (lock_key,lock_holder,acquire_time) VALUES (#{lockKey},#{lockHolder},#{acquireTime}) </insert> - SELECT FOR UPDATE の使用: ロック レコードを挿入するときに、SELECT ... FOR UPDATE を使用してロック テーブル内の対応するレコードをロックし、現在のトランザクションが終了する前に他のトランザクションがロック レコードを更新または削除できないようにすることができます。
<select id="selectLockByLockKey" resultType="com.springboot.mybatis.entity.TbLockDO"> SELECT * FROM tb_lock WHERE lock_key = #{lockKey} AND deleted = 0 FOR UPDATE </select> 4. ロックの状態を確認する: ロックを取得するときに、lock_holder フィールドを確認するなどして、ロックがすでに保持されているかどうかを確認できます。別のトランザクションがすでにロックを保持している場合は、ロックの取得は失敗し、待機するか再試行する必要があります。 // 尝试获取锁tryLock(lockKey, lockHolder); // 使用SELECT FOR UPDATE锁定锁表记录TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey); if (!tbLockDO.getLockHolder().equals(lockHolder)) { // 锁已被其他客户端持有,获取锁失败,需要处理此异常情况throw new IllegalStateException("Lock is held by another client."); } - ロックを解除する: 業務操作が完了したら、ロック テーブル内の対応するレコードを削除または更新することでロックを解除できます。
<delete id="deleteLockByLockKey" parameterType="java.lang.String"> DELETE FROM tb_lock WHERE lock_key = #{lockKey} AND lock_holder = #{lockHolder} AND deleted = 0 </delete> データベースの悲観的ロックの実装に基づくコードは次のようになります。 @Transactional(rollbackFor = Exception.class) @Override public void confirmOrderWithLock(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku); String lockHolder = Thread.currentThread().getName(); try { // 尝试获取锁tryLock(lockKey, lockHolder); // 使用SELECT FOR UPDATE锁定锁表记录TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey); if (!tbLockDO.getLockHolder().equals(lockHolder)) { // 锁已被其他客户端持有,获取锁失败,需要处理此异常情况throw new IllegalStateException("Lock is held by another client."); } // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); }finally { unlock(lockKey, lockHolder); } } /** * 尝试获取锁* @param lockKey 锁的key 业务编码* @param lockHolder 锁的持有者* @return 是否获取成功*/ private void tryLock(String lockKey, String lockHolder) { TbLockDO tbLockDO = new TbLockDO(); tbLockDO.setLockKey(lockKey); tbLockDO.setLockHolder(lockHolder); tbLockDO.setAcquireTime(LocalDateTime.now()); //插入一条数据insert into tbLockMapper.insertLock(tbLockDO); } /** * 锁释放* @param lockKey 锁的key 业务编码*/ private void unlock(String lockKey, String lockHolder){ tbLockMapper.deleteLockByLockKey(lockKey, lockHolder); } 写真 データベースの悲観的ロックは、同時実行の競合を防ぎ、トランザクションが終了する前にこれらのレコードが他の同時トランザクションによって変更されないことを保証する分散ロックを実装します。また、ロックの粒度を制御し、行レベルのロックを提供し、ロックの範囲を縮小し、同時実行パフォーマンスを向上させることもできます。このアプローチは、更新を必要とするトランザクション シナリオ、特に銀行振込、在庫控除、およびデータの整合性と一貫性を必要とするその他の操作を処理するのに非常に適しています。 ただし、SELECT FOR UPDATE を過度に使用したり不適切に使用したりすると、より多くの行がロックされることに注意してください。同時実行性の高いシナリオでは、多数のトランザクションがロックの取得を待機している場合、ロック待機やデッドロックの問題が発生する可能性があります。さらに、トランザクションが SELECT FOR UPDATE ロックを保持している場合、これらのロックされた行を変更しようとする他のトランザクションは、ロックが解除されるまで待機状態になります。これにより、他のトランザクションが遅延し、システムのスループットが低下する可能性があります。ロックを長時間保持すると、データベース リソース (メモリ、接続数など) の消費量が増加します。特に、長いトランザクションでロックを長時間保持すると、システム全体のパフォーマンスに影響します。したがって、長いトランザクションでは悲観的ロックを使用しないように特に注意する必要があります。 2. データベースの楽観的ロック楽観的ロックでは、同時実行の競合が発生する可能性は低いと想定されるため、データの読み取り時にはリソースはロックされませんが、データの更新時には、データが他のトランザクションによって変更されたかどうかが検証されます。 データベース テーブルにバージョン フィールドを追加します。 ALTER TABLE `tb_inventory` ADD COLUMN `version` INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本' AFTER available_inventory; バージョン フィールドは、更新が行われるたびに 1 ずつ増加します。データを更新するときは、UPDATE ステートメントに WHERE version = oldVersion 条件を追加します。バージョン値が変更されていない場合にのみ、更新操作は成功します。バージョンが変更された場合、データは他のトランザクションによって変更されており、更新は失敗したことを意味します。 <update id="updateInventorWithVersion"> UPDATE tb_inventory SET available_inventory = available_inventory - #{diffInventory}, version = #{version} + 1 WHERE sku = #{sku} AND customer_no = #{customerNo} AND warehouse_code = #{warehouseCode} AND version = #{version} AND deleted = 0 </update> 楽観的ロックに基づくソリューション: @Transactional(rollbackFor = Exception.class) @Override public void confirmOrderWithVersion(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); Integer curVersion = inventoryDO.getVersion(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 设置当前数据版本号updateInventory.setVersion(curVersion); // 库存差值updateInventory.setDiffInventory(qty); updateInventory.setVersion(inventoryDO.getVersion()); int updateRows = tbInventoryMapper.updateInventorWithVersion(updateInventory); if (updateRows != 1){ System.err.println("更新库存时发生并发冲突,请重试"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试"); } } 写真 楽観的ロックでは、ほとんどの場合同時実行の競合は発生しないと想定されるため、データの読み取り時にすぐにロックされることはありません。代わりに、データが更新されるまで待機し、他のトランザクションによって変更が行われたかどうかを確認します。これにより、ロック保持時間が短縮され、システムの同時実行パフォーマンスが向上します。さらに、楽観的ロックでは、データが取得されるときにロックするのではなく、データが更新されるときにのみ競合がチェックされるため、デッドロックのリスクが大幅に軽減されます。また、ロックが頻繁に追加されないため、データベース レベルでのロック管理のオーバーヘッドが削減され、読み取りが多く書き込みが少ないシナリオに非常に適しています。 ただし、同時書き込みが多数ある場合、多数の更新競合が発生する可能性があり、更新を成功させるにはトランザクションを継続的に再試行する必要があります。再試行が多すぎると、特に同時実行性が非常に高い場合にパフォーマンスが低下し、「ABA」問題が発生する可能性があります。また、極端な同時実行条件下では、正しい再試行メカニズムやタイムアウト メカニズムがなければ、楽観的ロックでは強力な一貫性を保証できない可能性があります。特に、複数のテーブルを含む複雑なトランザクションでは、すべての並行性の問題を解決するのに十分な単一の楽観的ロックでは不十分な場合があります。 Redisに基づく実装1.Redis SetNX実装Redisのsetnx(存在しない場合は設定)コマンドは原子操作です。キーが存在しない場合にのみ値を設定します。設定が成功した場合にtrueを返し、それ以外の場合はfalseを返します。このコマンドは、Redisのロックをすばやくつかむことができます。 Redisを使用して、キーの一部として一意のロックIDを生成できます。次に、setNxを使用して、値が有効期限のあるタイムスタンプになる可能性のあるキー値ペアを設定しようとします。設定が成功した場合、ロックは正常に取得されると見なされ、ビジネスロジックが実行されます。ビジネスロジックが完了したら、対応するキーを削除してロックをリリースするか、有効期限を自動的にリリースするために有効期限を設定します。 @Slf4j public class RedisDistributedLock implements AutoCloseable{ private final StringRedisTemplate stringRedisTemplate; private final DefaultRedisScript<Boolean> unlockScript; /**锁的key*/ private final String lockKey; /**锁过期时间*/ private final Integer expireTime; private static final String UNLOCK_LUA_SCRIPT = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\", KEYS[1])\n" + "else\n" + " return 0\n" + "end"; public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, Integer expireTime) { this.stringRedisTemplate = stringRedisTemplate; this.lockKey = lockKey; this.expireTime = expireTime; // 初始化Lua解锁脚本this.unlockScript = new DefaultRedisScript<>(); unlockScript.setScriptText(UNLOCK_LUA_SCRIPT); unlockScript.setResultType(Boolean.class); } /** * 获取锁* @return 是否获取成功*/ public Boolean getLock() { String value = UUID.randomUUID().toString(); try { return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS); } catch (Exception e) { log.error("获取分布式锁失败: {}", e.getMessage()); return false; } } /** * 释放锁* @return 是否释放成功*/ public Boolean unLock() { // 使用Lua脚本进行解锁操作List<String> keys = Collections.singletonList(lockKey); Object result = stringRedisTemplate.execute(unlockScript, keys, stringRedisTemplate.opsForValue().get(lockKey)); boolean unlocked = (Boolean) result; log.info("释放锁的结果: {}", unlocked); return unlocked; } @Override public void close() throws Exception { unLock(); } } 次に、在庫を処理するとき、最初にロックを取得しようとします。ロックが取得された場合、インベントリを更新できます。 @Transactional(rollbackFor = Exception.class) @Override public void confirmOrderWithRedisNx(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku); // 30秒过期try (RedisDistributedLock lock = new RedisDistributedLock(stringRedisTemplate, lockKey, 30)) { if (lock.getLock()) { // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); } else { log.error("更新库存时发生并发冲突,请重试"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试"); } } catch (Exception e) { log.error("处理分布式锁时发生错误: {}", e.getMessage()); } } 写真 インメモリデータベースとして、Redisは迅速に動作し、SetNXの実行時間はほとんど無視できます。これは、高電流シナリオのロック要求に特に適しています。スタンドアロンサービスとして、Redisは異なるプロセスまたはサーバー間でMutexロックを簡単に実装できます。 SETNXコマンドはアトミック操作であり、Redisの単一スレッド環境でのロックの取得を達成でき、単純なコマンドラインがロックの競争を達成できます。同時に、EXまたはPXパラメーターを使用して、予期しない状況によって引き起こされるデッドロックを避けるために、ロックを設定するときに有効期限を設定することができます。 ただし、SetNXのみを使用することで自動的に更新することはできません。ロックが期限切れになり、積極的にリリースされないと、ロックは他のクライアントによって誤って取得される可能性があります。時計とマルチコマンドの組み合わせを使用するなど、ロックの追加の自動更新メカニズムが必要です。また、設定キー値pxミリオリオ秒nx xxなどのセットコマンドの新しいパラメーターが必要です。また、setnxはロックを取得できない場合にすぐに戻ります。したがって、ロックを常に取得しようとするために、遅延再試行戦略を投票または使用する必要があります。また、複数のクライアントが同時にロックを要求した場合、Redisは特定のキューイング命令を保証しません。 RedisのSETNXコマンドは、分散ロックの実装において利便性と高性能を提供し、堅牢で信頼できる分散ロックソリューションを構築するために、他のコマンド(期限切れ、Multi/Execなど)を組み合わせて、さまざまなエッジのケースと断層許容メカニズムを考慮に入れる必要があることがよくあります。一部の成熟したRedisクライアントライブラリ(Redisson、Jedisなど)は、カプセル化された分散ロックの実装を提供し、上記の問題の多くを解決します。 Redissonの実装に基づいていますRedissonは、高性能のオープンソースJavaレジデントメモリデータグリッドです。 Redisに基づいており、分散したデータ構造と分散ロック、セマフォ、ロック、キュー、マッピングなどの分散サービスのセットを提供します。Redissonにより、開発者はJavaアプリケーションでRedisを使用しやすくなり、特に分散環境での同期プリミティブのリッチAPIサポートを提供します。 Redissonの分散ロックの中核原則はRedisコマンドに基づいていますが、より信頼性が高く使いやすい分散ロック実装を提供するために強化およびカプセル化されています。分散ロックを実装するという彼のアイデアは、RedisのSetNX実装に似ています。ただし、Redisの分散ロックのSetNX実装と比較して、Redissonはリエントラントロックもサポートしています。つまり、同じスレッドがすでにロックを取得している場合、ブロックされることなく再びロックを取得できます。ロックが保持される回数は、カウンターを通じて内部で記録されます。カウンターは、ロックが正常に取得されるたびに増加し、ロックが放出されるとロックが減少します。ロックは、カウンターがゼロに達すると本当にリリースされます。 Redissonは、ウォッチドッグメカニズムを使用してロックのステータスを監視し、ロックの有効期間を定期的に自動的に延長するため、ロックを保持しているクライアントが一時的に凍結されたり、ネットワークがジッターされたりしても、ロックはタイムアウトのために事前に解放されません。さらに、Redisクラスターの場合、RedissonはRedlockアルゴリズムを実装することもできます。これにより、複数のRedisノードでロックを取得することにより、分散ロックの可用性と断層トレランスが向上します。 Redissonを使用して分散ロックを実装して、在庫控除を実現します。 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.17.7</version> </dependency> spring: redisson: address: "redis://127.0.0.1:6379" password: @Override public void confirmOrderWithRedisson(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku); // 30秒过期RLock lock = redissonClient.getLock(lockKey); try { if (lock.tryLock(30, TimeUnit.SECONDS)) { // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); } else { log.error("更新库存时发生并发冲突,请重试"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试"); } }catch (Exception e){ throw new ServiceException(StatusEnum.SERVICE_ERROR, "获取分布式锁时被中断"); }finally { // 无论成功与否,都要释放锁if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } } 写真 Redissonは、さまざまなビジネスシナリオのニーズを満たすために、再突入ロック(RLock)、Read-Write Locks(ReadWriteLock)、Fair Locks(RFAirLock)など、さまざまな種類の分散ロックをサポートしています。 Redissonは、ロックホルダーが事業処理プロセス中に長期間完了できなかったため、ロックの有効期限と他のクライアントが獲得するのを防ぐことができるロックの自動更新機能をサポートしています。 Redisson Redlock Algorithm(複数のノードで展開する場合)の場合、一部のRedisノードが失敗したとしても、ほとんどのRedisノードが存続している間、ロックの安定性を維持でき、システムの断層許容度と高可用性を高めます。 単純なデータベースの悲観的ロックと比較して、Redissonの分散ロック実装はより複雑です。 Redissonは自動更新メカニズムを提供しますが、ロックを取得した後にクライアントが突然クラッシュし、ロックを正常に放出しない場合、ロックリークを引き起こすことが理論的に可能です。 Redissonはタイムアウト設定も提供していますが、極端な場合には、そのような問題を防ぐために、手動洗浄メカニズムまたはその他のソリューションが依然として必要です。 Zookeeperを使用しますZookeeperに分散ロックを実装することの基本原則は、Zookeeperの一時的なノードとWatcher監視メカニズムを利用することです。 クライアントは、Zookeeperで指定されたパスに一時的な順序付けされたノードを作成します。各ノード名には、ノードの順序を示す一意の増分番号が追加されます。複数のクライアントが同時にロックを要求すると、独自の一時的な注文ノードを作成します。 クライアントは、ノードの順序でロックを取得できるかどうかを決定します。最小のノード順序を持つクライアントはロックホルダーと見なされ、シーケンス番号がそれ自体よりも大きいことを観察するすべてのノードはロック解除されます。ロックホルダーは引き続きビジネスロジックを実行しますが、他のクライアントはウォッチャーを登録して、シリアル番号よりも小さいノードをリッスンします。 ロックホルダーがビジネス処理を完了すると、作成する一時的なノードが削除され、Zookeeperがウォッチャーをトリガーしてキューの次のノードに通知します。通知を受信する次のノードでは、観察するノードが削除されていることがわかります。そのため、現在のパスの残りのノードの順序を再確認します。現在最小のノードである場合、ロックが取得されたと考えられています。 ウォッチャーメカニズムにより、クライアントはZookeeperでノード変更イベントをリッスンできます。ノードが作成、削除、更新されると、Zookeeperは、対応するイベントを登録したクライアントに通知を送信します。分散ロックシナリオでは、クライアントはウォッチャーを登録することにより、ロックホルダーのノードステータスに耳を傾け、ロックがリリースされたときにロックを取得します。 写真 また、ApacheキュレーターフレームワークをZookeeperクライアントとして使用して、分散ロックを実装しています。キュレーターには優れたアーキテクチャデザインがあり、共有ロック、ミューテックスロック、障壁、リーダー選挙など、一般的な分散調整タスクを実装するための豊富なレシピ(つまり、プレハブテンプレート)を提供します。 分散ロックを実装する場合、キュレーターは、一時的なノードのライフサイクルアソシエーションセッション、順序付きノードの並べ替えメカニズム、およびウォッチャーイベントの通知メカニズムなど、Zookeeperの特性を完全に考慮して、ロック挙動がさまざまな異常な状況下での予想に沿っていることを保証します。 キュレーターは、再試行ポリシーとバックプレッシャー制御を内部的に統合します。 Zookeeper Operationsがネットワークの遅延または短期Zookeeperクラスターの不安定性に遭遇した場合、キュレーターはすぐに例外を投げる代わりに自動的に再試行できます。 @Component public class ZkLock { private final CuratorFramework client; private final InterProcessMutex lock; @Value("${curator.zookeeper.connect-string}") private String zookeeperConnectString; public ZkLock() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(zookeeperConnectString, retryPolicy); client.start(); // 分布式锁路径String lockPath = "/locks/product_stock"; lock = new InterProcessMutex(client, lockPath); } public void acquireLock(Runnable task) throws Exception { // 尝试获取锁,超时时间为30秒if (lock.acquire(30, TimeUnit.SECONDS)) { try { task.run(); // 在持有锁的情况下执行任务} finally { lock.release(); // 无论是否出现异常,都要确保释放锁} } else { throw new Exception("获取分布是锁失败"); } } } zklockを使用: @Override public void confirmOrderWithZk(String customerNo, String orderNo) { // 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo); String warehouseCode = outboundOrderDO.getWarehouseCode(); // 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo()); String sku = detailDO.getSku(); Integer qty = detailDO.getQty(); String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku); // 30秒过期zkLock.acquireLock(() -> { // 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku); Integer availableInventory = inventoryDO.getAvailableInventory(); // 判断库存是否足够if (qty > availableInventory){ System.err.println("库存不足,不能出库"); throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库"); } // 扣减库存TbInventoryDO updateInventory = new TbInventoryDO(); updateInventory.setCustomerNo(customerNo); updateInventory.setWarehouseCode(warehouseCode); updateInventory.setSku(sku); // 库存差值updateInventory.setDiffInventory(qty); tbInventoryMapper.updateInventory(updateInventory); }); } Apacheキュレーターによって実装された分散ロックは、分散環境で強い一貫性と高い信頼性を必要とする同時制御シナリオに適していますが、Zookeeperへの依存には、ネットワークオーバーヘッドと運用とメンテナンスの複雑さにいくつかの欠点が含まれます。 要約する分散ロックは、分散システムで相互に排他的な制御を実装するメカニズムであり、複数のマシン間で同時に1つのサービスまたは1つのリクエストによってリソースにアクセスまたは変更されるようにします。その核となる課題は、無関心な環境でグローバルな一意性と一貫性を確保する方法にあります。 その実装は、主に分散型ストレージシステムまたは調整されたサービスに依存しています。いくつかの一般的な実装方法があります。 - データベースに基づく:データベーストランザクションの酸性特性を使用して、特定の行の挿入/更新操作を通じてロックが取得され、削除/更新操作がロックを解放します。ただし、パフォーマンスのボトルネックと、高い並行性の下で限られたデータベース接続がある場合があります。
- キャッシュシステム(REDISなど)に基づく:SETNXやLUAスクリプトなどのアトミック操作でロックを取得するために一意のキー値ペアを設定し、デッドロックを防ぐためにロックタイムアウトの設定をサポートします。この方法には、パフォーマンスが高く組み込まれており、アンチディードロックメカニズムが組み込まれています。
- Zookeeperに基づく:Znode、Observerメカニズム、およびZookeeperの一時的な順序付けられたノードを使用します。このサービスは、一時的なノード競争ロックを作成することで勝ちます。ノードが失敗すると、Zookeeperは関連する一時ノードを自動的にクリーンアップして、自動ロック転送を実現します。
実際のビジネス開発では、特定のビジネスおよびシステムリソースに基づいて、適切な分散ロック実装方法を選択する必要があります。 |