Kafka メッセージ送信スレッドとネットワーク通信

Kafka メッセージ送信スレッドとネットワーク通信

[[420379]]

先ほど説明したメッセージ送信シーケンス図を確認してみましょう。前のセクションでは、Kafka に関連するメタデータ情報とメッセージのカプセル化について説明しました。メッセージのカプセル化が完了すると、メッセージが送信されます。このタスクは Sender スレッドによって実装されます。

1. 送信スレッド

KafkaProducer オブジェクトを見つけます。 KafkaProducer のコンストラクターには数行のコードがあります。

  1. this.accumulator = 新規
  2. レコードアキュムレータ(config.getInt(プロデューサーConfig.BATCH_SIZE_CONFIG)、
  3. this.totalMemorySize、
  4. this.compressionType、
  5. config.getLong(プロデューサーConfig.LINGER_MS_CONFIG)、
  6. 再試行バックオフMs、
  7. 指標、
  8. 時間);

RecordAccumulator オブジェクトが構築され、オブジェクト内の各メッセージ バッチのサイズ、バッファー サイズ、圧縮形式などが設定されます。

次に、メッセージを送信するためのキャリアとして使用される非常に重要なコンポーネント NetworkClient が構築されます。

  1. NetworkClientクライアント = 新しいNetworkClient(
  2. 新しいセレクター(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)、this.metrics、 time "producer" 、channelBuilder)、
  3. this.メタデータ、
  4. クライアントID、
  5. config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)、
  6. config.getLong(プロデューサーConfig.RECONNECT_BACKOFF_MS_CONFIG)、
  7. config.getInt(プロデューサー構成.SEND_BUFFER_CONFIG)、
  8. config.getInt(プロデューサー構成.RECEIVE_BUFFER_CONFIG)、
  9. this.requestTimeoutMs、時間);

構築された NetworkClient には、注意すべき重要なパラメータがいくつかあります。

√ connections.max.idle.ms: ネットワーク接続がアイドル状態を維持できる最長時間を示します。アイドル時間がこの値を超えると、ネットワーク接続が閉じられます。デフォルト値は 9 分です。

√ max.in.flight.requests.per.connection: 各ネットワーク接続がプロデューサーからブローカーへの応答なしで許容できるメッセージの数を示します。デフォルト値は 5 です。(ps: プロデューサーがブローカーにデータを送信する場合、実際には複数のネットワーク接続が存在します)

√ send.buffer.bytes: ソケットがデータを送信するために使用するバッファのサイズ。デフォルト値は 128K です。

√receive.buffer.bytes: ソケットがデータを受信するために使用するバッファのサイズ。デフォルト値は 32K です。

送信スレッドがメッセージを送信し始めるまで、良いニュースを送信するためのネットワーク チャネルを構築します。

  1. this.sender = 新しいSender(クライアント、
  2. this.メタデータ、
  3. this.accumulator、
  4. config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
  5. config.getInt(プロデューサー構成.MAX_REQUEST_SIZE_CONFIG)、
  6. (ショート) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG))、
  7. config.getInt(プロデューサー構成.RETRIES_CONFIG)、
  8. this.metrics、
  9. 新しいSystemTime()、
  10. クライアントID、
  11. リクエストタイムアウトMs);
  12. //デフォルトのスレッド名プレフィックスはkafka-producer-network-threadです。clientIdはプロデューサーのIDです。
  13. 文字列 ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "" );
  14. //デーモン スレッドを作成し、Sender オブジェクトをそのスレッドに渡します。
  15. this.ioThread = 新しい KafkaThread(ioThreadName、this.sender、 true );
  16. //スレッドを開始する
  17. スレッドを開始します。

ここは非常に明確です。スレッドなので、run() メソッドが必要です。この方法では実装ロジックに焦点を当てます。ここで、学習する価値のあるスレッドの使用パターンを追加する必要があります。送信スレッドが作成された後、送信スレッドがすぐに開始されず、新しい KafkaThread スレッドが作成され、送信オブジェクトがそれに渡されてから KafkaThread スレッドが開始されることがわかります。多くの友人が疑問を抱くだろうと思います。 KafkaThread クラスにアクセスして、その内容を見てみましょう。

  1. /**
  2. *物事をうまくセットアップするThreadラッパー
  3. */
  4. パブリッククラスKafkaThreadはThreadを拡張します{
  5. プライベート最終ロガー ログ = LoggerFactory.getLogger(getClass());
  6. public KafkaThread(final String name 、Runnable runnable、boolean daemon) {
  7. super(実行可能オブジェクト、名前);
  8. //バックグラウンドデーモンスレッドとして設定
  9. setDaemon(デーモン);
  10. UncaughtExceptionHandler を設定します(新しい Thread.UncaughtExceptionHandler() {
  11. パブリックvoid uncaughtException(スレッド t、Throwable e) {
  12. log.error( "" + name + " でキャッチされない例外: " , e);
  13. }
  14. });
  15. }
  16. }

KafkaThread スレッドは実際にはデーモン スレッドを開始するだけであることがわかっていますが、これを行う利点は何でしょうか?答えは、ビジネス コードとスレッド自体を分離し、複雑なビジネス ロジックを KafkaThread などのスレッドに実装できるということです。このように、Sender スレッドはコード レベルで非常に簡潔かつ読みやすくなります。

まず、Sender オブジェクトの構造を見てみましょう。

