プロデューサー実装ロジック - Kafka 知識システム (II)

プロデューサー実装ロジック - Kafka 知識システム (II)

[[409180]]

Kafka はメッセージを個別に送信しますか、それともバッチで送信しますか?

Kafka はどのようにして単一のメッセージを送信するのでしょうか?

Kafka はメッセージを順番に送信しますか?

どのような状況でプロデューサーが FullGC を頻繁に実行する可能性がありますか?

メッセージ送信ロジック

メッセージ送信プロセスの鳥瞰図。

プロデューサーデザイン

消費と送信のメカニズム:

1) シリアライザー: メッセージ オブジェクトをバイト配列にシリアル化し、ネットワーク経由で送信します。

2) パーティショナー: メッセージが送信される特定のパーティションを計算します。パーティションが明示的に指定されている場合、パーティショナーは使用されません。

3) メッセージ バッファ プール: クライアントのメッセージ バッファ プール。デフォルト サイズは 32M です。パラメータ buffer.memory を参照してください。

4) バッチ送信: バッファプール内のメッセージはバッチで送信されます。デフォルトのバッチ サイズは 16 KB です。パラメータ batch.size を参照してください。

負荷分散設計:

メッセージ トピックは複数のパーティションで構成され、パーティションは異なるブローカーに均等に分散されます。したがって、ブローカー クラスターのパフォーマンスを効果的に活用し、メッセージのスループットを向上させるために、プロデューサーはランダム方式またはハッシュ方式を使用してメッセージを複数のパーティションに均等に送信し、負荷分散を実現できます。

パーティション分割戦略:

  1. ポーリング戦略、デフォルト戦略
  2. ランダム戦略は、実際のパフォーマンスでは、ポーリング戦略よりも劣っています。
  3. メッセージ キー順序保持戦略によれば、メッセージがキーで定義されると、各パーティションでのメッセージ処理は順次行われるため、同じキーを持つすべてのメッセージが同じパーティションに入ることが保証されます。

カフカプロデューサー

ソースコード

  1. //クライアントID。 KafkaProducer を作成するときに、client.id を通じて clientId を定義できます。指定しない場合は、デフォルトはproducer-seqになります。 seq はプロセス中に増加します。クライアントが clientId を明示的に指定することを強くお勧めします。
  2. プライベート最終文字列クライアントID;
  3. // メッセージ サイズ、送信時間、その他の監視関連のメトリックなどのメトリックのストレージ コンテナー。
  4. 最終的なメトリクス メトリクス。
  5. //パーティション負荷分散アルゴリズム。パラメータpartitioner.classで指定されます。
  6. プライベート最終パーティショナー パーティショナー。
  7. //キーとメッセージ本文のシリアル化後のメッセージの合計サイズを含む、send メソッドの呼び出しによって送信される最大リクエスト サイズは、この値を超えることはできません。パラメータmax.request.size設定します
  8. プライベート最終int maxRequestSize;
  9. //プロデューサー キャッシュが占有するメモリの合計サイズは、パラメーター buffer.memory によって設定されます。
  10. プライベート最終長い合計メモリサイズ;
  11. //トピックルーティング情報などのメタデータ情報は、KafkaProducer によって自動的に更新されます。
  12. プライベート最終メタデータメタデータ;
  13. //メッセージレコードアキュムレータ
  14. プライベート最終 RecordAccumulator アキュムレータ;
  15. // メッセージを送信するロジック、つまりブローカーにメッセージを送信する処理ロジックをカプセル化するために使用されます。
  16. プライベート最終送信者送信者;
  17. // メッセージを送信するためのバックグラウンド スレッド。ブローカーにメッセージを送信するために内部的に Sender を使用する独立したスレッド。
  18. プライベート最終スレッド ioThread;
  19. //圧縮タイプ。圧縮はデフォルトでは有効になっていませんが、パラメータ compression.type を介して設定できます。オプションの値: none、gzip、snappy、lz4、zstd。
  20. プライベート最終圧縮タイプ圧縮タイプ;
  21. // 監視のメトリックとして使用されるエラー情報コレクター。
  22. プライベート最終センサーエラー;
  23. // システム時間やスレッドのスリープなどを取得するために使用されます。
  24. プライベート最終時間 時間;
  25. //メッセージのキーをシリアル化するために使用されます。
  26. プライベート最終ExtendedSerializer<K> keySerializer;
  27. //シリアライザー< V> 値シリアライザー
  28. プライベート最終ExtendedSerializer<V> valueSerializer;
  29. //プロデューサーの構成情報。
  30. プライベート最終プロデューサーConfigプロデューサーConfig;
  31. //最大ブロック時間。プロデューサーが使用するキャッシュが指定された値に達すると、メッセージの送信がブロックされます。最大待機時間は、パラメータmax.block.msで設定できます。
  32. プライベート最終long maxBlockTimeMs;
  33. // クライアントがリクエストへの応答を待機する最大時間を構成します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じて要求を再送信するか、再試行回数が上限に達した場合は要求を失敗させます。
  34. プライベート最終int requestTimeoutMs;
  35. // プロデューサー側のインターセプターは、メッセージが送信される前にいくつかのカスタマイズされた処理を実行します。
  36. プライベート最終 ProducerInterceptors<K, V> インターセプター;
  37. //API バージョンの関連メタ情報を維持します。このクラスは Kafka 内でのみ使用できます。
  38. プライベート最終 ApiVersions apiVersions;
  39. //Kafka メッセージ トランザクション マネージャー。
  40. プライベート最終トランザクションマネージャトランザクションマネージャ;
  41. //Kafka プロデューサー トランザクション コンテキスト環境の初期結果。
  42. プライベート TransactionalRequestResult initTransactionsResult;

