Kafka の消費とハートビート

Kafka の消費とハートビート

はじめに Kafka は、分散型、パーティション化、マルチコピー、マルチサブスクライバーのメッセージ発行およびサブスクリプション システム (分散 MQ システム) であり、ログの検索、ログの監視、ログへのアクセスなどに使用できます。 Kafka は、分散型、パーティション化、マルチレプリカ、マルチサブスクライバーのメッセージ発行およびサブスクリプション システム (分散 MQ システム) であり、ログの検索、ログの監視、ログへのアクセスなどに使用できます。 今日は、Kafka の消費とハートビートのメカニズムについて学習します。

1. Kafka の消費

まず、消費について見てみましょう。 Kafka は非常にシンプルな消費 API を提供します。ユーザーは、Kafka ブローカー サーバーのアドレスを初期化し、KafkaConsumer クラスをインスタンス化してトピック内のデータを取得するだけです。単純な Kafka 消費サンプルコードは次のとおりです。

  1. パブリッククラス JConsumerSubscribe は Thread を拡張します {
  2. 公共 静的void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer を起動します。 } /** Kafka クラスター情報を初期化します。 */ プライベートプロパティ構成() { Properties props = new Properties(); props.put( "bootstrap.servers" , "dn1:9092,dn2:9092,dn3:9092" );// Kafka クラスターのアドレスを指定します
  3. props.put( "group.id" , "ke" ); //消費者グループを指定する
  4. props.put( "enable.auto.commit" , "true" ); // 自動送信を有効にする
  5. props.put( "auto.commit.interval.ms" , "1000" ); // 自動送信の時間間隔
  6. // メッセージの主キーをデシリアライズします。props.put( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
  7. // 消費レコードをデシリアライズする props.put( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
  8. プロパティを返します
  9. } /** シングルスレッドのコンシューマーを実装します。 */ @Override public void run() { // コンシューマーインスタンスオブジェクトを作成します。KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // 消費トピックコレクションをサブスクライブします。consumer.subscribe(Arrays.asList( "test_kafka_topic" ));
  10. // リアルタイム消費フラグ boolean flag = true ;
  11. while (フラグ) {
  12. // トピックメッセージデータを取得します。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  13. ( ConsumerRecord<String, String> レコード: レコード)
  14. // メッセージ レコードの印刷をループします。System. out .printf( "オフセット = %d、キー = %s、値 = %s%n" 、record.offset()、record.key ( )、record.value());
  15. } // 例外が発生した場合は、コンシューマー オブジェクトを閉じます。consumer.close ();
  16. }}

上記のコードを使用すると、トピック内のデータを簡単に取得できます。ただし、データを取得するために poll メソッドを呼び出すと、Kafka Broker Server がそれらの処理を実行します。次に、ソースコードの実装の詳細を見てみましょう。コアコードは次のとおりです。

org.apache.kafka.clients.consumer.KafkaConsumer

  1. プライベート ConsumerRecords<K, V> ポーリング(最終ロングタイムアウトMs、最終ブール値 includeMetadataInTimeout) {
  2. 取得して確実に開く();試す {
  3. if (timeoutMs < 0) throw new IllegalArgumentException( "タイムアウトは負の値であってはなりません" );
  4. if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
  5. throw new IllegalStateException( "コンシューマーはトピックにサブスクライブされていないか、パーティションが割り当てられていません" );
  6. } //タイムアウトが経過するまで新しいデータポーリングします
  7. 長い経過時間 = 0L;
  8. する {
  9. クライアントがトリガーウェイクアップを実行する可能性があります。最終的な長いメタデータ終了;メタデータタイムアウトを含める場合
  10. 最終的な長いmetadataStart = time .milliseconds(); if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
  11. ConsumerRecords.empty()を返します
  12. } metadataEnd = time .milliseconds();経過時間 += メタデータ終了 - メタデータ開始; }それ以外{
  13. (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) の間 {
  14. log.warn( "まだメタデータを待機しています" );
  15. } metadataEnd = time .milliseconds(); } 最終的な Map<TopicPartition、List<ConsumerRecord<K、V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs、elapsedTime));レコードが空の場合
  16. // 取得したレコードを返す前に、一連取得を送信できます
  17. //ユーザーが応答待つのをブロックしないようにしてパイプラインを有効にします  
  18. //取得したレコードを処理します
  19. //
  20. // 注意: 消費された位置はすでに更新れているため、
  21. // ウェイクアップまたは 事前に発生するその他エラー 取得したレコードを返します
  22. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
  23. クライアントのポーリングNoWakeup(); } this.interceptors.onConsume(new ConsumerRecords<>(records))を返します
  24. } 最終的な長い fetchEnd = time .milliseconds();経過時間 += フェッチ終了 - メタデータ終了; } 経過時間 < タイムアウト時間の場合;
  25. ConsumerRecords.empty()を返します
  26. ついに
  27. リリース(); } }

