Modified broadcasting

This commit is contained in:
sta 2013-10-03 16:36:43 +09:00
parent 5d8de64d1d
commit 79abc9aee3
3 changed files with 223 additions and 62 deletions

View File

@ -28,8 +28,10 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using WebSocketSharp.Net; using WebSocketSharp.Net;
namespace WebSocketSharp.Server namespace WebSocketSharp.Server
@ -195,6 +197,59 @@ namespace WebSocketSharp.Server
#region Private Methods #region Private Methods
private void broadcast (Opcode opcode, byte [] data)
{
WaitCallback callback = state =>
{
var cache = new Dictionary<CompressionMethod, byte []> ();
try {
foreach (var host in ServiceHosts)
{
if (_state != ServerState.START)
break;
host.Sessions.BroadcastInternally (opcode, data, cache);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
}
finally {
cache.Clear ();
}
};
ThreadPool.QueueUserWorkItem (callback);
}
private void broadcast (Opcode opcode, Stream stream)
{
WaitCallback callback = state =>
{
var cache = new Dictionary<CompressionMethod, Stream> ();
try {
foreach (var host in ServiceHosts)
{
if (_state != ServerState.START)
break;
host.Sessions.BroadcastInternally (opcode, stream, cache);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
}
finally {
foreach (var cached in cache.Values)
cached.Dispose ();
cache.Clear ();
}
};
ThreadPool.QueueUserWorkItem (callback);
}
private Dictionary<string, Dictionary<string, bool>> broadping (byte [] frameAsBytes, int timeOut) private Dictionary<string, Dictionary<string, bool>> broadping (byte [] frameAsBytes, int timeOut)
{ {
var result = new Dictionary<string, Dictionary<string, bool>> (); var result = new Dictionary<string, Dictionary<string, bool>> ();
@ -316,13 +371,10 @@ namespace WebSocketSharp.Server
return; return;
} }
foreach (var host in ServiceHosts) if (data.LongLength <= WebSocket.FragmentLength)
{ broadcast (Opcode.BINARY, data);
if (_state != ServerState.START) else
break; broadcast (Opcode.BINARY, new MemoryStream (data));
host.Sessions.BroadcastInternally (data);
}
} }
/// <summary> /// <summary>
@ -341,13 +393,11 @@ namespace WebSocketSharp.Server
return; return;
} }
foreach (var host in ServiceHosts) var rawData = Encoding.UTF8.GetBytes (data);
{ if (rawData.LongLength <= WebSocket.FragmentLength)
if (_state != ServerState.START) broadcast (Opcode.TEXT, rawData);
break; else
broadcast (Opcode.TEXT, new MemoryStream (rawData));
host.Sessions.BroadcastInternally (data);
}
} }
/// <summary> /// <summary>
@ -362,7 +412,7 @@ namespace WebSocketSharp.Server
/// </param> /// </param>
public void BroadcastTo (byte [] data, string servicePath) public void BroadcastTo (byte [] data, string servicePath)
{ {
var msg = _state.CheckIfStarted () ?? data.CheckIfValidSendData () ?? servicePath.CheckIfValidServicePath (); var msg = _state.CheckIfStarted () ?? servicePath.CheckIfValidServicePath ();
if (msg != null) if (msg != null)
{ {
_logger.Error (msg); _logger.Error (msg);
@ -376,7 +426,7 @@ namespace WebSocketSharp.Server
return; return;
} }
host.Sessions.BroadcastInternally (data); host.Sessions.Broadcast (data);
} }
/// <summary> /// <summary>
@ -391,7 +441,7 @@ namespace WebSocketSharp.Server
/// </param> /// </param>
public void BroadcastTo (string data, string servicePath) public void BroadcastTo (string data, string servicePath)
{ {
var msg = _state.CheckIfStarted () ?? data.CheckIfValidSendData () ?? servicePath.CheckIfValidServicePath (); var msg = _state.CheckIfStarted () ?? servicePath.CheckIfValidServicePath ();
if (msg != null) if (msg != null)
{ {
_logger.Error (msg); _logger.Error (msg);
@ -405,7 +455,7 @@ namespace WebSocketSharp.Server
return; return;
} }
host.Sessions.BroadcastInternally (data); host.Sessions.Broadcast (data);
} }
/// <summary> /// <summary>

View File