/**

* 主な機能は、バックグラウンドスレッドを使用してKafkaクラスタへのプロダクションリクエストを処理し、メタデータ情報を更新し、適切なノードにメッセージを送信することです。

* Kafka クラスターへの生成リクエストの送信を処理するバックグラウンド スレッド。このスレッドはメタデータを作成します

* クラスターのビューを更新するように要求し、適切なノードに生成要求を送信します。

  1. パブリッククラスSenderはRunnableを実装します{
  2. プライベート静的最終ロガー ログ = LoggerFactory.getLogger(Sender.class);
  3. //Kafka ネットワーク通信クライアント。主にブローカーとのネットワーク通信に使用されます。
  4. プライベート最終 KafkaClient クライアント;
  5. //メッセージアキュムレータ、バッチメッセージレコードを含む
  6. プライベート最終 RecordAccumulator アキュムレータ;
  7. //クライアントのメタデータ情報
  8. プライベート最終メタデータメタデータ;
  9. /* プロデューサーがメッセージの順序を保証するかどうかを示すフラグ ブローカーまたは ない。 */
  10. //メッセージの順序を保証するためにマークする
  11. プライベート最終ブール値 guaranteeMessageOrder;
  12. /* 最大リクエストサイズ サーバー送信を試みる* /
  13. //対応する設定はmax.request.sizeこれはsend() メソッドを呼び出して送信される最大リクエスト サイズを表します。
  14. プライベート最終int maxRequestSize;
  15. /*サーバー要求する確認応答数 */
  16. //メッセージの送信ステータスを確認するために使用します。3 つのオプションがあります: -1、0、1
  17. プライベート最終ショート ACK。
  18. /* 失敗したリクエスト諦める前に再試行する回数* /
  19. //リクエスト失敗後の再試行回数
  20. プライベート最終int再試行;
  21. /*時間を取得するために使用されるクロックインスタンス*/
  22. //時間ツール、時間を計算、特別な意味はありません
  23. プライベート最終時間 時間;
  24. /*送信スレッドまだ実行中の場合はtrue */
  25. //スレッドの状態を示します。trueは実行中を意味します
  26. プライベート揮発性ブール値の実行;
  27. /*真実 発信者が望むとき 無視する 未送信/送信中のすべてのメッセージ  近い。 */
  28. //メッセージ送信のフラグを強制的に閉じます。 trueに設定すると、メッセージは正常に送信されたかどうかに関係なく無視されます。
  29. プライベート volatile ブール値 forceClose;
  30. /* メトリック */
  31. //インジケーターコレクションを送信
  32. プライベート最終SenderMetricsセンサー;
  33. /*クライアントパラメータ clientId */
  34. //プロデューサークライアントID
  35. プライベート文字列クライアントID;
  36. /*最大値 時間 サーバーがリクエスト応答する待つ* /
  37. //リクエストタイムアウト
  38. プライベート最終intリクエストタイムアウト;
  39. //コンストラクタ
  40. パブリックSender(KafkaClientクライアント、
  41. メタデータ メタデータ、
  42. RecordAccumulator アキュムレータ、
  43. ブール型 guaranteeMessageOrder、
  44. 整数最大リクエストサイズ、
  45. ショートACK、
  46. int再試行、
  47. メトリクスメトリクス、
  48. 時間 時間
  49. 文字列クライアントID、
  50. intリクエストタイムアウト) {
  51. this.client = クライアント;
  52. this.accumulator = アキュムレータ;
  53. this.metadata = メタデータ;
  54. this.guaranteeMessageOrder = guaranteeMessageOrder;
  55. this.maxRequestSize = maxRequestSize;
  56. this.running = true ;
  57. this.acks = acks;
  58. this.retries = 再試行;
  59. this.time =時間;
  60. this.clientId = クライアントID;
  61. this.sensors = 新しい SenderMetrics(metrics);
  62. this.requestTimeout = リクエストタイムアウト;
  63. }
  64. ....
  65. }

Sender オブジェクトの初期化パラメータの概要を理解したら、実際に Sender オブジェクトの run() メソッドを見つけてみましょう。

  1. パブリックボイド実行(){
  2. log.debug( "Kafka プロデューサー I/O スレッドを開始しています。" );
  3. //送信スレッドが開始されると、連続実行状態になります
  4. (実行中){
  5. 試す {
  6. //コアコード
  7. 実行(時間.ミリ秒());
  8. } キャッチ (例外 e) {
  9. log.error( "Kafka プロデューサー I/O スレッドでキャッチされないエラーが発生しました: " , e);
  10. }
  11. }
  12. log.debug( "Kafka プロデューサー I/O スレッドのシャットダウンを開始し、残りのレコードを送信しています。" );
  13. // リクエストの受付を停止しましたが、まだ残っている可能性があります
  14. //アキュムレータ内のリクエストまたは確認応答待機中
  15. // これらが完了するまで待機します。
  16. while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
  17. 試す {
  18. 実行(時間.ミリ秒());
  19. } キャッチ (例外 e) {
  20. log.error( "Kafka プロデューサー I/O スレッドでキャッチされないエラーが発生しました: " , e);
  21. }
  22. }
  23. if (強制終了) {
  24. //未完了のバッチをすべて失敗させ待機中のスレッドを起動する必要あります。  
  25. // 将来。
  26. this.accumulator.abortIncompleteBatches();
  27. }
  28. 試す {
  29. this.client.close () ;
  30. } キャッチ (例外 e) {
  31. log.error( "ネットワーククライアントを閉じることができませんでした" , e);
  32. }
  33. log.debug( "Kafka プロデューサー I/O スレッドのシャットダウンが完了しました。" );
  34. }

上記の run() メソッドには、2 つの while 判断があり、どちらもスレッドを中断せずに実行し続け、ブローカーにメッセージを送信することを目的としています。どちらの場所でも、時間パラメータを持つ別の run(xx) オーバーロード メソッドを呼び出します。最初の run(ts) メソッドは、メッセージ バッファー内のメッセージをブローカーに送信します。 2 番目の run(ts) メソッドは、まずスレッドを強制的に閉じるかどうかを判断します。強制的に閉じられていない場合は、スレッドを終了する前にメッセージ バッファー内の未送信メッセージが送信されます。

/**

* 送信を1回繰り返して実行する

*

* @param 現在

* 現在のPOSIX時間(ミリ秒)

*/

  1. void run(long now) {
  2.  
  3. //最初のステップはメタデータを取得することです
  4. クラスター クラスター = メタデータ。フェッチ();
  5. //送信準備が整ったデータを含むパーティションリストを取得します
  6.  
  7. //2番目のステップは、どのパーティションが送信条件を満たすかを判断することです
  8. RecordAccumulator.ReadyCheckResult 結果 = this.accumulator.ready(cluster, now);
  9. /**
  10. * ステップ3: メタデータがまだ取得されていないトピックを特定する
  11. */
  12. 結果が空の場合
  13. //セット リーダー不明なトピックにはリーダー選出が保留トピック 
  14. // 期限が切れている可能性のあるトピック。トピックをメタデータ再度追加して確実に含まれるようします。
  15. //トピック送信するメッセージがあるため、メタデータの更新を要求します
  16. (文字列トピック: result.unknownLeaderTopics)の場合
  17. this.metadata.add (トピック);
  18. this.metadata.requestUpdate();
  19. }
  20. //送信する準備ができていないノード削除します 
  21. イテレータ<Node> iter = result.readyNodes.iterator();
  22. long notReadyTimeout = Long.MAX_VALUE;
  23. (iter.hasNext()) の間 {
  24. ノード node = iter.next ();
  25. /**
  26. * ステップ 4: データを送信するホストとのネットワークが確立されているかどうかを確認します。
  27. */
  28. //戻り値がfalseの場合 
  29. if (!this.client.ready(node, now)) {
  30. // 結果からメッセージを送信するホストを削除します。
  31. //ここですべてのホストが削除されることがわかります
  32. iter.remove();
  33. notReadyTimeout = 数学。 min (notReadyTimeout、this.client.connectionDelay(node、now));
  34. }
  35. }

/**

* 5 番目のステップでは、送信するパーティションが多数ある可能性があります。この場合、このような状況になる可能性がある

* 一部のパーティションのリーダー パーティションは同じサーバー上に分散されます。

*

*

*/

  1. Map< Integer , List<RecordBatch>> batches = this.accumulator.drain(clu​​ster,
  2. 結果.readyNodes、
  3. this.maxRequestSize、
  4. 今);
  5. if (メッセージ順序を保証する) {
  6. // 排出されたすべてのパーティションをミュートする
  7. //バッチが空の場合は実行をスキップします。
  8. for (List<RecordBatch> batchList : batches.values ​​( )) {
  9. ( RecordBatch バッチ: batchList)
  10. this.accumulator.mutePartition(batch.topicPartition);
  11. }
  12. }

