オンラインで待機中に、Kafka がメッセージを失ったらどうなるでしょうか?

オンラインで待機中に、Kafka がメッセージを失ったらどうなるでしょうか?

[[383294]]

画像はPexelsより

ブローカ

ブローカ

メッセージの損失は Kafka 自体によって発生します。より高いパフォーマンスとスループットを実現するために、Kafka はデータを非同期的にバッチでディスクに保存します。

メッセージをディスクにフラッシュするプロセスでは、パフォーマンスを向上させ、フラッシュ回数を減らすために、Kafka はメッセージをバッチでフラッシュする方法を採用しています。つまり、ディスクは一定量のメッセージと時間間隔に従って更新されます。

このメカニズムも Linux オペレーティング システムによって決定されます。 Linux オペレーティング システムでデータを保存する場合、データは最初にページ キャッシュ (ページ キャッシュ) に保存され、その後、時間やその他の条件に応じてディスクにフラッシュ (ページ キャッシュからファイルへ) されるか、fsync コマンドによって強制的にディスクにフラッシュされます。

データがページ キャッシュ内にある場合、システムがクラッシュするとデータは失われます。

ブローカーはLinuxサーバー上で高速に読み書きし、レプリカと同期します。

上の図は、ブローカーがデータを書き込み、同期するプロセスを簡単に説明しています。ブローカーは、メモリ内にあるページ キャッシュにのみデータを書き込みます。

この部分のデータは停電後に失われます。ページ キャッシュ内のデータは、Linux フラッシャー プログラムを使用してディスクにフラッシュされます。

フラッシュのトリガー条件は 3 つあります。

  • sync 関数または fsync 関数を積極的に呼び出します。
  • 使用可能なメモリがしきい値を下回っています。
  • ダーティデータ時間がしきい値に達しました。 Dirty はページ キャッシュのフラグです。データがページ キャッシュに書き込まれると、ページ キャッシュはダーティとしてマークされます。データがディスクにフラッシュされた後、ダーティ フラグはクリアされます。

ブローカーは、fsync 関数を呼び出してディスク フラッシュ アクションを引き継ぐことにより、ディスク フラッシュ メカニズムを構成します。単一のブローカーの観点から見ると、ページ キャッシュ データは失われます。

Kafka は同期ディスクフラッシュ方式を提供しません。 RocketMQ では同期ディスク フラッシュが実装されています。実装の原則は、Ajax コールバックや Java future と同様に、非同期ディスク フラッシュ プロセスをブロックし、応答を待機することです。

以下は RocketMQ のソースコードです。

  1. GroupCommitRequest リクエスト = 新しい GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
  2. service.putRequest(リクエスト);
  3. ブール値の flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // ディスクをフラッシュする

つまり、理論的には、Kafka が単一のブローカーがメッセージを失わないことを完全に保証することは不可能です。この状況は、ディスク フラッシュ メカニズムのパラメータを調整することによってのみ緩和できます。

たとえば、ディスク更新間隔を短縮し、ディスク更新データの量を減らします。時間が短いほどパフォーマンスは悪くなりますが、信頼性は高くなります(可能な限り信頼性が高くなります)。これは複数選択の質問です。

この問題を解決するために、Kafka は Producer と Broker を連携させて、単一の Broker がパラメータを失った状況を処理します。

プロデューサーはブローカー メッセージが失われたことを検出すると、自動的に再試行できます。再試行回数がしきい値(設定可能)を超えない限り、メッセージは失われません。

現時点では、プロデューサー クライアントはこの状況を手動で処理する必要があります。では、プロデューサーはどのようにしてデータ損失を検出するのでしょうか?これは、http の 3 ウェイ ハンドシェイクに似た ack メカニズムを介して行われます。

リクエストが完了したと判断する前に、プロデューサーがリーダーに要求する確認応答の数。これは、送信されるレコードの耐久性を制御します。次の設定が許可されます。

acks=0 ゼロに設定すると、プロデューサーはサーバーからの確認応答をまったく待機しません。レコードはすぐにソケット バッファーに追加され、送信されたとみなされます。この場合、サーバーがレコードを受信したという保証はなく、再試行構成は有効になりません (クライアントは通常、失敗を認識しないため)。各レコードに返されるオフセットは常に -1 に設定されます。

