Kafka の設計原則を読んで忘れてしまい、もう一度読み返したことはありませんか?

Kafka の設計原則を読んで忘れてしまい、もう一度読み返したことはありませんか?

メッセージキューとは何ですか?簡単に言えば、メッセージ キューはメッセージを保存するためのコンテナーです。クライアントはメッセージ サーバーにメッセージを送信したり、メッセージ サーバーからメッセージを取得したりできます。

[[270989]]

画像はPexelsより

今日は、以下の問題について私の考えを述べたいと思います。

  • なぜメッセージングシステムが必要なのでしょうか?
  • Kafka のアーキテクチャ原則は何ですか?
  • Kafka はどのようにメッセージを保存しますか?
  • プロデューサーはどのようにメッセージを送信しますか?
  • Consumer はどのようにしてメッセージを消費しますか?
  • オフセットを保存するには?
  • メッセージング システムではどのような問題が発生する可能性がありますか?

なぜメッセージングシステムが必要なのでしょうか?

ピークシェービング

データベースの処理能力には限界があります。ピーク時には、多くのリクエストがバックグラウンドに落ちてしまいます。システムの処理能力を超えると、システムがクラッシュする可能性があります。

上図に示すように、システムの処理能力は 2k/s、MQ 処理能力は 8k/s、ピーク要求は 5k/s です。 MQ の処理能力はデータベースよりもはるかに優れています。ピーク時には、リクエストはまず MQ に蓄積され、システムは自身の処理能力に基づいて 2k/s の速度でこれらのリクエストを消費することができます。

この方法では、ピーク期間が終了すると、リクエスト レートは 100/秒のみになり、システムは MQ 内のリクエストのバックログを迅速に消費できます。

上記のリクエストは書き込みリクエストを指し、クエリ リクエストは通常​​キャッシュを通じて解決されることに注意してください。

デカップリング

次のシナリオでは、システム S はシステム A、B、C と密接に結合されています。要件の変更により、システム A は関連コードを変更し、システム S でも A 関連コードを調整する必要がありました。

数日後、C システムを削除する必要があり、S もそれに倣って C 関連のコードを削除します。数日後、D システムを追加する必要があり、S システムに D 関連のコードを追加する必要があります。数日後、プログラマーたちは発狂してしまいました...

これにより、システムが緊密に結合され、メンテナンスや拡張が容易ではなくなります。ここで MQ が導入されます。システム A が変更された場合、A は独自のコードを変更できます。システム C が削除された場合は、直接登録解除できます。システム D が追加されると、関連するメッセージをサブスクライブできるようになります。

このように、メッセージ ミドルウェアを導入することで、各システムは MQ と対話できるようになり、システム間の複雑な呼び出し関係を回避できます。

Kafka のアーキテクチャ原則は何ですか?

Kafka 関連の概念:

  • ブローカー: Kafka クラスターに含まれるサーバー。
  • プロデューサー: メッセージ プロデューサー。
  • コンシューマー: メッセージのコンシューマー。
  • コンシューマー グループ: 各コンシューマーはコンシューマー グループに属します。各メッセージは、コンシューマー グループ内の 1 つのコンシューマーによってのみ消費されますが、複数のコンシューマー グループによって消費されることもあります。
  • トピック: メッセージのカテゴリ。各メッセージはトピックに属し、異なるトピックは互いに独立しています。つまり、Kafka はトピック指向です。
  • パーティション: 各トピックは複数のパーティションに分割されます。パーティションは、Kafka によって割り当てられる単位です。 Kafka の物理的な概念はディレクトリに相当し、ディレクトリの下のログ ファイルがこのパーティションを構成します。
  • レプリカ: パーティションのコピー。パーティションの高可用性を保証します。
  • リーダー: レプリカにおける役割。プロデューサーとコンシューマーはリーダーとのみ対話します。
  • フォロワー: リーダーからデータをコピーするレプリカ内の役割。
  • コントローラー: Kafka クラスター内のサーバーのうちの 1 つ。リーダー選出やさまざまなフェイルオーバーに使用されます。
  • Zookeeper: Kafka は Zookeeper を使用してクラスターのメタ情報を保存します。

