using Google.Protobuf;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
using System.Reactive.Linq;
using System.ServiceModel;
using Tango.Transport.Encoders;
using Tango.PMR.Connection;
using Tango.Core.Threading;
namespace Tango.Transport
{
/// <summary>
/// Represents an <see cref="ITransporter"/> base class.
/// </summary>
public abstract class TransporterBase : ExtendedObject, ITransporter
{
// Length of a message token. NOTE(review): 36 equals the length of a Guid.ToString() value,
// which is how SendRequest generates tokens - confirm this is the intended relation.
private const int MESSAGE_TOKEN_LENGTH = 36;
// Queue instance recreated by the constructor, OnAdapterChanged and ClearQueues.
// Presumably holds outgoing messages (fed by EnqueueMessageOut) - confirm.
private ProducerConsumerQueue _sendingQueue;
// Requests tracked while in flight; cleared when the adapter changes and replaced by ClearQueues.
private ConcurrentList _pendingRequests;
// Queue instance recreated by the constructor, OnAdapterChanged and ClearQueues.
// Presumably holds responses that have arrived from the adapter - confirm.
private ProducerConsumerQueue _arrivedResponses;
// Worker threads aborted by Disconnect. Presumably created by StartThreads (not shown here).
private Thread _pushThread;
private Thread _pullThread;
private Thread _keepAliveThread;
// Backing field of the Adapter property.
private ITransportAdapter _adapter;
// Looked up by request token in SendResponse(MessageContainer); cleared when the adapter changes.
private Dictionary _pendingResponses;
// Set to DateTime.Now every time the adapter reports available data (see OnAdapterDataAvailable).
private DateTime _lastKeepAliveTime;
#region Events
/// <summary>
/// Occurs when a new request message has been received.
/// Raised by <see cref="OnRequestReceived"/>.
/// </summary>
public event EventHandler RequestReceived;
/// <summary>
/// Occurs when a new response message has been received.
/// Raised by <see cref="OnResponseReceived"/>.
/// </summary>
public event EventHandler PendingResponseReceived;
/// <summary>
/// Occurs when the component state changes.
/// Raised by <see cref="OnStateChanged"/> with the new state.
/// </summary>
public event EventHandler StateChanged;
#endregion
#region Properties
/// <summary>
/// Gets or sets the <see cref="ITransportAdapter"/> used to read and write raw data.
/// </summary>
/// <remarks>
/// Every assignment (even of the same instance) invokes <see cref="OnAdapterChanged"/> with the
/// previous and the new adapter, and then calls RaisePropertyChangedAuto().
/// </remarks>
public ITransportAdapter Adapter
{
    get
    {
        return _adapter;
    }
    set
    {
        // Remember the adapter being replaced so the change hook can detach from it.
        ITransportAdapter replaced = _adapter;
        _adapter = value;

        OnAdapterChanged(replaced, value);
        RaisePropertyChangedAuto();
    }
}
/// <summary>
/// Gets or sets the transport encoder used to encode and decode tango messages.
/// The parameterless constructor initializes it to a new ProtoEncoder.
/// </summary>
public ITransportEncoder Encoder { get; set; }
private TransportComponentState _state;
/// <summary>
/// Gets the component state.
/// </summary>
/// <remarks>
/// The setter is a no-op when the value is unchanged; otherwise it stores the value and
/// calls <see cref="OnStateChanged"/> with the new state.
/// </remarks>
public TransportComponentState State
{
    get
    {
        return _state;
    }
    protected set
    {
        if (_state == value)
        {
            // Nothing changed: do not notify.
            return;
        }

        _state = value;
        OnStateChanged(_state);
    }
}
/// <summary>
/// Gets or sets the request timeout. It is used by the SendRequest overloads when no
/// explicit timeout is supplied. The parameterless constructor sets it to 5 seconds.
/// </summary>
public TimeSpan RequestTimeout { get; set; }
private bool _useKeepAlive;
/// <summary>
/// Gets or sets a value indicating whether to use a keep alive mechanism.
/// </summary>
/// <remarks>
/// The setter stores the value, calls RaisePropertyChangedAuto() and then logs whether
/// keep alive is now activated or deactivated.
/// </remarks>
public bool UseKeepAlive
{
    get
    {
        return _useKeepAlive;
    }
    set
    {
        _useKeepAlive = value;
        RaisePropertyChangedAuto();

        // The field is read after the notification, exactly like the if/else it replaces.
        string status = _useKeepAlive
            ? "KeepAlive is activated..."
            : "KeepAlive is deactivated.";
        LogManager.Log(status);
    }
}
/// <summary>
/// Gets or sets the keep alive timeout.
/// The parameterless constructor sets it to 2 seconds.
/// </summary>
public TimeSpan KeepAliveTimeout { get; set; }
/// <summary>
/// Gets or sets the keep alive retries.
/// The parameterless constructor sets it to 1.
/// </summary>
public int KeepAliveRetries { get; set; }
/// <summary>
/// Gets or sets a value indicating whether to auto respond to keep alive requests.
/// The parameterless constructor sets it to <c>true</c>.
/// </summary>
public bool EnableKeepAliveAutoResponse { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the transporter will get into a failed state if the adapter has failed.
/// When <c>true</c>, <see cref="OnAdapterStateChanged"/> calls <see cref="OnFailed"/> as soon as
/// the adapter reports the Failed state.
/// </summary>
public bool FailsWithAdapter { get; set; }
/// <summary>
/// Gets the last failed state exception/reason.
/// Assigned by <see cref="OnFailed"/>.
/// </summary>
public Exception FailedStateException { get; private set; }
#endregion
#region Virtual Methods
/// <summary>
/// Called when the <see cref="Adapter"/> has changed.
/// </summary>
/// <param name="oldAdapter">The adapter that was assigned before the change; may be <c>null</c>.</param>
/// <param name="newAdapter">The adapter that is assigned now; may be <c>null</c>.</param>
/// <remarks>
/// When the instance actually changed, all pending requests/responses are dropped and the
/// message queues are recreated. Event handlers are moved from the old adapter to the new one.
/// If the transporter is already connected and the new adapter is disconnected, the new adapter
/// is connected. Because this method is <c>async void</c>, a connect failure cannot be observed
/// by the caller; it is caught and logged here instead of escaping as an unhandled exception.
/// </remarks>
protected async virtual void OnAdapterChanged(ITransportAdapter oldAdapter, ITransportAdapter newAdapter)
{
    if (oldAdapter != newAdapter)
    {
        // Messages in flight belong to the previous adapter; start from a clean slate.
        _pendingRequests.Clear();
        _pendingResponses.Clear();
        _arrivedResponses = new ProducerConsumerQueue();
        _sendingQueue = new ProducerConsumerQueue();
    }

    if (oldAdapter != null)
    {
        oldAdapter.StateChanged -= OnAdapterStateChanged;
        oldAdapter.DataAvailable -= OnAdapterDataAvailable;
    }

    if (newAdapter == null)
    {
        LogManager.Log("Adapter Changed: null");
        return;
    }

    LogManager.Log(String.Format("Adapter Changed: Type = {0}, Address = {1}, State = {2}", newAdapter.GetType().Name, newAdapter.Address, newAdapter.State));

    // Unsubscribe first so that re-assigning the same adapter never subscribes twice.
    newAdapter.StateChanged -= OnAdapterStateChanged;
    newAdapter.DataAvailable -= OnAdapterDataAvailable;
    newAdapter.StateChanged += OnAdapterStateChanged;
    newAdapter.DataAvailable += OnAdapterDataAvailable;

    if (State == TransportComponentState.Connected && newAdapter.State == TransportComponentState.Disconnected)
    {
        try
        {
            await newAdapter.Connect();
        }
        catch (Exception ex)
        {
            // An exception leaving an async void method is raised on the captured context /
            // thread pool and typically terminates the process. Log it instead.
            // NOTE(review): the transporter state is left untouched here; presumably a failing
            // adapter also reports Failed through StateChanged (see OnAdapterStateChanged) - confirm.
            LogManager.Log(ex, "Failed to connect the new adapter.");
        }
    }
}
/// <summary>
/// Called when the current adapter state has changed.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The new adapter state.</param>
/// <remarks>
/// Only the Failed state is acted upon, and only when <see cref="FailsWithAdapter"/> is set:
/// the transporter then fails with a <see cref="CommunicationException"/>.
/// </remarks>
protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
{
    if (e != TransportComponentState.Failed || !FailsWithAdapter)
    {
        return;
    }

    CommunicationException failure = new CommunicationException("The adapter has failed. Going into a failed state...");
    OnFailed(failure);
}
/// <summary>
/// Called when there is data available from the adapter.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="data">The raw data reported by the adapter.</param>
protected virtual void OnAdapterDataAvailable(object sender, byte[] data)
{
// Any incoming data refreshes the keep alive timestamp, before the data is queued.
// NOTE(review): this uses local time (DateTime.Now); whatever compares against
// _lastKeepAliveTime (not shown here) must use the same clock.
_lastKeepAliveTime = DateTime.Now;
// Hand the raw bytes over to the incoming message pipeline.
EnqueueMessageIn(data);
}
/// <summary>
/// Called when the component has failed.
/// Records the exception, switches to the Failed state, logs the failure and disconnects.
/// </summary>
/// <param name="ex">The exception describing the failure reason.</param>
protected virtual void OnFailed(Exception ex)
{
// Store the reason first so it is available to StateChanged subscribers.
FailedStateException = ex;
State = TransportComponentState.Failed;
LogManager.Log(ex, "Transporter failed.");
// NOTE(review): Disconnect() sets State to Disconnected, so the Failed state set above is
// replaced as soon as this call starts.
// NOTE(review): Wait() blocks the calling thread until the disconnect completes and wraps
// failures in AggregateException; on a thread with a synchronization context this can deadlock.
Disconnect().Wait();
}
/// <summary>
/// Called when a new request has been received. Raises <see cref="RequestReceived"/>.
/// </summary>
/// <param name="container">The container holding the received request.</param>
protected virtual void OnRequestReceived(MessageContainer container) =>
    RequestReceived?.Invoke(this, container);
/// <summary>
/// Called when a new response has been received. Raises <see cref="PendingResponseReceived"/>.
/// </summary>
/// <param name="container">The container holding the received response.</param>
protected virtual void OnResponseReceived(MessageContainer container) =>
    PendingResponseReceived?.Invoke(this, container);
/// <summary>
/// Called when the component state has changed. Raises <see cref="StateChanged"/>.
/// </summary>
/// <param name="state">The new state.</param>
protected virtual void OnStateChanged(TransportComponentState state) =>
    StateChanged?.Invoke(this, state);
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="TransporterBase"/> class with empty
/// message collections and the default settings.
/// </summary>
public TransporterBase()
{
    // Message bookkeeping.
    _sendingQueue = new ProducerConsumerQueue();
    _arrivedResponses = new ProducerConsumerQueue();
    _pendingRequests = new ConcurrentList();
    _pendingResponses = new Dictionary();

    // Defaults: proto encoding, 5 second requests, keep alive every 2 seconds with 1 retry.
    Encoder = new ProtoEncoder();
    RequestTimeout = TimeSpan.FromSeconds(5);
    KeepAliveTimeout = TimeSpan.FromSeconds(2);
    KeepAliveRetries = 1;
    EnableKeepAliveAutoResponse = true;
}
/// <summary>
/// Initializes a new instance of the <see cref="TransporterBase"/> class using the specified adapter.
/// </summary>
/// <param name="adapter">The transport adapter.</param>
public TransporterBase(ITransportAdapter adapter) : this()
{
// Goes through the property setter, so the virtual OnAdapterChanged runs during construction
// (after the parameterless constructor has initialized the collections it touches).
Adapter = adapter;
}
#endregion
#region Public Methods
/// <summary>
/// Clears all message queues by replacing the sending queue, the pending request list and
/// the arrived response queue with new, empty instances.
/// </summary>
/// <remarks>
/// The pending response dictionary is not touched by this method.
/// </remarks>
public void ClearQueues()
{
// NOTE(review): the old instances are dropped rather than emptied; code still holding a
// reference to an old queue (e.g. a worker thread, not shown here) keeps using that instance.
_sendingQueue = new ProducerConsumerQueue();
_pendingRequests = new ConcurrentList();
_arrivedResponses = new ProducerConsumerQueue();
}
/// <summary>
/// Connects the transport component: connects the adapter (when one is assigned), switches
/// to the Connected state and starts the worker threads.
/// </summary>
/// <returns>A task that completes once the transporter is connected.</returns>
public virtual async Task Connect()
{
    ITransportAdapter adapter = Adapter;
    if (adapter != null)
    {
        await adapter.Connect();
    }

    State = TransportComponentState.Connected;
    StartThreads();

    LogManager.Log("Transporter Connected...");
}
/// <summary>
/// Disconnects the transport component: switches to the Disconnected state, aborts the
/// worker threads and disconnects the adapter (when one is assigned).
/// </summary>
/// <returns>A task that completes once the adapter has been disconnected.</returns>
/// <remarks>
/// Each worker thread is aborted independently. A thread that was never created, or whose
/// abort fails, no longer prevents the remaining threads from being aborted; abort failures
/// are logged instead of being silently swallowed.
/// </remarks>
public virtual async Task Disconnect()
{
    State = TransportComponentState.Disconnected;

    foreach (Thread worker in new[] { _pullThread, _pushThread, _keepAliveThread })
    {
        if (worker == null)
        {
            continue;
        }

        try
        {
            worker.Abort();
        }
        catch (Exception ex)
        {
            // Best effort: keep going with the other threads and with the adapter.
            // NOTE(review): if this method runs on one of the worker threads itself, aborting
            // it raises ThreadAbortException here, which is re-thrown after this handler.
            LogManager.Log(ex, "Failed to abort a transporter worker thread.");
        }
    }

    if (Adapter != null)
    {
        await Adapter.Disconnect();
    }

    LogManager.Log("Transporter Disconnected...");
}
/// <summary>
/// Sends a request.
/// </summary>
/// <param name="request">The request message. It is wrapped in a new container with a freshly generated token.</param>
/// <param name="timeout">Optional timeout. If not specified, <see cref="RequestTimeout"/> is used.</param>
/// <returns>
/// The task of the request's completion source. It faults with a <see cref="TimeoutException"/>
/// when the timeout elapses first. Presumably it is completed by the receive path when the
/// matching response arrives (not shown here).
/// </returns>
/// <exception cref="InvalidOperationException">The transporter is not in the Connected state.</exception>
public Task SendRequest(IMessage request, TimeSpan? timeout = null)
{
    String requestName = request.GetType().Name;
    String responseName = requestName.Replace("Request", "Response");

    // Resolve the timeout once, so the value sent in the container, the timer delay and the
    // error message always agree even if RequestTimeout changes while the request is queued.
    TimeSpan effectiveTimeout = timeout ?? RequestTimeout;

    MessageContainer container = new MessageContainer();
    container.Token = Guid.NewGuid().ToString();
    container.Data = request.ToByteString();
    container.Timeout = (UInt32)effectiveTimeout.TotalMilliseconds;
    container.Type = MessageFactory.ParseMessageType(requestName);

    LogManager.Log("Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
    LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

    if (State != TransportComponentState.Connected)
    {
        throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
    }

    TaskCompletionSource source = new TaskCompletionSource();
    TransportMessage message = new TransportMessage(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source);

    // The timeout clock starts only when ActivateTimeout is invoked (by code not shown here).
    message.ActivateTimeout = () =>
    {
        TimeoutTask.StartNew(() =>
        {
            if (source.Task.IsCompleted)
            {
                return;
            }

            TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");
            LogManager.Log(ex);
            LogManager.Log("Setting request task exception...", LogCategory.Debug);

            // The task may get completed between the check above and this call;
            // TrySetException does not throw in that case, unlike SetException.
            source.TrySetException(ex);
        }, effectiveTimeout);
    };

    EnqueueMessageOut(message);
    return source.Task;
}
/// <summary>
/// Sends the response. Wraps the message in a container carrying the request token and
/// forwards it to the container based overload.
/// </summary>
/// <param name="response">The response message.</param>
/// <param name="token">The token of the request being answered.</param>
/// <param name="completed">When specified, written to the container's completed flag.</param>
/// <param name="errorCode">When specified, written to the container's error code.</param>
/// <param name="errorMessage">When not <c>null</c>, written to the container's error message.</param>
/// <returns>The task returned by the container based overload.</returns>
/// <exception cref="InvalidOperationException">
/// The transporter is not connected, or the matching request token was not found.
/// </exception>
public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null)
{
    String typeName = response.GetType().Name;

    var envelope = new MessageContainer
    {
        Token = token,
        Data = response.ToByteString(),
        Type = MessageFactory.ParseMessageType(typeName)
    };

    // Optional fields are only written when the caller supplied them.
    if (errorCode != null)
    {
        envelope.Error = errorCode.Value;
    }

    if (errorMessage != null)
    {
        envelope.ErrorMessage = errorMessage;
    }

    if (completed != null)
    {
        envelope.Completed = completed.Value;
    }

    return SendResponse(envelope);
}
/// <summary>
/// Sends the request contained in an already built container.
/// </summary>
/// <param name="container">The container. Its token, type and timeout are used as they are.</param>
/// <returns>
/// The task of the request's completion source. It faults with a <see cref="TimeoutException"/>
/// when the timeout elapses first. Presumably it is completed by the receive path when the
/// matching response arrives (not shown here).
/// </returns>
/// <exception cref="InvalidOperationException">The transporter is not in the Connected state.</exception>
public Task SendRequest(MessageContainer container)
{
    String responseName = container.Type.ToString().Replace("Request", "Response");
    TimeSpan? timeout = GetContainerTimeoutOrDefault(container);

    // Resolve the timeout once, so the timer delay and the error message always agree
    // even if RequestTimeout changes while the request is queued.
    TimeSpan effectiveTimeout = timeout ?? RequestTimeout;

    LogManager.Log("Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug);
    LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

    if (State != TransportComponentState.Connected)
    {
        throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
    }

    TaskCompletionSource source = new TaskCompletionSource();
    TransportMessage message = new TransportMessage(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source);

    // The timeout clock starts only when ActivateTimeout is invoked (by code not shown here).
    message.ActivateTimeout = () =>
    {
        TimeoutTask.StartNew(() =>
        {
            if (source.Task.IsCompleted)
            {
                return;
            }

            TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");
            LogManager.Log(ex);
            LogManager.Log("Setting request task exception...", LogCategory.Debug);

            // The task may get completed between the check above and this call;
            // TrySetException does not throw in that case, unlike SetException.
            source.TrySetException(ex);
        }, effectiveTimeout);
    };

    EnqueueMessageOut(message);
    return source.Task;
}
///
/// Sends the response.
///
/// The container.
///
/// Matching request token was not found!
public Task SendResponse(MessageContainer container)
{
String token = container.Token;
LogManager.Log("Queuing response message: " + container.Type, LogCategory.Debug);
PendingResponse pendingResponse = null;
if (State != TransportComponentState.Connected)
{
throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}."));
}
LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug);
if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
LogManager.Log("Found matching request token: " + token, LogCategory.Debug);
if (!pendingResponse.IsContinuous)
{
LogManager.Log("Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
else if (container.Completed)
{
LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
}
else
{
//This should never happen.
throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical);
}
TaskCompletionSource