Kafka の消費とハートビートのメカニズム

Kafka の消費とハートビートのメカニズム

1. 概要

最近、カフカの消費とハートビートのメカニズムについて相談する学生もいます。今日は、このブログを通じて、これらの内容を一つずつ紹介していきたいと思います。

2. コンテンツ

2.1 Kafka の消費

まず、消費について見てみましょう。 Kafka は非常にシンプルな消費 API を提供します。ユーザーは、Kafka ブローカー サーバーのアドレスを初期化し、KafkaConsumer クラスをインスタンス化してトピック内のデータを取得するだけです。単純な Kafka 消費サンプルコードは次のとおりです。

  1. パブリッククラス JConsumerSubscribe は Thread を拡張します {
  2. 公共 静的void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer を起動します。 } /** Kafka クラスター情報を初期化します。 */ プライベートプロパティ構成() { Properties props = new Properties(); props.put( "bootstrap.servers" , "dn1:9092,dn2:9092,dn3:9092" );// Kafka クラスターのアドレスを指定します
  3. props.put( "group.id" , "ke" ); //消費者グループを指定する
  4. props.put( "enable.auto.commit" , "true" ); // 自動送信を有効にする
  5. props.put( "auto.commit.interval.ms" , "1000" ); // 自動送信の時間間隔
  6. // メッセージの主キーをデシリアライズします。props.put( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
  7. // 消費レコードをデシリアライズする props.put( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
  8. プロパティを返します
  9. } /** シングルスレッドのコンシューマーを実装します。 */ @Override public void run() { // コンシューマーインスタンスオブジェクトを作成します。KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // 消費トピックコレクションをサブスクライブします。consumer.subscribe(Arrays.asList( "test_kafka_topic" ));
  10. // リアルタイム消費フラグ boolean flag = true ;
  11. while (フラグ) {
  12. // トピックメッセージデータを取得します。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  13. ( ConsumerRecord<String, String> レコード: レコード)
  14. // メッセージ レコードの印刷をループします。System. out .printf( "オフセット = %d、キー = %s、値 = %s%n" 、record.offset()、record.key ( )、record.value());
  15. } // 例外が発生した場合は、コンシューマー オブジェクトを閉じます。consumer.close ();
  16. }}

上記のコードを使用すると、トピック内のデータを簡単に取得できます。ただし、データを取得するために poll メソッドを呼び出すと、Kafka Broker Server がそれらの処理を実行します。次に、ソースコードの実装の詳細を見てみましょう。コアコードは次のとおりです。

org.apache.kafka.clients.consumer.KafkaConsumer

  1. プライベート ConsumerRecords<K, V> ポーリング(最終ロングタイムアウトMs、最終ブール値 includeMetadataInTimeout) {
  2. 取得して確実に開く();試す {
  3. if (timeoutMs < 0) throw new IllegalArgumentException( "タイムアウトは負の値であってはなりません" );
  4. if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
  5. throw new IllegalStateException( "コンシューマーはトピックにサブスクライブされていないか、パーティションが割り当てられていません" );
  6. } //タイムアウトが経過するまで新しいデータポーリングします
  7. 長い経過時間 = 0L;
  8. する {
  9. クライアントがトリガーウェイクアップを実行する可能性があります。最終的な長いメタデータ終了;メタデータタイムアウトを含める場合
  10. 最終的な長いmetadataStart = time .milliseconds(); if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
  11. ConsumerRecords.empty()を返します
  12. } metadataEnd = time .milliseconds();経過時間 += メタデータ終了 - メタデータ開始; }それ以外{
  13. (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) の間 {
  14. log.warn( "まだメタデータを待機しています" );
  15. } metadataEnd = time .milliseconds(); } 最終的な Map<TopicPartition、List<ConsumerRecord<K、V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs、elapsedTime));レコードが空の場合
  16. // 取得したレコードを返す前に、一連取得を送信できます
  17. //ユーザーが応答待つのをブロックしないようにしてパイプラインを有効にします 
  18. //取得したレコードを処理します
  19. //
  20. // 注意: 消費された位置はすでに更新れているため、
  21. // ウェイクアップまたは 事前に発生するその他エラー 取得したレコードを返します
  22. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
  23. クライアントのポーリングNoWakeup(); } this.interceptors.onConsume(new ConsumerRecords<>(records))を返します
  24. } 最終的な長い fetchEnd = time .milliseconds();経過時間 += フェッチ終了 - メタデータ終了; } 経過時間 < タイムアウト時間の場合;
  25. ConsumerRecords.empty()を返します
  26. ついに
  27. リリース(); } }

