分散WebSocketソリューションについてお話しましょう

分散WebSocketソリューションについてお話しましょう

序文

最近、自分でプロジェクトを構築しました。プロジェクト自体は非常にシンプルですが、メッセージリマインダーに WebSocket を使用する機能が含まれています。大まかな状況は次のようになります。

発行者はシステム内でメッセージを送信し、対応する部門のすべての人にメッセージをリアルタイムでプッシュします。

これがスタンドアロン アプリケーションである場合、部門 ID とユーザー ID を使用して一意のキーを形成し、アプリケーション サーバーとの WebSocket 永続接続を確立し、パブリッシャーから送信されたメッセージを受信できます。

しかし、実際にプロジェクトを本番環境に適用する場合、スタンドアロンのアプリケーションをデプロイすることはできず、クラスターをデプロイする必要があります。

[[343393]]

そこで、テスト用に Nginx と 2 つの Tomcat を使用したシンプルな負荷分散クラスターを構築しました。

しかし、問題があります。クライアント ブラウザは、1 つのサーバーとの WebSocket の長時間接続のみを確立します。したがって、発行者がメッセージを送信した場合、対象部門のすべてのユーザーがメッセージを受信できることを保証する方法はありません (これらのユーザーが同じサーバーに接続されていない可能性があるため)。

この記事では、このような問題について説明し、解決策を提案します。もちろん、解決策は複数あるので、始めましょう。

WebSocketモノリシックアプリケーションの紹介

分散クラスターを紹介する前に、Prince の WebSocket コード実装を見てみましょう。まず、次の Java バックエンド コードを見てみましょう。

  1. javax.websocket.* をインポートします。
  2. javax.websocket.server.PathParam をインポートします。
  3. javax.websocket.server.ServerEndpoint をインポートします。
  4. com.alibaba.fastjson.JSON をインポートします。
  5. com.alibaba.fastjson.JSONObject をインポートします。java.io.IOException をインポートします。
  6. java.util.Map をインポートします。
  7. java.util.concurrent.ConcurrentHashMap をインポートします。
  8. @ServerEndpoint( "/webSocket/{キー}" )
  9. パブリッククラスWebSocket{
  10. プライベートスタティック  intオンラインカウント = 0;
  11. /**
  12. * 接続を保存するクライアント
  13. */
  14. プライベート静的Map<String, WebSocket> クライアント = 新しい ConcurrentHashMap<String, WebSocket>();
  15. プライベートセッションセッション。
  16. /**
  17. * 送信対象部門コード
  18. */
  19. 秘密の文字列キー;
  20. オープン中
  21. public void onOpen(@PathParam( "key" ) String key , Session session) はIOExceptionをスローします {
  22. this.key =キー;
  23. this.session = セッション;
  24. if (!clients.containsKey(キー)) {
  25. オンラインカウントを追加します。 } クライアント.put(キー、これ);
  26. Log.info( key + "メッセージ サービスに接続されました!" );
  27. } @閉じる時
  28. パブリックvoid onClose()はIOExceptionをスローします{
  29. クライアントを削除します(キー);サブオンラインカウント(); } @メッセージ時
  30. パブリックvoid onMessage(String message) は IOException をスローします {
  31. if (message.equals( "ping" )) {
  32. 戻る;
  33. } JSONObject jsonTo = JSON.parseObject(メッセージ);文字列 mes = (文字列) jsonTo.get( "message" );
  34. if (!jsonTo.get( "to" ).equals( "すべて" )){
  35. メッセージ送信先(mes、jsonTo.get( "to" ).toString());
  36. }それ以外{
  37. メッセージをすべて送信します(mes); } } @エラー時
  38. パブリックvoid onError(セッション セッション、Throwable error) {
  39. エラー.printStackTrace(); } プライベート void sendMessageTo(String message, String To ) は IOException をスローします {
  40. for (WebSocket アイテム: クライアント.values ()) {
  41. if (item. key . contains ( To ))の場合
  42. item.session.getAsyncRemote().sendText(メッセージ); } } プライベート void sendMessageAll(String message) は IOException をスローします {
  43. for (WebSocket アイテム: クライアント.values ()) {
  44. item.session.getAsyncRemote().sendText(メッセージ); } }公共 静的同期int getOnlineCount() {
  45. onlineCountを返します
  46. }公共 静的同期void addOnlineCount() {
  47. WebSocket.onlineCount++; }公共 静的同期void subOnlineCount() {
  48. WebSocket.onlineCount --; } パブリック静的同期 Map<String, WebSocket> getClients() {  
  49. リピーター顧客
  50. }}

