Kafka ソースコード分析とブローカーエンドのグラフィック原理

Kafka ソースコード分析とブローカーエンドのグラフィック原理

[[277321]]

まず、Kafka がトピックを作成する方法から始めましょう。

  1. kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic テスト 

いくつかのパラメータがあります:

  • --zookeeper: 飼育係のアドレス
  • --replication-factor: レプリケーション係数
  • --partitions: パーティションの数 (デフォルトは 1)
  • --topic: トピック名

2. パーティショニングとは何か

トピックには複数のパーティションがあり、各パーティション内のメッセージは異なります。パーティションによりスループットは向上しますが、パーティションの数が多いほどパフォーマンスは向上します。通常、パーティションの数は、Kafka クラスター内のマシンの数を超えてはなりません。パーティションの数が増えるほど、占有するメモリとファイル ハンドルも増えます。一般的なパーティションは 3 ~ 10 に設定されています。たとえば、クラスター内に 3 台のマシンがあり、図に示すように 2 つのパーティションを持つ test という名前のトピックを作成するとします。


パーティションは順序付けられた不変のレコード セットであり、ログ ファイルに継続的に追加されます。パーティション内の各メッセージには、オフセットである ID が割り当てられます。オフセットはパーティション内のレコードをマークするために使用されます。ここでは、公式ウェブサイトの図を使用しますが、うまく描けていません。


2.1 プロデューサーとパーティションの関係

図に示す状況では、プロデューサー側はどのパーティションに MQ を割り当てますか?これも、前のセクションで説明したパラメータpartitioner.classです。デフォルトのパーティショナー プロセスは、キーがある場合は、murmur2 アルゴリズムを使用してキー ハッシュ値を計算し、パーティションの合計の係数を取得してパーティション番号を計算し、キーがない場合はポーリングします。 (org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)。もちろん、org.apache.kafka.clients.producer.Partitioner インターフェースを実装する限り、パーティショニング戦略をカスタマイズすることもできます。

  1. /**
  2. *指定されたレコードパーティションを計算します。
  3. *
  4. * @param topic トピック 
  5. * @param keyキー  または ない場合はnull  
  6. * @param keyBytes シリアル化されたキー  または ない場合はnull  
  7. * @param valueパーティションする または ヌル 
  8. * @param valueBytesパーティションするシリアル化され または ヌル 
  9. * @param cluster現在のクラスタメタデータ
  10. */
  11. 公共  intパーティション(文字列トピック、オブジェクトキー、byte[] keyBytes、オブジェクト値、byte[] valueBytes、クラスタークラスター) {
  12. List<PartitionInfo> パーティション = cluster.partitionsForTopic(topic);
  13. int numPartitions = パーティション数。サイズ();
  14. キーバイトがnull場合
  15. int nextValue = nextValue(トピック);
  16. リスト<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  17. 利用可能なパーティションのサイズが0 より大きい場合
  18. int part = Utils.toPositive(nextValue) % 利用可能なパーティション。サイズ();
  19. availablePartitions.get(part).partition()を返します
  20. }それ以外{
  21. // 利用可能なパーティションがない場合は、利用できないパーティションを指定してください
  22. Utils.toPositive(nextValue) % numPartitions を返します
  23. }
  24. }それ以外{
  25. //パーティションを選択するためにキーバイトをハッシュする
  26. Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions を返します
  27. }
  28. }

2.2 消費者とパーティションの関係

まず、公式 Web サイトのコンシューマー グループの定義を見てみましょう。コンシューマーは自身にコンシューマー グループ名を付け、トピックに公開される各レコードは、サブスクライブしている各コンシューマー グループ内の 1 つのコンシューマー インスタンスに配信されます。

翻訳: コンシューマーはコンシューマー グループ名を使用して自分自身をマークし、トピックのメッセージは、それをサブスクライブしているコンシューマー グループのコンシューマー インスタンスに送信されます。

コンシューマ グループは、高いスケーラビリティと高いフォールト トレランスを実現するために使用されるコンシューマ メカニズムです。コンシューマーがクラッシュするか、新しいコンシューマーが追加されると、コンシューマー グループのバランスが再調整されます。リバランスのメカニズムについては、消費者のセクションで詳しく説明するため、このセクションでは説明しません。次に、上の図に従って消費者側を描き続けます。