上記のコードには pollForFetches メソッドがあり、その実装ロジックは次のとおりです。

  1. プライベート Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
  2. 最終的な長いstartMs = time .milliseconds();
  3. long pollTimeout = Math.(コーディネーター.timeToNextPoll(startMs), timeoutMs);
  4. // すでにデータが利用可能場合はすぐに返します
  5. 最終的な Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
  6. レコードが空の場合
  7. レコードを返します
  8. }
  9. // 新しいフェッチを送信します(保留中のフェッチは再送信しません)
  10. フェッチャー.sendFetches();
  11. //ポジションが不足している場合、ポーリングブロックされたままになるのは望ましくありません
  12. // オフセット検索が遅れている可能性があるため 失敗の後
  13. // 注意: cachedSubscriptionHashAllFetchPositions使用する場合は、
  14. // このメソッドの前に updateAssignmentMetadataIfNeeded を実行します。
  15. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
  16. ポーリングタイムアウト = retryBackoffMs;
  17. }
  18. client.poll(pollTimeout, startMs, () -> {
  19. //フェッチはバックグラウンドスレッドによって完了する可能性があるため、このポーリング条件が必要です
  20. // poll()不必要にブロックしないようにするため
  21. !fetcher.hasCompletedFetches()を返します
  22. });
  23. //長いポーリングの後グループの再バランス調整必要かどうかを確認する必要があります
  24. // データを返してグループより早く安定できるようにする
  25. コーディネーターの再参加が必要または保留中の場合(){
  26. Collections.emptyMap()を返します
  27. }
  28. fetcher.fetchedRecords()を返します
  29. }

上記のコードの太字部分から、コンシューマー クライアントがデータを取得するたびに、最初に poll メソッドを通じてフェッチャーの fetchedRecords 関数を呼び出すことがわかります。データが取得されない場合、新しい sendFetches 要求が開始されます。データを消費する場合、Kafka ブローカー サーバーから取得されるデータのバッチごとに最大データ量の制限があります。デフォルトは 500 レコードで、プロパティ (max.poll.records) によって制御されます。クライアントでこのプロパティ値を設定すると、消費するたびに取得されるデータの量を調整できます。

ヒント:ここで注意すべき点は、max.poll.records はポーリング要求の合計データを返すものであり、パーティションの数とは関係がないということです。したがって、各消費に対してすべてのパーティションから取得されるトピック データの合計数は、max.poll.records で設定された値を超えることはありません。

Fetcher クラスでは、sendFetches メソッドで取得されるデータの容量に制限があり、これはプロパティ (max.partition.fetch.bytes) によって設定され、デフォルトは 1 MB です。 max.partition.fetch.bytes 制限に達したときに、10,000 件のレコード (デフォルトでは 1 回につき 500 件) をフェッチする必要がある場合、今回はネットワーク経由で開始されたすべての要求のフェッチを完了するために 20 回実行する必要があるというシナリオが考えられます。

ここで、質問がある学生もいるかもしれません。デフォルトの max.poll.records プロパティ値を 10000 に調整することはできないでしょうか?はい、調整できますが、一緒に使用する必要がある別のプロパティがあります。これは各ポーリングのタイムアウトです(Duration.ofMillis(100))。ここでは、各データの実際の容量に基づいてタイムアウト設定を決定する必要があります。最大値を 10000 に調整すると、各レコードの容量が大きい場合、タイムアウトは依然として 100 ミリ秒のままとなり、取得されるデータは 10,000 未満になる可能性があります。

