using Google.Protobuf;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
using System.Reactive.Linq;
using System.ServiceModel;
using Tango.Transport.Encoders;
using Tango.PMR.Connection;
using Tango.Core.Threading;

namespace Tango.Transport
{
    /// <summary>
    /// Base class for transporters. Outgoing messages are queued and written to an
    /// <see cref="ITransportAdapter"/>; incoming data is decoded and either matched to a
    /// pending request by token or surfaced as a new request.
    /// </summary>
    /// <remarks>
    /// NOTE(review): several declarations below (queues, list, dictionary, event handlers) look as
    /// if their generic type arguments were lost from this copy of the file. The tokens are kept
    /// exactly as found - confirm against the original source.
    /// </remarks>
    public abstract class TransporterBase : ExtendedObject, ITransporter
    {
        // Expected length of a message token (tokens are created with Guid.NewGuid().ToString()).
        private const int MESSAGE_TOKEN_LENGTH = 36;

        // Adapter currently used for raw I/O; exposed through the Adapter property.
        private ITransportAdapter _adapter;

        // Worker threads created by StartThreads().
        private Thread _pushThread;
        private Thread _pullThread;
        private Thread _keepAliveThread;

        // Outgoing messages waiting for the push thread.
        private ProducerConsumerQueue _sendingQueue;

        // Raw incoming data waiting for the pull thread.
        private ProducerConsumerQueue _arrivedResponses;

        // Requests that were written to the adapter and still wait for a response.
        private ConcurrentList _pendingRequests;

        // Tokens of received requests that have not been answered yet.
        private Dictionary _pendingResponses;

        // Time at which data last arrived from the adapter.
        private DateTime _lastKeepAliveTime;

        #region Events

        /// <summary>
        /// Occurs when a new request message has been received.
        /// </summary>
        public event EventHandler RequestReceived;

        /// <summary>
        /// Occurs when a response to a pending request has been received.
        /// </summary>
        public event EventHandler PendingResponseReceived;

        /// <summary>
        /// Occurs when the component state changes.
        /// </summary>
        public event EventHandler StateChanged;

        #endregion

        #region Properties

        /// <summary>
        /// Gets or sets the adapter used to read and write raw data. Setting it invokes
        /// <c>OnAdapterChanged</c> with the previous and the new adapter and then raises a
        /// property-changed notification.
        /// </summary>
        public ITransportAdapter Adapter
        {
            get { return _adapter; }
            set
            {
                ITransportAdapter replaced = _adapter;
                _adapter = value;

                OnAdapterChanged(replaced, value);
                RaisePropertyChangedAuto();
            }
        }

        /// <summary>
        /// Gets or sets the encoder used to encode and decode messages.
        /// </summary>
        public ITransportEncoder Encoder { get; set; }

        // Backing field of the State property.
        private TransportComponentState _state;
// ---- TransporterBase: state and configuration properties ----

        /// <summary>
        /// Gets the component state. Assigning a different value stores it and calls
        /// <c>OnStateChanged</c>; assigning the current value does nothing.
        /// </summary>
        public TransportComponentState State
        {
            get { return _state; }
            protected set
            {
                if (_state == value)
                {
                    return;
                }

                _state = value;
                OnStateChanged(_state);
            }
        }

        /// <summary>
        /// Gets or sets the default request timeout.
        /// </summary>
        public TimeSpan RequestTimeout { get; set; }

        // Backing field of UseKeepAlive.
        private bool _useKeepAlive;

        /// <summary>
        /// Gets or sets a value indicating whether the keep alive mechanism is used.
        /// Every assignment raises a property-changed notification and logs the new mode.
        /// </summary>
        public bool UseKeepAlive
        {
            get { return _useKeepAlive; }
            set
            {
                _useKeepAlive = value;
                RaisePropertyChangedAuto();

                LogManager.Log(_useKeepAlive ? "KeepAlive is activated..." : "KeepAlive is deactivated.");
            }
        }

        /// <summary>
        /// Gets or sets the keep alive timeout.
        /// </summary>
        public TimeSpan KeepAliveTimeout { get; set; }

        /// <summary>
        /// Gets or sets the keep alive retries.
        /// </summary>
        public int KeepAliveRetries { get; set; }

        /// <summary>
        /// Gets or sets a value indicating whether keep alive requests are answered automatically.
        /// </summary>
        public bool EnableKeepAliveAutoResponse { get; set; }

        /// <summary>
        /// Gets or sets a value indicating whether the transporter goes into a failed state
        /// when its adapter reports a failed state.
        /// </summary>
        public bool FailsWithAdapter { get; set; }

        /// <summary>
        /// Gets the exception recorded by the most recent failure.
        /// </summary>
        public Exception FailedStateException { get; private set; }

        #endregion

        #region Virtual Methods
// ---- TransporterBase: adapter and failure hooks ----

