プロデューサー実装ロジック - Kafka 知識システム (II)

プロデューサー実装ロジック - Kafka 知識システム (II)

[[409180]]

Kafka はメッセージを個別に送信しますか、それともバッチで送信しますか?

Kafka はどのようにして単一のメッセージを送信するのでしょうか?

Kafka はメッセージを順番に送信しますか?

どのような状況でプロデューサーが FullGC を頻繁に実行する可能性がありますか?

メッセージ送信ロジック

メッセージ送信プロセスの鳥瞰図。

プロデューサーデザイン

消費と送信のメカニズム:

1) シリアライザー: メッセージ オブジェクトをバイト配列にシリアル化し、ネットワーク経由で送信します。

2) パーティショナー: メッセージが送信される特定のパーティションを計算します。パーティションが明示的に指定されている場合、パーティショナーは使用されません。

3) メッセージ バッファ プール: クライアントのメッセージ バッファ プール。デフォルト サイズは 32M です。パラメータ buffer.memory を参照してください。

4) バッチ送信: バッファプール内のメッセージはバッチで送信されます。デフォルトのバッチ サイズは 16 KB です。パラメータ batch.size を参照してください。

負荷分散設計:

メッセージ トピックは複数のパーティションで構成され、パーティションは異なるブローカーに均等に分散されます。したがって、ブローカー クラスターのパフォーマンスを効果的に活用し、メッセージのスループットを向上させるために、プロデューサーはランダム方式またはハッシュ方式を使用してメッセージを複数のパーティションに均等に送信し、負荷分散を実現できます。

パーティション分割戦略:

  1. ポーリング戦略、デフォルト戦略
  2. ランダム戦略は、実際のパフォーマンスでは、ポーリング戦略よりも劣っています。
  3. メッセージ キー順序保持戦略によれば、メッセージがキーで定義されると、各パーティションでのメッセージ処理は順次行われるため、同じキーを持つすべてのメッセージが同じパーティションに入ることが保証されます。

カフカプロデューサー

ソースコード

  1. //クライアントID。 KafkaProducer を作成するときに、client.id を通じて clientId を定義できます。指定しない場合は、デフォルトはproducer-seqになります。 seq はプロセス中に増加します。クライアントが clientId を明示的に指定することを強くお勧めします。
  2. プライベート最終文字列クライアントID;
  3. // メッセージ サイズ、送信時間、その他の監視関連のメトリックなどのメトリックのストレージ コンテナー。
  4. 最終的なメトリクス メトリクス。
  5. //パーティション負荷分散アルゴリズム。パラメータpartitioner.classで指定されます。
  6. プライベート最終パーティショナー パーティショナー。
  7. //キーとメッセージ本文のシリアル化後のメッセージの合計サイズを含む、send メソッドの呼び出しによって送信される最大リクエスト サイズは、この値を超えることはできません。パラメータmax.request.size設定します
  8. プライベート最終int maxRequestSize;
  9. //プロデューサー キャッシュが占有するメモリの合計サイズは、パラメーター buffer.memory によって設定されます。
  10. プライベート最終長い合計メモリサイズ;
  11. //トピックルーティング情報などのメタデータ情報は、KafkaProducer によって自動的に更新されます。
  12. プライベート最終メタデータメタデータ;
  13. //メッセージレコードアキュムレータ
  14. プライベート最終 RecordAccumulator アキュムレータ;
  15. // メッセージを送信するロジック、つまりブローカーにメッセージを送信する処理ロジックをカプセル化するために使用されます。
  16. プライベート最終送信者送信者;
  17. // メッセージを送信するためのバックグラウンド スレッド。ブローカーにメッセージを送信するために内部的に Sender を使用する独立したスレッド。
  18. プライベート最終スレッド ioThread;
  19. //圧縮タイプ。圧縮はデフォルトでは有効になっていませんが、パラメータ compression.type を介して設定できます。オプションの値: none、gzip、snappy、lz4、zstd。
  20. プライベート最終圧縮タイプ圧縮タイプ;
  21. // 監視のメトリックとして使用されるエラー情報コレクター。
  22. プライベート最終センサーエラー;
  23. // システム時間やスレッドのスリープなどを取得するために使用されます。
  24. プライベート最終時間 時間;
  25. //メッセージのキーをシリアル化するために使用されます。
  26. プライベート最終ExtendedSerializer<K> keySerializer;
  27. //シリアライザー< V> 値シリアライザー
  28. プライベート最終ExtendedSerializer<V> valueSerializer;
  29. //プロデューサーの構成情報。
  30. プライベート最終プロデューサーConfigプロデューサーConfig;
  31. //最大ブロック時間。プロデューサーが使用するキャッシュが指定された値に達すると、メッセージの送信がブロックされます。最大待機時間は、パラメータmax.block.msで設定できます。
  32. プライベート最終long maxBlockTimeMs;
  33. // クライアントがリクエストへの応答を待機する最大時間を構成します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じて要求を再送信するか、再試行回数が上限に達した場合は要求を失敗させます。
  34. プライベート最終int requestTimeoutMs;
  35. // プロデューサー側のインターセプターは、メッセージが送信される前にいくつかのカスタマイズされた処理を実行します。
  36. プライベート最終 ProducerInterceptors<K, V> インターセプター;
  37. //API バージョンの関連メタ情報を維持します。このクラスは Kafka 内でのみ使用できます。
  38. プライベート最終 ApiVersions apiVersions;
  39. //Kafka メッセージ トランザクション マネージャー。
  40. プライベート最終トランザクションマネージャトランザクションマネージャ;
  41. //Kafka プロデューサー トランザクション コンテキスト環境の初期結果。
  42. プライベート TransactionalRequestResult initTransactionsResult;

