オンラインで待機中に、Kafka がメッセージを失ったらどうなるでしょうか?

オンラインで待機中に、Kafka がメッセージを失ったらどうなるでしょうか?

[[383294]]

画像はPexelsより

ブローカ

ブローカ

メッセージの損失は Kafka 自体によって発生します。より高いパフォーマンスとスループットを実現するために、Kafka はデータを非同期的にバッチでディスクに保存します。

メッセージをディスクにフラッシュするプロセスでは、パフォーマンスを向上させ、フラッシュ回数を減らすために、Kafka はメッセージをバッチでフラッシュする方法を採用しています。つまり、ディスクは一定量のメッセージと時間間隔に従って更新されます。

このメカニズムも Linux オペレーティング システムによって決定されます。 Linux オペレーティング システムでデータを保存する場合、データは最初にページ キャッシュ (ページ キャッシュ) に保存され、その後、時間やその他の条件に応じてディスクにフラッシュ (ページ キャッシュからファイルへ) されるか、fsync コマンドによって強制的にディスクにフラッシュされます。

データがページ キャッシュ内にある場合、システムがクラッシュするとデータは失われます。

ブローカーはLinuxサーバー上で高速に読み書きし、レプリカと同期します。

上の図は、ブローカーがデータを書き込み、同期するプロセスを簡単に説明しています。ブローカーは、メモリ内にあるページ キャッシュにのみデータを書き込みます。

この部分のデータは停電後に失われます。ページ キャッシュ内のデータは、Linux フラッシャー プログラムを使用してディスクにフラッシュされます。

フラッシュのトリガー条件は 3 つあります。

  • sync 関数または fsync 関数を積極的に呼び出します。
  • 使用可能なメモリがしきい値を下回っています。
  • ダーティデータ時間がしきい値に達しました。 Dirty はページ キャッシュのフラグです。データがページ キャッシュに書き込まれると、ページ キャッシュはダーティとしてマークされます。データがディスクにフラッシュされた後、ダーティ フラグはクリアされます。

ブローカーは、fsync 関数を呼び出してディスク フラッシュ アクションを引き継ぐことにより、ディスク フラッシュ メカニズムを構成します。単一のブローカーの観点から見ると、ページ キャッシュ データは失われます。

Kafka は同期ディスクフラッシュ方式を提供しません。 RocketMQ では同期ディスク フラッシュが実装されています。実装の原則は、Ajax コールバックや Java future と同様に、非同期ディスク フラッシュ プロセスをブロックし、応答を待機することです。

以下は RocketMQ のソースコードです。

  1. GroupCommitRequest リクエスト = 新しい GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
  2. service.putRequest(リクエスト);
  3. ブール値の flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // ディスクをフラッシュする

つまり、理論的には、Kafka が単一のブローカーがメッセージを失わないことを完全に保証することは不可能です。この状況は、ディスク フラッシュ メカニズムのパラメータを調整することによってのみ緩和できます。

たとえば、ディスク更新間隔を短縮し、ディスク更新データの量を減らします。時間が短いほどパフォーマンスは悪くなりますが、信頼性は高くなります(可能な限り信頼性が高くなります)。これは複数選択の質問です。

この問題を解決するために、Kafka は Producer と Broker を連携させて、単一の Broker がパラメータを失った状況を処理します。

プロデューサーはブローカー メッセージが失われたことを検出すると、自動的に再試行できます。再試行回数がしきい値(設定可能)を超えない限り、メッセージは失われません。

現時点では、プロデューサー クライアントはこの状況を手動で処理する必要があります。では、プロデューサーはどのようにしてデータ損失を検出するのでしょうか?これは、http の 3 ウェイ ハンドシェイクに似た ack メカニズムを介して行われます。

リクエストが完了したと判断する前に、プロデューサーがリーダーに要求する確認応答の数。これは、送信されるレコードの耐久性を制御します。次の設定が許可されます。

acks=0 ゼロに設定すると、プロデューサーはサーバーからの確認応答をまったく待機しません。レコードはすぐにソケット バッファーに追加され、送信されたとみなされます。この場合、サーバーがレコードを受信したという保証はなく、再試行構成は有効になりません (クライアントは通常、失敗を認識しないため)。各レコードに返されるオフセットは常に -1 に設定されます。