トピックとログ

メッセージはトピックごとに整理され、各トピックは複数のパーティション (server.properties/num.partitions に対応) に分割できます。

パーティションは、ディスクへのシーケンシャル書き込みに属するシーケンシャル追加ログです (ディスクへのシーケンシャル書き込みは、メモリへのランダム書き込みよりも効率的であり、Kafka のスループットを保証します)。

その構造は次のとおりです: server.properties/num.partitions は server.properties ファイル内の num.partitions 構成項目を表し、以下でも同様です。

パーティション内の各レコード (メッセージ) には、Offset、messageSize、Data の 3 つのプロパティが含まれます。

このうち、Offset はメッセージのオフセットを表します。 messageSize はメッセージのサイズを表します。データはメッセージの具体的な内容を表します。

パーティションはファイル形式でファイル システムに保存されます。場所は server.properties/log.dirs によって指定されます。命名規則は次のとおりです。 -

たとえば、トピック「page_visits」のメッセージは 5 つのパーティションに分割されており、そのディレクトリ構造は次のようになります。

パーティションは異なるブローカー上に配置できます。パーティションはセグメント化されており、各セグメントはセグメント ファイルです。

一般的なセグメント構成は次のとおりです。

  1. #サーバープロパティ
  2.  
  3. #セグメントファイルサイズ、デフォルトは1G
  4. ログセグメントバイト = 1024 * 1024 * 1024
  5. #新しいセグメントファイルのローリング生成の最大期間
  6. ログロール時間=24*7
  7. #セグメント ファイルが保持される最大時間。タイムアウトすると削除されます。
  8. ログ保持時間=24*7

パーティション ディレクトリには、データ ファイルとインデックス ファイルが含まれます。次の図は、パーティションのディレクトリ構造を示しています。

インデックスはスパース ストレージを使用します。メッセージごとにインデックスを作成するのではなく、インデックス ファイルが多くのスペースを占有しないように、特定のバイト数ごとにインデックスを作成します。

欠点は、インデックスなしのオフセットではメッセージの位置をすぐに特定できず、順次スキャンが必要になることですが、スキャン範囲は非常に狭くなります。

インデックスは、相対オフセットと位置という 2 つの部分 (どちらも 4 バイトの数値) で構成されます。

相対オフセットはセグメント ファイル内のオフセットを示し、位置はデータ ファイル内のメッセージの位置を示します。

概要: Kafka のメッセージ ストレージは、パーティション、ディスクの順次読み取りと書き込み、セグメンテーション (LogSegment)、およびスパース インデックスを使用して、高い効率を実現します。

パーティションとレプリカ

トピックは物理的に複数のパーティションに分割され、異なるブローカー上に配置されます。レプリカがない場合、ブローカーがダウンすると、そのブローカー上のすべてのパーティションが利用できなくなります。

各パーティションには複数のレプリカ (server.properties/default.replication.factor に対応) を含めることができ、それらは異なるブローカーに割り当てられます。

読み取りと書き込みを担当し、プロデューサーとコンシューマーからのリクエストを処理するリーダーがいます。他のメンバーはフォロワーとして機能し、リーダーからメッセージを取得し、リーダーと同期を保ちます。

パーティションとレプリカをブローカーに割り当てる方法は?手順は次のとおりです。

  • すべてのブローカー (合計で n 個のブローカーがあると仮定) と割り当てるパーティションを並べ替えます。
  • i 番目のパーティションを (i mod n) 番目のブローカーに割り当てます。
  • i番目のパーティションのj番目のレプリカを((i + j)モードn)番目のブローカーに割り当てます。

上記の割り当てルールによれば、レプリカの数がブローカーの数より多い場合、同じブローカーに必ず 2 つの同一のレプリカが割り当てられ、冗長性が生じます。したがって、レプリカの数はブローカーの数以下である必要があります。

リーダー選挙

Kafka は、Zookeeper (/brokers/topics/[topic]/partitions/[partition]/state) で ISR (同期レプリカ) を動的に維持します。

