分散フレームワークを読んで、知っておくべきNIOの基本的な知識

分散フレームワークを読んで、知っておくべきNIOの基本的な知識

[[397718]]

この記事はWeChatの公開アカウント「KK Architect」から転載したもので、著者はKK Architectです。記事の転載についてはKKアーキテクトの公式アカウントまでご連絡ください。

1. はじめに

分散オープンソース プロジェクトを読むときに最も重要なことは、プロジェクトの通信フレームワークを理解することです。

分散オープンソース フレームワークは通常、クラスターにデプロイされるため、複雑な機能を実行するには異なるノードが相互に通信する必要があります。ソースコードを読むときに、その通信の仕組みを理解していないと、まるで原始の森に迷い込んだかのように、コードの中で迷子になってしまいます。

たとえば、HDFS は独自のカプセル化された Hadoop Rpc 通信フレームワークを使用します。 Spark は基盤となる通信に Netty を使用します。そして、私が最近読んだ Kafka のソース コードでは、最下層でネイティブ Java NIO が使用されています。

そこで今回は、Java NIO の主な知識ポイントについてお話ししましょう。

2. 複数の図でNIOの3つのコアコンセプトを理解する

NIO について話すとき、チャネル、バッファ、セレクターという 3 つのコア概念があります。

いきなり本題に入ると少し混乱するかもしれないので、最初から始める必要があります。

1. チャンネル

過去、同時実行要件がそれほど高くなかったときは、次に示すように、CPU がすべての入力と出力 (割り込み) の処理を​​担当していました。

ユーザー プログラムはサーバーへの読み取りおよび書き込み要求を開始し、CPU はこれらの要求を直接処理します。これには欠点があります。 IO 要求が大量に発生すると、CPU が大量に占有され、システム全体の処理能力が低下します。

コンピュータの発展に伴い、次に示すように、DMA を使用して IO 要求を完全に処理する新しい方法が登場しました。

DMA は Direct Memory Access、直接メモリアクセス制御の略です。

なぜこのデバイスを追加する必要があるのですか?これは、CPU 割り込みモードがデータ転送速度の要件を満たすことができないためです。割り込みモードでは、割り込みごとにブレークポイントとシーンを保存する必要があり、割り込みが戻ったときにブレークポイントとシーンを復元する必要があります。

これらすべての理由により、割り込み方式では高速周辺機器の伝送速度要件を満たすことが困難になります。

そのため、DMA のようなデバイスが存在します。 DMA モードでのデータ転送プロセス中に、I/O デバイスがデータを転送する必要がある場合、DMA コントローラを介して CPU に DMA 転送要求を行います。 CPU が応答すると、システム バスを放棄し、DMA コントローラがバスを引き継いでデータ転送を行います。このとき、CPU は一部の初期化操作を除いて独自の処理を実行できます。

しかし、DMA を使用しても、I/O 要求が多すぎるとバスの競合が発生するため、急速なビジネス開発のニーズを満たすことはできません。

それでチャンネルは後から登場したのです。 DMA との違いは、チャネルが独自の命令システムとプログラムを持ち、コプロセッサであることです。 DMA は固定のデータ転送制御のみを実装できます。

Java NIO のチャネルは、上図のチャネルの実装です。

2. バッファ

チャネルの概念を理解すれば、バッファも簡単に理解できます。

チャネルは、I/O デバイス (ファイル、ソケットなど) へのオープン接続を表しますが、チャネル自体はデータを保存しません。バッファはデータ転送の実際のキャリアです。

アプリケーションがデータを書き込む場合、まずデータをバッファに書き込む必要があります。その後、チャネルはバッファ内のデータを宛先 (ファイル、ディスク、ネットワーク) に送信し、バッファからデータを取り出す役割を担います。

NIO システムを使用する場合は、I/O デバイスに接続するためのチャネルとデータを収容するためのバッファを取得し、バッファを操作してデータを処理する必要があります。

3. セレクター

セレクターはマルチプレクサーとも呼ばれ、非ブロッキング I/O です。非ブロッキングについて話しているうちに、まずブロッキングについて話す必要があります。ブロッキング方法を次の図に示します。

クライアントがサーバーに読み取りまたは書き込み要求を送信すると、サーバーのスレッドはカーネル アドレス空間にデータがあるかどうかを継続的にチェックします。

