Modified sending and added a new Send method

This commit is contained in:
sta 2013-10-05 23:28:43 +09:00
parent 79abc9aee3
commit a5e6e6e80e
2 changed files with 283 additions and 114 deletions

View File

@ -165,6 +165,22 @@ namespace WebSocketSharp
}
}
internal static string CheckIfCanRead (this Stream stream)
{
return stream == null
? "'stream' must not be null."
: !stream.CanRead
? "'stream' cannot be read."
: null;
}
internal static string CheckIfOpen (this WebSocketState state)
{
return state != WebSocketState.OPEN
? "A WebSocket connection isn't established or has been closed."
: null;
}
internal static string CheckIfStarted (this ServerState state)
{
return state != ServerState.START
@ -456,7 +472,7 @@ namespace WebSocketSharp
var buffer = new byte [length];
var readLen = stream.Read (buffer, 0, length);
if (readLen <= 0)
return new byte [] {};
return new byte []{};
var tmpLen = 0;
while (readLen < length)
@ -1148,7 +1164,7 @@ namespace WebSocketSharp
public static byte [] ReadBytes (this Stream stream, int length)
{
return stream == null || length < 1
? new byte [] {}
? new byte []{}
: stream.ReadBytesInternal (length);
}
@ -1168,7 +1184,7 @@ namespace WebSocketSharp
public static byte [] ReadBytes (this Stream stream, long length)
{
return stream == null || length < 1
? new byte [] {}
? new byte []{}
: length > 1024
? stream.ReadBytesInternal (length, 1024)
: stream.ReadBytesInternal ((int) length);
@ -1439,7 +1455,7 @@ namespace WebSocketSharp
? BitConverter.GetBytes ((UInt32)(object) value)
: type == typeof (UInt64)
? BitConverter.GetBytes ((UInt64)(object) value)
: new byte [] {};
: new byte []{};
return buffer.Length <= 1 || order.IsHostOrder ()
? buffer

View File

@ -916,8 +916,7 @@ namespace WebSocketSharp
private bool processPingFrame (WsFrame frame)
{
if (send (WsFrame.CreatePongFrame (
_client ? Mask.MASK : Mask.UNMASK, frame.PayloadData).ToByteArray ()))
if (send (WsFrame.CreatePongFrame (_client ? Mask.MASK : Mask.UNMASK, frame.PayloadData)))
_logger.Trace ("Returned Pong.");
return true;
@ -1026,50 +1025,77 @@ namespace WebSocketSharp
return false;
}
return _stream.WriteFrame (frame);
return _stream.Write (frame.ToByteArray ());
}
private void send (Opcode opcode, byte [] data)
{
lock (_forSend)
{
try {
var comped = false;
if (_compression != CompressionMethod.NONE)
{
data = data.Compress (_compression);
comped = true;
}
send (WsFrame.CreateFrame (
Fin.FINAL, opcode, _client ? Mask.MASK : Mask.UNMASK, data, comped));
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
}
}
private void send (Opcode opcode, Stream stream)
{
var data = stream;
var compressed = false;
try {
if (_readyState != WebSocketState.OPEN)
{
var msg = "A WebSocket connection isn't established or has been closed.";
_logger.Error (msg);
error (msg);
lock (_forSend)
{
var comp = stream;
var comped = false;
try {
if (_compression != CompressionMethod.NONE)
{
comp = stream.Compress (_compression);
comped = true;
}
return;
sendFragmented (opcode, comp, _client ? Mask.MASK : Mask.UNMASK, comped);
}
if (_compression != CompressionMethod.NONE)
{
data = data.Compress (_compression);
compressed = true;
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
finally {
if (comped)
comp.Dispose ();
lock (_forSend)
{
var mask = _client ? Mask.MASK : Mask.UNMASK;
var length = data.Length;
if (length <= FragmentLength)
send (WsFrame.CreateFrame (
Fin.FINAL, opcode, mask, data.ReadBytes ((int) length), compressed));
else
sendFragmented (opcode, data, mask, compressed);
stream.Dispose ();
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
finally {
if (compressed)
data.Dispose ();
}
stream.Dispose ();
}
private void sendAsync (Opcode opcode, byte [] data, Action completed)
{
Action<Opcode, byte []> sender = send;
AsyncCallback callback = ar =>
{
try {
sender.EndInvoke (ar);
if (completed != null)
completed ();
}
catch (Exception ex)
{
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
};
sender.BeginInvoke (opcode, data, callback, null);
}
private void sendAsync (Opcode opcode, Stream stream, Action completed)
@ -1092,55 +1118,75 @@ namespace WebSocketSharp
sender.BeginInvoke (opcode, stream, callback, null);
}
private long sendFragmented (Opcode opcode, Stream stream, Mask mask, bool compressed)
private bool sendFragmented (Opcode opcode, Stream stream, Mask mask, bool compressed)
{
var length = stream.Length;
var quo = length / FragmentLength;
var rem = length % FragmentLength;
var count = rem == 0 ? quo - 2 : quo - 1;
var len = stream.Length;
if (sendFragmented (opcode, stream, len, mask, compressed) == len)
return true;
long readLen = 0;
int tmpLen = 0;
var msg = "Sending fragmented data is interrupted.";
_logger.Error (msg);
error (msg);
close (CloseStatusCode.ABNORMAL, msg, false);
return false;
}
private long sendFragmented (
Opcode opcode, Stream stream, long length, Mask mask, bool compressed)
{
var quo = length / FragmentLength;
var rem = (int) (length % FragmentLength);
var count = rem == 0 ? quo - 2 : quo - 1;
long sentLen = 0;
int readLen = 0;
byte [] buffer = null;
// Not fragmented
if (quo == 0)
{
buffer = new byte [rem];
tmpLen = stream.Read (buffer, 0, buffer.Length);
if (send (WsFrame.CreateFrame (Fin.FINAL, opcode, mask, buffer, compressed)))
readLen = tmpLen;
readLen = stream.Read (buffer, 0, rem);
if (readLen == rem &&
send (WsFrame.CreateFrame (Fin.FINAL, opcode, mask, buffer, compressed)))
sentLen = readLen;
return readLen;
return sentLen;
}
buffer = new byte [FragmentLength];
// First
tmpLen = stream.Read (buffer, 0, FragmentLength);
if (send (WsFrame.CreateFrame (Fin.MORE, opcode, mask, buffer, compressed)))
readLen = tmpLen;
readLen = stream.Read (buffer, 0, FragmentLength);
if (readLen == FragmentLength &&
send (WsFrame.CreateFrame (Fin.MORE, opcode, mask, buffer, compressed)))
sentLen = readLen;
else
return 0;
return sentLen;
// Mid
for (long i = 0; i < count; i++)
{
tmpLen = stream.Read (buffer, 0, FragmentLength);
if (send (WsFrame.CreateFrame (Fin.MORE, Opcode.CONT, mask, buffer, compressed)))
readLen += tmpLen;
readLen = stream.Read (buffer, 0, FragmentLength);
if (readLen == FragmentLength &&
send (WsFrame.CreateFrame (Fin.MORE, Opcode.CONT, mask, buffer, compressed)))
sentLen += readLen;
else
return readLen;
return sentLen;
}
// Final
var tmpLen = FragmentLength;
if (rem != 0)
buffer = new byte [rem];
tmpLen = stream.Read (buffer, 0, buffer.Length);
if (send (WsFrame.CreateFrame (Fin.FINAL, Opcode.CONT, mask, buffer, compressed)))
readLen += tmpLen;
buffer = new byte [tmpLen = rem];
return readLen;
readLen = stream.Read (buffer, 0, tmpLen);
if (readLen == tmpLen &&
send (WsFrame.CreateFrame (Fin.FINAL, Opcode.CONT, mask, buffer, compressed)))
sentLen += readLen;
return sentLen;
}
// As client
@ -1291,51 +1337,51 @@ namespace WebSocketSharp
// As server, used to broadcast
internal void Send (Opcode opcode, byte [] data, Dictionary<CompressionMethod, byte []> cache)
{
try {
byte [] cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = WsFrame.CreateFrame (
Fin.FINAL,
opcode,
Mask.UNMASK,
data.Compress (_compression),
_compression != CompressionMethod.NONE).ToByteArray ();
lock (_forSend)
{
try {
byte [] cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = WsFrame.CreateFrame (
Fin.FINAL,
opcode,
Mask.UNMASK,
data.Compress (_compression),
_compression != CompressionMethod.NONE).ToByteArray ();
cache.Add (_compression, cached);
}
cache.Add (_compression, cached);
}
lock (_forSend)
{
send (cached);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
}
}
// As server, used to broadcast
internal void Send (Opcode opcode, Stream stream, Dictionary <CompressionMethod, Stream> cache)
{
try {
Stream cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = stream.Compress (_compression);
cache.Add (_compression, cached);
}
lock (_forSend)
{
try {
Stream cached;
if (!cache.TryGetValue (_compression, out cached))
{
cached = stream.Compress (_compression);
cache.Add (_compression, cached);
}
lock (_forSend)
{
cached.Position = 0;
sendFragmented (opcode, cached, Mask.UNMASK, _compression != CompressionMethod.NONE);
}
}
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
catch (Exception ex) {
_logger.Fatal (ex.ToString ());
error ("An exception has occured.");
}
}
}
@ -1544,17 +1590,19 @@ namespace WebSocketSharp
/// </param>
public void Send (byte[] data)
{
if (data == null)
var msg = _readyState.CheckIfOpen () ?? data.CheckIfValidSendData ();
if (msg != null)
{
var msg = "'data' must not be null.";
_logger.Error (msg);
error (msg);
return;
}
var stream = new MemoryStream (data);
send (Opcode.BINARY, stream);
if (data.LongLength <= FragmentLength)
send (Opcode.BINARY, data);
else
send (Opcode.BINARY, new MemoryStream (data));
}
/// <summary>
@ -1565,17 +1613,20 @@ namespace WebSocketSharp
/// </param>
public void Send (string data)
{
if (data == null)
var msg = _readyState.CheckIfOpen () ?? data.CheckIfValidSendData ();
if (msg != null)
{
var msg = "'data' must not be null.";
_logger.Error (msg);
error (msg);
return;
}
var stream = new MemoryStream (Encoding.UTF8.GetBytes (data));
send (Opcode.TEXT, stream);
var rawData = Encoding.UTF8.GetBytes (data);
if (rawData.LongLength <= FragmentLength)
send (Opcode.TEXT, rawData);
else
send (Opcode.TEXT, new MemoryStream (rawData));
}
/// <summary>
@ -1586,9 +1637,11 @@ namespace WebSocketSharp
/// </param>
public void Send (FileInfo file)
{
if (file == null)
var msg = _readyState.CheckIfOpen () ??
(file == null ? "'file' must not be null." : null);
if (msg != null)
{
var msg = "'file' must not be null.";
_logger.Error (msg);
error (msg);
@ -1598,6 +1651,50 @@ namespace WebSocketSharp
send (Opcode.BINARY, file.OpenRead ());
}
/// <summary>
/// Sends a binary data from the specified <see cref="Stream"/> using the WebSocket connection.
/// </summary>
/// <param name="stream">
/// A <see cref="Stream"/> object from which contains a binary data to send.
/// </param>
/// <param name="length">
/// An <see cref="int"/> that contains the number of bytes to send.
/// </param>
/// <param name="dispose">
/// <c>true</c> if <paramref name="stream"/> is disposed after a binary data read;
/// otherwise, <c>false</c>.
/// </param>
public void Send (Stream stream, int length, bool dispose)
{
byte [] data = null;
int readLen = 0;
var msg = _readyState.CheckIfOpen () ??
stream.CheckIfCanRead () ??
(length < 1 ? "'length' must be greater than 0." : null) ??
((readLen = (data = stream.ReadBytesInternal (length)).Length) == 0
? "A data cannot be read from 'stream'." : null);
if (msg != null)
{
_logger.Error (msg);
error (msg);
return;
}
if (readLen != length)
_logger.Warn (String.Format (
"A data with 'length' cannot be read from 'stream'.\nexpected: {0} actual: {1}", length, readLen));
if (dispose)
stream.Dispose ();
if (readLen <= FragmentLength)
send (Opcode.BINARY, data);
else
send (Opcode.BINARY, new MemoryStream (data));
}
/// <summary>
/// Sends a binary <paramref name="data"/> asynchronously using the WebSocket connection.
/// </summary>
@ -1610,17 +1707,19 @@ namespace WebSocketSharp
/// </param>
public void SendAsync (byte [] data, Action completed)
{
if (data == null)
var msg = _readyState.CheckIfOpen () ?? data.CheckIfValidSendData ();
if (msg != null)
{
var msg = "'data' must not be null.";
_logger.Error (msg);
error (msg);
return;
}
var stream = new MemoryStream (data);
sendAsync (Opcode.BINARY, stream, completed);
if (data.LongLength <= FragmentLength)
sendAsync (Opcode.BINARY, data, completed);
else
sendAsync (Opcode.BINARY, new MemoryStream (data), completed);
}
/// <summary>
@ -1635,17 +1734,20 @@ namespace WebSocketSharp
/// </param>
public void SendAsync (string data, Action completed)
{
if (data == null)
var msg = _readyState.CheckIfOpen () ?? data.CheckIfValidSendData ();
if (msg != null)
{
var msg = "'data' must not be null.";
_logger.Error (msg);
error (msg);
return;
}
var stream = new MemoryStream (Encoding.UTF8.GetBytes (data));
sendAsync (Opcode.TEXT, stream, completed);
var rawData = Encoding.UTF8.GetBytes (data);
if (rawData.LongLength <= FragmentLength)
sendAsync (Opcode.TEXT, rawData, completed);
else
sendAsync (Opcode.TEXT, new MemoryStream (rawData), completed);
}
/// <summary>
@ -1660,9 +1762,11 @@ namespace WebSocketSharp
/// </param>
public void SendAsync (FileInfo file, Action completed)
{
if (file == null)
var msg = _readyState.CheckIfOpen () ??
(file == null ? "'file' must not be null." : null);
if (msg != null)
{
var msg = "'file' must not be null.";
_logger.Error (msg);
error (msg);
@ -1672,6 +1776,55 @@ namespace WebSocketSharp
sendAsync (Opcode.BINARY, file.OpenRead (), completed);
}
/// <summary>
/// Sends a binary data asynchronously from the specified <see cref="Stream"/>
/// using the WebSocket connection.
/// </summary>
/// <param name="stream">
/// A <see cref="Stream"/> object from which contains a binary data to send.
/// </param>
/// <param name="length">
/// An <see cref="int"/> that contains the number of bytes to send.
/// </param>
/// <param name="dispose">
/// <c>true</c> if <paramref name="stream"/> is disposed after a binary data read;
/// otherwise, <c>false</c>.
/// </param>
/// <param name="completed">
/// An <see cref="Action"/> delegate that references the method(s) called when
/// the asynchronous operation completes.
/// </param>
public void SendAsync (Stream stream, int length, bool dispose, Action completed)
{
byte [] data = null;
int readLen = 0;
var msg = _readyState.CheckIfOpen () ??
stream.CheckIfCanRead () ??
(length < 1 ? "'length' must be greater than 0." : null) ??
((readLen = (data = stream.ReadBytesInternal (length)).Length) == 0
? "A data cannot be read from 'stream'." : null);
if (msg != null)
{
_logger.Error (msg);
error (msg);
return;
}
if (readLen != length)
_logger.Warn (String.Format (
"A data with 'length' cannot be read from 'stream'.\nexpected: {0} actual: {1}", length, readLen));
if (dispose)
stream.Dispose ();
if (readLen <= FragmentLength)
sendAsync (Opcode.BINARY, data, completed);
else
sendAsync (Opcode.BINARY, new MemoryStream (data), completed);
}
/// <summary>
/// Sets a <see cref="Cookie"/> used in the WebSocket opening handshake.
/// </summary>