サンプル コードは Spring を使用しておらず、ネイティブ Java Web で記述されています。その中の手法を簡単に紹介します。

  • onOpen: クライアントがWebSocketサービスに接続したときにメソッドの実行をトリガーします
  • onClose: クライアントがWebSocket接続を切断したときにトリガーされます
  • onMessage: クライアントから送信されたメッセージを受信したときにトリガーされます
  • onError: エラーが発生したときにトリガーされます

ご覧のとおり、onMessage メソッドでは、クライアントから送信されたメッセージに基づいてメッセージを直接転送しますが、これは単一メッセージ サービスでは問題になりません。

もう一度jsコードを見てみましょう

  1. var ホスト = ドキュメント.場所.ホスト;
  2. // 現在ログインしている部門を取得します。 var deptCodes= '${sessionScope.$UserContext.departmentID}' ;
  3. deptコード=deptコード。置換(/[\[|\]|\s]+/g, "" );
  4. varキー= '${sessionScope.$UserContext.userID}' +deptCodes;
  5. var lockReconnect = false ; //wsの繰り返し接続を避ける
  6. var ws = null ; // 現在のブラウザが WebSocket をサポートしているかどうかを判断します var wsUrl = 'ws://' + host + '/webSocket/' + key ;
  7. WebSocket を作成します (wsUrl); //wsに接続function createWebSocket(url) {
  8. 試してください{ if( 'WebSocket'  ウィンドウ){
  9. ws = 新しい WebSocket(url); }そうでない場合は( 'MozWebSocket'  ウィンドウ){
  10. ws = 新しい MozWebSocket(url); }それ以外{
  11. layer.alert( "お使いのブラウザはWebSocketプロトコルをサポートしていません。Google、Firefoxなどの新しいバージョンを使用することをお勧めします。IE10より前のブラウザは使用しないでください。360ブラウザの場合は、互換モードではなく超高速モードを使用してください!" );
  12. } イベントハンドルを初期化します。 }catch(e){ 再接続(url);コンソールログ(e);
  13. } }関数initEventHandle() {
  14. ws.onclose =関数(){
  15. 再接続(wsUrl); console.log( "llws 接続が閉じられました!" + new Date ().toUTCString());
  16. }; ws.onerror =関数(){
  17. 再接続(wsUrl); console.log( "llws 接続エラー!" );
  18. }; ws.onopen =関数(){
  19. heartCheck.reset().start(); //ハートビート検出リセット console.log( "llws 接続が成功しました!" +new Date ().toUTCString());
  20. }; ws.onmessage = function (event) { //メッセージが受信されると、ハートビート検出がリセットされます
  21. heartCheck.reset().start(); // メッセージを受信した場合、現在の接続は正常です // メッセージを受信した後の実際の業務処理... }; } // ウィンドウの閉じるイベントをリッスンします。ウィンドウが閉じられるときは、接続が切断される前にウィンドウが閉じられないように、Websocket 接続を積極的に閉じます。そうしないと、サーバーは例外をスローします。 window.onbeforeunload =関数() {
  22. ws.close ();
  23. }関数reconnect(url) {
  24. if(lockReconnect)戻り値;
  25. ロック再接続 = true ;
  26. setTimeout( function () { //接続されていない場合は再接続し続けます。リクエストが多すぎるのを避けるために遅延を設定します。
  27. WebSocket を作成します (url);ロック再接続 = false ;
  28. }, 2000);
  29. } //ハートビート検出 var heartCheck = { timeout: 300000, //5分ごとにハートビートを送信
  30. timeoutObj: null 、serverTimeoutObj: null 、リセット: function (){
  31. タイムアウトをクリアします(this.timeoutObj);タイムアウトをクリアします(this.serverTimeoutObj);これを返します
  32. }, 開始:関数(){
  33. var self = this; this.timeoutObj = setTimeout(関数(){
  34. //ここでハートビート メッセージを送信します。バックエンドはそれを受信するとハートビート メッセージを返します。 //Onmessage は返されたハートビート メッセージを取得します。これは接続が正常であることを意味します。 ws.send( "ping" );
  35. console.log( "ping!" )
  36. self.serverTimeoutObj = setTimeout( function (){ //一定時間後にリセットされない場合は、バックエンドがアクティブに切断されていることを意味します
  37. ws.close (); // onclose が reconnect を実行する場合は、ws.close ( )を実行するだけです。 reconnect が直接実行されると、onclose がトリガーされ、再接続が 2 回実行されます。
  38. }, self.timeout) }, this.timeout) } }

