using Microsoft.AspNet.SignalR.Client;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;

namespace Tango.Transport.Adapters
{
    /// <summary>
    /// Represents an adapter for communicating via SignalR protocol.
    /// </summary>
    /// <seealso cref="TransportAdapterBase" />
    public class SignalRTransportAdapter : TransportAdapterBase
    {
        private IHubProxy _proxy;
        private HubConnection _connection;
        private Thread _pushThread;
        private ProducerConsumerQueue<byte[]> _pushQueue;

        // Serializes hub "Write" invocations issued by the push thread and by immediate writes.
        private readonly object _writeSyncObject = new object();

        /// <summary>
        /// Gets or sets the URL of the SignalR service.
        /// </summary>
        public String Url { get; set; }

        /// <summary>
        /// Gets or sets the SignalR hub name.
        /// </summary>
        public String Hub { get; set; }

        /// <summary>
        /// Gets or sets the serial number of the remote machine
        /// (use only for <see cref="SignalRTransportAdapterMode.CreateSession"/> mode).
        /// </summary>
        public String SerialNumber { get; set; }

        /// <summary>
        /// Gets the remote session identifier.
        /// </summary>
        public String SessionID { get; private set; }

        /// <summary>
        /// Gets or sets the interval of write operation.
        /// Unlike other adapters, the SignalR adapter accumulates multiple write operations into one chunk.
        /// </summary>
        public TimeSpan WriteInterval { get; set; }

        /// <summary>
        /// Gets or sets the adapter mode.
        /// </summary>
        public SignalRTransportAdapterMode Mode { get; set; }

        /// <summary>
        /// Gets or sets the adapter connection timeout.
        /// </summary>
        public TimeSpan ConnectionTimeout { get; set; }

        /// <summary>
        /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class
        /// with a 30 second connection timeout and a 1 millisecond write interval.
        /// </summary>
        public SignalRTransportAdapter() : base()
        {
            ConnectionTimeout = TimeSpan.FromSeconds(30);
            WriteInterval = TimeSpan.FromMilliseconds(1);
            ComponentName = $"SignalR Adapter {_component_counter++}";
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class.
        /// </summary>
        /// <param name="url">The service address.</param>
        /// <param name="hub">The hub name.</param>
        /// <param name="mode">The adapter mode.</param>
        /// <param name="serialNumber">The machine serial number (when creating session).</param>
        /// <param name="sessionID">The session identifier (when joining session).</param>
        /// <param name="ipAddress">Optional address; when not blank it replaces the session identifier as the adapter address.</param>
        public SignalRTransportAdapter(String url, String hub, SignalRTransportAdapterMode mode, String serialNumber = null, String sessionID = null, String ipAddress = null) : this()
        {
            Url = url;
            Hub = hub;
            Mode = mode;
            SerialNumber = serialNumber;
            SessionID = sessionID;
            Address = sessionID;

            if (!String.IsNullOrWhiteSpace(ipAddress))
            {
                Address = ipAddress;
            }
        }

        /// <summary>
        /// Writes the specified data to the stream.
        /// </summary>
        /// <param name="data">The data.</param>
        /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidOperationException">A queued write was requested before the push queue was created by a successful connect.</exception>
        public override void Write(byte[] data, bool immidiate = false)
        {
            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            // Snapshot the field: the queue only exists once the push thread has been started.
            var queue = _pushQueue;

            if (!immidiate && queue == null)
            {
                throw new InvalidOperationException($"{ComponentName}: Cannot queue data before the SignalR adapter is connected.");
            }

            TotalBytesSent += data.Length;
            _totalBytes += data.Length;
            AppendTransferRateBytes(data.Length);

            if (!immidiate)
            {
                queue.BlockEnqueue(data);
                return;
            }

            // NOTE(review): immediate writes are sent as-is even when EnableCompression is set,
            // unlike the batched path in PushThreadMethod - confirm this is intended.
            try
            {
                lock (_writeSyncObject)
                {
                    _proxy.Invoke("Write", new List<byte[]>() { data }).GetAwaiter().GetResult();
                }
            }
            catch (Exception ex)
            {
                OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address})."));
            }
        }