        /// <summary>
        /// Called when <see cref="Adapter"/> has been assigned. Resets the message bookkeeping when
        /// the adapter instance actually changed, moves the event subscriptions from the old
        /// adapter to the new one, and connects the new adapter when the transporter is already
        /// connected while the adapter is not.
        /// </summary>
        /// <param name="oldAdapter">The previous adapter (may be null).</param>
        /// <param name="newAdapter">The new adapter (may be null).</param>
        protected async virtual void OnAdapterChanged(ITransportAdapter oldAdapter, ITransportAdapter newAdapter)
        {
            if (oldAdapter != newAdapter)
            {
                _pendingRequests.Clear();
                _pendingResponses.Clear();
                _arrivedResponses = new ProducerConsumerQueue();
                _sendingQueue = new ProducerConsumerQueue();
            }

            if (oldAdapter != null)
            {
                oldAdapter.StateChanged -= OnAdapterStateChanged;
                oldAdapter.DataAvailable -= OnAdapterDataAvailable;
            }

            if (newAdapter == null)
            {
                LogManager.Log("Adapter Changed: null");
                return;
            }

            LogManager.Log(String.Format("Adapter Changed: Type = {0}, Address = {1}, State = {2}", newAdapter.GetType().Name, newAdapter.Address, newAdapter.State));

            // Unsubscribe first so that re-assigning the same adapter never subscribes twice.
            newAdapter.StateChanged -= OnAdapterStateChanged;
            newAdapter.DataAvailable -= OnAdapterDataAvailable;
            newAdapter.StateChanged += OnAdapterStateChanged;
            newAdapter.DataAvailable += OnAdapterDataAvailable;

            if (State == TransportComponentState.Connected && newAdapter.State == TransportComponentState.Disconnected)
            {
                try
                {
                    // This method is 'async void': an exception escaping the await cannot be
                    // observed by the caller and would surface as an unhandled exception.
                    // ConfigureAwait(false) keeps the failure path below, which blocks on
                    // Disconnect(), off any captured synchronization context.
                    await newAdapter.Connect().ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    if (FailsWithAdapter)
                    {
                        OnFailed(ex);
                    }
                    else
                    {
                        LogManager.Log(ex, "Failed to connect the new adapter.");
                    }
                }
            }
        }

        /// <summary>
        /// Called when the current adapter state has changed. Puts the transporter into the
        /// failed state when the adapter failed and <see cref="FailsWithAdapter"/> is set.
        /// </summary>
        /// <param name="sender">The adapter raising the event.</param>
        /// <param name="e">The new adapter state.</param>
        protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
        {
            if (e == TransportComponentState.Failed && FailsWithAdapter)
            {
                OnFailed(new CommunicationException("The adapter has failed. Going into a failed state..."));
            }
        }

        /// <summary>
        /// Called when there is data available from the adapter. Records the arrival time
        /// (used by the keep alive logic) and queues the data for the pull thread.
        /// </summary>
        /// <param name="sender">The adapter raising the event.</param>
        /// <param name="data">The raw data received.</param>
        protected virtual void OnAdapterDataAvailable(object sender, byte[] data)
        {
            _lastKeepAliveTime = DateTime.Now;
            EnqueueMessageIn(data);
        }

        /// <summary>
        /// Called when the component has failed. Records the exception, switches to the failed
        /// state, logs the failure and disconnects.
        /// </summary>
        /// <remarks>
        /// NOTE(review): <c>Disconnect()</c> assigns <c>State = Disconnected</c> as its first
        /// statement, so observers see Failed immediately followed by Disconnected - confirm
        /// this is intended.
        /// </remarks>
        /// <param name="ex">The reason of the failure.</param>
        protected virtual void OnFailed(Exception ex)
        {
            FailedStateException = ex;
            State = TransportComponentState.Failed;
            LogManager.Log(ex, "Transporter failed.");

            try
            {
                Disconnect().Wait();
            }
            catch (Exception disconnectError) when (!(disconnectError is ThreadAbortException))
            {
                // OnFailed runs inside worker-thread catch blocks; a second exception thrown
                // from here would be unhandled on that thread, so it is logged instead.
                LogManager.Log(disconnectError, "Transporter failed to disconnect after a failure.");
            }
        }
// ---- TransporterBase: event raisers, constructors, connect/disconnect ----

        /// <summary>
        /// Raises <see cref="RequestReceived"/> for a newly received request.
        /// </summary>
        /// <param name="container">The received request container.</param>
        protected virtual void OnRequestReceived(MessageContainer container)
        {
            RequestReceived?.Invoke(this, container);
        }

        /// <summary>
        /// Raises <see cref="PendingResponseReceived"/> for a received response.
        /// </summary>
        /// <param name="container">The received response container.</param>
        protected virtual void OnResponseReceived(MessageContainer container)
        {
            PendingResponseReceived?.Invoke(this, container);
        }

        /// <summary>
        /// Raises <see cref="StateChanged"/>.
        /// </summary>
        /// <param name="state">The new state.</param>
        protected virtual void OnStateChanged(TransportComponentState state)
        {
            StateChanged?.Invoke(this, state);
        }

        #endregion

        #region Constructors

        /// <summary>
        /// Initializes a new instance with a <c>ProtoEncoder</c>, empty queues, a 5 second
        /// request timeout, keep alive auto response enabled, a 2 second keep alive timeout
        /// and one keep alive retry.
        /// </summary>
        public TransporterBase()
        {
            Encoder = new ProtoEncoder();
            _pendingResponses = new Dictionary();
            _sendingQueue = new ProducerConsumerQueue();
            _pendingRequests = new ConcurrentList();
            _arrivedResponses = new ProducerConsumerQueue();
            RequestTimeout = TimeSpan.FromSeconds(5);
            EnableKeepAliveAutoResponse = true;
            KeepAliveTimeout = TimeSpan.FromSeconds(2);
            KeepAliveRetries = 1;
        }