ここで注意が必要なもう 1 つの点は、セッション タイムアウトの問題です。 session.timeout.ms のデフォルト値は 10 秒、group.min.session.timeout.ms のデフォルト値は 6 秒、group.max.session.timeout.ms のデフォルト値は 30 分です。消費のビジネス ロジックを処理しているときに、10 秒以内に処理されない場合、コンシューマー クライアントは Kafka ブローカー サーバーから切断され、消費されたデータと生成されたオフセットは Kafka に送信されません。これは、Kafka ブローカー サーバーがコンシューマー プログラムが切断されたと認識するためです。 auto-commit プロパティまたは auto.offset.reset プロパティを設定した場合でも、消費時に重複した消費が発生します。これは、session.timeout.ms タイムアウトが発生するためです。

2.2 ハートビートのメカニズム

上記の最後に、セッション タイムアウトによってメッセージの消費が繰り返されるという説明がありました。タイムアウトが発生したのはなぜですか?学生の中には、このような疑問を持つ人もいるかもしれません。私のコンシューマー スレッドは明らかに開始されており、終了していません。 Kafka のメッセージを消費できないのはなぜですか?コンシューマー グループが ConsumerGroupID を見つけることができません。これはタイムアウトが原因である可能性があります。 Kafka はハートビート メカニズムを通じてタイムアウトを制御します。ハートビート メカニズムは、コンシューマー クライアントには認識されません。非同期スレッドです。コンシューマー インスタンスを開始すると、ハートビート スレッドが動作を開始します。

定期的にハートビートを送信し、コンシューマーのステータスを検出するために、org.apache.kafka.clients.consumer.internals.AbstractCoordinator で HeartbeatThread スレッドが開始されます。各コンシューマーには org.apache.kafka.clients.consumer.internals.ConsumerCoordinator があり、各 ConsumerCoordinator はハートビートを維持するために HeartbeatThread スレッドを開始します。ハートビート情報は、org.apache.kafka.clients.consumer.internals.Heartbeat に保存されます。宣言されたスキーマは次のとおりです。

  1. プライベート最終intセッションタイムアウトMs;
  2. プライベート最終intハートビート間隔Ms;
  3. プライベート最終int maxPollIntervalMs;
  4. プライベート最終長い retryBackoffMs;
  5. プライベート揮発性の長い最後のハートビート送信;
  6. プライベートロングラストハートビート受信;
  7. プライベート長いlastSessionReset;
  8. プライベートな長い最後の投票;
  9. プライベートブールハートビート失敗;