上記のコードには pollForFetches メソッドがあり、その実装ロジックは次のとおりです。

  1. プライベート Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
  2. 最終的な長いstartMs = time .milliseconds();
  3. long pollTimeout = Math.(コーディネーター.timeToNextPoll(startMs), timeoutMs);
  4. // すでにデータが利用可能場合はすぐに返します
  5. 最終的な Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
  6. レコードが空の場合
  7. レコードを返します
  8. }
  9. // 新しいフェッチを送信します(保留中のフェッチは再送信しません)
  10. フェッチャー.sendFetches();
  11. //ポジションが不足している場合、ポーリングブロックされたままになるのは望ましくありません
  12. // オフセット検索が遅れている可能性があるため 失敗の後
  13. // 注意: cachedSubscriptionHashAllFetchPositions使用する場合は、
  14. // このメソッドの前に updateAssignmentMetadataIfNeeded を実行します。
  15. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
  16. ポーリングタイムアウト = retryBackoffMs;
  17. }
  18. client.poll(pollTimeout, startMs, () -> {
  19. //フェッチはバックグラウンドスレッドによって完了する可能性があるため、このポーリング条件が必要です
  20. // poll()不必要にブロックしないようにするため
  21. !fetcher.hasCompletedFetches()を返します
  22. });
  23. //長いポーリングの後グループの再バランス調整必要かどうかを確認する必要があります
  24. // データを返してグループより早く安定できるようにする
  25. コーディネーターの再参加が必要または保留中の場合(){
  26. Collections.emptyMap()を返します
  27. }
  28. fetcher.fetchedRecords()を返します
  29. }

上記のコードの太字部分から、コンシューマー クライアントがデータを取得するたびに、最初に poll メソッドを通じてフェッチャーの fetchedRecords 関数を呼び出すことがわかります。データが取得されない場合、新しい sendFetches 要求が開始されます。データを消費する場合、Kafka ブローカー サーバーから取得されるデータのバッチごとに最大データ量の制限があります。デフォルトは 500 レコードで、プロパティ (max.poll.records) によって制御されます。クライアントでこのプロパティ値を設定すると、消費するたびに取得されるデータの量を調整できます。

ヒント: ここで注意すべき点は、max.poll.records はポーリング要求の合計データを返すものであり、パーティションの数とは関係がないということです。したがって、各消費に対してすべてのパーティションから取得されるトピック データの合計数は、max.poll.records で設定された値を超えることはありません。

Fetcher クラスでは、sendFetches メソッドで取得されるデータの容量に制限があり、これはプロパティ (max.partition.fetch.bytes) によって設定され、デフォルトは 1 MB です。 max.partition.fetch.bytes 制限に達したときに、10,000 件のレコード (デフォルトでは 1 回につき 500 件) をフェッチする必要がある場合、今回はネットワーク経由で開始されたすべての要求のフェッチを完了するために 20 回実行する必要があるというシナリオが考えられます。

ここで、質問がある学生もいるかもしれません。デフォルトの max.poll.records プロパティ値を 10000 に調整することはできないでしょうか?はい、調整できますが、一緒に使用する必要がある別のプロパティがあります。これは各ポーリングのタイムアウトです(Duration.ofMillis(100))。ここでは、各データの実際の容量に基づいてタイムアウト設定を決定する必要があります。最大値を 10000 に調整すると、各レコードの容量が大きい場合、タイムアウトは依然として 100 ミリ秒のままとなり、取得されるデータは 10,000 未満になる可能性があります。