KafkaProducer には次の特性があります。

  1. KafkaProducer はスレッドセーフであり、複数のスレッドで使用できます。
  2. KafkaProducer には、送信するメッセージを格納するためのキャッシュ プール、つまり ProducerRecord キューが含まれています。同時に、ProducerRecord オブジェクトを Kafka クラスターに送信するための IO スレッドが開始されます。
  3. KafkaProducer のメッセージ送信 API の send メソッドは非同期です。送信するメッセージの ProducerRecord をバッファ領域に送信し、すぐに戻って結果証明書 Future を返すことだけを担当します。

acksパラメータの役割

KafkaProducer は、メッセージが「送信」される条件 (標準) を定義するコア パラメータ acks を提供します。これは、ブローカー側がクライアントに対してメッセージが送信されたことを約束する条件です。オプションの値は次のとおりです。

  1. 0: KafkaProducer の send メソッドが呼び出される限り、成功したとみなされます。
  2. all または -1: メッセージが送信されたとみなされ、成功した送信がクライアントに返される前に、リーダー ノードがメッセージを保存しているだけでなく、そのすべてのレプリカ (正確には、ISR 内のノード) がメッセージを保存している必要があることを示します。これは最も厳格な永続性保証ですが、当然ながらパフォーマンスは最も低くなります。
  3. 1: 送信成功がクライアントに返される前に、メッセージをリーダー ノードに書き込むだけでよいことを示します。

再試行パラメータの役割

運用側で Kafka によって提供されるもう 1 つのコア プロパティは、メッセージの送信が失敗した後の再試行回数を制御するために使用されます。 0 に設定すると再試行は行われません。再試行すると、送信側でメッセージが重複する可能性があります。メッセージ送信インターフェースの観点から:

  1. Future<RecordMetadata> は ProducerRecord<K, V> レコードを送信します。 Future<RecordMetadata> を送信します(ProducerRecord<K, V> レコード、コールバック コールバック);

上記の API から、ユーザーが KafkaProducer を使用してメッセージを送信する場合、まず送信するメッセージを ProducerRecord にカプセル化して、典型的な Future 設計パターンである Future オブジェクトを返す必要があることがわかります。

Kafka メッセージ追加プロセス

KafkaProducer の send メソッドは、メッセージをブローカーに直接送信しません。 Kafka ではメッセージの送信が非同期になり、2 つのステップに分割されます。送信メソッドの役割は、メッセージをメモリ (パーティション化されたキャッシュ キュー) に追加することであり、その後、専用の送信スレッドがキャッシュされたメッセージをバッチで Kafka ブローカーに非同期的に送信します。

