From 79abc9aee327952480f6af493003a2c328a1b37b Mon Sep 17 00:00:00 2001 From: sta Date: Thu, 3 Oct 2013 16:36:43 +0900 Subject: [PATCH] Modified broadcasting --- .../Server/WebSocketServiceHostManager.cs | 86 ++++++++++--- .../Server/WebSocketSessionManager.cs | 86 +++++++++---- websocket-sharp/WebSocket.cs | 113 ++++++++++++++---- 3 files changed, 223 insertions(+), 62 deletions(-) diff --git a/websocket-sharp/Server/WebSocketServiceHostManager.cs b/websocket-sharp/Server/WebSocketServiceHostManager.cs index 110b3970..fb8cee71 100644 --- a/websocket-sharp/Server/WebSocketServiceHostManager.cs +++ b/websocket-sharp/Server/WebSocketServiceHostManager.cs @@ -28,8 +28,10 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Text; +using System.Threading; using WebSocketSharp.Net; namespace WebSocketSharp.Server @@ -195,6 +197,59 @@ namespace WebSocketSharp.Server #region Private Methods + private void broadcast (Opcode opcode, byte [] data) + { + WaitCallback callback = state => + { + var cache = new Dictionary (); + try { + foreach (var host in ServiceHosts) + { + if (_state != ServerState.START) + break; + + host.Sessions.BroadcastInternally (opcode, data, cache); + } + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + } + finally { + cache.Clear (); + } + }; + + ThreadPool.QueueUserWorkItem (callback); + } + + private void broadcast (Opcode opcode, Stream stream) + { + WaitCallback callback = state => + { + var cache = new Dictionary (); + try { + foreach (var host in ServiceHosts) + { + if (_state != ServerState.START) + break; + + host.Sessions.BroadcastInternally (opcode, stream, cache); + } + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + } + finally { + foreach (var cached in cache.Values) + cached.Dispose (); + + cache.Clear (); + } + }; + + ThreadPool.QueueUserWorkItem (callback); + } + private Dictionary> broadping (byte [] frameAsBytes, int timeOut) { var result = new Dictionary> (); @@ -316,13 +371,10 @@ namespace WebSocketSharp.Server return; } - foreach (var host in ServiceHosts) - { - if (_state != ServerState.START) - break; - - host.Sessions.BroadcastInternally (data); - } + if (data.LongLength <= WebSocket.FragmentLength) + broadcast (Opcode.BINARY, data); + else + broadcast (Opcode.BINARY, new MemoryStream (data)); } /// @@ -341,13 +393,11 @@ namespace WebSocketSharp.Server return; } - foreach (var host in ServiceHosts) - { - if (_state != ServerState.START) - break; - - host.Sessions.BroadcastInternally (data); - } + var rawData = Encoding.UTF8.GetBytes (data); + if (rawData.LongLength <= WebSocket.FragmentLength) + broadcast (Opcode.TEXT, rawData); + else + broadcast (Opcode.TEXT, new MemoryStream (rawData)); } /// @@ -362,7 +412,7 @@ namespace WebSocketSharp.Server /// public void BroadcastTo (byte [] data, string servicePath) { - var msg = _state.CheckIfStarted () ?? data.CheckIfValidSendData () ?? servicePath.CheckIfValidServicePath (); + var msg = _state.CheckIfStarted () ?? servicePath.CheckIfValidServicePath (); if (msg != null) { _logger.Error (msg); @@ -376,7 +426,7 @@ namespace WebSocketSharp.Server return; } - host.Sessions.BroadcastInternally (data); + host.Sessions.Broadcast (data); } /// @@ -391,7 +441,7 @@ namespace WebSocketSharp.Server /// public void BroadcastTo (string data, string servicePath) { - var msg = _state.CheckIfStarted () ?? data.CheckIfValidSendData () ?? servicePath.CheckIfValidServicePath (); + var msg = _state.CheckIfStarted () ?? servicePath.CheckIfValidServicePath (); if (msg != null) { _logger.Error (msg); @@ -405,7 +455,7 @@ namespace WebSocketSharp.Server return; } - host.Sessions.BroadcastInternally (data); + host.Sessions.Broadcast (data); } /// diff --git a/websocket-sharp/Server/WebSocketSessionManager.cs b/websocket-sharp/Server/WebSocketSessionManager.cs index 437e2bf1..ac4f5afb 100644 --- a/websocket-sharp/Server/WebSocketSessionManager.cs +++ b/websocket-sharp/Server/WebSocketSessionManager.cs @@ -28,8 +28,10 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Timers; namespace WebSocketSharp.Server @@ -47,7 +49,7 @@ namespace WebSocketSharp.Server private Dictionary _sessions; private volatile ServerState _state; private volatile bool _sweeping; - private Timer _sweepTimer; + private System.Timers.Timer _sweepTimer; private object _sync; #endregion @@ -229,7 +231,7 @@ namespace WebSocketSharp.Server private void setSweepTimer (double interval) { - _sweepTimer = new Timer (interval); + _sweepTimer = new System.Timers.Timer (interval); _sweepTimer.Elapsed += (sender, e) => { Sweep (); @@ -254,32 +256,69 @@ namespace WebSocketSharp.Server } } - internal void BroadcastInternally (byte [] data) + internal void BroadcastInternally (Opcode opcode, byte [] data) { - var services = ServiceInstances.GetEnumerator (); - Action completed = null; - completed = () => + WaitCallback callback = state => { - if (_state == ServerState.START && services.MoveNext ()) - services.Current.SendAsync (data, completed); + var cache = new Dictionary (); + try { + BroadcastInternally (opcode, data, cache); + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + } + finally { + cache.Clear (); + } }; - if (_state == ServerState.START && services.MoveNext ()) - services.Current.SendAsync (data, completed); + ThreadPool.QueueUserWorkItem (callback); } - internal void BroadcastInternally (string data) + internal void BroadcastInternally (Opcode opcode, Stream stream) { - var services = ServiceInstances.GetEnumerator (); - Action completed = null; - completed = () => + WaitCallback callback = state => { - if (_state == ServerState.START && services.MoveNext ()) - services.Current.SendAsync (data, completed); + var cache = new Dictionary (); + try { + BroadcastInternally (opcode, stream, cache); + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + } + finally { + foreach (var cached in cache.Values) + cached.Dispose (); + + cache.Clear (); + } }; - if (_state == ServerState.START && services.MoveNext ()) - services.Current.SendAsync (data, completed); + ThreadPool.QueueUserWorkItem (callback); + } + + internal void BroadcastInternally ( + Opcode opcode, byte [] data, Dictionary cache) + { + foreach (var session in ServiceInstances) + { + if (_state != ServerState.START) + break; + + session.Context.WebSocket.Send (opcode, data, cache); + } + } + + internal void BroadcastInternally ( + Opcode opcode, Stream stream, Dictionary cache) + { + foreach (var session in ServiceInstances) + { + if (_state != ServerState.START) + break; + + session.Context.WebSocket.Send (opcode, stream, cache); + } } internal Dictionary BroadpingInternally () @@ -367,7 +406,10 @@ namespace WebSocketSharp.Server return; } - BroadcastInternally (data); + if (data.LongLength <= WebSocket.FragmentLength) + BroadcastInternally (Opcode.BINARY, data); + else + BroadcastInternally (Opcode.BINARY, new MemoryStream (data)); } /// @@ -385,7 +427,11 @@ namespace WebSocketSharp.Server return; } - BroadcastInternally (data); + var rawData = Encoding.UTF8.GetBytes (data); + if (rawData.LongLength <= WebSocket.FragmentLength) + BroadcastInternally (Opcode.TEXT, rawData); + else + BroadcastInternally (Opcode.TEXT, new MemoryStream (rawData)); } /// diff --git a/websocket-sharp/WebSocket.cs b/websocket-sharp/WebSocket.cs index e84533c3..13628dd5 100644 --- a/websocket-sharp/WebSocket.cs +++ b/websocket-sharp/WebSocket.cs @@ -58,9 +58,8 @@ namespace WebSocketSharp { #region Private Const Fields - private const int _fragmentLen = 1016; // Max value is int.MaxValue - 14. - private const string _guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - private const string _version = "13"; + private const string _guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + private const string _version = "13"; #endregion @@ -95,6 +94,12 @@ namespace WebSocketSharp #endregion + #region Internal Const Fields + + internal const int FragmentLength = 1016; // Max value is int.MaxValue - 14. + + #endregion + #region Private Constructors private WebSocket () @@ -1044,13 +1049,15 @@ namespace WebSocketSharp compressed = true; } - var length = data.Length; lock (_forSend) { - if (length <= _fragmentLen) - send (Fin.FINAL, opcode, data.ReadBytes ((int) length), compressed); + var mask = _client ? Mask.MASK : Mask.UNMASK; + var length = data.Length; + if (length <= FragmentLength) + send (WsFrame.CreateFrame ( + Fin.FINAL, opcode, mask, data.ReadBytes ((int) length), compressed)); else - sendFragmented (opcode, data, compressed); + sendFragmented (opcode, data, mask, compressed); } } catch (Exception ex) { @@ -1065,12 +1072,6 @@ namespace WebSocketSharp } } - private bool send (Fin fin, Opcode opcode, byte [] data, bool compressed) - { - return send ( - WsFrame.CreateFrame (fin, opcode, _client ? Mask.MASK : Mask.UNMASK, data, compressed)); - } - private void sendAsync (Opcode opcode, Stream stream, Action completed) { Action sender = send; @@ -1091,29 +1092,42 @@ namespace WebSocketSharp sender.BeginInvoke (opcode, stream, callback, null); } - private long sendFragmented (Opcode opcode, Stream stream, bool compressed) + private long sendFragmented (Opcode opcode, Stream stream, Mask mask, bool compressed) { var length = stream.Length; - var quo = length / _fragmentLen; - var rem = length % _fragmentLen; + var quo = length / FragmentLength; + var rem = length % FragmentLength; var count = rem == 0 ? quo - 2 : quo - 1; long readLen = 0; - var tmpLen = 0; - var buffer = new byte [_fragmentLen]; + int tmpLen = 0; + byte [] buffer = null; + + // Not fragmented + if (quo == 0) + { + buffer = new byte [rem]; + tmpLen = stream.Read (buffer, 0, buffer.Length); + if (send (WsFrame.CreateFrame (Fin.FINAL, opcode, mask, buffer, compressed))) + readLen = tmpLen; + + return readLen; + } + + buffer = new byte [FragmentLength]; // First - tmpLen = stream.Read (buffer, 0, _fragmentLen); - if (send (Fin.MORE, opcode, buffer, compressed)) - readLen += tmpLen; + tmpLen = stream.Read (buffer, 0, FragmentLength); + if (send (WsFrame.CreateFrame (Fin.MORE, opcode, mask, buffer, compressed))) + readLen = tmpLen; else return 0; // Mid for (long i = 0; i < count; i++) { - tmpLen = stream.Read (buffer, 0, _fragmentLen); - if (send (Fin.MORE, Opcode.CONT, buffer, compressed)) + tmpLen = stream.Read (buffer, 0, FragmentLength); + if (send (WsFrame.CreateFrame (Fin.MORE, Opcode.CONT, mask, buffer, compressed))) readLen += tmpLen; else return readLen; @@ -1123,7 +1137,7 @@ namespace WebSocketSharp if (rem != 0) buffer = new byte [rem]; tmpLen = stream.Read (buffer, 0, buffer.Length); - if (send (Fin.FINAL, Opcode.CONT, buffer, compressed)) + if (send (WsFrame.CreateFrame (Fin.FINAL, Opcode.CONT, mask, buffer, compressed))) readLen += tmpLen; return readLen; @@ -1274,6 +1288,57 @@ namespace WebSocketSharp _receivePong.WaitOne (timeOut); } + // As server, used to broadcast + internal void Send (Opcode opcode, byte [] data, Dictionary cache) + { + try { + byte [] cached; + if (!cache.TryGetValue (_compression, out cached)) + { + cached = WsFrame.CreateFrame ( + Fin.FINAL, + opcode, + Mask.UNMASK, + data.Compress (_compression), + _compression != CompressionMethod.NONE).ToByteArray (); + + cache.Add (_compression, cached); + } + + lock (_forSend) + { + send (cached); + } + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + error ("An exception has occured."); + } + } + + // As server, used to broadcast + internal void Send (Opcode opcode, Stream stream, Dictionary cache) + { + try { + Stream cached; + if (!cache.TryGetValue (_compression, out cached)) + { + cached = stream.Compress (_compression); + cache.Add (_compression, cached); + } + + lock (_forSend) + { + cached.Position = 0; + sendFragmented (opcode, cached, Mask.UNMASK, _compression != CompressionMethod.NONE); + } + } + catch (Exception ex) { + _logger.Fatal (ex.ToString ()); + error ("An exception has occured."); + } + } + #endregion #region Public Methods