acks=1 は、リーダーがレコードをローカル ログに書き込みますが、すべてのフォロワーからの完全な確認応答を待たずに応答することを意味します。この場合、リーダーがレコードを確認した直後、フォロワーがそれを複製する前に失敗すると、レコードは失われます。

acks=all これは、リーダーが同期レプリカの完全なセットがレコードを確認するまで待機することを意味します。これにより、少なくとも 1 つの同期レプリカが存続している限り、レコードが失われないことが保証されます。これは利用可能な最も強力な保証です。これは acks=-1 設定と同等です。

http://kafka.apache.org/20/documentation.html

上記の引用は、Kafka のパラメーター acks に関する公式の説明です (古いバージョンでは、パラメーターは request.required.acks です)。

①acks=0の場合、プロデューサーはブローカーの応答を待たないので最も効率的ですが、メッセージが失われる可能性があります。

②acks=1の場合、リーダーブローカーはメッセージを受信後、他のフォロワーからの応答を待たずにackを返します。 ack番号が1であることも分かります。

このとき、フォロワーがリーダーの同期メッセージを受信する前にリーダーが死亡した場合、メッセージは失われます。

上の図の例によると、リーダーがメッセージを受信して​​ PageCache に正常に書き込むと、リーダーは ack を返し、プロデューサーはメッセージが正常に送信されたとみなします。

しかし、上図によれば、この時点ではまだデータはフォロワーに同期されていません。この時点でリーダーの電源が失われると、データは失われます。

③acks=-1の場合、リーダーブローカーはメッセージを受信した後、切断し、ISRリスト内のすべてのフォロワーが結果を返すのを待ってからackを返します。

-1 はすべてに相当します。この構成では、ページキャッシュにデータを書き込むときにリーダーのみが ack を返さず、すべての ISR が ack をトリガーするために「成功」​​を返す必要があります。

この時点で電源が切断された場合、プロデューサーはメッセージが正常に送信されなかったことを認識し、メッセージを再送信します。フォロワーがデータを受信した後に ack を正常に返し、リーダーの電源が失われた場合、データは元のフォロワーに存在します。再選後、新しいリーダーがこの部分のデータを保持することになります。

リーダーからフォロワーへのデータの同期には、次の 2 つの手順が必要です。

  • データはページ キャッシュからディスクにフラッシュされます。ディスク内のデータのみをレプリカに同期できるためです。
  • データはレプリカに同期され、レプリカはデータをページ キャッシュに正常に書き込みます。プロデューサーが ACK を受信した後、すべてのマシンの電源が失われたとしても、データは少なくともリーダーのディスク上に存在します。

上記の 3 番目のポイントでは、ISR リストのフォロワーについて説明しましたが、これは、ack の有効性をより確実にするために別のパラメーターと組み合わせる必要があります。

ISR は、同期レプリカ リストである Broker によって管理される「信頼できるフォロワー リスト」です。ブローカー構成には、min.insync.replicas というパラメーターが含まれています。

このパラメータは、ISR 内のコピーの最小数を示します。この値が設定されていない場合、ISR 内のフォロワー リストは空になる可能性があります。これはacks=1と同等です。

上の図に示すように:

  • acks=0、合計時間f(t)=f(1)。
  • acks=1、合計時間f(t)=f(1)+f(2)。
  • acks=-1、合計時間f(t)=f(1)+max( f(A) 、 f(B) )+f(2)。

パフォーマンスは順番に低下しますが、信頼性は順番に増加します。

プロデューサー

プロデューサーはメッセージを失います。これはプロデューサー クライアントで発生します。

効率を向上し、IO を削減するために、プロデューサーはデータを送信するときに複数のリクエストをマージして送信できます。マージされたリクエストは、送信される前にローカル バッファーにキャッシュされます。

キャッシュ方法は、前述のディスクフラッシュと似ています。プロデューサーは、リクエストを「ブロック」にパッケージ化したり、バッファ内のデータを時間間隔で送信したりできます。

バッファを使用すると、プロデューサーを非同期モードに変換でき、送信効率が向上します。

ただし、バッファ内のデータは危険です。通常の状況では、クライアントの非同期呼び出しは、コールバックを通じてメッセージ送信の失敗またはタイムアウトを処理できます。

ただし、プロデューサーが不法に停止されると、バッファー内のデータは失われ、ブローカーはこの部分のデータを受信できなくなります。

