aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs')
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs380
1 files changed, 0 insertions, 380 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
deleted file mode 100644
index c9a2453f0..000000000
--- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
+++ /dev/null
@@ -1,380 +0,0 @@
-using Microsoft.AspNet.SignalR.Client;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using Tango.Core;
-
-namespace Tango.Transport.Adapters
-{
- /// <summary>
- /// Represents an adapter for communicating via SignalR protocol.
- /// </summary>
- /// <seealso cref="Tango.Transport.TransportAdapterBase" />
- public class SignalRTransportAdapter : TransportAdapterBase
- {
- private IHubProxy _proxy;
- private HubConnection _connection;
- private Thread _pushThread;
- private ProducerConsumerQueue<byte[]> _pushQueue;
- private object _writeSyncObject = new object();
-
- /// <summary>
- /// Gets or sets the URL of the SignalR service.
- /// </summary>
- public String Url { get; set; }
-
- /// <summary>
- /// Gets or sets the SignalR hub name.
- /// </summary>
- public String Hub { get; set; }
-
- /// <summary>
- /// Gets or sets the serial number of the remote machine (Use only for <see cref="SignalRTransportAdapterMode.CreateSession"/>) mode.
- /// </summary>
- public String SerialNumber { get; set; }
-
- /// <summary>
- /// Gets or sets the remote session identifier.
- /// </summary>
- public String SessionID { get; private set; }
-
- /// <summary>
- /// Gets or sets the interval of write operation.
- /// Unlike other adapters, the SignalR adapter accumulates multiple write operations into one chunk.
- /// </summary>
- public TimeSpan WriteInterval { get; set; }
-
- /// <summary>
- /// Gets or sets the adapter mode.
- /// </summary>
- public SignalRTransportAdapterMode Mode { get; set; }
-
- /// <summary>
- /// Gets or sets the adapter connection timeout.
- /// </summary>
- public TimeSpan ConnectionTimeout { get; set; }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class.
- /// </summary>
- public SignalRTransportAdapter() : base()
- {
- ConnectionTimeout = TimeSpan.FromSeconds(30);
- WriteInterval = TimeSpan.FromMilliseconds(1);
- ComponentName = $"SignalR Adapter {_component_counter++}";
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class.
- /// </summary>
- /// <param name="url">The service address.</param>
- /// <param name="hub">The hub name.</param>
- /// <param name="mode">The adapter mode.</param>
- /// <param name="serialNumber">The machine serial number (when creating session).</param>
- /// <param name="sessionID">The session identifier (when joining session).</param>
- public SignalRTransportAdapter(String url, String hub, SignalRTransportAdapterMode mode, String serialNumber = null, String sessionID = null, String ipAddress = null) : this()
- {
- Url = url;
- Hub = hub;
- Mode = mode;
- SerialNumber = serialNumber;
- SessionID = sessionID;
- Address = sessionID;
-
- if (!String.IsNullOrWhiteSpace(ipAddress))
- {
- Address = ipAddress;
- }
- }
-
- /// <summary>
- /// Writes the specified data to the stream.
- /// </summary>
- /// <param name="data">The data.</param>
- /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
- public override void Write(byte[] data, bool immidiate = false)
- {
- TotalBytesSent += data.Length;
- _totalBytes += data.Length;
-
- AppendTransferRateBytes(data.Length);
-
- if (!immidiate)
- {
- _pushQueue.BlockEnqueue(data);
- }
- else
- {
- try
- {
- lock (_writeSyncObject)
- {
- _proxy.Invoke("Write", new List<byte[]>() { data }).GetAwaiter().GetResult();
- }
- }
- catch (Exception ex)
- {
- OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address})."));
- return;
- }
- }
- }
-
- /// <summary>
- /// Connects the transport component.
- /// </summary>
- /// <returns></returns>
- public override Task Connect()
- {
- if (State != TransportComponentState.Connected)
- {
- LogManager.Log($"{ComponentName}: Connecting SignalR adapter...");
-
- bool completed = false;
-
- TaskCompletionSource<object> completionSource = new TaskCompletionSource<object>();
-
- _connection = new HubConnection(Url);
- _proxy = _connection.CreateHubProxy(Hub);
-
- Core.Threading.TimeoutTask.StartNew(() =>
- {
- if (!completed)
- {
- completed = true;
- completionSource.SetException(new TimeoutException("Could not reach the remote machine after the given timeout."));
- }
-
- }, ConnectionTimeout);
-
- if (Mode == SignalRTransportAdapterMode.CreateSession)
- {
- _proxy.On("SessionCreated", () =>
- {
- try
- {
- if (!completed)
- {
- completed = true;
-
- LogManager.Log($"{ComponentName}: SignalR adapter session created ({SessionID})...");
- LogManager.Log($"{ComponentName}: SingalR adapter connected.");
- State = TransportComponentState.Connected;
-
- StartPushThread();
-
- completionSource.SetResult(true);
- }
- }
- catch (Exception ex)
- {
- if (!completed)
- {
- LogManager.Log(ex, $"{ComponentName}: Error occurred after session created.");
- completed = true;
- completionSource.SetException(ex);
- }
- }
- });
- }
-
- _connection.StateChanged += async (x) =>
- {
- try
- {
- if (x.NewState == ConnectionState.Connected)
- {
- if (Mode == SignalRTransportAdapterMode.CreateSession)
- {
- LogManager.Log($"{ComponentName}: Creating SignalR adapter Session...");
- SessionID = await _proxy.Invoke<String>("CreateSession", SerialNumber);
- }
- else
- {
- LogManager.Log($"{ComponentName}: Joining SignalR adapter session ({SessionID})...");
- await _proxy.Invoke("JoinSession", SessionID);
- LogManager.Log($"{ComponentName}: SingalR adapter connected.");
- }
-
- if (Mode == SignalRTransportAdapterMode.JoinSession)
- {
- if (!completed)
- {
- completed = true;
- State = TransportComponentState.Connected;
- StartPushThread();
- completionSource.SetResult(true);
- }
- }
- }
- }
- catch (Exception ex)
- {
- if (!completed)
- {
- completed = true;
- LogManager.Log(ex, $"{ComponentName}: Error occurred on connection state changed event.");
- completionSource.SetException(ex);
- }
- }
- };
-
- _proxy.On<List<byte[]>>("DataAvailable", (dataCollection) => { OnDataAvailable(dataCollection); });
- _connection.Start();
-
- return completionSource.Task;
- }
-
- return Task.FromResult(true);
- }
-
- /// <summary>
- /// Disconnects the transport component.
- /// </summary>
- /// <returns></returns>
- public override Task Disconnect()
- {
- return Task.Factory.StartNew(() =>
- {
- if (State == TransportComponentState.Connected)
- {
- LogManager.Log($"{ComponentName}: Disconnecting SignalR adapter...");
- Core.Threading.TimeoutTask.StartNew(() =>
- {
- try
- {
- _connection.Stop();
- _connection.Dispose();
-
- try
- {
- if (_pushThread != null)
- {
- _pushThread.Abort();
- }
- }
- catch { }
- }
- catch (Exception ex)
- {
- LogManager.Log(ex, $"{ComponentName}: Error disposing SignalR adapter connection.");
- }
- }, TimeSpan.FromSeconds(5));
-
- LogManager.Log($"{ComponentName}: SignalR adapter disconnected.");
- State = TransportComponentState.Disconnected;
- }
- });
- }
-
- /// <summary>
- /// Handles the write operation.
- /// </summary>
- private void PushThreadMethod()
- {
- try
- {
- while (State == TransportComponentState.Connected)
- {
- List<byte[]> dataCollection = new List<byte[]>();
-
- var data = _pushQueue.BlockDequeue();
- var first = true;
-
- while (_pushQueue.Count > 0 || first)
- {
- if (!first)
- {
- data = _pushQueue.BlockDequeue();
- }
- else
- {
- first = false;
- }
-
- if (EnableCompression)
- {
- var compressed = Compression.GZipHelper.Compress(data);
- dataCollection.Add(compressed);
- }
- else
- {
- dataCollection.Add(data);
- }
- }
-
- if (dataCollection.Count > 0)
- {
- try
- {
- lock (_writeSyncObject)
- {
- _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult();
- }
- }
- catch (Exception ex)
- {
- OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address})."));
- return;
- }
- }
-
- Thread.Sleep(WriteInterval);
- }
- }
- catch (ThreadAbortException) { }
- }
-
- /// <summary>
- /// Called when new data is available.
- /// </summary>
- /// <param name="dataCollection">The data collection.</param>
- private void OnDataAvailable(List<byte[]> dataCollection)
- {
- try
- {
- foreach (var data in dataCollection)
- {
- if (EnableCompression)
- {
- try
- {
- var decompressed = Compression.GZipHelper.Decompress(data);
- OnDataAvailable(decompressed);
- }
- catch (Exception ex)
- {
- if (ex.Message.Contains("GZip"))
- {
- //Temporarily ignore, probably switching protocol definitions...
- OnDataAvailable(data);
- }
- else
- {
- throw ex;
- }
- }
- }
- else
- {
- OnDataAvailable(data);
- }
- }
- }
- catch (Exception ex)
- {
- OnFailed(ex);
- }
- }
-
- private void StartPushThread()
- {
- _pushQueue = new ProducerConsumerQueue<byte[]>();
- _pushThread = new Thread(PushThreadMethod);
- _pushThread.IsBackground = true;
- _pushThread.Name = $"{ComponentName} Push Thread";
- _pushThread.Start();
- }
- }
-}