Kafka ソースコード分析とブローカーエンドのグラフィック原理

Kafka ソースコード分析とブローカーエンドのグラフィック原理

[[277321]]

まず、Kafka がトピックを作成する方法から始めましょう。

  1. kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic テスト 

いくつかのパラメータがあります:

  • --zookeeper: 飼育係のアドレス
  • --replication-factor: レプリケーション係数
  • --partitions: パーティションの数 (デフォルトは 1)
  • --topic: トピック名

2. パーティショニングとは何か

トピックには複数のパーティションがあり、各パーティション内のメッセージは異なります。パーティションによりスループットは向上しますが、パーティションの数が多いほどパフォーマンスは向上します。通常、パーティションの数は、Kafka クラスター内のマシンの数を超えてはなりません。パーティションの数が増えるほど、占有するメモリとファイル ハンドルも増えます。一般的なパーティションは 3 ~ 10 に設定されています。たとえば、クラスター内に 3 台のマシンがあり、図に示すように 2 つのパーティションを持つ test という名前のトピックを作成するとします。


パーティションは順序付けられた不変のレコード セットであり、ログ ファイルに継続的に追加されます。パーティション内の各メッセージには、オフセットである ID が割り当てられます。オフセットはパーティション内のレコードをマークするために使用されます。ここでは、公式ウェブサイトの図を使用しますが、うまく描けていません。


2.1 プロデューサーとパーティションの関係

図に示す状況では、プロデューサー側はどのパーティションに MQ を割り当てますか?これも、前のセクションで説明したパラメータpartitioner.classです。デフォルトのパーティショナー プロセスは、キーがある場合は、murmur2 アルゴリズムを使用してキー ハッシュ値を計算し、パーティションの合計の係数を取得してパーティション番号を計算し、キーがない場合はポーリングします。 (org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)。もちろん、org.apache.kafka.clients.producer.Partitioner インターフェースを実装する限り、パーティショニング戦略をカスタマイズすることもできます。

  1. /**
  2. *指定されたレコードパーティションを計算します。
  3. *
  4. * @param topic トピック 
  5. * @param keyキー  または ない場合はnull  
  6. * @param keyBytes シリアル化されたキー  または ない場合はnull  
  7. * @param valueパーティションする または ヌル 
  8. * @param valueBytesパーティションするシリアル化され または ヌル 
  9. * @param cluster現在のクラスタメタデータ
  10. */
  11. 公共  intパーティション(文字列トピック、オブジェクトキー、byte[] keyBytes、オブジェクト値、byte[] valueBytes、クラスタークラスター) {
  12. List<PartitionInfo> パーティション = cluster.partitionsForTopic(topic);
  13. int numPartitions = パーティション数。サイズ();
  14. キーバイトがnull場合
  15. int nextValue = nextValue(トピック);
  16. リスト<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  17. 利用可能なパーティションのサイズが0 より大きい場合
  18. int part = Utils.toPositive(nextValue) % 利用可能なパーティション。サイズ();
  19. availablePartitions.get(part).partition()を返します
  20. }それ以外{
  21. // 利用可能なパーティションがない場合は、利用できないパーティションを指定してください
  22. Utils.toPositive(nextValue) % numPartitions を返します
  23. }
  24. }それ以外{
  25. //パーティションを選択するためにキーバイトをハッシュする
  26. Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions を返します
  27. }
  28. }

2.2 消費者とパーティションの関係

まず、公式 Web サイトのコンシューマー グループの定義を見てみましょう。コンシューマーは自身にコンシューマー グループ名を付け、トピックに公開される各レコードは、サブスクライブしている各コンシューマー グループ内の 1 つのコンシューマー インスタンスに配信されます。

翻訳: コンシューマーはコンシューマー グループ名を使用して自分自身をマークし、トピックのメッセージは、それをサブスクライブしているコンシューマー グループのコンシューマー インスタンスに送信されます。

コンシューマ グループは、高いスケーラビリティと高いフォールト トレランスを実現するために使用されるコンシューマ メカニズムです。コンシューマーがクラッシュするか、新しいコンシューマーが追加されると、コンシューマー グループのバランスが再調整されます。リバランスのメカニズムについては、消費者のセクションで詳しく説明するため、このセクションでは説明しません。次に、上の図に従って消費者側を描き続けます。