        /// <summary>
        /// Connects the transport component.
        /// </summary>
        /// <returns>
        /// A task that completes once the session is created/joined, or faults with a
        /// <see cref="TimeoutException"/> or with the error raised while establishing the session.
        /// Returns an already completed task when the adapter is connected.
        /// </returns>
        public override Task Connect()
        {
            if (State == TransportComponentState.Connected)
            {
                return Task.FromResult(true);
            }

            LogManager.Log($"{ComponentName}: Connecting SignalR adapter...");

            var completionSource = new TaskCompletionSource<bool>();

            // The timeout task and the SignalR callbacks run on different threads; exactly one of them
            // may decide the outcome. The claim is atomic, and the Try* setters never throw if the task
            // has already been completed.
            int completionClaimed = 0;
            Func<bool> tryClaimCompletion = () => Interlocked.CompareExchange(ref completionClaimed, 1, 0) == 0;

            var connection = new HubConnection(Url);
            var proxy = connection.CreateHubProxy(Hub);
            _connection = connection;
            _proxy = proxy;

            Core.Threading.TimeoutTask.StartNew(() =>
            {
                if (tryClaimCompletion())
                {
                    completionSource.TrySetException(new TimeoutException("Could not reach the remote machine after the given timeout."));
                }
            }, ConnectionTimeout);

            if (Mode == SignalRTransportAdapterMode.CreateSession)
            {
                proxy.On("SessionCreated", () =>
                {
                    if (!tryClaimCompletion())
                    {
                        return;
                    }

                    try
                    {
                        LogManager.Log($"{ComponentName}: SignalR adapter session created ({SessionID})...");
                        LogManager.Log($"{ComponentName}: SingalR adapter connected.");
                        State = TransportComponentState.Connected;
                        StartPushThread();
                        completionSource.TrySetResult(true);
                    }
                    catch (Exception ex)
                    {
                        // The claim is already ours, so the task must be faulted here or it would never complete.
                        LogManager.Log(ex, $"{ComponentName}: Error occurred after session created.");
                        completionSource.TrySetException(ex);
                    }
                });
            }

            connection.StateChanged += async (x) =>
            {
                try
                {
                    if (x.NewState != ConnectionState.Connected)
                    {
                        return;
                    }

                    if (Mode == SignalRTransportAdapterMode.CreateSession)
                    {
                        LogManager.Log($"{ComponentName}: Creating SignalR adapter Session...");
                        SessionID = await proxy.Invoke<String>("CreateSession", SerialNumber);
                    }
                    else
                    {
                        LogManager.Log($"{ComponentName}: Joining SignalR adapter session ({SessionID})...");
                        await proxy.Invoke("JoinSession", SessionID);
                        LogManager.Log($"{ComponentName}: SingalR adapter connected.");
                    }

                    if (Mode == SignalRTransportAdapterMode.JoinSession && tryClaimCompletion())
                    {
                        State = TransportComponentState.Connected;
                        StartPushThread();
                        completionSource.TrySetResult(true);
                    }
                }
                catch (Exception ex)
                {
                    LogManager.Log(ex, $"{ComponentName}: Error occurred on connection state changed event.");

                    // Take the claim (if still free) so no later callback reports success,
                    // then fault the task unless an outcome has already been published.
                    tryClaimCompletion();
                    completionSource.TrySetException(ex);
                }
            };

            proxy.On<List<byte[]>>("DataAvailable", (dataCollection) =>
            {
                OnDataAvailable(dataCollection);
            });

            // Observe a failed start so the error is logged instead of being silently dropped.
            // The returned task is still faulted by the timeout above, as before.
            connection.Start().ContinueWith(startTask =>
            {
                LogManager.Log(startTask.Exception.GetBaseException(), $"{ComponentName}: Error starting SignalR adapter connection.");
            }, TaskContinuationOptions.OnlyOnFaulted);

            return completionSource.Task;
        }

