Go 言語を使用して Kafka を操作し、メッセージの損失を防ぐ方法

Go 言語を使用して Kafka を操作し、メッセージの損失を防ぐ方法

[[423396]]

背景

現在、一部のインターネット企業は、コアビジネスを実行するためにメッセージ キューを使用しています。コアビジネスであるため、データの最終的な一貫性には敏感です。途中でデータが失われると、ユーザーからのクレームにつながり、年末の業績は325になります。以前、数人の友人とチャットしていました。これらの企業はすべて、メッセージ キューとして Kafka を使用しています。 Kafka を使用するとメッセージは失われますか?メッセージが失われた場合、どのように補償措置をとるのでしょうか?今回は一緒に解析していき、Goを使ってデータを失うことなくKafkaを操作する方法を紹介します。

この記事は、https://github.com/Shopify/sarama に基づいて Kafka を操作します。

Kafka アーキテクチャ入門

Wikipedia による Kafka の紹介:

Kafka は、Apache Software Foundation によって開発され、Scala と Java で記述されたオープンソースのストリーム処理プラットフォームです。このプロジェクトの目標は、リアルタイム データを処理するための、統合された高スループット、低レイテンシのプラットフォームを提供することです。その永続性レイヤーは本質的に「分散トランザクション ログ アーキテクチャを備えた大規模なパブリッシュ/サブスクライブ メッセージ キュー」であり、ストリーミング データを処理するためのエンタープライズ レベルのインフラストラクチャとして非常に価値があります。さらに、Kafka は Kafka Connect を介して外部システムに接続 (データ入出力用) することができ、Java ストリーム処理ライブラリである Kafka Streams も提供します。この設計はトランザクション ログに大きく影響されます。

Kafka の全体的なアーキテクチャは比較的シンプルで、主にプロデューサー、ブローカー、コンシューマーで構成されています。

スクリーンキャプチャ 2021-09-12 午前10時00分13秒

アーキテクチャ図に従って各モジュールを説明します。

  • プロデューサー: 選択したトピックにデータを公開できるデータのプロデューサー。
  • コンシューマー: データ コンシューマーはコンシューマー グループによって識別されます。トピック内の各レコードは、サブスクリプション コンシューマー グループ内のコンシューマー インスタンスに割り当てられます。コンシューマー インスタンスは、複数のプロセスまたは複数のマシンに分散できます。
  • ブローカー: メッセージ ミドルウェア処理ノード (サーバー)。ノードはブローカーです。 Kafka クラスターは 1 つ以上のブローカーで構成されます。

他にもいくつか概念を紹介します:

  • トピック: メッセージの集合として理解できます。トピックはブローカーに保存されます。トピックには複数のパーティションを含めることができます。トピックには、メッセージをプッシュする複数のプロデューサーが存在する場合があります。トピックには、そこからメッセージをプルする複数のコンシューマーが存在する場合があります。トピックは 1 つ以上のブローカーに存在できます。
  • パーティション: トピックのサブセットです。異なるパーティションが異なるブローカーに割り当てられ、水平拡張することで Kafka の並列処理機能が向上します。同じトピックの下にある異なるパーティション情報は異なり、同じパーティション情報は順序付けられます。各パーティションには 1 つ以上のレプリカがあり、その中からリーダーが選出されます。ファウラーはリーダーからデータを取得して自身のログを更新し (各パーティションは論理的にログ フォルダーに対応します)、コンシューマーはリーダーから情報を取得します。

Kafka がメッセージを失う 3 つのノード

プロデューサープッシュメッセージノード

まず、プロデューサーの一般的な執筆プロセスを見てみましょう。

  • プロデューサーはまずKafkaクラスタからパーティションのリーダーを見つける
  • プロデューサーはリーダーにメッセージを送信し、リーダーはメッセージをローカルに書き込む
  • フォロワーはリーダーからメッセージをプルし、ローカルログに書き込み、リーダーはackを送信する。
  • リーダーは ISR 内のすべてのレプリカから ACK を受信した後、高水準点を上げてプロデューサーに ACK を送信します。