これは最良のケースであり、2 つのパーティションが 1 つのグループ内の 2 人のコンシューマーに対応します。考えてみてください。コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合はどうなるでしょうか?それともパーティション数より少ないですか?

コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合、余分なコンシューマーは無駄になり、メッセージを消費できなくなります。

コンシューマー グループ内のコンシューマーの数がパーティションの数より少ない場合、対応するコンシューマー パーティション割り当て戦略が存在します。 1 つは Range (デフォルト)、もう 1 つは RoundRobin (ポーリング) ですが、もちろん戦略をカスタマイズすることもできます。実際、考え方は同じで、各コンシューマーが負荷分散作業を実行できます。詳細については消費者向けセクションで説明されるため、ここでは説明しません。

推奨事項: パーティションの数をコンシューマーの数の整数倍になるように設定します。

3. コピーとISR設計

3.1 レプリ​​カとは何ですか?

トピックを作成するときに、コピーの数を設定するための --replication-factor というパラメータがあります。 Kafka は、システムの高可用性を維持するために、複数の同一バックアップを使用します。これらのバックアップは、Kafka ではレプリカと呼ばれます。コピーは 3 つのカテゴリに分かれています。

  • リーダーレプリカ: プロデューサー側からの読み取りおよび書き込み要求に応答します
  • フォロワー レプリカ: リーダー レプリカのデータをバックアップし、プロデューサー側の読み取りおよび書き込み要求には応答しません。
  • ISR レプリカ セット: リーダー レプリカ 1 つとフォロワー レプリカすべてが含まれます (フォロワー レプリカが存在しない場合もあります)

Kafka は、すべてのレプリカを kafka クラスター内のすべてのブローカーに均等に配布し、これらのレプリカから 1 つをリーダー レプリカとして選択し、その他はフォロワー レプリカになります。リーダー コピーが配置されているブローカーがダウンした場合、フォロワー コピーの 1 つがリーダー コピーになります。リーダー コピーはプロデューサーからの読み取りおよび書き込み要求を受信しますが、フォロワー コピーはリーダー コピーからのデータのみを要求し、読み取りおよび書き込み要求は受信しません。


3.2 レプリカ同期メカニズム

前述のように、ISR は同期されたレプリカのセットを動的に維持し、リーダー レプリカは常に ISR セットに含まれます。 ISR 内のレプリカのみがリーダー レプリカとして選出される資格があります。プロデューサー側の ack パラメータが all (-1) に設定されている場合、プロデューサーによって書き込まれた MQ は、送信されたと見なされる前に、ISR のすべてのコピーによって受信される必要があります。もちろん、前のセクションで述べたように、望ましい効果を得るには、ブローカー側で ack パラメータを min.insync.replicas (デフォルトは 1) パラメータと一緒に使用する必要があります。このパラメータは、書き込みが成功したと見なされるために ISR に書き込まれるレプリカの数を制御します。 ISR 内のレプリカの数が min.insync.replicas より少ない場合、クライアントは例外 org.apache.kafka.common.errors.NotEnoughReplicasException: 同期レプリカが必要な数より少ないため、メッセージが拒否されます。を返します。

レプリカ同期メカニズムを理解するには、いくつかの用語を学ぶ必要があります。

  • ハイ ウォーターマーク: レプリカのハイ ウォーターマーク値 (HW と呼ばれます)。 HW 未満のメッセージは「バックアップ」されているとみなされます。 HWも次のメッセージを指しています!リーダー レプリカの HW 値によって、コンシューマーがポーリングできるメッセージの数が決まります。コンシューマーは HW 値未満のメッセージのみを消費できます。
  • LEO: ログ終了オフセット、次のメッセージの変位。これは、LEO が指している場所に情報がないことを意味します。
  • リモート LEO: 厳密に言えば、これはコレクションです。リーダー コピーが配置されているブローカーは、対応するパーティション情報を格納するために、メモリ内に Partition オブジェクトを保持します。このパーティションは、パーティションのすべてのレプリカ オブジェクトを格納するレプリカ リストを維持します。リーダー レプリカ コピーを除き、リスト内の他のレプリカ オブジェクトの LEO はリモート LEO と呼ばれます。