@ -28,8 +28,10 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using System.Timers; using System.Timers;
namespace WebSocketSharp.Server namespace WebSocketSharp.Server
@ -47,7 +49,7 @@ namespace WebSocketSharp.Server
private Dictionary<string, WebSocketService> _sessions; private Dictionary<string, WebSocketService> _sessions;
private volatile ServerState _state; private volatile ServerState _state;
private volatile bool _sweeping; private volatile bool _sweeping;
private Timer _sweepTimer; private System.Timers.Timer _sweepTimer;
private object _sync; private object _sync;
#endregion #endregion
@ -229,7 +231,7 @@ namespace WebSocketSharp.Server
private void setSweepTimer (double interval) private void setSweepTimer (double interval)
{ {
_sweepTimer = new Timer (interval); _sweepTimer = new System.Timers.Timer (interval);
_sweepTimer.Elapsed += (sender, e) => _sweepTimer.Elapsed += (sender, e) =>
{ {
Sweep (); Sweep ();
@ -254,32 +256,69 @@ namespace WebSocketSharp.Server
} }
} }
internal void BroadcastInternally (byte [] data) internal void BroadcastInternally (Opcode opcode, byte [] data)
{ {
var services = ServiceInstances.GetEnumerator (); WaitCallback callback = state =>
Action completed = null;
completed = () =>
{ {
if (_state == ServerState.START && services.MoveNext ()) var cache = new Dictionary<CompressionMethod, byte []> ();
services.Current.SendAsync (data, completed); try {
BroadcastInternally (opcode, data, cache);
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
}
finally {
cache.Clear ();
}
}; };
if (_state == ServerState.START && services.MoveNext ()) ThreadPool.QueueUserWorkItem (callback);
services.Current.SendAsync (data, completed);
} }
internal void BroadcastInternally (string data) internal void BroadcastInternally (Opcode opcode, Stream stream)
{ {
var services = ServiceInstances.GetEnumerator (); WaitCallback callback = state =>
Action completed = null;
completed = () =>
{ {
if (_state == ServerState.START && services.MoveNext ()) var cache = new Dictionary <CompressionMethod, Stream> ();
services.Current.SendAsync (data, completed); try {
BroadcastInternally (opcode, stream, cache);
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
}
finally {
foreach (var cached in cache.Values)
cached.Dispose ();
cache.Clear ();
}
}; };
if (_state == ServerState.START && services.MoveNext ()) ThreadPool.QueueUserWorkItem (callback);
services.Current.SendAsync (data, completed); }
internal void BroadcastInternally (
Opcode opcode, byte [] data, Dictionary<CompressionMethod, byte []> cache)
{
foreach (var session in ServiceInstances)
{
if (_state != ServerState.START)
break;
session.Context.WebSocket.Send (opcode, data, cache);
}
}
internal void BroadcastInternally (
Opcode opcode, Stream stream, Dictionary <CompressionMethod, Stream> cache)
{
foreach (var session in ServiceInstances)
{
if (_state != ServerState.START)
break;
session.Context.WebSocket.Send (opcode, stream, cache);
}
} }
internal Dictionary<string, bool> BroadpingInternally () internal Dictionary<string, bool> BroadpingInternally ()
@ -367,7 +406,10 @@ namespace WebSocketSharp.Server
return; return;
} }
BroadcastInternally (data); if (data.LongLength <= WebSocket.FragmentLength)
BroadcastInternally (Opcode.BINARY, data);
else
BroadcastInternally (Opcode.BINARY, new MemoryStream (data));
} }
/// <summary> /// <summary>
@ -385,7 +427,11 @@ namespace WebSocketSharp.Server
return; return;
} }
BroadcastInternally (data); var rawData = Encoding.UTF8.GetBytes (data);
if (rawData.LongLength <= WebSocket.FragmentLength)
BroadcastInternally (Opcode.TEXT, rawData);
else
BroadcastInternally (Opcode.TEXT, new MemoryStream (rawData));
} }
/// <summary> /// <summary>

View File

