Kafka にまた問題が発生しました!

Kafka にまた問題が発生しました!

[[384383]]

著者は、正確にスケジュールされたタスクと遅延キュー処理機能を備えた、高同時実行シナリオ向けのシンプルで安定したスケーラブルな遅延メッセージ キュー フレームワークを個人的に開発しました。半年以上前にオープンソース化されて以来、10 社を超える中小企業に正確でタイムリーなスケジューリング ソリューションを提供することに成功し、実稼働環境でのテストにも耐えてきました。より多くの人々の利益のために、オープンソース フレームワークのアドレスが提供されるようになりました: https://github.com/sunshinelyz/mykit-delay

序文

運営・保守が新年を迎える前にサーバーを拝んでいなかったものと推定されます。 Nginx の問題は修正されましたが、Kafka は再び動作しません。今日はもう少し寝たかったのですが、また電話が鳴りました。それはまだ操作でした、「こんにちは、Binghe、会社に到着しましたか?すぐにサーバーを確認してください、再び問題が発生しています。」 「今向かっています。運用保守担当者はまだ出勤していないのですか?」 「まだ休暇中…」、私:「…」おい、こいつは逃げたのか?今のところ彼を無視しましょう。問題はまだ解決する必要があります。

問題の再現

会社に到着後、私は専用のバックパックを置き、武器であるノートパソコンを取り出し、電源を入れてすぐに監視システムにログインし、メインの業務システムに問題がないことを確認しました。コア以外のサービスがアラームを発行し、監視システムではこのサービスが次の例外を頻繁にスローしていることが示されました。

  1. 2021-02-28 22:03:05 131 プール-7-スレッド-3 エラー [] -
  2. コミットに失敗しました
  3. org.apache.kafka.clients.consumer.CommitFailedException:グループがすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。
  4. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest (ConsumerCoordinator.java:713) ~[MsgAgent-jar- with -dependencies.jar:na]
  5. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)~[MsgAgent-jar- with -dependencies.jar:na]
  6. org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218)~[MsgAgent-jar- with -dependencies.jar:na]
  7. com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121)~[MsgAgent-jar- with -dependencies.jar:na]
  8. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
  9. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
  10. java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

上記の例外情報出力から、システムの問題を大まかに判断できます。ポーリング メッセージのバッチを処理した後、Kafka コンシューマーはブローカーにオフセットを同期的に送信するときにエラーを報告しました。これはおそらく、Kafka がコンシューマーがクラッシュしたと判断し、現在のコンシューマー スレッドのパーティションがブローカーによって再利用されたためであり、これは以下の出力情報から確認できます。

  1. グループはすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。

Kafka 内で再バランス メカニズムが内部的にトリガーされ、問題が特定されます。次に、問題の分析を始めます。

問題を分析する

Kafka は Rebalance メカニズムをトリガーするので、Kafka が Rebalance をトリガーするタイミングについて説明します。

リバランスとは

より具体的な例を挙げると、グループの下に 10 個の Consumer インスタンスがあり、このグループは 50 個のパーティションを持つトピックをサブスクライブします。通常、Kafka は各コンシューマーに 5 つのパーティションを割り当てます。この分配プロセスはリバランスと呼ばれます。

リバランスをトリガーするタイミング

Kafka で次の条件が満たされると、再バランスがトリガーされます。

  • グループ内のメンバー数が変更されました。たとえば、新しい消費者が消費者グループに参加したり、消費者グループから脱退したりしました。コンシューマー グループを離れるグループ メンバーには、クラッシュしたグループ メンバーや積極的にコンシューマー グループを離れるグループ メンバーが含まれます。
  • 購読されているトピックの数が変更されました。
  • サブスクライブされたトピックのパーティション数が変更されました。

後者の 2 つの状況は人為的に回避できます。実際の作業では、Kafka リバランスの最も一般的な理由は、コンシューマー グループ メンバーの変更です。

消費者メンバーの通常の追加と削除は、避けられないリバランスにつながります。ただし、場合によっては、コンシューマー インスタンスがコーディネーターによって誤って「停止」されたと判断され、グループから「追い出され」、再バランスが発生する可能性があります。

コンシューマー グループが再バランスを完了すると、各コンシューマー インスタンスは、まだ稼働していることを示すハートビート要求をコーディネーターに定期的に送信します。コンシューマー インスタンスがこれらのハートビート要求をタイムリーに送信できない場合、コーディネーターはコンシューマーを「デッド」と見なし、グループから削除して、新しいラウンドの再バランスを開始します。この時間は、コンシューマー側のパラメータ session.timeout.ms を通じて設定できます。デフォルト値は 10 秒です。