ここで注意が必要なもう 1 つの点は、セッション タイムアウトの問題です。 session.timeout.ms のデフォルト値は 10 秒、group.min.session.timeout.ms のデフォルト値は 6 秒、group.max.session.timeout.ms のデフォルト値は 30 分です。消費のビジネス ロジックを処理しているときに、10 秒以内に処理されない場合、コンシューマー クライアントは Kafka ブローカー サーバーから切断され、消費されたデータと生成されたオフセットは Kafka に送信されません。これは、Kafka ブローカー サーバーがコンシューマー プログラムが切断されたと認識するためです。 auto-commit プロパティまたは auto.offset.reset プロパティを設定した場合でも、消費時に重複した消費が発生します。これは、session.timeout.ms タイムアウトが発生するためです。

2. 心拍の仕組み

上記の最後に、セッション タイムアウトによってメッセージの消費が繰り返されるという説明がありました。タイムアウトが発生したのはなぜですか?学生の中には、このような疑問を持つ人もいるかもしれません。私のコンシューマー スレッドは明らかに開始されており、終了していません。 Kafka のメッセージを消費できないのはなぜですか?コンシューマー グループが ConsumerGroupID を見つけることができません。これはタイムアウトが原因である可能性があります。 Kafka はハートビート メカニズムを通じてタイムアウトを制御します。ハートビート メカニズムは、コンシューマー クライアントには認識されません。非同期スレッドです。コンシューマー インスタンスを開始すると、ハートビート スレッドが動作を開始します。

定期的にハートビートを送信し、コンシューマーのステータスを検出するために、org.apache.kafka.clients.consumer.internals.AbstractCoordinator で HeartbeatThread スレッドが開始されます。各コンシューマーには org.apache.kafka.clients.consumer.internals.ConsumerCoordinator があり、各 ConsumerCoordinator はハートビートを維持するために HeartbeatThread スレッドを開始します。ハートビート情報は、org.apache.kafka.clients.consumer.internals.Heartbeat に保存されます。宣言されたスキーマは次のとおりです。

  1. プライベート最終intセッションタイムアウトMs;
  2. プライベート最終intハートビート間隔Ms;
  3. プライベート最終int maxPollIntervalMs;
  4. プライベート最終長い retryBackoffMs;
  5. プライベート揮発性の長い最後のハートビート送信;
  6. プライベートロングラストハートビート受信;
  7. プライベート長いlastSessionReset;
  8. プライベートな長い最後の投票;
  9. プライベートブールハートビート失敗;