メインメソッドはKafkaProducer#doSendです

メッセージをプロデューサーの送信バッファに追加します。その実装クラスは RecordAccumulator です。まず、メモリに書き込まれる Kafka メッセージのフローチャートを見てみましょう。

送信者スレッド

これまで、send メソッドを呼び出すと、実際にはプロデューサー クライアントのサービス メモリにのみ送信されることを確認しました。まだブローカーに連絡が取れていません。 Kafka プロデューサー クライアントのバックグラウンドでは、スレッドが開始され、メッセージ バッチが保存されている領域が継続的にポーリングされ、ブローカーにメッセージが送信されます。

メッセージバッチのメモリ構造と割り当て

上記のソース コードから、各 ProducerBatch は batch.size バイトのメモリ ブロックであることがわかります。そしてプーリング技術が使われます。

バッファ プールのメモリ保持クラスは BufferPool です。まず、BufferPool のメンバーを見てみましょう。

  1. パブリッククラス BufferPool {
  2. //合計メモリサイズ
  3. プライベート最終長い合計メモリ;
  4. // 各メモリブロックのサイズ、つまりbatch.size  
  5. プライベート最終intプール可能なサイズ;
  6. // メモリの適用と返却のメソッドの同期ロック
  7. プライベート最終ReentrantLockロック;
  8. // メモリブロックを解放する
  9. プライベート最終 Deque<ByteBuffer>を解放します
  10. // 空きメモリブロックのイベントを待つ必要がある
  11. プライベート最終 Deque<Condition> waiters;
  12. /** 利用可能なメモリ合計  nonPooledAvailableMemoryバイトバッファ 空き* poolableSize。 */
  13. // バッファ プールにはまだ空きメモリが割り当てられていません。新しく適用されたメモリブロックはここからメモリ値を取得します
  14. プライベート long nonPooledAvailableMemory;
  15. // ...
  16. }

BufferPool のメンバーから、バッファー プールが実際には ByteBuffers で構成されていることがわかります。 BufferPool はこれらのメモリ ブロックを保持し、メンバー free に保存します。 free の合計サイズは totalMemory によって制限され、nonPooledAvailableMemory はバッファー プールに割り当てられていないメモリがどれだけ残っているかを示します。

バッチ メッセージが送信されると、そのメッセージが保持するメモリ ブロックは free に戻されるため、後続のバッチがメモリ ブロックを適用したときに新しい ByteBuffer は作成されず、free から取得できるため、JVM によってメモリ ブロックが再利用される問題を回避できます。

メモリ ブロックを作成するプロセスは次のとおりです。

メモリブロックを返す論理フロー

返されたメモリ ブロックのサイズが batchSize と等しい場合、メモリ ブロックはクリアされ、バッファー プールの空き領域に追加されます。つまり、メモリ ブロックを再利用するための JVM GC を回避して、メモリ ブロックがバッファー プールに返されます。等しくない場合は、未割り当ておよび空きメモリ サイズの値にメモリ サイズを追加します。メモリを返す必要はなく、JVM GC がメモリをリサイクルするのを待ってから、空きメモリを待機しているスレッドを起動します。

Java プロデューサーは TCP 接続をどのように管理しますか?

なぜ TCP なのか?

Apache Kafka のすべての通信は、HTTP やその他のプロトコルではなく、TCP に基づいています。これは、生産者、消費者、ブローカー間の通信にも当てはまります。

コミュニティの観点から見ると、クライアントを開発する際に、多重化要求や複数の接続を同時にポーリングする機能など、TCP 自体が提供する高度な機能の一部を活用できます。

TCP の多重化要求により、物理接続上に複数の仮想接続が作成され、各仮想接続は対応する独自のデータ ストリームを送信する役割を担います。厳密に言えば、TCP は多重化できません。失われたメッセージの自動再送信など、信頼性の高いメッセージ配信セマンティクスのみを提供します。

さらに、現在知られている HTTP ライブラリは、多くのプログラミング言語ではやや初歩的なものです。

TCP 接続はいつ作成されますか?