@ -58,9 +58,8 @@ namespace WebSocketSharp
{ {
#region Private Const Fields #region Private Const Fields
private const int _fragmentLen = 1016; // Max value is int.MaxValue - 14. private const string _guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private const string _guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; private const string _version = "13";
private const string _version = "13";
#endregion #endregion
@ -95,6 +94,12 @@ namespace WebSocketSharp
#endregion #endregion
#region Internal Const Fields
internal const int FragmentLength = 1016; // Max value is int.MaxValue - 14.
#endregion
#region Private Constructors #region Private Constructors
private WebSocket () private WebSocket ()
@ -1044,13 +1049,15 @@ namespace WebSocketSharp
compressed = true; compressed = true;
} }
var length = data.Length;
lock (_forSend) lock (_forSend)
{ {
if (length <= _fragmentLen) var mask = _client ? Mask.MASK : Mask.UNMASK;
send (Fin.FINAL, opcode, data.ReadBytes ((int) length), compressed); var length = data.Length;
if (length <= FragmentLength)
send (WsFrame.CreateFrame (
Fin.FINAL, opcode, mask, data.ReadBytes ((int) length), compressed));
else else
sendFragmented (opcode, data, compressed); sendFragmented (opcode, data, mask, compressed);
} }
} }
catch (Exception ex) { catch (Exception ex) {
@ -1065,12 +1072,6 @@ namespace WebSocketSharp
} }
} }
private bool send (Fin fin, Opcode opcode, byte [] data, bool compressed)
{
return send (
WsFrame.CreateFrame (fin, opcode, _client ? Mask.MASK : Mask.UNMASK, data, compressed));
}
private void sendAsync (Opcode opcode, Stream stream, Action completed) private void sendAsync (Opcode opcode, Stream stream, Action completed)
{ {
Action<Opcode, Stream> sender = send; Action<Opcode, Stream> sender = send;
@ -1091,29 +1092,42 @@ namespace WebSocketSharp
sender.BeginInvoke (opcode, stream, callback, null); sender.BeginInvoke (opcode, stream, callback, null);
} }
private long sendFragmented (Opcode opcode, Stream stream, bool compressed) private long sendFragmented (Opcode opcode, Stream stream, Mask mask, bool compressed)
{ {
var length = stream.Length; var length = stream.Length;
var quo = length / _fragmentLen; var quo = length / FragmentLength;
var rem = length % _fragmentLen; var rem = length % FragmentLength;
var count = rem == 0 ? quo - 2 : quo - 1; var count = rem == 0 ? quo - 2 : quo - 1;
long readLen = 0; long readLen = 0;
var tmpLen = 0; int tmpLen = 0;
var buffer = new byte [_fragmentLen]; byte [] buffer = null;
// Not fragmented
if (quo == 0)
{
buffer = new byte [rem];
tmpLen = stream.Read (buffer, 0, buffer.Length);
if (send (WsFrame.CreateFrame (Fin.FINAL, opcode, mask, buffer, compressed)))
readLen = tmpLen;
return readLen;
}
buffer = new byte [FragmentLength];
// First // First
tmpLen = stream.Read (buffer, 0, _fragmentLen); tmpLen = stream.Read (buffer, 0, FragmentLength);
if (send (Fin.MORE, opcode, buffer, compressed)) if (send (WsFrame.CreateFrame (Fin.MORE, opcode, mask, buffer, compressed)))
readLen += tmpLen; readLen = tmpLen;
else else
return 0; return 0;
// Mid // Mid
for (long i = 0; i < count; i++) for (long i = 0; i < count; i++)
{ {
tmpLen = stream.Read (buffer, 0, _fragmentLen); tmpLen = stream.Read (buffer, 0, FragmentLength);
if (send (Fin.MORE, Opcode.CONT, buffer, compressed)) if (send (WsFrame.CreateFrame (Fin.MORE, Opcode.CONT, mask, buffer, compressed)))
readLen += tmpLen; readLen += tmpLen;
else else
return readLen; return readLen;
@ -1123,7 +1137,7 @@ namespace WebSocketSharp
if (rem != 0) if (rem != 0)
buffer = new byte [rem]; buffer = new byte [rem];
tmpLen = stream.Read (buffer, 0, buffer.Length); tmpLen = stream.Read (buffer, 0, buffer.Length);
if (send (Fin.FINAL, Opcode.CONT, buffer, compressed)) if (send (WsFrame.CreateFrame (Fin.FINAL, Opcode.CONT, mask, buffer, compressed)))
readLen += tmpLen; readLen += tmpLen;
return readLen; return readLen;
@ -1274,6 +1288,57 @@ namespace WebSocketSharp
_receivePong.WaitOne (timeOut); _receivePong.WaitOne (timeOut);
} }
// As server, used to broadcast
internal void Send (Opcode opcode, byte [] data, Dictionary<CompressionMethod, byte []> cache)
{
try {
byte [] cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = WsFrame.CreateFrame (
Fin.FINAL,
opcode,
Mask.UNMASK,
data.Compress (_compression),
_compression != CompressionMethod.NONE).ToByteArray ();
cache.Add (_compression, cached);
}
lock (_forSend)
{
send (cached);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
}
// As server, used to broadcast
internal void Send (Opcode opcode, Stream stream, Dictionary <CompressionMethod, Stream> cache)
{
try {
Stream cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = stream.Compress (_compression);
cache.Add (_compression, cached);
}
lock (_forSend)
{
cached.Position = 0;
sendFragmented (opcode, cached, Mask.UNMASK, _compression != CompressionMethod.NONE);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
}
#endregion #endregion
#region Public Methods #region Public Methods