/**

* ステップ6: 時間指定バッチ処理

*

*/

  1. リスト<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
  2. //センサーを更新
  3. ( RecordBatch期限切れバッチ: 期限切れバッチ)
  4. this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  5. sensors.updateProduceRequestMetrics(バッチ);
  6. /**

* ステップ 7: メッセージを送信するリクエストを作成し、バッチで送信してネットワーク伝送コストを削減し、スループットを向上させます。

*/

  1. <ClientRequest> リクエストをリストします。リクエストは、バッチファイルから作成されます。
  2. // 送信準備が整ったノードあり、送信可能なデータがある場合は、 0タイムアウトポーリングして、すぐに送信できるようにします。
  3. // ループしさらにデータを送信してみます。それ以外の場合、タイムアウトはデータあるパーティションを持つノードによって決定されます
  4. // まだ送信可能ではありません (例: 保留中、バックオフ中)。これにはノードは含まれないことに注意してください
  5. //送信可能なデータビジー ループの原因となるため、送信する準備ができていません。
  6. long pollTimeout = Math. min (result.nextReadyCheckDelayMs、notReadyTimeout);
  7. (result.readyNodes.size () > 0)場合{
  8. log.trace( "送信準備のできたデータを持つノード: {}" , result.readyNodes);
  9. log.trace( "{} が作成され、リクエストが生成されました: {}" , requests.size (), requests);
  10. ポーリングタイムアウト = 0;
  11. }
  12. //リクエスト送信操作
  13. (ClientRequestリクエスト: リクエスト)
  14. // op_write をバインドする
  15. client.send(リクエスト、今);
  16.  
  17. //すでに送信準備が整っているパーティションある場合選択 時間は0になります。
  18. // それ以外の場合、一部のパーティションにすでにデータが蓄積されているがまだ準備ができていない場合
  19. //選択 時刻は現在リンガー有効期限間の時間になります
  20. // それ以外の場合は選択 時刻は現在メタデータの有効期限の間の時間になります
  21. /**

* ステップ8: NetWordClientコンポーネントは、実際にネットワーク操作を実行するコンポーネントです。

* 含まれるもの: リクエストの送信、応答の受信(応答の処理)

  1. this.client.poll(pollTimeout, 現在);

上記の run(long) メソッドの実行プロセスは、次の手順にまとめられます。

1. クラスターのメタデータ情報を取得する

2. RecordAccumulator の ready() メソッドを呼び出して、現在のタイムスタンプで送信できるパーティションを判別し、パーティション リーダー パーティションのメタデータ情報を取得して、メッセージを受信できるノードを見つけます。

3. メタデータにまだプルされていないトピックをマークします。キャッシュ内にunknownLeaderTopicsとしてマークされたトピック情報がある場合は、これらのトピックをメタデータに追加し、メタデータのrequestUpdate()メソッドを呼び出してメタデータの更新を要求します。

4.ステップで返された結果からメッセージを受信する必要のないノードを削除し、メッセージを受信する準備ができているノードのみを走査し、送信するノードとのネットワークが確立されているかどうかを確認し、送信条件を満たさないノードをreadyNodeから削除します。

5. ネットワーク接続が確立されたノード セットに対して、RecordAccumulator のdrain() メソッドを呼び出して、送信を待機しているメッセージ バッチ セットを取得します。

6. タイムアウトしたメッセージを処理し、RecordAccumulator の addExpiredBatches() メソッドを呼び出し、RecordBatch をループして、その中のメッセージがタイムアウトしたかどうかを判断します。そうであれば、キューから削除してリソーススペースを解放します。

7. メッセージを送信するリクエストを作成し、createProducerRequest メソッドを呼び出して、メッセージ バッチを ClientRequest オブジェクトにカプセル化します。バッチは通常複数であるため、リストが返されます。集める

8. NetworkClientのsend()メソッドを呼び出し、KafkaChannelのop_write操作をバインドします。

9. NetworkClientのpoll()メソッドを呼び出してメタデータ情報を取得し、接続を確立し、ネットワーク要求を実行し、応答を受信し、メッセージの送信を完了します。

上記は、メッセージとクラスター メタデータの Sender スレッドのコア プロセスです。これには、別のコア コンポーネントである NetworkClient が関係します。

2. ネットワーククライアント

NetworkClient はメッセージを送信するための媒体です。メッセージを送信するプロデューサーであっても、メッセージを受信するコンシューマーであっても、ネットワーク接続を確立するには NetworkClient に依存する必要があります。同様に、まず NetworkClient のコンポーネントを理解します。これには主に NIO に関する知識が必要です。興味のある方は、NIO の原理と構成をご覧ください。

  1. /**
  2. *非同期要求/応答ネットワーク I/O用のネットワーク クライアント。これ
  3. *ユーザー向けのプロデューサー クライアントコンシューマー クライアント。
  4. * <p>
  5. * このクラス スレッドセーフではありませ!
  6. */
  7. パブリッククラスNetworkClientはKafkaClientを実装します
  8. {
  9.  
  10. プライベート静的最終ロガー ログ = LoggerFactory.getLogger(NetworkClient.class);
  11. /* ネットワークI/Oを実行するために使用されるセレクタ*/
  12. //java NIOセレクター
  13. private final 選択可能なセレクター。
  14. プライベート最終メタデータ更新者メタデータ更新者;
  15. プライベート最終ランダム randOffset;
  16. /*各ノードの接続状態*/
  17. プライベート最終 ClusterConnectionStates 接続状態;
  18. /*セット 現在送信中または応答を待っているリクエストの数*/
  19. プライベート最終 InFlightRequests inFlightRequests;
  20. /* ソケット送信バッファサイズ バイト単位*/
  21. プライベート最終int socketSendBuffer;
  22. /* ソケット受信バッファサイズバイト単位) */
  23. プライベート最終int socketReceiveBuffer;
  24. /*サーバーへのリクエストこのクライアントを識別するために使用されるクライアント ID */
  25. プライベート最終文字列クライアントID;
  26. /* サーバーリクエストを送信するとき使用する現在の相関 ID */
  27. プライベートint相関;
  28. /*最大 時間 プロデューサーサーバーからの確認応答を待つ時間ミリ秒) */
  29. プライベート最終int requestTimeoutMs;
  30. プライベート最終時間 時間;
  31. ......
  32. }

NetworkClient は、Selectable、MetadataUpdater、ClusterConnectionStates、InFlightRequests などのいくつかのコア クラスを含む KafkaClient インターフェイスを実装していることがわかります。

2.1 選択可能

その中でも、Selectable は非同期のノンブロッキング ネットワーク IO を実装するインターフェースです。クラス注釈から、Selectable は読み取り、書き込み、接続、その他の操作を含む複数のネットワーク接続を単一のスレッドで管理できることがわかります。これは NIO と一致しています。