このパラメータに加えて、Consumer はハートビート要求の送信頻度を制御するパラメータ (heartbeat.interval.ms) も提供します。この値が小さいほど、コンシューマー インスタンスがハートビート要求を送信する頻度が高くなります。ハートビート要求を頻繁に送信すると追加の帯域幅リソースが消費されますが、コーディネーターが各コンシューマー インスタンスにリバランスを有効にするように通知する現在の方法は、ハートビート要求の応答本文に REBALANCE_NEEDED フラグをカプセル化することであるため、リバランスが現在有効になっているかどうかをより迅速に知ることができるという利点があります。

上記の 2 つのパラメータに加えて、コンシューマー側には、コンシューマーの実際の消費容量がリバランスに与える影響を制御するために使用されるパラメータ、つまり max.poll.interval.ms パラメータもあります。これは、コンシューマー アプリケーションによるポーリング メソッドへの 2 回の呼び出し間の最大時間間隔を制限します。デフォルト値は 5 分です。つまり、コンシューマー プログラムが 5 分以内にポーリング メソッドによって返されたすべてのメッセージを消費できない場合、コンシューマーは「グループを離れる」要求を積極的に開始し、コーディネーターは新しいラウンドの再バランスを開始します。

上記の分析により、どのリバランスを回避できるかがわかります。

最初のタイプの不要なリバランスは、ハートビートを時間内に送信できないことが原因で発生し、その結果、コンシューマーがグループから「追い出され」ます。この場合、再バランスをできるだけ回避するために、session.timeout.ms と heartbeat.interval.ms の値を設定できます。 (以下の構成はインターネットで見つかったベストプラクティスであり、まだテストされていません)

  • session.timeout.ms = 6s に設定します。
  • heartbeat.interval.ms = 2s に設定します。
  • Consumer インスタンスが「デッド」と判断される前に少なくとも 3 ラウンドのハートビート要求を送信できるようにします。つまり、session.timeout.ms >= 3 * heartbeat.interval.ms です。

session.timeout.ms を 6 秒に設定する主な目的は、コーディネーターが電話を切ったコンシューマーをより迅速に見つけ、できるだけ早くグループから追い出せるようにすることです。

2 番目のタイプの不必要な再バランスは、消費者が長期間支出を続けることで発生します。このとき、max.poll.interval.ms パラメータ値の設定が特に重要になります。予期しない再バランスを回避するには、このパラメータ値を、ダウンストリームの最大処理時間よりもわずかに長い大きな値に設定することをお勧めします。

つまり、ビジネス処理ロジックに十分な時間を残しておくということです。この方法では、これらのメッセージの処理に時間がかかりすぎるため、コンシューマーはリバランスをトリガーしません。

オフセットのプルとコミット

Kafka のオフセットはコンシューマーによって管理されます。オフセットには、プル オフセット (位置) とコミット オフセット (コミット済み) の 2 種類があります。プル オフセットは、現在のコンシューマー パーティションの消費の進行状況を表します。各メッセージが消費された後、オフセットを送信する必要があります。オフセットをコミットするとき、Kafka はプルされたオフセットの値をパーティションのコミットされたオフセットとして使用し、それをコーディネータに送信します。

オフセットが送信されない場合、コンシューマーが次にブローカーに再接続したときに、現在のコンシューマー グループによってブローカーに送信されたオフセットから消費を開始します。

つまり、問題はここにあります。メッセージの処理時間が長すぎると、ブローカーによってメッセージが削除され、オフセットを送信するとエラーが発生します。したがって、プルされたオフセットはブローカーに送信されず、パーティションは再バランスされます。次にパーティションが再割り当てされると、コンシューマーは最新のコミットされたオフセットから消費を開始します。ここで重複消費の問題が発生します。

異常なログプロンプトの解決方法

実際、ここまで述べたように、対応する解決策は、Kafka コンシューマーによって出力される例外ログにも示されています。

次に、Kafka のプル オフセットとコミット オフセットについて説明します。

実際、出力されたログ情報からも、問題の大まかな解決策がわかります。簡単に言えば、max.poll.interval.ms と session.timeout.ms の期間を増やし、max.poll.records の構成値を減らし、コンシューマーはメッセージを処理した後に時間内にオフセットを送信する必要があります。

問題解決

これまでの分析を通じて、この問題をどのように解決するかがわかるはずです。ここで言及する必要があるのは、Kafka を統合したときに、SpringBoot と Kafka コンシューマー リスナーを使用したことです。コンシューマー側の主なコード構造は次のとおりです。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){
  3. logger.info( "トピックは{}、オフセットは{}、値は{} n" 、record.topic()、record.offset()、record.value());
  4. 試す {
  5. オブジェクト値 = record.value();
  6. logger.info(値.toString());
  7. ack.acknowledge();
  8. } キャッチ (例外 e) {
  9. logger.error( "ログコンシューマー例外: {}" , e);
  10. }
  11. }