スクリーンキャプチャ 2021-09-12 午前11時16分43秒

このプロセスを通じて、Kafka が最終的に ack を返してプッシュ メッセージの結果を確認することがわかります。ここで Kafka は次の 3 つのモードを提供します。

  1. 応答なし 必要​​なAcks = 0
  2. ローカル要求されたAcksの待機 = 1
  3. WaitForAll 必須Acks = -1
  • NoResponse RequiredAcks = 0: これは、データプッシュの成功または失敗は私に関係ないことを意味します
  • WaitForLocal RequiredAcks = 1: ローカル (リーダー) がメッセージが正常に受信されたことを確認すると、戻ることができます。
  • WaitForAll RequiredAcks = -1: すべてのリーダーとフォロワーがメッセージを正常に受信した場合にのみ返されます。

したがって、これら 3 つのモードに基づいて、プロデューサーがメッセージをプッシュするときに一定の損失の可能性があると推測できます。分析は次のとおりです。

  • モード 1 を選択した場合、このモードではデータが失われる可能性が高く、再試行できません。
  • モード 2 を選択した場合、このモードでは、リーダーがダウンしていない限り、データが失われないことが保証されます。ただし、リーダーがダウンし、フォロワーがデータを同期していない場合、データが失われる可能性が一定程度あります。
  • モード 3 を選択した場合、この状況ではデータが失われることはありませんが、データが重複する可能性があります。リーダーとフォロワーがデータを同期するときにネットワークの問題が発生すると、データの重複が発生する可能性があります。

したがって、実稼働環境では、メッセージの信頼性を確保するためにモード 2 またはモード 3 を選択できます。具体的な選択はビジネス シナリオに基づいて行う必要があります。スループットを重視する場合は、モード 2 を選択します。スループットを気にしない場合は、モード 3 を選択します。データが失われないように完全に保証したい場合は、最も信頼性の高いモード 3 を選択します。

Kafkaクラスタ自体の障害が原因である

データを受信した後、Kafka クラスターはデータを永続的に保存し、最終的にデータはディスクに書き込まれます。ディスクへの書き込み時に、オペレーティング システムはまずデータをキャッシュに書き込むため、ディスクへの書き込み手順によってデータが失われる可能性もあります。オペレーティング システムがキャッシュ内のデータをディスクに書き込むタイミングは不確実です。したがって、この場合、Kafka マシンが突然クラッシュすると、データ損失も発生します。しかし、これが起こる可能性は非常に低いです。一般的には、社内の Kafka マシンがバックアップされます。この状況は非常に極端であり、無視することができます。

コンシューマー プル メッセージ ノード

メッセージがプッシュされると、データがパーティションに追加され、オフセットが割り当てられます。このオフセットは、現在のコンシューマーによって消費される場所を表します。このパーティションを通じてメッセージの順序も保証されます。コンシューマーがメッセージをプルした後、自動送信または手動送信を設定できます。送信が成功すると、オフセットは次のようにシフトします。

スクリーンキャプチャ 2021-09-12 午後3時37分33秒

したがって、自動送信ではデータが失われ、手動送信ではデータが重複することになります。分析は次のとおりです。

  • 自動コミットを設定すると、メッセージをプルするときにオフセットはコミットされますが、消費ロジックの処理時に失敗し、データが失われます。
  • 手動送信を設定する場合、メッセージを処理した後にコミットを送信すると、コミット ステップで障害が発生すると、重複消費の問題が発生します。

データ損失と比較すると、重複した消費はビジネスの期待に沿ったものです。いくつかのべき等設計によってこの問題を回避できます。

実際の戦闘

完全なコードはgithubにアップロードされています: https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo

プッシュメッセージの損失問題を解決する

これは主に次の 2 つの方法で解決されます。

  • この問題は、RequiredAcks モードを設定することで解決できます。 WaitForAll を選択すると、データが正常にプッシュされることが保証されますが、レイテンシに影響します。
  • 再試行メカニズムを導入し、再試行回数と再試行間隔を設定する