ISR 内のすべてのレプリカがリーダーに「追いつき」、コントローラーは ISR から 1 つをリーダーとして選択します。

具体的なプロセスは以下のとおりです。

  • コントローラーは、Zookeeper の /brokers/ids/[brokerId] ノードに Watcher を登録します。 Broker がダウンすると、Zookeeper は Watch を起動します。
  • コントローラーは、/brokers/ids ノードから利用可能なブローカーを読み取ります。
  • コントローラは、障害が発生したブローカー上のすべてのパーティションを含む set_p を決定します。
  • set_p 内の各パーティションについて、/brokers/topics/[topic]/partitions/[partition]/state ノードから ISR を読み取り、新しいリーダーを決定し、新しいリーダー、ISR、controller_epoch、および leader_epoch 情報を State ノードに書き込みます。
  • RPC 経由で LeaderAndISRRequest コマンドを関連するブローカーに送信します。

ISR が空の場合、レプリカ (必ずしも ISR メンバーではない) がリーダーとして選択されます。すべてのレプリカがダウンすると、いずれかのレプリカが復活してリーダーになります。

ISR (同期リスト) 内のフォロワーがリーダーに「追いつきました」。 「追いついた」というのは完全な一貫性を意味するものではありません。これは、server.properties/replica.lag.time.max.ms によって設定されます。

リーダーがフォロワーのメッセージの同期を待機する最大時間を示します。タイムアウトが発生した場合、リーダーはフォロワーを ISR から削除します。構成項目 replica.lag.max.messages が削除されました。

レプリカ同期

Kafka は「プル モード」を通じてメッセージを同期します。つまり、フォロワーは同期のためにリーダーからデータを一括してプルします。

具体的な信頼性はプロデューサーによって決定されます (構成項目 producer.properties/acks に基づきます)。

Kafka 0.9 では、プロデューサーの設定である request.required.acks=-1 が acks=all に置き換えられましたが、この古い設定はドキュメントに残っています。

バージョン 0.9 では、プロデューサー構成オプション request.required.acks=-1 は acks=all に置き換えられましたが、古い構成オプションはドキュメント内にまだ保持されています。

PS: 最新のドキュメント 2.2.x request.required.acks は存在しません。

Acks=-1 の場合、ISR が min.insync.replicas で指定された数より小さいと、NotEnoughReplicas または NotEnoughReplicasAfterAppend 例外がスローされます。

プロデューサーはどのようにメッセージを送信しますか?

プロデューサーはまずメッセージを ProducerRecord インスタンスにカプセル化します。

メッセージルーティング:

  • メッセージを送信するときにパーティションが指定されている場合は、それが直接使用されます。
  • キーが指定されている場合は、キーがハッシュされ、パーティションが選択されます。このハッシュ (つまり、パーティション分割メカニズム) は、producer.properties/partitioner.class で指定されたクラスによって実装されます。このルーティング クラスは Partitioner インターフェースを実装する必要があります。
  • どちらも指定されていない場合は、ラウンドロビン方式でパーティションが選択されます。

メッセージはすぐには送信されず、まずシリアル化されて、上記のハッシュ関数である Partitioner に送信されます。パーティショナーがターゲット パーティションを決定した後、そのパーティションはメモリ バッファー (送信キュー) に送信されます。

プロデューサーの別の作業スレッド (つまり、送信スレッド) は、準備されたメッセージをバッファからリアルタイムで抽出し、バッチにカプセル化して、対応するブローカーに送信する役割を担います。

プロセスはおおよそ次のようになります。

画像は123archuより

Consumer はどのようにしてメッセージを消費しますか?

各コンシューマーは論理コンシューマー グループに割り当てられます。パーティションは同じコンシューマー グループ内の 1 つのコンシューマーによってのみ消費されますが、異なるコンシューマー グループによって消費されることもあります。

トピックのパーティション数が p で、コンシューマー グループ内でこのトピックをサブスクライブしているコンシューマー数が c の場合、次のようになります。

  1. p < c: c - p 人の消費者がアイドル状態になり、無駄が生じる
  2. p > c: 1人の消費者が複数のパーティションに対応する
  3. p = c: 1つの消費者は1つのパーティションに対応する

