diff --git a/Example1/AudioMessage.cs b/Example1/AudioMessage.cs new file mode 100644 index 00000000..a81ee38d --- /dev/null +++ b/Example1/AudioMessage.cs @@ -0,0 +1,12 @@ +using System; + +namespace Example1 +{ + internal class AudioMessage + { + public uint user_id; + public byte ch_num; + public uint buffer_length; + public float [,] buffer_array; + } +} diff --git a/Example1/AudioStreamer.cs b/Example1/AudioStreamer.cs index ead59406..c3d16830 100644 --- a/Example1/AudioStreamer.cs +++ b/Example1/AudioStreamer.cs @@ -1,8 +1,5 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; -#if NOTIFY -using Notifications; -#endif using System; using System.Collections; using System.Collections.Generic; @@ -13,310 +10,194 @@ using WebSocketSharp; namespace Example1 { - public struct NfMessage - { - public string Summary; - public string Body; - public string Icon; - } - - public class AudioMessage - { - public uint user_id; - public byte ch_num; - public uint buffer_length; - public float[,] buffer_array; - } - - public class TextMessage - { - public uint? user_id; - public string name; - public string type; - public string message; - } - - public class ThreadState - { - public bool Enabled { get; set; } - public AutoResetEvent Notification { get; private set; } - - public ThreadState() - { - Enabled = true; - Notification = new AutoResetEvent(false); - } - } - - public class AudioStreamer : IDisposable + internal class AudioStreamer : IDisposable { private Dictionary _audioBox; - private Queue _msgQ; - private string _name; - private WaitCallback _notifyMsg; - private ThreadState _notifyMsgState; - private TimerCallback _sendHeartbeat; private Timer _heartbeatTimer; - private uint? _user_id; - private WebSocket _ws; + private uint? _id; + private string _name; + private Notifier _notifier; + private WebSocket _websocket; - public AudioStreamer(string url) + public AudioStreamer (string url) { - _ws = new WebSocket(url); - _msgQ = Queue.Synchronized(new Queue()); - _audioBox = new Dictionary(); - _user_id = null; + _websocket = new WebSocket (url); - configure(); + _audioBox = new Dictionary (); + _heartbeatTimer = new Timer (sendHeartbeat, null, -1, -1); + _id = null; + _notifier = new Notifier (); + + configure (); } - private void configure() + private AudioMessage acceptBinaryMessage (byte [] value) { - #if DEBUG - _ws.Log.Level = LogLevel.Trace; - #endif - _ws.OnOpen += (sender, e) => - { - var msg = createTextMessage("connection", String.Empty); - _ws.Send(msg); - }; + var id = value.SubArray (0, 4).To (ByteOrder.Big); + var chNum = value.SubArray (4, 1) [0]; + var bufferLength = value.SubArray (5, 4).To (ByteOrder.Big); + var bufferArray = new float [chNum, bufferLength]; - _ws.OnMessage += (sender, e) => - { - switch (e.Type) - { - case Opcode.Text: - var msg = parseTextMessage(e.Data); - _msgQ.Enqueue(msg); - break; - case Opcode.Binary: - var audioMsg = parseAudioMessage(e.RawData); - if (audioMsg.user_id == _user_id) goto default; - if (_audioBox.ContainsKey(audioMsg.user_id)) - { - _audioBox[audioMsg.user_id].Enqueue(audioMsg.buffer_array); - } - else - { - var q = Queue.Synchronized(new Queue()); - q.Enqueue(audioMsg.buffer_array); - _audioBox.Add(audioMsg.user_id, q); - } - break; - default: - break; - } - }; + var offset = 9; + ((int) chNum).Times ( + i => bufferLength.Times ( + j => { + bufferArray [i, j] = value.SubArray (offset, 4).To (ByteOrder.Big); + offset += 4; + })); - _ws.OnError += (sender, e) => - { - enNfMessage("[AudioStreamer] error", "WS: Error: " + e.Message, "notification-message-im"); - }; - - _ws.OnClose += (sender, e) => - { - enNfMessage - ( - "[AudioStreamer] disconnect", - String.Format("WS: Close({0}: {1})", e.Code, e.Reason), - "notification-message-im" - ); - }; - - //_ws.Compression = CompressionMethod.Deflate; - - _notifyMsgState = new ThreadState(); - _notifyMsg = (state) => - { - while (_notifyMsgState.Enabled || _msgQ.Count > 0) - { - Thread.Sleep(500); - - if (_msgQ.Count > 0) - { - NfMessage msg = (NfMessage)_msgQ.Dequeue(); - #if NOTIFY - Notification nf = new Notification(msg.Summary, - msg.Body, - msg.Icon); - nf.AddHint("append", "allowed"); - nf.Show(); - #else - Console.WriteLine("{0}: {1}", msg.Summary, msg.Body); - #endif - } - } - - _notifyMsgState.Notification.Set(); - }; - - _sendHeartbeat = (state) => - { - var msg = createTextMessage("heartbeat", String.Empty); - _ws.Send(msg); + return new AudioMessage { + user_id = id, + ch_num = chNum, + buffer_length = bufferLength, + buffer_array = bufferArray }; } - private byte[] createAudioMessage(float[,] buffer_array) + private NotificationMessage acceptTextMessage (string value) { - List msg = new List(); - - uint user_id = (uint)_user_id; - int ch_num = buffer_array.GetLength(0); - int buffer_length = buffer_array.GetLength(1); - - msg.AddRange(user_id.ToByteArray(ByteOrder.Big)); - msg.Add((byte)ch_num); - msg.AddRange(((uint)buffer_length).ToByteArray(ByteOrder.Big)); - - ch_num.Times(i => - { - buffer_length.Times(j => - { - msg.AddRange(buffer_array[i, j].ToByteArray(ByteOrder.Big)); - }); - }); - - return msg.ToArray(); - } - - private string createTextMessage(string type, string message) - { - var msg = new TextMessage - { - user_id = _user_id, - name = _name, - type = type, - message = message - }; - - return JsonConvert.SerializeObject(msg); - } - - private AudioMessage parseAudioMessage(byte[] data) - { - uint user_id = data.SubArray(0, 4).To(ByteOrder.Big); - byte ch_num = data.SubArray(4, 1)[0]; - uint buffer_length = data.SubArray(5, 4).To(ByteOrder.Big); - float[,] buffer_array = new float[ch_num, buffer_length]; - - int offset = 9; - ((int)ch_num).Times(i => - { - buffer_length.Times(j => - { - buffer_array[i, j] = data.SubArray(offset, 4).To(ByteOrder.Big); - offset += 4; - }); - }); - - return new AudioMessage - { - user_id = user_id, - ch_num = ch_num, - buffer_length = buffer_length, - buffer_array = buffer_array - }; - } - - private NfMessage parseTextMessage(string data) - { - JObject msg = JObject.Parse(data); - uint user_id = (uint)msg["user_id"]; - string name = (string)msg["name"]; - string type = (string)msg["type"]; + var json = JObject.Parse (value); + var id = (uint) json ["user_id"]; + var name = (string) json ["name"]; + var type = (string) json ["type"]; string message; - switch (type) - { - case "connection": - JArray users = (JArray)msg["message"]; - StringBuilder sb = new StringBuilder("Now keeping connection\n"); - foreach (JToken user in users) - { - sb.AppendFormat("user_id: {0} name: {1}\n", (uint)user["user_id"], (string)user["name"]); - } - message = sb.ToString().TrimEnd('\n'); - break; - case "connected": - _user_id = user_id; - message = String.Format("user_id: {0} name: {1}", user_id, name); - break; - case "message": - message = String.Format("{0}: {1}", name, (string)msg["message"]); - break; - case "start_music": - message = String.Format("{0}: Started playing music!", name); - break; - default: - message = "Received unknown type message: " + type; - break; - } + if (type == "connection") { + var users = (JArray) json ["message"]; + var msg = new StringBuilder ("Now keeping connection:"); + foreach (JToken user in users) + msg.AppendFormat ( + "\n- user_id: {0} name: {1}", (uint) user ["user_id"], (string) user ["name"]); - return new NfMessage - { - Summary = String.Format("[AudioStreamer] {0}", type), - Body = message, - Icon = "notification-message-im" - }; + message = msg.ToString (); + } + else if (type == "connected") { + _heartbeatTimer.Change (30000, 30000); + _id = id; + message = String.Format ("user_id: {0} name: {1}", id, name); + } + else if (type == "message") + message = String.Format ("{0}: {1}", name, (string) json ["message"]); + else if (type == "start_music") + message = String.Format ("{0}: Started playing music!", name); + else + message = "Received unknown type message."; + + return new NotificationMessage { + Summary = String.Format ("AudioStreamer Message ({0})", type), + Body = message, + Icon = "notification-message-im" + }; } - private void enNfMessage(string summary, string body, string icon) + private void configure () { - var msg = new NfMessage - { - Summary = summary, - Body = body, - Icon = icon +#if DEBUG + _websocket.Log.Level = LogLevel.Trace; +#endif + _websocket.OnOpen += (sender, e) => + _websocket.Send (createTextMessage ("connection", String.Empty)); + + _websocket.OnMessage += (sender, e) => { + if (e.Type == Opcode.Text) + _notifier.Notify (acceptTextMessage (e.Data)); + else { + var msg = acceptBinaryMessage (e.RawData); + if (msg.user_id == _id) + return; + + if (_audioBox.ContainsKey (msg.user_id)) { + _audioBox [msg.user_id].Enqueue (msg.buffer_array); + return; + } + + var queue = Queue.Synchronized (new Queue ()); + queue.Enqueue (msg.buffer_array); + _audioBox.Add (msg.user_id, queue); + } }; - _msgQ.Enqueue(msg); + _websocket.OnError += (sender, e) => + _notifier.Notify ( + new NotificationMessage { + Summary = "AudioStreamer Error", + Body = e.Message, + Icon = "notification-message-im" + }); + + _websocket.OnClose += (sender, e) => + _notifier.Notify ( + new NotificationMessage { + Summary = String.Format ("AudioStreamer Disconnect ({0})", e.Code), + Body = e.Reason, + Icon = "notification-message-im" + }); } - public void Connect() + private byte [] createAudioMessage (float [,] bufferArray) { - string name; - do - { - Console.Write("Your name > "); - name = Console.ReadLine(); + var msg = new List (); + + var id = (uint) _id; + var chNum = bufferArray.GetLength (0); + var bufferLength = bufferArray.GetLength (1); + + msg.AddRange (id.ToByteArray (ByteOrder.Big)); + msg.Add ((byte) chNum); + msg.AddRange (((uint) bufferLength).ToByteArray (ByteOrder.Big)); + + chNum.Times ( + i => bufferLength.Times ( + j => msg.AddRange (bufferArray [i, j].ToByteArray (ByteOrder.Big)))); + + return msg.ToArray (); + } + + private string createTextMessage (string type, string message) + { + return JsonConvert.SerializeObject ( + new TextMessage { + user_id = _id, + name = _name, + type = type, + message = message + }); + } + + private void sendHeartbeat (object state) + { + _websocket.Send (createTextMessage ("heartbeat", String.Empty)); + } + + public void Connect () + { + do { + Console.Write ("Input your name> "); + _name = Console.ReadLine (); } - while (name == String.Empty); + while (_name.Length == 0); - _name = name; - - _ws.Connect(); - - ThreadPool.QueueUserWorkItem(_notifyMsg); - _heartbeatTimer = new Timer(_sendHeartbeat, null, 30 * 1000, 30 * 1000); + _websocket.Connect (); } - public void Disconnect() + public void Disconnect () { - var wait = new AutoResetEvent(false); - _heartbeatTimer.Dispose(wait); - wait.WaitOne(); + var wait = new ManualResetEvent (false); + _heartbeatTimer.Dispose (wait); + wait.WaitOne (); - _ws.Close(); - - _notifyMsgState.Enabled = false; - _notifyMsgState.Notification.WaitOne(); + _websocket.Close (); + _notifier.Close (); } - public void Dispose() + public void Write (string message) { - Disconnect(); + _websocket.Send (createTextMessage ("message", message)); } - public void Write(string data) + void IDisposable.Dispose () { - var msg = createTextMessage("message", data); - _ws.Send(msg); - } - - public void Write(FileInfo file) - { - throw new NotImplementedException(); + Disconnect (); } } } diff --git a/Example1/Example1.csproj b/Example1/Example1.csproj index 95af031f..903f5b04 100644 --- a/Example1/Example1.csproj +++ b/Example1/Example1.csproj @@ -34,7 +34,7 @@ full false bin\Debug_Ubuntu - DEBUG;NOTIFY + DEBUG;UBUNTU prompt 4 true @@ -46,7 +46,7 @@ prompt 4 true - NOTIFY + UBUNTU @@ -62,6 +62,10 @@ + + + + diff --git a/Example1/NotificationMessage.cs b/Example1/NotificationMessage.cs new file mode 100644 index 00000000..01f1692a --- /dev/null +++ b/Example1/NotificationMessage.cs @@ -0,0 +1,24 @@ +using System; + +namespace Example1 +{ + internal class NotificationMessage + { + public string Body { + get; set; + } + + public string Icon { + get; set; + } + + public string Summary { + get; set; + } + + public override string ToString () + { + return String.Format ("{0}: {1}", Summary, Body); + } + } +} diff --git a/Example1/Notifier.cs b/Example1/Notifier.cs new file mode 100644 index 00000000..a4ca5d44 --- /dev/null +++ b/Example1/Notifier.cs @@ -0,0 +1,77 @@ +#if UBUNTU +using Notifications; +#endif +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading; + +namespace Example1 +{ + internal class Notifier : IDisposable + { + private bool _enabled; + private Queue _queue; + private ManualResetEvent _waitHandle; + + public Notifier () + { + _enabled = true; + _queue = new Queue (); + _waitHandle = new ManualResetEvent (false); + + ThreadPool.QueueUserWorkItem ( + state => { + while (_enabled || Count > 0) { + Thread.Sleep (500); + if (Count > 0) { + var msg = dequeue (); +#if UBUNTU + var nf = new Notification (msg.Summary, msg.Body, msg.Icon); + nf.AddHint ("append", "allowed"); + nf.Show (); +#else + Console.WriteLine (msg); +#endif + } + } + + _waitHandle.Set (); + }); + } + + public int Count { + get { + lock (((ICollection) _queue).SyncRoot) { + return _queue.Count; + } + } + } + + private NotificationMessage dequeue () + { + lock (((ICollection) _queue).SyncRoot) { + return _queue.Dequeue (); + } + } + + public void Close () + { + _enabled = false; + _waitHandle.WaitOne (); + _waitHandle.Close (); + } + + public void Notify (NotificationMessage message) + { + lock (((ICollection) _queue).SyncRoot) { + _queue.Enqueue (message); + } + } + + void IDisposable.Dispose () + { + Close (); + } + } +} diff --git a/Example1/Program.cs b/Example1/Program.cs index 95218014..b0aa9bbb 100644 --- a/Example1/Program.cs +++ b/Example1/Program.cs @@ -5,29 +5,22 @@ namespace Example1 { public class Program { - public static void Main(string[] args) + public static void Main (string [] args) { - //using (AudioStreamer streamer = new AudioStreamer("ws://localhost:3000/socket")) - using (AudioStreamer streamer = new AudioStreamer("ws://agektmr.node-ninja.com:3000/socket")) + using (var streamer = new AudioStreamer ("ws://agektmr.node-ninja.com:3000/socket")) + //using (var streamer = new AudioStreamer ("ws://localhost:3000/socket")) { - streamer.Connect(); + streamer.Connect (); - Thread.Sleep(500); - Console.WriteLine("\nType \"exit\" to exit.\n"); - - string data; - while (true) - { - Thread.Sleep(500); - - Console.Write("> "); - data = Console.ReadLine(); - if (data == "exit") - { + Console.WriteLine ("\nType \"exit\" to exit.\n"); + while (true) { + Thread.Sleep (500); + Console.Write ("> "); + var msg = Console.ReadLine (); + if (msg == "exit") break; - } - streamer.Write(data); + streamer.Write (msg); } } } diff --git a/Example1/TextMessage.cs b/Example1/TextMessage.cs new file mode 100644 index 00000000..5eab648f --- /dev/null +++ b/Example1/TextMessage.cs @@ -0,0 +1,12 @@ +using System; + +namespace Example1 +{ + internal class TextMessage + { + public uint? user_id; + public string name; + public string type; + public string message; + } +}