acks=1 は、リーダーがレコードをローカル ログに書き込みますが、すべてのフォロワーからの完全な確認応答を待たずに応答することを意味します。この場合、リーダーがレコードを確認した直後、フォロワーがそれを複製する前に失敗すると、レコードは失われます。

acks=all これは、リーダーが同期レプリカの完全なセットがレコードを確認するまで待機することを意味します。これにより、少なくとも 1 つの同期レプリカが存続している限り、レコードが失われないことが保証されます。これは利用可能な最も強力な保証です。これは acks=-1 設定と同等です。

http://kafka.apache.org/20/documentation.html

上記の引用は、Kafka のパラメーター acks に関する公式の説明です (古いバージョンでは、パラメーターは request.required.acks です)。

①acks=0の場合、プロデューサーはブローカーの応答を待たないので最も効率的ですが、メッセージが失われる可能性があります。

②acks=1の場合、リーダーブローカーはメッセージを受信後、他のフォロワーからの応答を待たずにackを返します。 ack番号が1であることも分かります。

このとき、フォロワーがリーダーの同期メッセージを受信する前にリーダーが死亡した場合、メッセージは失われます。

上の図の例によると、リーダーがメッセージを受信して​​ PageCache に正常に書き込むと、リーダーは ack を返し、プロデューサーはメッセージが正常に送信されたとみなします。

しかし、上図によれば、この時点ではまだデータはフォロワーに同期されていません。この時点でリーダーの電源が失われると、データは失われます。

③acks=-1の場合、リーダーブローカーはメッセージを受信した後、切断し、ISRリスト内のすべてのフォロワーが結果を返すのを待ってからackを返します。

-1 はすべてに相当します。この構成では、ページキャッシュにデータを書き込むときにリーダーのみが ack を返さず、すべての ISR が ack をトリガーするために「成功」​​を返す必要があります。

この時点で電源が切断された場合、プロデューサーはメッセージが正常に送信されなかったことを認識し、メッセージを再送信します。フォロワーがデータを受信した後に ack を正常に返し、リーダーの電源が失われた場合、データは元のフォロワーに存在します。再選後、新しいリーダーがこの部分のデータを保持することになります。

リーダーからフォロワーへのデータの同期には、次の 2 つの手順が必要です。

  • データはページ キャッシュからディスクにフラッシュされます。ディスク内のデータのみをレプリカに同期できるためです。
  • データはレプリカに同期され、レプリカはデータをページ キャッシュに正常に書き込みます。プロデューサーが ACK を受信した後、すべてのマシンの電源が失われたとしても、データは少なくともリーダーのディスク上に存在します。

上記の 3 番目のポイントでは、ISR リストのフォロワーについて説明しましたが、これは、ack の有効性をより確実にするために別のパラメーターと組み合わせる必要があります。

ISR は、同期レプリカ リストである Broker によって管理される「信頼できるフォロワー リスト」です。ブローカー構成には、min.insync.replicas というパラメーターが含まれています。

このパラメータは、ISR 内のコピーの最小数を示します。この値が設定されていない場合、ISR 内のフォロワー リストは空になる可能性があります。これはacks=1と同等です。

上の図に示すように:

  • acks=0、合計時間f(t)=f(1)。
  • acks=1、合計時間f(t)=f(1)+f(2)。
  • acks=-1、合計時間f(t)=f(1)+max( f(A) 、 f(B) )+f(2)。

パフォーマンスは順番に低下しますが、信頼性は順番に増加します。

プロデューサー

プロデューサーはメッセージを失います。これはプロデューサー クライアントで発生します。

効率を向上し、IO を削減するために、プロデューサーはデータを送信するときに複数のリクエストをマージして送信できます。マージされたリクエストは、送信される前にローカル バッファーにキャッシュされます。

キャッシュ方法は、前述のディスクフラッシュと似ています。プロデューサーは、リクエストを「ブロック」にパッケージ化したり、バッファ内のデータを時間間隔で送信したりできます。

バッファを使用すると、プロデューサーを非同期モードに変換でき、送信効率が向上します。

ただし、バッファ内のデータは危険です。通常の状況では、クライアントの非同期呼び出しは、コールバックを通じてメッセージ送信の失敗またはタイムアウトを処理できます。

ただし、プロデューサーが不法に停止されると、バッファー内のデータは失われ、ブローカーはこの部分のデータを受信できなくなります。

あるいは、プロデューサー クライアントに十分なメモリがない場合、採用された戦略がメッセージを破棄することである場合 (別の戦略はブロックすることです)、メッセージも失われます。