まず、org.apache.kafka.common.network パッケージにある Selectable 実装クラス Selector を見てみましょう。ソースコードはたくさんあるので、比較的重要なものをピックアップしてみましょう。

  1. パブリッククラスSelectorはSelectableを実装します{
  2. 公共 静的最終long NO_IDLE_TIMEOUT_MS = -1;
  3. プライベート静的最終ロガー log = LoggerFactory.getLogger(Selector.class);
  4. //このオブジェクトはjavaNIOのセレクタです
  5. //セレクターは、ネットワークの確立、ネットワーク要求の送信、実際のネットワーク IO の処理を​​担当します。
  6. //最もコアとなるコンポーネントと言えます。
  7. プライベート最終 java.nio.channels.Selector nioSelector;
  8. //Broker と KafkaChannel (SocketChannel) のマッピング
  9. //ここでのkafkaChannelは、一時的にSocketChannelとして理解できます
  10. //NodeIdとKafkaChannel間のマッピング関係を維持する
  11. プライベート最終 Map<String, KafkaChannel> チャネル;
  12. //送信されたリクエストを記録する
  13. プライベート最終リスト<Send> 送信完了;
  14. //受信され処理された応答を記録します。
  15. プライベート最終リスト<NetworkReceive>completedReceives;
  16. // 応答は受信されましたが、まだ処理されていません。
  17. // 1 つの接続は 1 つの応答キューに対応します
  18. プライベート最終 Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
  19. private final <SelectionKey> を直ちに接続キーに設定します
  20. //ホストへの接続またはポート接続が確立されていません
  21. プライベート最終List<String>切断されました。
  22. //ホスト接続を完了する
  23. プライベート最終リスト<String>接続;
  24. //接続に失敗したホスト。
  25. プライベート最終リスト<String> failedSends;
  26. プライベート最終時間 時間;
  27. プライベート最終SelectorMetricsセンサー;
  28. プライベート最終文字列metricGrpPrefix;
  29. プライベート最終 Map<String, String> metricTags;
  30. //KafkaChannel を作成するためのビルダー
  31. プライベート最終チャネルビルダーチャネルビルダー;
  32. プライベート最終int maxReceiveSize;
  33. プライベート最終ブール値 metricsPerConnection;
  34. プライベートファイナルIdleExpiryManagerアイドルExpiryManager;

ネットワーク要求を開始する最初のステップは、接続、イベントの登録、メッセージの送信および処理であり、これにはいくつかのコア メソッドが含まれます。

1. connect() メソッドを接続する

  1. /**
  2. *指定されたアドレスへの接続を開始し  接続追加する 指定されたID関連付けられたこのnioSelector
  3. * 番号。
  4. * <p>
  5. * この呼び出しは接続を開始するだけであり、これは将来の {@link #poll(long)}完了することに注意してください。
  6. * 電話。特定のポーリング呼び出し後にどの接続が完了したか(ある場合) を確認するには、{@link #connected()}をチェックします
  7. * @param id新しい接続ID  
  8. * @param addressアドレス 接続する  
  9. * @param sendBufferSize新しい接続送信バッファ 
  10. * @param receivedBufferSize新しい接続受信バッファ 
  11. *すでに接続がある場合はIllegalStateExceptionをスローします そのID
  12. *ホスト名のDNS解決失敗した場合、またはブローカーダウンしている場合はIOExceptionがスローされます
  13. */
  14. @オーバーライド
  15. パブリックvoid connect (String id, InetSocketAddress address, int sendBufferSize, int receivedBufferSize) は IOException をスローします {
  16. if (this.channels.containsKey(id))
  17. throw new IllegalStateException( "id の接続が既に存在します " + id);
  18. //ソケットチャネルを取得する
  19. SocketChannel socketChannel = SocketChannel。開ける();
  20. //非ブロッキングモードに設定
  21. socketChannel.configureBlocking( false );
  22. ソケット socket = socketChannel.socket();
  23. socket.setKeepAlive( true );
  24. //送信バッファサイズや受信バッファサイズなどのネットワークパラメータを設定する
  25. (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) の場合
  26. socket.setSendBufferSize(sendBufferSize);
  27. (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) の場合
  28. socket.setReceiveBufferSize(受信バッファサイズ);
  29. //デフォルト値はfalse、Nagle アルゴリズムが有効になることを意味します。
  30. //ネットワーク内の小さなデータパケットを収集し、それらを大きなデータパケットに結合して送信します
  31. //ネットワーク内で多数の小さなデータパケットが送信されると、伝送効率に影響を与えると考えられるため
  32. socket.setTcpNoDelay( true );
  33. ブール接続;
  34. 試す {
  35. //非ブロッキングなのでサーバーに接続してみる
  36. //接続がすぐに成功する可能性があり、成功した場合はtrueを返します 
  37. //接続に成功するまでに長い時間がかかり、 falseが返されることもあります
  38. connected = socketChannel.connect (アドレス);
  39. } キャッチ (UnresolvedAddressException e) {
  40. socketChannel.close () ;
  41. 新しい IOException をスローします ( "アドレスを解決できません: " + address, e);
  42. } キャッチ (IOException e) {
  43. socketChannel.close () ;
  44. eを投げる;
  45. }
  46. //SocketChannel はセレクタに OP_CONNECT を登録します
  47. 選択キーキー= socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
  48. //SocketChannel に基づいて KafkaChannel をカプセル化します
  49. KafkaChannel チャネル = channelBuilder.buildChannel(id、キー、最大受信サイズ);
  50. //キーをKafkaChannelに関連付ける
  51. //キーに基づいてKafkaChannelを見つけることができます
  52. //KafkaChannelに基づいてキーを見つけることもできます 
  53. キー.attach(チャネル);
  54. //キャッシュする
  55. this.channels.put(id, チャネル);
  56.  
  57. //接続されている場合
  58. (接続)の場合{
  59. // OP_CONNECTはトリガーされません すぐに接続できるチャンネル
  60. log.debug( "ノード {} にすぐに接続されました" , channel.id());
  61. すぐにConnectedKeys.add (キー) を実行します。
  62. // 以前に登録した OP_CONNECT イベントをキャンセルします。
  63. キー.interestOps(0);
  64. }
  65. }

2. レジスタregister()

  1. /**
  2. *既存のチャネルnioSelectorを登録する
  3. *接続確立していない場合は、サーバー側これを使用します 別のスレッドによって受け入れられるセレクタによって処理される
  4. *接続ID有効かどうかはチェックしていないことに注意してください-接続はすでに存在しているためです
  5. */
  6. パブリックvoid register(String id, SocketChannel socketChannel) は ClosedChannelException をスローします {
  7. //独自のセレクタにOP_READイベントを登録する
  8. //このようにして、プロセッサ スレッドはクライアントから送信された接続を読み取ることができます。
  9. 選択キーキー= socketChannel.register(nioSelector, SelectionKey.OP_READ);
  10. //Kafka は SocketChannel の KakaChannel をカプセル化します
  11. KafkaChannel チャネル = channelBuilder.buildChannel(id、キー、最大受信サイズ);
  12. //キーとチャンネル
  13. キー.attach(チャネル);
  14. //サーバー上のコードはクライアント上のネットワークコードと再利用されます
  15. //チャネルは複数のネットワーク接続を維持します。
  16. this.channels.put(id, チャネル);
  17. }

3. send() を送信する

  1. /**
  2. * 指定されたリクエストをキューに入れて後続の{@link #poll(long)}呼び出し送信します
  3. * @param send送信するリクエスト
  4. */
  5. パブリックvoid send(Send send) {
  6. //KafakChannel を取得する
  7. KafkaChannel チャネル = channelOrFail(send.destination());
  8. 試す {
  9. //重要なメソッド
  10. チャネルをsetSend(送信);
  11. } キャッチ (CancelledKeyException e) {
  12. this.failedSends.add ( send.destination ());
  13. 閉じる(チャネル)
  14. }
  15. }

4. メッセージ処理poll()

  1. @オーバーライド
  2. パブリックvoid poll(long timeout) は IOException をスローします {
  3. (タイムアウト < 0)の場合
  4. 新しい IllegalArgumentException をスローします ( "タイムアウトは >= 0 である必要があります" );
  5. // 最後のpoll()メソッドによって返された結果をクリアします
  6. クリア();
  7. if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
  8. タイムアウト = 0;
  9. /* 準備完了キーをチェック*/
  10. 長いstartSelect = time .nanoseconds();
  11. //セレクターから登録されているキーの数を調べ、I/Oイベントが発生するのを待ちます
  12. int readyKeys =選択(タイムアウト);
  13. long endSelect = time .nanoseconds();
  14. this.sensors.selectTime.record(endSelect - startSelect, time .milliseconds());
  15. //キーは確かに上に登録されました 
  16. (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty())の場合{
  17. //I/Oイベントを処理し、このセレクタのキーを処理します
  18. 選択キーをポーリングします(this.nioSelector.selectedKeys(), false , endSelect);
  19. pollSelectionKeys(すぐに接続されたキー、 true 、endSelect);
  20. }
  21. // stagedReceives内のデータを処理する必要がある
  22. 完了した受信を追加します();
  23. long endIo = time .nanoseconds();
  24. this.sensors.ioTime.record(endIo - endSelect, time .milliseconds());
  25. //時間を使用します 最後  選択 閉鎖されないようにするため いかなるつながりも
  26. // pollSelectionKeys処理されたばかりです
  27. //処理が完了したら、長いリンクを閉じます
  28. おそらく最も古い接続を閉じます(endSelect);
  29. }

5.セレクターでキーを処理する

  1. // OP_CONNECT、OP_READ、OP_WRITE イベントの処理に使用され、接続ステータスの検出も担当します
  2. プライベート void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
  3. ブール値 isImmediatelyConnected、
  4. 長いcurrentTimeNanos) {
  5. //すべてのキーを取得 
  6. イテレータ<SelectionKey> iterator = selectionKeys.iterator();
  7. //すべてのキーを反復処理する 
  8. (イテレータ.hasNext()) の間 {
  9. 選択キーキー= iterator.next ( );
  10. イテレータを削除します。
  11. //キーに応じて対応する KafkaChannel を検索します
  12. KafkaChannel チャネル = チャネル(キー);
  13. //接続ごとのメトリックをすべて一度登録する
  14. センサー.maybeRegisterConnectionMetrics(channel.id());
  15. (idleExpiryManager != null の場合
  16. アイドル有効期限マネージャー。更新(channel.id()、currentTimeNanos);
  17. 試す {
  18. /* ハンドシェイクが完了した接続を完了します(通常または即時)*/
  19. //接続完了とOP_CONNECTイベントを処理する
  20. if (isImmediatelyConnected ||キー.isConnectable()) {
  21. //ネットワーク接続を完了します。
  22. チャネルが接続を完了している場合
  23. //ネットワーク接続が完了したら、このチャネルを接続セットに追加します
  24. this.connected.add (channel.id());
  25. this.sensors.connectionCreated.record();
  26. SocketChannel socketChannel = (SocketChannel)キー.channel();
  27. log.debug( "SO_RCVBUF = {}、SO_SNDBUF = {}、SO_TIMEOUT = {} でノード {} にソケットを作成しました"
  28. socketChannel.socket().getReceiveBufferSize()、
  29. socketChannel.socket().getSendBufferSize()、
  30. socketChannel.socket().getSoTimeout()、
  31. チャンネルID();
  32. }それ以外 
  33. 続く;
  34. }
  35. /* チャンネル 準備ができていません完了準備*/
  36. //ID認証
  37. チャネルが接続済みの場合 (チャネルが準備完了の場合)
  38. チャネルを準備します();
  39. /* チャネル読み取り可能な状態の場合 から 読み取り可能なデータを持つ接続 * /
  40. チャネルが準備完了の場合、キーは読み取り可能になり、ステージング受信が完了します。
  41. ネットワーク受信ネットワーク受信;
  42. //OP_READ イベントを処理し、サーバーから返された応答 (リクエスト) を受け入れます
  43. //networkReceiveはサーバーから返された応答を表します
  44. ((networkReceive = channel.read ()) != null )の場合
  45. ステージング受信にチャネルを追加します。
  46. }
  47. /* チャネル準備できたら書き込み 空きのあるソケット バッファ データがあるもの * /
  48. //OP_WRITEイベントを処理する
  49. チャネルが準備完了の場合、キーは書き込み可能になります。
  50. //送信するネットワーク要求を取得し、データをサーバーに送信する
  51. //メッセージが送信されると、OP_WRITEは削除されます
  52. 送信 send = channel.write();
  53. //応答メッセージが送信されました。
  54. if (送信 != null ) {
  55. this.completedSends.add (送信) ;
  56. this.sensors.recordBytesSent(channel.id(), send.size ());
  57. }
  58. }
  59. /* 機能ていないソケットをキャンセルします */
  60. if (!キー.isValid()) {
  61. 閉じる(チャネル)
  62. this.disconnected.add (channel.id());
  63. }
  64. } キャッチ (例外 e) {
  65. 文字列desc = channel.socketDescription();
  66. if (e インスタンス IOException)
  67. log.debug( "{} との接続が切断されました" , desc , e);
  68. それ以外 
  69. log.warn( "{} から予期しないエラーが発生しました。接続を閉じています" , desc , e);
  70. 閉じる(チャネル)
  71. //失敗した接続のコレクションに追加します
  72. this.disconnected.add (channel.id());
  73. }
  74. }
  75. }

2.2 メタデータアップデータ

これは、NetworkClient がクラスター メタデータ情報の更新を要求し、クラスター ノードを取得するために使用するインターフェイスです。これは、DefaultMetadataUpdater と ManualMetadataUpdater という 2 つの実装クラスを持つ、スレッドセーフではない内部クラスです。 NetworkClient は、NetworkClient のデフォルトの実装クラスであり、NetworkClient の内部クラスでもある DefaultMetadataUpdater クラスを使用します。ソースコードから見ると次のようになります。

  1. メタデータ更新者 == nullの場合{
  2. if (メタデータ == null )
  3. 新しい IllegalArgumentException をスローします ( "`metadata` は null であってはなりません" );
  4. this.metadataUpdater = 新しい DefaultMetadataUpdater(メタデータ);
  5. }それ以外{
  6. this.metadataUpdater = メタデータUpdater;
  7. }
  8. ......
  9. クラス DefaultMetadataUpdater は MetadataUpdater を実装します
  10. {
  11. /*現在のクラスターのメタデータ */
  12. //クラスターメタデータオブジェクト
  13. プライベート最終メタデータメタデータ;
  14. /*メタデータリクエストが送信  まだ回答を受け取っていないもの* /
  15. //MetadataRequest が送信されたかどうかを識別するために使用されます。すでに送信されている場合は再度送信する必要はありません
  16. プライベートブール値 metadataFetchInProgress;
  17. /*最後の タイムスタンプ いつ ブローカーノード利用できませ 接続する*/
  18. //利用可能なノードが見つからなかったときのタイムスタンプを記録する
  19. プライベート長いlastNodeAvailableMs;
  20. DefaultMetadataUpdater(メタデータメタデータ) {
  21. this.metadata = メタデータ;
  22. this.metadataFetchInProgress = false ;
  23. this.lastNoNodeAvailableMs = 0;
  24. }
  25. //クラスタノードセットを返す
  26. @オーバーライド
  27. パブリックリスト<Node> fetchNodes() {
  28. metadata.fetch ().nodes()を返します
  29. }
  30. @オーバーライド
  31. パブリックブール値isUpdateDue(long now) {
  32. 戻り値: !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
  33. }
  34. //コア メソッド。現在のクラスターに保存されているメタデータを更新する必要があるかどうかを判断します。更新が必要な場合は、MetadataRequestリクエストを送信します。
  35. @オーバーライド
  36. パブリックlong maybeUpdate(long now) {
  37. //メタデータを更新する必要がありますか?
  38. //次のメタデータ更新のタイムスタンプを取得する
  39. 長い timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
  40. //サーバーへの接続を次に再試行するときのタイムスタンプを取得します
  41. long timeToNextReconnectAttempt = Math.最大値(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - 現在、0);
  42. //MetadataRequestリクエストが送信されたかどうかを確認します
  43. 長い waitForMetadataFetch = this.metadataFetchInProgress ?整数.MAX_VALUE: 0;
  44. //存在する場合 利用可能なノードがありません 接続してメタデータの更新中止する
  45. 長いメタデータタイムアウト = Math.最大(数学。最大(次のメタデータ更新までの時間、次の再接続試行までの時間)、
  46. メタデータ取得を待機します);
  47. メタデータタイムアウト == 0 の場合
  48. //このメソッド動作poll()タイムアウト計算は
  49. // leastLoadedNode動作大きく依存します
  50. // 負荷が最も少ないノードを見つける
  51. ノード node = leastLoadedNode(now);
  52. // MetadataRequest リクエストを作成し、poll() メソッドをトリガーして実際の送信操作を実行します。
  53. おそらく更新します(現在、ノード);
  54. }
  55. メタデータタイムアウトを返します
  56. }
  57. //接続が確立されていないリクエストを処理する
  58. @オーバーライド
  59. パブリックブール値 maybeHandleDisconnection(ClientRequest リクエスト) {
  60. ApiKeys リクエストキー = ApiKeys.forId(request.request().header().apiKey());
  61. リクエストキー == ApiKeys.METADATA && request.isInitiatedByNetworkClient() の場合 {
  62. クラスター クラスター = メタデータ。フェッチ();
  63. (cluster.isBootstrapConfigured())の場合{
  64. int nodeId = Integer .parseInt(request.request().destination());
  65. ノード node = cluster.nodeById(nodeId);
  66. if (ノード != null )
  67. log.warn( "ブートストラップ ブローカー {}:{} が切断されました" 、 node.host()、 node.port());
  68. }
  69. メタデータの取得が進行中 = false ;
  70. 戻る 真実;
  71. }
  72. 戻る 間違い;
  73. }
  74. // 応答情報を解析する
  75. @オーバーライド
  76. パブリックブール値 maybeHandleCompletedReceive(ClientRequest req, long now, 構造体本体) {
  77. 短い apiKey = req.request().header().apiKey();
  78. //MetadataRequest リクエストかどうかを確認します
  79. (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient())の場合{
  80. // レスポンスを処理する
  81. handleResponse(req.request().header(), body, now);
  82. 戻る 真実;
  83. }
  84. 戻る 間違い;
  85. }
  86. @オーバーライド
  87. パブリックボイドリクエストアップデート() {
  88. this.metadata.requestUpdate();
  89. }
  90. //MetadataRequest リクエストの応答を処理する
  91. private void handleResponse(RequestHeader ヘッダー、構造体本体、long now) {
  92. this.metadataFetchInProgress = false ;
  93. //サーバーはバイナリデータ構造を返すため
  94. //したがって、プロデューサーはこのデータ構造をここで解析する必要があります
  95. //解析後、それはメタデータのアプセル化されています。
  96. MetadatAresponse応答=新しいメタデータデータアスパンズ(body);
  97. //応答はメタデータ情報を戻します
  98. //サーバーから引っ張られたクラスターのメタデータ情報を取得します。
  99. Cluster Cluster = Response.Cluster();
  100. //トピックメタデータが更新失敗したかどうかを確認してください
  101. map <string、errors> errors = response.errors();
  102. if(!errors.isempty())
  103. log.warn( "相関IDでメタデータを取得するときのエラー{}:{}" 、header.correlationid()、errors);
  104. //有効なノードがない場合はクラスターを更新しないでください...私たちが望むトピックは、まだ存在するプロセスあるかもしれません
  105. //作成されたのは、エラーが発生することを意味します 存在するまでノードはありません
  106. //メタデータ情報が正常に取得される場合
  107. if(cluster.nodes()。 size ()> 0){
  108. //メタデータ情報を更新します。
  109. this.metadata.update (cluster、now);
  110. }それ以外{
  111. log.trace( "相関ID {}。" 、header.correlationid())を使用した空のメタデータ応答を無視します。
  112. this.metadata.failedupdate(now);
  113. }
  114. }
  115. /**
  116. *指定されたトピックメタデータリクエストを作成します
  117. */
  118. プライベートクライアントレクエストリクエスト(長い間、文字列ノード、メタデータレクストメタデータ){
  119. requestsend send = new requestsend(node、nextrequestheader(apikeys.metadata)、metadata.tostruct());
  120. 新しいclientRequestを返します(今、 true 、send、 null true );
  121. }
  122. /**
  123. *送信リストメタデータリクエストを追加することができれば、それを作成できる場合は
  124. */
  125. private voidupdate(長い間、ノードノード){
  126. //ノードが使用可能かどうかを確認します
  127. if(node == null ){
  128. log.debug( "ノードが利用できないため、メタデータリクエストの送信をあきらめる" );
  129. //タイムスタンプをマークします のために 利用可能なノードはありません 接続する 
  130. this.lastnodeavailablems = now;
  131. 戻る;
  132. }
  133. string nodeconnectionid = node.idstring();
  134. //ネットワーク接続を確立する必要があるかどうか、およびノー​​ドにリクエストを送信できるかどうかを判断します
  135. if(cansendrequest(nodeconnectionid)){
  136. this.metadatafetchinprogress = true ;
  137. Metadatarequestメタデータエクセスト。
  138. //メタデータで更新する必要があるトピックを指定します
  139. if(metadata.needmetadataforalltopics())
  140. //リクエストのメタデータ情報をカプセル化するリクエスト(すべてのトピックを取得)
  141. metadatarequest = metadatarequest.alltopics();
  142. それ以外 
  143. //この方法デフォルトでここに行きました
  144. //メッセージを送信するのは、対応するトピックを引く方法です
  145. metadatarequest = new Metadatarequest(new ArrayList <>(metadata.topics()));
  146. //ここでリクエストを作成します(メタデータをプル)
  147. ClientRequest ClientRequest = request(Now、nodeconnectionId、metadatarequest);
  148. log.debug( "metadata request {}をnode {}に送信{}" 、metadatarequest、node.id());
  149. //キャッシュリクエスト、次回Poll()メソッドがトリガーされ、送信操作が実行されるのを待ちます
  150. Dosend(ClientRequest、Now);
  151. } else if(connectionStates.canconnect(nodeconnectionId、now)){
  152. //接続がありません すぐこのノード、1つを作成します
  153. log.debug( "メタデータ要求を送信するためにnode {}への接続を初期化" 、node.id());
  154. //接続を初期化します
  155. initiateconnect(Now、Now);
  156. // initiateconnectがすぐに失敗した場合、このノードは停電なり
  157. //すぐに試行する必要があります 別の候補ノードあります。もしそれが
  158. //最悪の場合はまだ接続しています より長いタイムアウトを設定することになっているということです
  159. //ラウンド 次に、応答待ちます
  160. } else {//接続、しかしもっと送信したり接続したりすることはできません
  161. //どちらの場合でも選択したことを知らせるためにネットワークイベントを待つ必要があります
  162. //接続が再び使用できる場合があります。
  163. this.lastnonodeavailablems = now;
  164. }
  165. }
  166. }
  167. Inflightrequest CacheキューへのCaches ClientRequestリクエスト。
  168. private void dosend(clientRequestリクエスト、長い状態){
  169. request.setsendtimems(now);
  170. // inflightrequestsキャッシュキューに応答要求はありません。デフォルトでは、最大5つのリクエストを保存できます
  171. this.inflightrequests。追加(リクエスト);
  172. //次に、セレクターのsend()メソッドを継続的に呼び出し続けます
  173. selector.send(request.request());
  174. }

