みなさんこんにちは。Hua Zaiです。またお会いできて嬉しいです。 前回の記事では、「ネットワーク I/O を送信するための送信スレッドのアーキテクチャ設計」について詳しく見てきました。メッセージは最初に一時的に保存され、その後ネットワーク I/O コンポーネントが呼び出されて送信されます。本日は、メッセージがどのように送信されるかを深く分析するために、「実際にネットワークI/Oを行うNetworkClientのアーキテクチャ設計」を中心にお話します。 この記事を注意深く読むと、Kafka NetworkClient のソースコードに対する理解が深まると思います。 この記事には役立つ情報が沢山含まれているので、じっくり読んでいただければ幸いです。 I. 全体概要「シナリオ駆動型」アプローチを続けて、クライアント上でメッセージがどのように蓄積され、送信されるかを見てみましょう。 前回の記事では、次の図に示すように、メッセージはまず Sender 子スレッドによって KafkaChannel の Send フィールドに一時的に保存され、その後 NetworkClient#client.poll() を呼び出して実際に送信することを学びました (手順 6 ~ 11)。 NetworkClient は、「プロデューサー」、「コンシューマー」、「サーバー」などの上位レベルのビジネスにネットワーク I/O 機能を提供します。先ほど紹介した Kafka NIO カプセル化コンポーネントは NetworkClient 内で使用され、最終的にネットワーク I/O 機能を実現するためにいくつかのカプセル化が行われます。 NetworkClient は、クライアントとサーバー間の通信だけでなく、サーバー間の通信にも使用されます。 次に、「NetworkClient ネットワーク I/O コンポーネント アーキテクチャの実装と送信処理フロー」を見てみましょう。誰もが理解しやすいように、すべてのソースコードのバックボーンのみが保持されます。 2. ネットワーククライアントアーキテクチャ設計NetworkClient クラスは、KafkaClient インターフェースの実装クラスです。重要な内部フィールドには、「Selectable」、「InflightRequest」、内部クラス「MetadataUpdate」などがあります。 github ソースコードのアドレスは次のとおりです。 https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
1. 主要分野パブリッククラスNetworkClientはKafkaClientを実装します{ // ステータス列挙値 プライベート列挙型状態{ アクティブ、 終わりに、 閉店 } /* ネットワークI/Oを実行するために使用されるセレクタ */ // ネットワークI/Oを実行するためのセレクタ プライベート最終選択可能セレクター; // メタデータ メタ情報アップデータ。メタ情報を更新しようとします プライベート最終MetadataUpdater metadataUpdater ; /* 各ノードの接続状態 */ // クラスター内のすべてのノード接続のステータスを管理する プライベート最終ClusterConnectionStates接続状態; /* 現在送信中または応答を待っているリクエストのセット */ // 現在送信中または応答を待機中のリクエスト セット プライベート最終InFlightRequests inFlightRequests ; /* ソケット送信バッファサイズ(バイト単位) */ // ソケットの送信データバッファのサイズ(バイト単位) プライベート最終int socketSendBuffer ; /* ソケット受信バッファサイズ(バイト単位) */ // ソケット受信バッファのサイズ(バイト単位) プライベート最終int socketReceiveBuffer ; /* サーバーへのリクエストでこのクライアントを識別するために使用されるクライアント ID */ // クライアントIDを示し、クライアントのIDを識別します プライベート最終文字列クライアントID ; /* サーバーにリクエストを送信するときに使用する現在の相関 ID */ // サーバーにリクエストを送信するときに使用する現在の関連付け ID プライベートint相関; /* サーバーからの確認を待つ個々のリクエストのデフォルトのタイムアウト */ // サーバーの確認を待つ単一のリクエストのデフォルトのタイムアウト プライベート最終int defaultRequestTimeoutMs ; /* サーバーへの接続を再試行するまでの待機時間(ミリ秒) */ //再接続バックオフ時間 プライベート最終長いreconnectBackoffMs ; /** * ブローカーに初めて接続するときに ApiVersionRequest を送信する場合は True です。 * ブローカーのバージョンと調整するかどうか。デフォルトは true * true の場合、ブローカーに初めて接続するときに、ブローカーのバージョンを確認するためにバージョン要求を送信する必要があります。 false の場合、バージョン要求は必要ありません。 */ プライベート最終ブール値discoverBrokerVersions ; // ブローカーバージョン プライベート最終ApiVersions apiVersions ; // 送信するバージョン要求を格納します。キーは nodeId、値は要求を構築するビルダーです。 プライベート最終Map < String 、 ApiVersionsRequest 。ビルダー> nodesNeedingApiVersionsFetch = new HashMap <> (); // キャンセルされたリクエストの収集 プライベート最終リスト< ClientResponse > abortedSends = new LinkedList <> (); このタイプの属性フィールドは多数あります。重要なフィールドをいくつか紹介します。 - セレクター: Kafka 独自のカプセル化されたセレクター。ネットワーク I/O イベント、ネットワーク接続、および読み取りおよび書き込み操作の監視を担当します。
- metadataUpdater : NetworkClient の内部クラス。主にメタデータ情報のアップデータを実装するために使用されます。メタデータ情報を更新しようとすることができます。
- connectionStates : クラスター内のすべてのノードの接続ステータスを管理します。基になるレイヤーは Map<nodeid, NodeConnectionState> を使用します。 NodeConnectionState 列挙値は接続ステータスを表し、最後の接続のタイムスタンプを記録します。
- inFlightRequests : 現在送信中または応答を待機中のリクエストのコレクションを保存するために使用されます。
- socketSenderBuffer : ソケットがデータを送信するために使用するバッファのサイズを示します。
- socketReceiveBuffer : ソケットがデータを受信するために使用するバッファのサイズを示します。
- clientId : クライアント ID を識別するクライアント ID を示します。
- reconnectBackoffMs : 再接続バックオフ イベントを示します。短時間に大量の再接続が発生することによるネットワーク負荷を防止するために、再接続を許可しない期間が設けられています。
2. 主な方法NetworkClient クラスには多くのメソッドがあります。ここでは、主要な方法を一つずつ説明していきます。 (1) 準備完了() /** * 指定されたノードへの接続を開始し、すでに接続されていてそのノードに送信する準備ができている場合は true を返します。 * * @param node チェックするノード * @param now 現在のタイムスタンプ * @return 指定されたノードに送信する準備ができている場合はTrue */ @オーバーライド パブリックブール値ready ( Node node , long now ) { // 空のノード if (ノード.isEmpty ()) throw new IllegalArgumentException ( "空のノードに接続できません " + node ); // 1. ノードがリクエストを送信する準備ができているかどうかを判断する if ( isReady (ノード、 now )) trueを返します。 // 2. ノードの接続状態を確認する if ( connectionStates . canConnect ( node . idString (), now )) // ノードへの送信に興味があり、そのノードとの接続がない場合は、接続を開始します // 3. 接続を初期化しますが、この時点では接続が成功しない可能性があります initialConnect (ノード、 now );
falseを返します。 }
/** * 指定された ID を持つノードがさらにリクエストを送信する準備ができているかどうかを確認します。 * @param node ノード * @param now 現在の時刻(ミリ秒) * ノードが準備完了の場合はtrueを返す */ @オーバーライド パブリックブール値isReady ( Nodeノード、 long now ) { // メタデータを更新する必要がある場合は、メタデータ リクエストを最優先にするために、すべてのリクエストを準備完了でないと宣言します。 // メタデータが更新されていることが判明した場合、リクエストの送信は禁止されます&& 接続が確立されていない場合、または現在送信中のリクエストが多すぎる場合、リクエストの送信も禁止されます 戻る!メタデータ更新者。 isUpdateDue ( now ) && canSendRequest ( node . idString (), now ); }
/** * 接続されており、準備ができていて、指定された接続にさらにリクエストを送信できますか? * 接続状態を確認し、リクエストが多すぎないか確認します * @param node ノード * @param 現在のタイムスタンプ */ プライベートブールcanSendRequest ( Stringノード、 long now ){ // 3つの条件をすべて満たす必要があります connectionStatesを返します。 isReady (ノード、 now ) &&セレクター。 isChannelReady (ノード) && inFlightRequests.canSendMore (ノード) ; } このメソッドは、ノードが準備完了しており、リクエストを送信できるかどうかを示します。主に次の 3 つのことを行います。 - まず、ノードが接続して要求を受信する準備ができているかどうかを判断します。次の 4 つの条件を満たす必要があります。
- !metadataUpdater.isUpdateDue(now): メタデータは更新中の状態であってはならず、またメタデータの有効期限が切れていてはなりません。
- canSendRequest(node.idString(), now): ここでは 3 つの条件があります。 (1)クライアントとノード間の接続が準備状態にあるかどうか。 (2)クライアントとノード間のチャネルが確立されているかどうか(3)inFlightRequests内の対応するノードがさらにリクエストを受信できるかどうか。
- 接続が良好な場合は、準備ができていることを示す true を返します。リクエストを受信する準備ができていない場合、対応するノードに接続しようとします。ここでは、次の 2 つの条件も満たす必要があります。
最後に、接続を初期化します。
(2) 接続を開始する() /** * 接続を作成する * 指定されたノードへの接続を開始する * @param node 接続先のノード * @param now エポックミリ秒単位の現在の時刻 */ プライベートvoidinitiateConnect ( Nodeノード、 longnow ) { 文字列nodeConnectionId =ノード。 idString (); 試す{ // 1. 接続ステータスを接続中に更新する 接続状態。接続中( nodeConnectionId 、 now 、 node . host ()、 clientDnsLookup ); // 接続アドレスを取得する InetAddressアドレス= connectionStates 。現在のアドレス(ノード接続ID ); ログ。 debug ( "アドレス {} を使用してノード {} への接続を開始しています" 、ノード、アドレス); // 2. セレクタを呼び出して非同期接続を試み、selector.poll を通じてイベントをリッスンします。 セレクター。接続(ノード接続ID 、 新しいInetSocketAddress (アドレス,ノード.ポート())、 これ。ソケット送信バッファ、 これ。ソケット受信バッファ); } IOExceptionをキャッチします。 ログ。 warn ( "ノード {} への接続エラー" 、ノード、 e ); // 試行に失敗しました。バックオフ後に再試行します 接続状態。切断されました( nodeConnectionId 、現在); // メタデータ更新者に接続失敗を通知する メタデータ更新者。 handleServerDisconnect ( now 、 nodeConnectionId 、オプション. empty ()); } } このメソッドは主に接続を初期化するために使用され、次の 2 つのことを行います。 - connectionStates.connecting() を呼び出して、接続状態を接続中に更新します。
- 非同期的に接続を開始するには、selector.connect() を呼び出します。現時点では接続されていない可能性があります。後続の Selector.poll() は、接続の準備ができているかどうかを監視し、接続を完了します。接続が成功すると、ConnectionState は CONNECTED に設定されます。
接続の準備ができたら、送信関連のメソッドを見てみましょう。 (3) 送信()、送信() /** * ClientRequestはクライアントのリクエストであり、requestBuilderをカプセル化します */ パブリックファイナルクラスClientRequest { // ノードアドレス プライベート最終文字列宛先; // ClientRequest では、requestBuilder を使用して、異なるタイプのリクエストに異なるリクエスト コンテンツを設定します。 プライベートな最終的なAbstractRequest 。ビルダー<?> requestBuilder ; // リクエスト ヘッダー内の correlationId プライベート最終int相関ID ; // リクエストヘッダー内のクライアントID プライベート最終文字列クライアントID ; // 作成時間 プライベート最終long createdTimeMs ; // 応答は必須ですか? プライベート最終ブール値expectResponse ; // リクエストタイムアウト プライベート最終int requestTimeoutMs ; // 応答を処理するコールバック関数 プライベート最終RequestCompletionHandlerコールバック; ...... }
/** * 指定されたリクエストを送信キューに入れます。リクエストは準備完了ノードにのみ送信できます。 * @param request リクエスト * @param now 現在のタイムスタンプ * リクエストを送信します。このメソッドはプロデューサーとコンシューマーの両方から呼び出され、ClientRequest はクライアントのリクエストを表します。 */ @オーバーライド public void send ( ClientRequestリクエスト、 long now ) { doSend ( request 、 false 、 now ); }
// 要求されたバージョンがサポートされているかどうかを確認し、サポートされている場合はリクエストを送信します プライベートvoid doSend ( ClientRequest clientRequest 、 boolean isInternalRequest 、 long now ) { // アクティブかどうか確認する アクティブであることを確認します(); // ターゲットノードID 文字列nodeId = clientRequest 。行き先(); // NetworkClient の内部リクエストですか?これは誤りです。 if ( ! isInternalRequest ) { // 指定されたノードにリクエストを送信できるかどうかを確認します。リクエストを送信できない場合は例外がスローされます。 if ( ! canSendRequest ( nodeId 、 now )) throw new IllegalStateException ( "準備ができていないノード " + nodeId + "にリクエストを送信しようとしました。" ); } 抽象的なリクエスト。ビルダー<?> builder = clientRequest 。リクエストビルダー(); 試す{ // バージョンを検出 NodeApiVersions versionInfo = apiVersions 。ノードIDを取得します。 // ... 無視 // builder.build() は ProduceRequest.Builder であり、結果は ProduceRequest です //doSendメソッドを呼び出す doSend ( clientRequest 、 isInternalRequest 、 now 、 builder . build ( version )); }キャッチ( UnsupportedVersionException unsupportedVersionException ) {ログ. debug ( "相関 ID {}を持つ {} を {} に送信しようとしたときにバージョンが一致しません" 、 builder 、 clientRequest.correlationId ()、 clientRequest.destination ()、 unsupportedVersionException ); // 要求されたバージョンが矛盾している場合は、clientResponse を生成します ClientResponse clientResponse = new ClientResponse ( clientRequest . makeHeader ( builder . latestAllowedVersion () )、 クライアントリクエスト。コールバック()、 clientRequest 。目的地()、今、今、 false 、 unsupportedVersionException 、 null 、 null ); // abortedSends コレクションに追加 abortedSends.add ( clientResponse ) ; } }
/** * isInternalRequest は、送信前に接続ステータスを確認する必要があるかどうかを示します。真の場合、クライアントは接続が良好であると判断したことを意味します。 * request はリクエスト本体を示します */ private void doSend ( ClientRequest clientRequest 、 boolean isInternalRequest 、 long now 、 AbstractRequest request ) { // ターゲットノードアドレス 文字列の宛先= clientRequest 。行き先(); // リクエストヘッダーを生成する RequestHeaderヘッダー= clientRequest 。 makeHeader (リクエスト.version ()); ログが有効の場合、 ログ。 debug ( "ヘッダー {} とタイムアウト {} を持つ {} リクエストをノード {} に送信しています: {}" 、 クライアントリクエスト。 apiKey ()、ヘッダー、 clientRequest 。 requestTimeoutMs (),宛先、リクエスト); } // 1. NetworkSend オブジェクトを構築し、リクエスト ヘッダーとリクエスト ボディを結合し、データをシリアル化して NetworkSend に保存します。 送信=リクエストを送信します。 toSend (宛先、ヘッダー); // 2. 送信前にすべての情報を保存するinFlightRequestオブジェクトを構築する InFlightRequest inFlightRequest =新しいInFlightRequest ( クライアントリクエスト、 ヘッダー、 内部リクエスト、 リクエスト、 送信、 今); // 3. inFlightRequests コレクションに inFlightRequest を追加する これ。フライトリクエスト。 ( inFlightRequest )を追加します。 // 4. セレクタを呼び出してデータを非同期に送信し、対応する kafkaChannel に送信をバインドし、kafkaChannel の基になるソケットの書き込みイベントを開始して、実際のネットワーク送信の次のステップを待機します。 セレクター。送信(送信); }
@オーバーライド パブリックブールアクティブ(){ // 状態がアクティブかどうかを判定する 状態を返します。 get () ==状態。アクティブ; }
// アクティブかどうか確認する プライベートvoidensureActive ( ) { if ( !アクティブ()) throw new DisconnectException ( "NetworkClient はアクティブではなくなりました。状態は " + stateです); } 上記のソース コードから、ここでの送信は実際のネットワーク送信ではなく、データが最初にキャッシュに送信されることがわかります。 - まず、最も外側の層は send() であり、これは doSend() を呼び出します。
- ここでの doSend() の主な機能は、inFlightRequests コレクション内の対応するノードがリクエストを送信できるかどうかを判断することです。次の 3 つの条件を満たす必要があります。
- クライアントとノード間の接続が準備状態にあるかどうか。
- クライアントとノード間のチャネルが確立されているかどうか。
- inFlightRequests コレクション内の対応するノードがさらにリクエストを受信できるかどうか。
- 最後に、別の doSend() が再度呼び出され、最終リクエストがキャッシュに送信されます。手順は次のとおりです。
- NetworkSend オブジェクトを構築し、リクエスト ヘッダーとリクエスト ボディを結合し、データをシリアル化して NetworkSend に保存します。
- inFlightRequest オブジェクトを構築します。
- inFlightRequests コレクションに inFlightRequest を追加し、応答を待ちます。
- Selector を呼び出してデータを非同期的に送信し、対応する kafkaChannel に送信をバインドし、kafkaChannel の基になるソケットの書き込みイベントを開始して、実際のネットワーク送信の次のステップを待機します。
要約すると、ここでの送信プロセスは、実際には、送信するリクエストを inFlightRequest にカプセル化し、それを inFlightRequests コレクションに入れて、対応するチャネルの NetworkSend フィールドにキャッシュするということになります。つまり、ここでの送信プロセスは、実際のネットワーク I/O 送信の次のステップとして機能することです。 次に、実際のネットワーク送信方法を見てみましょう。 (4) 投票()このメソッドは、ネットワーク送信を実行し、さまざまな状態での応答結果「pollSelectionKeys のさまざまな読み取りと書き込み」を処理します。ここでは、handleXXX() メソッドを呼び出して処理します。コードは次のとおりです。 /** * ソケットへの実際の読み取りと書き込みを実行します。 * @param timeout すぐに応答がない場合に応答を待つ最大時間(ミリ秒単位) * は負でない値である必要があります。実際のタイムアウトは、タイムアウト、リクエストタイムアウト、および * メタデータのタイムアウト * @param now ミリ秒単位の現在の時刻 * @return 受信した応答のリスト */ @オーバーライド パブリックリスト< ClientResponse >ポーリング(長いタイムアウト、長い現在) { // アクティブかどうか確認する アクティブであることを確認します(); //送信キャンセルは空です if ( ! abortedSends.isEmpty ()) { // サポートされていないバージョンの例外や切断により送信が中止された場合、 // Selector#poll を待たずにすぐに処理します。 リスト< ClientResponse >応答=新しいArrayList <> (); handleAbortedSends (応答); completeResponses (応答); 応答を返します。 } // 1. メタデータの更新を試みる 長いmetadataTimeout = metadataUpdater 。 maybeUpdate (今) ; 試す{ // 2. 実際の読み取りと書き込みが送信されるネットワーク I/O 操作を実行します。クライアントのリクエストが完全に処理された場合、completeSendsまたはcomplteReceivesコレクションに追加されます。 これ。セレクター。ポーリング( Utils . min ( timeout 、 metadataTimeout 、 defaultRequestTimeoutMs )); } IOExceptionをキャッチします。 ログ。 error ( "I/O 中に予期しないエラーが発生しました" 、 e ); }
// 完了したアクションを処理する long updatedNow = this です。時間。ミリ秒(); // 応答結果セット: 実際の読み取りおよび書き込み操作によって応答が生成されます リスト< ClientResponse >応答=新しいArrayList <> (); // 3. 送信処理を完了するハンドラは、completedSendsコレクションを処理する handleCompletedSends (応答、 updatedNow ); // 4. 受信ハンドラを完了し、completedReceives キューを処理する handleCompletedReceives (応答、 updatedNow ) ; // 5. 切断ハンドラ、切断リストを処理する handleDisconnections (応答、 updatedNow ); // 6. 接続ハンドラを処理し、接続リストを処理する ハンドル接続(); // 7. バージョン調整リクエスト(APIバージョン番号の取得)ハンドラの処理 handleInitiateApiVersionRequests (更新された現在)。 // 8. タイムアウト接続のハンドラ。タイムアウト接続コレクションを処理します。 handleTimedOutConnections (応答、 updatedNow ); // 9. タイムアウト要求のハンドラ、タイムアウト要求コレクションの処理 handleTimedOutRequests (応答、 updatedNow ); // 10. レスポンスコールバックを完了する completeResponses (応答);
応答を返します。 } ここには多くの手順がありますので、順番に説明していきます。 - メタデータを更新してみてください。
- 実際のネットワーク I/O 操作を実行するには、Selector.poll() を呼び出します。クリックすると、Kafka ソース コードのネットワーク層実装メカニズムの図が表示されます。セレクタマルチプレクサは主に以下の3つのコレクションを操作します。
- 接続セット: 接続を完了したノード ノードのセット。
- completeReceives コレクション: 完了した受信のコレクション、つまり KafkaChannel 上の NetworkReceive は、いっぱいになるとこのコレクションに追加されます。
- CompletedSends コレクション: 完了した送信のコレクション、つまり、チャネル上の NetworkSend は、読み取られた後にこのコレクションに格納されます。
- completeSends コレクションを処理するには、handleCompletedSends() を呼び出します。
- completeReceives キューを処理するには、handleCompletedReceives() を呼び出します。
- ノードから切断する要求を処理するには、handleDisconnections() を呼び出します。
- 接続されたリストを処理するには、handleConnections() を呼び出します。
- バージョン番号要求を処理するには、handleInitiateApiVersionRequests() を呼び出します。
- 接続タイムアウトのノード コレクションを処理するには、handleTimedOutConnections() を呼び出します。
- handleTimedOutRequests() を呼び出して、inFlightRequests コレクション内のタイムアウトしたリクエストを処理し、そのステータスを変更します。
- 各メッセージのカスタマイズされた応答コールバックを完了するには、completeResponses() を呼び出します。
次に、手順3~9の実装方法を見てみましょう。 (5) 送信処理完了() NetworkClient はリクエストの送信を完了すると、handleCompletedSends メソッドを呼び出して、リクエストがブローカーに送信されたことを示します。 /** * 完了したリクエスト送信を処理します。特に応答が期待されない場合は、リクエストが完了したとみなします。 * @param 応答 更新する応答のリスト * @param now 現在の時刻 */ private void handleCompletedSends ( < ClientResponse >応答のリスト、現在長い) { // 応答が期待されない場合は送信が完了したらそれを返します // 1. CompletedSendsリクエストセットを走査し、Selectorを呼び出して最後のポーリングからリクエストを取得します。 for ( Send send : this.selector.completedSends ()) { // 2. inFlightRequests コレクションから、Send に関連付けられたノードのキューから最新のリクエストを取得しますが、キューからは削除しません。取り出した後、リクエストが応答を期待しているかどうかを判断する InFlightRequestリクエスト= this 。フライトリクエスト。 lastSent (送信.送信先()); // 3. 応答は必須ですか?応答が不要な場合は、送信要求が完了すると直接返されます。 request.completedによって生成されたClientResponseオブジェクトがまだあります if ( ! request . expectResponse ) { // 4. 応答が必要ない場合は、inFlightRequests内のSenderに関連付けられたinFlightRequestを取得します。つまり、最新のリクエストを抽出します。 これ。フライトリクエスト。 completeLastSent (送信.送信先() ); // 5. Completed() を呼び出して ClientResponse を生成します。最初のパラメータは null であり、応答コンテンツがないことを示します。リクエストを Responses コレクションに追加します。 応答。追加(リクエスト.完了( null 、現在) ); } } } このメソッドは主に、クライアントがリクエストを送信した後の応答結果を処理するために使用され、次の 5 つのことを行います。 - セレクター内のcompletedSendsコレクションを走査し、完了したSendオブジェクトを1つずつ処理します。
- inFlightRequests コレクションから、Send に関連付けられたノードのキューの最初の要素を取得しますが、キューからは削除しません。取り出した後、リクエストが応答を期待しているかどうかを判断します。
- 応答が必要かどうかを判断します。
- 応答が不要な場合は、inFlightRequests 内の Sender に関連付けられている inFlightRequest を削除します。 Kafka の場合、一部のリクエストには応答は必要ありません。送信後に送信が成功したかどうかを考慮する必要がない場合は、コールバックを null として Response オブジェクトを構築します。
- InFlightRequest.completed() を通じて ClientResponse を生成します。最初のパラメータは null であり、応答コンテンツがないことを示します。最後に、ClientResponse を Responses コレクションに追加します。
上記のソース コードから、「completedSends」コレクションと「InflightRequests」コレクション間の連携関係がわかります。 しかし、問題があります。セレクターから返されるリクエストが、InflightRequests コレクション キューに対応する最新のリクエストであることをどうやって確認すればよいのでしょうか? CompletedSends コレクションには、poll() メソッドへの最新の呼び出しで正常に送信されたリクエスト (正常に送信されたがまだ応答を受信していないリクエストのコレクション) が格納されます。 InflightRequests コレクションには、送信されたがまだ応答を受信していない要求が格納されます。送信された各リクエストは、前のリクエストが完了するまで待機する必要があります。セレクターによって返されるリクエストは最後のポーリングから開始されるため、同時に送信されるリクエストは 1 つだけであり、これは正しいです。 次の図に示すように、「completedSends」の要素は、「InflightRequests」コレクション内の対応するキューの最後の要素に対応します。 (6) ハンドル完了受信() NetworkClient は応答を受信すると、handleCompletedReceives メソッドを呼び出します。 /** * 完了した受信を処理し、受信した応答で応答リストを更新します。 * @param 応答 更新する応答のリスト * @param now 現在の時刻 * CompletedReceivesキューを処理し、返された応答情報に基づいてClientResponseをインスタンス化し、それを応答コレクションに追加します。 */ private void handleCompletedReceives ( < ClientResponse >応答をリストします。現在は長いです) { // 1. CompletedReceives レスポンスコレクションを走査し、Selector を通じて未処理のレスポンスを返す for ( NetworkReceive受信: this . selector . completeReceives ()) { // 2. リクエストを送信するノードIDを取得する 文字列ソース=受信。ソース(); // 3. 送信されたリクエストの「最も古いリクエスト」を inFlightRequests コレクション キューから取得して削除します (inFlightRequests はリクエスト応答を受け取っていない ClientRequests を保存するため、inFlightRequests から削除します。リクエストに応答が得られたので、保存する必要はありません) InFlightRequest要求= inFlightRequests 。 completeNext (ソース); // 4. レスポンスを解析し、レスポンスヘッダーを検証し、responseStructインスタンスを生成する 構造体responseStruct = parseStructMaybeUpdateThrottleTimeMetrics ( receive.payload (), req.header , throttleTimeSensor , now ) ; // レスポンス本文を生成する AbstractResponse応答= AbstractResponse 。 parseResponse ( req.header.apiKey ( ) , responseStruct , req.header.apiVersion ( ) ) ; .... // 受信した応答にスロットル遅延が含まれている場合は、接続をスロットルします。 // フロー制御処理 maybeThrottle (応答、要求、ヘッダー、 apiVersion ()、要求、宛先、 now ); // 5. 戻り値の型を決定する if ( req . isInternalRequest && response instanceof MetadataResponse ) // メタデータ要求応答を処理する メタデータ更新者。 handleSuccessfulResponse (要求ヘッダー、現在、( MetadataResponse )応答) ; そうでない場合( req . isInternalRequest && response instanceof ApiVersionsResponse ) // バージョン調整応答を処理する handleApiVersionsResponse (応答、要求、現在、 ( ApiVersionsResponse )応答); それ以外 // 通常のメッセージ送信応答。InFlightRequest.completed() を通じて ClientResponse を生成し、応答を応答コレクションに追加します。 応答。追加(要求。完了(応答、現在) ) } }
// レスポンスを解析し、レスポンスヘッダーを検証し、responseStructインスタンスを生成します プライベート静的構造体parseStructMaybeUpdateThrottleTimeMetrics ( ByteBuffer responseBuffer 、 RequestHeader requestHeader 、センサーthrottleTimeSensor 、 long now ) { // レスポンスヘッダーを解析する レスポンスヘッダレスポンスヘッダ=レスポンスヘッダ。解析( responseBuffer 、 リクエストヘッダー。 apiKey () 。レスポンスヘッダーバージョン(リクエストヘッダー.apiVersion ())); // レスポンス本文を解析する 構造体responseBody = requestHeader 。 apiKey () 。 parseResponse ( requestHeader . apiVersion ()、 responseBuffer ); // リクエストヘッダーとレスポンスヘッダーの相関IDが等しいことを確認する 相関関係( requestHeader 、 responseHeader ); if ( throttleTimeSensor != null && responseBody . hasField ( CommonFields . THROTTLE_TIME_MS )) スロットルタイムセンサー。記録( responseBody . get ( CommonFields . THROTTLE_TIME_MS )、 now ); レスポンスボディを返します。 } このメソッドは主に、受信したネットワーク要求のセットを処理するために使用され、次の 5 つの処理を行います。 - セレクター内のcompletedReceivesコレクションを走査し、完了したReceiveオブジェクトを1つずつ処理します。
- リクエストを送信したノード ID を取得します。
- inFlightRequests コレクション キューから送信されたリクエストのうち「最も古いリクエスト」を取得して削除します (inFlightRequests にはリクエスト応答を受信していない ClientRequests が格納されているため、inFlightRequests から削除します。リクエストに応答があるため、保存する必要はありません)。
- 応答を解析し、応答ヘッダーを検証し、responseStruct インスタンスを生成し、応答本体を生成します。
- 応答結果を処理します。応答結果は次の 3 つのケースに分かれています。
- メタデータ要求応答を処理するには、metadataUpdater.handleSuccessfulResponse() を呼び出します。
- バージョン調整応答を処理するには、handleApiVersionsResponse() を呼び出します。
- 通常のメッセージに対する応答は InFlightRequest.completed() を通じて生成され、ClientResponse が生成され、応答が応答コレクションに追加されます。
上記のソース コードからわかるように、「completedReceives」コレクションは「InflightRequests」コレクションとも連携関係にあります。 CompletedReceives コレクションは、受信した応答コレクションを参照します。リクエストが応答を受信した場合は、InflightRequests から削除できます。このようにして、InflightRequests はリクエストの蓄積を防ぐことができます。 「completedSends」とは対照的に、「completedReceives」コレクションは、次の図に示すように、「InflightRequests」コレクション内の対応するキューの最初の要素に対応します。 (7) 最小負荷ノード() /** * 少なくとも接続可能な、未処理のリクエストが最も少ないノードを選択します。この方法は * 既存の接続を持つノードを優先しますが、まだ接続されていないノードを選択する可能性もあります。 * 既存の接続がすべて使用中の場合、接続が切断されます。接続が存在しない場合は、このメソッドはノードを優先します * 最も最近の接続試行の場合。この方法では、 * 既存の接続で、再接続バックオフ期間内に切断された場合、またはアクティブな * 調整されている接続。 * * @return 実行中のリクエストが最も少ないノード。 */ @オーバーライド パブリックノードleastLoadedNode ( long now ) { // メタデータからすべてのノードを取得する < Node >ノード= thisをリストします。メタデータ更新者。ノードをフェッチします(); if (ノード. isEmpty ()) 新しいIllegalStateExceptionをスローします( "Kafka クラスターにノードがありません" ); int飛行中=整数。最大値;
ノードfoundConnecting = null ; ノードfoundCanConnect = null ; ノードfoundReady = null ;
intオフセット= this 。オフセット。 nextInt (ノード. size ()); for ( int i = 0 ; i <ノード. size (); i ++ ) { int idx = ( offset + i ) %ノード。サイズ(); ノードノード=ノード。取得( idx ); // ノードはリクエストを送信できますか? if ( canSendRequest (node.idString ( ), now )) { // ノードのキューサイズを取得する int currInflight =これ。フライトリクエスト。 count (ノード. idString () ); // 0の場合は、負荷が最も小さいノードを返します (現在のフライト時間が0の場合) // 進行中のリクエストがない確立された接続が見つかった場合は、すぐに停止できます ログ。 trace ( "実行中のリクエストがない接続された、最も負荷の少ないノード {} が見つかりました" 、ノード); ノードを返します。 } else if ( currinflight < inillight ){ //キューサイズが最大よりも小さい場合 //それ以外の場合、これが私たちがこれまでに見つけた最高の場合、それを記録してください Inflight = CurrinFlight ; coundready = node ; } } else if ( connectionStates。ispreparingConnection ( node。idstring ())) { fundConnecting = node ; } else if ( canconnect ( node 、 now )){ if ( fundCanconnect == null || これ。 ConnectionStates 。 lastConnectattemptms ( fundCanconnect。idstring ()) > これ。 ConnectionStates 。 lastConnectAttemptms ( node。idstring ())){ fundCanconnect = node ; } }それ以外{ ログ。 Trace ( "削除{} {} {} {}は、どちらも準備ができていないため、最小のロードノード選択から" + " + 「送信または接続用」 、ノード); } }
//可能であれば、確立された接続を好みます。それ以外の場合は、接続を待ちます //新しいノードに接続する前に確立されています。 if ( coundready != null ){ ログ。 trace ( "{}機内リクエスト"を備えた最小のロードノード{}が見つかりました " 、 fenedready 、 inillight ); returnedready ; } else if ( foundConnecting != null ){ ログ。 trace ( "roaded node {}"が最も少なくなりました{} " 、 fundConnecting ); fundConnectingを返します。 } else if ( foundCanconnect != null ){ ログ。 trace ( "アクティブ接続なしで最小ロードノード{}が見つかりました" 、 fundCanconnect ); fundCanconnectを返します。 }それ以外{ ログ。 TRACE ( "最小のロードされたノード選択では、使用可能なノードが見つかりませんでした" ); nullを返します。 } } この方法は、主に、次の図に示すように、最小の負荷のノードを選択します。 3。Inflightrequestsコレクションデザイン上記のコード分析を通じて、「Inflightrequests」コレクションの役割は、送信されたが応答がないClientRequestリクエストコレクションをキャッシュすることであることがわかっています。基礎となるレイヤーは、reqmap <string、deque <networkclient.inflightrequest >>を介して実装されます。キーはnodeidであり、値は対応するノードに送信されるクライアントリクエストリクエストキューです。デフォルトは5です。パラメーター:max.in.flight.requests.per.connectionは、リクエストキューサイズを構成します。接続ごとに両端のキューが作成されるため、リクエストが送信されるレートを制御できます。 次の2つの機能があります。 - ノードが正常かどうか:「応答の受信」から「送信」から「送信」へのリクエストを収集して、ブローカーノードが送信されるかどうかを判断するかどうか、リクエストと接続がタイムアウトしたかどうか、つまり、ブラザーノードに送信されるリクエストが正常かどうかを監視します。
- ノードの負荷:Dequeが特定の長さに達すると、ブローカーノードが過負荷になっていると見なされます。
/** *送信された、または送信されているがまだ返信されていないリクエストのセット *送信された、または送信されているが応答がないクライアントレクエストリクエストコレクションをキャッシュするために使用されます */ 最終クラスのInflightRequests { //接続ごとに進行中のリクエストの最大数 プライベートファイナルint maxinflightrequestsperconnection ; //ノードノードのマッピングセットクライアントリクエストダブルエンドキューdeque <networkclient.inflightrequest>。キーはnodeidで、値はリクエストキューです プライベート最終マップ< string 、 deque < networkclient 。 inflightrequest >> requests = new Hashmap <> (); /**フライトリクエストの安全な総数をスレッド。 */ //スレッドセーフInflightrequestCount プライベート最終AtomicInteger InflightrequestCount = new AtomicInteger ( 0 ); //各接続の実行中にリクエストの最大数を設定します public Inflightrequests ( int maxinflightrequestsperconnection ){ これ。 maxinflightrequestsperconnection = maxinflightrequestsperconnection ; } ここでは、「シナリオ駆動型」メソッドを使用して重要な方法を説明します。送信および処理する必要がある新しいリクエストがある場合、チームの長にあるチームに参加します。実際に処理されたリクエストは、チームの終わりから離れて、チームに参加するリクエストが最初に処理されるようにすることです。 1。Cansendmore()まず、送信条件の制限を見てみましょう。 NetworkClientはこのメソッドを呼び出して、指定されたノードにリクエストを送信できるかどうかを判断します。 /** *このノードにさらにリクエストを送信できますか? *問題の@paramノードノード * @returnは、指定されたノードにまだリクエストが送信されていない場合はtrue *接続がまだリクエストを送信できるかどうかを判断します */ Public Boolean Cansendmore ( String node ){ //ノードに対応する両端のキューを取得します Deque < NetworkClient 。 inflightrequest > queue = requests 。 get ( node ); //条件のキューが空であることを判断|| (チームの長が送られて完成しました return queue == null ||列。 isempty () || ( queue。peekfirst ( ) 。 send。ceelect () && queue。size ( ) < this。maxinflightrequestsperconnection ) ; } 上記のコードから、制限条件を確認できます。キューは複数のリクエストを保存できますが、追加する新しいリクエストの条件は、前のリクエストを正常に送信する必要があるということです。 条件は次のように判断されます。 - キュー== null || queue.isempty()、空の場合はキューを送信できます。
- queue.peekfirst()。send.completed()がキューに送信されるかどうかを判断します。
- チームのリーダーが長い間リクエストを送信できない場合、それはネットワークが原因である可能性があるため、このノードにリクエストを送信することはできません。
- チームの長からの要求は、対応するKafkachannel.sendフィールドと同じ要求を指しています。安全でないメッセージの上書きを避けるために、kafkachannel.sendフィールドは新しいリクエストを指すことを許可できません。
- queue.size()<this.maxinflightrequestsperconnection、この条件は、キューに積み上げられているリクエストが多すぎるかどうかを判断することです。ノードが多くの反応しないリクエストを積み上げた場合、ノードにネットワークの輻輳があり、リクエストを送信し続けることを意味します。
2。追加()チームに参加します /** *指示された接続のキューに指定されたリクエストを追加する *キューヘッダーにリクエストを追加します */ public void add ( networkclient。flightrequestrequest ) { //このリクエストはどのブローカーノードに送信されますか? 文字列宛先=リクエスト。行き先; //指定されたリクエストのターゲットノードノードに基づいて、リクエストコレクションから対応するdeque <ClientRequest>再端のキューReqを取得します Deque < NetworkClient 。 inflightrequest > reqs = this 。リクエスト。 get (宛先); //二重端のキューがnullの場合 if ( reqs == null ){ //両端のキューアレイデクタイプのreqsを作成します reqs = new arraydeque <> (); //リクエストターゲットノードノード間のマッピング関係をリクエストコレクションにreqsに追加する これ。リクエスト。 put (宛先、 reqs ); } // Reqsチームヘッダーにリクエストを追加します reqs 。 AddFirst (リクエスト); //カウントを増やします InflightrequestCount 。 incrementAndget (); } 3。COLEMENEXT()Dequeueへの最古のリクエスト /** *指定されたノードの最古のリクエスト(次に完了するもの)を取得します *接続に対応するキューで最も古いリクエストを取得します */ パブリックネットワーククライアント。 inflightrequest completenext ( string node ){ //クライアントのリクエストを取得すると、キューの最後から指定されたノードノードとdequeueに基づいてデュアルエンドキューReqs NetworkClient 。 inflightrequest inflightrequest = requestqueue (ノード)。 Polllast (); //カウンターを減らします InflightrequestCount 。 DecrentAndget (); inflightrequestを返します。 } 結合とチームの2つの方法を比較すると、「add()に参加する」がAddFirst()メソッドを介してチームのヘッドに追加されるため、チームの最後のリクエストは最も長い時間であり、最初に処理する必要があります。したがって、「Coming Out complenext()」とは、Yuan Shuを移動することです。これは、Polllast()を介してキューで最も長い時間リクエストを処理して処理します。 4。LastSent()は最新のリクエストを取得します /** *指定されたノードに送信した最後のリクエストを取得します(ただし、キューから削除しないでください) * @paramノードノードID */ パブリックネットワーククライアント。 inflightrequest lastsent ( string node ){ RequestQueue (ノード)を返します。 peekfirst (); } 5。CompleTELASTSENT()Dequeueへの最新のリクエスト /** *特定のノードに送信された最後のリクエストを完了します。 * @paramノードノードのリクエストが送信されました * @Returnリクエスト *接続に対応するキューの最新リクエストを取得します */ パブリックネットワーククライアント。 inflightrequest completeLastsent ( string node ){ //クライアントは、チームヘッドから指定されたノードノードとdequeueに基づいて、デュアルエンドキューのreqsをリクエストします NetworkClient 。 inflightrequest inflightrequest = requestqueue (ノード)。 pollfirst (); //カウンターを減らします InflightrequestCount 。 DecrentAndget (); inflightrequestを返します。 } 最後に、「Inflightrequests」を見てみましょう。つまり、リクエストが送信されていることを意味し、リクエストが送信される前にすべての情報を保存します。 さらに、応答クライアントリスポンスの生成をサポートします。応答が正常に受信されると、完了した()は、応答コンテンツに基づいて対応するClientResponseを生成します。接続が突然切断されると、切断された()はクライアントリスポンスオブジェクトを生成します。コードは次のとおりです。 静的クラスInflightRequest { //ヘッダーを要求します 最終リクエストヘッダーヘッダー。 //このリクエストはどのブローカーノードに送信されますか? 最終的な文字列宛先; //コールバック関数 final requestcompleationhandlerコールバック。 //応答するかどうか 最終的なブールは、リスポンスを期待しています。 //ボディをリクエストします 最終的な抽象的なリクエスト。 //送信する前に接続ステータスを確認する必要がありますか 最終的なブールis -internalRequest ; // NetworkClientによって内部的に開始されるリクエストにフラグを立てるために使用されます //シリアル化されたデータを要求しました 最終送信送信; //時間を送信します 最終的な長いsendtimems ; //リクエストの作成時間、つまりClientRequestの作成時間 最終的な長いCreatedTimems ; //タイムアウトを要求します 最終的な長いrequestimeoutms ; ..... /** *応答が受信されると、応答コンテンツに基づいてClientResponseが生成されます。 */ Public ClientResponseが完了しました( Abstractresponse応答、長いタイミング){ 新しいclientResponse (ヘッダー、コールバック、宛先、 createdtimems 、 timems 、 false 、 null 、 null 、 response ); }
/** *接続が突然切断されると、ClientResponseも生成されます。 */ Public ClientResponse切断( LONG TIMEMS 、 AuthenticationException AuthenticationException ){ 新しいclientResponse (ヘッダー、コールバック、宛先、 createdtimems 、 timems 、 true 、 null 、 authenticationexception 、 null ); } } 惑星に移動して、中央のコードを表示してください。 5。要求プロセス接続を完了します完全な要求は、主に次の段階に分かれています。 - ネットワーククライアントのReady()を呼び出してサーバーに接続します。
- ネットワーククライアントのPoll()を呼び出して接続を処理します。
- NetworkClientのnewClientRequest()を呼び出して、リクエストClientRequestを作成します。
- 次に、NetworkClientのSend()を呼び出してリクエストを送信します。
- 最後に、NetworkClientのPoll()を呼び出して応答を処理します。
1.接続プロセスを作成しますリクエストを送信する前に、ブローカー側との接続を作成する必要があります。 NetworkClientは、クラスターへのすべての接続を管理する責任があります。 2。要求プロセスを生成します 3.リクエストプロセスを送信します 4。応答プロセスの処理(1)リクエスト送信が完了しました(2)リクエストは応答を受け取りました(3)処理応答を実行しますVI.結論ここで、この記事の要点をまとめてみましょう。 1.メッセージメッセージは、送信者の子スレッドによるKafkachannelの送信に一時的に保存され、「Poll Method」と呼ばれて実際のネットワークI/O操作を実行するため、クライアントにネットワークI/O機能を提供する「ネットワーククライアントコンポーネント」につながります。 2。「ネットワーククライアントコンポーネント」、「Inflightrequests」、および「ClusterConnectionState」の実装の詳細を深く分析してください。 3.最後に、リクエストと処理の応答プロセスを送信するメッセージ全体に接続するため、全体的な理解が向上します。 |