あるいは、プロデューサー クライアントに十分なメモリがない場合、採用された戦略がメッセージを破棄することである場合 (別の戦略はブロックすることです)、メッセージも失われます。

または、メッセージが(非同期的に)生成される速度が速すぎるため、中断されたスレッドが多すぎてメモリが不足し、プログラムがクラッシュしてメッセージが失われることがあります。

プロデューサーはデータをバッチで送信する

非同期メッセージ生成速度が速すぎる場合の図

上記の図に基づいて、いくつかの解決策が考えられます。

  • 非同期メッセージ送信を同期メッセージ送信に変更します。または、サービスがメッセージを生成するときに、ブロックされたスレッド プールが使用され、スレッドの数には一定の上限があります。全体的な考え方は、メッセージが生成される速度を制御することです。
  • バッファ容量構成を拡張します。この方法は、この状況の発生を軽減することはできますが、完全に排除することはできません。
  • サービスはメッセージをバッファ (メモリ) に直接送信するのではなく、ローカル ディスク (データベースまたはファイル) にメッセージを書き込み、別の (または少数の) 運用スレッドがメッセージを送信します。これは、バッファとサービスの間にさらにスペースを設けたバッファ レイヤーを追加することと同じです。

消費者

消費者消費メッセージには次の手順があります。

  • メッセージの受信
  • メッセージの処理
  • フィードバックが「処理されました」(コミット済み)

消費者の消費は主に2つのタイプに分けられます。

  • 自動オフセットコミット
  • 手動オフセット制御

Consumer の自動コミット メカニズムは、受信したメッセージを一定の時間間隔でコミットすることです。コミット プロセスとメッセージ消費プロセスは非同期です。