リソースの不均衡を避けるために、コンシューマーとパーティションの数を合理的に割り当てる必要があります。パーティションの数がコンシューマーの数の整数倍である場合が最適です。

①コンシューマーにパーティションを割り当てる方法

生成プロセス中、ブローカーはパーティションを割り当てる必要があり、消費プロセス中、パーティションはコンシューマーにも割り当てられる必要があります。

ブローカーがコントローラーを選択するのと同じように、コンシューマーもブローカーからコーディネーターを選択してパーティションを割り当てます。

コンシューマーの追加、コンシューマー (アクティブまたはパッシブ) の削減、パーティションの追加など、パーティションまたはコンシューマーの数が変更されると、再バランス調整が実行されます。

プロセスは次のとおりです。

  • コンシューマーはコーディネーターに JoinGroupRequest リクエストを送信します。このとき、他のコンシューマーがハートビート要求を送信すると、コーディネーターは再バランスが必要であることを伝えます。他のコンシューマーも JoinGroupRequest リクエストを送信します。
  • コーディネーターはコンシューマーの中からリーダーを選択し、残りをフォロワーとして選択します。各コンシューマーに通知し、フォロワーのメタデータをリーダーに提供します。
  • コンシューマー リーダーは、コンシューマー メタデータに基づいてパーティションを再割り当てします。
  • コンシューマーはコーディネーターに SyncGroupRequest を送信します。リーダーの SyncGroupRequest には割り当て情報が含まれます。コーディネーターは、リーダーを含むコンシューマーに割り当て状況を通知するために返信します。

②消費者フェッチメッセージ

コンシューマーは「プル モード」を使用してメッセージを消費するため、コンシューマーは独自の消費動作を決定できます。

コンシューマーは Poll(duration) を呼び出してサーバーからメッセージを取得します。メッセージのプルの具体的な動作は、次の構成項目によって決まります。

  1. #消費者のプロパティ
  2.  
  3. #コンシューマーは最大いくつのレコードをポーリングできますか?
  4. 最大ポーリングレコード数 = 500
  5.  
  6. #コンシューマがポーリングしたときにパーティションから返されるデータの最大量
  7. 最大パーティション。フェッチ.bytes=1048576
  8.  
  9. #コンシューマー最大ポーリング間隔
  10. #この値を超えると、サーバーはこのコンシューマーが失敗したとみなします
  11. #そしてこの消費者を対応する消費者グループから追い出します  
  12. 最大ポーリング間隔ms =300000

パーティションでは、各メッセージにオフセットがあります。新しいメッセージはパーティションの末尾 (最新のセグメント ファイルの末尾) に書き込まれます。各パーティション上のメッセージは順番に消費され、異なるパーティション間のメッセージの消費順序は不確実です。

コンシューマーが複数のパーティションを消費する場合、パーティション間の消費順序は不確実ですが、各パーティションでは消費は順次行われます。

異なるコンシューマー グループの複数のコンシューマーが同じパーティションを消費する場合、各コンシューマーの消費は互いに影響を及ぼさず、各コンシューマーには独自のオフセットが存在します。

消費者 A と消費者 B は異なる消費者グループに属します。消費者 A はオフセット = 9 を読み取り、消費者 B はオフセット = 11 を読み取ります。この値は、次に読み取られる位置を示します。

つまり、コンシューマー A はオフセット 0 から 8 のメッセージを読み取り、コンシューマー B はオフセット 0 から 10 のメッセージを読み取りました。

次にコンシューマーがオフセット = 9 から読み取りを開始するときには、再バランスが発生する可能性があるため、コンシューマー A ではない可能性があります。

オフセットを保存するにはどうすればいいですか?

コンシューマーがパーティションを消費する場合、現在の消費位置を記録するためにオフセットを保存する必要があります。

Offset は、自動的にコミットするか、Consumer の commitSync() または commitAsync() を呼び出して手動でコミットするかを選択できます。関連する構成は次のとおりです。

  1. #オフセットを自動的に送信するかどうか
  2. enable.auto.commit = true  
  3.  
  4. #自動コミット間隔。 Enable.auto.commit = true
  5. 自動コミット間隔=5000