または、メッセージが(非同期的に)生成される速度が速すぎるため、中断されたスレッドが多すぎてメモリが不足し、プログラムがクラッシュしてメッセージが失われることがあります。

プロデューサーはデータをバッチで送信する

非同期メッセージ生成速度が速すぎる場合の図

上記の図に基づいて、いくつかの解決策が考えられます。

  • 非同期メッセージ送信を同期メッセージ送信に変更します。または、サービスがメッセージを生成するときに、ブロックされたスレッド プールが使用され、スレッドの数には一定の上限があります。全体的な考え方は、メッセージが生成される速度を制御することです。
  • バッファ容量構成を拡張します。この方法は、この状況の発生を軽減することはできますが、完全に排除することはできません。
  • サービスはメッセージをバッファ (メモリ) に直接送信するのではなく、ローカル ディスク (データベースまたはファイル) にメッセージを書き込み、別の (または少数の) 運用スレッドがメッセージを送信します。これは、バッファとサービスの間にさらにスペースを設けたバッファ レイヤーを追加することと同じです。

消費者

消費者消費メッセージには次の手順があります。

  • メッセージの受信
  • メッセージの処理
  • フィードバックが「処理されました」(コミット済み)

消費者の消費は主に2つのタイプに分けられます。

  • 自動オフセットコミット
  • 手動オフセット制御

Consumer の自動コミット メカニズムは、受信したメッセージを一定の時間間隔でコミットすることです。コミット プロセスとメッセージ消費プロセスは非同期です。