クライアントに送信するデータがない場合、サーバー スレッドは待機し続け、この期間中は何も実行できません。

クライアントがデータを送信するまで、データはカーネル アドレス空間からユーザー アドレス空間にコピーされ、その後、データを読み取ることができます。

つまり、大量のリクエストが来た場合、後続のリクエストは前のリクエストの実行を待たなければならず、多くのキューが発生し、CPU リソースを十分に活用できず、パフォーマンスが急激に低下します。

セレクターがどのように機能するかを見てみましょう。

これで、クライアントとサーバー間の通信ではチャネル + バッファが使用されるため、すべてのチャネルがセレクタに登録されます。セレクターは、接続、読み取り、書き込みなど、これらのチャネルの I/O ステータスを監視します。

チャネル上のイベントが完全に準備されると、セレクターはタスクをサーバー上の 1 つ以上のスレッドに割り当てます。

クライアントにイベントの準備ができていない場合、サーバー スレッドはブロックされません。クライアント イベントの準備ができるまで、独自の処理を実行できます。

ブロッキング方式と比較して、この非ブロッキング方式では CPU リソースをさらに活用できます。

3. 概念を理解してからAPIを学ぶ

1. バッファAPI

バッファを完全に理解するには、バッファの 4 つのプロパティ (マーク、位置、制限、容量) を知っておく必要があります。これらを知るには、コードを一度実行するだけです。