        /// <summary>
        /// Initializes a new instance and assigns the given adapter.
        /// </summary>
        /// <param name="adapter">The transport adapter.</param>
        public TransporterBase(ITransportAdapter adapter) : this()
        {
            // Note: the Adapter setter calls the virtual OnAdapterChanged from the constructor.
            Adapter = adapter;
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Replaces the sending queue, the pending request list and the arrived data queue
        /// with new, empty instances.
        /// </summary>
        public void ClearQueues()
        {
            _sendingQueue = new ProducerConsumerQueue();
            _pendingRequests = new ConcurrentList();
            _arrivedResponses = new ProducerConsumerQueue();
        }

        /// <summary>
        /// Connects the adapter (when one is assigned), switches to the connected state and
        /// starts the worker threads.
        /// </summary>
        /// <returns>A task that completes once the worker threads have been started.</returns>
        public virtual async Task Connect()
        {
            if (Adapter != null)
            {
                await Adapter.Connect();
            }

            State = TransportComponentState.Connected;
            StartThreads();
            LogManager.Log("Transporter Connected...");
        }

        /// <summary>
        /// Switches to the disconnected state, aborts the worker threads and disconnects the
        /// adapter (when one is assigned).
        /// </summary>
        /// <returns>A task that completes once the adapter has been disconnected.</returns>
        public virtual async Task Disconnect()
        {
            State = TransportComponentState.Disconnected;

            AbortWorker(_pullThread);
            AbortWorker(_pushThread);
            AbortWorker(_keepAliveThread);

            if (Adapter != null)
            {
                await Adapter.Disconnect();
            }

            LogManager.Log("Transporter Disconnected...");
        }

        /// <summary>
        /// Aborts a single worker thread, ignoring null and the calling thread.
        /// </summary>
        /// <param name="worker">The worker thread to abort (may be null).</param>
        private static void AbortWorker(Thread worker)
        {
            // Disconnect() is reached from the worker threads themselves through OnFailed().
            // Aborting the current thread would raise ThreadAbortException right here and skip
            // the remaining aborts and the adapter disconnect. The worker loops test
            // State == Connected, which is already false at this point.
            if (worker == null || worker == Thread.CurrentThread)
            {
                return;
            }

            try
            {
                worker.Abort();
            }
            catch (Exception ex)
            {
                // Best effort: one thread failing to abort must not prevent the others from
                // being aborted, but the reason is worth recording.
                LogManager.Log(ex, "Failed to abort a transporter worker thread.");
            }
        }
// ---- TransporterBase: single-response requests and container responses ----
// NOTE(review): Task / TaskCompletionSource / TransportMessage appear here without type
// arguments, exactly as found in this copy of the file - confirm against the original source.

        /// <summary>
        /// Wraps the request in a new container with a fresh token and queues it for sending.
        /// </summary>
        /// <param name="request">The request message.</param>
        /// <param name="timeout">Optional timeout. <see cref="RequestTimeout"/> is used when omitted.</param>
        /// <returns>The task of the completion source attached to the queued message; it is
        /// faulted with a <see cref="TimeoutException"/> when it is still incomplete once the
        /// timeout elapses after sending.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public Task SendRequest(IMessage request, TimeSpan? timeout = null)
        {
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            String requestName = request.GetType().Name;
            String responseName = requestName.Replace("Request", "Response");

            // Resolved once so that the value sent in the container, the local timer and the
            // timeout message always agree.
            TimeSpan effectiveTimeout = timeout ?? RequestTimeout;

            MessageContainer container = new MessageContainer();
            container.Token = Guid.NewGuid().ToString();
            container.Data = request.ToByteString();
            container.Timeout = (UInt32)effectiveTimeout.TotalMilliseconds;
            container.Type = MessageFactory.ParseMessageType(requestName);

            LogManager.Log("Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
            LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            TaskCompletionSource source = new TaskCompletionSource();
            TransportMessage message = new TransportMessage(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source);

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (source.Task.IsCompleted)
                    {
                        return;
                    }

                    TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");

                    // TrySetException: a response may complete the task between the check
                    // above and this call; SetException would then throw on the timer thread.
                    if (!source.TrySetException(ex))
                    {
                        return;
                    }

                    LogManager.Log(ex);
                    LogManager.Log("Setting request task exception...", LogCategory.Debug);

                    // A timed-out request will never be completed by a response any more;
                    // drop it so the pending list does not grow without bound.
                    lock (_pendingRequests)
                    {
                        _pendingRequests.Remove(message);
                    }
                }, effectiveTimeout);
            };

            EnqueueMessageOut(message);

            return source.Task;
        }

        /// <summary>
        /// Wraps the response in a container carrying the given token and optional
        /// completion/error information, then sends it through <c>SendResponse(MessageContainer)</c>.
        /// </summary>
        /// <param name="response">The response message.</param>
        /// <param name="token">The token of the request being answered.</param>
        /// <param name="completed">Optional completed flag for the container.</param>
        /// <param name="errorCode">Optional error code for the container.</param>
        /// <param name="errorMessage">Optional error message for the container.</param>
        /// <returns>The task returned by <c>SendResponse(MessageContainer)</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="response"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The transporter is not connected, or the
        /// matching request token was not found.</exception>
        public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null)
        {
            if (response == null)
            {
                throw new ArgumentNullException(nameof(response));
            }

            String responseName = response.GetType().Name;

            MessageContainer container = new MessageContainer();
            container.Token = token;
            container.Data = response.ToByteString();
            container.Type = MessageFactory.ParseMessageType(responseName);

            if (errorCode.HasValue)
            {
                container.Error = errorCode.Value;
            }

            if (errorMessage != null)
            {
                container.ErrorMessage = errorMessage;
            }

            if (completed.HasValue)
            {
                container.Completed = completed.Value;
            }

            return SendResponse(container);
        }

        /// <summary>
        /// Queues an already built request container for sending. The timeout is taken from
        /// the container when it is greater than zero, otherwise <see cref="RequestTimeout"/>.
        /// </summary>
        /// <param name="container">The request container.</param>
        /// <returns>The task of the completion source attached to the queued message; it is
        /// faulted with a <see cref="TimeoutException"/> when it is still incomplete once the
        /// timeout elapses after sending.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="container"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public Task SendRequest(MessageContainer container)
        {
            if (container == null)
            {
                throw new ArgumentNullException(nameof(container));
            }

            String responseName = container.Type.ToString().Replace("Request", "Response");
            TimeSpan effectiveTimeout = GetContainerTimeoutOrDefault(container) ?? RequestTimeout;

            LogManager.Log("Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug);
            LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            TaskCompletionSource source = new TaskCompletionSource();
            TransportMessage message = new TransportMessage(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source);

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (source.Task.IsCompleted)
                    {
                        return;
                    }

                    TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");

                    // See SendRequest(IMessage, TimeSpan?): avoids throwing when a response
                    // wins the race against the timeout.
                    if (!source.TrySetException(ex))
                    {
                        return;
                    }

                    LogManager.Log(ex);
                    LogManager.Log("Setting request task exception...", LogCategory.Debug);

                    lock (_pendingRequests)
                    {
                        _pendingRequests.Remove(message);
                    }
                }, effectiveTimeout);
            };

            EnqueueMessageOut(message);

            return source.Task;
        }