つまり、消費プロセスは成功しない可能性があります (たとえば、例外がスローされる) が、コミット メッセージは送信されています。この時点でメッセージは失われます。

  1. プロパティ props = new Properties();
  2. props.put( "bootstrap.servers" , "localhost:9092" );
  3. props.put( "group.id" , "test" );
  4. // 自動送信スイッチ
  5. props.put( "enable.auto.commit" , "true" );
  6. // 自動送信の時間間隔はここでは1秒です
  7. props.put( "auto.commit.interval.ms" , "1000" );
  8. props.put( "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  9. props.put( "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  10. KafkaConsumer<String, String> コンシューマー = 新しい KafkaConsumer<>(props);
  11. consumer.subscribe(Arrays.asList( "foo" , "bar" ));
  12. )の間{
  13. // ポーリングを呼び出した後、1000 ミリ秒後にメッセージのステータスがコミットに変更されます 
  14. ConsumerRecords<String, String> レコード = consumer.poll(100);
  15. ( ConsumerRecord<String, String> レコード: レコード)
  16. DBにレコードを挿入します。 // メッセージをデータベースに格納します。1000 ミリ秒以上かかる場合があります。

上記の例は自動送信の例です。このとき insertIntoDB(record) 中に例外が発生すると、メッセージは失われます。

以下は手動での送信の例です。

  1. プロパティ props = new Properties();
  2. props.put( "bootstrap.servers" , "localhost:9092" );
  3. props.put( "group.id" , "test" );
  4. // 自動送信をオフにして手動で送信する
  5. props.put( "enable.auto.commit" , "false" );
  6. props.put( "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  7. props.put( "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  8. KafkaConsumer<String, String> コンシューマー = 新しい KafkaConsumer<>(props);
  9. consumer.subscribe(Arrays.asList( "foo" , "bar" ));
  10. 最終的なint minBatchSize = 200;
  11. List<ConsumerRecord<String, String>> バッファ = new ArrayList<>();
  12. )の間{
  13. // ポーリングを呼び出した後、自動コミットは実行されません 
  14. ConsumerRecords<String, String> レコード = consumer.poll(100);
  15. ( ConsumerRecord<String, String> レコード: レコード) {
  16. buffer.add (レコード);
  17. }
  18. バッファサイズ() >= minBatchSizeの場合{
  19. バッファに挿入します。
  20. // すべてのメッセージが消費された後、コミット操作が実行されます
  21. コンシューマー.commitSync();
  22. バッファをクリアします。
  23. }

送信タイプを手動に変更すると、メッセージが「少なくとも 1 回」消費されることを確認できます。ただし、この時点で重複消費が発生する可能性があり、これはこの記事の範囲外です。

上記の 2 つの例では、コンシューマーの高レベル API を直接使用しており、クライアントはオフセットなどのコントロールに対して透過的です。

低レベル API を使用してオフセットを手動で制御することもできます。これにより、メッセージが失われないようにすることもできますが、より複雑になります。

  1. 試す {
  2. 実行中{
  3. ConsumerRecords<String, String> レコード = consumer.poll(Long.MAX_VALUE);
  4. (トピックパーティションパーティション: records.partitions()) {
  5. List<ConsumerRecord<String, String>>partitionRecords = records.records(partition);
  6. ( ConsumerRecord<String, String> レコード: パーティションレコード) {
  7. システム。出力.println(record.offset() + ": " + record.value());
  8. }
  9. long lastOffset = partitionRecords.get( partitionRecords.size () - 1).offset();
  10. // オフセットを正確に制御する
  11. consumer.commitSync(Collections.singletonMap(partition, 新しい OffsetAndMetadata(lastOffset + 1)));
  12. }
  13. }
  14. ついに
  15. コンシューマーを閉じます() ;
  16. }

著者: DongGuoChao

編集者:タオ・ジアロン

出典: https://blog.dogchao.cn/?p=305

<<:  従来のハイパーコンバージェンスを置き換えましょう! HCIaaS(ハイパーコンバージェンス・アズ・ア・サービス)が新たなトレンドになるかもしれない

>>:  Kubernetes ベースのハイブリッド クラウドを構築するメリットとデメリット

推薦する

Meilishuoトラフィック分析SEOは死なない

まとめると、Meilishuo などの大規模な Taobao アフィリエイト サイトのユーザー グル...

プログラマーの精神修養への道 - 分散システムにおける最も重要なハブとなるかもしれない

[[345269]]分散システムにレジストリが必要なのはなぜですか?分散システム登録センターの落とし...

10分で高品質な記事を書く方法

高品質な記事とは、テキストと画像の両方で内容が充実しているように見え、ユーザーが一目見てとても素敵だ...

投稿をしばらく飛ばすためのフォーラムマーケティングの 4 つのステップ

フォーラムを頻繁に訪れるネットユーザーなら、一部の人気フォーラムの投稿が数分間で何万回も閲覧されるこ...

ハイブリッドクラウド?プライベートクラウド?パブリッククラウド?所により曇り?選び方

以前は、クラウド コンピューティングが存在するかどうかが議論されていましたが、現在はプライベート ク...

世界の医療クラウドコンピューティング市場規模は2026年に768億ドルに達すると予想されている

11月29日、市場調査会社ResearchAndMarketsが発表した最新のレポートによると、ヘル...

分散アーキテクチャの基本的な考え方の要約

インターネットの普及に伴い、さまざまな分散システムが一般的になりました。検索エンジン、電子商取引ウェ...

国内クラウドサービスプロバイダーの恥ずべき点:技術トレンド、最先端のコンピューティングツール、低レベルのモバイルインターネットに対する理解不足

[クラウド コンピューティングは、その独自の利点と巨大なビジネス展望により、近年 IT 業界で最もホ...

SEO にはウェブサイトの構築方法を学ぶ必要がありますか? SEOとコードの関係について

最近、多くの人が Xuepeng に「SEO には Web サイトの構築方法を学ぶ必要がありますか?...

Dockerイメージをバッチロードする最も簡単な方法

通常、Docker イメージ ファイルをバッチでロードする場合は、シェル ファイルを作成し、 for...

ハイブリッドクラウドの勢いは2018年も引き続き強い

調査によると、IT およびビジネスの意思決定者の 85% が、パブリック クラウド、プライベート ク...

投資か投機か、WeChatパブリックアカウントのプロモーション戦略をどう実現するか

WeChatパブリックアカウントを0から1、そしてNにまで成長させた運営者として、 WeChatパブ...

話題:誤解されている外部リンク基準

外部リンクは、SEO 最適化において比較的重要な要素です。昨年以来、Green Radish Alg...

SEOデータ分析スキル2:ウェブサイトコンテンツ品質分析

みなさんこんにちは。私は徐子宇です。前回の記事「SEOデータ分析スキル1:キーワードランキング分析」...