KafkaProducer インスタンスが作成されると、TCP 接続が確立されます。 KafkaProducer インスタンスが作成されると、プロデューサー アプリケーションはバックグラウンドで Sender という名前のスレッドを作成して開始します。送信スレッドが実行を開始すると、最初にブローカーとの接続が作成されます。

  1. プロパティ properties = new Properties();
  2. properties.put( "bootstrap.servers" , "localhost:9092" );
  3. properties.put( "key.serializer" 、 StringSerializer.class.getName());
  4. properties.put( "value.serializer" 、 StringSerializer.class.getName());
  5. //リソースを使って試す
  6. // KafkaProducer インスタンスを作成すると、Sender スレッドが作成され、バックグラウンドで開始されます。送信スレッドが実行を開始すると、まずブローカーとのTCP接続を作成します。
  7. 試してください (Producer<String, String> producer = new KafkaProducer<>(properties)) {
  8. ProducerRecord<String, String> レコード = new ProducerRecord<>(TOPIC, KEY , VALUE);
  9. コールバック callback = (メタデータ、例外) -> {
  10. };
  11. プロデューサー.send(レコード、コールバック);
  12. }
  1. bootstrap.serversはプロデューサーのコアパラメータの1つであり、プロデューサーが起動したときに接続するブローカーのアドレスを指定します。
  2. bootstrap.servers で 1000 個のブローカーが指定されている場合、プロデューサーは起動時にまずこれらの 1000 個のブローカーとの TCP 接続を作成します。
  3. したがって、クラスター内のすべての Broker 情報を bootstrap.servers に構成することはお勧めしません。通常は 3 ~ 4 台のサーバーで十分です。
  4. プロデューサーがクラスター内の任意のブローカーに接続されると、クラスター全体のブローカー情報を取得できます(メタデータ要求)

TCP 接続は、メタデータの更新後とメッセージの送信時の 2 つの場所で作成される場合もあります。

  • プロデューサーがクラスターメタデータを更新するときに、一部のブローカーとの接続がないことが判明した場合、プロデューサーはTCP接続を作成します。

【シーン1】

プロデューサーが存在しないトピックにメッセージを送信しようとすると、ブローカーはトピックが存在しないことをプロデューサーに伝えます。このとき、プロデューサーは Kafka クラスターにメタデータ要求を送信し、最新のメタデータ情報を取得し、クラスター内のすべてのブローカーとの TCP 接続を確立しようとします。

【シーン2】

プロデューサーは、metadata.max.age.ms パラメータを通じてメタデータ情報を定期的に更新します。デフォルト値は 300000、つまり 5 分です。

  • プロデューサーがメッセージを送信しようとすると、プロデューサーはターゲット ブローカーとの接続がないことを検出し (負荷分散アルゴリズムによって異なります)、TCP 接続も作成します。

TCP 接続はいつ閉じられますか?

プロデューサーが TCP 接続を閉じる方法は 2 つあります。ユーザーによるアクティブなクローズと Kafka による自動クローズです。

[ユーザーアクティブシャットダウン]

広い意味では、アクティブ シャットダウンには、ユーザーが kill -9 を呼び出して Producer を強制終了することが含まれます。最も推奨される方法は、producer.close() です。

[Kafka は自動的にシャットダウンします]

プロデューサー パラメータ connections.max.idle.ms のデフォルト値は 540000 (9 分) です。

9 分以内に TCP 接続を介してリクエストが渡されない場合、Kafka は TCP 接続をアクティブに閉じます。

connections.max.idle.ms=-1 はこのメカニズムを無効にし、TCP 接続は永続的な長時間接続になります。

Kafka によって作成されたソケット接続ではすべてキープアライブが有効になっています。

【知らせ】

TCP 接続を閉じるイニシエーターは Kafka クライアントであり、これは受動的な閉じるシナリオです。

受動的なクローズの結果、多数のCLOSE_WAIT接続が生成される。

プロデューサーまたはクライアントには、TCP 接続が切断されたことを明示的に確認する機会がありません。

要約する

これで、冒頭の 3 つの質問に答えることができます。

1. Kafka はメッセージを個別に送信しますか、それともバッチで送信しますか?

通常は一括して送信されます。 ProducerBatch にカプセル化して送信します。