// ---- TransporterBase: container responses, continuous requests, typed requests ----
// NOTE(review): the generic type parameters/arguments of the members below appear to have been
// lost from this copy of the file (e.g. "Task>", "Subject>", "where Request : IMessage" without
// a declared type parameter). The tokens are reproduced exactly as found - restore them from
// the original source.

        /// <summary>
        /// Queues a response container for sending. The container token must match a request
        /// that was received earlier; the token is forgotten unless the request was continuous
        /// and this response is not marked as completed.
        /// </summary>
        /// <param name="container">The response container.</param>
        /// <returns>The task of the completion source attached to the queued message.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected, or the
        /// matching request token was not found.</exception>
        public Task SendResponse(MessageContainer container)
        {
            String token = container.Token;

            LogManager.Log("Queuing response message: " + container.Type, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}."));
            }

            LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug);

            // The dictionary is also written by the pull thread; a plain Dictionary must not be
            // read and modified concurrently.
            lock (_pendingResponses)
            {
                PendingResponse pendingResponse = null;

                if (!_pendingResponses.TryGetValue(token, out pendingResponse))
                {
                    //This should never happen.
                    throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical);
                }

                LogManager.Log("Found matching request token: " + token, LogCategory.Debug);

                if (!pendingResponse.IsContinuous)
                {
                    LogManager.Log("Removing matching request token.", LogCategory.Debug);
                    _pendingResponses.Remove(token);
                }
                else if (container.Completed)
                {
                    LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug);
                    _pendingResponses.Remove(token);
                }
            }

            TaskCompletionSource source = new TaskCompletionSource();
            TransportMessage message = new TransportMessage(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source);

            EnqueueMessageOut(message);

            return source.Task;
        }

        /// <summary>
        /// Sends a request that expects multiple response messages.
        /// </summary>
        /// <param name="request">The request message.</param>
        /// <param name="timeout">Optional timeout for the first response. <see cref="RequestTimeout"/> is used when omitted.</param>
        /// <returns>An observable view of the subject attached to the queued message.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public IObservable SendContinuousRequest(IMessage request, TimeSpan? timeout = default(TimeSpan?))
        {
            String requestName = request.GetType().Name;
            String responseName = requestName.Replace("Request", "Response");
            TimeSpan effectiveTimeout = timeout ?? RequestTimeout;

            MessageContainer container = new MessageContainer();
            container.Token = Guid.NewGuid().ToString();
            container.Data = request.ToByteString();
            container.Type = MessageFactory.ParseMessageType(requestName);
            container.Timeout = (UInt32)effectiveTimeout.TotalMilliseconds;
            container.Continuous = true;

            LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            Subject subject = new Subject();

            LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

            TransportMessage message = new TransportMessage(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), null)
            {
                IsContinuous = true,
                ContinuesResponseSubject = subject,
            };

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (!message.AtLeastOneResponseReceived)
                    {
                        TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");
                        LogManager.Log(ex);
                        LogManager.Log("Setting request exception...", LogCategory.Debug);
                        message.SetException(ex);
                    }
                }, effectiveTimeout);
            };

            EnqueueMessageOut(message);

            return subject.AsObservable();
        }

        /// <summary>
        /// Sends a typed request and expects a single typed response.
        /// </summary>
        /// <param name="request">The request; its container timeout is overwritten.</param>
        /// <param name="timeout">Optional timeout. <see cref="RequestTimeout"/> is used when omitted.</param>
        /// <returns>The task of the completion source attached to the queued message; it is
        /// faulted with a <see cref="TimeoutException"/> when it is still incomplete once the
        /// timeout elapses after sending.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public Task> SendRequest(TangoMessage request, TimeSpan? timeout = null) where Request : IMessage where Response : IMessage
        {
            LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
            LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            TimeSpan effectiveTimeout = timeout ?? RequestTimeout;
            request.Container.Timeout = (UInt32)effectiveTimeout.TotalMilliseconds;

            TaskCompletionSource> source = new TaskCompletionSource>();
            TransportMessage> message = new TransportMessage>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source);

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (source.Task.IsCompleted)
                    {
                        return;
                    }

                    TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");

                    // TrySetException: a response may complete the task between the check
                    // above and this call; SetException would then throw on the timer thread.
                    if (!source.TrySetException(ex))
                    {
                        return;
                    }

                    LogManager.Log(ex);
                    LogManager.Log("Setting request task exception...", LogCategory.Debug);

                    // Drop the timed-out request so the pending list does not grow forever.
                    lock (_pendingRequests)
                    {
                        _pendingRequests.Remove(message);
                    }
                }, effectiveTimeout);
            };

            EnqueueMessageOut(message);

            return source.Task;
        }

        /// <summary>
        /// Sends a typed request that expects multiple typed response messages.
        /// </summary>
        /// <param name="request">The request; its container is marked continuous and its timeouts are overwritten.</param>
        /// <param name="firstTimeout">Optional timeout for the first response. <see cref="RequestTimeout"/> is used when omitted.</param>
        /// <param name="continousTimeout">Optional maximum silence between responses; no such check is made when omitted.</param>
        /// <returns>An observable view of the subject attached to the queued message.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public IObservable> SendContinuousRequest(TangoMessage request, TimeSpan? firstTimeout = null, TimeSpan? continousTimeout = null) where Request : IMessage where Response : IMessage
        {
            LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);

            Subject> subject = new Subject>();

            LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            TimeSpan effectiveFirstTimeout = firstTimeout ?? RequestTimeout;

            request.Container.Continuous = true;
            request.Container.Completed = false;
            request.Container.Timeout = (UInt32)effectiveFirstTimeout.TotalMilliseconds;
            request.Container.ContinuousTimeout = continousTimeout.HasValue ? (UInt32)continousTimeout.Value.TotalMilliseconds : 0;

            TransportMessage> message = new TransportMessage>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null)
            {
                IsContinuous = true,
                ContinuesResponseSubject = subject,
            };

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (!message.AtLeastOneResponseReceived)
                    {
                        TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + effectiveFirstTimeout.TotalSeconds + " seconds.");
                        LogManager.Log(ex);
                        LogManager.Log("Setting request exception...", LogCategory.Debug);
                        message.SetException(ex);

                        // The request has already failed; watching it for silence would only
                        // report a second timeout for the same request.
                        return;
                    }

                    if (continousTimeout == null)
                    {
                        return;
                    }

                    TimeSpan silenceLimit = continousTimeout.Value;

                    Task.Factory.StartNew(async () =>
                    {
                        while (!message.Completed)
                        {
                            await Task.Delay(silenceLimit);

                            if (message.Completed)
                            {
                                break;
                            }

                            if (DateTime.Now - message.LastResponseTime > silenceLimit)
                            {
                                TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (silenceLimit.TotalSeconds) + " seconds and has timed out.");
                                LogManager.Log(ex);
                                LogManager.Log("Setting request exception...", LogCategory.Debug);
                                message.SetException(ex);

                                // Stop watching: without this the loop keeps reporting the
                                // same timeout every period for as long as Completed is false.
                                break;
                            }
                        }
                    });
                }, effectiveFirstTimeout);
            };

            EnqueueMessageOut(message);

            return subject.AsObservable();
        }

        /// <summary>
        /// Sends an already built container as a continuous request. The first-response
        /// timeout and the silence limit are taken from the container when greater than zero.
        /// </summary>
        /// <param name="container">The request container.</param>
        /// <returns>An observable view of the subject attached to the queued message.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public IObservable SendContinuousRequest(MessageContainer container)
        {
            TimeSpan effectiveTimeout = GetContainerTimeoutOrDefault(container) ?? RequestTimeout;
            TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?);

            String requestName = container.Type.ToString();
            String responseName = requestName.Replace("Request", "Response");

            LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
            }

            Subject subject = new Subject();

            LogManager.Log("Expected response: " + responseName, LogCategory.Debug);

            TransportMessage message = new TransportMessage(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), null)
            {
                IsContinuous = true,
                ContinuesResponseSubject = subject,
            };

            message.ActivateTimeout = () =>
            {
                TimeoutTask.StartNew(() =>
                {
                    if (!message.AtLeastOneResponseReceived)
                    {
                        TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + effectiveTimeout.TotalSeconds + " seconds.");
                        LogManager.Log(ex);
                        LogManager.Log("Setting request exception...", LogCategory.Debug);
                        message.SetException(ex);

                        // Already failed; do not start the silence watch for it.
                        return;
                    }

                    if (continuousTimeout == null)
                    {
                        return;
                    }

                    TimeSpan silenceLimit = continuousTimeout.Value;

                    Task.Factory.StartNew(async () =>
                    {
                        while (!message.Completed)
                        {
                            await Task.Delay(silenceLimit);

                            if (message.Completed)
                            {
                                break;
                            }

                            if (DateTime.Now - message.LastResponseTime > silenceLimit)
                            {
                                TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (silenceLimit.TotalSeconds) + " seconds and has timed out.");
                                LogManager.Log(ex);
                                LogManager.Log("Setting request exception...", LogCategory.Debug);
                                message.SetException(ex);

                                // Stop watching once the timeout has been reported.
                                break;
                            }
                        }
                    });
                }, effectiveTimeout);
            };

            EnqueueMessageOut(message);

            return subject.AsObservable();
        }