上記のコードのロジックは比較的単純で、Kafka 内のメッセージを取得した後、それをログ ファイルに直接出力します。

解決してみる

ここでは、まず例外ログのプロンプト情報に合わせて設定を行うため、SpringBoot の application.yml ファイルに以下の設定情報を追加しました。

  1. 春:
  2. カフカ:
  3. 消費者:
  4. プロパティ:
  5. 最大ポーリング間隔(ミリ秒): 3600000
  6. 最大投票レコード数: 50
  7. セッションタイムアウト: 60000
  8. ハートビート間隔: 3000

構成が完了したら、コンシューマー ロジックを再度テストし、Rebalance 例外が引き続きスローされることを確認します。

最終解決策

Kafka コンシューマーの問題を別の観点から見てみましょう。1 つのコンシューマーがメッセージを生成し、別のコンシューマーがそのメッセージを消費します。同じグループ ID の下に配置することはできません。いずれかのグループ ID を変更するだけです。

ここでは、ビジネス プロジェクトはモジュールとサブシステムで開発されます。たとえば、モジュール A がメッセージを生成し、モジュール B がモジュール A によって生成されたメッセージを消費します。この時点で、session.timeout.ms: 60000 などの構成パラメータを変更してもまったく効果はなく、依然として Rebalance 例外がスローされます。

この時点で、コンシューマグループのgroupIdを変更しようとし、次のコードを変更しました。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

コードを次のように変更します。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer-logs" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

もう一度テストすると、問題は解決しました~~

この記事はWeChatの公開アカウント「Glacier Technology」から転載したものです。下のQRコードからフォローできます。この記事を転載する場合は、Glacier Technology 公式アカウントまでご連絡ください。

<<:  Kubernetes をバックアップするための 5 つのベスト プラクティス

>>:  図解分散システム - 上級プログラマーへの道

推薦する

香港スペース(香港ホスト、香港バーチャルホスト)

香港スペース: 通常は香港ホスティング、特に香港仮想ホスティングを指します。主な特徴は、高速で申請が...

ハイブリッド クラウドは将来のインフラストラクチャの新しい標準となるでしょうか?

[51CTO.com クイック翻訳] クラウドコンピューティング変革の波は、企業の発展を推進し続けて...

crazydomains - たった 1 ドルで .xyz ドメインを登録

crazydomains を紹介してから長い時間が経ちました。今日この会社について言及したのは、彼ら...

山西省陽泉市が変わる!百度と提携し、中小都市のスマートシティ実証モデルを構築

12月17日、百度は山西省陽泉市人民政府と戦略協力協定を締結した。双方は協力して、百度脳、自動運転、...

伝統的な地域メディアが一斉に変革の時代を先導する

ある人々にとって、変革は非常にファッショナブルでクールな言葉ですが、ある人々にとって、変革は諦めるこ...

おすすめ: dotster - 無制限のウェブサイトホスティングが 25% オフ / 控えめな贅沢と意味合い

dotster、このアメリカの古いコンソールブランドは、すべてのコンソールの25%オフプロモーション...

ウェブサイトの最適化は検索エンジンの最適化と同じではありません

最近、ロビンは国内の検索エンジン最適化 (SEO) サービス プロバイダーの Web サイトのコピー...

根本原因の追跡: 外部リンクの不安定性の根本原因について話す

外部リンクを作成する際、外部リンクの品質と範囲だけでなく、外部リンクの安定性も追求します。もちろん、...

アンケート: Baidu のハイパーリンク アルゴリズムのアップグレードによる影響を受けていますか?

百度のウェブ検索不正対策チームが10月23日に発表した発表によると、百度の不正対策アルゴリズムは、ハ...

仮想化について語る - カーネルとIO

[[211645]]序文時間は流れる水のように、あっという間に過ぎていきます。自分が仮想環境にいるの...

Kubernetes オペレーターは何ができますか?

Kubernetes は、複雑なクラウド インフラストラクチャの作成を自動化し、クラウド展開の管理プ...

パシフィックダイレクトパーチェスマルチレベルマーケティング帝国が崩壊:680万人の被害者が38億ドルを騙し取られる

同社は全国30省の人員を巻き込み、680万人以上の会員を育成し、最大38億元の預金を集めた。江西ワン...

Moments 広告の新しい @friend 機能はどのような効果をもたらすのでしょうか?

モーメントの@Friendsの新機能が話題になりました。 WeChat広告アシスタント公式アカウント...

Tencent Cloud: 超お得なプロモーション、年間38元から、さまざまな「ハイエンド」および安価なクラウドサーバーから選択可能

Tencent Cloud はほぼ常にプロモーションを行っていますが、最近開始されたプロモーションの...

SEO実践テクニック - フレンドリーリンクの作り方

ウェブサイトにフレンドリーリンクを作成するにはどうすればいいですか?検索エンジンでのサイトのランキン...