Kafka にまた問題が発生しました!

Kafka にまた問題が発生しました!

[[384383]]

著者は、正確にスケジュールされたタスクと遅延キュー処理機能を備えた、高同時実行シナリオ向けのシンプルで安定したスケーラブルな遅延メッセージ キュー フレームワークを個人的に開発しました。半年以上前にオープンソース化されて以来、10 社を超える中小企業に正確でタイムリーなスケジューリング ソリューションを提供することに成功し、実稼働環境でのテストにも耐えてきました。より多くの人々の利益のために、オープンソース フレームワークのアドレスが提供されるようになりました: https://github.com/sunshinelyz/mykit-delay

序文

運営・保守が新年を迎える前にサーバーを拝んでいなかったものと推定されます。 Nginx の問題は修正されましたが、Kafka は再び動作しません。今日はもう少し寝たかったのですが、また電話が鳴りました。それはまだ操作でした、「こんにちは、Binghe、会社に到着しましたか?すぐにサーバーを確認してください、再び問題が発生しています。」 「今向かっています。運用保守担当者はまだ出勤していないのですか?」 「まだ休暇中…」、私:「…」おい、こいつは逃げたのか?今のところ彼を無視しましょう。問題はまだ解決する必要があります。

問題の再現

会社に到着後、私は専用のバックパックを置き、武器であるノートパソコンを取り出し、電源を入れてすぐに監視システムにログインし、メインの業務システムに問題がないことを確認しました。コア以外のサービスがアラームを発行し、監視システムではこのサービスが次の例外を頻繁にスローしていることが示されました。

  1. 2021-02-28 22:03:05 131 プール-7-スレッド-3 エラー [] -
  2. コミットに失敗しました
  3. org.apache.kafka.clients.consumer.CommitFailedException:グループがすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。
  4. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest (ConsumerCoordinator.java:713) ~[MsgAgent-jar- with -dependencies.jar:na]
  5. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)~[MsgAgent-jar- with -dependencies.jar:na]
  6. org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218)~[MsgAgent-jar- with -dependencies.jar:na]
  7. com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121)~[MsgAgent-jar- with -dependencies.jar:na]
  8. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
  9. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
  10. java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

上記の例外情報出力から、システムの問題を大まかに判断できます。ポーリング メッセージのバッチを処理した後、Kafka コンシューマーはブローカーにオフセットを同期的に送信するときにエラーを報告しました。これはおそらく、Kafka がコンシューマーがクラッシュしたと判断し、現在のコンシューマー スレッドのパーティションがブローカーによって再利用されたためであり、これは以下の出力情報から確認できます。

  1. グループはすでに再バランス調整されパーティションが別のメンバー割り当てられているため、コミットを完了できません。これは、時間 後続のpoll()呼び出し間隔が、設定された最大値.poll.interval.ms よりも長くなっていました。これは通常、ポーリング ループがメッセージ処理に時間がかかりすぎることを意味ます。これを解決するにはセッションタイムアウトを増やすか、  最大サイズを縮小することで  poll()返されるバッチ 最大ポーリングレコード数。

Kafka 内で再バランス メカニズムが内部的にトリガーされ、問題が特定されます。次に、問題の分析を始めます。

問題を分析する

Kafka は Rebalance メカニズムをトリガーするので、Kafka が Rebalance をトリガーするタイミングについて説明します。

リバランスとは

より具体的な例を挙げると、グループの下に 10 個の Consumer インスタンスがあり、このグループは 50 個のパーティションを持つトピックをサブスクライブします。通常、Kafka は各コンシューマーに 5 つのパーティションを割り当てます。この分配プロセスはリバランスと呼ばれます。

リバランスをトリガーするタイミング

Kafka で次の条件が満たされると、再バランスがトリガーされます。

  • グループ内のメンバー数が変更されました。たとえば、新しい消費者が消費者グループに参加したり、消費者グループから脱退したりしました。コンシューマー グループを離れるグループ メンバーには、クラッシュしたグループ メンバーや積極的にコンシューマー グループを離れるグループ メンバーが含まれます。
  • 購読されているトピックの数が変更されました。
  • サブスクライブされたトピックのパーティション数が変更されました。

後者の 2 つの状況は人為的に回避できます。実際の作業では、Kafka リバランスの最も一般的な理由は、コンシューマー グループ メンバーの変更です。

消費者メンバーの通常の追加と削除は、避けられないリバランスにつながります。ただし、場合によっては、コンシューマー インスタンスがコーディネーターによって誤って「停止」されたと判断され、グループから「追い出され」、再バランスが発生する可能性があります。

コンシューマー グループが再バランスを完了すると、各コンシューマー インスタンスは、まだ稼働していることを示すハートビート要求をコーディネーターに定期的に送信します。コンシューマー インスタンスがこれらのハートビート要求をタイムリーに送信できない場合、コーディネーターはコンシューマーを「デッド」と見なし、グループから削除して、新しいラウンドの再バランスを開始します。この時間は、コンシューマー側のパラメータ session.timeout.ms を通じて設定できます。デフォルト値は 10 秒です。

