diff options
| author | Mirta <mirta@twine-s.com> | 2020-12-30 16:39:52 +0200 |
|---|---|---|
| committer | Mirta <mirta@twine-s.com> | 2020-12-30 16:39:52 +0200 |
| commit | 00a491d93733d4625ad329b2ba8237f445364b3f (patch) | |
| tree | 4b24c6fa78d7648f4bb7cefafa464bb0b063fec4 /Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs | |
| parent | 124ad4150f80c6846fdee41dbbda9848c105f6e5 (diff) | |
| download | Tango-00a491d9.tar.gz Tango-00a491d9.zip | |
merge
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs | 380 |
1 files changed, 0 insertions, 380 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs deleted file mode 100644 index c9a2453f0..000000000 --- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs +++ /dev/null @@ -1,380 +0,0 @@ -using Microsoft.AspNet.SignalR.Client; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Tango.Core; - -namespace Tango.Transport.Adapters -{ - /// <summary> - /// Represents an adapter for communicating via SignalR protocol. - /// </summary> - /// <seealso cref="Tango.Transport.TransportAdapterBase" /> - public class SignalRTransportAdapter : TransportAdapterBase - { - private IHubProxy _proxy; - private HubConnection _connection; - private Thread _pushThread; - private ProducerConsumerQueue<byte[]> _pushQueue; - private object _writeSyncObject = new object(); - - /// <summary> - /// Gets or sets the URL of the SignalR service. - /// </summary> - public String Url { get; set; } - - /// <summary> - /// Gets or sets the SignalR hub name. - /// </summary> - public String Hub { get; set; } - - /// <summary> - /// Gets or sets the serial number of the remote machine (Use only for <see cref="SignalRTransportAdapterMode.CreateSession"/>) mode. - /// </summary> - public String SerialNumber { get; set; } - - /// <summary> - /// Gets or sets the remote session identifier. - /// </summary> - public String SessionID { get; private set; } - - /// <summary> - /// Gets or sets the interval of write operation. - /// Unlike other adapters, the SignalR adapter accumulates multiple write operations into one chunk. - /// </summary> - public TimeSpan WriteInterval { get; set; } - - /// <summary> - /// Gets or sets the adapter mode. - /// </summary> - public SignalRTransportAdapterMode Mode { get; set; } - - /// <summary> - /// Gets or sets the adapter connection timeout. - /// </summary> - public TimeSpan ConnectionTimeout { get; set; } - - /// <summary> - /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class. - /// </summary> - public SignalRTransportAdapter() : base() - { - ConnectionTimeout = TimeSpan.FromSeconds(30); - WriteInterval = TimeSpan.FromMilliseconds(1); - ComponentName = $"SignalR Adapter {_component_counter++}"; - } - - /// <summary> - /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class. - /// </summary> - /// <param name="url">The service address.</param> - /// <param name="hub">The hub name.</param> - /// <param name="mode">The adapter mode.</param> - /// <param name="serialNumber">The machine serial number (when creating session).</param> - /// <param name="sessionID">The session identifier (when joining session).</param> - public SignalRTransportAdapter(String url, String hub, SignalRTransportAdapterMode mode, String serialNumber = null, String sessionID = null, String ipAddress = null) : this() - { - Url = url; - Hub = hub; - Mode = mode; - SerialNumber = serialNumber; - SessionID = sessionID; - Address = sessionID; - - if (!String.IsNullOrWhiteSpace(ipAddress)) - { - Address = ipAddress; - } - } - - /// <summary> - /// Writes the specified data to the stream. - /// </summary> - /// <param name="data">The data.</param> - /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> - public override void Write(byte[] data, bool immidiate = false) - { - TotalBytesSent += data.Length; - _totalBytes += data.Length; - - AppendTransferRateBytes(data.Length); - - if (!immidiate) - { - _pushQueue.BlockEnqueue(data); - } - else - { - try - { - lock (_writeSyncObject) - { - _proxy.Invoke("Write", new List<byte[]>() { data }).GetAwaiter().GetResult(); - } - } - catch (Exception ex) - { - OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address}).")); - return; - } - } - } - - /// <summary> - /// Connects the transport component. - /// </summary> - /// <returns></returns> - public override Task Connect() - { - if (State != TransportComponentState.Connected) - { - LogManager.Log($"{ComponentName}: Connecting SignalR adapter..."); - - bool completed = false; - - TaskCompletionSource<object> completionSource = new TaskCompletionSource<object>(); - - _connection = new HubConnection(Url); - _proxy = _connection.CreateHubProxy(Hub); - - Core.Threading.TimeoutTask.StartNew(() => - { - if (!completed) - { - completed = true; - completionSource.SetException(new TimeoutException("Could not reach the remote machine after the given timeout.")); - } - - }, ConnectionTimeout); - - if (Mode == SignalRTransportAdapterMode.CreateSession) - { - _proxy.On("SessionCreated", () => - { - try - { - if (!completed) - { - completed = true; - - LogManager.Log($"{ComponentName}: SignalR adapter session created ({SessionID})..."); - LogManager.Log($"{ComponentName}: SingalR adapter connected."); - State = TransportComponentState.Connected; - - StartPushThread(); - - completionSource.SetResult(true); - } - } - catch (Exception ex) - { - if (!completed) - { - LogManager.Log(ex, $"{ComponentName}: Error occurred after session created."); - completed = true; - completionSource.SetException(ex); - } - } - }); - } - - _connection.StateChanged += async (x) => - { - try - { - if (x.NewState == ConnectionState.Connected) - { - if (Mode == SignalRTransportAdapterMode.CreateSession) - { - LogManager.Log($"{ComponentName}: Creating SignalR adapter Session..."); - SessionID = await _proxy.Invoke<String>("CreateSession", SerialNumber); - } - else - { - LogManager.Log($"{ComponentName}: Joining SignalR adapter session ({SessionID})..."); - await _proxy.Invoke("JoinSession", SessionID); - LogManager.Log($"{ComponentName}: SingalR adapter connected."); - } - - if (Mode == SignalRTransportAdapterMode.JoinSession) - { - if (!completed) - { - completed = true; - State = TransportComponentState.Connected; - StartPushThread(); - completionSource.SetResult(true); - } - } - } - } - catch (Exception ex) - { - if (!completed) - { - completed = true; - LogManager.Log(ex, $"{ComponentName}: Error occurred on connection state changed event."); - completionSource.SetException(ex); - } - } - }; - - _proxy.On<List<byte[]>>("DataAvailable", (dataCollection) => { OnDataAvailable(dataCollection); }); - _connection.Start(); - - return completionSource.Task; - } - - return Task.FromResult(true); - } - - /// <summary> - /// Disconnects the transport component. - /// </summary> - /// <returns></returns> - public override Task Disconnect() - { - return Task.Factory.StartNew(() => - { - if (State == TransportComponentState.Connected) - { - LogManager.Log($"{ComponentName}: Disconnecting SignalR adapter..."); - Core.Threading.TimeoutTask.StartNew(() => - { - try - { - _connection.Stop(); - _connection.Dispose(); - - try - { - if (_pushThread != null) - { - _pushThread.Abort(); - } - } - catch { } - } - catch (Exception ex) - { - LogManager.Log(ex, $"{ComponentName}: Error disposing SignalR adapter connection."); - } - }, TimeSpan.FromSeconds(5)); - - LogManager.Log($"{ComponentName}: SignalR adapter disconnected."); - State = TransportComponentState.Disconnected; - } - }); - } - - /// <summary> - /// Handles the write operation. - /// </summary> - private void PushThreadMethod() - { - try - { - while (State == TransportComponentState.Connected) - { - List<byte[]> dataCollection = new List<byte[]>(); - - var data = _pushQueue.BlockDequeue(); - var first = true; - - while (_pushQueue.Count > 0 || first) - { - if (!first) - { - data = _pushQueue.BlockDequeue(); - } - else - { - first = false; - } - - if (EnableCompression) - { - var compressed = Compression.GZipHelper.Compress(data); - dataCollection.Add(compressed); - } - else - { - dataCollection.Add(data); - } - } - - if (dataCollection.Count > 0) - { - try - { - lock (_writeSyncObject) - { - _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult(); - } - } - catch (Exception ex) - { - OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address}).")); - return; - } - } - - Thread.Sleep(WriteInterval); - } - } - catch (ThreadAbortException) { } - } - - /// <summary> - /// Called when new data is available. - /// </summary> - /// <param name="dataCollection">The data collection.</param> - private void OnDataAvailable(List<byte[]> dataCollection) - { - try - { - foreach (var data in dataCollection) - { - if (EnableCompression) - { - try - { - var decompressed = Compression.GZipHelper.Decompress(data); - OnDataAvailable(decompressed); - } - catch (Exception ex) - { - if (ex.Message.Contains("GZip")) - { - //Temporarily ignore, probably switching protocol definitions... - OnDataAvailable(data); - } - else - { - throw ex; - } - } - } - else - { - OnDataAvailable(data); - } - } - } - catch (Exception ex) - { - OnFailed(ex); - } - } - - private void StartPushThread() - { - _pushQueue = new ProducerConsumerQueue<byte[]>(); - _pushThread = new Thread(PushThreadMethod); - _pushThread.IsBackground = true; - _pushThread.Name = $"{ComponentName} Push Thread"; - _pushThread.Start(); - } - } -} |