KafkaProducer には次の特性があります。

  1. KafkaProducer はスレッドセーフであり、複数のスレッドで使用できます。
  2. KafkaProducer には、送信するメッセージを格納するためのキャッシュ プール、つまり ProducerRecord キューが含まれています。同時に、ProducerRecord オブジェクトを Kafka クラスターに送信するための IO スレッドが開始されます。
  3. KafkaProducer のメッセージ送信 API の send メソッドは非同期です。送信するメッセージの ProducerRecord をバッファ領域に送信し、すぐに戻って結果証明書 Future を返すことだけを担当します。

acksパラメータの役割

KafkaProducer は、メッセージが「送信」される条件 (標準) を定義するコア パラメータ acks を提供します。これは、ブローカー側がクライアントに対してメッセージが送信されたことを約束する条件です。オプションの値は次のとおりです。

  1. 0: KafkaProducer の send メソッドが呼び出される限り、成功したとみなされます。
  2. all または -1: メッセージが送信されたとみなされ、成功した送信がクライアントに返される前に、リーダー ノードがメッセージを保存しているだけでなく、そのすべてのレプリカ (正確には、ISR 内のノード) がメッセージを保存している必要があることを示します。これは最も厳格な永続性保証ですが、当然ながらパフォーマンスは最も低くなります。
  3. 1: 送信成功がクライアントに返される前に、メッセージをリーダー ノードに書き込むだけでよいことを示します。

再試行パラメータの役割

運用側で Kafka によって提供されるもう 1 つのコア プロパティは、メッセージの送信が失敗した後の再試行回数を制御するために使用されます。 0 に設定すると再試行は行われません。再試行すると、送信側でメッセージが重複する可能性があります。メッセージ送信インターフェースの観点から:

  1. Future<RecordMetadata> は ProducerRecord<K, V> レコードを送信します。 Future<RecordMetadata> を送信します(ProducerRecord<K, V> レコード、コールバック コールバック);

上記の API から、ユーザーが KafkaProducer を使用してメッセージを送信する場合、まず送信するメッセージを ProducerRecord にカプセル化して、典型的な Future 設計パターンである Future オブジェクトを返す必要があることがわかります。

Kafka メッセージ追加プロセス

KafkaProducer の send メソッドは、メッセージをブローカーに直接送信しません。 Kafka ではメッセージの送信が非同期になり、2 つのステップに分割されます。送信メソッドの役割は、メッセージをメモリ (パーティション化されたキャッシュ キュー) に追加することであり、その後、専用の送信スレッドがキャッシュされたメッセージをバッチで Kafka ブローカーに非同期的に送信します。

