using Microsoft.AspNet.SignalR.Client;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
namespace Tango.Transport.Adapters
{
/// <summary>
/// Represents an adapter for communicating via the SignalR protocol.
/// </summary>
/// <seealso cref="TransportAdapterBase" />
public class SignalRTransportAdapter : TransportAdapterBase
{
// Hub proxy on which Write invokes the hub's "Write" method.
private IHubProxy _proxy;
// SignalR client connection.
// NOTE(review): not referenced in the visible part of the file; presumably created by Connect — confirm.
private HubConnection _connection;
// NOTE(review): presumably the background thread that drains _pushQueue — not shown in the visible part of the file; confirm.
private Thread _pushThread;
// Queue that receives the data of non-immediate Write calls.
private ProducerConsumerQueue _pushQueue;
// Lock held by Write around the immediate (blocking) hub call.
private object _writeSyncObject = new object();
/// <summary>
/// Gets or sets the URL of the SignalR service.
/// </summary>
public String Url { get; set; }
/// <summary>
/// Gets or sets the SignalR hub name.
/// </summary>
public String Hub { get; set; }
/// <summary>
/// Gets or sets the serial number of the remote machine.
/// </summary>
/// <remarks>
/// Only relevant for some values of <see cref="Mode"/>; the constructor documents this value as
/// the machine serial number used when creating a session.
/// NOTE(review): the original comment referenced a specific mode here, but the reference is missing — confirm which one.
/// </remarks>
public String SerialNumber { get; set; }
/// <summary>
/// Gets the remote session identifier.
/// </summary>
/// <remarks>
/// The setter is private, so the value can only be assigned from inside this class.
/// </remarks>
public String SessionID { get; private set; }
/// <summary>
/// Gets or sets the interval of the write operation.
/// Unlike other adapters, the SignalR adapter accumulates multiple write operations into one chunk.
/// </summary>
/// <remarks>
/// The parameterless constructor sets this to 1 millisecond.
/// </remarks>
public TimeSpan WriteInterval { get; set; }
/// <summary>
/// Gets or sets the adapter mode.
/// </summary>
public SignalRTransportAdapterMode Mode { get; set; }
/// <summary>
/// Gets or sets the adapter connection timeout.
/// </summary>
/// <remarks>
/// The parameterless constructor sets this to 30 seconds.
/// </remarks>
public TimeSpan ConnectionTimeout { get; set; }
/// <summary>
/// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class with a
/// 30-second connection timeout and a 1-millisecond write interval.
/// </summary>
public SignalRTransportAdapter() : base()
{
    WriteInterval = TimeSpan.FromMilliseconds(1);
    ConnectionTimeout = TimeSpan.FromSeconds(30);

    // Name the component after the current counter value, then advance the counter.
    var adapterIndex = _component_counter++;
    ComponentName = $"SignalR Adapter {adapterIndex}";
}
/// <summary>
/// Initializes a new instance of the <see cref="SignalRTransportAdapter"/> class.
/// </summary>
/// <param name="url">The service address.</param>
/// <param name="hub">The hub name.</param>
/// <param name="mode">The adapter mode.</param>
/// <param name="serialNumber">The machine serial number (when creating session).</param>
/// <param name="sessionID">The session identifier (when joining session).</param>
/// <param name="ipAddress">
/// Optional address. When it is not null, empty, or whitespace, it replaces the session
/// identifier as the adapter's <c>Address</c>.
/// </param>
public SignalRTransportAdapter(String url, String hub, SignalRTransportAdapterMode mode, String serialNumber = null, String sessionID = null, String ipAddress = null) : this()
{
    Mode = mode;
    Hub = hub;
    Url = url;
    SessionID = sessionID;
    SerialNumber = serialNumber;

    // The session identifier is the address unless a usable IP address was supplied.
    Address = sessionID;
    if (String.IsNullOrWhiteSpace(ipAddress))
    {
        return;
    }

    Address = ipAddress;
}
/// <summary>
/// Writes the specified data to the SignalR hub.
/// </summary>
/// <param name="data">The data to write. Must not be <c>null</c>.</param>
/// <param name="immidiate">
/// When <c>true</c>, writes the data as soon as possible with a blocking hub call, ignoring any
/// message queuing and batching. When <c>false</c> (the default), the data is placed on the push queue.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <remarks>
/// A failure of the immediate send is not rethrown to the caller; it is logged and reported
/// through <c>OnFailed</c>.
/// </remarks>
public override void Write(byte[] data, bool immidiate = false)
{
    // Reject null before touching any counters so a bad call leaves the statistics untouched.
    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }

    // Transfer statistics are updated up front, before the data is queued or sent.
    TotalBytesSent += data.Length;
    _totalBytes += data.Length;
    AppendTransferRateBytes(data.Length);

    if (!immidiate)
    {
        _pushQueue.BlockEnqueue(data);
        return;
    }

    try
    {
        // Only one immediate hub call runs at a time.
        lock (_writeSyncObject)
        {
            // The buffer is wrapped in a single-element list and the call blocks until the invocation completes.
            _proxy.Invoke("Write", new List<byte[]> { data }).GetAwaiter().GetResult();
        }
    }
    catch (Exception ex)
    {
        OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address})."));
    }
}
///
/// Connects the transport component.
///
///
public override Task Connect()
{
if (State != TransportComponentState.Connected)
{
LogManager.Log($"{ComponentName}: Connecting SignalR adapter...");
bool completed = false;
TaskCompletionSource