このパラメータに加えて、Consumer はハートビート要求の送信頻度を制御するパラメータ (heartbeat.interval.ms) も提供します。この値が小さいほど、コンシューマー インスタンスがハートビート要求を送信する頻度が高くなります。ハートビート要求を頻繁に送信すると追加の帯域幅リソースが消費されますが、コーディネーターが各コンシューマー インスタンスにリバランスを有効にするように通知する現在の方法は、ハートビート要求の応答本文に REBALANCE_NEEDED フラグをカプセル化することであるため、リバランスが現在有効になっているかどうかをより迅速に知ることができるという利点があります。

上記の 2 つのパラメータに加えて、コンシューマー側には、コンシューマーの実際の消費容量がリバランスに与える影響を制御するために使用されるパラメータ、つまり max.poll.interval.ms パラメータもあります。これは、コンシューマー アプリケーションによるポーリング メソッドへの 2 回の呼び出し間の最大時間間隔を制限します。デフォルト値は 5 分です。つまり、コンシューマー プログラムが 5 分以内にポーリング メソッドによって返されたすべてのメッセージを消費できない場合、コンシューマーは「グループを離れる」要求を積極的に開始し、コーディネーターは新しいラウンドの再バランスを開始します。

上記の分析により、どのリバランスを回避できるかがわかります。

最初のタイプの不要なリバランスは、ハートビートを時間内に送信できないことが原因で発生し、その結果、コンシューマーがグループから「追い出され」ます。この場合、再バランスをできるだけ回避するために、session.timeout.ms と heartbeat.interval.ms の値を設定できます。 (以下の構成はインターネットで見つかったベストプラクティスであり、まだテストされていません)

  • session.timeout.ms = 6s に設定します。
  • heartbeat.interval.ms = 2s に設定します。
  • Consumer インスタンスが「デッド」と判断される前に少なくとも 3 ラウンドのハートビート要求を送信できるようにします。つまり、session.timeout.ms >= 3 * heartbeat.interval.ms です。

session.timeout.ms を 6 秒に設定する主な目的は、コーディネーターが電話を切ったコンシューマーをより迅速に見つけ、できるだけ早くグループから追い出せるようにすることです。

2 番目のタイプの不必要な再バランスは、消費者が長期間支出を続けることで発生します。このとき、max.poll.interval.ms パラメータ値の設定が特に重要になります。予期しない再バランスを回避するには、このパラメータ値を、ダウンストリームの最大処理時間よりもわずかに長い大きな値に設定することをお勧めします。

つまり、ビジネス処理ロジックに十分な時間を残しておくということです。この方法では、これらのメッセージの処理に時間がかかりすぎるため、コンシューマーはリバランスをトリガーしません。

オフセットのプルとコミット

Kafka のオフセットはコンシューマーによって管理されます。オフセットには、プル オフセット (位置) とコミット オフセット (コミット済み) の 2 種類があります。プル オフセットは、現在のコンシューマー パーティションの消費の進行状況を表します。各メッセージが消費された後、オフセットを送信する必要があります。オフセットをコミットするとき、Kafka はプルされたオフセットの値をパーティションのコミットされたオフセットとして使用し、それをコーディネータに送信します。

オフセットが送信されない場合、コンシューマーが次にブローカーに再接続したときに、現在のコンシューマー グループによってブローカーに送信されたオフセットから消費を開始します。

つまり、問題はここにあります。メッセージの処理時間が長すぎると、ブローカーによってメッセージが削除され、オフセットを送信するとエラーが発生します。したがって、プルされたオフセットはブローカーに送信されず、パーティションは再バランスされます。次にパーティションが再割り当てされると、コンシューマーは最新のコミットされたオフセットから消費を開始します。ここで重複消費の問題が発生します。

異常なログプロンプトの解決方法

実際、ここまで述べたように、対応する解決策は、Kafka コンシューマーによって出力される例外ログにも示されています。

次に、Kafka のプル オフセットとコミット オフセットについて説明します。

実際、出力されたログ情報からも、問題の大まかな解決策がわかります。簡単に言えば、max.poll.interval.ms と session.timeout.ms の期間を増やし、max.poll.records の構成値を減らし、コンシューマーはメッセージを処理した後に時間内にオフセットを送信する必要があります。

問題解決

これまでの分析を通じて、この問題をどのように解決するかがわかるはずです。ここで言及する必要があるのは、Kafka を統合したときに、SpringBoot と Kafka コンシューマー リスナーを使用したことです。コンシューマー側の主なコード構造は次のとおりです。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){
  3. logger.info( "トピックは{}、オフセットは{}、値は{} n" 、record.topic()、record.offset()、record.value());
  4. 試す {
  5. オブジェクト値 = record.value();
  6. logger.info(値.toString());
  7. ack.acknowledge();
  8. } キャッチ (例外 e) {
  9. logger.error( "ログコンシューマー例外: {}" , e);
  10. }
  11. }