メインメソッドはKafkaProducer#doSendです

メッセージをプロデューサーの送信バッファに追加します。その実装クラスは RecordAccumulator です。まず、メモリに書き込まれる Kafka メッセージのフローチャートを見てみましょう。

送信者スレッド

これまで、send メソッドを呼び出すと、実際にはプロデューサー クライアントのサービス メモリにのみ送信されることを確認しました。まだブローカーに連絡が取れていません。 Kafka プロデューサー クライアントのバックグラウンドでは、スレッドが開始され、メッセージ バッチが保存されている領域が継続的にポーリングされ、ブローカーにメッセージが送信されます。

メッセージバッチのメモリ構造と割り当て

上記のソース コードから、各 ProducerBatch は batch.size バイトのメモリ ブロックであることがわかります。そしてプーリング技術が使われます。

バッファ プールのメモリ保持クラスは BufferPool です。まず、BufferPool のメンバーを見てみましょう。

  1. パブリッククラス BufferPool {
  2. //合計メモリサイズ
  3. プライベート最終長い合計メモリ;
  4. // 各メモリブロックのサイズ、つまりbatch.size  
  5. プライベート最終intプール可能なサイズ;
  6. // メモリの適用と返却のメソッドの同期ロック
  7. プライベート最終ReentrantLockロック;
  8. // メモリブロックを解放する
  9. プライベート最終 Deque<ByteBuffer>を解放します
  10. // 空きメモリブロックのイベントを待つ必要がある
  11. プライベート最終 Deque<Condition> waiters;
  12. /** 利用可能なメモリ合計  nonPooledAvailableMemoryバイトバッファ 空き* poolableSize。 */
  13. // バッファ プールにはまだ空きメモリが割り当てられていません。新しく適用されたメモリブロックはここからメモリ値を取得します
  14. プライベート long nonPooledAvailableMemory;
  15. // ...
  16. }

BufferPool のメンバーから、バッファー プールが実際には ByteBuffers で構成されていることがわかります。 BufferPool はこれらのメモリ ブロックを保持し、メンバー free に保存します。 free の合計サイズは totalMemory によって制限され、nonPooledAvailableMemory はバッファー プールに割り当てられていないメモリがどれだけ残っているかを示します。

バッチ メッセージが送信されると、そのメッセージが保持するメモリ ブロックは free に戻されるため、後続のバッチがメモリ ブロックを適用したときに新しい ByteBuffer は作成されず、free から取得できるため、JVM によってメモリ ブロックが再利用される問題を回避できます。

メモリ ブロックを作成するプロセスは次のとおりです。

メモリブロックを返す論理フロー

返されたメモリ ブロックのサイズが batchSize と等しい場合、メモリ ブロックはクリアされ、バッファー プールの空き領域に追加されます。つまり、メモリ ブロックを再利用するための JVM GC を回避して、メモリ ブロックがバッファー プールに返されます。等しくない場合は、未割り当ておよび空きメモリ サイズの値にメモリ サイズを追加します。メモリを返す必要はなく、JVM GC がメモリをリサイクルするのを待ってから、空きメモリを待機しているスレッドを起動します。

Java プロデューサーは TCP 接続をどのように管理しますか?

なぜ TCP なのか?

Apache Kafka のすべての通信は、HTTP やその他のプロトコルではなく、TCP に基づいています。これは、生産者、消費者、ブローカー間の通信にも当てはまります。

コミュニティの観点から見ると、クライアントを開発する際に、多重化要求や複数の接続を同時にポーリングする機能など、TCP 自体が提供する高度な機能の一部を活用できます。

TCP の多重化要求により、物理接続上に複数の仮想接続が作成され、各仮想接続は対応する独自のデータ ストリームを送信する役割を担います。厳密に言えば、TCP は多重化できません。失われたメッセージの自動再送信など、信頼性の高いメッセージ配信セマンティクスのみを提供します。

さらに、現在知られている HTTP ライブラリは、多くのプログラミング言語ではやや初歩的なものです。

TCP 接続はいつ作成されますか?