2.3 Inflightrequests

このクラスはリクエストキューであり、送信されたが応答を受け取っていないクライアントリケストをキャッシュするために使用されます。キャッシュキューを管理するための多くの方法を提供し、構成パラメーターを介してクライアントリケストの数の制御をサポートし、ソースコードを介してその基礎となるデータ構造マップを見ることができます。

  1. /**
  2. *セット 送信された、または送信されているがまだ返信を受けていないリクエスト
  3. */
  4. 最終クラスのInflightRequests {
  5. プライベートファイナルint maxinflightrequestsperconnection;
  6. プライベート最終マップ<string、deque <clientRequest >> requests = new Hashmap <>();
  7. public Inflightrequests( int maxinflightrequestsperconnection){
  8. this.maxinflightrequestsperconnection = maxinflightrequestsperconnection;
  9. }
  10. ......
  11. }

キューの処理に関する多くの方法を含めることに加えて、Cansendmore()に焦点を合わせるためのより重要な方法があります。

  1. /**
  2. *このノードさらにリクエストを送信できますか?
  3. *
  4. *問題@paramノードノード
  5. * @戻る 特定のノードまだリクエストが送信されていない場合はtrue
  6. */
  7. Public Boolean Cansendmore(String node){
  8. // clientRequestキューをノードに送信する
  9. deque <ClientRequest> queue = requests.get(node);
  10. //ノードに要求が蓄積され、時間内に処理されない場合、リクエストタイムアウトが発生する可能性があります。
  11. return queue == null || queue.isempty()||
  12. (queue.peekfirst()。request()。完了()
  13. && 列。 size ()<this.maxinflightrequestsperconnection);
  14. }