そこで、次のコードを記述します (クライアント作成部分を抽出します)。

  1. 関数 NewAsyncProducer() sarama.AsyncProducer {
  2. cfg := sarama.NewConfig()
  3. バージョン、エラー:= sarama.ParseKafkaVersion(VERSION)
  4. err != nil{の場合
  5. log.Fatal( "NewAsyncProducer の kafka バージョンの解析に失敗しました" 、 err.Error())
  6. ゼロを返す
  7. }
  8. cfg.Version = バージョン
  9. cfg.Producer.RequiredAcks = sarama.WaitForAll // 選択できる 3 つのモード
  10. cfg.Producer.Partitioner = sarama.NewHashPartitioner
  11. cfg.プロデューサー。 .Successes = trueを返します 
  12. cfg.プロデューサー。 .Errors = trueを返します 
  13. cfg.Producer.再試行。最大= 3 // 再試行を3回に設定
  14. cfg.Producer.Retry.Backoff = 100 *時間.ミリ秒
  15. cli、エラー:= sarama.NewAsyncProducer([]string{ADDR}、cfg)
  16. err != nil{の場合
  17. log.Fatal( "NewAsyncProducer が失敗しました" , err.Error())
  18. ゼロを返す
  19. }
  20. CLIを返す
  21. }

プルメッセージの損失問題を解決する

この解決策はかなり粗雑です。自動コミット モードを直接使用し、消費ごとにオフセットを手動でコミットします。しかし、重複消費の問題が発生します。ただし、べき等演算を使用すると簡単に解決できます。

コード例:

  1. func NewConsumerGroup(グループ文字列) sarama.ConsumerGroup {
  2. cfg := sarama.NewConfig()
  3. バージョン、エラー:= sarama.ParseKafkaVersion(VERSION)
  4. err != nil{の場合
  5. log.Fatal( "NewConsumerGroup の kafka バージョンの解析に失敗しました" 、 err.Error())
  6. ゼロを返す
  7. }
  8.  
  9. cfg.Version = バージョン
  10. cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
  11. cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  12. cfg.Consumer.Offsets.Retry.Max = 3
  13. cfg.Consumer.Offsets.AutoCommit.Enable = true // 自動コミットを有効にするには、MarkMessage を手動で呼び出す必要があります。
  14. cfg.Consumer.Offsets.AutoCommit.Interval = 1 *時間// 間隔
  15. クライアント、エラー:= sarama.NewConsumerGroup([]string{ADDR}、グループ、cfg)
  16. err != nil の場合 {
  17. log.Fatal( "NewConsumerGroup が失敗しました" , err.Error())
  18. }
  19. 顧客を返す
  20. }

上記は主に ConsumerGroup 部分の作成に関するものです。注意深い読者は、ここで自動送信を使用していることに気付いたはずです。手動での送信はどうですか?これは、kafka ライブラリの特性が異なるためです。この自動送信は、送信するために MarkMessage() メソッドと組み合わせて使用​​する必要があります (質問のある友人はそれを練習したり、ソース コードを確認したりできます)。そうでない場合、消費ロジックを次のように記述する必要があるため、送信は失敗します。

  1. func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) エラー {
  2. msg := range の場合、claim.Messages() {
  3. var データ common.KafkaMsg
  4. エラーの場合:= json.Unmarshal(msg.Value, &data);エラー != ゼロ {
  5. errors.Newを返します( "メッセージのアンマーシャルに失敗しました。err は " + err.Error())
  6. }
  7. // データを操作し、代わりに印刷を使用する
  8. log.Print( "consumerClaimデータは " )
  9.  
  10. // メッセージが正常に処理されたら、処理済みとしてマークし、自動的に送信します
  11. session.MarkMessage(メッセージ、 "" )
  12. }
  13. ゼロを返す
  14. }

または、手動で送信する方法を直接使用して問題を解決することもできます。これには 2 つの手順のみが必要です。

ステップ 1: 自動送信をオフにする:

  1. consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // 自動コミットを無効にして手動に変更します

