Kafka は、高いパフォーマンス、永続性、マルチコピー バックアップ、水平スケーラビリティを備えた分散メッセージ キューです。プロデューサーはキューにメッセージを書き込み、コンシューマーはキューからメッセージを取得してビジネス ロジックを実行します。一般的に、アーキテクチャ設計では、デカップリング、ピークシェービング、非同期処理の役割を果たします。 Kafka はトピックの概念を外部的に使用します。プロデューサーはトピックにメッセージを書き込み、コンシューマーはトピックからメッセージを読み取ります。 水平方向の拡張を実現するために、トピックは実際には複数のパーティションで構成されます。ボトルネックが発生した場合は、パーティションの数を増やすことで水平拡張を行うことができます。メッセージの順序は単一のパーティション内で保証されます。 新しいメッセージが書き込まれるたびに、Kafka はそれを対応するファイルに追加するので、パフォーマンスが非常に高くなります。 Kafka の全体的なデータフローは次のとおりです。 一般的な使用方法は、プロデューサーがブローカー内の指定されたトピックにメッセージを書き込み、コンシューマーがブローカーから指定されたトピックからメッセージをプルして、ビジネス処理を実行することです。 図には 2 つのトピックがあります。 Topic0 には 2 つのパーティションがあり、Topic1 には 1 つのパーティションがあり、3 つのコピーがバックアップされます。 Consumer Gourp1 内の Consumer2 は処理用のパーティションに割り当てられていないことがわかります。これは可能です。以下で説明します。 ZK は、ブローカー、トピック、パーティションに関するメタデータを保存するために使用されます。 ZK は監視とルーティングにも使用されます。 生産 基本的なプロセスは次のとおりです。 レコードを作成します。レコードでは、対応するトピックと値を指定する必要があります。キーとパーティションはオプションです。 まずシリアル化し、次にトピックとパーティションに従って対応する送信キューに入れます。 Kafka Produce はバッチ要求であり、バッチを蓄積してまとめて送信します。 send() を呼び出した後、すぐにネットワーク パケットが送信されるわけではありません。 パーティションがいっぱいでない場合、状況は次のようになります。
同じパーティションに送信されるこれらの要求は、構成に従ってウェーブに蓄積され、別のスレッドによって一度に送信されます。 API オフセットやルーティングなど、多くのことを実行する高レベル API があり、非常に使いやすいです。 Simple API もあり、Offset などを自分で記録する必要があります。 (注: メッセージを消費するときは、まず消費場所 (ルート) を知っておく必要があります。消費後は、消費順序 (オフセット) を記録する必要があります) パーティション 複数のコピーがある場合は、異なるブローカーに配布するように努めます。 Kafka はパーティションのリーダーを選択します。その後、パーティションに対するすべてのリクエストはリーダーによって実際に操作され、他のフォロワーに同期されます。 ブローカーが機能を停止すると、そのブローカー上のすべてのリーダー パーティションがリーダーを再選出します。 (これは、コピー数を維持するために自動的にレプリケーションが実行される分散ファイルストレージシステムとは異なります) ここで関係する詳細が 2 つあります。
パーティションの割り当てとリーダーの選出に関しては、必ず実行者が存在しなければなりません。 Kafka では、このエグゼキューターはコントローラーと呼ばれます。 Kafka は、パーティションの割り当てとリーダーの選出のためにブローカー内のコントローラーを選択するために ZK を使用します。 パーティションの割り当て:
リーダー災害復旧 コントローラーは、ブローカーがダウンしたかどうかを認識できるように、ZK の /brokers/ids ノードにウォッチを登録します。 ブローカーがダウンすると、コントローラーは影響を受けるパーティションの新しいリーダーを選択します。 コントローラーは、ZK の /brokers/topics/[topic]/partitions/[partition]/state から対応するパーティションの ISR (同期レプリカ) リストを読み取り、リーダーとなるものを 1 つ選択します。 リーダーが選出された後、ZK が更新され、影響を受けるブローカーに変更を通知するために LeaderAndISRRequest が送信されます。 ここで ZK 通知を使用せず、代わりに RPC リクエストをブローカーに直接送信するのはなぜでしょうか?私の理解では、これにより ZK のパフォーマンスに問題が発生する可能性があります。 ISR リストが空の場合、構成に基づいてレプリカがリーダーとしてランダムに選択されるか、パーティションが単に無効になります。 ISR リストにマシンがあっても、そのマシンもダウンしている場合は、ISR マシンが復旧するまで待つことができます。 複数のレプリカの同期 ここでの戦略は、サーバー側の処理として、フォロワーがリーダーからデータを一括して取得して同期するというものです。ただし、具体的な信頼性は製造元によって決まります。 プロデューサーがメッセージを生成すると、データの信頼性は request.required.acks パラメータを通じて設定されます。 Acks=-1 の場合、ISR が min.insync.replicas で指定された数より小さいと、使用不可が返されます。 ISR リスト内のマシンが変更されます。 replica.lag.time.max.ms の設定に従って、一定期間同期が行われないと ISR リストから削除されます。 以前は、ノードは遅れているメッセージの数に基づいて ISR から追い出されていましたが、この値を取得するのが難しく、ピーク時にノードが ISR リストに頻繁に出入りすることが容易であったため、バージョン 1.0 以降は削除されました。 ISA からリーダーが選択されると、フォロワーは自身のログ内の前回の最高水準点以降のレコードを削除し、リーダーに移動して新しいデータを取得します。 新しいリーダーが選出された後、フォロワーのデータが新しいリーダーのデータより多くなる可能性があるため、それを傍受する必要があります。 ここでの最高水準点は、パーティションとリーダーの場合、すべての ISR の最後のレコードを意味します。消費者は最高水準点までしか読み取ることができません。 リーダーの観点から見ると、ハイウォーターマークの更新は 1 ラウンド遅れることになります。たとえば、新しいメッセージが書き込まれると、ISR 内のブローカーはそれをフェッチしますが、ISR 内のブローカーは次のフェッチラウンドでリーダーにのみ通知できます。 まさにこの最高水準点の遅延が原因で、場合によっては Kafka でデータ損失やプライマリ データとスタンバイ データ間の不整合が発生する可能性があります。 0.11 以降では、ハイ ウォーター マークの代わりにリーダー エポックが使用されます。 考えてみましょう: Acks=-1 の場合
消費 トピックのサブスクリプションはコンシューマー グループに基づいており、コンシューマー グループには複数のコンシューマーを含めることができます。同じコンシューマー グループ内の 2 人のコンシューマーが同時にパーティションを消費することはありません。 つまり、パーティションはコンシューマー グループ内の 1 つのコンシューマーによってのみ消費されますが、複数のコンシューマー グループによって同時に消費される可能性があります。 したがって、コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合、一部のコンシューマーは常にアイドル状態になります。 API トピックをサブスクライブするときに、正規表現を使用できます。新しいトピックが一致すると、自動的に購読されます。 オフセット保存 コンシューマー グループがパーティションを消費する場合、消費場所を記録するためにオフセット レコードを保存する必要があります。以前は、これは ZK に保存されていました。 ZK の書き込みパフォーマンスが低いため、以前のソリューションでは、コンシューマーが 1 分ごとにレポートしていました。 ここで、ZK のパフォーマンスは消費速度に重大な影響を及ぼし、繰り返し消費が発生する可能性が非常に高くなります。バージョン 0.10 以降、Kafka はこのオフセットのストレージを ZK から分離し、consumeroffsets topic と呼ばれるトピックに保存します。 メッセージに書き込まれるキーは、Groupid、Topic、Partition で構成され、値はオフセットです。トピックに設定されているクリーンアップ戦略は Compact です。常に最適なキーを保持し、残りは削除します。 一般に、各キーのオフセットはメモリにキャッシュされるため、クエリを実行するときにパーティションをトラバースする必要はありません。キャッシュがない場合、パーティションが最初に走査されてキャッシュが作成され、その後クエリが返されます。 Consumer Group のオフセット情報が書き込まれる consumer_offsets のパーティションを決定します。具体的な計算式は以下のとおりです。
考えてみてください。実行中のサービスが offsets.topic.num.partitions を変更すると、Offset のストレージが台無しになるでしょうか? パーティションの割り当て - 再バランス 生成プロセス中、ブローカーはパーティションを割り当てる必要があり、消費プロセス中、パーティションはコンシューマーにも割り当てられる必要があります。 ブローカーがコントローラーを選択するのと同じように、コンシューマーもブローカーからコーディネーターを選択してパーティションを割り当てます。 上から下への説明は以下の通りです。
① コーディネーターを選択: オフセットが保存されているパーティションを確認します。パーティション リーダーが配置されているブローカーが選択されたコーディネーターです。 ここで、コンシューマー グループのコーディネーターとコンシューマー グループ オフセットを格納するパーティション リーダーが同じマシンであることがわかります。 ②インタラクションプロセス:コーディネーターを選択したら、割り当てます。全体のプロセスは次のとおりです。
ブローカーは、上記の方法に従って、このコンシューマーに対応するコーディネーターのアドレスを選択します。
結果が成功した場合、コンシューマーは最後に割り当てられたパーティションから実行を続行します。 ③リバランスプロセス:
パーティションまたはコンシューマーの数が変更された場合は、再バランス調整を実行する必要があります。 リバランスが発生する可能性がある状況は次のとおりです。
メッセージ配信セマンティクス Kafka は 3 つのメッセージ配信セマンティクスをサポートしています。
ビジネスでは、「少なくとも 1 回」モデルがよく使用されます。再入可能性が必要な場合は、通常、企業が自らそれを実装します。 少なくとも1回 最初にデータを取得し、次にビジネスを処理し、ビジネス処理が成功した後にオフセットをコミットします。
最大1回 まずデータを取得し、次にオフセットをコミットし、最後にビジネス処理を実行します。
正確に1回 まずメッセージが失われないようにし、次にメッセージが繰り返されないようにするというのが目的です。それでは、「少なくとも一度は」の理由に注目してみましょう。 最初に思いついたのは:
Kafka はビジネス インターフェースがべき等であるかどうかを保証できないため、ここで Kafka が提供する Exactly once は制限されており、コンシューマーのダウンストリームも Kafka である必要があります。 したがって、以下の説明では、特に指定がない限り、コンシューマーの下流システムは Kafka です (注: 一部のシステムを適応させて Exactly once を実現する Kafka Conector が使用されます)。プロデューサーの冪等性は実装が簡単で、問題はありません。 重複消費を解決するには 2 つの方法があります。
本来であれば、ポイント 1 を 1 回だけ達成すれば問題ありません。ただし、使用シナリオによっては、データ ソースが複数のトピックであり、処理されて複数のトピックに出力されることがあります。この場合、出力が完全に成功するか、完全に失敗するかのいずれかになることを期待します。これにはトランザクション性が必要です。 トランザクションを実行する必要があるため、ルートからの重複消費の問題を解決し、コミット オフセットと他のトピックへの出力を 1 つのトランザクションにバインドできます。 生成冪等性 アイデアとしては、各プロデューサーにプロデューサーの一意の識別子として Pid を割り当てることです。 プロデューサーはそれぞれに対して単調に増加する Seq を維持します。同様に、ブローカーはそれぞれに対して *** Seq も記録します。 ブローカーは、req_seq == broker_seq+1 の場合にのみメッセージを受け入れます。理由は次の通りです。
トランザクション/アトミックブロードキャスト シナリオは次のとおりです。
ポイント 2 と 3 は、完全に成功するか完全に失敗するトランザクションと見なされます。これは、オフセットが実際には特別なトピックに保存されるためです。これら 2 つのポイントは、複数のトピックを書き込むトランザクション処理に統合されます。 基本的な考え方は次のとおりです。
つまり、どのプロデューサーもこの Tid を使用してトランザクションを実行できるため、途中で終了したトランザクションは別のプロデューサーによって回復できます。
クラスターには複数のトランザクション コーディネーターが存在し、各 Tid は一意のトランザクション コーディネーターに対応します。 注: トランザクション ログの削除戦略は Compact です。完了したトランザクションは Null としてマークされ、Compact 後に保持されません。 トランザクションを実行するときは、まずトランザクションをオープンとしてマークし、データを書き込み、すべてが成功した場合は、トランザクション ログに Prepare Commit ステータスとして記録し、それ以外の場合は Prepare Abort ステータスとして書き込みます。 次に、関連する各パーティションにマーカー (コミットまたは中止) メッセージを書き込み、このトランザクションのメッセージを読み取り可能または破棄済みとしてマークします。成功すると、コミット/中止ステータスがトランザクション ログに記録され、トランザクションが終了します。 データフロー:
メッセージのエポックが現在維持されているエポックよりも小さい場合、そのメッセージは拒否されます。 TidとPidは1対1に対応しているので、同じTidに対しては同じPidが返されます。
クライアントは関連するパーティションにデータを出力します。次に、クライアントはトランザクションコーディネーターにオフセットのトランザクションステータスを記録するように要求します。クライアントは、対応するオフセット パーティションにオフセット コミットを送信します。
すべてが成功すると、コミット/中止ステータスが記録されます。このレコードは、Prepare が失われない限り最終的な正確性が保証されるため、他のレプリカの ACK を待つ必要はありません。 ここでの準備ステータスは、主にトランザクションの回復に使用されます。たとえば、制御メッセージが該当するパーティションに送信され、完了する前にマシンがクラッシュした場合、スタンバイマシンが起動した後、プロデューサーが Pid を取得する要求を送信すると、未完了のトランザクションが完了します。 コミット マーカーがパーティションに書き込まれると、関連するメッセージを読み取ることができます。したがって、Kafka トランザクションの Prepare Commit から Commit までの期間では、メッセージは同時に表示されるのではなく、徐々に表示されます。 消費者問題 上記はすべて制作の観点から見たものです。消費の観点からもいくつかの問題を考慮する必要があります。 消費中、パーティション内にはコミットされていない状態、つまりビジネス側では表示されないメッセージがいくつかあります。これらのメッセージは、ビジネス側で表示されないようにフィルタリングする必要があります。 Kafka は、主にパフォーマンスを考慮して、ブローカーでフィルタリングするのではなく、コンシューマー プロセスでこれを行うことを選択します。 Kafka の高パフォーマンスの重要なポイントはゼロコピーです。ブローカーでフィルタリングが必要な場合は、メッセージの内容をメモリに読み込む必要があり、ゼロ コピー機能が失われます。 ファイル構成 Kafka のデータは、実際にはファイル システムにファイルの形式で保存されます。トピックの下にパーティションがあり、パーティションの下にセグメントがあります。セグメントは実際のファイルですが、トピックとパーティションは抽象的な概念です。 実際のログ ファイル (つまり、セグメント) と対応するインデックス ファイルは、ディレクトリ /partitionid}/ に保存されます。 各セグメント ファイルのサイズは同じで、ファイル名はセグメント内の最小のオフセットに基づいて命名されます。ファイル拡張子は.logです。セグメントに対応するインデックスのファイル名は同じで、拡張子は .index です。 インデックス ファイルは 2 つあります。
全体的な構成は次のとおりです。 インデックス ファイルのサイズを縮小し、スペースの使用量を減らし、メモリへの直接読み込みを容易にするために、ここでのインデックスではスパース マトリックスを使用します。各メッセージの特定の場所を記録する代わりに、一定のバイト数ごとにインデックスが作成されます。 インデックスは 2 つの部分で構成されます。
オフセットに対応するレコードを検索する場合、まずバイナリ検索を使用して、対応するオフセットがどのセグメントにあるかを調べます。次に、インデックスを使用してセグメント内のオフセットのおおよその位置を特定し、メッセージを走査します。 一般的な設定項目 ブローカー構成 トピックの設定 ログのクリーニングに関しては、デフォルトでは、現在書き込まれているログはクリーンアップされません。 0.10 より前のバージョンでは、時間はログ ファイルの Mtime に基づいていますが、この値は不正確です。ファイルが変更されると、Mtime が変更される場合があります。したがって、バージョン 0.10 以降では、ファイルが最後に送信された時刻を使用して時間が決定されます。 サイズ別にクリーンアップします。ここで注目すべき点は、Kafka がスケジュールされたタスク内の現在のログ ボリュームの合計サイズを比較して、少なくとも 1 つのログ セグメントのしきい値を超えているかどうかを確認しようとしていることです。 1 つのログ セグメントを超えているが、超えていない場合は削除されません。 著者: 鄭潔文 紹介:Tencent Cloud Storage、シニアバックエンドエンジニア。2014年に卒業後、Tencentに入社し、付加価値ビジネス開発とTencent Cloud Storage開発に従事。当社は、ビジネスおよび技術プラットフォームのバックエンド アーキテクチャの設計について、徹底的な調査と実践を行っています。彼は、アーキテクチャの大規模な同時実行性、高可用性、スケーラビリティに関して豊富な経験を持ち、現在は分散およびストレージ分野に重点を置いています。 |
<<: 分散トレーニング入門: PyTorch を使用してマルチ GPU 分散トレーニングを実装する方法
>>: UiPathはシリーズDの資金調達で5億6800万ドルを獲得し、評価額は70億ドルを超える
国家の「二会期」はちょうど終了しました。 IT 実務家として、今回提案された「新しいインフラ」に期待...
Baiduには自社製品が多すぎますが、その中でも複数のキーワードで長い間1位を占めているのが、Bai...
序文ディープラーニングは機械学習と人工知能研究の人気の分野であり、今日最も人気のある科学研究トレンド...
クラウド コンピューティング インフラストラクチャは、内部システムとパブリック クラウド間のソフトウ...
今日は役に立つ情報がないのでどうやって更新するか悩んでいたので、Cyyzaid の VPS (rfc...
多くの人が初めて SEO を学ぶとき、インターネット上のさまざまなチュートリアル、特に Baidu ...
ウェブサイトのランキングに影響を与える主な問題は何だと思いますか?キーワードの密度、コンテンツの更新...
クラウド テクノロジーの急速な発展に伴い、企業がクラウド コンピューティングを通じてデジタル変革を実...
近年、WeChatマーケティングは急速に成長しており、短期間でWeChat販売を通じて人気商品を生み...
この会議では、政府、企業、大学、業界の専門家を招き、マルチエージェントや画像認識などの人工知能の重要...
インターネット マーケティングに参入する業界が増えるにつれて、アウトソーシングされた SEO サービ...
以前、「50kvm-ロサンゼルス/C3データセンター/3 USD/1gメモリ/30gハードドライブ/...
ランキングのために生まれ、ランキングのために死ぬ。この文章は医療業界における SEO を説明するのに...
格安サーバーを販売するアメリカの会社reprisehostingが、正式にAlipay決済を導入した...
[編集者注]: これは、UI インターフェースの改訂によってもたらされた驚くべき改善点について Ch...