上記のコードのロジックは比較的単純で、Kafka 内のメッセージを取得した後、それをログ ファイルに直接出力します。

解決してみる

ここでは、まず例外ログのプロンプト情報に合わせて設定を行うため、SpringBoot の application.yml ファイルに以下の設定情報を追加しました。

  1. 春:
  2. カフカ:
  3. 消費者:
  4. プロパティ:
  5. 最大ポーリング間隔(ミリ秒): 3600000
  6. 最大投票レコード数: 50
  7. セッションタイムアウト: 60000
  8. ハートビート間隔: 3000

構成が完了したら、コンシューマー ロジックを再度テストし、Rebalance 例外が引き続きスローされることを確認します。

最終解決策

Kafka コンシューマーの問題を別の観点から見てみましょう。1 つのコンシューマーがメッセージを生成し、別のコンシューマーがそのメッセージを消費します。同じグループ ID の下に配置することはできません。いずれかのグループ ID を変更するだけです。

ここでは、ビジネス プロジェクトはモジュールとサブシステムで開発されます。たとえば、モジュール A がメッセージを生成し、モジュール B がモジュール A によって生成されたメッセージを消費します。この時点で、session.timeout.ms: 60000 などの構成パラメータを変更してもまったく効果はなく、依然として Rebalance 例外がスローされます。

この時点で、コンシューマグループのgroupIdを変更しようとし、次のコードを変更しました。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

コードを次のように変更します。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS、パーティション = { "0" }) }、グループ ID = "kafka-consumer-logs" 、コンテナ ファクトリ = "kafkaListenerContainerFactory" )
  2. パブリックvoid consumerReceive (ConsumerRecord<?, ?> レコード、確認応答 ack){

もう一度テストすると、問題は解決しました~~

この記事はWeChatの公開アカウント「Glacier Technology」から転載したものです。下のQRコードからフォローできます。この記事を転載する場合は、Glacier Technology 公式アカウントまでご連絡ください。

<<:  Kubernetes をバックアップするための 5 つのベスト プラクティス

>>:  図解分散システム - 上級プログラマーへの道

推薦する

Huawei Cloudがダークホースとして登場した。 2020 年にクラウド コンピューティングでは具体的に何が起こったのでしょうか?

2020年に「新インフラ」の構築が最高潮に達し始めたとき、クラウドコンピューティング業界が最も恩恵を...

マイクロソフトが正式に Skype 中国を買収、MSN 中国は閉鎖の可能性も

北京ニュース(記者 林奇玲)昨日午前、中国におけるSkypeの元々の運営パートナーであるTom.co...

長い動画では解決できない

長い動画がまた騒動を巻き起こしている。最近、マンゴースーパーメディアは、アリババベンチャーキャピタル...

フォロワー5万人のうち、商品を購入した人は50人だけ。WeChat電子商取引の失敗の4つの罪が明らかに

ネット上では、インターネット思考でWeChatで物を売って月に数万ドルを稼ぐという成功談がどんどん増...

外部リンク環境は楽観的ではないため、ウェブサイトの最適化は内部から始める必要があります

4月25日、Baiduの外部リンク判定に関する議論では、スパム外部リンクの分類と影響が明確に示されま...

SEOウェブサイトの戦略を変更し、ソフトカルチャーの強さを向上させる

近年、SEO ウェブサイトは急速に発展しており、ウェブマスターも SEO ウェブサイトに多大な労力を...

サーバーホストはどうですか? 「マイアミ」データセンターのVPSの簡単なレビュー

サーバーホストはどうですか? Serverhost Miami の VPS はいかがでしょうか? S...

クラウド移行のベストプラクティス

クラウド移行、つまりデータとアプリケーションをオンサイトの IT インフラストラクチャからクラウド ...

分散メッセージングシステムの設計ポイント

分散キャッシュに関しては、Redis がリードしています。しかし、メッセージ キュー MQ に関して...

オンラインでお金を稼ぐ際によくある4つの間違いを避ける

私はオンラインで稼ぐ仕事に2年間携わっています。仕事と比べると、自由度が増しています。自分の時間や物...

反省: 商標登録ウェブサイト最適化の 5 つの罪

人として、自分の欠点を発見し、間違いを正せるよう、自分自身を振り返ることを学ばなければなりません。 ...

Google検索ランキングアルゴリズムの更新を追跡および予測するための無料プラットフォームがいくつかある

5 月に Google のアルゴリズムにアップデートはありますか? Panda 4.0 と Payd...

Hostdare-$0.75/512MB RAM/10GB SSD/600GB 帯域幅/ロサンゼルス/quadranet

安いものをいじるのが好きな人には、もう 1 つ紹介したいものがあります。hostdare です。彼ら...