みなさんこんにちは。Hua Zaiです。またお会いできて嬉しいです。 今日は主に Kafka ネットワーク層の送受信プロセスをまとめます。このシリーズは3つの記事に分かれています。これは次の記事で、主に最後の質問を分析します。
この記事を注意深く読むと、Kafka ネットワーク層のソースコードに対する理解が深まると思います。 この記事には役立つ情報が沢山含まれているので、じっくり読んでいただければ幸いです。 I. 全体概要シナリオ駆動型のアプローチを通じて、ネットワーク要求がカプセル化され監視された後、ネットワーク経由でメッセージがどのように送受信されるか、そして何を行う必要があるかを見てみましょう。
誰もが理解しやすいように、すべてのソースコードの骨組みのみが保持されます。 2. メッセージ送信プロセスの分析1. メッセージの事前送信この部分は多くのことが関係しているので、ここでは簡単に説明し、後でそれを分析する特別な章があります。 クライアントはまず、送信するメッセージを準備します。プロセスは次のとおりです。
次に、Selector クラスと KafkaChannel クラスの具体的なソース コード実装を順番に見ていきましょう。 (1)メモリに一時的に保存するデータを要求するgithub ソースコードのアドレスは次のとおりです。
/** ソースコードから、KafkaChannel クラスの setSend() メソッドが呼び出されていることがわかります。 パブリックvoid setSend ( 送信送信) { このメソッドは主に事前送信に使用されます。つまり、ネットワーク要求を送信する前に、送信する ByteBuffer データが KafkaChannel の send に保存され、その後トランスポート層メソッドが呼び出されて、このチャネル上の "OP_WRITE" イベントへの注目度が高まり、同時に "OP_READ" イベントも保持されます。このとき、チャネルは読み取りと書き込みを同時に行うことができます。実際に送信が実行されると、まず送信からデータが読み取られます。 2. メッセージが実際に送信されるSender 子スレッドは、Selector の「poll」メソッドを呼び出して、実際にリクエストを送信します。 (1) 投票()
このメソッドは、準備完了イベントを収集し、そのイベントに対してネットワーク操作を実行するという 1 つの処理を実行します。上記の簡略化されたコードから、「pollSelectionKeys」メソッドが呼び出されていることがわかります。実際の読み取りおよび書き込み操作はこのメソッドで行われます。見てみましょう: (2) ポーリング選択キー()void pollSelectionKeys ( <SelectionKey> selectionKeys を設定し、 boolean isImmediatelyConnected 、 long currentTimeNanos を設定します) { このメソッドは主に、接続イベント、読み取りおよび書き込みイベント、すぐに完了する接続など、監視対象イベントを処理するために使用されます。次に、実際にネットワークに書き込む方法を見てみましょう。 (3) 書き込みを試みる()プライベートvoid attemptWrite ( SelectionKey キー、 KafkaChannel チャネル、 long nowNanos )はIOException をスローします{ このメソッドは主に、ネットワーク書き込み操作を実行するために使用されます。この方法は非常にシンプルで、「同時に 4 つの条件を満たす」必要があります。
上記の 4 つの条件が満たされると、書き込み操作を実行できます。次に、書き込み操作のプロセスを見てみましょう。 (4) 書き込み()// 書き込み操作を実行する このメソッドは主に、ネットワーク書き込み操作を実際に実行するために使用されます。ご存知のとおり、ネットワーク プログラミングのプロセスでは、送信を 1 回で完了できるとは限りません。このとき、送信が完了したかどうかを判断する必要があります。そうでない場合は、null が返され、「送信を継続し、このチャネルの書き込みイベントに注意を払い続けるために、次のポーリング poll() を待機します」。送信が完了したら、「送信を返し、このソケットチャネルの OP_WRITE イベントに対するセレクターの注意をキャンセルします。」ここでは、書き込み操作を実行するために KafkaChannel クラスの write() メソッドが呼び出され、送信が完了したかどうかを判断するために maybeCompleteSend() が呼び出されます。まず write() 操作を見てみましょう。 (5) KafkaChannel.write()パブリックlong write ()はIOException をスローします{ このメソッドは主に、send に保存されたデータを実際に送信するために使用されます。実際のデータを送信するには、ByteBufferSend.writeTo を呼び出します。 writeTo() メソッドを見てみましょう:
このメソッドは主に、バッファ配列を SocketChannel に書き込むために使用されます。ネットワーク プログラミングでは、一度書き込むだけではすべてのデータが正常に書き込まれない場合があるため、Java NIO の基盤となる channel.write(buffers) メソッドを呼び出すと、「正常に書き込まれたバイト数」の戻り値が返され、一度呼び出した後に書き込まれたバイト数を知ることができます。 書き込み操作のために write() と一連の基礎となるメソッドが呼び出されると、送信されたバイト数が返されます。今回送信が完了していない場合は、null が返され、「ネットワーク書き込み操作の送信を継続するために次のポーリングを待機し、このチャネルの書き込みイベントに注意を払い続けます」ため、今回送信が完了したかどうかを判断する必要があります。見てみましょう: (6) 送信完了()//送信を完了できる この方法は主にデータの書き込みが完了したかどうかを判断するために使用されます。データの書き込みが完了したかどうかを判断する条件は、バッファに何も残っておらず、pending が false であることです。送信が完了したら、完了したリクエストを完了した送信コレクションcompletedSendsに追加します。 メッセージ要求が送信された後、他に何が行われますか?これには、NetworkClient クラスに関する関連知識が必要です。以下に簡単な説明と、さらに詳しい分析を示します。 github ソースコードのアドレスは次のとおりです。
private void handleCompletedSends ( < ClientResponse > 応答のリスト、 現在長い) { ソース コードから、「completedSends」コレクションと「inFlightRequests」コレクションには「相互に連携する」関係があることがわかります。 「completedSends」コレクションは送信されたがまだ返されていないリクエスト コレクションを参照し、「inFlightRequests」コレクションは送信されたが応答結果を受け取っていないリクエスト コレクションを格納します。 「completedSends」の要素は、「inFlightRequests」コレクションに対応するキューの最後の要素に対応します。 これでメッセージ送信プロセスの分析は終了です。送信後の後続作業については、Sender と NetWorkClient の説明時に詳しく分析します。次に、レスポンスの受信処理を見てみましょう。 3. 受信応答プロセスの分析 上記の Selector.pollSelectionKeys() を分析すると、ネットワーク読み取りイベントの準備ができると、ネットワーク読み取り操作を試行するために attemptRead() が呼び出されます。見てみましょう: 1. 回答結果を読む(1) 試行読み取り()プライベートvoid attemptRead ( KafkaChannel チャネル) はIOException をスローします{ このメソッドは主に、データを読み取り、受信したコレクションに追加するために使用されます。最初に KafkaChannel.read() メソッドが呼び出されて読み取りが行われ、その後読み取りが完了したかどうかが判断されることがわかります。そうでない場合は、次のポーリングで読み取りを続行します。完了した場合は、リクエストのcompletedReceivesコレクションに追加されます。 NetworkReceive オブジェクトが読み取られたかどうかを確認する方法を見てみましょう。 (2) 多分完了受信()// NetworkReceiveオブジェクトが読み取られたかどうかを判定する このメソッドは主に、データが読み取られたかどうかを判断するために使用されます。読み取りが完了したかどうかを判断する条件は、応答メッセージ ヘッダーのサイズ ByteBuffer と応答メッセージ本体のバッファー ByteBuffer を含め、NetworkReceive 内のバッファーが使い果たされたかどうかです。両方読んだときに初めて読書は完了します。 この時点で NetworkReceive オブジェクト全体が読み取られていない場合は、次に読み取りイベントがトリガーされたときに、NetworkReceive オブジェクト全体が引き続き読み取られます。この時点で完全な NetworkReceive オブジェクトが読み取られている場合は、そのオブジェクトはクリアされ、次に読み取りイベントがトリガーされたときに新しい NetworkReceive オブジェクトが作成されます。 2. 応答メッセージを解析する完全な応答メッセージを読んだ後、次に何をすべきでしょうか?つまり、応答メッセージを解析します。やり方を見てみましょう: github ソースコードのアドレスは次のとおりです。
private void handleCompletedReceives ( < ClientResponse > 応答をリストします。 現在は長いです) { このメソッドは主に、completedReceives コレクションをループして応答処理を実行するために使用されます。記事の冒頭で簡単に述べたように、応答を受信した後、応答は「inFlightRequests」から削除され、その後応答が解析されます。 プライベート静的構造体parseStructMaybeUpdateThrottleTimeMetrics ( ByteBuffer responseBuffer 、 RequestHeader requestHeader 、 センサーthrottleTimeSensor 、 long now ) { このメソッドは主に、応答を解析し、応答ヘッダーの correlationId 値が応答本体の値と一致しているかどうかを判断するために使用されます。一致しない場合は例外がスローされます。 現時点では、応答は解析されるだけで、処理されません。応答処理は、コールバック メソッドを呼び出すことによって処理されます。見てみましょう。 3. コールバックを処理するprivate void completeResponses ( < ClientResponse > 応答のリスト) { これで、応答メッセージを受信するプロセスの分析が完了します。 IV.結論ここで、この記事の要点をまとめてみましょう。 1. まず、Kafka ネットワーク層の送受信プロセス全体を見てみましょう。これは主に「メッセージ送信プロセス」と「応答受信プロセス」に分かれています。 2. メッセージ送信プロセスと応答受信プロセスのソースコード実装の詳細も分析します。 |
>>: Argo ロールアウトによるプログレッシブ リリース
正直、bitaccel がスタートした当初は暴走するかもしれないと思いました。設立当初の価格は本当に...
いわゆる「人格」が崩壊したのは、近年、彼が亡くなった妻と子供を利用して被害者の「人格」を作り、大多数...
Shuhost(Shumai Technology)の香港データセンターは現在、Huawei Clo...
顧客が広告からあなたのストアにアクセスすると、商品ページが表示されます。この時点で、顧客には 2 つ...
LetBox はプロモーション用に特別価格の KVM 仮想 VPS をいくつかリリースしました。これ...
SEO 業界に参入してから 1 年半の間、私の哲学は常に、キーワードの最適化を二次的な優先事項として...
2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています企業のモバ...
多くの企業は、Baidu からプロモーションの電話を頻繁に受け、時々挨拶を受けます。編集者のクライア...
良いランキングを得るには、ウェブサイトの適切な内部構造に加えて、外部リンクも非常に重要な要素です。外...
2012年の終わりが近づくにつれ、国内の垂直型電子商取引は売上ブームに突入し始めています。最近、国内...
[[236340]]クラウドの分析によると、エンドユーザーの回答者の 60% が現在クラウドを使用し...
歴史を学んだ友人は、1978年以来、中国が改革開放政策を実施し、中国が徐々に世界に向かって進み、最終...
10 月 27 日から 11 月 2 日まで、Fastcomet はハロウィーン プロモーションを開...
筆者は長年ウェブサイトを構築していませんでしたが、最近少し時間ができたので、2つのウェブサイトを構築...
私は分析が得意な人間ではないといつも感じていますが、幸いなことに、私は優れた組織者です。問題が発生す...