上記のコアクラスを理解した後、NetworkClientのプロセスと実装の分析を開始します。

2.4 NetworkClient

Kafkaのすべてのメッセージは、NetworkClientを介して上流と下流で確立する必要があり、その重要性は自明です。ここでは、メッセージの成功プロセスのみを考慮し、例外処理は解析されず、比較的重要ではありません。メッセージを送信するプロセスは、ほぼ次のとおりです。

1.最初にReady()メソッドを呼び出して、ノードがメッセージを送信する条件を満たしているかどうかを判断します

2。iSready()メソッドを使用して、より多くの要求をノードに送信できるかどうかを判断します。これは、要求の蓄積があるかどうかを確認するために使用されます。

3。初期コネクトを使用して接続を初期化します

4.次に、Selector's Connect()メソッドを呼び出して接続を確立します

5.Socketchannelを入手して、サーバーとの接続を確立します

6. socketchannelのセレクターでop_connectイベントを登録します

7。send()を呼び出してリクエストを送信します

8。リクエストを処理するためにPoll()メソッドを呼び出します

以下は、各プロセスに関連する主要な操作を理解するためのメッセージ送信プロセスに含まれるコアメソッドに基づいた分析です。

1.ノードがメッセージ送信条件を満たしているかどうかを確認します

  1. /**
  2. *指定されたノードへの接続を開始し返します すでに接続されていてそのノード送信する準備ができている場合はTRUEです
  3. *
  4. * @paramノードノード チェック 
  5. * @param今すぐ タイムスタンプ 
  6. * @戻る 特定のノード送信する準備ができている場合はtrue
  7. */
  8. @オーバーライド
  9. public boolean Ready(Node Node、Long Now){
  10. if(node.isempty())
  11. 新しいIllegalargumentException( 「空のノードに接続できない」 +ノード)を投げます。
  12. //メッセージを送信するためのホストがメッセージを送信する条件を満たすかどうかを識別します
  13. if(isready(node、now))
  14. 戻る 真実;
  15. //ネットワークの構築を試みることができるかどうかを判断します
  16. if(connectionstates.canconnect(node.idstring()、now))
  17. //ノードへの送信興味があり接続がない場合 それに、1つを開始します
  18. //接続を初期化します
  19. //イベントに接続するためにバインド
  20. initiateconnect(Now、Now);
  21. 戻る 間違い;
  22. }