js 部分はネイティブ H5 で記述されています。ブラウザの互換性を高めたい場合は、SockJS を使用することもできます。興味があればBaiduで検索してみてください。

次に、WebSocket の分散アーキテクチャのサポートを実装するために、コードを手動で最適化します。

解決策を考える

モノリシック アプリケーションのコード構造と、分散環境で WebSocket が直面する問題を理解したので、次はこの問題を解決する方法について考えてみましょう。

まずこの問題の根本的な原因を見てみましょう。

簡単に考えれば、1 つのアプリケーションにはサーバーが 1 つだけ存在し、すべてのクライアントがこのメッセージ サーバーに接続されていることがわかります。したがって、パブリッシャーがメッセージを送信すると、すべてのクライアントが実際にこのサーバーとの接続を確立し、グループ メッセージを直接送信できるようになります。

分散システムに切り替えた後、メッセージ サーバーが 2 つある場合、クライアントが Nginx によって負荷分散された後、一部のサーバーは 1 つのサーバーに接続し、他のサーバーは他のサーバーに接続します。したがって、パブリッシャーがメッセージを送信すると、そのメッセージはサーバーの 1 つにのみ送信され、このメッセージ サーバーは大量送信操作を実行できますが、問題は、他のサーバーがこれを認識しておらず、メッセージを送信できないことです。

これで、根本的な原因は、メッセージが生成されたときに 1 つのメッセージ サーバーのみがそれを認識できることであることがわかりました。そのため、他のメッセージ サーバーでもそれを認識できるようにする必要があります。それを感知すると、接続しているクライアントにグループ メッセージを送信できます。

では、この機能を実現するにはどのような方法を使用すればよいのでしょうか?王子はすぐにメッセージ ミドルウェアを導入し、そのパブリッシュ/サブスクライブ モデルを使用してすべてのメッセージ サーバーに通知することを思いつきました。

分散 WebSocket 問題を解決するために RabbitMQ を導入する

メッセージ ミドルウェアの選択に関しては、Prince はセットアップが比較的簡単で、強力な機能があり、グループ メッセージング機能のみを使用するため、RabbitMQ を選択しました。

RabbitMQ にはブロードキャスト モード (ファンアウト) があり、これを使用します。

まず、RabbitMQ 接続クラスを記述します。

  1. com.rabbitmq.client.Connectionをインポートします
  2. com.rabbitmq.client.ConnectionFactory をインポートします。
  3. java.io.IOException をインポートします。
  4. java.util.concurrent.TimeoutException をインポートします。
  5. パブリッククラスRabbitMQUtil {
  6. プライベートスタティック 繋がり 繋がり;
  7. /**
  8. * rabbitmqとの接続を確立する
  9. * @戻る 
  10. */
  11. 公共 静的 接続getConnection() {
  12. if (接続!= null &&接続.isOpen()) {
  13. 戻る 繋がり;
  14. } ConnectionFactory ファクトリー = 新しい ConnectionFactory();
  15. ファクトリー.setVirtualHost( "/" );
  16. ファクトリー.setHost( "192.168.220.110" ); // 仮想IPアドレスを使用する
  17. ファクトリーポートの設定(5672);
  18. factory.setUsername( "guest" );
  19. factory.setPassword( "ゲスト" );
  20. 試す {
  21. 接続= factory.newConnection();
  22. } キャッチ (IOException e) {
  23. e.printStackTrace();
  24. } キャッチ (TimeoutException e) {
  25. e.printStackTrace();
  26. }
  27. 戻る 繋がり;
  28. }
  29. }

このクラスについては特に言うことはありません。これは、MQ 接続を取得するための単なるファクトリ クラスです。