つまり、消費プロセスは成功しない可能性があります (たとえば、例外がスローされる) が、コミット メッセージは送信されています。この時点でメッセージは失われます。

  1. プロパティ props = new Properties();
  2. props.put( "bootstrap.servers" , "localhost:9092" );
  3. props.put( "group.id" , "test" );
  4. // 自動送信スイッチ
  5. props.put( "enable.auto.commit" , "true" );
  6. // 自動送信の時間間隔はここでは1秒です
  7. props.put( "auto.commit.interval.ms" , "1000" );
  8. props.put( "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  9. props.put( "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  10. KafkaConsumer<String, String> コンシューマー = 新しい KafkaConsumer<>(props);
  11. consumer.subscribe(Arrays.asList( "foo" , "bar" ));
  12. )の間{
  13. // ポーリングを呼び出した後、1000 ミリ秒後にメッセージのステータスがコミットに変更されます 
  14. ConsumerRecords<String, String> レコード = consumer.poll(100);
  15. ( ConsumerRecord<String, String> レコード: レコード)
  16. DBにレコードを挿入します。 // メッセージをデータベースに格納します。1000 ミリ秒以上かかる場合があります。

上記の例は自動送信の例です。このとき insertIntoDB(record) 中に例外が発生すると、メッセージは失われます。

以下は手動での送信の例です。

  1. プロパティ props = new Properties();
  2. props.put( "bootstrap.servers" , "localhost:9092" );
  3. props.put( "group.id" , "test" );
  4. // 自動送信をオフにして手動で送信する
  5. props.put( "enable.auto.commit" , "false" );
  6. props.put( "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  7. props.put( "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" );
  8. KafkaConsumer<String, String> コンシューマー = 新しい KafkaConsumer<>(props);
  9. consumer.subscribe(Arrays.asList( "foo" , "bar" ));
  10. 最終的なint minBatchSize = 200;
  11. List<ConsumerRecord<String, String>> バッファ = new ArrayList<>();
  12. )の間{
  13. // ポーリングを呼び出した後、自動コミットは実行されません 
  14. ConsumerRecords<String, String> レコード = consumer.poll(100);
  15. ( ConsumerRecord<String, String> レコード: レコード) {
  16. buffer.add (レコード);
  17. }
  18. バッファサイズ() >= minBatchSizeの場合{
  19. バッファに挿入します。
  20. // すべてのメッセージが消費された後、コミット操作が実行されます
  21. コンシューマー.commitSync();
  22. バッファをクリアします。
  23. }

送信タイプを手動に変更すると、メッセージが「少なくとも 1 回」消費されることを確認できます。ただし、この時点で重複消費が発生する可能性があり、これはこの記事の範囲外です。

上記の 2 つの例では、コンシューマーの高レベル API を直接使用しており、クライアントはオフセットなどのコントロールに対して透過的です。

低レベル API を使用してオフセットを手動で制御することもできます。これにより、メッセージが失われないようにすることもできますが、より複雑になります。

  1. 試す {
  2. 実行中{
  3. ConsumerRecords<String, String> レコード = consumer.poll(Long.MAX_VALUE);
  4. (トピックパーティションパーティション: records.partitions()) {
  5. List<ConsumerRecord<String, String>>partitionRecords = records.records(partition);
  6. ( ConsumerRecord<String, String> レコード: パーティションレコード) {
  7. システム。出力.println(record.offset() + ": " + record.value());
  8. }
  9. long lastOffset = partitionRecords.get( partitionRecords.size () - 1).offset();
  10. // オフセットを正確に制御する
  11. consumer.commitSync(Collections.singletonMap(partition, 新しい OffsetAndMetadata(lastOffset + 1)));
  12. }
  13. }
  14. ついに
  15. コンシューマーを閉じます() ;
  16. }

著者: DongGuoChao

編集者:タオ・ジアロン

出典: https://blog.dogchao.cn/?p=305

<<:  従来のハイパーコンバージェンスを置き換えましょう! HCIaaS(ハイパーコンバージェンス・アズ・ア・サービス)が新たなトレンドになるかもしれない

>>:  Kubernetes ベースのハイブリッド クラウドを構築するメリットとデメリット

推薦する

Longhorn、Kubernetes 向けクラウドネイティブ分散ブロックストレージ

[[417918]]この記事はWeChatの公開アカウント「Hacker Afternoon Tea...

Baidu の入札キーワード品質最適化によりマーケティング パフォーマンスを向上

検索エンジンマーケティングでは、企業が自社の商品やサービスに関連するキーワードを購入し、入札すること...

trentahost-49 USD/E3-1230/32 GB RAM/500 GB HDD/10 TB フロー/IPMI/ポートランド

Trentahostは2009年に事業を開始し、ドメイン名、仮想ホスティング(配信を含む)、ゲームホ...

Prometheus、Istio、Hpa、Keda、Karpenter をベースにした K8s アプリケーションとノードの弾力性の実装

導入この記事では、Prometheus、Istio、HPA、Keda、Karpenter に基づいて...

タオバオセレブライブストリーミングの簡単な歴史

今年のダブルイレブンは、タオバオ14年間で初めて売上高が公表されていないダブルイレブンだが、タオバオ...

SEO は金鉱ではありません。方法を学習することによってのみ、段階的に勝利することができます。

2012年のインターネットウェブマスターカンファレンスからしばらく経ちました。雷軍と李国青の成功を考...

友人にリンクを返すことがSEO最適化の鍵です

現在、一定の関連性、発展の可能性があるフレンドリーリンクの見つけ方、フレンドリーリンクが多すぎないこ...

2018年のクラウド予測

12月はテクノロジー業界における「予言の季節」です。専門家の予測を読むのはいつも楽しいが、これらの予...

ガートナーのマジック・クアドラントによると、GoogleとMicrosoftはパブリッククラウドストレージで大きな進歩を遂げている。

ガートナーのパブリッククラウドストレージプロバイダーに関するマジッククアドラントによると、AWS は...

テンセントクラウド:新旧ユーザー向けの新年ギフトパッケージが50%割引で提供され、国産機と海外機の両方が購入可能で、「超低価格プロモーション」もあります

「テンセントクラウド新年ギフトパッケージ」は、実際のメリットをもたらします。新規ユーザーと既存ユーザ...

無制限トラフィック VPS: limewave、米国 VPS (シアトル)、最小構成 $20/月、4 RAM/2 コア/50GSSD/100M 帯域幅

Limewave は独自のネットワーク AS36369 と IP を持っています。現在はインターネッ...

ウェブマスターは 2013 年も SEO で利益を上げることができるのでしょうか?

さようなら 2012 年、こんにちは 2013 年。年末に、ウェブマスターはようやく一息ついて、新し...

アマゾン ウェブ サービスとボルボ カーズがスマートカー共創加速プログラムを終了

アマゾン ウェブ サービスは2022年9月22日、「スマートカー共創アクセラレーションプログラム」が...

ウェブサイトの SEO 最適化の例

みなさんこんにちは、lzbiz です。友人が、ここの技術交流の雰囲気がとても良いと言って、この講義グ...