        /// <summary>
        /// Disconnects the transport component.
        /// </summary>
        /// <returns>A task that marks the adapter as disconnected and schedules the connection cleanup.</returns>
        public override Task Disconnect()
        {
            return Task.Factory.StartNew(() =>
            {
                if (State != TransportComponentState.Connected)
                {
                    return;
                }

                LogManager.Log($"{ComponentName}: Disconnecting SignalR adapter...");

                // Capture the current instances so the cleanup always targets the connection and thread
                // that were active when Disconnect was called, even if Connect replaces the fields
                // before the cleanup runs.
                var connection = _connection;
                var pushThread = _pushThread;

                Core.Threading.TimeoutTask.StartNew(() =>
                {
                    try
                    {
                        if (connection != null)
                        {
                            connection.Stop();
                            connection.Dispose();
                        }
                    }
                    catch (Exception ex)
                    {
                        LogManager.Log(ex, $"{ComponentName}: Error disposing SignalR adapter connection.");
                    }

                    // Abort independently of the connection cleanup so a failing Stop/Dispose
                    // does not leave the push thread blocked on its queue.
                    try
                    {
                        pushThread?.Abort();
                    }
                    catch (Exception ex)
                    {
                        LogManager.Log(ex, $"{ComponentName}: Error aborting SignalR adapter push thread.");
                    }
                }, TimeSpan.FromSeconds(5));

                LogManager.Log($"{ComponentName}: SignalR adapter disconnected.");
                State = TransportComponentState.Disconnected;
            });
        }

        /// <summary>
        /// Handles the write operation: drains the push queue, optionally compresses each item,
        /// and sends everything that was pending as a single "Write" hub invocation.
        /// </summary>
        private void PushThreadMethod()
        {
            // StartPushThread assigns the queue before starting this thread.
            var queue = _pushQueue;

            try
            {
                while (State == TransportComponentState.Connected)
                {
                    var dataCollection = new List<byte[]>();

                    // Block for the first item, then take whatever else is already queued.
                    do
                    {
                        var data = queue.BlockDequeue();

                        if (EnableCompression)
                        {
                            dataCollection.Add(Compression.GZipHelper.Compress(data));
                        }
                        else
                        {
                            dataCollection.Add(data);
                        }
                    }
                    while (queue.Count > 0);

                    try
                    {
                        lock (_writeSyncObject)
                        {
                            _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult();
                        }
                    }
                    catch (Exception ex)
                    {
                        OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address})."));
                        return;
                    }

                    Thread.Sleep(WriteInterval);
                }
            }
            catch (ThreadAbortException)
            {
                // Raised by Disconnect to release a thread blocked on the queue.
            }
            catch (Exception ex)
            {
                // An unhandled exception on this thread would otherwise terminate the process.
                OnFailed(LogManager.Log(ex, $"{ComponentName}: Unexpected error in SignalR adapter push thread ({Address})."));
            }
        }

        /// <summary>
        /// Called when new data is available.
        /// </summary>
        /// <param name="dataCollection">The data collection.</param>
        private void OnDataAvailable(List<byte[]> dataCollection)
        {
            try
            {
                foreach (var data in dataCollection)
                {
                    if (!EnableCompression)
                    {
                        OnDataAvailable(data);
                        continue;
                    }

                    byte[] payload;

                    try
                    {
                        payload = Compression.GZipHelper.Decompress(data);
                    }
                    catch (Exception ex) when (ex.Message.Contains("GZip"))
                    {
                        //Temporarily ignore, probably switching protocol definitions...
                        payload = data;
                    }

                    // Dispatch outside the decompression guard so a consumer error is never mistaken
                    // for a GZip failure and the same message is never delivered twice.
                    OnDataAvailable(payload);
                }
            }
            catch (Exception ex)
            {
                OnFailed(ex);
            }
        }

        /// <summary>
        /// Creates a fresh push queue and starts the background thread that drains it.
        /// </summary>
        private void StartPushThread()
        {
            _pushQueue = new ProducerConsumerQueue<byte[]>();
            _pushThread = new Thread(PushThreadMethod)
            {
                IsBackground = true,
                Name = $"{ComponentName} Push Thread"
            };
            _pushThread.Start();
        }
    }
}