分散フレームワークを読んで、知っておくべきNIOの基本的な知識

分散フレームワークを読んで、知っておくべきNIOの基本的な知識

[[397718]]

この記事はWeChatの公開アカウント「KK Architect」から転載したもので、著者はKK Architectです。記事の転載についてはKKアーキテクトの公式アカウントまでご連絡ください。

1. はじめに

分散オープンソース プロジェクトを読むときに最も重要なことは、プロジェクトの通信フレームワークを理解することです。

分散オープンソース フレームワークは通常、クラスターにデプロイされるため、複雑な機能を実行するには異なるノードが相互に通信する必要があります。ソースコードを読むときに、その通信の仕組みを理解していないと、まるで原始の森に迷い込んだかのように、コードの中で迷子になってしまいます。

たとえば、HDFS は独自のカプセル化された Hadoop Rpc 通信フレームワークを使用します。 Spark は基盤となる通信に Netty を使用します。そして、私が最近読んだ Kafka のソース コードでは、最下層でネイティブ Java NIO が使用されています。

そこで今回は、Java NIO の主な知識ポイントについてお話ししましょう。

2. 複数の図でNIOの3つのコアコンセプトを理解する

NIO について話すとき、チャネル、バッファ、セレクターという 3 つのコア概念があります。

いきなり本題に入ると少し混乱するかもしれないので、最初から始める必要があります。

1. チャンネル

過去、同時実行要件がそれほど高くなかったときは、次に示すように、CPU がすべての入力と出力 (割り込み) の処理を​​担当していました。

ユーザー プログラムはサーバーへの読み取りおよび書き込み要求を開始し、CPU はこれらの要求を直接処理します。これには欠点があります。 IO 要求が大量に発生すると、CPU が大量に占有され、システム全体の処理能力が低下します。

コンピュータの発展に伴い、次に示すように、DMA を使用して IO 要求を完全に処理する新しい方法が登場しました。

DMA は Direct Memory Access、直接メモリアクセス制御の略です。

なぜこのデバイスを追加する必要があるのですか?これは、CPU 割り込みモードがデータ転送速度の要件を満たすことができないためです。割り込みモードでは、割り込みごとにブレークポイントとシーンを保存する必要があり、割り込みが戻ったときにブレークポイントとシーンを復元する必要があります。

これらすべての理由により、割り込み方式では高速周辺機器の伝送速度要件を満たすことが困難になります。

そのため、DMA のようなデバイスが存在します。 DMA モードでのデータ転送プロセス中に、I/O デバイスがデータを転送する必要がある場合、DMA コントローラを介して CPU に DMA 転送要求を行います。 CPU が応答すると、システム バスを放棄し、DMA コントローラがバスを引き継いでデータ転送を行います。このとき、CPU は一部の初期化操作を除いて独自の処理を実行できます。

しかし、DMA を使用しても、I/O 要求が多すぎるとバスの競合が発生するため、急速なビジネス開発のニーズを満たすことはできません。

それでチャンネルは後から登場したのです。 DMA との違いは、チャネルが独自の命令システムとプログラムを持ち、コプロセッサであることです。 DMA は固定のデータ転送制御のみを実装できます。

Java NIO のチャネルは、上図のチャネルの実装です。

2. バッファ

チャネルの概念を理解すれば、バッファも簡単に理解できます。

チャネルは、I/O デバイス (ファイル、ソケットなど) へのオープン接続を表しますが、チャネル自体はデータを保存しません。バッファはデータ転送の実際のキャリアです。

アプリケーションがデータを書き込む場合、まずデータをバッファに書き込む必要があります。その後、チャネルはバッファ内のデータを宛先 (ファイル、ディスク、ネットワーク) に送信し、バッファからデータを取り出す役割を担います。

NIO システムを使用する場合は、I/O デバイスに接続するためのチャネルとデータを収容するためのバッファを取得し、バッファを操作してデータを処理する必要があります。

3. セレクター

セレクターはマルチプレクサーとも呼ばれ、非ブロッキング I/O です。非ブロッキングについて話しているうちに、まずブロッキングについて話す必要があります。ブロッキング方法を次の図に示します。

クライアントがサーバーに読み取りまたは書き込み要求を送信すると、サーバーのスレッドはカーネル アドレス空間にデータがあるかどうかを継続的にチェックします。

クライアントに送信するデータがない場合、サーバー スレッドは待機し続け、この期間中は何も実行できません。

