using Google.Protobuf; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; using Tango.Core; using Tango.Logging; using Tango.PMR; using Tango.PMR.Common; using System.Reactive.Linq; using System.ServiceModel; using Tango.Transport.Encoders; using Tango.PMR.Connection; using Tango.Core.Threading; using System.IO; using Tango.Core.ExtensionMethods; using Tango.PMR.Integration; using Newtonsoft.Json; using System.Diagnostics; using System.Reactive.Concurrency; namespace Tango.Transport { /// /// Represents an base class. /// /// public abstract class TransporterBase : ExtendedObject, ITransporter { protected static long _component_counter = 1; private class RequestHandler { public Type RequestType { get; set; } public Action Callback { get; set; } public object RegisteredCallback { get; set; } } private const int MESSAGE_TOKEN_LENGTH = 36; private PriorityProducerConsumerQueue _sendingQueue; private ConcurrentList _pendingRequests; private ProducerConsumerQueue _arrivedResponses; private Thread _pushThread; private Thread _pullThread; private Thread _keepAliveThread; private ITransportAdapter _adapter; private Dictionary _pendingResponses; private DateTime _lastKeepAliveTime; private List _requestHandlers; private static JsonSerializerSettings _genericMessageSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All, }; #region Events /// /// Occurs when a new request message has been received. /// public event EventHandler RequestReceived; /// /// Occurs when a new response message has been received. /// public event EventHandler PendingResponseReceived; /// /// Occurs when component state changes. /// public event EventHandler StateChanged; /// /// Occurs when a request has been sent. /// public event EventHandler RequestSent; /// /// Occurs when a request response has been received. /// public event EventHandler ResponseReceived; /// /// Occurs when a request has failed. /// public event EventHandler RequestFailed; #endregion #region Properties /// /// Gets or sets the name of the transport component. /// public String ComponentName { get; set; } /// /// Gets or sets the used to read and write raw data. /// public ITransportAdapter Adapter { get { return _adapter; } set { var previous = _adapter; _adapter = value; OnAdapterChanged(previous, value); RaisePropertyChangedAuto(); } } /// /// Gets or sets the transport encoder used to encode and decode tango messages. /// public ITransportEncoder Encoder { get; set; } private TransportComponentState _state; /// /// Gets the component state. /// public TransportComponentState State { get { return _state; } protected set { if (_state != value) { _state = value; OnStateChanged(_state); } } } /// /// Gets or sets the request timeout. /// public TimeSpan RequestTimeout { get; set; } private bool _useKeepAlive; /// /// Gets or sets a value indicating whether to use a keep alive mechanism. /// public bool UseKeepAlive { get { return _useKeepAlive; } set { _useKeepAlive = value; RaisePropertyChangedAuto(); if (_useKeepAlive) { LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is activated..."); } else { LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is deactivated."); } } } /// /// Gets or sets the keep alive timeout. /// public TimeSpan KeepAliveTimeout { get; set; } /// /// Gets or sets the keep alive retries. /// public int KeepAliveRetries { get; set; } /// /// Gets or sets a value indicating whether to auto respond to keep alive requests. /// public bool EnableKeepAliveAutoResponse { get; set; } /// /// Gets or sets a value indicating whether the transporter will get in to a failed state if any adapter has failed. /// public bool FailsWithAdapter { get; set; } /// /// Gets the last failed state exception/reason. /// public Exception FailedStateException { get; private set; } private GenericMessageProtocol _genericProtocol; /// /// Gets or sets the generic protocol used to serialize/deserialize generic messages. /// public GenericMessageProtocol GenericProtocol { get { return _genericProtocol; } set { _genericProtocol = value; RaisePropertyChangedAuto(); } } #endregion #region Virtual Methods /// /// Called when the has changed. /// /// The adapter. protected async virtual void OnAdapterChanged(ITransportAdapter oldAdapter, ITransportAdapter newAdapter) { if (oldAdapter != newAdapter) { _pendingRequests.Clear(); _pendingResponses.Clear(); _arrivedResponses = new ProducerConsumerQueue(); _sendingQueue = new PriorityProducerConsumerQueue(); } if (oldAdapter != null) { oldAdapter.StateChanged -= OnAdapterStateChanged; oldAdapter.DataAvailable -= OnAdapterDataAvailable; } if (newAdapter != null) { LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: Type = {newAdapter.GetType().Name}, Address = {newAdapter.Address}, State = {newAdapter.State}"); newAdapter.StateChanged -= OnAdapterStateChanged; newAdapter.DataAvailable -= OnAdapterDataAvailable; newAdapter.StateChanged += OnAdapterStateChanged; newAdapter.DataAvailable += OnAdapterDataAvailable; if (State == TransportComponentState.Connected && newAdapter.State == TransportComponentState.Disconnected) { await newAdapter.Connect(); } } else { LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: null"); } } /// /// Called when the current adapter state has changed. /// /// The sender. /// The e. protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e) { if (e == TransportComponentState.Failed && FailsWithAdapter) { OnFailed(new CommunicationException($"The adapter has failed with exception '{Adapter.FailedStateException.Message}' and the transporter is configured to fail with the adapter.")); } } /// /// Called when there is data available from the adapter. /// /// The sender. /// The data. protected virtual void OnAdapterDataAvailable(object sender, byte[] data) { _lastKeepAliveTime = DateTime.Now; EnqueueMessageIn(data); } /// /// Called when the component has failed. /// /// The ex. protected virtual async void OnFailed(Exception ex) { if (State != TransportComponentState.Failed) { FailedStateException = ex; LogManager.Log(ex, $"{GetExtendedComponentName()}: Transporter failed."); State = TransportComponentState.Failed; await OnPostDisconnection(); } else { LogManager.Log(ex, LogCategory.Warning, $"{GetExtendedComponentName()}: OnFailed called while state is already failed!"); } } protected virtual async Task OnPostDisconnection() { try { if (_pullThread != null) { _pullThread.Abort(); _pushThread.Abort(); _keepAliveThread.Abort(); } } catch { } if (Adapter != null) { await Adapter.Disconnect(); } NotifyContinuousRequestMessagesDisconnection(); } /// /// Called when a new request has been received. /// /// The request. protected virtual void OnRequestReceived(RequestReceivedEventArgs e) { var container = e.Container; if (_requestHandlers.Count > 0) { if (container.Type != MessageType.GenericRequest) { var handlers = _requestHandlers.Where(x => x.RequestType.Name == container.Type.ToOriginalName()).ToList(); if (handlers.Count > 0) //Handle { e.Handled = true; var request = MessageFactory.ExtractMessageFromContainer(container); foreach (var handler in handlers) { try { handler.Callback.Invoke(this, request, container.Token); } catch { //Ignore any exception on the client side. } } } } else { var genericRequest = MessageFactory.ExtractMessageFromContainer(container); var handlers = _requestHandlers.Where(x => x.RequestType.AssemblyQualifiedName == genericRequest.Type).ToList(); if (handlers.Count > 0) { e.Handled = true; var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data, GenericProtocol); foreach (var handler in handlers) { try { handler.Callback.Invoke(this, innerRequest, container.Token); } catch { //Ignore any exception on the client side. } } } } } RequestReceived?.Invoke(this, e); } /// /// Called when a new response has been received. /// /// The request. protected virtual void OnResponseReceived(MessageContainer container) { PendingResponseReceived?.Invoke(this, container); } /// /// Called when the component state has changed. /// /// The state. protected virtual void OnStateChanged(TransportComponentState state) { StateChanged?.Invoke(this, state); } /// /// Returns a string representing the component name. If an adapter is attached it will be part of the string. /// /// protected virtual String GetExtendedComponentName() { return Adapter != null ? $"{ComponentName} ({Adapter.Address})" : ComponentName; } /// /// Notifies all the continuous request messages about disconnection. /// protected virtual void NotifyContinuousRequestMessagesDisconnection() { LogManager.Log("Notifying all continuous request messages about disconnection..."); foreach (var request in _pendingRequests.ToList().Where(x => x.Direction == TransportMessageDirection.Request && x.IsContinuous)) { try { _pendingRequests.Remove(request); LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'..."); OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected.")); request.SetException(new TransporterDisconnectedException("Transporter disconnected.")); } catch (Exception e) { System.Diagnostics.Debug.WriteLine(e.ToString()); } } } /// /// Called when the request has been sent /// /// The request. private void OnRequestSent(TransportMessageBase request) { if (request.ShouldLog) { IMessage message = request.GetActualMessage() as IMessage; if (message != null) { RequestSent?.Invoke(this, message); } } } /// /// Called when the response has been received /// /// The response. private void OnResponseReceived(IMessage response) { if (response != null) { ResponseReceived?.Invoke(this, response); } } /// /// Called when the request has been failed /// /// The request. private void OnRequestFailed(TransportMessageBase request, Exception exception) { IMessage message = request.GetActualMessage() as IMessage; if (message != null) { RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception)); } } #endregion #region Constructors /// /// Initializes a new instance of the class. /// public TransporterBase() { ComponentName = "Not Set"; Encoder = new ProtoEncoder(); _pendingResponses = new Dictionary(); _sendingQueue = new PriorityProducerConsumerQueue(); _pendingRequests = new ConcurrentList(); _arrivedResponses = new ProducerConsumerQueue(); _requestHandlers = new List(); RequestTimeout = TimeSpan.FromSeconds(5); EnableKeepAliveAutoResponse = true; KeepAliveTimeout = TimeSpan.FromSeconds(2); KeepAliveRetries = 1; } /// /// Initializes a new instance of the class. /// /// The transport adapter. public TransporterBase(ITransportAdapter adapter) : this() { Adapter = adapter; } #endregion #region Public Methods /// /// Clears all message queues. /// public void ClearQueues() { _sendingQueue = new PriorityProducerConsumerQueue(); _pendingRequests = new ConcurrentList(); _arrivedResponses = new ProducerConsumerQueue(); } /// /// Connects the transport component. /// /// public virtual async Task Connect() { if (Adapter != null) { await Adapter.Connect(); } State = TransportComponentState.Connected; StartThreads(); LogManager.Log($"{GetExtendedComponentName()}: Transporter Connected..."); } /// /// Disconnects the transport component. /// /// public virtual async Task Disconnect() { if (State == TransportComponentState.Connected) { State = TransportComponentState.Disconnected; await OnPostDisconnection(); LogManager.Log($"{GetExtendedComponentName()}: Transporter Disconnected..."); } } #endregion #region Public Request Methods /// /// Sends a request. /// /// The request. /// Request configuration. /// /// public Task SendRequest(IMessage request, TransportRequestConfig config = null) { MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; container.Type = MessageFactory.ParseMessageType(request.GetType().Name); return SendRequestInternal(container.Token, container, config).Task; } /// /// Sends the request. /// /// /// Request configuration. /// /// public Task SendRequest(MessageContainer container, TransportRequestConfig config = null) { return SendRequestInternal(container.Token, container, config).Task; } /// /// Sends a request. /// /// The type of the request. /// The type of the response. /// The request. /// Request configuration. /// /// public Task> SendRequest(TangoMessage request, TransportRequestConfig config = null) where Request : IMessage where Response : IMessage { return SendRequestInternal>(request.Container.Token, request, config).Task; } private TaskCompletionSource SendRequestInternal(String token, Object request, TransportRequestConfig config = null) { config = config ?? new TransportRequestConfig(); TaskCompletionSource source = new TaskCompletionSource(); String requestName = TransportMessageBase.GetActualMessageTypeName(request); Func toBytes = null; if (request is ITangoMessage tangoMessage) { toBytes = () => Encoder.Encode(tangoMessage); } else if (request is IMessage protoMessage) { toBytes = () => protoMessage.ToByteArray(); } TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, source); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; message.TransportComponentName = GetExtendedComponentName(); message.ThreadingMode = config.ThreadingMode; TimeSpan? timeout = config.Timeout; if (request is MessageContainer container) { timeout = GetContainerTimeoutOrDefault(container); } if (timeout == null) { timeout = RequestTimeout; } if (request is ITangoMessage tanMessage) { tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; } String responseName = requestName.Replace("Request", "Response"); LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug); LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); if (State != TransportComponentState.Connected) { throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); } Action timeoutAction = () => { if (!source.Task.IsCompleted) { TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } }; if (config.ThreadingMode == TransportThreadingMode.NewThread) { message.ActivateTimeout = () => { TimeoutTask.StartNew(() => { timeoutAction(); }, timeout.Value); }; } else { message.ActivateTimeout = () => { Task.Delay(timeout.Value).ContinueWith((x) => { timeoutAction(); }); }; } EnqueueMessageOut(message); return source; } /// /// Sends a request and expecting multiple response messages. /// /// /// Request configuration. /// /// public IObservable SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null) { MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); container.Type = MessageFactory.ParseMessageType(request.GetType().Name); //We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment. container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; container.Continuous = true; return SendContinuousRequestInternal(container.Token, container, config); } /// /// Sends a request and expecting multiple response messages. /// /// The type of the request. /// The type of the response. /// /// Request configuration. /// /// public IObservable> SendContinuousRequest(TangoMessage request, TransportContinuousRequestConfig config = null) where Request : IMessage where Response : IMessage { request.Container.Continuous = true; request.Container.Completed = false; return SendContinuousRequestInternal>(request.Container.Token, request, config); } /// /// Sends a continuous request. /// /// The container. /// Request configuration. /// /// public IObservable SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null) { return SendContinuousRequestInternal(container.Token, container, config); } private IObservable SendContinuousRequestInternal(String token, Object request, TransportContinuousRequestConfig config = null) { config = config ?? new TransportContinuousRequestConfig(); String requestName = TransportMessageBase.GetActualMessageTypeName(request); Func toBytes = null; if (request is ITangoMessage tangoMessage) { toBytes = () => Encoder.Encode(tangoMessage); } else if (request is IMessage protoMessage) { toBytes = () => protoMessage.ToByteArray(); } Subject subject = new Subject(); TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, null); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; message.TransportComponentName = GetExtendedComponentName(); message.IsContinuous = true; message.ContinuesResponseSubject = subject; TimeSpan? timeout = config.Timeout; TimeSpan? continuousTimeout = config.ContinuousTimeout; if (request is MessageContainer container) { timeout = GetContainerTimeoutOrDefault(container); if (timeout == null) { timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout; container.Timeout = (uint)timeout.Value.TotalMilliseconds; } continuousTimeout = GetContainerContinuousTimeoutOrDefault(container); } if (timeout == null) { timeout = RequestTimeout; } if (request is ITangoMessage tanMessage) { tanMessage.Container.Continuous = true; tanMessage.Container.Completed = false; tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0; } String responseName = requestName.Replace("Request", "Response"); LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug); LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); if (State != TransportComponentState.Connected) { throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); } Action timeoutAction = () => { if (!message.AtLeastOneResponseReceived) { TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } if (continuousTimeout != null) { Task.Factory.StartNew(async () => { while (!message.Completed) { await Task.Delay(continuousTimeout.Value).ContinueWith((y) => { if (!message.Completed) { if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) { TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out."); OnRequestFailed(message, ex); message.SetException(ex); return; } } }); } }); } }; if (config.ThreadingMode == TransportThreadingMode.NewThread) { message.ActivateTimeout = () => { TimeoutTask.StartNew(() => { timeoutAction(); }, timeout.Value); }; } else { message.ActivateTimeout = () => { Task.Delay(timeout.Value).ContinueWith((x) => { timeoutAction(); }); }; } EnqueueMessageOut(message); return subject.AsObservable(); } /// /// Sends a generic request of any type. /// /// The type of the request. /// The type of the response. /// The request. /// The configuration. /// public async Task SendGenericRequest(Request request, TransportRequestConfig config = null) where Request : class where Response : class { GenericRequest genericRequest = new GenericRequest(); genericRequest.Type = request.GetType().AssemblyQualifiedName; genericRequest.Data = GenericMessageSerializer.SerializeToByteString(request, GenericProtocol); var response = await SendRequest(genericRequest, config); var responseObject = GenericMessageSerializer.DeserializeFromByteString(response.Message.Data, GenericProtocol); return responseObject; } /// /// Sends a generic request and expecting multiple generic response messages. /// /// The type of the request. /// The type of the response. /// Request configuration. /// public IObservable SendGenericContinuousRequest(Request request, TransportContinuousRequestConfig config = null) where Request : class where Response : class { GenericRequest genericRequest = new GenericRequest(); genericRequest.Type = request.GetType().AssemblyQualifiedName; genericRequest.Data = GenericMessageSerializer.SerializeToByteString(request, GenericProtocol); Subject subject = new Subject(); SendContinuousRequest(genericRequest, config).Subscribe((response) => { try { var responseObject = GenericMessageSerializer.DeserializeFromByteString(response.Message.Data, GenericProtocol); subject.OnNext(responseObject); } catch (Exception ex) { Debugger.Break(); System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request continuous handler.\n{ex.ToString()}"); //Ignore exception at the client side. } }, (ex) => { try { subject.OnError(ex); } catch (Exception xx) { Debugger.Break(); System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request error handler.\n{xx.ToString()}"); //Ignore exception at the client side. } }, () => { try { subject.OnCompleted(); } catch (Exception ex) { Debugger.Break(); System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request completed handler.\n{ex.ToString()}"); //Ignore exception at the client side. } }); return subject.AsObservable(); } #endregion #region Public Response Methods /// /// Sends a response. /// /// The type of the response. /// The response. /// public Task SendResponse(TangoMessage response) where Response : IMessage { return SendResponse(response, response.Container.Token); } /// /// Sends the response. /// /// /// Request token. /// Response configuration. /// public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null) { config = config ?? new TransportResponseConfig(); String responseName = response.GetType().Name; MessageContainer container = new MessageContainer(); container.Token = token; container.Data = response.ToByteString(); container.Type = MessageFactory.ParseMessageType(responseName); if (config.ErrorCode.HasValue) { container.Error = config.ErrorCode.Value; } if (config.ErrorMessage != null) { container.ErrorMessage = config.ErrorMessage; } container.EventCode = (int)config.EventType; container.Completed = config.Completed; return SendResponse(container); } /// /// Sends the response. /// /// The container. /// /// /// public Task SendResponse(MessageContainer container, TransportResponseConfig config = null) { config = config ?? new TransportResponseConfig(); if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted) { throw new InvalidOperationException("Transporter push thread is not in a running state."); } String token = container.Token; LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug); PendingResponse pendingResponse = null; if (State != TransportComponentState.Connected) { throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); } LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); if (_pendingResponses.TryGetValue(token, out pendingResponse)) { LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); if (!pendingResponse.IsContinuous) { LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } else if (container.Completed) { LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } } else { //This should never happen. throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); } TaskCompletionSource source = new TaskCompletionSource(); TransportMessage message = new TransportMessage(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; EnqueueMessageOut(message); return source.Task; } /// /// Sends a response. /// /// The type of the response. /// The response. /// Request token. /// Response configuration. /// /// /// Transporter push thread is not in a running state. /// public Task SendResponse(TangoMessage response, String token, TransportResponseConfig config = null) where Response : IMessage { config = config ?? new TransportResponseConfig(); if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted) { throw new InvalidOperationException("Transporter push thread is not in a running state."); } response.Container.Token = token; response.Container.Completed = config.Completed; if (config.ErrorCode.HasValue) { response.Container.Error = config.ErrorCode.Value; } if (!String.IsNullOrEmpty(config.ErrorMessage)) { response.Container.ErrorMessage = config.ErrorMessage; } response.Container.EventCode = (int)config.EventType; LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + typeof(Response).Name, LogCategory.Debug); if (State != TransportComponentState.Connected) { throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); } PendingResponse pendingResponse = null; LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); if (_pendingResponses.TryGetValue(token, out pendingResponse)) { LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); if (!pendingResponse.IsContinuous) { LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } else if (response.Container.Completed) { LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } } else { //This should never happen. throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); } TaskCompletionSource source = new TaskCompletionSource(); TransportMessage message = new TransportMessage(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; EnqueueMessageOut(message); return source.Task; } /// /// Sends a generic response. /// /// The type of the response. /// The response. /// The request token. /// The response configuration. /// public async Task SendGenericResponse(Response response, String token, TransportResponseConfig config = null) where Response : class { GenericResponse genericResponse = new GenericResponse(); genericResponse.Type = response.GetType().AssemblyQualifiedName; genericResponse.Data = GenericMessageSerializer.SerializeToByteString(response, GenericProtocol); await SendResponse(genericResponse, token, config); } /// /// Sends a general error response agnostic to the type of request. /// /// The exception. /// Response configuration. /// public Task SendErrorResponse(Exception exception, String token) { return SendResponse(new ErrorResponse() { }, token, new TransportResponseConfig() { ErrorCode = ErrorCode.GeneralError, Completed = true, ErrorMessage = exception.FlattenMessage() }); } #endregion #region Request Handlers /// /// Registers a custom request handler. /// /// The type of the request. /// The callback. public void RegisterRequestHandler(RequestHandlerCallbackDelegate callback) where Request : class { RequestHandler handler = new RequestHandler(); handler.RequestType = typeof(Request); handler.RegisteredCallback = callback; handler.Callback = (transporter, obj, token) => { callback?.Invoke(transporter, obj as Request, token); }; _requestHandlers.Add(handler); } /// /// Unregisters a custom request handler. /// /// The type of the request. /// The callback. public void UnregisterRequestHandler(RequestHandlerCallbackDelegate callback) where Request : class { var handler = _requestHandlers.FirstOrDefault(x => (x.RegisteredCallback as RequestHandlerCallbackDelegate) == callback); if (handler != null) { _requestHandlers.Remove(handler); } } /// /// Copies this instance request handlers to the specified instance. /// /// The transporter to copy the handlers to. public void CopyRequestHandlers(ITransporter transporter) { foreach (var handler in _requestHandlers.ToList()) { (transporter as TransporterBase)._requestHandlers.Add(handler); _requestHandlers.Remove(handler); } } #endregion #region Private Methods /// /// Starts the pull and push threads. /// protected void StartThreads() { try { _pullThread = new Thread(PullThreadMethod); _pullThread.Name = $"{GetExtendedComponentName()} Pull Thread"; _pullThread.IsBackground = true; _pullThread.Start(); _pushThread = new Thread(PushThreadMethod); _pushThread.Name = $"{GetExtendedComponentName()} Push Thread"; _pushThread.IsBackground = true; _pushThread.Start(); _keepAliveThread = new Thread(KeepAliveThreadMethod); _keepAliveThread.Name = $"{GetExtendedComponentName()} KeepAlive Thread"; _keepAliveThread.IsBackground = true; _keepAliveThread.Start(); } catch (Exception ex) { LogManager.Log(ex, "Error starting transporter threads."); } } /// /// Gets the container timeout or default. /// /// The container. /// private TimeSpan? GetContainerTimeoutOrDefault(MessageContainer container) { return container.Timeout > 0 ? TimeSpan.FromMilliseconds(container.Timeout) : default(TimeSpan?); } /// /// Gets the container timeout or default. /// /// The container. /// private TimeSpan? GetContainerContinuousTimeoutOrDefault(MessageContainer container) { return container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); } /// /// Enqueues the message and releases the push wait handle. /// /// The message. private void EnqueueMessageOut(TransportMessageBase message) { _sendingQueue.BlockEnqueue(message, message.Priority); } /// /// Enqueues the message and releases the pull wait handle. /// /// The data. private void EnqueueMessageIn(byte[] data) { _arrivedResponses.BlockEnqueue(data); } #endregion #region Push Thread /// /// Push thread loop. /// private void PushThreadMethod() { try { while (State == TransportComponentState.Connected) { TransportMessageBase message = _sendingQueue.BlockDequeue(); try { if (Adapter.State == TransportComponentState.Connected) { if (message.Token.Length != MESSAGE_TOKEN_LENGTH) { var ex = LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token)); OnRequestFailed(message, ex); message.SetException(ex); continue; } if (message.Direction == TransportMessageDirection.Request) { if (message.ShouldLog) { LogManager.Log($"{GetExtendedComponentName()}: Sending request '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); OnRequestSent(message); } lock (_pendingRequests) { _pendingRequests.Add(message); } } else { if (message.ShouldLog) { LogManager.Log($"{GetExtendedComponentName()}: Sending response '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); } } Adapter.Write(message.Serialize(), message.Immidiate); message.ActivateTimeout?.Invoke(); LogManager.Log($"{GetExtendedComponentName()}: Message sent...", LogCategory.Debug, message.Message); } else { if (message.Direction == TransportMessageDirection.Request) { var ex = new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.GetActualMessageTypeName() + ". Adapter is disconnected."); OnRequestFailed(message, ex); message.SetException(ex); } } if (message.Direction == TransportMessageDirection.Response) { message.SetResult(true, true); } } catch (ThreadAbortException) { Exception requestException = FailedStateException != null ? FailedStateException : new TransporterDisconnectedException("The transporter push thread has been aborted."); OnRequestFailed(message, requestException); message.SetException(requestException); } catch (Exception ex) { OnRequestFailed(message, ex); message.SetException(ex); } } } catch (ThreadAbortException) { LogManager.Log($"{GetExtendedComponentName()}: Push thread has been aborted."); } catch (Exception ex) { OnFailed(ex); } } #endregion #region Pull Thread /// /// Pull thread loop. /// private void PullThreadMethod() { try { while (State == TransportComponentState.Connected) { byte[] data = _arrivedResponses.BlockDequeue(); LogManager.Log($"{GetExtendedComponentName()}: Message received...", LogCategory.Debug); LogManager.Log($"{GetExtendedComponentName()}: Parsing message container...", LogCategory.Debug); MessageContainer container = null; try { container = Encoder.DecodeContainer(data); } catch (Exception ex) { LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing message container. Skipping incoming message..."); continue; } LogManager.Log($"{GetExtendedComponentName()}: Message was identified as " + container.Type + ".", LogCategory.Debug); if (container.Token.Length != MESSAGE_TOKEN_LENGTH) { LogManager.Log($"{GetExtendedComponentName()}: Invalid message token length received: " + container.Token, LogCategory.Error); continue; } LogManager.Log($"{GetExtendedComponentName()}: Searching for pending request token: " + container.Token, LogCategory.Debug); TransportMessageBase request = null; lock (_pendingRequests) { try { var requests = _pendingRequests.ToList().Where(x => x.Token == container.Token).ToList(); try { if (requests.Count > 1) { LogManager.Log($"{GetExtendedComponentName()}: {requests.Count} requests with the same token were detected - {requests.First().GetActualMessageTypeName()}.", LogCategory.Warning); } } catch (Exception ex) { LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate."); } request = requests.LastOrDefault(); } catch (Exception ex) { LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate."); } } if (request != null) { LogManager.Log($"{GetExtendedComponentName()}: Found pending request: " + request.GetActualMessageTypeName(), LogCategory.Debug); if (!request.IsContinuous) { LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug); _pendingRequests.Remove(request); try { if (container.Error == ErrorCode.None) { var message = Encoder.Decode(data); if (request.ShouldLog) { try { String responseType = message.Type.ToString(); var messageContent = message.GetType().GetProperty("Message").GetValue(message); try { if (messageContent.GetType() == typeof(GenericResponse)) { Type genericType = Type.GetType((messageContent as GenericResponse).Type); responseType = genericType.Name; messageContent = GenericMessageSerializer.DeserializeFromByteString(genericType, (messageContent as GenericResponse).Data, GenericProtocol); } } catch { } LogManager.Log($"{GetExtendedComponentName()}: Response received '{responseType}'...\n{messageContent.ToJsonString()}", LogCategory.Info); OnResponseReceived(messageContent as IMessage); } catch { LogManager.Log("Error logging response received.", LogCategory.Warning); } } LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and setting pending request task result...", LogCategory.Debug, message); request.SetResult(message, true); LogManager.Log($"{GetExtendedComponentName()}: Message enquirer released...", LogCategory.Debug); } else { var ex = new ResponseErrorException(container, request.GetActualMessageTypeName()); OnRequestFailed(request, ex); request.SetException(ex); } } catch (Exception ex) { OnRequestFailed(request, ex); request.SetException(LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message.")); } } else { LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug); try { if (container.Error == ErrorCode.None) { var message = Encoder.Decode(data); if (request.ShouldLog && !request.AtLeastOneResponseReceived) { try { var messageContent = message.GetType().GetProperty("Message").GetValue(message); LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info); OnResponseReceived(messageContent as IMessage); } catch { LogManager.Log("Error logging response received.", LogCategory.Warning); } } LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message); if (container.Completed) { LogManager.Log($"{GetExtendedComponentName()}: Continuous sequence completed.", LogCategory.Debug); _pendingRequests.Remove(request); } request.SetResult(message, container.Completed); } else if (container.Error == ErrorCode.ContinuousResponseAborted) { String m = $"{GetExtendedComponentName()}: Continuous response " + container.Type + " has been aborted: " + container.Error.ToString(); LogManager.Log(m, LogCategory.Info); _pendingRequests.Remove(request); OnRequestFailed(request, new ContinuousResponseAbortedException(container, m)); request.SetException(new ContinuousResponseAbortedException(container, m)); } else { LogManager.Log($"{GetExtendedComponentName()}: Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning); _pendingRequests.Remove(request); var exx = new ResponseErrorException(container, request.GetActualMessageTypeName()); OnRequestFailed(request, exx); request.SetException(exx); } } catch (Exception ex) { LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message."); } } try { Task.Factory.StartNew(() => OnResponseReceived(container)); } catch { //Ignore any exceptions that may raise on the client side.. } } else { if (container.Type.ToString().EndsWith("Response")) { LogManager.Log($"{GetExtendedComponentName()}: A response message with no awaiting request was identified. {container.Type}, Token: {container.Token}. Message ignored.", LogCategory.Warning); continue; } LogManager.Log($"{GetExtendedComponentName()}: Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug); try { LogManager.Log($"{GetExtendedComponentName()}: Saving request token: " + container.Token, LogCategory.Debug); _pendingResponses.Add(container.Token, new PendingResponse(container.Continuous)); if (container.Type == MessageType.KeepAliveRequest && EnableKeepAliveAutoResponse) { LogManager.Log($"{GetExtendedComponentName()}: Submitting keep alive response...", LogCategory.Debug); try { SendResponse(new KeepAliveResponse(), container.Token, new TransportResponseConfig() { Priority = QueuePriority.High }); } catch { } } else { LogManager.Log($"{GetExtendedComponentName()}: Invoking RequestReceived event...", LogCategory.Debug, container); try { Task.Factory.StartNew(() => OnRequestReceived(new RequestReceivedEventArgs(container))); } catch { //Ignore any exceptions that may raise on the client side.. } } } catch (Exception ex) { LogManager.Log(ex); } } } } catch (ThreadAbortException) { LogManager.Log($"{GetExtendedComponentName()}: Pull thread has been aborted."); } catch (Exception ex) { OnFailed(ex); } } #endregion #region Keep Alive Thread /// /// Responsible for sending keep alive messages. /// private void KeepAliveThreadMethod() { bool aborted = false; int retryCounter = KeepAliveRetries; try { Thread.Sleep(2000); while (State == TransportComponentState.Connected) { try { Thread.Sleep(2000); if (UseKeepAlive) { if (_arrivedResponses.Count == 0) { retryCounter--; if (State == TransportComponentState.Connected) { var response = SendRequest(new KeepAliveRequest(), new TransportRequestConfig() { Timeout = KeepAliveTimeout, Priority = QueuePriority.High }).Result; retryCounter = KeepAliveRetries; } else { continue; } } else { LogManager.Log($"{GetExtendedComponentName()}: Keep alive request was skipped due to busy response queue.", LogCategory.Debug); } } } catch (Exception ex) when (ex is TimeoutException || ex is AggregateException) { if (State != TransportComponentState.Connected || aborted || (Adapter.State == TransportComponentState.Failed && FailsWithAdapter)) return; if (UseKeepAlive) { if (DateTime.Now - _lastKeepAliveTime > KeepAliveTimeout) { if (retryCounter <= 0) { OnFailed(new KeepAliveException("The transporter has not received a KeepAlive response within the given time.")); return; } else { LogManager.Log($"The transporter has not received a KeepAlive response within the given time. Retrying ({retryCounter}/{KeepAliveRetries})...", LogCategory.Warning); } } else { retryCounter = KeepAliveRetries; LogManager.Log($"{GetExtendedComponentName()}: The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning); } } } catch (ThreadAbortException) { aborted = true; LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted."); return; } catch (Exception ex) { if (State != TransportComponentState.Connected || aborted) return; if (UseKeepAlive) { OnFailed(ex); return; } } } } catch (ThreadAbortException) { LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted."); } } #endregion #region Override Methods /// /// Returns a that represents this instance. /// /// /// A that represents this instance. /// public override string ToString() { return this.GetType().Name + ", Adapter: " + (Adapter != null ? Adapter.ToString() : "Null") + ", Encoder: " + (Encoder != null ? Encoder.ToString() : "Null"); } #endregion #region Dispose /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Disconnect().Wait(); State = TransportComponentState.Disposed; } #endregion } }