これは最良のケースであり、2 つのパーティションが 1 つのグループ内の 2 人のコンシューマーに対応します。考えてみてください。コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合はどうなるでしょうか?それともパーティション数より少ないですか?

コンシューマー グループ内のコンシューマーの数がパーティションの数より多い場合、余分なコンシューマーは無駄になり、メッセージを消費できなくなります。

コンシューマー グループ内のコンシューマーの数がパーティションの数より少ない場合、対応するコンシューマー パーティション割り当て戦略が存在します。 1 つは Range (デフォルト)、もう 1 つは RoundRobin (ポーリング) ですが、もちろん戦略をカスタマイズすることもできます。実際、考え方は同じで、各コンシューマーが負荷分散作業を実行できます。詳細については消費者向けセクションで説明されるため、ここでは説明しません。

推奨事項: パーティションの数をコンシューマーの数の整数倍になるように設定します。

3. コピーとISR設計

3.1 レプリ​​カとは何ですか?

トピックを作成するときに、コピーの数を設定するための --replication-factor というパラメータがあります。 Kafka は、システムの高可用性を維持するために、複数の同一バックアップを使用します。これらのバックアップは、Kafka ではレプリカと呼ばれます。コピーは 3 つのカテゴリに分かれています。

  • リーダーレプリカ: プロデューサー側からの読み取りおよび書き込み要求に応答します
  • フォロワー レプリカ: リーダー レプリカのデータをバックアップし、プロデューサー側の読み取りおよび書き込み要求には応答しません。
  • ISR レプリカ セット: リーダー レプリカ 1 つとフォロワー レプリカすべてが含まれます (フォロワー レプリカが存在しない場合もあります)

Kafka は、すべてのレプリカを kafka クラスター内のすべてのブローカーに均等に配布し、これらのレプリカから 1 つをリーダー レプリカとして選択し、その他はフォロワー レプリカになります。リーダー コピーが配置されているブローカーがダウンした場合、フォロワー コピーの 1 つがリーダー コピーになります。リーダー コピーはプロデューサーからの読み取りおよび書き込み要求を受信しますが、フォロワー コピーはリーダー コピーからのデータのみを要求し、読み取りおよび書き込み要求は受信しません。


3.2 レプリカ同期メカニズム

前述のように、ISR は同期されたレプリカのセットを動的に維持し、リーダー レプリカは常に ISR セットに含まれます。 ISR 内のレプリカのみがリーダー レプリカとして選出される資格があります。プロデューサー側の ack パラメータが all (-1) に設定されている場合、プロデューサーによって書き込まれた MQ は、送信されたと見なされる前に、ISR のすべてのコピーによって受信される必要があります。もちろん、前のセクションで述べたように、望ましい効果を得るには、ブローカー側で ack パラメータを min.insync.replicas (デフォルトは 1) パラメータと一緒に使用する必要があります。このパラメータは、書き込みが成功したと見なされるために ISR に書き込まれるレプリカの数を制御します。 ISR 内のレプリカの数が min.insync.replicas より少ない場合、クライアントは例外 org.apache.kafka.common.errors.NotEnoughReplicasException: 同期レプリカが必要な数より少ないため、メッセージが拒否されます。を返します。

レプリカ同期メカニズムを理解するには、いくつかの用語を学ぶ必要があります。

  • ハイ ウォーターマーク: レプリカのハイ ウォーターマーク値 (HW と呼ばれます)。 HW 未満のメッセージは「バックアップ」されているとみなされます。 HWも次のメッセージを指しています!リーダー レプリカの HW 値によって、コンシューマーがポーリングできるメッセージの数が決まります。コンシューマーは HW 値未満のメッセージのみを消費できます。
  • LEO: ログ終了オフセット、次のメッセージの変位。これは、LEO が指している場所に情報がないことを意味します。
  • リモート LEO: 厳密に言えば、これはコレクションです。リーダー コピーが配置されているブローカーは、対応するパーティション情報を格納するために、メモリ内に Partition オブジェクトを保持します。このパーティションは、パーティションのすべてのレプリカ オブジェクトを格納するレプリカ リストを維持します。リーダー レプリカ コピーを除き、リスト内の他のレプリカ オブジェクトの LEO はリモート LEO と呼ばれます。

