[[393657]] 環境: springboot.2.4.9 + RabbitMQ3.7.4 ベストエフォート通知とは何ですか?これは充電のケースです 対話プロセス: 1. アカウント システムが再チャージ システム インターフェイスを呼び出します。 2. 再チャージシステムが支払いを完了すると、アカウントシステムに再チャージ結果通知が送信されます。通知が失敗した場合、再充電システムは戦略に従って通知を繰り返します。 3. アカウントシステムは再チャージ結果通知を受信し、再チャージステータスを変更します。 4. アカウント システムが通知を受信しない場合、再チャージ システムのインターフェイスを積極的に呼び出して再チャージの結果を照会します。 上記の例から、ベスト エフォート通知スキームの目標をまとめることができます。つまり、通知の発信者は、特定のメカニズムを通じてビジネス処理の結果を受信者に通知するために最善の努力をします。具体的には以下が含まれます: 1. 特定のメッセージ繰り返し通知メカニズムがあります。通知の受信者が通知を受け取っていない可能性があるため、メッセージを繰り返すための特定のメカニズムが必要です。 2. メッセージ校正メカニズム。最善の努力にもかかわらず受信者に通知されない場合、または受信者がメッセージを消費した後に再度消費する必要がある場合、受信者は要求を満たすために通知側にメッセージ情報を積極的に問い合わせることができます。 ベストエフォート通知と信頼性の高いメッセージ一貫性の違いは何ですか? 1. さまざまなソリューションのアイデア: 信頼性の高いメッセージの一貫性。通知の発信者は、メッセージが送信され、通知の受信者に届くことを確認する必要があります。メッセージの信頼性は主に通知の発信者によって保証されます。ベスト エフォート通知: 通知の発信者は、通知の受信者にビジネス処理の結果を通知するために最善の努力を払いますが、メッセージが受信されない可能性があります。この場合、通知の受信者は、ビジネス処理の結果を照会するために、イニシエーターのインターフェースを積極的に呼び出す必要があります。通知の信頼性は通知の受信者によって異なります。 2. 両者のビジネスアプリケーションシナリオは異なります。信頼性の高いメッセージの一貫性は、トランザクション プロセスのトランザクション一貫性に重点を置き、非同期方式でトランザクションを完了します。ベスト エフォート通知は、トランザクション後の通知、つまりトランザクション結果を確実に通知することに重点を置いています。 3. さまざまな技術的ソリューション 信頼性の高いメッセージの一貫性を実現するには、送信から受信までのメッセージの一貫性、つまりメッセージの送受信を解決する必要があります。ベストエフォート通知では、送信から受信までのメッセージの一貫性を保証することはできませんが、メッセージ受信の信頼性メカニズムのみを提供します。信頼できるメカニズムは、メッセージの受信者に通知するために最大限の努力をすることです。受信者がメッセージを受信できない場合、受信者は消費について積極的に問い合わせます。 RabbitMQ によるベストエフォート通知RabbitMQ に関する関連記事: 「SpringBoot RabbitMQ メッセージの信頼性の高い送受信」、「RabbitMQ メッセージ確認メカニズムの確認」。 プロジェクト構造 2 つのサブモジュール: users-mananger (アカウント モジュール)、pay-manager (支払いモジュール) 頼る - <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </依存関係>
- <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </依存関係>
- <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </依存関係>
- <依存関係>
- <グループID>mysql</グループID>
- <artifactId>mysql-コネクタ-java</artifactId>
- <scope>ランタイム</scope>
- </依存関係>
サブモジュール pay-manager設定ファイル - サーバ:
- ポート: 8080
-
- 春:
- ウサギさん:
- ホスト: ローカルホスト
- ポート: 5672
- ユーザー名: ゲスト
- パスワード: ゲスト
- 仮想ホスト: /
- 発行者確認タイプ: 相関
- 発行者戻り値: true
- リスナー:
- 単純:
- 同時実行数: 5
- 最大同時実行数: 10
- プリフェッチ: 5
- 承認モード: 手動
- リトライ:
- 有効: true
- 初期間隔: 3000
- 最大試行回数: 3
- デフォルトの再キュー拒否: false
エンティティクラス チャージ金額とアカウント情報を記録する - @実在物
- @テーブル(名前= "t_pay_info" )
- パブリッククラス PayInfo はSerializable を実装します{
- @ID
- プライベートな Long ID;
- プライベート BigDecimal マネー;
- プライベート Long accountId ;
- }
DAOとサービス - パブリックインターフェース PayInfoRepository は JpaRepository<PayInfo, Long> を拡張します {
- PayInfo findByOrderId(文字列 orderId);
- }
- @サービス
- パブリッククラス PayInfoService {
-
- @リソース
- プライベート PayInfoRepository payInfoRepository ;
- @リソース
- プライベート RabbitTemplate rabbitTemplate ;
-
- // データが保存された後にメッセージを送信します(メッセージは確認モードまたはトランザクションモードで送信できます)
- @トランザクション
- パブリックPayInfo savePayInfo(PayInfo payInfo) {
- payInfo.setId(System.currentTimeMillis());
- PayInfo 結果 = payInfoRepository.save(payInfo);
- 相関データ correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll( "-" , "" ));
- 試す {
- rabbitTemplate.convertAndSend( "pay-exchange" 、 "pay.#" 、新しいObjectMapper().writeValueAsString(payInfo)、相関データ);
- } キャッチ (AmqpException | JsonProcessingException e) {
- e.printStackTrace();
- }
- 結果を返します。
- }
-
- パブリックPayInfo queryByOrderId(String orderId) {
- payInfoRepository.findByOrderId(orderId)を返します。
- }
-
- }
お支払いが完了したらメッセージを送信してください。 コントローラーインターフェース - @レストコントローラ
- @RequestMapping( "/payInfos" )
- パブリッククラス PayInfoController {
- @リソース
- プライベート PayInfoService payInfoService ;
-
- // 支払いインターフェース
- @PostMapping( "/pay" )
- パブリックオブジェクト支払い(@RequestBody PayInfo payInfo) {
- payInfoService.savePayInfo(payInfo);
- 戻る 「支払いが送信されました。結果を待っています」 ;
- }
-
- @GetMapping( "/queryPay" )
- パブリックオブジェクトクエリペイ(文字列オーダーID) {
- payInfoService.queryByOrderId(orderId)を返します。
- }
-
- }
サブモジュール users-managerアプリケーション構成 - サーバ:
- ポート: 8081
-
- 春:
- ウサギさん:
- ホスト: ローカルホスト
- ポート: 5672
- ユーザー名: ゲスト
- パスワード: ゲスト
- 仮想ホスト: /
- 発行者確認タイプ: 相関
- 発行者戻り値: true
- リスナー:
- 単純:
- 同時実行数: 5
- 最大同時実行数: 10
- プリフェッチ: 5
- 承認モード: 手動
- リトライ:
- 有効: true
- 初期間隔: 3000
- 最大試行回数: 3
- デフォルトの再キュー拒否: false
エンティティクラス - @実在物
- @テーブル(名前= "t_users" )
- パブリッククラスUsers{
- @ID
- プライベートな Long ID;
- プライベート文字列名;
- プライベート BigDecimal マネー;
- }
アカウント情報フォーム - @実在物
- @テーブル(名前= "t_users_log" )
- パブリッククラスUsersLog{
- @ID
- プライベートな Long ID;
- プライベート文字列 orderId ;
- // 0: 支払い中、1: 支払い済み、2: キャンセル済み
- @列(columnDefinition = "int default 0" )
- プライベート整数ステータス = 0 ;
- プライベート BigDecimal マネー;
- プライベートDate createTime ;
- }
アカウント再チャージ記録テーブル(重複排除) DAOとサービス - パブリックインターフェースUsersRepositoryはJpaRepository<Users, Long>を拡張します。
- }
- パブリックインターフェースUsersLogRepositoryはJpaRepository<UsersLog, Long>を拡張します。
- ユーザーログ findByOrderId(文字列 orderId);
- }
サービスクラス - @サービス
- パブリッククラスUsersService{
- @リソース
- プライベートUsersRepository usersRepository;
- @リソース
- プライベートUsersLogRepository usersLogRepository;
-
- @トランザクション
- パブリックブール型 updateMoneyAndLogStatus(Long id, String orderId) {
- ユーザーログ usersLog = usersLogRepository.findByOrderId(orderId);
- (usersLog != null && 1 == usersLog.getStatus()) の場合 {
- 新しい RuntimeException( "paid" ) をスローします。
- }
- ユーザー users = usersRepository.findById(id).orElse( null );
- ユーザー == nullの場合
- 新しい RuntimeException( "アカウントが存在しません" ) をスローします。
- }
- users.setMoney(users.getMoney(). add (usersLog.getMoney()));
- usersRepository.save(ユーザー);
- ユーザーログのステータスを設定します(1);
- usersLog リポジトリを保存します。
- 戻る 真実;
- }
-
- @トランザクション
- パブリックブール値 saveLog(UsersLog usersLog) {
- ユーザーログにIdを設定します。
- usersLog リポジトリを保存します。
- 戻る 真実;
- }
- }
メッセージ監視 - @成分
- パブリッククラス PayMessageListener {
-
- プライベート静的最終ロガー logger = LoggerFactory.getLogger(PayMessageListener.class);
-
- @リソース
- プライベートUsersService usersService;
-
- @SuppressWarnings( "チェックなし" )
- @RabbitListener(キュー = { "ペイキュー" })
- @RabbitHandler
- パブリックvoid 受信(メッセージ メッセージ、チャネル チャネル) {
- 長い配信タグ = message.getMessageProperties().getDeliveryTag();
- バイト[] buf = null ;
- 試す {
- buf = message.getBody();
- logger.info( "受信したメッセージ: {}" , new String(buf, "UTF-8" )) ;
- Map<String, Object> 結果 = new JsonMapper().readValue(buf, Map.class);
- Long id = (( Integer ) result.get( "accountId" )) + 0L ;
- 文字列 orderId = (文字列) result.get( "orderId" ) ;
- usersService.updateMoneyAndLogStatus(id、orderId);
- チャネル.basicAck(配信タグ、 true );
- } キャッチ (例外 e) {
- logger.error( "メッセージ受信中に例外が発生しました: {}、例外メッセージ: {}" 、 e.getMessage()、 new String(buf、 Charset.forName( "UTF-8" ))) ;
- e.printStackTrace();
- 試す {
- // このような異常なメッセージは、手動で調査するためにデッドレターキューに入れる必要があります。
- チャネル.basicReject(配信タグ、 false );
- } キャッチ (IOException e1) {
- logger.error( "メッセージ再エントリキュー例外を拒否: {}" , e1.getMessage());
- e1.printStackTrace();
- }
- }
- }
- }
コントローラーインターフェース - @レストコントローラ
- @RequestMapping( "/users" )
- パブリッククラスUsersController{
-
- @リソース
- プライベート RestTemplate 残りのテンプレート ;
- @リソース
- プライベートUsersService usersService;
-
- @PostMapping( "/pay" )
- パブリックオブジェクトpay(Long id, BigDecimal money)は例外をスローします{
- HttpHeaders ヘッダー = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- 文字列 orderId = UUID.randomUUID().toString().replaceAll( "-" , "" );
- Map<String, String> パラメータ = new HashMap<>();
- params.put( "accountId" , String.valueOf(id));
- params.put( "orderId" 、orderId);
- params.put( "お金" 、money.toString());
-
- ユーザーログ usersLog = 新しいユーザーログ() ;
- usersLog.setCreateTime(新しい日付());
- usersLog.setOrderId(注文ID);
- usersLog.setMoney(お金);
- ユーザーログのステータスを0に設定します。
- usersService.saveLog(usersLog);
- HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers);
- restTemplate.postForObject( "http://localhost:8080/payInfos/pay" 、 requestEntity、 String.class )を返します。
- }
-
- }
上記は2つのサブモジュールのコード全体です テスト初期データ アカウントサブモジュールコンソール 支払いサブモジュールコンソール データテーブルデータ 完了! ! ! |