以下は実際の例です(この例については、Hu Xi のブログを参照してください)。この例のトピックは単一のパーティションであり、レプリケーション係数は 2 です。つまり、リーダー レプリカが 1 つとフォロワー レプリカが 1 つあり、ISR にはこれら 2 つのレプリカ セットが含まれています。まず、プロデューサーがメッセージを送信したときにリーダー/フォロワー ブローカーのレプリカ オブジェクトに何が起こるか、およびパーティション HW がどのように更新されるかを見てみましょう。まず、初期状態:


このとき、プロデューサーはトピック パーティションにメッセージを送信します。この時点での状態は下図のようになります。


上の図に示すように、プロデューサーがメッセージを正常に送信すると (acks=1 と仮定し、リーダーは書き込みが成功した後に戻ります)、フォロワーは新しい FECTH 要求を送信し、引き続き fetchOffset = 0 でデータを要求します。前回との違いは、今回は読み取るデータがあることです。そのため、全体のプロセスは次のようになります。


明らかに、リーダーとフォロワーの両方がメッセージを変位 0 で保存しましたが、両側の HW 値は更新されていません。次の図に示すように、次の FETCH 要求処理ラウンドで更新する必要があります。


簡単に説明すると、FETCH 要求の 2 回目のラウンドで、フォロワーは fetchOffset = 1 の FETCH 要求を送信します。これは、fetchOffset = 0 のメッセージがフォロワーのローカル ログに正常に書き込まれたためであり、今回は要求 fetchOffset = 1 のデータです。 FETCH 要求を受信した後、リーダー ブローカーは最初に他のレプリカの LEO 値を更新します。つまり、リモート LEO を 1 に更新し、次にパーティション HW 値を 1 に更新します。具体的な更新ルールについては、上記の説明を参照してください。これを実行した後、現在のパーティションHW値(1)がFETCH応答にカプセル化され、フォロワーに送信されます。 FETCH 応答を受信した後、フォロワー ブローカーはそこから現在のパーティション HW 値 1 を抽出し、それを自身の LEO 値と比較して、自身の HW 値を 1 に更新します。この時点で、完全な HW および LEO 更新サイクルが完了します。

3.3 ISRメンテナンス

バージョン 0.9.0.0 以降では、レプリカを ISR セットに含めるかどうかを決定するパラメーターは replica.lag.time.max.ms の 1 つだけです。このパラメータのデフォルト値は 10 秒です。つまり、フォロワー レプリカがリーダー レプリカに応答するのに 10 秒以上かかる場合、Kafka はレプリカが遠くへ行ってしまったと見なし、同期レプリカのリストからそれを削除します。

4. ログ設計

Kafka 内の各トピックは互いに分離されています。各トピックには 1 つ以上のパーティションがあり、各パーティションにはメッセージ データを記録するログ ファイルがあります。


図には、demo-topic というトピックがあります。このトピックには 8 つのパーティションがあり、各パーティションには [topic-partition] という名前のメッセージ ログ ファイルがあります。パーティション ログ ファイルには、プレフィックスは同じだがファイル タイプが異なる複数のファイルが表示されます。たとえば、図の 3 つのファイルは (00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000000.log) です。これは LogSegment と呼ばれます。

4.1 ログセグメント

テスト環境の具体的な例として、ALC.ASSET.EQUITY.SUBJECT.CHANGE という名前のトピックを取り上げ、パーティション 0 のログ ファイルを見てみましょう。


各 LogSegment には、同じファイル名を持つファイルのコレクションが含まれています。ファイル名は20桁の固定の数字です。ファイル名が 000000000000000000000 の場合、現在の LogSegment の最初のメッセージのオフセットは 0 であることを意味します。ファイル名が 000000000000000000097 の場合、現在の LogSegment の最初のメッセージのオフセットは 97 であることを意味します。ログ ファイルにはさまざまなサフィックスがあり、.index、.timestamp、.log の 3 種類のファイルに注目できます。

  • .index: オフセットインデックスファイル
  • .timeindex: 時間インデックスファイル
  • .log: ログファイル
  • .snapshot: スナップショット ファイル
  • .swap: ログ圧縮後の一時ファイル