以下は実際の例です(この例については、Hu Xi のブログを参照してください)。この例のトピックは単一のパーティションであり、レプリケーション係数は 2 です。つまり、リーダー レプリカが 1 つとフォロワー レプリカが 1 つあり、ISR にはこれら 2 つのレプリカ セットが含まれています。まず、プロデューサーがメッセージを送信したときにリーダー/フォロワー ブローカーのレプリカ オブジェクトに何が起こるか、およびパーティション HW がどのように更新されるかを見てみましょう。まず、初期状態:


このとき、プロデューサーはトピック パーティションにメッセージを送信します。この時点での状態は下図のようになります。


上の図に示すように、プロデューサーがメッセージを正常に送信すると (acks=1 と仮定し、リーダーは書き込みが成功した後に戻ります)、フォロワーは新しい FECTH 要求を送信し、引き続き fetchOffset = 0 でデータを要求します。前回との違いは、今回は読み取るデータがあることです。そのため、全体のプロセスは次のようになります。


明らかに、リーダーとフォロワーの両方がメッセージを変位 0 で保存しましたが、両側の HW 値は更新されていません。次の図に示すように、次の FETCH 要求処理ラウンドで更新する必要があります。


簡単に説明すると、FETCH 要求の 2 回目のラウンドで、フォロワーは fetchOffset = 1 の FETCH 要求を送信します。これは、fetchOffset = 0 のメッセージがフォロワーのローカル ログに正常に書き込まれたためであり、今回は要求 fetchOffset = 1 のデータです。 FETCH 要求を受信した後、リーダー ブローカーは最初に他のレプリカの LEO 値を更新します。つまり、リモート LEO を 1 に更新し、次にパーティション HW 値を 1 に更新します。具体的な更新ルールについては、上記の説明を参照してください。これを実行した後、現在のパーティションHW値(1)がFETCH応答にカプセル化され、フォロワーに送信されます。 FETCH 応答を受信した後、フォロワー ブローカーはそこから現在のパーティション HW 値 1 を抽出し、それを自身の LEO 値と比較して、自身の HW 値を 1 に更新します。この時点で、完全な HW および LEO 更新サイクルが完了します。

3.3 ISRメンテナンス

バージョン 0.9.0.0 以降では、レプリカを ISR セットに含めるかどうかを決定するパラメーターは replica.lag.time.max.ms の 1 つだけです。このパラメータのデフォルト値は 10 秒です。つまり、フォロワー レプリカがリーダー レプリカに応答するのに 10 秒以上かかる場合、Kafka はレプリカが遠くへ行ってしまったと見なし、同期レプリカのリストからそれを削除します。

4. ログ設計

Kafka 内の各トピックは互いに分離されています。各トピックには 1 つ以上のパーティションがあり、各パーティションにはメッセージ データを記録するログ ファイルがあります。


図には、demo-topic というトピックがあります。このトピックには 8 つのパーティションがあり、各パーティションには [topic-partition] という名前のメッセージ ログ ファイルがあります。パーティション ログ ファイルには、プレフィックスは同じだがファイル タイプが異なる複数のファイルが表示されます。たとえば、図の 3 つのファイルは (00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000000.log) です。これは LogSegment と呼ばれます。

4.1 ログセグメント

テスト環境の具体的な例として、ALC.ASSET.EQUITY.SUBJECT.CHANGE という名前のトピックを取り上げ、パーティション 0 のログ ファイルを見てみましょう。


各 LogSegment には、同じファイル名を持つファイルのコレクションが含まれています。ファイル名は20桁の固定の数字です。ファイル名が 000000000000000000000 の場合、現在の LogSegment の最初のメッセージのオフセットは 0 であることを意味します。ファイル名が 000000000000000000097 の場合、現在の LogSegment の最初のメッセージのオフセットは 97 であることを意味します。ログ ファイルにはさまざまなサフィックスがあり、.index、.timestamp、.log の 3 種類のファイルに注目できます。

  • .index: オフセットインデックスファイル
  • .timeindex: 時間インデックスファイル
  • .log: ログファイル
  • .snapshot: スナップショット ファイル
  • .swap: ログ圧縮後の一時ファイル

4.2 インデックスとログファイル

Kafka には 2 種類のインデックス ファイルがあります。 1 つ目はオフセット インデックス ファイルで、.index で終わるファイルです。 2 番目のタイプは、.timeindex で終わるタイムスタンプ インデックス ファイルです。

