Fix for issue #10

This commit is contained in:
sta
2012-11-13 16:38:48 +09:00
parent 8aab7bebe0
commit 4424d1d809
63 changed files with 343 additions and 183 deletions

View File

@@ -67,17 +67,17 @@ namespace WebSocketSharp.Frame {
#region Public Constructors
public WsFrame(Opcode opcode, PayloadData payloadData)
: this(Fin.FINAL, opcode, payloadData)
: this(Fin.FINAL, opcode, payloadData)
{
}
public WsFrame(Fin fin, Opcode opcode, PayloadData payloadData)
: this(fin, opcode, Mask.MASK, payloadData)
: this(fin, opcode, Mask.MASK, payloadData)
{
}
public WsFrame(Fin fin, Opcode opcode, Mask mask, PayloadData payloadData)
: this()
: this()
{
Fin = fin;
Opcode = opcode;
@@ -104,16 +104,14 @@ namespace WebSocketSharp.Frame {
public ulong Length
{
get
{
get {
return 2 + (ulong)(ExtPayloadLen.Length + MaskingKey.Length) + PayloadLength;
}
}
public ulong PayloadLength
{
get
{
get {
return PayloadData.Length;
}
}
@@ -144,6 +142,31 @@ namespace WebSocketSharp.Frame {
PayloadData.Mask(key);
}
private static WsFrame parse(Stream stream, bool unmask)
{
return parse(stream.ReadBytes(2), stream, unmask);
}
private static WsFrame parse(byte[] header, Stream stream, bool unmask)
{
if (header.IsNull() || header.Length != 2)
return null;
try
{
var frame = readHeader(header);
readExtPayloadLen(stream, frame);
readMaskingKey(stream, frame);
readPayloadData(stream, frame, unmask);
return frame;
}
catch
{
return null;
}
}
private static void readExtPayloadLen(Stream stream, WsFrame frame)
{
var length = frame.PayloadLen <= 125
@@ -155,16 +178,13 @@ namespace WebSocketSharp.Frame {
var extLength = stream.ReadBytes(length);
if (extLength == null)
throw new IOException();
frame.ExtPayloadLen = extLength;
}
}
private static WsFrame readHeader(Stream stream)
private static WsFrame readHeader(byte[] header)
{
var header = stream.ReadBytes(2);
if (header == null)
return null;
// FIN
Fin fin = (header[0] & 0x80) == 0x80 ? Fin.FINAL : Fin.MORE;
// RSV1
@@ -187,7 +207,8 @@ namespace WebSocketSharp.Frame {
Rsv3 = rsv3,
Opcode = opcode,
Masked = masked,
PayloadLen = payloadLen};
PayloadLen = payloadLen
};
}
private static void readMaskingKey(Stream stream, WsFrame frame)
@@ -197,6 +218,7 @@ namespace WebSocketSharp.Frame {
var maskingKey = stream.ReadBytes(4);
if (maskingKey == null)
throw new IOException();
frame.MaskingKey = maskingKey;
}
}
@@ -284,15 +306,41 @@ namespace WebSocketSharp.Frame {
public static WsFrame Parse(Stream stream, bool unmask)
{
var frame = readHeader(stream);
if (frame == null)
return null;
return parse(stream, unmask);
}
readExtPayloadLen(stream, frame);
readMaskingKey(stream, frame);
readPayloadData(stream, frame, unmask);
public static void ParseAsync(Stream stream, Action<WsFrame> completed)
{
ParseAsync(stream, true, completed);
}
return frame;
public static void ParseAsync(Stream stream, bool unmask, Action<WsFrame> completed)
{
var headerLen = 2;
var header = new byte[headerLen];
AsyncCallback callback = (ar) =>
{
WsFrame frame;
try
{
var readLen = stream.EndRead(ar);
frame = readLen == 2
? parse(header, stream, unmask)
: null;
}
catch
{
frame = null;
}
finally
{
if (!completed.IsNull())
completed(frame);
}
};
stream.BeginRead(header, 0, headerLen, callback, null);
}
public void Print()

View File

@@ -55,7 +55,7 @@ namespace WebSocketSharp.Server {
_isStopped = false;
_isSweeping = false;
_sessions = new Dictionary<string, WebSocketService>();
_sweepTimer = new Timer(30 * 1000);
_sweepTimer = new Timer(60 * 1000);
_sweepTimer.Elapsed += (sender, e) =>
{
Sweep();
@@ -127,6 +127,56 @@ namespace WebSocketSharp.Server {
#region Private Methods
private void broadcast(byte[] data)
{
lock (_syncRoot)
{
foreach (var service in _sessions.Values)
service.Send(data);
}
}
private void broadcast(string data)
{
lock (_syncRoot)
{
foreach (var service in _sessions.Values)
service.Send(data);
}
}
private void broadcastAsync(byte[] data)
{
var sessions = copySessions();
var services = sessions.Values.GetEnumerator();
Action completed = null;
completed = () =>
{
if (services.MoveNext())
services.Current.SendAsync(data, completed);
};
if (services.MoveNext())
services.Current.SendAsync(data, completed);
}
private void broadcastAsync(string data)
{
var sessions = copySessions();
var services = sessions.Values.GetEnumerator();
Action completed = null;
completed = () =>
{
if (services.MoveNext())
services.Current.SendAsync(data, completed);
};
if (services.MoveNext())
services.Current.SendAsync(data, completed);
}
private Dictionary<string, WebSocketService> copySessions()
{
lock (_syncRoot)
@@ -172,20 +222,18 @@ namespace WebSocketSharp.Server {
public void Broadcast(byte[] data)
{
lock (_syncRoot)
{
foreach (var service in _sessions.Values)
service.Send(data);
}
if (_isStopped)
broadcast(data);
else
broadcastAsync(data);
}
public void Broadcast(string data)
{
lock (_syncRoot)
{
foreach (var service in _sessions.Values)
service.Send(data);
}
if (_isStopped)
broadcast(data);
else
broadcastAsync(data);
}
public Dictionary<string, bool> Broadping(string message)

View File

@@ -113,6 +113,20 @@ namespace WebSocketSharp.Server {
#endregion
#region Internal Methods
internal void SendAsync(byte[] data, Action completed)
{
_socket.SendAsync(data, completed);
}
internal void SendAsync(string data, Action completed)
{
_socket.SendAsync(data, completed);
}
#endregion
#region Public Methods
public void Bind(WebSocket socket, SessionManager sessions)

View File

@@ -610,6 +610,19 @@ namespace WebSocketSharp {
return true;
}
private bool isValidFrame(WsFrame frame)
{
if (frame.IsNull())
{
var msg = "The WebSocket frame can not be read from the network stream.";
close(CloseStatusCode.ABNORMAL, msg);
return false;
}
return true;
}
// As Server
private bool isValidRequest(RequestHandshake request, out string message)
{
@@ -704,35 +717,6 @@ namespace WebSocketSharp {
return true;
}
private void message()
{
try
{
onMessage(receive());
}
catch (WsReceivedTooBigMessageException ex)
{
close(CloseStatusCode.TOO_BIG, ex.Message);
}
catch (Exception)
{
close(CloseStatusCode.ABNORMAL, "An exception has occured.");
}
}
private void messageLoopCallback(IAsyncResult ar)
{
Action messageInvoker = (Action)ar.AsyncState;
messageInvoker.EndInvoke(ar);
if (_readyState == WsState.OPEN)
{
messageInvoker.BeginInvoke(messageLoopCallback, messageInvoker);
return;
}
_exitMessageLoop.Set();
}
private void onClose(CloseEventArgs eventArgs)
{
if (!Thread.CurrentThread.IsBackground)
@@ -764,7 +748,7 @@ namespace WebSocketSharp {
private void onOpen()
{
_readyState = WsState.OPEN;
startMessageThread();
startMessageLoop();
OnOpen.Emit(this, EventArgs.Empty);
}
@@ -799,25 +783,7 @@ namespace WebSocketSharp {
private WsFrame readFrame()
{
var frame = _wsStream.ReadFrame();
if (frame.IsNull())
{
var msg = "The WebSocket frame can not be read from network stream.";
close(CloseStatusCode.ABNORMAL, msg);
}
return frame;
}
private WsFrame readFrameWithTimeout(int millisecondsTimeout)
{
if (!_wsStream.DataAvailable)
{
Thread.Sleep(millisecondsTimeout);
if (!_wsStream.DataAvailable)
return null;
}
return readFrame();
return isValidFrame(frame) ? frame : null;
}
private string[] readHandshake()
@@ -825,10 +791,9 @@ namespace WebSocketSharp {
return _wsStream.ReadHandshake();
}
private MessageEventArgs receive()
private MessageEventArgs receive(WsFrame frame)
{
var frame = _isClient ? readFrame() : readFrameWithTimeout(1 * 100);
if (frame.IsNull())
if (!isValidFrame(frame))
return null;
if ((frame.Fin == Fin.FINAL && frame.Opcode == Opcode.CONT) ||
@@ -838,10 +803,9 @@ namespace WebSocketSharp {
if (frame.Fin == Fin.MORE)
{// MORE
var merged = receiveFragmented(frame);
if (merged.IsNull())
return null;
return new MessageEventArgs(frame.Opcode, new PayloadData(merged));
return !merged.IsNull()
? new MessageEventArgs(frame.Opcode, new PayloadData(merged))
: null;
}
if (frame.Opcode == Opcode.CLOSE)
@@ -1016,22 +980,25 @@ namespace WebSocketSharp {
{
lock (_forSend)
{
if (_readyState != WsState.OPEN)
try
{
var msg = "The WebSocket connection isn't established or has been closed.";
onError(msg);
return;
}
if (_readyState != WsState.OPEN)
{
var msg = "The WebSocket connection isn't established or has been closed.";
onError(msg);
return;
}
var length = stream.Length;
if (length <= _fragmentLen)
var length = stream.Length;
if (length <= _fragmentLen)
send(Fin.FINAL, opcode, stream.ReadBytes((int)length));
else
sendFragmented(opcode, stream);
}
catch (Exception ex)
{
var buffer = stream.ReadBytes((int)length);
send(Fin.FINAL, opcode, buffer);
return;
onError(ex.Message);
}
sendFragmented(opcode, stream);
}
}
@@ -1049,13 +1016,23 @@ namespace WebSocketSharp {
private void sendAsync(Opcode opcode, Stream stream, Action completed)
{
Action<Opcode, Stream> action = send;
AsyncCallback callback = null;
callback = (ar) =>
AsyncCallback callback = (ar) =>
{
action.EndInvoke(ar);
stream.Close();
if (!completed.IsNull())
completed();
try
{
action.EndInvoke(ar);
if (!completed.IsNull())
completed();
}
catch (Exception ex)
{
onError(ex.Message);
}
finally
{
stream.Close();
}
};
action.BeginInvoke(opcode, stream, callback, null);
@@ -1132,17 +1109,33 @@ namespace WebSocketSharp {
writeHandshake(response);
}
private void startMessageThread()
private void startMessageLoop()
{
_receivePong = new AutoResetEvent(false);
_exitMessageLoop = new AutoResetEvent(false);
Action messageInvoker = () =>
_receivePong = new AutoResetEvent(false);
Action<WsFrame> completed = null;
completed = (frame) =>
{
if (_readyState == WsState.OPEN)
message();
try
{
onMessage(receive(frame));
if (_readyState == WsState.OPEN)
_wsStream.ReadFrameAsync(completed);
else
_exitMessageLoop.Set();
}
catch (WsReceivedTooBigMessageException ex)
{
close(CloseStatusCode.TOO_BIG, ex.Message);
}
catch (Exception)
{
close(CloseStatusCode.ABNORMAL, "An exception has occured.");
}
};
messageInvoker.BeginInvoke(messageLoopCallback, messageInvoker);
_wsStream.ReadFrameAsync(completed);
}
private bool tryCreateUri(string uriString, out Uri result, out string message)
@@ -1365,6 +1358,15 @@ namespace WebSocketSharp {
}
}
/// <summary>
/// Sends a text data asynchronously using the connection.
/// </summary>
/// <param name="data">
/// A <see cref="string"/> that contains the text data to be sent.
/// </param>
/// <param name="completed">
/// An <see cref="Action"/> delegate that contains the method(s) that is called when an asynchronous operation completes.
/// </param>
public void SendAsync(string data, Action completed)
{
if (data.IsNull())
@@ -1377,6 +1379,15 @@ namespace WebSocketSharp {
sendAsync(Opcode.TEXT, buffer, completed);
}
/// <summary>
/// Sends a binary data asynchronously using the connection.
/// </summary>
/// <param name="data">
/// An array of <see cref="byte"/> that contains the binary data to be sent.
/// </param>
/// <param name="completed">
/// An <see cref="Action"/> delegate that contains the method(s) that is called when an asynchronous operation completes.
/// </param>
public void SendAsync(byte[] data, Action completed)
{
if (data.IsNull())
@@ -1388,6 +1399,15 @@ namespace WebSocketSharp {
sendAsync(Opcode.BINARY, data, completed);
}
/// <summary>
/// Sends a binary data asynchronously using the connection.
/// </summary>
/// <param name="file">
/// A <see cref="FileInfo"/> that contains the binary data to be sent.
/// </param>
/// <param name="completed">
/// An <see cref="Action"/> delegate that contains the method(s) that is called when an asynchronous operation completes.
/// </param>
public void SendAsync(FileInfo file, Action completed)
{
if (file.IsNull())

View File

@@ -37,9 +37,9 @@ using System.Text;
using WebSocketSharp.Frame;
using WebSocketSharp.Net.Security;
namespace WebSocketSharp
{
public class WsStream : IDisposable
namespace WebSocketSharp {
internal class WsStream : IDisposable
{
#region Fields
@@ -69,15 +69,16 @@ namespace WebSocketSharp
public bool DataAvailable {
get {
if (_innerStreamType == typeof(SslStream))
return ((SslStream)_innerStream).DataAvailable;
return ((NetworkStream)_innerStream).DataAvailable;
return _innerStreamType == typeof(SslStream)
? ((SslStream)_innerStream).DataAvailable
: ((NetworkStream)_innerStream).DataAvailable;
}
}
public bool IsSecure {
get { return _isSecure; }
get {
return _isSecure;
}
}
#endregion
@@ -96,6 +97,50 @@ namespace WebSocketSharp
_forWrite = new object();
}
private int read(byte[] buffer, int offset, int size)
{
var readLen = _innerStream.Read(buffer, offset, size);
if (readLen < size)
{
var msg = String.Format("Data can not be read from {0}.", _innerStreamType);
throw new IOException(msg);
}
return readLen;
}
private int readByte()
{
return _innerStream.ReadByte();
}
private string[] readHandshake()
{
var buffer = new List<byte>();
while (true)
{
if (readByte().EqualsAndSaveTo('\r', buffer) &&
readByte().EqualsAndSaveTo('\n', buffer) &&
readByte().EqualsAndSaveTo('\r', buffer) &&
readByte().EqualsAndSaveTo('\n', buffer))
break;
}
return Encoding.UTF8.GetString(buffer.ToArray())
.Replace("\r\n", "\n").Replace("\n\n", "\n").TrimEnd('\n')
.Split('\n');
}
private void write(byte[] buffer, int offset, int count)
{
_innerStream.Write(buffer, offset, count);
}
private void writeByte(byte value)
{
_innerStream.WriteByte(value);
}
#endregion
#region Internal Methods
@@ -145,10 +190,9 @@ namespace WebSocketSharp
var conn = context.Connection;
var stream = conn.Stream;
if (conn.IsSecure)
return new WsStream((SslStream)stream);
return new WsStream((NetworkStream)stream);
return conn.IsSecure
? new WsStream((SslStream)stream)
: new WsStream((NetworkStream)stream);
}
#endregion
@@ -165,88 +209,74 @@ namespace WebSocketSharp
_innerStream.Dispose();
}
public int Read(byte[] buffer, int offset, int size)
{
lock (_forRead)
{
var readLen = _innerStream.Read(buffer, offset, size);
if (readLen < size)
{
var msg = String.Format("Data can not be read from {0}.", _innerStreamType);
throw new IOException(msg);
}
return readLen;
}
}
public int ReadByte()
{
lock (_forRead)
{
return _innerStream.ReadByte();
}
}
public WsFrame ReadFrame()
{
lock (_forRead)
{
return WsFrame.Parse(_innerStream);
try
{
return WsFrame.Parse(_innerStream);
}
catch
{
return null;
}
}
}
public void ReadFrameAsync(Action<WsFrame> completed)
{
WsFrame.ParseAsync(_innerStream, completed);
}
public string[] ReadHandshake()
{
lock (_forRead)
{
var buffer = new List<byte>();
while (true)
try
{
if (ReadByte().EqualsAndSaveTo('\r', buffer) &&
ReadByte().EqualsAndSaveTo('\n', buffer) &&
ReadByte().EqualsAndSaveTo('\r', buffer) &&
ReadByte().EqualsAndSaveTo('\n', buffer))
break;
return readHandshake();
}
catch
{
return null;
}
return Encoding.UTF8.GetString(buffer.ToArray())
.Replace("\r\n", "\n").Replace("\n\n", "\n").TrimEnd('\n')
.Split('\n');
}
}
public void Write(byte[] buffer, int offset, int count)
public bool WriteFrame(WsFrame frame)
{
lock (_forWrite)
{
_innerStream.Write(buffer, offset, count);
try
{
var buffer = frame.ToBytes();
write(buffer, 0, buffer.Length);
return true;
}
catch
{
return false;
}
}
}
public void WriteByte(byte value)
public bool WriteHandshake(Handshake handshake)
{
lock (_forWrite)
{
_innerStream.WriteByte(value);
}
}
try
{
var buffer = handshake.ToBytes();
write(buffer, 0, buffer.Length);
public void WriteFrame(WsFrame frame)
{
lock (_forWrite)
{
var buffer = frame.ToBytes();
_innerStream.Write(buffer, 0, buffer.Length);
}
}
public void WriteHandshake(Handshake handshake)
{
lock (_forWrite)
{
var buffer = handshake.ToBytes();
_innerStream.Write(buffer, 0, buffer.Length);
return true;
}
catch
{
return false;
}
}
}

Binary file not shown.