クライアントがデータを送信するまで、データはカーネル アドレス空間からユーザー アドレス空間にコピーされ、その後、データを読み取ることができます。

つまり、大量のリクエストが来た場合、後続のリクエストは前のリクエストの実行を待たなければならず、多くのキューが発生し、CPU リソースを十分に活用できず、パフォーマンスが急激に低下します。

セレクターがどのように機能するかを見てみましょう。

これで、クライアントとサーバー間の通信ではチャネル + バッファが使用されるため、すべてのチャネルがセレクタに登録されます。セレクターは、接続、読み取り、書き込みなど、これらのチャネルの I/O ステータスを監視します。

チャネル上のイベントが完全に準備されると、セレクターはタスクをサーバー上の 1 つ以上のスレッドに割り当てます。

クライアントにイベントの準備ができていない場合、サーバー スレッドはブロックされません。クライアント イベントの準備ができるまで、独自の処理を実行できます。

ブロッキング方式と比較して、この非ブロッキング方式では CPU リソースをさらに活用できます。

3. 概念を理解してからAPIを学ぶ

1. バッファAPI

バッファを完全に理解するには、バッファの 4 つのプロパティ (マーク、位置、制限、容量) を知っておく必要があります。これらを知るには、コードを一度実行するだけです。

(1)一定サイズのバッファを割り当てる

  1. //1.指定されたサイズのバッファを割り当てる
  2. ByteBuffer バッファ = ByteBuffer.allocate(10);
  3. システム。 out .println( "--------割り当て" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. ---------割り当て------------  
  2. 位置:0
  3. 制限:10
  4. 定員:10

ここでは 10 バイトのバッファを割り当てます。つまり、最後の byte[] hb に 10 バイトのスペースを開きます。 ByteBuffer のプロパティ。

つまり、容量は 10、制限はデータの読み書きが可能な最大位置も 10、位置はデータの操作が可能な位置は 0 です。

(2)バッファにデータを書き込む

  1. // 2. バッファにデータを書き込む
  2. 文字列 str = "abcde" ;
  3. システム。出力.println( "------------put-------------" );
  4. buffer.put(str.getBytes(StandardCharsets.UTF_8));
  5. システム。出力.println( "位置:" + buffer.position());
  6. システム。出力.println( "limit:" + buffer.limit());
  7. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - 置く - - - - - -  
  2. 位置:5
  3. 制限:10
  4. 定員:10

ここでは 5 バイトのデータをバッファに書き込むため、容量と制限は依然として 10 ですが、すでに 5 バイトが書き込まれているため、位置は 5 になります。

(3)データ読み取りモードに切り替える

  1. // 3. データ読み取りモードに切り替える
  2. バッファを反転します。
  3. システム。出力.println( "-------------反転----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

ここで、バッファからデータを読み取る場合は、フリップ モードに切り替える必要があります。反転すると、いくつかの属性の値が変更されます。

操作結果:

  1. ------------- 反転-------------  
  2. 位置:0
  3. 制限:5
  4. 定員:10

flipは位置の値を0に変更し、制限を5に変更します。つまり、最初から読み取りたいのですが、位置5までしか読み取れません。

(4)データを読む

  1. // 4. データの読み取り
  2. システム。出力.println( "-------------get----------------" );
  3. byte[] dest = 新しいbyte[buffer.limit()];
  4. バッファを取得します。
  5. システム。出力.println(新しい文字列(dest,0,dest.length));
  6. システム。出力.println( "位置:" + buffer.position());
  7. システム。出力.println( "limit:" + buffer.limit());
  8. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - 得る - - - - - -  
  2. ABCD
  3. 位置:5
  4. 制限:5
  5. 定員:10

データを読み取った後、位置は 5 になり、5 を読み取ったことを示します。

(5)繰り返し読む

  1. //5.巻き戻し()
  2. バッファを巻き戻す();
  3. システム。出力.println( "-------------巻き戻し----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. ------------巻き戻し-------------  
  2. 位置:0
  3. 制限:5
  4. 定員:10

巻き戻しとは、バッファ内のデータを繰り返し読み取ることを意味する。

(6)データの消去

  1. //6.クリア()
  2. バッファをクリアします。
  3. システム。出力.println( "--------------クリア----------------" );
  4. システム。出力.println( "位置:" + buffer.position());
  5. システム。出力.println( "limit:" + buffer.limit());
  6. システム。出力.println( "容量:" + buffer.capacity());

操作結果:

  1. - - - - - - クリア - - - - - -  
  2. 位置:0
  3. 制限:10
  4. 定員:10

clear() 後、position は 0 に戻り、limit は 10 に戻り、最大 10 バイトまでデータを先頭から再度書き込むことができます。

ただし、バッファ内のデータはクリアされず、データはまだバッファ内に残っており、「忘れられた」状態にあることに注意してください。これらのポインタは元の状態に戻ります。

(7)マーキング

これは 4 番目の属性である「マーク」です。

マークはポジションの位置を記録することができます。 reset() メソッドを使用してマーク位置に戻ることができます。

  1. @テスト
  2. パブリックボイドtest2() {
  3. // 10バイトを割り当てる
  4. 文字列 str = "abcde" ;
  5. ByteBuffer バッファ = ByteBuffer.allocate(10);
  6. buffer.put(str.getBytes(StandardCharsets.UTF_8));
  7.  
  8. // 読み取りモードに切り替えて2バイト読み取ります
  9. バッファを反転します。
  10. byte[] dest = 新しいbyte[buffer.limit()];
  11. バッファを取得します(宛先、0、2);
  12. システム。出力.println(新しいString(dest, 0, 2));
  13. システム。出力.println(buffer.position());
  14.  
  15. // 現在の位置をマークする
  16. バッファをマークします。
  17.  
  18. // さらに2バイトを読み込む
  19. バッファを取得します(宛先, 2, 2);
  20. システム。出力.println(new String(dest, 2, 2));
  21. システム。出力.println(buffer.position());
  22.  
  23. // リセット、マーク位置に戻る
  24. バッファをリセットします。
  25. システム。出力.println(buffer.position());
  26. }
  27.  
  28. 実行結果:
  29.  
  30. ```TeX
  31. アブ
  32. 2
  33. CD
  34. 4
  35. 2

2. チャネル、バッファ、セレクタを使用してネットワークプログラムを完成させる

(1)サーバー

  1. @テスト
  2. パブリックvoid testServer()はIOExceptionをスローします{
  3. ServerSocketChannel serverSocketChannel = ServerSocketChannel。開ける();
  4. serverSocketChannel.configureBlocking( false );
  5.  
  6. serverSocketChannel.bind(新しいInetSocketAddress(8989));
  7.  
  8. セレクター セレクター = セレクター。開ける();
  9. serverSocketChannel.register(セレクタ、SelectionKey.OP_ACCEPT);
  10.  
  11. while (セレクタ.select () > 0) {
  12. イテレータ<SelectionKey> iterator = selector.selectedKeys().iterator();
  13. (イテレータ.hasNext()) の間 {
  14. 選択キーキー= iterator.next ( );
  15. if (キー.isAcceptable()) {
  16. ソケットチャネル socketChannel = serverSocketChannel.accept();
  17. socketChannel.configureBlocking( false );
  18. socketChannel.register(セレクタ、SelectionKey.OP_READ);
  19. }それ以外の場合 (キー.isReadable()) {
  20. SocketChannel チャネル = (SocketChannel)キー.channel();
  21. バイトバッファ byteBuffer = ByteBuffer.allocate(1024);
  22. さ = 0;
  23. while ((len = channel. read (byteBuffer)) > 0) {
  24. バイトバッファを反転します。
  25. システム。出力.println(新しいString(byteBuffer.array(), 0, len));
  26. バイトバッファをクリアします。
  27. }
  28. }
  29. }
  30.  
  31. イテレータを削除します。
  32. }
  33. }

1. まず、ServerSocketChannel.open() を使用してチャネルを開き、非ブロッキング モードに設定します。

2. ポート8989にバインドします。

3. セレクターにチャネルを登録します。

4. while ループで、セレクターにイベントがあるかどうかを確認します。イベントがクライアントの接続イベントである場合は、SocketChannel を開き、非ブロッキング モードで登録し、セレクターにデータ読み取りイベントを登録します。

5. クライアントがデータを送信すると、チャネルを開いてバッファ内のデータを読み取ることができます。

6. このとき、サーバーは複数のクライアントからのリクエストを同時に受け入れることができます。

(2)クライアント

  1. @テスト
  2. パブリックvoid testClient()はIOExceptionをスローします{
  3. SocketChannel socketChannel = SocketChannel。新しいInetSocketAddress( "127.0.0.1" , 8989)を開きます
  4. socketChannel.configureBlocking( false );
  5.  
  6. バイトバッファ byteBuffer = ByteBuffer.allocate(1024);
  7. byteBuffer.put(新しいDate ().toString().getBytes(StandardCharsets.UTF_8));
  8. バイトバッファを反転します。
  9. socketChannel.write(byteBuffer);
  10. バイトバッファをクリアします。
  11.  
  12. socketChannel.close () ;
  13.  
  14. }

1. クライアントは SocketChannel を開き、非ブロッキング モードで構成します。

2. ByteBuffer を使用してデータを送信します (送信前に反転する必要があることに注意してください)。

3. チャネルを閉じます。

IV.結論

今回は、Java NIO、チャネル、バッファ、セレクターのいくつかの主要な概念を予備的に調べました。

しかし、これは氷山の一角に過ぎないことを知っておく必要があります。チャネルとセレクターをさらに深く理解したい場合は、基盤となるオペレーティング システムと多くのコンピューター構成の原則に関する知識が必要になります。

たとえば、セレクターには select、poll、epoll の概念が含まれます。これらの概念をさらに拡張すると、ハードウェア割り込みとカーネルに関する知識も必要になります。

そのため、学習は苦難の旅であるとますます感じるようになりました。

<<:  ガートナー:世界のパブリッククラウド支出は2021年に3,323億ドルに達する

>>:  Linux 仮想化 KVM-Qemu 分析 Vhost-Net

推薦する

宏源電信のデータセンター、vandweb Taiwan VPSの簡単なレビュー

vandweb.com は 2001 年に設立された台湾のホスティング会社です。その事業内容には、仮...

#推奨# VPS.net: 35% 割引、13 のコンピュータ ルーム、手間がかからず、ウェブサイト構築に信頼性あり/x

vps.net、とても有名ですが、知らない人も多いのではないでしょうか。vps.netのVPS(オプ...

SaaS セキュリティ: 現代のセキュリティ管理における新たな課題

[[408295]]大規模な組織のセキュリティ チームのメンバーに「SaaS セキュリティ」という話...

クラウド ネイティブ テクノロジー - マイクロサービスからサーバーレス サーバーレス アーキテクチャへの進化に関する考察

今日は、マイクロサービスから ServerLess サーバーレス アーキテクチャへの進化プロセスにつ...

ゲーム VPS の推奨、ゲームのロック解除、ネイティブ ローカル IP、ビデオ ストリーミング。

ゲームによってはIP制限が非常に厳しく、ゲーム会社が指定したエリアのIPが必要となります。このため、...

Pinterest でマクドナルドのマーケティングを成功させるための 7 つのヒント

Pinterest は急速に成長し、世界が注目する新しいスターになっています。そのニュースを聞いたマ...

クラウド意思決定の十戒のうち、どれに従いましたか?

過去数年間、私は多くの企業がクラウド サービスを完璧で安全かつ信頼できるものにできるよう支援してきま...

中央銀行:オンライン決済の制限に関する新たな規制は、短期的には発行・実施されない

中央銀行がオンライン決済の送金額を制限する新たな規制を制定する予定であるというニュースが明らかになる...

仮想化データセンター向けレイヤー2テクノロジー

仮想化されたデータ センターでは、物理サーバーが仮想マシン (VM) と呼ばれる複数の論理サーバーに...

A5トピック: 中小電子商取引企業は発展、解雇、閉鎖の困難に直面している

2012年上半期には、多くの大手電子商取引企業が「価格戦争」に参入し、中小電子商取引企業の市場シェア...

ウェブサイトのランキングを最適化する際に習得すべきヒント

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますウェブサイ...

ウェブサイトの Google PR 価値を高める方法

ウェブサーファーは、検索速度が速く、検索結果のヒット率が高いことから、Google を特に好んでいま...

クラウド ネイティブと従来のクラウド コンピューティングの違いは何ですか?

クラウド ネイティブはここ 2 年で突然人気が高まり、ソーシャル メディアで頻繁に言及されるようにな...

エンタープライズ サイトは、データに基づいてコンテンツに対するユーザーのニーズをどのように分析できるでしょうか?

あらゆる企業サイトにとって、ユーザーと検索エンジンに高品質のコンテンツを毎日提供することは必須のタス...

在庫 | 2020 年に注目を集めるクラウド コンピューティング スタートアップ 10 社

コロナウイルスのパンデミックにより、クラウドコンピューティングの発展がさらに促進され、この分野のスタ...