ハートビート スレッドの run メソッドの実装コードは次のとおりです。

  1. パブリックボイド実行(){
  2. 試す {
  3. log.debug( "ハートビートスレッドが開始されました" );
  4. )の間{
  5. 同期された (AbstractCoordinator.this) {
  6. (閉じている)
  7. 戻る;
  8. 有効になっている場合
  9. 抽象コーディネータ。this.wait();
  10. 続く;
  11. } 状態が MemberState.STABLE の場合 {
  12. //グループ  安定していない(おそらくグループを離れたため  またはコーディネーターが
  13. //追い出されました) ので、ハートビートを無効にしてメインスレッドが再参加する待ちます
  14. 無効にする();
  15. 続く;
  16. }
  17. クライアントのポーリングNoWakeup();
  18. long now = time .milliseconds();
  19. コーディネーターが不明の場合(){
  20. (findCoordinatorFuture != null || lookupCoordinator().failed()) の場合
  21. //直近未来チェックは、   
  22. // ブローカーは利用可能 接続する 
  23. AbstractCoordinator.this.wait(retryBackoffMs);
  24. }そうでない場合 (heartbeat.sessionTimeoutExpired(now)) {
  25. // ハートビートが成功しないままセッションタイムアウトが経過したので、
  26. // おそらくコーディネータがまだ正常であることを確認します
  27. コーディネーター不明をマークします();
  28. }そうでない場合 (heartbeat.pollTimeoutExpired(now)) {
  29. // ポーリングタイムアウトが期限切れになったため、フォアグラウンドスレッドが停止したことを意味します
  30. //  poll()呼び出しの間にグループを明示的に離脱します
  31. グループを離れるかもしれない();
  32. }そうでない場合 (!heartbeat.shouldHeartbeat(now)) {
  33. // 再試行バックオフ待っから再度ポーリングする 心拍が停止し場合
  34. // コーディネータが切断されました
  35. AbstractCoordinator.this.wait(retryBackoffMs);
  36. }それ以外{
  37. heartbeat.sentHeartbeat(現在)
  38. sendHeartbeatRequest().addListener(新しいRequestFutureListener() {
  39. @オーバーライド
  40. パブリックvoid onSuccess(Void値) {
  41. 同期された (AbstractCoordinator.this) {
  42. heartbeat.receiveHeartbeat(時間.milliseconds());
  43. }
  44. }
  45. @オーバーライド
  46. パブリックvoid onFailure(RuntimeException e) {
  47. 同期された (AbstractCoordinator.this) {
  48. if (e インスタンス RebalanceInProgressException) {
  49. //有効です グループが鼓動を続ける バランス調整中です
  50. // コーディネータがメンバーをグループ保持することを保証する のために 限り
  51. //再バランスのタイムアウト期間として心拍の送信を止めれば、
  52. // ただし、参加する前にセッション タイムアウトが期限切れになる可能性があります。
  53. heartbeat.receiveHeartbeat(時間.milliseconds());
  54. }それ以外{
  55. ハートビート.failHeartbeat();
  56. //スレッドがスリープ状態の場合はスレッドを起動してハートビートを再スケジュールします
  57. 抽象コーディネータ。this.notify();
  58. }
  59. }
  60. }
  61. });
  62. }
  63. }
  64. }
  65. } キャッチ (認証例外 e) {
  66. log.error( "ハートビートスレッドで認証エラーが発生しました" , e);
  67. this.failed.set (e);
  68. } キャッチ (GroupAuthorizationException e) {
  69. log.error( "ハートビートスレッドでグループ認証エラーが発生しました" , e);
  70. this.failed.set (e);
  71. } キャッチ (InterruptedException | InterruptException e) {
  72. スレッドが中断されました();
  73. log.error( "ハートビートスレッドで予期しない割り込みを受信しました" , e);
  74. this.failed.set (新しい RuntimeException(e));
  75. } キャッチ (Throwable e) {
  76. log.error( "予期しないエラーのためハートビートスレッドが失敗しました" , e);
  77. if (e インスタンスの RuntimeException)
  78. this.failed.set ( (RuntimeException) e);
  79. それ以外  
  80. this.failed.set (新しい RuntimeException(e));
  81. ついに
  82. log.debug( "ハートビートスレッドが閉じられました" );
  83. }
  84. }

ハートビート スレッドには、sessionTimeoutExpired と pollTimeoutExpired という 2 つの最も重要なタイムアウト関数があります。

  1. パブリックブール値 sessionTimeoutExpired(long now) {
  2. 今すぐ戻る- 数学。最大(lastSessionReset、lastHeartbeatReceive) > sessionTimeoutMs;
  3. } public boolean pollTimeoutExpired(long now) {
  4. 戻ります- lastPoll > maxPollIntervalMs;
  5. }

2.1、セッションタイムアウト期限切れ

sessionTimeout がタイムアウトすると、現在のコーディネーターが切断を処理しているとマークされます。このとき、コンシューマーは削除され、パーティションとコンシューマー間の対応が再割り当てされます。 Kafka ブローカー サーバーでは、コンシューマー グループは 5 つの状態 (Unknown が含まれる場合は 6 つ) を定義します (org.apache.kafka.common.ConsumerGroupState、次の図を参照)。


2.2、ポーリングタイムアウト期限切れ

ポーリング タイムアウトがトリガーされると、コンシューマー クライアントは ConsumerGroup を終了します。再度ポーリングすると、ConsumerGroup に再参加し、RebalanceGroup がトリガーされます。 KafkaConsumer クライアントは繰り返しポーリングするのに役立たないため、実装された消費ロジックでポーリング メソッドを継続的に呼び出す必要があります。