2。接続を初期化します

  1. /**
  2. *接続を開始します 指定されたノード
  3. */
  4. private void initiateConnect(ノードノード、長い){
  5. string nodeconnectionid = node.idstring();
  6. 試す {
  7. log.debug( "{}でnode {}への接続の開始{}:{}。" 、node.id()、node.host()、node.port());
  8. this.connectionStates.connecting(nodeconnectionId、now);
  9. //接続の確立を開始します
  10. セレクタ。 connect (nodeconnectionid、
  11. new inetsocketAddress(node.host()、node.port())、
  12. this.socketsendbuffer、
  13. this.SocketReceiveBuffer);
  14. } キャッチ (IOException e) {
  15. / *試行が失敗しました、バックオフ後に再試行します */
  16. connectionStates.disconnected(nodeconnectionId、now);
  17. / *問題私たちのメタデータかもしれません、それを更新*/
  18. metadataupdater.requestupdate();
  19. log.debug( "{}でnode {}に接続するエラー{}:{}:" 、node.id()、node.host()、node.port()、e);
  20. }
  21. }

3. initiateconnect()メソッドで呼び出される接続()メソッドは、セレクタークラスの選択可能な実装のconnect()メソッドです。 Socketchannelの取得とop_connect、op_read、およびop_writeイベントの登録が含まれます。上記の分析が行われました。ここでは詳細には触れません。上記の一連のアクションを完了してネットワーク接続を確立した後、メッセージ要求はダウンストリームノードに送信されます。送信者のsend()メソッドは、networkclientのsend()メソッドをsendに呼び出し、networkclientのsend()メソッドは最終的にセレクターのsend()メソッドを呼び出します。

  1. /**
  2. * 指定されたリクエスト送信キューに入れます。リクエストは送信することしかできません 準備ができたノード
  3. *
  4. * @param request リクエスト
  5. * @param今すぐ タイムスタンプ 
  6. */
  7. @オーバーライド
  8. public void send(clientRequestリクエスト、長い今){
  9. string nodeid = request.request()。destination();
  10. //接続ステータスを確立したノードがより多くのリクエストを受信できるかどうかを識別します
  11. if(!cansendrequest(nodeid))
  12. 新しいIllegalStateExceptionを投げます( "Node" + nodeid + "にリクエストを送信しようとする試みができていません。" );
  13. // clientRequestを送信します
  14. Dosend(リクエスト、今);
  15. }
  16. private void dosend(clientRequestリクエスト、長い状態){
  17. request.setsendtimems(now);
  18. //キャッシュリクエスト
  19. this.inflightrequests。追加(リクエスト);
  20. selector.send(request.request());
  21. }