ハートビート スレッドの run メソッドの実装コードは次のとおりです。

  1. パブリックボイド実行(){
  2. 試す {
  3. log.debug( "ハートビートスレッドが開始されました" );
  4. )の間{
  5. 同期された (AbstractCoordinator.this) {
  6. (閉じている)
  7. 戻る;
  8. 有効になっている場合
  9. 抽象コーディネータ。this.wait();
  10. 続く;
  11. } 状態が MemberState.STABLE の場合 {
  12. //グループ  安定していない(おそらくグループを離れたため  またはコーディネーターが
  13. //追い出されました) ので、ハートビートを無効にしてメインスレッドが再参加する待ちます
  14. 無効にする();
  15. 続く;
  16. }
  17. クライアントのポーリングNoWakeup();
  18. long now = time .milliseconds();
  19. コーディネーターが不明の場合(){
  20. (findCoordinatorFuture != null || lookupCoordinator().failed()) の場合
  21. //直近未来チェックは、  
  22. // ブローカーは利用可能 接続する 
  23. AbstractCoordinator.this.wait(retryBackoffMs);
  24. }そうでない場合 (heartbeat.sessionTimeoutExpired(now)) {
  25. // ハートビートが成功しないままセッションタイムアウトが経過したので、
  26. // おそらくコーディネータがまだ正常であることを確認します
  27. コーディネーター不明をマークします();
  28. }そうでない場合 (heartbeat.pollTimeoutExpired(now)) {
  29. // ポーリングタイムアウトが期限切れになったため、フォアグラウンドスレッドが停止したことを意味します
  30. //  poll()呼び出しの間にグループを明示的に離脱します
  31. グループを離れるかもしれない();
  32. }そうでない場合 (!heartbeat.shouldHeartbeat(now)) {
  33. // 再試行バックオフ待っから再度ポーリングする 心拍が停止し場合
  34. // コーディネータが切断されました
  35. AbstractCoordinator.this.wait(retryBackoffMs);
  36. }それ以外{
  37. heartbeat.sentHeartbeat(現在)
  38. sendHeartbeatRequest().addListener(新しいRequestFutureListener<Void>() {
  39. @オーバーライド
  40. パブリックvoid onSuccess(Void値) {
  41. 同期された (AbstractCoordinator.this) {
  42. heartbeat.receiveHeartbeat(時間.milliseconds());
  43. }
  44. }
  45. @オーバーライド
  46. パブリックvoid onFailure(RuntimeException e) {
  47. 同期された (AbstractCoordinator.this) {
  48. if (e インスタンス RebalanceInProgressException) {
  49. //有効です グループが鼓動を続ける バランス調整中です
  50. // コーディネータがメンバーをグループ保持することを保証する のために 限り
  51. //再バランスのタイムアウト期間として心拍の送信を止めれば、
  52. // ただし、参加する前にセッション タイムアウトが期限切れになる可能性があります。
  53. heartbeat.receiveHeartbeat(時間.milliseconds());
  54. }それ以外{
  55. ハートビート.failHeartbeat();
  56. //スレッドがスリープ状態の場合はスレッドを起動してハートビートを再スケジュールします
  57. 抽象コーディネータ。this.notify();
  58. }
  59. }
  60. }
  61. });
  62. }
  63. }
  64. }
  65. } キャッチ (認証例外 e) {
  66. log.error( "ハートビートスレッドで認証エラーが発生しました" , e);
  67. this.failed.set (e);
  68. } キャッチ (GroupAuthorizationException e) {
  69. log.error( "ハートビートスレッドでグループ認証エラーが発生しました" , e);
  70. this.failed.set (e);
  71. } キャッチ (InterruptedException | InterruptException e) {
  72. スレッドが中断されました();
  73. log.error( "ハートビートスレッドで予期しない割り込みを受信しました" , e);
  74. this.failed.set (新しい RuntimeException(e));
  75. } キャッチ (Throwable e) {
  76. log.error( "予期しないエラーのためハートビートスレッドが失敗しました" , e);
  77. if (e インスタンスの RuntimeException)
  78. this.failed.set ( (RuntimeException) e);
  79. それ以外 
  80. this.failed.set (新しい RuntimeException(e));
  81. ついに
  82. log.debug( "ハートビートスレッドが閉じられました" );
  83. }
  84. }
  85. コードを表示

ハートビート スレッドには、sessionTimeoutExpired と pollTimeoutExpired という 2 つの最も重要なタイムアウト関数があります。

  1. パブリックブール値 sessionTimeoutExpired(long now) {
  2. 今すぐ戻る- 数学。最大(lastSessionReset、lastHeartbeatReceive) > sessionTimeoutMs;
  3. } public boolean pollTimeoutExpired(long now) {
  4. 戻ります- lastPoll > maxPollIntervalMs;
  5. }

2.2.1 セッションタイムアウト期限切れ

sessionTimeout がタイムアウトすると、現在のコーディネーターが切断を処理しているとマークされます。このとき、コンシューマーは削除され、パーティションとコンシューマー間の対応が再割り当てされます。 Kafka ブローカー サーバーでは、コンシューマー グループは 5 つの状態 (Unknown が含まれる場合は 6 つ) を定義します (org.apache.kafka.common.ConsumerGroupState、次の図を参照)。

2.2.2 ポーリングタイムアウト期限切れ

ポーリング タイムアウトがトリガーされると、コンシューマー クライアントは ConsumerGroup を終了します。再度ポーリングすると、ConsumerGroup に再参加し、RebalanceGroup がトリガーされます。 KafkaConsumer クライアントは繰り返しポーリングするのに役立たないため、実装された消費ロジックでポーリング メソッドを継続的に呼び出す必要があります。

3. パーティションとコンシューマスレッド