KafkaProducer インスタンスが作成されると、TCP 接続が確立されます。 KafkaProducer インスタンスが作成されると、プロデューサー アプリケーションはバックグラウンドで Sender という名前のスレッドを作成して開始します。送信スレッドが実行を開始すると、最初にブローカーとの接続が作成されます。

  1. プロパティ properties = new Properties();
  2. properties.put( "bootstrap.servers" , "localhost:9092" );
  3. properties.put( "key.serializer" 、 StringSerializer.class.getName());
  4. properties.put( "value.serializer" 、 StringSerializer.class.getName());
  5. //リソースを使って試す
  6. // KafkaProducer インスタンスを作成すると、Sender スレッドが作成され、バックグラウンドで開始されます。送信スレッドが実行を開始すると、まずブローカーとのTCP接続を作成します。
  7. 試してください (Producer<String, String> producer = new KafkaProducer<>(properties)) {
  8. ProducerRecord<String, String> レコード = new ProducerRecord<>(TOPIC, KEY , VALUE);
  9. コールバック callback = (メタデータ、例外) -> {
  10. };
  11. プロデューサー.send(レコード、コールバック);
  12. }
  1. bootstrap.serversはプロデューサーのコアパラメータの1つであり、プロデューサーが起動したときに接続するブローカーのアドレスを指定します。
  2. bootstrap.servers で 1000 個のブローカーが指定されている場合、プロデューサーは起動時にまずこれらの 1000 個のブローカーとの TCP 接続を作成します。
  3. したがって、クラスター内のすべての Broker 情報を bootstrap.servers に構成することはお勧めしません。通常は 3 ~ 4 台のサーバーで十分です。
  4. プロデューサーがクラスター内の任意のブローカーに接続されると、クラスター全体のブローカー情報を取得できます(メタデータ要求)

TCP 接続は、メタデータの更新後とメッセージの送信時の 2 つの場所で作成される場合もあります。

  • プロデューサーがクラスターメタデータを更新するときに、一部のブローカーとの接続がないことが判明した場合、プロデューサーはTCP接続を作成します。

【シーン1】

プロデューサーが存在しないトピックにメッセージを送信しようとすると、ブローカーはトピックが存在しないことをプロデューサーに伝えます。このとき、プロデューサーは Kafka クラスターにメタデータ要求を送信し、最新のメタデータ情報を取得し、クラスター内のすべてのブローカーとの TCP 接続を確立しようとします。

【シーン2】

プロデューサーは、metadata.max.age.ms パラメータを通じてメタデータ情報を定期的に更新します。デフォルト値は 300000、つまり 5 分です。

  • プロデューサーがメッセージを送信しようとすると、プロデューサーはターゲット ブローカーとの接続がないことを検出し (負荷分散アルゴリズムによって異なります)、TCP 接続も作成します。

TCP 接続はいつ閉じられますか?

プロデューサーが TCP 接続を閉じる方法は 2 つあります。ユーザーによるアクティブなクローズと Kafka による自動クローズです。

[ユーザーアクティブシャットダウン]

広い意味では、アクティブ シャットダウンには、ユーザーが kill -9 を呼び出して Producer を強制終了することが含まれます。最も推奨される方法は、producer.close() です。

[Kafka は自動的にシャットダウンします]

プロデューサー パラメータ connections.max.idle.ms のデフォルト値は 540000 (9 分) です。

9 分以内に TCP 接続を介してリクエストが渡されない場合、Kafka は TCP 接続をアクティブに閉じます。

connections.max.idle.ms=-1 はこのメカニズムを無効にし、TCP 接続は永続的な長時間接続になります。

Kafka によって作成されたソケット接続ではすべてキープアライブが有効になっています。

【知らせ】

TCP 接続を閉じるイニシエーターは Kafka クライアントであり、これは受動的な閉じるシナリオです。

受動的なクローズの結果、多数のCLOSE_WAIT接続が生成される。

プロデューサーまたはクライアントには、TCP 接続が切断されたことを明示的に確認する機会がありません。

要約する

これで、冒頭の 3 つの質問に答えることができます。

1. Kafka はメッセージを個別に送信しますか、それともバッチで送信しますか?