4.2 インデックスとログファイル

Kafka には 2 種類のインデックス ファイルがあります。 1 つ目はオフセット インデックス ファイルで、.index で終わるファイルです。 2 番目のタイプは、.timeindex で終わるタイムスタンプ インデックス ファイルです。

kafka-run-class.sh を使用して、オフセット インデックス ファイルの内容を表示できます。


各行のオフセット: xxx、位置: xxxx であることがわかります。両者の間には直接的な関係はありません。

  • オフセット: 相対オフセット
  • 位置: 物理アドレス

では、最初の行の offset: 12 position: 4423 はどういう意味でしょうか?これは、オフセットが 0 ~ 12 のメッセージの物理アドレスが 0 ~ 4423 であることを意味します。

同様に、2 行目の offset: 24 position: 8773 の意味も推測できます。これは、オフセットが 13 から 24 のメッセージの物理アドレスが 4424-8773 であることを意味します。

kafka-run-class.sh を使用して .log ファイルの内容を確認し、baseOffset と position の値に注目することができます。上記で述べたことと一致しているかどうかわかりますか?

[[277322]]

4.3 オフセットを使った検索方法

上記の例によると、オフセット60のメッセージを照会する方法は

まず、オフセットに応じて対応する LogSegment を見つけます。ここで0000000000000000000000.indexが見つかります

バイナリ検索を使用して、オフセットより大きくない最大のインデックス項目を見つけます。ここでオフセット: 24 位置: 8773 が見つかります

0000000000000000000000.log ファイルを開き、位置 8773 から順番にスキャンして、オフセット = 60 のメッセージを見つけます。


<<:  クラウド データベース移行の 10 の間違い

>>:  Javaの進化、クラウドネイティブ時代の変革

推薦する

datapacket - 専用サーバー、20G 帯域幅、無制限のトラフィック、世界中に 17 のデータセンター

2007 年に設立されたウェブ ホスティング会社である DataPacket は、超大容量の帯域幅と...

検索エンジンから人工知能への究極の進化

KK はドキュメンタリー「Google and the World Brain」の中で、Google...

hostodo-70% オフ (KVM)/Windows/2.4 USD/512 MB RAM/クアドラネット コンピュータ ルーム

hostodo.com では、すべての KVM 仮想 VPS を 30% 割引で提供しており、更新も...

Yinke がウェブサイトの最適化について語り、SEO 最適化を簡単にします (I)

私はウェブサイトの最適化に1年近く携わり、この間多くのSEOの依頼を引き受けてきました。残念ながら、...

ウェブサイトのインタラクションデザイン: ページングに関する簡単な説明とさまざまなページングフォームのケース分析

機能: 前のページまたは次のページにジャンプします。要素: 分類の基本要素は、前のページ + ページ...

大規模、中規模、小規模のウェブサイトに最適な運用 KPI/指標

ウェブサイト分析において、大量のデータを取得することは難しくありません。難しいのは、評価システムを確...

ブランド マーケティング: 製品のセールス ポイントを見つける 4 つのステップ!

誰もが同じ疑問を抱いています。「製品をもっとよく売るにはどうすればいいでしょうか?」より良い買い物を...

私は転職の準備をしているので、SEOについての意見を述べたいと思います

私はいわゆるSEOの専門職に2年間従事してきました。何も知らないいわゆる外部リンクの専門家としてスタ...

ウェブサイトを再設計する際に注意すべきこと

ウェブサイトの改訂の目的は、サイトの魅力を高め、訪問者を維持することです。もちろん、検索エンジンでの...

WeChatパブリックアカウントに接続するSogou WeChat検索が正式にリリースされました!

Sogou は WeChat と提携し、WeChat パブリック プラットフォームと記事検索 (we...

最高財務責任者 = 最高未来責任者?金融デジタル変革の時代が到来しました!

近年、さまざまな業界間の境界や障壁が徐々に曖昧になり、消えつつあります。新しいテクノロジー、新しいチ...

Vipshop は栄光にもかかわらず、どのような困難に直面しているのでしょうか?

最近の電子商取引のニュースに注目しているかどうかは分かりませんが、ジャック・マー氏がCainiao ...