次に、私たちのアイデアによれば、サーバーが起動するたびに、MQ メッセージをリッスンするための MQ コンシューマーが作成されます。ここでの Prince のテストでは、次のように Servlet リスナーを使用します。

  1. javax.servlet.ServletContextEvent をインポートします。
  2. javax.servlet.ServletContextListener をインポートします。
  3. パブリッククラスInitListenerはServletContextListenerを実装します{
  4. @オーバーライド
  5. パブリックvoid contextInitialized(ServletContextEvent servletContextEvent) {
  6. WebSocket を初期化します。 } @オーバーライド
  7. パブリックvoid contextDestroyed(ServletContextEvent servletContextEvent) {
  8. }}

Web.xmlでリスナー情報を設定することを忘れないでください

  1. <?xml バージョン = "1.0"エンコーディング = "UTF-8" ?>
  2. <web-app xmlns= "http://xmlns.jcp.org/xml/ns/javaee"  
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"  
  4. xsi:schemaLocation= "http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"  
  5. バージョン = "4.0" >
  6. <リスナー>
  7. <リスナークラス>InitListener</リスナークラス>
  8. </リスナー>
  9. </ウェブアプリ>

MQ コンシューマー部分として WebSocket に init メソッドを追加する

  1. 公共  静的void init() {
  2. try {接続 接続= RabbitMQUtil.getConnection();チャネル channel = connection .createChannel(); //スイッチ宣言(パラメータ:スイッチ名、スイッチタイプ)
  3. channel.exchangeDeclare( "fanoutLogs" 、BuiltinExchangeType.FANOUT);
  4. //一時キューを取得する
  5. 文字列 queueName = channel.queueDeclare().getQueue(); //キューをスイッチにバインドします (パラメータ: キュー名、スイッチ名、routingKey は無視されます)
  6. チャネル.queueBind(キュー名、 "fanoutLogs" "" );
  7. // ここでは、DefaultConsumer の handleDelivery メソッドをオーバーライドします。送信時にメッセージを getByte() し、それを文字列に再構成する必要があるためです。
  8. コンシューマー consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, encoding, properties, body);
  9. 文字列メッセージ = 新しい文字列(本文、 "UTF-8" );
  10. システム。出力.println(メッセージ); //ここではWebSocketを使用して、メッセージコンテンツを通じて対応するクライアントにメッセージを送信できます。
  11. } }; // キューで消費されるメッセージを宣言します (パラメータ: キュー名、メッセージが自動的に確認されるかどうか、コンシューマー本体)
  12. チャネル.basicConsume(キュー名、 true 、コンシューマー);
  13. //ここで接続を閉じることはできません。消費メソッドを呼び出した後、消費者は常にrabbitMQに接続され、消費を待機します。
  14. } IOException e をキャッチします { e.printStackTrace(); } }

同時に、メッセージが受信されると、WebSocket を介して対応するクライアントに直接送信されるのではなく、MQ に送信されます。このように、複数のメッセージ サーバーが存在する場合、それらのサーバーはすべて MQ からメッセージを取得し、WebSocket を使用して取得したメッセージ コンテンツを対応するクライアントにプッシュします。

WebSocket の onMessage メソッドに次のコンテンツが追加されます。

  1. 試す {
  2. //接続を試みる
  3. 繋がり 接続= RabbitMQUtil.getConnection(); //チャンネルを作成してみる
  4. チャネル channel = connection .createChannel(); //スイッチを宣言します(パラメータ:スイッチ名、スイッチタイプ、ブロードキャストモード)
  5. チャネル.exchangeDeclare( "fanoutLogs" 、 BuiltinExchangeType.FANOUT);
  6. // メッセージの公開 (パラメータ: スイッチ名、ルーティングキー、無視。ブロードキャスト モードでは、プロデューサーはスイッチの名前とタイプを宣言するだけで済みます)
  7. channel.basicPublish( "fanoutLogs" , "" , null ,msg.getBytes( "UTF-8" ));
  8. System.out.println ( "メッセージを公開" ) ;
  9. チャネル。近い(); } キャッチ (IOException |TimeoutException e) {
  10. e.printStackTrace();
  11. }

追加したら、元の Websocket プッシュ コードを削除します。

これでソリューション全体が完成します。

要約する

この時点で、分散 WebSocket プッシュ メッセージの問題は解決しました。