通常は一括して送信されます。 ProducerBatch にカプセル化して送信します。

2. Kafka はどのようにして単一のメッセージを送信しますか?

send メソッドを同期的に呼び出すには、単一のプロデューサーと単一のスレッドのみを設定できます。

3. Kafka はメッセージを順番に送信しますか?

いいえ、順序が必要な場合は、キーを設定する必要があり、プロデューサーはシングルスレッドになります。

4. プロデューサーが FullGC を頻繁に実行するのはどのような状況でしょうか?

メッセージ サイズが batchSize より大きい場合、割り当てられたメモリ ブロックはループ内で free から取得されませんが、新しい ByteBuffer が作成され、ByteBuffer はバッファー プールに返されません (JVM GC リカバリ)。この時点で nonPooledAvailableMemory がメッセージ本文より小さい場合、free 内の空きメモリ ブロックが破棄され (JVM GC リカバリ)、ユーザー アプリケーション用のバッファ プールに十分なメモリ領域が確保されます。これらのアクションにより、GC の問題が頻繁に発生します。

したがって、頻繁な GC を回避するには、ビジネス メッセージのサイズに応じて batch.size を適切に調整する必要があります。

<<:  GitOps 継続的デプロイメント ツールである Argo CD を初めて体験

>>:  最高のPython仮想環境。

推薦する

どのようなタイトルが訪問者を本当に惹きつけクリックさせるのかの解釈例

私がSEO業界に入った当初は、タイトルを書くときにキーワードの書き方しか知らなかったことを覚えていま...

spinservers 中国電信ネットワークサーバーレビュー: 3 つのネットワークへの完全な直接接続

spinserversは今月初め、中国電信のネットワークに接続されたサンノゼデータセンターに独立した...

ウェブサイトのランキングを安定させるには、6つの重要なポイントに注意する必要があります

ウェブサイト業界の競争がますます激しくなるにつれ、ランキングはすべてのウェブマスターにとって大きな問...

ウェブサイトを構築するにはどれくらいの費用がかかりますか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますはじめに:...

SEOとSNSはeコマースの最高のパートナー

あらゆる大きな出来事は、一連の疑問や論争を引き起こします。 7月21日、北京は61年ぶりの暴風雨に見...

Expedia が AWS に全面的に参入

[51CTO.com からのオリジナル記事] AWS は本日、世界的なオンライン旅行会社 Exped...

itldcはどうですか?ニュージャージー州セコーカスのデータセンターで VPS をテストしましょう!

itldcはどうですか?ニュージャージー州セコーカスにある itldc の VPS はいかがでしょう...

関連性の高い外部リンクの重み値が高い理由を分析する2つの側面

外部リンクはウェブサイトの最適化に欠かせない要素であるため、多くの友人が外部リンクに興味を持っている...

週刊ニュースレビュー:Sina WeiboがWeiboに名前を変更、Alipayが4大銀行と競争して冷静に

1. Ctripの脆弱性は、インターネット業界全体のセキュリティ意識の欠如を露呈しているユーザーの支...

#オーストラリア VPS# flowvps-$6.99/KVM/1g メモリ/10g NVMe/1T トラフィック

新ブランドのflowvpsは、Alpha Layer Pty Ltd(ABN: 99 617 970...

ウェブサイト分析ツールの徹底解説:訪問元統計(パート1)

事業背景1. 訪問者の流入元は何ですか?当社の Web サイトを訪問するユーザーはどのようにして当社...

キーワード最適化の主な方法を理解する

まず、1点明確にしておきたいことがあります。最適化は、Web サイト構築の補助的な手段にすぎないと私...

racknerd: 米国の大型ハードディスク サーバー、509 ドル、2*e5-2640v2、64G メモリ、250gSSD+160T SAS、200T トラフィック/月

Racknerdはこれまで、米国ユタ州で超大型160Tハードドライブを搭載したストレージサーバーを発...

新製品: z.com、シンガポール/米国/日本/KVM/1G メモリ/50g SSD/2T トラフィック/7 USD

新しい業者、z.comを紹介しましょう。知らない人も多いので新しいと言いますが、実はz.comは日本...