コンシューマ パーティションとコンシューマ スレッドの対応に関しては、理論的にはコンシューマ スレッドの数はパーティションの数以下である必要があります。以前は、1 つのコンシューマ スレッドが 1 つのパーティションに対応し、コンシューマ スレッドの数がパーティションの数と等しい場合に、スレッドの使用率が最大化されるという見解がありました。 KafkaConsumer Client インスタンスを直接使用しても問題はありません。ただし、十分な CPU があれば、実際にはパーティション数よりも多くのスレッドを使用して消費容量を向上させることができます。これには、KafkaConsumer クライアントインスタンスを変更し、消費戦略の事前計算を実装し、追加の CPU を使用してより多くのスレッドを開始し、消費タスクのシャーディングを実装する必要があります。具体的な実装については次のブログで取り上げます。そこでは、「Kafka に基づく分散クエリ SQL エンジン」を紹介します。

4. 結論

このブログはここであなたと共有されます。研究や学習の過程で質問がある場合は、グループに参加して話し合うか、私にメールを送ってください。できる限りお答えいたします。お互いに励まし合いましょう!

<<:  資格とリモートワークがクラ​​ウドコンピューティングの仕事の給与に与える影響

>>:  COVID-19により銀行はクラウドコンピューティングの導入を迫られているが、まずは戦略を立てる必要がある

推薦する

Google 中国語ウェブマスター ブログ: リンクに関するよくある質問

ウェブマスター ヘルプ フォーラムでは、ウェブマスターから最もよく寄せられる質問はリンクに関するもの...

リースウェブ - 年末50%オフ/サーバー/VPS/CDN/仮想ホスト/複数のコンピュータルーム

1997年に設立された老舗ブランド、leasewebが年末にプロモーションを実施。サーバー、VPS(...

ウェブサイトの最適化コンテンツとフレームワークは無視できない

インターネットの発展に伴い、多くの企業がより多くの販売チャネルと開発スペースを獲得するためにウェブサ...

Kubernetes アプリケーション アクセス管理の理解

追加ボックス ボーダーボックス rgba(0, 0, 0, 0);">種類: サービ...

百度が電子商取引の環境を再構築:愛楽火が5000万ドル以上を調達

「愛楽火」に初めてログインしたユーザーにとって、「美麗速」と「点評」の両方に似たウェブサイトから、こ...

#Cyber​​Monday# liteserver: 月額 2.5 ユーロ、KVM/1G メモリ/512G ハードディスク/6T トラフィック/オランダ データセンター

liteserver (2007~) オランダデータセンターのVPSの3大カテゴリはブラックフライデ...

推奨: VPSNet - $15/onapp/512m メモリ/10g ハードディスク/3T トラフィック/複数のコンピュータ ルーム

vps.net が待望の割引プロモーションを開始しました。このようなハイエンド VPS クラウドで値...

cloudcone: 米国 VPS プロモーション、年間 21 ドルから、1G メモリ/2 コア/30g SSD/1T トラフィック/1Gbps 帯域幅

Cloudconeは、クラウドサーバーとVPSの2つの事業に分かれており、現在はVPS(非クラウドサ...

SEO Baiduウェブマスタープラットフォームとの直接対話が再開

改良された Baidu Webmaster プラットフォームがリリースされました。インターフェースの...

電子商取引事業は機会を捉えるためにさらに深く掘り下げる必要がある

電子商取引は飛躍的に発展し、人々はインターネットでの買い物や取引にますます慣れてきています。その結果...

#ウェブサイトの推奨事項# liquidweb: cPanel または Plesk 認証による完全管理の新しい VPS シリーズ

世界的に有名な完全管理型ホスティング プロバイダー Liquidweb は本日、引き続き完全管理型で...

Baidu SearchがAuroraアルゴリズムを導入:オリジナルで高品質なコンテンツの促進を目指す

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています6月7日、...

ブログは死んではいませんが、Weibo はまだ存在しています。両者のマーケティングの焦点を詳しく見てみましょう。

ブログが誕生した当初は、自分のブログを持つというのはかなり目新しいことでした。特にブログ業界が最も栄...

gigsgigscloud: 月額 7.3 ドルから、日本 cn2 gia VPS、200M 帯域幅、月額 7.3 ドルから、3 つのネットワークに直接接続

gigsgigscloud は日本に新しいデータセンターを追加しました。このデータセンターの VPS...