4.最後に、リクエストを処理するためにPoll()メソッドに電話してください

  1. /**
  2. *ソケットへの実際の読み取り書き込みを実行します。
  3. *
  4. * @paramタイムアウト最大量 時間 すぐに何もない場合は MS応答を待つために
  5. * は負でない値である必要があります。実際のタイムアウトは、タイムアウト、リクエストタイムアウト、および 
  6. * メタデータのタイムアウト
  7. * @param今すぐ 時間 ミリ秒単位
  8. * @受信した応答リストを返します
  9. */
  10. @オーバーライド
  11. パブリックリスト<ClientResponse> Poll(Long Timeout、Long Now){
  12.     
  13. //ステップ1:メタデータを更新するリクエスト
  14. long metadatimeout = metadataupdater.maybeupdate(now);
  15. 試す {
  16.   
  17. //ステップ2:I/O操作を実行してリクエストを送信します
  18. this.selector.poll( utils。min (timeout、metadatimeout、request timeoutms));
  19. } キャッチ (IOException e) {
  20. log.Error( "I/Oの間の予期しないエラー" 、e);
  21. }
  22. // 完了したアクションを処理する
  23. long updatednow = this。 time .milliseconds();
  24. リスト<ClientResponse> responses = new ArrayList <>();
  25. //ステップ3:さまざまな応答を処理します
  26. HandleCompletedSends(Responses、updatedNow);
  27. HandleCompletedReceives(Responses、updatedNow);
  28. hondledisconnections(responses、updatednow);
  29. handleconnections();
  30. handletimedoutrequests(responses、updatednow);
  31. //コールバックを呼び出します
  32. // loop Call clientRequestのコールバック関数を呼び出します
  33. for (clientResponse応答:応答){
  34. if(respons.request()。hascallback()){
  35. 試す {
  36. Response.Request()。callback()。oncomplete(response);
  37. } キャッチ (例外 e) {
  38. log.Error( "要求完了時の有名なエラー:" 、e);
  39. }
  40. }
  41. }
  42. 返信応答。
  43. }

興味のある子供の靴は、コールバック関数のロジックとセレクターの動作を引き続き探求できます。

追加メモ:

Kafkaのメモリプールは含まれない上記のポイントがあります。 BufferPoolクラスを見ることができます。この知識ポイントは、前の記事で言及する必要があります。私は突然それを思い出し、それを逃しました。ここに追加します。これは、前述の記録的な蓄積クラスのデータ構造に対応しています。カプセル化されたレコードカミュレータオブジェクトは複数のdqueueで構成され、各dqueuは複数のレコードバッチで構成されています。さらに、RecordAccumulatorにはBufferPoolメモリプールも含まれています。 RecordAccumulatorクラスがConcurrentMapを初期化することをここで少し思い出しましょう。

  1. パブリックファイナルクラスRecordAccumulator {
  2. ......
  3. プライベートファイナルバッファプール無料;
  4. ......
  5. プライベート最終的なCONCURRENTMAP <TOPPITPARTITION、deque <RecordBatch >>バッチ;
  6. }

図に示すように、メモリ内のarecate()とリリースdeallocate()の2つの方法に焦点を当てます。興味のある友達は個人的にそれを見ることができます。このクラスのコードは合計で300行を超えており、コンテンツはそれほど多くありません。一緒にコミュニケーションして学ぶことへようこそ。この記事のトピックに影響を与えないように、ここでは拡張しません。

3. まとめ

この記事では、主に、メッセージを送信するKafkaプロデューサーの実際の執行者である送信者スレッドと、メッセージの上流および下流の伝送チャネルであるNetworkClientコンポーネントを分析します。主にNIOの適用を伴い、主にメッセージの送信を伴うコア依存関係クラスも導入します。この記事を書くことは、主に過去と未来の間のリンクとして機能します。これは、Kafkaの生産者から送信されたメッセージの以前の分析の補足であるだけでなく、上流の消費者消費の次の分析への道を開くこともできます。それは少し体系的です。記事を書くというアイデアは、主に、合計スコアのアイデアを分析する手がかりとして考慮しています。個人的には、長さが長すぎて読むのが不便だと思うので、読者に役立つことを望んで、それを合理化し、コアの方法とプロセスの分析に集中しようとします。

<<:  マイクロサービス メッセージ ブローカーの選択: Redis、Kafka、RabbitMQ

>>:  Kafka の運用とメンテナンス |データ移行を本当に理解していますか?

推薦する

最適化計画を効果的に改善する方法

SEO 最適化を行う際には、特に会社を選ぶ際には多くの選択肢があります。信頼できる会社であれば、適切...

軽量ログシステム Loki を 10 分で K8s に導入

ロキとは何ですか? Loki は、Grafana Labs によってオープンソース化された、水平スケ...

Baidu がリンク アルゴリズムを改善した後、外部リンクはどうすればよいですか?

多くの友人が私のウェブサイトに外部リンクを作成する方法をよく尋ねていることに気付きました。外部リンク...

ガートナーの2022年クラウドプラットフォームサービスのハイプサイクルでは、2年以内に実用化が成熟する2つのテクノロジーが特定されています。

ガートナーの 2022 年クラウド プラットフォーム サービス テクノロジー ハイプ サイクルによる...

ウェブサイトのランキングを最適化するにはどうすればよいでしょうか? 方法よりもアイデアの方が重要です。

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービスウェブサイトを最適化して...

クラウドコンピューティング、仮想化、コンテナ化について

クラウドコンピューティングとは何ですか? 1.1 クラウドコンピューティングの概念[[268482]...

Baiduスナップショットウェブマスターもこのように使用できます

まず、Baidu スナップショットについて理解しましょう。Baidu は、インターネット上で検索が許...

絶妙な内部リンク構築を行い、ウェブサイトを繁栄させましょう

現在、ウェブマスターは内部リンクの構築について話すとき、しばしば混乱しているように見えます。これは、...

プログラマーのロールモデル:完全に自動化された生活

ロシアのプログラマーたちは完全に自動化された生活を実現し、妻に残業のテキストメッセージを送ったり、二...

垂直型電子商取引はO2Oに頼り、非標準製品の収益モデルは多様化している

レッドスターマカリンオンラインモールとQijia Mallの公式トライアルにより、非標準製品(さまざ...

SEO トラフィック サイトはどのようにして検索エンジンを通じてトラフィックを取得するのでしょうか?

月収10万元の起業の夢を実現するミニプログラム起業支援プラントラフィックサイトは多くのSEO専門家が...

Jiuxian.comは寒い冬に大暴れし、2012年初頭にさらに2億ドルを調達した。

これは素晴らしい電子商取引会社です。派手な広告や誇張されたビジネスモデルはなく、経営者も姿を現すまで...

ウェブサイトのキーワードの選択は科学です

ウェブサイトのキーワードのランキングは、ウェブサイトの盛衰の鍵と言えます。近年、SEO 技術がますま...

inceptionhosting-22.5 ユーロ/年払い/KVM/512m メモリ/フェニックス シティ/G ポート

inceptionhosting.comは2011年に設立されたようです。イギリスに登録された会社で...

この技術の解釈は、読むと理解できるようになります

あなたが女の子で、彼氏がいるとします。同時に、あなたは別の男の子と、友達以上恋人未満という曖昧な関係...