序文最近、自分でプロジェクトを構築しました。プロジェクト自体は非常にシンプルですが、メッセージリマインダーに WebSocket を使用する機能が含まれています。大まかな状況は次のようになります。 発行者はシステム内でメッセージを送信し、対応する部門のすべての人にメッセージをリアルタイムでプッシュします。 これがスタンドアロン アプリケーションである場合、部門 ID とユーザー ID を使用して一意のキーを形成し、アプリケーション サーバーとの WebSocket 永続接続を確立し、パブリッシャーから送信されたメッセージを受信できます。 しかし、実際にプロジェクトを本番環境に適用する場合、スタンドアロンのアプリケーションをデプロイすることはできず、クラスターをデプロイする必要があります。 [[343393]] そこで、テスト用に Nginx と 2 つの Tomcat を使用したシンプルな負荷分散クラスターを構築しました。 しかし、問題があります。クライアント ブラウザは、1 つのサーバーとの WebSocket の長時間接続のみを確立します。したがって、発行者がメッセージを送信した場合、対象部門のすべてのユーザーがメッセージを受信できることを保証する方法はありません (これらのユーザーが同じサーバーに接続されていない可能性があるため)。 この記事では、このような問題について説明し、解決策を提案します。もちろん、解決策は複数あるので、始めましょう。 WebSocketモノリシックアプリケーションの紹介分散クラスターを紹介する前に、Prince の WebSocket コード実装を見てみましょう。まず、次の Java バックエンド コードを見てみましょう。 - javax.websocket.* をインポートします。
- javax.websocket.server.PathParam をインポートします。
- javax.websocket.server.ServerEndpoint をインポートします。
- com.alibaba.fastjson.JSON をインポートします。
- com.alibaba.fastjson.JSONObject をインポートします。java.io.IOException をインポートします。
- java.util.Map をインポートします。
- java.util.concurrent.ConcurrentHashMap をインポートします。
- @ServerEndpoint( "/webSocket/{キー}" )
- パブリッククラスWebSocket{
- プライベートスタティック intオンラインカウント = 0;
- /**
- * 接続を保存するクライアント
- */
- プライベート静的Map<String, WebSocket> クライアント = 新しい ConcurrentHashMap<String, WebSocket>();
- プライベートセッションセッション。
- /**
- * 送信対象部門コード
- */
- 秘密の文字列キー;
- オープン中
- public void onOpen(@PathParam( "key" ) String key , Session session) はIOExceptionをスローします {
- this.key =キー;
- this.session = セッション;
- if (!clients.containsKey(キー)) {
- オンラインカウントを追加します。 } クライアント.put(キー、これ);
- Log.info( key + "メッセージ サービスに接続されました!" );
- } @閉じる時
- パブリックvoid onClose()はIOExceptionをスローします{
- クライアントを削除します(キー);サブオンラインカウント(); } @メッセージ時
- パブリックvoid onMessage(String message) は IOException をスローします {
- if (message.equals( "ping" )) {
- 戻る;
- } JSONObject jsonTo = JSON.parseObject(メッセージ);文字列 mes = (文字列) jsonTo.get( "message" );
- if (!jsonTo.get( "to" ).equals( "すべて" )){
- メッセージ送信先(mes、jsonTo.get( "to" ).toString());
- }それ以外{
- メッセージをすべて送信します(mes); } } @エラー時
- パブリックvoid onError(セッション セッション、Throwable error) {
- エラー.printStackTrace(); } プライベート void sendMessageTo(String message, String To ) は IOException をスローします {
- for (WebSocket アイテム: クライアント.values ()) {
- if (item. key . contains ( To ))の場合
- item.session.getAsyncRemote().sendText(メッセージ); } } プライベート void sendMessageAll(String message) は IOException をスローします {
- for (WebSocket アイテム: クライアント.values ()) {
- item.session.getAsyncRemote().sendText(メッセージ); } }公共 静的同期int getOnlineCount() {
- onlineCountを返します。
- }公共 静的同期void addOnlineCount() {
- WebSocket.onlineCount++; }公共 静的同期void subOnlineCount() {
- WebSocket.onlineCount
- リピーター顧客
- }}
サンプル コードは Spring を使用しておらず、ネイティブ Java Web で記述されています。その中の手法を簡単に紹介します。 - onOpen: クライアントがWebSocketサービスに接続したときにメソッドの実行をトリガーします
- onClose: クライアントがWebSocket接続を切断したときにトリガーされます
- onMessage: クライアントから送信されたメッセージを受信したときにトリガーされます
- onError: エラーが発生したときにトリガーされます
ご覧のとおり、onMessage メソッドでは、クライアントから送信されたメッセージに基づいてメッセージを直接転送しますが、これは単一メッセージ サービスでは問題になりません。 もう一度jsコードを見てみましょう - var ホスト = ドキュメント.場所.ホスト;
- // 現在ログインしている部門を取得します。 var deptCodes= '${sessionScope.$UserContext.departmentID}' ;
- deptコード=deptコード。置換(/[\[|\]|\s]+/g, "" );
- varキー= '${sessionScope.$UserContext.userID}' +deptCodes;
- var lockReconnect = false ; //wsの繰り返し接続を避ける
- var ws = null ; // 現在のブラウザが WebSocket をサポートしているかどうかを判断します var wsUrl = 'ws://' + host + '/webSocket/' + key ;
- WebSocket を作成します (wsUrl); //wsに接続function createWebSocket(url) {
- 試してください{ if( 'WebSocket' ウィンドウ内){
- ws = 新しい WebSocket(url); }そうでない場合は( 'MozWebSocket' ウィンドウ内){
- ws = 新しい MozWebSocket(url); }それ以外{
- layer.alert( "お使いのブラウザはWebSocketプロトコルをサポートしていません。Google、Firefoxなどの新しいバージョンを使用することをお勧めします。IE10より前のブラウザは使用しないでください。360ブラウザの場合は、互換モードではなく超高速モードを使用してください!" );
- } イベントハンドルを初期化します。 }catch(e){ 再接続(url);コンソールログ(e);
- } }関数initEventHandle() {
- ws.onclose =関数(){
- 再接続(wsUrl); console.log( "llws 接続が閉じられました!" + new Date ().toUTCString());
- }; ws.onerror =関数(){
- 再接続(wsUrl); console.log( "llws 接続エラー!" );
- }; ws.onopen =関数(){
- heartCheck.reset().start(); //ハートビート検出リセット console.log( "llws 接続が成功しました!" +new Date ().toUTCString());
- }; ws.onmessage = function (event) { //メッセージが受信されると、ハートビート検出がリセットされます
- heartCheck.reset().start(); // メッセージを受信した場合、現在の接続は正常です // メッセージを受信した後の実際の業務処理... }; } // ウィンドウの閉じるイベントをリッスンします。ウィンドウが閉じられるときは、接続が切断される前にウィンドウが閉じられないように、Websocket 接続を積極的に閉じます。そうしないと、サーバーは例外をスローします。 window.onbeforeunload =関数() {
- ws.close ();
- }関数reconnect(url) {
- if(lockReconnect)戻り値;
- ロック再接続 = true ;
- setTimeout( function () { //接続されていない場合は再接続し続けます。リクエストが多すぎるのを避けるために遅延を設定します。
- WebSocket を作成します (url);ロック再接続 = false ;
- }, 2000);
- } //ハートビート検出 var heartCheck = { timeout: 300000, //5分ごとにハートビートを送信
- timeoutObj: null 、serverTimeoutObj: null 、リセット: function (){
- タイムアウトをクリアします(this.timeoutObj);タイムアウトをクリアします(this.serverTimeoutObj);これを返します。
- }, 開始:関数(){
- var self = this; this.timeoutObj = setTimeout(関数(){
- //ここでハートビート メッセージを送信します。バックエンドはそれを受信するとハートビート メッセージを返します。 //Onmessage は返されたハートビート メッセージを取得します。これは接続が正常であることを意味します。 ws.send( "ping" );
- console.log( "ping!" )
- self.serverTimeoutObj = setTimeout( function (){ //一定時間後にリセットされない場合は、バックエンドがアクティブに切断されていることを意味します
- ws.close (); // onclose が reconnect を実行する場合は、ws.close ( )を実行するだけです。 reconnect が直接実行されると、onclose がトリガーされ、再接続が 2 回実行されます。
- }, self.timeout) }, this.timeout) } }
js 部分はネイティブ H5 で記述されています。ブラウザの互換性を高めたい場合は、SockJS を使用することもできます。興味があればBaiduで検索してみてください。 次に、WebSocket の分散アーキテクチャのサポートを実装するために、コードを手動で最適化します。 解決策を考えるモノリシック アプリケーションのコード構造と、分散環境で WebSocket が直面する問題を理解したので、次はこの問題を解決する方法について考えてみましょう。 まずこの問題の根本的な原因を見てみましょう。 簡単に考えれば、1 つのアプリケーションにはサーバーが 1 つだけ存在し、すべてのクライアントがこのメッセージ サーバーに接続されていることがわかります。したがって、パブリッシャーがメッセージを送信すると、すべてのクライアントが実際にこのサーバーとの接続を確立し、グループ メッセージを直接送信できるようになります。 分散システムに切り替えた後、メッセージ サーバーが 2 つある場合、クライアントが Nginx によって負荷分散された後、一部のサーバーは 1 つのサーバーに接続し、他のサーバーは他のサーバーに接続します。したがって、パブリッシャーがメッセージを送信すると、そのメッセージはサーバーの 1 つにのみ送信され、このメッセージ サーバーは大量送信操作を実行できますが、問題は、他のサーバーがこれを認識しておらず、メッセージを送信できないことです。 これで、根本的な原因は、メッセージが生成されたときに 1 つのメッセージ サーバーのみがそれを認識できることであることがわかりました。そのため、他のメッセージ サーバーでもそれを認識できるようにする必要があります。それを感知すると、接続しているクライアントにグループ メッセージを送信できます。 では、この機能を実現するにはどのような方法を使用すればよいのでしょうか?王子はすぐにメッセージ ミドルウェアを導入し、そのパブリッシュ/サブスクライブ モデルを使用してすべてのメッセージ サーバーに通知することを思いつきました。 分散 WebSocket 問題を解決するために RabbitMQ を導入するメッセージ ミドルウェアの選択に関しては、Prince はセットアップが比較的簡単で、強力な機能があり、グループ メッセージング機能のみを使用するため、RabbitMQ を選択しました。 RabbitMQ にはブロードキャスト モード (ファンアウト) があり、これを使用します。 まず、RabbitMQ 接続クラスを記述します。 - com.rabbitmq.client.Connectionをインポートします。
- com.rabbitmq.client.ConnectionFactory をインポートします。
- java.io.IOException をインポートします。
- java.util.concurrent.TimeoutException をインポートします。
- パブリッククラスRabbitMQUtil {
- プライベートスタティック 繋がり 繋がり;
- /**
- * rabbitmqとの接続を確立する
- * @戻る
- */
- 公共 静的 接続getConnection() {
- if (接続!= null &&接続.isOpen()) {
- 戻る 繋がり;
- } ConnectionFactory ファクトリー = 新しい ConnectionFactory();
- ファクトリー.setVirtualHost( "/" );
- ファクトリー.setHost( "192.168.220.110" ); // 仮想IPアドレスを使用する
- ファクトリーポートの設定(5672);
- factory.setUsername( "guest" );
- factory.setPassword( "ゲスト" );
- 試す {
- 接続= factory.newConnection();
- } キャッチ (IOException e) {
- e.printStackTrace();
- } キャッチ (TimeoutException e) {
- e.printStackTrace();
- }
- 戻る 繋がり;
- }
- }
このクラスについては特に言うことはありません。これは、MQ 接続を取得するための単なるファクトリ クラスです。 次に、私たちのアイデアによれば、サーバーが起動するたびに、MQ メッセージをリッスンするための MQ コンシューマーが作成されます。ここでの Prince のテストでは、次のように Servlet リスナーを使用します。 - javax.servlet.ServletContextEvent をインポートします。
- javax.servlet.ServletContextListener をインポートします。
- パブリッククラスInitListenerはServletContextListenerを実装します{
- @オーバーライド
- パブリックvoid contextInitialized(ServletContextEvent servletContextEvent) {
- WebSocket を初期化します。 } @オーバーライド
- パブリックvoid contextDestroyed(ServletContextEvent servletContextEvent) {
- }}
Web.xmlでリスナー情報を設定することを忘れないでください - <?xml バージョン = "1.0"エンコーディング = "UTF-8" ?>
- <web-app xmlns= "http://xmlns.jcp.org/xml/ns/javaee"
- xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation= "http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
- バージョン = "4.0" >
- <リスナー>
- <リスナークラス>InitListener</リスナークラス>
- </リスナー>
- </ウェブアプリ>
MQ コンシューマー部分として WebSocket に init メソッドを追加する - 公共 静的void init() {
- try {接続 接続= RabbitMQUtil.getConnection();チャネル channel = connection .createChannel(); //スイッチ宣言(パラメータ:スイッチ名、スイッチタイプ)
- channel.exchangeDeclare( "fanoutLogs" 、BuiltinExchangeType.FANOUT);
- //一時キューを取得する
- 文字列 queueName = channel.queueDeclare().getQueue(); //キューをスイッチにバインドします (パラメータ: キュー名、スイッチ名、routingKey は無視されます)
- チャネル.queueBind(キュー名、 "fanoutLogs" 、 "" );
- // ここでは、DefaultConsumer の handleDelivery メソッドをオーバーライドします。送信時にメッセージを getByte() し、それを文字列に再構成する必要があるためです。
- コンシューマー consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, encoding, properties, body);
- 文字列メッセージ = 新しい文字列(本文、 "UTF-8" );
- システム。出力.println(メッセージ); //ここではWebSocketを使用して、メッセージコンテンツを通じて対応するクライアントにメッセージを送信できます。
- } }; // キューで消費されるメッセージを宣言します (パラメータ: キュー名、メッセージが自動的に確認されるかどうか、コンシューマー本体)
- チャネル.basicConsume(キュー名、 true 、コンシューマー);
- //ここで接続を閉じることはできません。消費メソッドを呼び出した後、消費者は常にrabbitMQに接続され、消費を待機します。
- } IOException e をキャッチします { e.printStackTrace(); } }
同時に、メッセージが受信されると、WebSocket を介して対応するクライアントに直接送信されるのではなく、MQ に送信されます。このように、複数のメッセージ サーバーが存在する場合、それらのサーバーはすべて MQ からメッセージを取得し、WebSocket を使用して取得したメッセージ コンテンツを対応するクライアントにプッシュします。 WebSocket の onMessage メソッドに次のコンテンツが追加されます。 - 試す {
- //接続を試みる
- 繋がり 接続= RabbitMQUtil.getConnection(); //チャンネルを作成してみる
- チャネル channel = connection .createChannel(); //スイッチを宣言します(パラメータ:スイッチ名、スイッチタイプ、ブロードキャストモード)
- チャネル.exchangeDeclare( "fanoutLogs" 、 BuiltinExchangeType.FANOUT);
- // メッセージの公開 (パラメータ: スイッチ名、ルーティングキー、無視。ブロードキャスト モードでは、プロデューサーはスイッチの名前とタイプを宣言するだけで済みます)
- channel.basicPublish( "fanoutLogs" , "" , null ,msg.getBytes( "UTF-8" ));
- System.out.println ( "メッセージを公開" ) ;
- チャネル。近い(); } キャッチ (IOException |TimeoutException e) {
- e.printStackTrace();
- }
追加したら、元の Websocket プッシュ コードを削除します。 これでソリューション全体が完成します。 要約する この時点で、分散 WebSocket プッシュ メッセージの問題は解決しました。 主にRabbitMQを紹介しました。 RabbitMQ のパブリッシュ/サブスクライブ モードを通じて、各メッセージ サーバーは起動時にメッセージをサブスクライブします。どのメッセージ サーバーがメッセージを送信しても、そのメッセージは MQ に送信されます。このようにして、各メッセージ サーバーはメッセージの送信時刻を感知し、Websocket を介してクライアントに送信します。 これが一般的なプロセスです。これについて考えたことはありますか: RabbitMQ が数分間ダウンしてから再起動した場合、コンシューマーは RabbitMQ に再接続できますか?メッセージを正常に受信できますか? 実稼働環境では、この問題を考慮する必要があります。 ここでは、コンシューマーが自動再接続をサポートしていることがテストされているため、このアーキテクチャを安全に使用してこの問題を解決できます。 この記事はここで終わります。ぜひメッセージを残して、一緒に議論し、学び、一緒に進歩してください。 |