オフセットは、__consumeroffsets という名前のトピックに保存されます。メッセージを書き込むためのキーは、GroupId、Topic、Partition で構成され、値は Offset です。

一般に、各キーのオフセットはメモリにキャッシュされます。クエリを実行するときに、パーティションをトラバースする必要はありません。キャッシュがない場合、パーティションが最初に走査されてキャッシュが構築され、その後クエリが返されます。

__consumeroffsets のパーティション数は、次のサーバー構成によって決まります。

  1. オフセット.トピック.数値.パーティション=50

オフセットが保存されるパーティション、つまり __consumeroffsets のパーティション分割メカニズムは、次のように表現できます。

  1. groupId.hashCode() モード groupMetadataTopicPartitionCount

groupMetadataTopicPartitionCount は、上記で構成されたパーティションの数です。パーティションは同じコンシューマー グループ内の 1 つのコンシューマーによってのみ消費されるため、GroupId を使用して、このコンシューマーがオフセットを消費するパーティションを示すことができます。

メッセージング システムではどのような問題が発生する可能性がありますか?

Kafka は 3 つのメッセージ配信セマンティクスをサポートしています。

  • 最大 1 回: 最大 1 回、メッセージは失われる可能性があるが、繰り返されることはない

データの取得 -> オフセットのコミット -> ビジネス処理

  • 少なくとも1回: 少なくとも1回はメッセージは失われませんが、繰り返される可能性があります

データの取得 -> ビジネス処理 -> オフセットのコミット。

  • 正確に 1 回: 正確に 1 回、メッセージは失われたり重複したりせず、1 回だけ消費されます (0.11 で実装され、ダウンストリームと Kafka に限定されています)

① メッセージが繰り返し消費されないようにするにはどうすればよいでしょうか? (メッセージのべき等性)

更新操作の場合、べき等性は自然です。新しい操作の場合、処理前に各メッセージに一意の ID を与えて、処理済みかどうかを判断できます。この ID は Redis に保存でき、データベースに書き込まれる場合は主キー制約を使用できます。

②メッセージ伝送の信頼性を確保するには? (メッセージ損失の問題)

Kafka アーキテクチャによれば、メッセージが失われる可能性がある場所は、コンシューマー、プロデューサー、サーバーの 3 か所です。

コンシューマー側でデータが失われました: server.properties/enable.auto.commit が True に設定されている場合、Kafka は最初にオフセットをコミットしてからメッセージを処理します。このとき例外が発生すると、メッセージは失われます。

したがって、Offset の自動送信をオフにして、処理が完了した後に Offset を手動で送信することができます。これにより、メッセージが失われないことが保証されます。ただし、オフセットの送信が失敗すると、重複消費の問題が発生する可能性があります。この場合、べき等性が保証されます。

Kafka はメッセージを失います: ブローカーが誤ってクラッシュし、レプリカが 1 つしかない場合、ブローカー上のメッセージは失われます。

レプリカ>1 の場合、リーダーはフォロワーを新しいリーダーとして再選択します。フォロワーに同期されていないメッセージがある場合、それらのメッセージは失われます。

上記の問題を回避するには、次のように設定します。

  • トピックの replication.factor パラメータを設定します。この値は 1 より大きい必要があり、各パーティションに少なくとも 2 つのレプリカが必要です。
  • Kafka サーバーで min.insync.replicas パラメータを設定します。この値は 1 より大きくする必要があります。これにより、リーダーは、少なくとも 1 つのフォロワーがまだ接続しており、遅れていないことを認識する必要があります。これにより、リーダーが失敗した場合でもフォロワーが存在することが保証されます。
  • プロデューサー側で acks=all を設定します。これにより、各データがすべてのレプリカに書き込まれてからでないと、正常に書き込まれたとみなされなくなります。
  • プロデューサー側で retries=MAX を設定します (非常に大きな値、つまり無制限の再試行を意味します)。これにより、書き込みが失敗した場合に無限の再試行が必要になります。ここで問題が発生します。

