序文 同時実行性の高いシステムでは、フローを制御することが非常に重要です。大量のトラフィックが直接サーバーに要求されると、インターフェースが短時間で利用できなくなる可能性があります。対処しないと、アプリケーション全体が使用できなくなる可能性もあります。 例えば、最近はこのような要望があります。クライアントとして、Kafka にデータを生成する必要があり、Kafka コンシューマーは継続的にデータを消費し、消費されたすべてのデータを Web サーバーに要求します。負荷はかかっているものの(Webサーバーは4台)、業務データの量も膨大で、毎秒数万件のデータが生成されることもあります。プロデューサーがデータを直接生成すると、Web サーバーがダウンする可能性が高くなります。 [[263347]] このため、Web の正常な動作をある程度保証するために、電流制限処理を実行し、一定量のデータを毎秒 Kafka に生成する必要があります。 実際、どのようなシナリオを扱う場合でも、本質はトラフィックを削減し、アプリケーションの高可用性を確保することです。 一般的なアルゴリズム 電流制限には 2 つの一般的なアルゴリズムがあります。 - リーキーバケットアルゴリズム
- トークンバケットアルゴリズム
リーキーバケットアルゴリズムは比較的単純です。トラフィックはバケットに入れられ、バケットも一定の速度で流出します。トラフィックが速すぎると、オーバーフローが発生します (リーキー バケットでは流出率は増加しません)。オーバーフローしたトラフィックは直接破棄されます。 次の図に示すように:
このアプローチは単純かつ粗雑です。 リーキーバケットアルゴリズムはシンプルですが、トラフィックの急増などの実際のシナリオには対応できません。 このとき、トークン バケット アルゴリズムが必要になります。 トークン バケットは、一定の速度で固定容量のバケットにトークンを入れ、トラフィックが到着すると 1 つ以上のトークンを取り除きます。バケット内にトークンがない場合、現在のリクエストは破棄またはブロックされます。 対照的に、トークン バケットは一定量のバースト トラフィックを処理できます。 レートリミッターの実装 トークン バケットのコード実装には、Guava パッケージの RateLimiter を直接使用できます。 - @オーバーライド
- パブリックBaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) {
- //リモートサービスを呼び出す
- OrderNoReqVO vo = 新しい OrderNoReqVO();
- vo.setReqNo(userReqVO.getReqNo());
- RateLimiter リミッター = RateLimiter.create (2.0);
- //バッチ呼び出し
- ( int i = 0 ; i< 10 ; i++) {
- ダブル取得 = リミッター.取得();
- logger.debug( "トークンを正常に取得しました!,consumption=" + acquire);
- BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo);
- logger.debug( "リモート戻り値:" +JSON.toJSONString(orderNo));
- }
- UserRes ユーザーRes = 新しい UserRes() ;
- ユーザーIDを設定します。
- userRes.setUserName( "张三" );
- userRes.setReqNo(userReqVO.getReqNo());
- userRes.setCode(StatusEnum.SUCCESS.getCode());
- userRes.setMessage( "成功" );
- userResを返します。
- }
詳細はこちらをご覧ください。 通話結果は次のとおりです。 コードから、1 秒あたり 2 つのトークンがバケットに入れられ、一度に 1 つのトークンが消費されることがわかります。したがって、1 秒あたりに送信できるリクエストは 2 つだけです。これは、図の時間によると確かに当てはまります (戻り値は、このトークンを取得するのに費やされた時間であり、約 500 ミリ秒ごとに 1 回です)。 RateLimiter を使用する際には、いくつか注意すべき点があります。 最初に消費し、後で支払うことができるため、リクエストが届いたときに、一度に少数のトークン、または残りのすべてのトークン、あるいはそれ以上のトークンを受け取ることができますが、後続のリクエストでは、前のリクエストに対して支払う必要があります。トークンの取得を続行するには、バケット内のトークンが補充されるまで待機する必要があります。 要約する 単一のアプリケーションの場合、RateLimiter で十分です。分散環境の場合は、Redis の助けを借りて実行できます。 来てデモをしてください。 注文アプリケーションによって提供されるインターフェースでは、電流制限が採用されています。まず、電流制限ツールを構成する Bean: - @構成
- パブリッククラスRedisLimitConfig {
- @Value( "${redis.limit}" )
- プライベートint制限;
- オートワイヤード
- プライベート JedisConnectionFactory jedisConnectionFactory;
- @ビーン
- パブリックRedisLimitビルド() {
- RedisClusterConnection の clusterConnection = jedisConnectionFactory.getClusterConnection();
- JedisCluster の場合、clusterConnection.getNativeConnection() を使用します。
- RedisLimit redisLimit = 新しい RedisLimit.Builder<>(jedisCluster)
- .limit(制限)
- 。建てる();
- redisLimitを返します。
- }
- }
次に、コントローラーでコンポーネントを使用します。 - オートワイヤード
- プライベート RedisLimit redisLimit;
- @オーバーライド
- チェック要求なし
- パブリックBaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
- BaseResponse<OrderNoResVO> res = 新しい BaseResponse();
- //電流制限
- ブール制限 = redisLimit.limit();
- (!制限)の場合{
- res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
- res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
- resを返します。
- }
- res.setReqNo(orderNoReq.getReqNo());
- if ( null == orderNoReq.getAppId()){
- 新しい SBCException(StatusEnum.FAIL) をスローします。
- }
- OrderNoResVO orderNoRes = 新しい OrderNoResVO();
- orderNoRes.setOrderId(DateUtil.getLongTime());
- res.setCode(StatusEnum.SUCCESS.getCode());
- res.setMessage(StatusEnum.SUCCESS.getMessage());
- res.setDataBody(orderNoRes);
- resを返します。
- }
使いやすさのために、注釈も提供されています。 - @オーバーライド
- @コントローラー制限
- パブリックBaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
- BaseResponse<OrderNoResVO> res = 新しい BaseResponse();
- // ビジネスロジック
- resを返します。
- }
このアノテーションは、http リクエストをインターセプトし、リクエストがしきい値に達すると直接戻ります。 通常の方法も使用できます: - @共通制限
- パブリックvoid doSomething(){}
呼び出ししきい値に達すると例外がスローされます。 同時実行性をシミュレートするために、ユーザー アプリケーションで 10 個のスレッドが開かれ、Order インターフェイスが呼び出されます (現在の制限は 5 回) (JMeter などの専門的な同時実行性テスト ツールも使用できます)。 - @オーバーライド
- パブリックBaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
- //リモートサービスを呼び出す
- OrderNoReqVO vo = 新しい OrderNoReqVO();
- vo.setAppId(1L);
- vo.setReqNo(userReq.getReqNo());
- ( int i = 0; i < 10; i++) {
- executorService.execute (新しい Worker(vo、orderServiceClient));
- }
- ユーザーRes ユーザーRes = 新しい ユーザーRes();
- ユーザーIDを設定します。
- userRes.setUserName( "张三" );
- userRes.setReqNo(userReq.getReqNo());
- userRes.setCode(StatusEnum.SUCCESS.getCode());
- userRes.setMessage( "成功" );
- userResを返します。
- }
- プライベート静的クラス Worker は Runnable を実装します {
- プライベートOrderNoReqVO vo;
- プライベート OrderServiceClient orderServiceClient;
- パブリックワーカー(OrderNoReqVO vo、OrderServiceClient orderServiceClient) {
- this.vo = vo;
- this.orderServiceClient = orderServiceClient;
- }
- @オーバーライド
- パブリックボイド実行(){
- BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
- logger.info( "リモートリターン:" + JSON.toJSONString(orderNo));
- }
- }
分散効果を確認するために、2 つの Order アプリケーションを起動します。 効果は以下のとおりです。 実施原則 実装原理は実は非常にシンプルです。分散グローバル電流制限の効果を実現したいので、当然、リクエスト数を記録するサードパーティ コンポーネントが必要になります。 Redis はこのようなシナリオに非常に適しています。 - 現在の時刻 (秒単位の精度) が各リクエストのキーとして Redis に書き込まれ、タイムアウトは 2 秒に設定されます。 Redis はキーの値を増やします。
- しきい値に達するとエラーを返します。
- Redis への書き込み操作は Lua スクリプトを使用して完了し、Redis のシングルスレッド メカニズムにより、各 Redis リクエストのアトミック性が保証されます。
Lua スクリプトは次のとおりです。 --lua の添え字は 1 から始まります -- 現在の制限キーlocal key = KEYS[1]-- 現在の制限サイズlocal limit = tonumber(ARGV[1])-- 現在のフロー サイズを取得しますlocal curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 現在の制限サイズに達した場合は 0 を返します。else -- しきい値 + 1 に達していません redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end Java でのロジックの呼び出し: -
-
- 地元 キー= KEYS[1]
-
- 局所限界 = tonumber(ARGV[1])
-
- ローカルのcurentLimit = tonumber(redis.call( 'get' , key )または 「0」 )
- 現在の制限 + 1 > 制限の場合
-
- 0を返します。
- それ以外
-
- redis.call( "INCRBY" 、キー、1)
- redis.call( "EXPIRE" 、キー、2)
- 現在の制限値 + 1を返す
- 終わり
したがって、電流制限が必要な場合にのみこのメソッドを呼び出して、戻り値を判断し、電流制限の目的を達成する必要があります。 もちろん、これは Redis を使用して作成された単なる粗いカウンターです。上記と同様のトークン バケット アルゴリズムを実装したい場合は、Lua ベースで自分で実装できます。 ビルダー このコンポーネントを設計する際には、明確で読みやすく、エラーが発生しにくい API をユーザーに提供するよう努めました。 たとえば、最初のステップでは、電流制限オブジェクトを構築する方法を説明します。 最も一般的な方法は、もちろんコンストラクターです。複数のドメインがある場合は、重複するコンストラクター メソッドを使用できます。 - パブリックA(){}
- パブリックA( int a){}
- パブリックA( int a, int b){}
欠点も明らかです。パラメータが多すぎると、読みにくくなります。パラメータの型が一貫していて、クライアントが順序を逆にした場合でも、警告は発生せず、予期しない結果が生じます。 2 番目の解決策は、JavaBean モードを使用し、setter メソッドを使用して構築することです。 - A a = 新しいA();
- a.setA(a);
- a.setB(b);
このアプローチは明確で読みやすいですが、オブジェクトを不整合な状態にし、オブジェクトをスレッドセーフでなくしてしまう可能性が高くなります。 そこで、ここではオブジェクトを作成する 3 番目の方法であるビルダーを使用します。 - パブリッククラスRedisLimit {
- プライベート JedisCommands jedis;
- プライベートint制限 = 200;
- プライベート静的最終int FAIL_CODE = 0;
- /**
- * lua スクリプト
- */
- プライベート文字列スクリプト;
- プライベートRedisLimit(ビルダービルダー) {
- ビルダーの制限
- this.jedis = builder.jedis;
- ビルドスクリプト();
- }
- /**
- * トラフィックを制限する
- * trueの場合は@return
- */
- パブリックブール制限(){
- 文字列キー= String.valueOf(System.currentTimeMillis() / 1000);
- オブジェクト結果 = null ;
- if (jedis インスタンス of Jedis) {
- 結果 = ((Jedis) this.jedis).eval(script, Collections.singletonList( key ), Collections.singletonList(String.valueOf(limit)));
- }そうでない場合 (jedis インスタンスの JedisCluster) {
- 結果 = ((JedisCluster) this.jedis).eval(スクリプト、Collections.singletonList( key )、Collections.singletonList(String.valueOf(limit)));
- }それ以外{
- //新しい RuntimeException( "インスタンスがエラーです" ) をスローします。
- 戻る 間違い;
- }
- if (FAIL_CODE != (Long) 結果) {
- 戻る 真実;
- }それ以外{
- 戻る 間違い;
- }
- }
- /**
- * luaスクリプトを読む
- */
- プライベートvoidビルドスクリプト(){
- スクリプト = ScriptUtil.getScript( "limit.lua" );
- }
- /**
- * ビルダー
- * @param <T>
- */
- 公共 静的クラス Builder<T extends JedisCommands>{
- プライベート T jedis = null ;
- プライベートint制限 = 200;
- パブリックビルダー(T jedis){
- this.jedis = jedis;
- }
- パブリックビルダー制限( int制限){
- 制限 = 制限;
- これを返します。
- }
- パブリックRedisLimitビルド(){
- 新しいRedisLimit(this)を返します。
- }
- }
- }
したがって、クライアントがこれを使用する場合: - RedisLimit redisLimit = 新しい RedisLimit.Builder<>(jedisCluster)
- .limit(制限)
- 。建てる();
これははるかにシンプルで直接的であり、作成プロセスが複数のサブステップに分割されることを回避します。 これは、コンストラクター パラメーターが複数あるが、それらが必須フィールドではない場合に便利です。 したがって、分散ロック ビルダー メソッドも更新されます。 https://github.com/crossoverJie/distributed-redis-tool#features API 上記からわかるように、使用手順は limit メソッドを呼び出すことです。 - //電流制限
- ブール制限 = redisLimit.limit();
- (!制限)の場合{
- //特定の電流制限ロジック
- }
侵入を減らし、クライアントを簡素化するために、2 つの注釈メソッドが提供されています。 @コントローラー制限 このアノテーションは、@RequestMapping によって変更されたインターフェースで使用でき、現在の制限後に現在の制限応答を提供します。 実装は次のとおりです。 - @成分
- パブリッククラスWebInterceptはWebMvcConfigurerAdapterを拡張します{
- プライベート静的Logger ロガー = LoggerFactory.getLogger(WebIntercept.class);
- オートワイヤード
- プライベート RedisLimit redisLimit;
- @オーバーライド
- パブリックvoid addInterceptors(InterceptorRegistry レジストリ) {
- レジストリ.addInterceptor(新しい CustomInterceptor())
- .addPathPatterns( "/**" );
- }
- プライベートクラス CustomInterceptor は HandlerInterceptorAdapter を拡張します {
- @オーバーライド
- パブリックブールpreHandle(HttpServletRequest リクエスト、HttpServletResponse レスポンス、
- オブジェクトハンドラ)が例外をスローします{
- redisLimit == nullの場合
- 新しい NullPointerException をスローします ( "redisLimit が null です" );
- }
- if (ハンドラーインスタンスHandlerMethod) {
- HandlerMethod メソッド = (HandlerMethod) ハンドラ;
- ControllerLimit アノテーション = method.getMethodAnnotation(ControllerLimit.class);
- if (アノテーション == null ) {
- //スキップ
- 戻る 真実;
- }
- ブール制限 = redisLimit.limit();
- (!制限)の場合{
- logger.warn( "リクエストにはBean制限があります" );
- response.sendError(500, "リクエスト制限" );
- 戻る 間違い;
- }
- }
- 戻る 真実;
- }
- }
- }
実際、SpringMVC でインターセプターを実装し、インターセプション プロセス中にアノテーションが使用されるかどうかを決定して、現在の制限ロジックを呼び出します。 前提として、アプリケーションはクラスをスキャンし、Spring に管理させる必要があります。 - @ComponentScan(値 = "com.crossoverjie.distributed.intercept" )
@共通制限 もちろん、通常の方法でもご使用いただけます。実装の原則は Spring AOP です (SpringMVC のインターセプターは基本的に AOP です)。 - @側面
- @成分
- @EnableAspectJAutoProxy(プロキシターゲットクラス = true )
- パブリッククラスCommonAspect {
- プライベート静的Logger ロガー = LoggerFactory.getLogger(CommonAspect.class);
- オートワイヤード
- プライベート RedisLimit redisLimit;
- @Pointcut( "@annotation(com.crossoverjie.distributed.annotation.CommonLimit)" )
- プライベート voidチェック(){}
- @Before( "チェック()" )
- パブリックvoid before(JoinPoint joinPoint)は例外をスローします{
- redisLimit == nullの場合
- 新しい NullPointerException をスローします ( "redisLimit が null です" );
- }
- ブール制限 = redisLimit.limit();
- (!制限)の場合{
- logger.warn( "リクエストにはBean制限があります" );
- 新しい RuntimeException をスローします ( "リクエストには Bean 制限があります" );
- }
- }
- }
非常にシンプルで、傍受プロセス中に電流制限も呼び出されます。 もちろん、使用時にパッケージをスキャンする必要もあります。 - @ComponentScan(値 = "com.crossoverjie.distributed.intercept" )
要約する 電流制限は、同時実行性が高くトラフィック量が多いシステムでアプリケーションを保護するための強力なツールです。成熟したソリューションは数多くあります。この分野を理解し始めたばかりの友人たちに、いくつかのアイデアを提供できればと思います。 |