(1)一定サイズのバッファを割り当てる

  1. //1.指定されたサイズのバッファを割り当てる
  2. ByteBuffer バッファ = ByteBuffer.allocate(10);
  3. システム。 out .println( "--------割り当て" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. ---------割り当て------------  
  2. 位置:0
  3. 制限:10
  4. 定員:10

ここでは 10 バイトのバッファを割り当てます。つまり、最後の byte[] hb に 10 バイトのスペースを開きます。 ByteBuffer のプロパティ。

つまり、容量は 10、制限はデータの読み書きが可能な最大位置も 10、位置はデータの操作が可能な位置は 0 です。

(2)バッファにデータを書き込む

  1. // 2. バッファにデータを書き込む
  2. 文字列 str = "abcde" ;
  3. システム。出力.println( "------------put-------------" );
  4. buffer.put(str.getBytes(StandardCharsets.UTF_8));
  5. システム。出力.println( "位置:" + buffer.position());
  6. システム。出力.println( "limit:" + buffer.limit());
  7. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - 置く - - - - - -  
  2. 位置:5
  3. 制限:10
  4. 定員:10

ここでは 5 バイトのデータをバッファに書き込むため、容量と制限は依然として 10 ですが、すでに 5 バイトが書き込まれているため、位置は 5 になります。

(3)データ読み取りモードに切り替える

  1. // 3. データ読み取りモードに切り替える
  2. バッファを反転します。
  3. システム。出力.println( "-------------反転----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

ここで、バッファからデータを読み取る場合は、フリップ モードに切り替える必要があります。反転すると、いくつかの属性の値が変更されます。

操作結果:

  1. ------------- 反転-------------  
  2. 位置:0
  3. 制限:5
  4. 定員:10

flipは位置の値を0に変更し、制限を5に変更します。つまり、最初から読み取りたいのですが、位置5までしか読み取れません。

(4)データを読む

  1. // 4. データの読み取り
  2. システム。出力.println( "-------------get----------------" );
  3. byte[] dest = 新しいbyte[buffer.limit()];
  4. バッファを取得します。
  5. システム。出力.println(新しい文字列(dest,0,dest.length));
  6. システム。出力.println( "位置:" + buffer.position());
  7. システム。出力.println( "limit:" + buffer.limit());
  8. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - 得る - - - - - -  
  2. ABCD
  3. 位置:5
  4. 制限:5
  5. 定員:10

データを読み取った後、位置は 5 になり、5 を読み取ったことを示します。

(5)繰り返し読む

  1. //5.巻き戻し()
  2. バッファを巻き戻す();
  3. システム。出力.println( "-------------巻き戻し----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. ------------巻き戻し-------------  
  2. 位置:0
  3. 制限:5
  4. 定員:10

巻き戻しとは、バッファ内のデータを繰り返し読み取ることを意味する。

(6)データの消去

  1. //6.クリア()
  2. バッファをクリアします。
  3. システム。出力.println( "--------------クリア----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - クリア - - - - - -  
  2. 位置:0
  3. 制限:10
  4. 定員:10

clear() 後、position は 0 に戻り、limit は 10 に戻り、最大 10 バイトまでデータを先頭から再度書き込むことができます。

ただし、バッファ内のデータはクリアされず、データはまだバッファ内に残っており、「忘れられた」状態にあることに注意してください。これらのポインタは元の状態に戻ります。

(7)マーキング

これは 4 番目の属性である「マーク」です。

マークはポジションの位置を記録することができます。 reset() メソッドを使用してマーク位置に戻ることができます。

  1. @テスト
  2. パブリックボイドtest2() {
  3. // 10バイトを割り当てる
  4. 文字列 str = "abcde" ;
  5. ByteBuffer バッファ = ByteBuffer.allocate(10);
  6. buffer.put(str.getBytes(StandardCharsets.UTF_8));
  7.  
  8. // 読み取りモードに切り替えて2バイト読み取ります
  9. バッファを反転します。
  10. byte[] dest = 新しいbyte[buffer.limit()];
  11. バッファを取得します(宛先、0、2);
  12. システム。出力.println(新しいString(dest, 0, 2));
  13. システム。出力.println(buffer.position());
  14.  
  15. // 現在の位置をマークする
  16. バッファをマークします。
  17.  
  18. // さらに2バイトを読み込む
  19. バッファを取得します(宛先, 2, 2);
  20. システム。出力.println(new String(dest, 2, 2));
  21. システム。出力.println(buffer.position());
  22.  
  23. // リセット、マーク位置に戻る
  24. バッファをリセットします。
  25. システム。出力.println(buffer.position());
  26. }
  27.  
  28. 実行結果:
  29.  
  30. ```TeX
  31. アブ
  32. 2
  33. CD
  34. 4
  35. 2

2. チャネル、バッファ、セレクタを使用してネットワークプログラムを完成させる

(1)サーバー

  1. @テスト
  2. パブリックvoid testServer()はIOExceptionをスローします{
  3. ServerSocketChannel serverSocketChannel = ServerSocketChannel。開ける();
  4. serverSocketChannel.configureBlocking( false );
  5.  
  6. serverSocketChannel.bind(新しいInetSocketAddress(8989));
  7.  
  8. セレクター セレクター = セレクター。開ける();
  9. serverSocketChannel.register(セレクタ、SelectionKey.OP_ACCEPT);
  10.  
  11. while (セレクタ.select () > 0) {
  12. イテレータ<SelectionKey> iterator = selector.selectedKeys().iterator();
  13. (イテレータ.hasNext()) の間 {
  14. 選択キーキー= iterator.next ( );
  15. if (キー.isAcceptable()) {
  16. ソケットチャネル socketChannel = serverSocketChannel.accept();
  17. socketChannel.configureBlocking( false );
  18. socketChannel.register(セレクタ、SelectionKey.OP_READ);
  19. }それ以外の場合 (キー.isReadable()) {
  20. SocketChannel チャネル = (SocketChannel)キー.channel();
  21. バイトバッファ byteBuffer = ByteBuffer.allocate(1024);
  22. さ = 0;
  23. while ((len = channel. read (byteBuffer)) > 0) {
  24. バイトバッファを反転します。
  25. システム。出力.println(新しいString(byteBuffer.array(), 0, len));
  26. バイトバッファをクリアします。
  27. }
  28. }
  29. }
  30.  
  31. イテレータを削除します。
  32. }
  33. }

1. まず、ServerSocketChannel.open() を使用してチャネルを開き、非ブロッキング モードに設定します。

2. ポート8989にバインドします。

3. セレクターにチャネルを登録します。

4. while ループで、セレクターにイベントがあるかどうかを確認します。イベントがクライアントの接続イベントである場合は、SocketChannel を開き、非ブロッキング モードで登録し、セレクターにデータ読み取りイベントを登録します。

5. クライアントがデータを送信すると、チャネルを開いてバッファ内のデータを読み取ることができます。

6. このとき、サーバーは複数のクライアントからのリクエストを同時に受け入れることができます。

(2)クライアント

  1. @テスト
  2. パブリックvoid testClient()はIOExceptionをスローします{
  3. SocketChannel socketChannel = SocketChannel。新しいInetSocketAddress( "127.0.0.1" , 8989)を開きます
  4. socketChannel.configureBlocking( false );
  5.  
  6. バイトバッファ byteBuffer = ByteBuffer.allocate(1024);
  7. byteBuffer.put(新しいDate ().toString().getBytes(StandardCharsets.UTF_8));
  8. バイトバッファを反転します。
  9. socketChannel.write(byteBuffer);
  10. バイトバッファをクリアします。
  11.  
  12. socketChannel.close () ;
  13.  
  14. }

1. クライアントは SocketChannel を開き、非ブロッキング モードで構成します。

2. ByteBuffer を使用してデータを送信します (送信前に反転する必要があることに注意してください)。

3. チャネルを閉じます。

IV.結論

今回は、Java NIO、チャネル、バッファ、セレクターのいくつかの主要な概念を予備的に調べました。

しかし、これは氷山の一角に過ぎないことを知っておく必要があります。チャネルとセレクターをさらに深く理解したい場合は、基盤となるオペレーティング システムと多くのコンピューター構成の原則に関する知識が必要になります。

たとえば、セレクターには select、poll、epoll の概念が含まれます。これらの概念をさらに拡張すると、ハードウェア割り込みとカーネルに関する知識も必要になります。

そのため、学習は苦難の旅であるとますます感じるようになりました。

<<:  ガートナー:世界のパブリッククラウド支出は2021年に3,323億ドルに達する

>>:  Linux 仮想化 KVM-Qemu 分析 Vhost-Net

推薦する

onevps が VPS 情報を受信しない場合はどうすればいいですか?

最近、大量の注文により、onevps のロサンゼルス データ センターでサーバーを増設しており、配送...

人間化はオリンピックの商業プロモーション成功の切り札

ロンドンオリンピックは間違いなくこの夏最も注目を集めるイベントの一つであり、世界中の注目がロンドンに...

Dockerイメージをバッチロードする最も簡単な方法

通常、Docker イメージ ファイルをバッチでロードする場合は、シェル ファイルを作成し、 for...

Baidu と Google は、ウェブサイトのコンテンツがオリジナルであるかどうかをどのように判断するのでしょうか?

百度のオリジナルコンテンツに対する判断の分析重複コンテンツが多いウェブサイトは、キーワードランキング...

海外でも使いやすい格安クラウドホストをおすすめします

安価なクラウド ホストは数多くあり、特に海外のホストでは価格が驚くほど安い場合があります。しかし、信...

Three Squirrels の WeChat 実験の秘密を明かす: 公開アカウントで下品な発言を避ける方法

【編集部注】ソーシャルマーケティングに長けたスナックブランド「Three Squirrels」も、W...

Baidu Statistics をインストールすると SEO 最適化ランキングに役立ちますか?

Baidu Statistics のリリースに伴い、ますます多くのウェブマスターの友人が自分の We...

Lovevps-6.99 USD/月 KVM/3 データセンター ケント/オーランド/ロサンゼルス

現在 lovevps が使用しているデータ センターは次のとおりです: - Custodian DC...

楊磊氏との対談:「究極のシンプルさ」と「究極の真実」でユーザーにとっての価値を継続的に創造する

[51CTO.comより引用] 設立以来7年間で、8万人以上のユーザーに高品質のサービスを提供してお...

クラウド コンピューティングの 7 つの主なメリット

[[342533]]調査によると、現在パブリック クラウドを導入している回答者の数は 2017 年の...

Baidu Wenku を使って企業ウェブサイトへのトラフィックを増やす方法

SEOは企業ウェブサイトのプロモーションにおいて一定の位置を占めています。企業ウェブサイトはSEOを...

Baidu の発表に疑問:Baidu が公開したアルゴリズムはどの程度真実なのか?

みなさんこんにちは。私はMuzi Chengzhouです。 SEO を行う際、SEO をうまく行うに...

百科事典ウェブサイトにおけるソーシャルメディアマーケティングの考慮事項

百科事典のウェブサイトは、中国でよく使われる百度百科事典、滬東百科事典、索想百科事典など、通常、より...

ウェブサイト復旧の実践後の洞察と考察

私は SEO 業界に約 2 年間携わっています。これまでは他の人の著作をいつも読んでいましたが、自ら...

2020 年のクラウド コンピューティングの 6 つのトレンド

2019 年、クラウド コンピューティングはテクノロジーおよびビジネス メディアで最もホットなトピッ...