kafka-run-class.sh を使用して、オフセット インデックス ファイルの内容を表示できます。


各行のオフセット: xxx、位置: xxxx であることがわかります。両者の間には直接的な関係はありません。

  • オフセット: 相対オフセット
  • 位置: 物理アドレス

では、最初の行の offset: 12 position: 4423 はどういう意味でしょうか?これは、オフセットが 0 ~ 12 のメッセージの物理アドレスが 0 ~ 4423 であることを意味します。

同様に、2 行目の offset: 24 position: 8773 の意味も推測できます。これは、オフセットが 13 から 24 のメッセージの物理アドレスが 4424-8773 であることを意味します。

kafka-run-class.sh を使用して .log ファイルの内容を確認し、baseOffset と position の値に注目することができます。上記で述べたことと一致しているかどうかわかりますか?

[[277322]]

4.3 オフセットを使った検索方法

上記の例によると、オフセット60のメッセージを照会する方法は

まず、オフセットに応じて対応する LogSegment を見つけます。ここで0000000000000000000000.indexが見つかります

バイナリ検索を使用して、オフセットより大きくない最大のインデックス項目を見つけます。ここでオフセット: 24 位置: 8773 が見つかります

0000000000000000000000.log ファイルを開き、位置 8773 から順番にスキャンして、オフセット = 60 のメッセージを見つけます。


<<:  クラウド データベース移行の 10 の間違い

>>:  Javaの進化、クラウドネイティブ時代の変革

推薦する

hostmybytes - アジア向けに最適化された KVM VPS、512M、年間 9 ドル、Windows 搭載

Hostmybytes は、今月中旬に OpenVZ ベースの仮想 VPS を超低価格で開始しました...

V5Net: 香港サーバー(物理マシン)、20% 割引、342 元から、e3-1230/8g メモリ/240gSSD/15M 帯域幅/2IP

香港専用サーバーのプロモーション: v5server は現在、香港データセンターの国際 BGP 回線...

Baidu が再び SEO を取り締まります。SEO 担当者は準備できていますか?

今朝、いつものように百度を開いてランダムな単語を検索しました。すると突然、インターフェースがぎこちな...

文学ウェブサイトの発展における3つのボトルネックを突破する方法

この経済社会において、文学ウェブサイトは結局のところビジネスであり、文学ウェブサイトを運営する最終的...

Tencent Cloudはサーバーレスの「障害」を克服するための3つのソリューションを用意している

[51CTO.com からのオリジナル記事] サーバーレスは、イベント駆動型、ステートレス、メンテナ...

検索フレンドリーなデザインの基本原則について語るシャリ・サロウ

5月25日、厦門でグローバル検索エンジン戦略会議が開催されました。Grantastic Design...

エッジコンピューティングとその驚くべき応用

エッジコンピューティングとは何ですか? エッジ コンピューティングは、モバイル コンピューティングと...

さまざまなタイプのクエリを分析して、SERPでのクリック率を向上させます

ウェブ検索は、ユーザーに基づいて顧客に最高のサービスを提供する重要な方法の 1 つです。より良い結果...

クラウド コンピューティング管理のために習得する必要があるテクノロジーは何ですか?

クラウド コンピューティングでは、ローカル コンピューターやリモート サーバーではなく、多数の分散コ...

エンタープライズレベルのクラウドコンピューティング、クラウドコンピューティングの後半

はじめに:初期のクラウド コンピューティングのターゲット顧客は、主に中小企業と革新的な企業でした。ク...

50vm: 鎮江/貴州、専用サーバー 299 元/月、デュアルコア L5630/16g メモリ/20M 帯域幅

50vm は独立サーバーを推進しています。鎮江電信と貴州電信から選択できます。デュアルチャネル L5...

河南新郷市とファーウェイが共同でクラウドコンピューティングとビッグデータオープン協力会議を開催し、クラウド産業の発展を促進

2017年11月10日、深センで「新郷国家自主革新実証区とファーウェイ・新郷クラウドコンピューティン...

コミュニティの再理解:希望はあるが、冬は長い

月収10万元の起業の夢を実現するミニプログラム起業支援プランコンテンツ分野は高収益のトラフィックプー...