ステップ 2: 消費ロジックに次のコードを追加します。手動送信モードでは、コミットする前にマークする必要もあります。

  1. session.MarkMessage(メッセージ、 "" )
  2. セッション.コミット()

完全なコードは GitHub からダウンロードして検証できます。

要約する

この記事では、主に次の 2 つの知識ポイントについて説明します。

Kafka はメッセージ損失を引き起こす可能性がある

データ損失を回避するために Go で Kafka を設定する方法

日常のビジネス開発では、多くの企業が分離のためにメッセージ キューを使用しています。そうなると注意を払わなければなりません。 Kafka をメッセージ キューとして使用しても、データが失われないことは保証されません。補償は手動で設定する必要があります。忘れないでください。そうしないと、別の P0 事故が発生します。

<<:  ウェブサイトを構築する前に、クラウドサーバーと仮想ホストの4つの違いを見てみましょう

>>:  クラウドコンピューティングはどのように進化するのでしょうか?

推薦する

ウェブマスターの経験:問題を見つけて解決できれば成功します

私は13年間のオンライン経験を持つ古いネットユーザーです。趣味でウェブマスターとして5、6年間働いて...

ReverseHosts-6ドル/4コア/2gメモリ/40g SSD/4Tトラフィック/サンディエゴ

サンディエゴ、データセンターはかなり良いですし、VPSもSSDを搭載していればかなり良いです! Re...

百度重量とは何ですか?メン先生が百度重量について詳しく説明します

Baidu ウェイトとは何ですか? Baidu ウェイトとは何ですか? これは、ここ数日ネットユーザ...

インフォアがサプライチェーンを改革、GT Nexus Digital Network を Infor Nexu に改名

業界特化型ビジネス クラウド ソフトウェアの大手プロバイダーである Infor は、データとインテリ...

マシンアイデンティティ危機を形作る5つのトレンド

[[352356]] DevOps、モバイル通信、クラウドコンピューティングの発展と進歩により、マシ...

Emlogウェブサイト構築プログラムは突然現れ、その機能的な利点は欠点を上回り、それが鍵となる

現在、特に独立系ブログサイト向けの無料ウェブサイト構築プログラムが数多くあります。WP、zblogな...

アマゾンとマイクロソフト、クラウドコンピューティング市場の独占調査に直面

メディア規制当局オブコムによると、アマゾンとマイクロソフトは英国のインターネット産業の70~80%を...

Baiduの外部リンクデータに基づいてウェブサイトの外部リンクを操作する方法

Baidu はついに外部リンク ツールのアップグレード版をリリースしました。このツールは自社サイトの...

クラウド コンピューティングと仮想化の関係は何ですか?

仮想化はクラウド コンピューティングをサポートする重要なテクノロジです。クラウド コンピューティング...

ウェブサイト上でユーザーを維持し、直帰を減らす方法

ウェブサイトがユーザーを維持し、直帰を減らし、コンバージョンを向上させる方法は、常に注目の話題です。...

推奨: AlphaRacks - 5.99 USD/4 コア/1 GB RAM/90 GB ハード ドライブ/3.5 TB トラフィック

Alpharacks は、DDOS 保護、OpenVZ ベースの VPS、サーバー レンタルを提供す...

仮想マシンに Windows 11 をインストールするにはどうすればいいですか?

[[418362]] [51CTO.com クイック翻訳]ほとんどの人にとって、通常の PC に W...

百度と韓国のSMエンターテインメントが戦略的提携を締結

新浪科技は5月8日正午、百度が本日、韓国のSMエンターテインメントと戦略的提携を締結したと正式に発表...

ウェブマスターネットワークからの毎日のレポート:百度はLBSの開発を計画、一方グーグル中国はモバイル広告に目を向ける

1. 百度の構造調整:事業分割によりLBS部門を設立Baidu の地図部門は最近、LBS (位置情報...

エッジコンピューティング、ネットワークのエッジでの大胆な探索

こんにちは、みんな。今日はエッジコンピューティングについて説明します。エッジ コンピューティングとは...