3. パーティションとコンシューマスレッド

コンシューマ パーティションとコンシューマ スレッドの対応に関しては、理論的にはコンシューマ スレッドの数はパーティションの数以下である必要があります。以前は、1 つのコンシューマ スレッドが 1 つのパーティションに対応し、コンシューマ スレッドの数がパーティションの数と等しい場合に、スレッドの使用率が最大化されるという見解がありました。 KafkaConsumer Client インスタンスを直接使用しても問題はありません。ただし、十分な CPU があれば、実際にはパーティション数よりも多くのスレッドを使用して消費容量を向上させることができます。これには、KafkaConsumer クライアントインスタンスを変更し、消費戦略の事前計算を実装し、追加の CPU を使用してより多くのスレッドを開始し、消費タスクのシャーディングを実装する必要があります。 Linuxを学ぶにはこうするべきだ

<<:  ホスティングにメリットをもたらすハイブリッドクラウドを構築する4つの方法

>>:  HarmonyOS 分散データ管理: デバイス間のデータ障壁を解消し、データの自由な流れを可能にします

推薦する

Pinduoduoのブレークスルーと進歩のための4つのモジュール

電子商取引大手の包囲網を突破したPinduoduoは、ユーザー数でAlibabaに次ぐ電子商取引プラ...

drServer-6 USD/年/64M メモリ/1G ハードディスク/50G トラフィック/G ポート/ダラス

drServer は以前、ローエンド VPS ランキングのトップ 10 にランクインしていました。社...

Google で上位にランクインする 13 の方法

SEO はデジタル時代の新しいビジネスにとって非常に重要です。賢い起業家は、マーケティング ツールの...

CentOS 以降の時代において、コミュニティはどのように発展し、革新していくのでしょうか?

CentOS は廃止され、再設計されたコミュニティ バージョンの CentOS Stream に置き...

純利益の「接近戦」クラウドコンピューティングが次の戦場か?

[[265739]]過去1年間、アリババとテンセントはともに経済環境の不確実性と、沈みゆく市場におけ...

tudcloud: 香港データセンターVPSの簡単なレビュー。データからtudcloudの優秀さがわかる

Tudcloud、この会社は登録されてまだ半年しか経っていません。香港のデータセンターで香港 VPS...

プロモーションに1000万与えられたら、どのように効果的にチャネルを配分できますか?

正直に言うと、今日のトラフィックをめぐる厳しい市場環境では、主流のチャネルからのトラフィックはますま...

dedipathはどうですか?ダラスデータセンターVPSの簡単なレビュー

dedipath は米国中部のダラスに独自のデータセンターを持ち、ダラスで VPS および専用サーバ...

百度百科ブラック 2013年7月

7月以降、Baidu百科事典のエントリを作成するのが非常に困難になっていることに気付きましたか? 以...

HUYAの急速な海外進出の秘密

[51CTO.com からのオリジナル記事] Huya は、ゲームライブストリーミングを主力事業とす...

クラウドネイティブの5つの特徴

[[433938]]この記事は、劉文茂、江国龍らが執筆したWeChatパブリックアカウント「ビッグデ...

プログラマーは明確なナビゲーションをどのように設計すべきでしょうか?

明確なナビゲーション構造は、ユーザーと検索エンジンの両方にとって非常に重要です。優れたサイト ナビゲ...

ウェブサイトのランクが下がってしまったらどう対処しますか?

みなさんこんにちは。今日は、降格されたウェブサイトへの対処方法についてお話ししたいと思います。私は江...

Green Radish 2.0 リリース後の外部リンク構築理論に関する雑談

青大根アルゴリズムの再登場は、より広範囲な影響を及ぼしています。文字通り、大規模なウェブサイトでのソ...

米国のウェブサイトが中国で投資詐欺を仕掛け、20万人を騙し24億元をだまし取った

「ポンジ・スキーム」の創始者、ポンジ。台湾海峡網、10月26日、揚子江晩報によると、1919年、イタ...