プロデューサーがメッセージを失いました: 書き込みが成功したと見なされる前にすべての ISR がメッセージを同期するように、プロデューサー側で acks=all を設定します。

③メッセージの順序をどうやって確保するか?

Kafka のパーティション上のメッセージは連続しています。順番に消費する必要があるメッセージは、同じパーティションに送信され、単一のコンシューマーによって消費されます。

以上が私のカフカ研究の要約です。間違いや無理なところがありましたらご指摘ください!

参考文献:

  • Kafka 学習ノート: 知識ポイントのまとめ
  • 高度なJava
  • Kafka のログストレージ分析
  • Kafka Producer プロデューサーパラメータ設定とパラメータチューニングの提案 - ビジネス環境実践シリーズ
  • ショックでした!これはカフカです!
  • kafka の設定
  • カフカ 2.3.0 API
  • Kafka コンシューマーの設定の詳細と送信方法

著者:lbzhello

プロフィール: Java プログラマー、メール: [email protected]

<<:  分散システムにおける負荷分散

>>:  伝統的な銀行はどのようにして小売業の変革の基盤を築くのでしょうか?

推薦する

budgetnode - $12/年/512MB メモリ/20GB ハードディスク/500GB トラフィック/50GB DDos 保護

Budgetnode は設立されてから 1 年未満で、正式な登録資格を有する会社です。 budget...

調査によると、クラウド監視および管理ツールは不十分であることが判明

Enterprise Management Associates (EMA) の最近の調査では、企業...

小さなウェブサイト構築会社の失敗後の深い反省

ウェブサイト構築・最適化業界は、第一に参入障壁が低く、第二に大量の人材が流入しているため、小規模なウ...

Baidu の重量 ≠ ウェブサイトの重量

今日、厦門SEOはA5ウェブマスターのウェブサイトで記事を見ました。タイトルは「ウェブマスターはウェ...

クラウド アプリケーション移行の困難を回避する方法

企業がビジネスクリティカルなアプリケーションをクラウドで実行することに決めたら、他のプロバイダーに切...

テンセントクラウドはいくつかのコア製品の値下げを発表、最大の値下げは40%に達した。

5月16日、テンセントクラウドは同社の主要クラウド製品の数種類の値下げを発表し、一部の製品ラインでは...

ウェブサイトのスナップショットが更新されず、内部ページが削減される理由の事例分析

この記事は、主に著者が所有するウェブサイトで発生した問題に対処するために書かれています。著者のウェブ...

「玄元剣」を使ってトラフィックを獲得する方法と方法について簡単に説明します

今夜、湖南衛星テレビは、復活を期待しているテレビシリーズ「玄元剣」を放送します。このテレビシリーズは...

ブランドは中国のバレンタインデー期間中のマーケティング機会をどのように活用するのでしょうか?来て学んでください。

現実でも文学作品でも、私たちはみな人生の中で完璧な愛に出会うことを切望しています。中国のバレンタイン...

欧州カップの予想を巧みに利用してWeiboマーケティングを強化

ヨーロッパカップが終盤を迎えようとしている。第1戦でもたらされた高い視聴率に加え、続く第1/4、第1...

推奨: directspace-新しい KVM/SSD/Windows/Linux

KVM 仮想テクノロジの新バージョンを導入した VPS である directspace は、SSD ...

草の根レベルのマーケティングを差別化する方法

友人が私のブログにメッセージを残しました。「残念ながら、自分のセールスポイントをどうアピールすればい...

変更された2つのJSアプリケーションの解釈

あらゆる Web テクノロジーの出現は、Web デザイナーやユーザーによりよいサービスを提供するため...

Hadoop 擬似分散セットアップ操作手順ガイド

[[207661]] 1. 準備/opt/ディレクトリにモジュールとソフトウェアのフォルダを作成しま...

#Tier3 無制限トラフィック: vps.net - $12/Xen/4 コア/1g メモリ/25g SSD/無制限 G ポート

UK2グループ傘下の有名なVPSブランドであるvps.netが、全世界で約20のデータセンター+オン...