2. Kafka はどのようにして単一のメッセージを送信しますか?

send メソッドを同期的に呼び出すには、単一のプロデューサーと単一のスレッドのみを設定できます。

3. Kafka はメッセージを順番に送信しますか?

いいえ、順序が必要な場合は、キーを設定する必要があり、プロデューサーはシングルスレッドになります。

4. プロデューサーが FullGC を頻繁に実行するのはどのような状況でしょうか?

メッセージ サイズが batchSize より大きい場合、割り当てられたメモリ ブロックはループ内で free から取得されませんが、新しい ByteBuffer が作成され、ByteBuffer はバッファー プールに返されません (JVM GC リカバリ)。この時点で nonPooledAvailableMemory がメッセージ本文より小さい場合、free 内の空きメモリ ブロックが破棄され (JVM GC リカバリ)、ユーザー アプリケーション用のバッファ プールに十分なメモリ領域が確保されます。これらのアクションにより、GC の問題が頻繁に発生します。

したがって、頻繁な GC を回避するには、ビジネス メッセージのサイズに応じて batch.size を適切に調整する必要があります。

<<:  GitOps 継続的デプロイメント ツールである Argo CD を初めて体験

>>:  最高のPython仮想環境。

推薦する

hostodo-アジアに最適化された VPS/KVM/$4.5/1g メモリ/30g ハードディスク/1.5T トラフィック

hostodo からの公式メールには、特別な低価格の VPS、KVM、クアドラネット コンピュータ ...

カフカも理解していないのに、面接を受けに行くのですか?

[51CTO.com からのオリジナル記事] Apache Kafka は、最も人気のあるエンタープ...

KVM 仮想化 KVM 仮想マシンのクローン作成

kvm 仮想マシンのクローンを作成するケースは 2 つあります。この記事では、次の 2 つのケースに...

2012 年にウェブマスターは外部リンクをどのように構築するのでしょうか?

2011年になると、百度のアルゴリズムは何度も調整され、どんどん標準化されていきましたが、トラフィッ...

企業ウェブサイトの降格に影響を与える4つの主な要因の簡単な分析

一般的に、中小企業のウェブサイト所有者は、最初はあまり気にしていませんでした。彼らは自分の企業がより...

どのような外部リンクが良いリンクなのか

SEO における外部リンクの重要性は誰もが知っています。特に人気のあるキーワードを最適化する場合は、...

タオバオはタオバオの顧客による商品検索サービスの利用を禁止しており、リベートサイトは存続できない可能性が高い

5月14日、Taobaoは最近規則を変更し、TaobaoのアプリケーションとツールがTaobaoの商...

ユーザー生成コンテンツの理由

Baidu の継続的な改善により、オリジナル記事は、多くのオリジナル記事を作成するための十分なリソー...

KSEO: SEO/SEM 部門マネージャーの責任

以下は、多国籍企業が SEO/SEM 部門の責任者を採用する際の具体的な職務内容であり、SEO チー...

Pechoin が明かす: インターネットの考え方を活用して 1 日で 380 万個を売るには?

スキンケア商品の専業販売員として、私が日々考えているのは、女性のお客さまにどうしたらもっとお役に立て...

百科事典ウェブサイトの新たな探究:Wikipedia が SMS リクエスト サービスを開始

北京時間2月24日朝のニュースによると、Wikipediaは発展途上市場のユーザーにWikipedi...

Redmi 人気の裏にある真実: 並外れたマーケティングが鍵

Xiaomiの携帯電話は2011年11月に発売されて以来、中国でますます人気が高まっており、その優れ...

Hong Bo氏:検索クロールはルールを遵守する必要があります。合意に違反すると混乱が生じます。

最近、Qihoo 360総合検索が国際的に認められているロバーツ議定書を無視し、BaiduやGoog...

テンセントクラウドと海雲傑雲が協力し、より良い未来を創造するハイブリッドクラウドを構築

4月初旬、「テンセントクラウドソリューション推進会議」が武漢で成功裏に開催されました。海雲捷運はパー...

Tektonを物理マシンに接続して構築する方法

[[396376]]この記事はWeChatの公開アカウント「Ask Qi」から転載したもので、著者は...