// ---- TransporterBase: typed responses and private helpers ----
// NOTE(review): the generic type parameters/arguments of the SendResponse overloads below appear
// to have been lost from this copy of the file ("where Response : IMessage" without a declared
// type parameter). The tokens are reproduced exactly as found - restore them from the original.

        /// <summary>
        /// Sends a typed response using the token already stored in its container.
        /// </summary>
        /// <param name="response">The response.</param>
        /// <returns>The task returned by the token-based overload.</returns>
        public Task SendResponse(TangoMessage response) where Response : IMessage
        {
            return SendResponse(response, response.Container.Token);
        }

        /// <summary>
        /// Sends a typed response for the specified request token. The token is forgotten
        /// unless the request was continuous and this response is not marked as completed.
        /// </summary>
        /// <param name="response">The response; its container is updated with the arguments below.</param>
        /// <param name="token">The token of the request being answered.</param>
        /// <param name="completed">Optional completed flag for the container.</param>
        /// <param name="errorCode">Optional error code for the container.</param>
        /// <param name="errorMessage">Optional error message; ignored when null or empty.</param>
        /// <returns>The task of the completion source attached to the queued message.</returns>
        /// <exception cref="InvalidOperationException">The push thread is not running, the
        /// transporter is not connected, or the matching request token was not found.</exception>
        public Task SendResponse(TangoMessage response, String token, bool? completed = null, ErrorCode? errorCode = null, String errorMessage = null) where Response : IMessage
        {
            // ThreadState is a flags enumeration, so comparing it for equality with Aborted
            // is unreliable and does not cover a thread that simply ended. IsAlive is false
            // for every thread that is not running.
            if (_pushThread == null || !_pushThread.IsAlive)
            {
                throw new InvalidOperationException("Transporter push thread is not in a running state.");
            }

            response.Container.Token = token;

            if (completed.HasValue)
            {
                response.Container.Completed = completed.Value;
            }

            if (errorCode.HasValue)
            {
                response.Container.Error = errorCode.Value;
            }

            if (!String.IsNullOrEmpty(errorMessage))
            {
                response.Container.ErrorMessage = errorMessage;
            }

            LogManager.Log("Queuing response message: " + typeof(Response).Name, LogCategory.Debug);

            if (State != TransportComponentState.Connected)
            {
                throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}."));
            }

            LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug);

            // The dictionary is also written by the pull thread; a plain Dictionary must not be
            // read and modified concurrently.
            lock (_pendingResponses)
            {
                PendingResponse pendingResponse = null;

                if (!_pendingResponses.TryGetValue(token, out pendingResponse))
                {
                    //This should never happen.
                    throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical);
                }

                LogManager.Log("Found matching request token: " + token, LogCategory.Debug);

                if (!pendingResponse.IsContinuous)
                {
                    LogManager.Log("Removing matching request token.", LogCategory.Debug);
                    _pendingResponses.Remove(token);
                }
                else if (response.Container.Completed)
                {
                    LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug);
                    _pendingResponses.Remove(token);
                }
            }

            TaskCompletionSource source = new TaskCompletionSource();
            TransportMessage message = new TransportMessage(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source);

            EnqueueMessageOut(message);

            return source.Task;
        }

        /// <summary>
        /// Sends an <c>ErrorResponse</c> marked as completed, with
        /// <c>ErrorCode.GeneralError</c> and the exception message, for the given token.
        /// </summary>
        /// <param name="exception">The exception whose message is reported.</param>
        /// <param name="token">The token of the request being answered.</param>
        /// <returns>The task returned by the underlying SendResponse call.</returns>
        public Task SendErrorResponse(Exception exception, string token)
        {
            return SendResponse(new ErrorResponse() { }, token, true, ErrorCode.GeneralError, exception.Message);
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Creates and starts the pull, push and keep alive threads as background threads.
        /// </summary>
        protected void StartThreads()
        {
            _pullThread = new Thread(PullThreadMethod);
            _pullThread.IsBackground = true;
            _pullThread.Start();

            _pushThread = new Thread(PushThreadMethod);
            _pushThread.IsBackground = true;
            _pushThread.Start();

            _keepAliveThread = new Thread(KeepAliveThreadMethod);
            _keepAliveThread.IsBackground = true;
            _keepAliveThread.Start();
        }

        /// <summary>
        /// Gets the timeout stored in the container.
        /// </summary>
        /// <param name="container">The container.</param>
        /// <returns>The container timeout (milliseconds) as a time span, or null when it is zero.</returns>
        private TimeSpan? GetContainerTimeoutOrDefault(MessageContainer container)
        {
            return container.Timeout > 0 ? TimeSpan.FromMilliseconds(container.Timeout) : default(TimeSpan?);
        }

        /// <summary>
        /// Adds an outgoing message to the sending queue consumed by the push thread.
        /// </summary>
        /// <param name="message">The message.</param>
        private void EnqueueMessageOut(TransportMessageBase message)
        {
            _sendingQueue.BlockEnqueue(message);
        }

        /// <summary>
        /// Adds raw incoming data to the queue consumed by the pull thread.
        /// </summary>
        /// <param name="data">The data.</param>
        private void EnqueueMessageIn(byte[] data)
        {
            _arrivedResponses.BlockEnqueue(data);
        }

        #endregion

        #region Push Thread
// ---- TransporterBase: push thread ----

        /// <summary>
        /// Push thread loop. While the transporter is connected, takes messages from the
        /// sending queue, registers requests as pending, writes the serialized message to the
        /// adapter and activates the message timeout. Failures are reported on the message.
        /// </summary>
        private void PushThreadMethod()
        {
            try
            {
                while (State == TransportComponentState.Connected)
                {
                    TransportMessageBase message = _sendingQueue.BlockDequeue();

                    // True once the message was added to the pending request list.
                    bool registeredAsPending = false;

                    try
                    {
                        // Read once so the whole send uses the same adapter instance even if
                        // the Adapter property is reassigned meanwhile.
                        ITransportAdapter adapter = Adapter;

                        if (adapter.State == TransportComponentState.Connected)
                        {
                            if (message.Token.Length != MESSAGE_TOKEN_LENGTH)
                            {
                                message.SetException(LogManager.Log(new InvalidOperationException("Invalid message token length: " + message.Token)));
                                continue;
                            }

                            LogManager.Log("Sending message on adapter: " + adapter.Address + "...", LogCategory.Debug, message.Message);

                            if (message.Direction == TransportMessageDirection.Request)
                            {
                                lock (_pendingRequests)
                                {
                                    _pendingRequests.Add(message);
                                }

                                registeredAsPending = true;
                            }

                            adapter.Write(message.Serialize());
                            message.ActivateTimeout?.Invoke();

                            LogManager.Log("Message sent on adapter: " + adapter.Address + "...", LogCategory.Debug, message.Message);
                        }
                        else
                        {
                            if (message.Direction == TransportMessageDirection.Request)
                            {
                                message.SetException(LogManager.Log(new InvalidOperationException("Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected.")));
                            }
                        }

                        // NOTE(review): a response is also reported as successful when the
                        // adapter was not connected and nothing was written - confirm intended.
                        if (message.Direction == TransportMessageDirection.Response)
                        {
                            message.SetResult(true, true);
                        }
                    }
                    catch (Exception ex)
                    {
                        // The request is being failed, so it must not stay in the pending
                        // list; otherwise every failed send leaves an entry behind forever.
                        if (registeredAsPending)
                        {
                            lock (_pendingRequests)
                            {
                                _pendingRequests.Remove(message);
                            }
                        }

                        message.SetException(ex);
                    }
                }
            }
            catch (ThreadAbortException)
            {
                LogManager.Log("Push thread has been aborted.");
            }
            catch (Exception ex)
            {
                OnFailed(ex);
            }
        }

        #endregion

        #region Pull Thread
// ---- TransporterBase: pull thread ----

        /// <summary>
        /// Pull thread loop. While the transporter is connected, takes raw data from the
        /// arrived queue, decodes its container and handles it either as the response to a
        /// pending request (matched by token) or as a new incoming request.
        /// </summary>
        private void PullThreadMethod()
        {
            try
            {
                while (State == TransportComponentState.Connected)
                {
                    byte[] data = _arrivedResponses.BlockDequeue();

                    LogManager.Log("Message received on adapter: " + Adapter.Address, LogCategory.Debug);
                    LogManager.Log("Parsing message container...", LogCategory.Debug);

                    MessageContainer container = Encoder.DecodeContainer(data);

                    LogManager.Log("Message was identified as " + container.Type + ".", LogCategory.Debug);

                    if (container.Token.Length != MESSAGE_TOKEN_LENGTH)
                    {
                        LogManager.Log("Invalid message token length received: " + container.Token, LogCategory.Error);
                        continue;
                    }

                    LogManager.Log("Searching for pending request token: " + container.Token, LogCategory.Debug);

                    TransportMessageBase request = null;

                    lock (_pendingRequests)
                    {
                        request = _pendingRequests.ToList().SingleOrDefault(x => x.Token == container.Token);
                    }

                    if (request != null)
                    {
                        ProcessResponse(request, container, data);
                    }
                    else
                    {
                        ProcessIncomingRequest(container);
                    }
                }
            }
            catch (ThreadAbortException)
            {
                LogManager.Log("Pull thread has been aborted.");
            }
            catch (Exception ex)
            {
                OnFailed(ex);
            }
        }

        /// <summary>
        /// Completes (or feeds) the pending request that matches a received response and then
        /// raises the response-received notification on a worker task.
        /// </summary>
        /// <param name="request">The pending request matching the container token.</param>
        /// <param name="container">The decoded container.</param>
        /// <param name="data">The raw data the container was decoded from.</param>
        private void ProcessResponse(TransportMessageBase request, MessageContainer container, byte[] data)
        {
            LogManager.Log("Found pending request: " + (request.Message.GetType().IsGenericType ? request.Message.GetType().GetGenericArguments()[0].Name : request.Message.GetType().Name), LogCategory.Debug);

            if (!request.IsContinuous)
            {
                LogManager.Log("Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug);

                _pendingRequests.Remove(request);

                try
                {
                    if (container.Error == ErrorCode.None)
                    {
                        var message = Encoder.Decode(data);
                        LogManager.Log("Parsing inner response message and setting pending request task result...", LogCategory.Debug, message);
                        request.SetResult(message, true);
                        LogManager.Log("Message enquirer released...", LogCategory.Debug);
                    }
                    else
                    {
                        request.SetException(LogManager.Log(new ResponseErrorException(container), LogCategory.Warning));
                    }
                }
                catch (Exception ex)
                {
                    request.SetException(LogManager.Log(ex, "Error parsing response message."));
                }
            }
            else
            {
                LogManager.Log("Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug);

                try
                {
                    if (container.Error == ErrorCode.None)
                    {
                        var message = Encoder.Decode(data);
                        LogManager.Log("Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message);

                        if (container.Completed)
                        {
                            LogManager.Log("Continuous sequence completed.", LogCategory.Debug);
                            _pendingRequests.Remove(request);
                        }

                        request.SetResult(message, container.Completed);
                    }
                    else if (container.Error == ErrorCode.ContinuousResponseAborted)
                    {
                        String m = "Continuous response " + container.Type + " has been aborted: " + container.Error.ToString();
                        LogManager.Log(m, LogCategory.Info);
                        _pendingRequests.Remove(request);
                        request.SetException(new ContinuousResponseAbortedException(m));
                    }
                    else
                    {
                        LogManager.Log("Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning);
                        _pendingRequests.Remove(request);
                        request.SetException(new ResponseErrorException(container));
                    }
                }
                catch (Exception ex)
                {
                    // NOTE(review): unlike the single-response path, a failure here is only
                    // logged and the request is not notified - confirm this is intended.
                    LogManager.Log(ex, "Error parsing response message.");
                }
            }

            RaiseOnWorkerTask(() => OnResponseReceived(container));
        }

        /// <summary>
        /// Handles a container that matches no pending request: a stray response is ignored;
        /// a request has its token remembered and is either answered automatically (keep
        /// alive) or surfaced through the request-received notification.
        /// </summary>
        /// <param name="container">The decoded container.</param>
        private void ProcessIncomingRequest(MessageContainer container)
        {
            // Ordinal: this is an identifier comparison, not linguistic text.
            if (container.Type.ToString().EndsWith("Response", StringComparison.Ordinal))
            {
                LogManager.Log(String.Format("A response message with no awaiting request was identified. {0}, Token: {1}. Message ignored.", container.Type, container.Token), LogCategory.Warning);
                return;
            }

            LogManager.Log("Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug);

            try
            {
                LogManager.Log("Saving request token: " + container.Token, LogCategory.Debug);

                // The dictionary is also read and modified by the threads calling
                // SendResponse; a plain Dictionary must not be accessed concurrently.
                lock (_pendingResponses)
                {
                    _pendingResponses.Add(container.Token, new PendingResponse(container.Continuous));
                }

                if (container.Type == MessageType.KeepAliveRequest && EnableKeepAliveAutoResponse)
                {
                    LogManager.Log("Submitting keep alive response...", LogCategory.Debug);
                    SendResponse(new KeepAliveResponse(), container.Token);
                }
                else
                {
                    LogManager.Log("Invoking RequestReceived event...", LogCategory.Debug, container);
                    RaiseOnWorkerTask(() => OnRequestReceived(container));
                }
            }
            catch (Exception ex)
            {
                LogManager.Log(ex);
            }
        }

        /// <summary>
        /// Runs a notification callback on a worker task and keeps exceptions thrown by
        /// subscribers away from the pull thread.
        /// </summary>
        /// <param name="callback">The notification to run.</param>
        private static void RaiseOnWorkerTask(Action callback)
        {
            try
            {
                Task.Factory.StartNew(() =>
                {
                    try
                    {
                        callback();
                    }
                    catch (Exception ex)
                    {
                        // A try/catch around StartNew cannot see exceptions thrown inside the
                        // task, so subscriber failures are caught (and logged) here.
                        LogManager.Log(ex, "A transporter event subscriber has thrown an exception.");
                    }
                });
            }
            catch (Exception ex)
            {
                //Ignore any exceptions that may raise on the client side..
                LogManager.Log(ex, "Failed to schedule a transporter event notification.");
            }
        }

        #endregion

        #region Keep Alive Thread
// ---- TransporterBase: keep alive thread, ToString, Dispose ----

        /// <summary>
        /// Keep alive thread loop. While the transporter is connected and
        /// <see cref="UseKeepAlive"/> is set, periodically sends a keep alive request when no
        /// incoming data is waiting, and fails the transporter once the retries are used up
        /// without any data having arrived within <see cref="KeepAliveTimeout"/>.
        /// </summary>
        private void KeepAliveThreadMethod()
        {
            // Pause before the first cycle and between cycles, in milliseconds.
            const int pollIntervalMs = 2000;

            int retryCounter = KeepAliveRetries;

            try
            {
                Thread.Sleep(pollIntervalMs);

                while (State == TransportComponentState.Connected)
                {
                    try
                    {
                        Thread.Sleep(pollIntervalMs);

                        if (UseKeepAlive)
                        {
                            if (_arrivedResponses.Count == 0)
                            {
                                retryCounter--;

                                // Blocks until the response arrives; failures surface as an
                                // AggregateException handled below.
                                SendRequest(new KeepAliveRequest(), KeepAliveTimeout).Wait();

                                retryCounter = KeepAliveRetries;
                            }
                            else
                            {
                                LogManager.Log("Keep alive request was skipped due to busy response queue.", LogCategory.Debug);
                            }
                        }
                    }
                    catch (Exception ex) when (ex is TimeoutException || ex is AggregateException)
                    {
                        if (State != TransportComponentState.Connected)
                        {
                            return;
                        }

                        // Read once and null-checked: an exception thrown from inside this
                        // catch block would escape the thread unhandled.
                        ITransportAdapter adapter = Adapter;

                        if (adapter != null && adapter.State == TransportComponentState.Failed && FailsWithAdapter)
                        {
                            return;
                        }

                        if (UseKeepAlive)
                        {
                            if (DateTime.Now - _lastKeepAliveTime > KeepAliveTimeout)
                            {
                                // '<=' rather than '==': with KeepAliveRetries set to 0 the
                                // counter goes negative and an equality test never matches.
                                if (retryCounter <= 0)
                                {
                                    OnFailed(new KeepAliveException("The transporter has not received a KeepAlive response within the given time."));
                                    return;
                                }
                                else
                                {
                                    LogManager.Log($"The transporter has not received a KeepAlive response within the given time. Retrying ({retryCounter}/{KeepAliveRetries})...", LogCategory.Warning);
                                }
                            }
                            else
                            {
                                retryCounter = KeepAliveRetries;
                                LogManager.Log("The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning);
                            }
                        }
                    }
                    catch (ThreadAbortException)
                    {
                        LogManager.Log("KeepAlive thread has been aborted.");
                        return;
                    }
                    catch (Exception ex)
                    {
                        if (State != TransportComponentState.Connected)
                        {
                            return;
                        }

                        if (UseKeepAlive)
                        {
                            OnFailed(ex);
                            return;
                        }
                    }
                }
            }
            catch (ThreadAbortException)
            {
                LogManager.Log("KeepAlive thread has been aborted.");
            }
        }

        #endregion

        #region Override Methods

        /// <summary>
        /// Returns the type name followed by the adapter and encoder descriptions
        /// ("Null" when either is not assigned).
        /// </summary>
        /// <returns>A string describing this instance.</returns>
        public override string ToString()
        {
            string adapterText = Adapter != null ? Adapter.ToString() : "Null";
            string encoderText = Encoder != null ? Encoder.ToString() : "Null";

            return this.GetType().Name + ", Adapter: " + adapterText + ", Encoder: " + encoderText;
        }

        #endregion

        #region Dispose

        /// <summary>
        /// Disconnects the transporter and switches it to the disposed state.
        /// </summary>
        public void Dispose()
        {
            try
            {
                Disconnect().Wait();
            }
            catch (Exception ex) when (!(ex is ThreadAbortException))
            {
                // Dispose should not throw; the failure is logged and the instance is still
                // marked as disposed.
                LogManager.Log(ex, "Transporter failed to disconnect while disposing.");
            }

            State = TransportComponentState.Disposed;
        }

        #endregion
    }
}