主にRabbitMQを紹介しました。 RabbitMQ のパブリッシュ/サブスクライブ モードを通じて、各メッセージ サーバーは起動時にメッセージをサブスクライブします。どのメッセージ サーバーがメッセージを送信しても、そのメッセージは MQ に送信されます。このようにして、各メッセージ サーバーはメッセージの送信時刻を感知し、Websocket を介してクライアントに送信します。

これが一般的なプロセスです。これについて考えたことはありますか: RabbitMQ が数分間ダウンしてから再起動した場合、コンシューマーは RabbitMQ に再接続できますか?メッセージを正常に受信できますか?

実稼働環境では、この問題を考慮する必要があります。

ここでは、コンシューマーが自動再接続をサポートしていることがテストされているため、このアーキテクチャを安全に使用してこの問題を解決できます。

この記事はここで終わります。ぜひメッセージを残して、一緒に議論し、学び、一緒に進歩してください。

<<:  あまり知られていないが、非常に実用的な Docker 使用のヒント 10 選

>>:  Amazon SageMaker は、Xingzhe AI がゲーム コンテンツのフィルタリングで 96% の精度を達成するのを支援します

推薦する

企業がソフト記事マーケティングを実施する場合、どのような詳細を考慮すべきでしょうか?

ご存知のとおり、ソフトテキストマーケティングの役割と効果は、企業価値の向上、企業ブランドのアピール、...

テンセントクラウドの6つのエッジアベイラビリティゾーンは同日に開始され、新しいインフラストラクチャのレイアウトが引き続き加速している。

テンセントクラウドは12月17日、武漢、杭州、長沙、福州、済南、石家荘の6つの省都に位置するエッジア...

justhost.asia: 香港データセンターが 10Gbps 帯域幅にアップグレード、香港 VPS トラフィック無制限、20% 割引、月額 33 元から

justhost.asiaは、香港データセンターの帯域幅が200Gbpsにアップグレードされたことを...

ウェブサイト画像最適化の秘密

1. 画像に alt 属性が付いていると、検索エンジンがクロールする際に非常に役立ちます。 alt ...

Big Bird Grassroots SEO チュートリアル: タイトルとキーワードの書き方

第1章第2節 頭が大きくて雨でも心配ない --- タイトルの書き方ビッグピッグ: 「ヘッダーって何?...

マニュアルチュートリアル: centos6.x での LAMP 環境の構築

CentOSは非常にシンプルで直接的です: yum install -y httpd php php...

2015 Google サンフランシスコ旅行記

序文Google トップ コントリビューター サミットに参加してから 1 か月以上が経ち、これまでの...

自給自足のインターネットの例: コンテンツが王様であることが基盤であり、利益の最大化が目標である

安全を保つため、あるいは一時的に妥協するため、選択肢は異なりますが、目的は同じです。すべては発展のた...

360度検索と百度検索に関する意見

ちょうど今、自分の Web サイト (http://www.zcoos.com) をいじっていたとき...

hostodo: 9 月限定版 VPS、トラフィック 8TB、スポケーン\ラスベガス\マイアミ、年間 35 ドル - 1.5G メモリ/1 コア/25G NVMe

Hostodo は、9 月に安価な米国 VPS の特別オファー「期間限定オファー (9 月のみ)」を...

小米の飢餓マーケティングは費用対​​効果の排除に課題に直面

記者の陳立栄店舗を持たずインターネットのみで販売しているXiaomiの携帯電話販売台数は、2年間で0...

インターネット業界のクラウドへの道

ビジネスの急速な成長に伴い、インターネット業界では通常、急速なビジネスの変化と製品の急速な反復および...

flipperhost - $28/256m メモリ/4CPU/30g ハードディスク/3T トラフィック/QN3 コンピュータ ルーム

Flipperhostは半年以上見られなくなりました。HostCatに初めて登場したのは2009年で...

閉じ込められた獣か、それとも温かい水の中のカエルか?小規模ネットワーク企業の状況に関する簡単な議論

3つの超巨大企業と多くの強力な企業は私とは何の関係もありません。小さなインターネット企業の全体的な能...

Douban.com プロモーション戦略: ウェブマスターに無視される禁煙の場所

私は5年間Doubanのベテランユーザーであり、Doubanブランドのオンラインプロモーションに2年...