みなさんこんにちは。Hua Zaiです。またお会いできて嬉しいです。 今日は主に Kafka ネットワーク層の送受信プロセスをまとめます。このシリーズは3つの記事に分かれています。これは次の記事で、主に最後の質問を分析します。
この記事を注意深く読むと、Kafka ネットワーク層のソースコードに対する理解が深まると思います。 この記事には役立つ情報が沢山含まれているので、じっくり読んでいただければ幸いです。 I. 全体概要シナリオ駆動型のアプローチを通じて、ネットワーク要求がカプセル化され監視された後、ネットワーク経由でメッセージがどのように送受信されるか、そして何を行う必要があるかを見てみましょう。
誰もが理解しやすいように、すべてのソースコードの骨組みのみが保持されます。 2. メッセージ送信プロセスの分析1. メッセージの事前送信この部分は多くのことが関係しているので、ここでは簡単に説明し、後でそれを分析する特別な章があります。 クライアントはまず、送信するメッセージを準備します。プロセスは次のとおりです。
次に、Selector クラスと KafkaChannel クラスの具体的なソース コード実装を順番に見ていきましょう。 (1)メモリに一時的に保存するデータを要求するgithub ソースコードのアドレスは次のとおりです。
/** ソースコードから、KafkaChannel クラスの setSend() メソッドが呼び出されていることがわかります。 パブリックvoid setSend ( 送信送信) { このメソッドは主に事前送信に使用されます。つまり、ネットワーク要求を送信する前に、送信する ByteBuffer データが KafkaChannel の send に保存され、その後トランスポート層メソッドが呼び出されて、このチャネル上の "OP_WRITE" イベントへの注目度が高まり、同時に "OP_READ" イベントも保持されます。このとき、チャネルは読み取りと書き込みを同時に行うことができます。実際に送信が実行されると、まず送信からデータが読み取られます。 2. メッセージが実際に送信されるSender 子スレッドは、Selector の「poll」メソッドを呼び出して、実際にリクエストを送信します。 (1) 投票()
このメソッドは、準備完了イベントを収集し、そのイベントに対してネットワーク操作を実行するという 1 つの処理を実行します。上記の簡略化されたコードから、「pollSelectionKeys」メソッドが呼び出されていることがわかります。実際の読み取りおよび書き込み操作はこのメソッドで行われます。見てみましょう: (2) ポーリング選択キー()void pollSelectionKeys ( <SelectionKey> selectionKeys を設定し、 boolean isImmediatelyConnected 、 long currentTimeNanos を設定します) { このメソッドは主に、接続イベント、読み取りおよび書き込みイベント、すぐに完了する接続など、監視対象イベントを処理するために使用されます。次に、実際にネットワークに書き込む方法を見てみましょう。 (3) 書き込みを試みる()プライベートvoid attemptWrite ( SelectionKey キー、 KafkaChannel チャネル、 long nowNanos )はIOException をスローします{ このメソッドは主に、ネットワーク書き込み操作を実行するために使用されます。この方法は非常にシンプルで、「同時に 4 つの条件を満たす」必要があります。
上記の 4 つの条件が満たされると、書き込み操作を実行できます。次に、書き込み操作のプロセスを見てみましょう。 (4) 書き込み()// 書き込み操作を実行する このメソッドは主に、ネットワーク書き込み操作を実際に実行するために使用されます。ご存知のとおり、ネットワーク プログラミングのプロセスでは、送信を 1 回で完了できるとは限りません。このとき、送信が完了したかどうかを判断する必要があります。そうでない場合は、null が返され、「送信を継続し、このチャネルの書き込みイベントに注意を払い続けるために、次のポーリング poll() を待機します」。送信が完了したら、「送信を返し、このソケットチャネルの OP_WRITE イベントに対するセレクターの注意をキャンセルします。」ここでは、書き込み操作を実行するために KafkaChannel クラスの write() メソッドが呼び出され、送信が完了したかどうかを判断するために maybeCompleteSend() が呼び出されます。まず write() 操作を見てみましょう。 (5) KafkaChannel.write()パブリックlong write ()はIOException をスローします{ このメソッドは主に、send に保存されたデータを実際に送信するために使用されます。実際のデータを送信するには、ByteBufferSend.writeTo を呼び出します。 writeTo() メソッドを見てみましょう:
このメソッドは主に、バッファ配列を SocketChannel に書き込むために使用されます。ネットワーク プログラミングでは、一度書き込むだけではすべてのデータが正常に書き込まれない場合があるため、Java NIO の基盤となる channel.write(buffers) メソッドを呼び出すと、「正常に書き込まれたバイト数」の戻り値が返され、一度呼び出した後に書き込まれたバイト数を知ることができます。 書き込み操作のために write() と一連の基礎となるメソッドが呼び出されると、送信されたバイト数が返されます。今回送信が完了していない場合は、null が返され、「ネットワーク書き込み操作の送信を継続するために次のポーリングを待機し、このチャネルの書き込みイベントに注意を払い続けます」ため、今回送信が完了したかどうかを判断する必要があります。見てみましょう: (6) 送信完了()//送信を完了できる この方法は主にデータの書き込みが完了したかどうかを判断するために使用されます。データの書き込みが完了したかどうかを判断する条件は、バッファに何も残っておらず、pending が false であることです。送信が完了したら、完了したリクエストを完了した送信コレクションcompletedSendsに追加します。 メッセージ要求が送信された後、他に何が行われますか?これには、NetworkClient クラスに関する関連知識が必要です。以下に簡単な説明と、さらに詳しい分析を示します。 github ソースコードのアドレスは次のとおりです。
private void handleCompletedSends ( < ClientResponse > 応答のリスト、 現在長い) { ソース コードから、「completedSends」コレクションと「inFlightRequests」コレクションには「相互に連携する」関係があることがわかります。 「completedSends」コレクションは送信されたがまだ返されていないリクエスト コレクションを参照し、「inFlightRequests」コレクションは送信されたが応答結果を受け取っていないリクエスト コレクションを格納します。 「completedSends」の要素は、「inFlightRequests」コレクションに対応するキューの最後の要素に対応します。 これでメッセージ送信プロセスの分析は終了です。送信後の後続作業については、Sender と NetWorkClient の説明時に詳しく分析します。次に、レスポンスの受信処理を見てみましょう。 3. 受信応答プロセスの分析 上記の Selector.pollSelectionKeys() を分析すると、ネットワーク読み取りイベントの準備ができると、ネットワーク読み取り操作を試行するために attemptRead() が呼び出されます。見てみましょう: 1. 回答結果を読む(1) 試行読み取り()プライベートvoid attemptRead ( KafkaChannel チャネル) はIOException をスローします{ このメソッドは主に、データを読み取り、受信したコレクションに追加するために使用されます。最初に KafkaChannel.read() メソッドが呼び出されて読み取りが行われ、その後読み取りが完了したかどうかが判断されることがわかります。そうでない場合は、次のポーリングで読み取りを続行します。完了した場合は、リクエストのcompletedReceivesコレクションに追加されます。 NetworkReceive オブジェクトが読み取られたかどうかを確認する方法を見てみましょう。 (2) 多分完了受信()// NetworkReceiveオブジェクトが読み取られたかどうかを判定する このメソッドは主に、データが読み取られたかどうかを判断するために使用されます。読み取りが完了したかどうかを判断する条件は、応答メッセージ ヘッダーのサイズ ByteBuffer と応答メッセージ本体のバッファー ByteBuffer を含め、NetworkReceive 内のバッファーが使い果たされたかどうかです。両方読んだときに初めて読書は完了します。 この時点で NetworkReceive オブジェクト全体が読み取られていない場合は、次に読み取りイベントがトリガーされたときに、NetworkReceive オブジェクト全体が引き続き読み取られます。この時点で完全な NetworkReceive オブジェクトが読み取られている場合は、そのオブジェクトはクリアされ、次に読み取りイベントがトリガーされたときに新しい NetworkReceive オブジェクトが作成されます。 2. 応答メッセージを解析する完全な応答メッセージを読んだ後、次に何をすべきでしょうか?つまり、応答メッセージを解析します。やり方を見てみましょう: github ソースコードのアドレスは次のとおりです。
private void handleCompletedReceives ( < ClientResponse > 応答をリストします。 現在は長いです) { このメソッドは主に、completedReceives コレクションをループして応答処理を実行するために使用されます。記事の冒頭で簡単に述べたように、応答を受信した後、応答は「inFlightRequests」から削除され、その後応答が解析されます。 プライベート静的構造体parseStructMaybeUpdateThrottleTimeMetrics ( ByteBuffer responseBuffer 、 RequestHeader requestHeader 、 センサーthrottleTimeSensor 、 long now ) { このメソッドは主に、応答を解析し、応答ヘッダーの correlationId 値が応答本体の値と一致しているかどうかを判断するために使用されます。一致しない場合は例外がスローされます。 現時点では、応答は解析されるだけで、処理されません。応答処理は、コールバック メソッドを呼び出すことによって処理されます。見てみましょう。 3. コールバックを処理するprivate void completeResponses ( < ClientResponse > 応答のリスト) { これで、応答メッセージを受信するプロセスの分析が完了します。 IV.結論ここで、この記事の要点をまとめてみましょう。 1. まず、Kafka ネットワーク層の送受信プロセス全体を見てみましょう。これは主に「メッセージ送信プロセス」と「応答受信プロセス」に分かれています。 2. メッセージ送信プロセスと応答受信プロセスのソースコード実装の詳細も分析します。 |
>>: Argo ロールアウトによるプログレッシブ リリース
[[217412]] 1. 概要仮想マシンをインストールすると、次の図に示すように、VMnet1 と...
最近、AWSのCEOであるアンディ・ジャシー氏は、Amazon AWSの年次開発者会議で、AWSが2...
日本の収穫期には、農家の中には毎日多くの時間を費やして、農場で収穫したキュウリを種類ごとに仕分けする...
米国ロサンゼルスに拠点を置くホスティング会社 Dreamhost は、1999 年から世界中にホステ...
社会の急速な発展に伴い、人々の生活水準は徐々に向上し、インターネットの発展も加速し、ますます多くのイ...
飲料業界の専門家とコミュニケーションをとる過程で、私たちは一連の興味深い視点を観察しました。巨大飲料...
VMware は、vSphere の新しいバージョンにより、Project Pacific の約束の...
Aimin.com からのドメイン名ニュース: 最近、ネットユーザーがフォーラムで、杭州 Budao...
[[389782]]最近の評価引き下げがなければ、IBMを除いてテクノロジー株を買うことより良い投資...
ウェブマスターのほとんどはA5で記事を投稿したことがあると思います。一般的に記事を投稿するのは男性の...
虹梅モールは半年以上人気がなく、変化を求めている2億円を投じたにもかかわらず、販売台数はわずか4万台...
月給5,000~50,000のこれらのプロジェクトはあなたの将来ですテクノロジーの発展に伴い、新しい...
分類情報サイトの台頭により、オンラインプロモーションやSEOを行う人々は分類情報サイトに夢中になって...
「背が高く、金持ちで、ハンサム」なサイト - 高品質の機能詳細、豊富なアプリケーション、ハンサムなイ...
ウェブサイトを構築するときは、まずユーザー エクスペリエンスを考慮し、次に検索エンジンを考慮する必要...