diff options
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/TransporterBase.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Transport/TransporterBase.cs | 1294 |
1 files changed, 411 insertions, 883 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index 4c88a0158..2824ed26a 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -17,12 +17,6 @@ using System.ServiceModel; using Tango.Transport.Encoders; using Tango.PMR.Connection; using Tango.Core.Threading; -using System.IO; -using Tango.Core.ExtensionMethods; -using Tango.PMR.Integration; -using Newtonsoft.Json; -using System.Diagnostics; -using System.Reactive.Concurrency; namespace Tango.Transport { @@ -32,17 +26,8 @@ namespace Tango.Transport /// <seealso cref="Tango.Transport.ITransporter" /> public abstract class TransporterBase : ExtendedObject, ITransporter { - protected static long _component_counter = 1; - - private class RequestHandler - { - public Type RequestType { get; set; } - public Action<ITransporter, Object, String> Callback { get; set; } - public object RegisteredCallback { get; set; } - } - private const int MESSAGE_TOKEN_LENGTH = 36; - private PriorityProducerConsumerQueue<TransportMessageBase> _sendingQueue; + private ProducerConsumerQueue<TransportMessageBase> _sendingQueue; private ConcurrentList<TransportMessageBase> _pendingRequests; private ProducerConsumerQueue<byte[]> _arrivedResponses; private Thread _pushThread; @@ -51,18 +36,13 @@ namespace Tango.Transport private ITransportAdapter _adapter; private Dictionary<String, PendingResponse> _pendingResponses; private DateTime _lastKeepAliveTime; - private List<RequestHandler> _requestHandlers; - private static JsonSerializerSettings _genericMessageSettings = new JsonSerializerSettings() - { - TypeNameHandling = TypeNameHandling.All, - }; #region Events /// <summary> /// Occurs when a new request message has been received. /// </summary> - public event EventHandler<RequestReceivedEventArgs> RequestReceived; + public event EventHandler<MessageContainer> RequestReceived; /// <summary> /// Occurs when a new response message has been received. @@ -74,31 +54,11 @@ namespace Tango.Transport /// </summary> public event EventHandler<TransportComponentState> StateChanged; - /// <summary> - /// Occurs when a request has been sent. - /// </summary> - public event EventHandler<IMessage> RequestSent; - - /// <summary> - /// Occurs when a request response has been received. - /// </summary> - public event EventHandler<IMessage> ResponseReceived; - - /// <summary> - /// Occurs when a request has failed. - /// </summary> - public event EventHandler<RequestFailedEventArgs> RequestFailed; - #endregion #region Properties /// <summary> - /// Gets or sets the name of the transport component. - /// </summary> - public String ComponentName { get; set; } - - /// <summary> /// Gets or sets the <see cref="ITransportAdapter" /> used to read and write raw data. /// </summary> public ITransportAdapter Adapter @@ -154,11 +114,11 @@ namespace Tango.Transport if (_useKeepAlive) { - LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is activated..."); + LogManager.Log("KeepAlive is activated..."); } else { - LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is deactivated."); + LogManager.Log("KeepAlive is deactivated."); } } } @@ -188,16 +148,6 @@ namespace Tango.Transport /// </summary> public Exception FailedStateException { get; private set; } - private GenericMessageProtocol _genericProtocol; - /// <summary> - /// Gets or sets the generic protocol used to serialize/deserialize generic messages. - /// </summary> - public GenericMessageProtocol GenericProtocol - { - get { return _genericProtocol; } - set { _genericProtocol = value; RaisePropertyChangedAuto(); } - } - #endregion #region Virtual Methods @@ -213,7 +163,7 @@ namespace Tango.Transport _pendingRequests.Clear(); _pendingResponses.Clear(); _arrivedResponses = new ProducerConsumerQueue<byte[]>(); - _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>(); + _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>(); } if (oldAdapter != null) @@ -224,7 +174,7 @@ namespace Tango.Transport if (newAdapter != null) { - LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: Type = {newAdapter.GetType().Name}, Address = {newAdapter.Address}, State = {newAdapter.State}"); + LogManager.Log(String.Format("Adapter Changed: Type = {0}, Address = {1}, State = {2}", newAdapter.GetType().Name, newAdapter.Address, newAdapter.State)); newAdapter.StateChanged -= OnAdapterStateChanged; newAdapter.DataAvailable -= OnAdapterDataAvailable; @@ -238,7 +188,7 @@ namespace Tango.Transport } else { - LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: null"); + LogManager.Log("Adapter Changed: null"); } } @@ -251,7 +201,7 @@ namespace Tango.Transport { if (e == TransportComponentState.Failed && FailsWithAdapter) { - OnFailed(new CommunicationException($"The adapter has failed with exception '{Adapter.FailedStateException.Message}' and the transporter is configured to fail with the adapter.")); + OnFailed(new CommunicationException("The adapter has failed. Going into a failed state...")); } } @@ -270,104 +220,21 @@ namespace Tango.Transport /// Called when the component has failed. /// </summary> /// <param name="ex">The ex.</param> - protected virtual async void OnFailed(Exception ex) - { - if (State != TransportComponentState.Failed) - { - FailedStateException = ex; - LogManager.Log(ex, $"{GetExtendedComponentName()}: Transporter failed."); - State = TransportComponentState.Failed; - - await OnPostDisconnection(); - } - else - { - LogManager.Log(ex, LogCategory.Warning, $"{GetExtendedComponentName()}: OnFailed called while state is already failed!"); - } - } - - protected virtual async Task OnPostDisconnection() + protected virtual void OnFailed(Exception ex) { - try - { - if (_pullThread != null) - { - _pullThread.Abort(); - _pushThread.Abort(); - _keepAliveThread.Abort(); - } - } - catch { } - - if (Adapter != null) - { - await Adapter.Disconnect(); - } - - NotifyContinuousRequestMessagesDisconnection(); + FailedStateException = ex; + State = TransportComponentState.Failed; + LogManager.Log(ex, "Transporter failed."); + Disconnect().Wait(); } /// <summary> /// Called when a new request has been received. /// </summary> /// <param name="container">The request.</param> - protected virtual void OnRequestReceived(RequestReceivedEventArgs e) + protected virtual void OnRequestReceived(MessageContainer container) { - var container = e.Container; - - if (_requestHandlers.Count > 0) - { - if (container.Type != MessageType.GenericRequest) - { - var handlers = _requestHandlers.Where(x => x.RequestType.Name == container.Type.ToOriginalName()).ToList(); - - if (handlers.Count > 0) //Handle - { - e.Handled = true; - - var request = MessageFactory.ExtractMessageFromContainer(container); - - foreach (var handler in handlers) - { - try - { - handler.Callback.Invoke(this, request, container.Token); - } - catch - { - //Ignore any exception on the client side. - } - } - } - } - else - { - var genericRequest = MessageFactory.ExtractMessageFromContainer<GenericRequest>(container); - - var handlers = _requestHandlers.Where(x => x.RequestType.AssemblyQualifiedName == genericRequest.Type).ToList(); - - if (handlers.Count > 0) - { - e.Handled = true; - - var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data, GenericProtocol); - - foreach (var handler in handlers) - { - try - { - handler.Callback.Invoke(this, innerRequest, container.Token); - } - catch - { - //Ignore any exception on the client side. - } - } - } - } - } - - RequestReceived?.Invoke(this, e); + RequestReceived?.Invoke(this, container); } /// <summary> @@ -388,80 +255,6 @@ namespace Tango.Transport StateChanged?.Invoke(this, state); } - /// <summary> - /// Returns a string representing the component name. If an adapter is attached it will be part of the string. - /// </summary> - /// <returns></returns> - protected virtual String GetExtendedComponentName() - { - return Adapter != null ? $"{ComponentName} ({Adapter.Address})" : ComponentName; - } - - /// <summary> - /// Notifies all the continuous request messages about disconnection. - /// </summary> - protected virtual void NotifyContinuousRequestMessagesDisconnection() - { - LogManager.Log("Notifying all continuous request messages about disconnection..."); - foreach (var request in _pendingRequests.ToList().Where(x => x.Direction == TransportMessageDirection.Request && x.IsContinuous)) - { - try - { - _pendingRequests.Remove(request); - LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'..."); - OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected.")); - request.SetException(new TransporterDisconnectedException("Transporter disconnected.")); - } - catch (Exception e) - { - System.Diagnostics.Debug.WriteLine(e.ToString()); - } - } - } - - /// <summary> - /// Called when the request has been sent - /// </summary> - /// <param name="response">The request.</param> - private void OnRequestSent(TransportMessageBase request) - { - if (request.ShouldLog) - { - IMessage message = request.GetActualMessage() as IMessage; - - if (message != null) - { - RequestSent?.Invoke(this, message); - } - } - } - - /// <summary> - /// Called when the response has been received - /// </summary> - /// <param name="response">The response.</param> - private void OnResponseReceived(IMessage response) - { - if (response != null) - { - ResponseReceived?.Invoke(this, response); - } - } - - /// <summary> - /// Called when the request has been failed - /// </summary> - /// <param name="request">The request.</param> - private void OnRequestFailed(TransportMessageBase request, Exception exception) - { - IMessage message = request.GetActualMessage() as IMessage; - - if (message != null) - { - RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception)); - } - } - #endregion #region Constructors @@ -471,13 +264,11 @@ namespace Tango.Transport /// </summary> public TransporterBase() { - ComponentName = "Not Set"; Encoder = new ProtoEncoder(); _pendingResponses = new Dictionary<string, PendingResponse>(); - _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>(); + _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>(); _pendingRequests = new ConcurrentList<TransportMessageBase>(); _arrivedResponses = new ProducerConsumerQueue<byte[]>(); - _requestHandlers = new List<RequestHandler>(); RequestTimeout = TimeSpan.FromSeconds(5); EnableKeepAliveAutoResponse = true; KeepAliveTimeout = TimeSpan.FromSeconds(2); @@ -502,7 +293,7 @@ namespace Tango.Transport /// </summary> public void ClearQueues() { - _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>(); + _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>(); _pendingRequests = new ConcurrentList<TransportMessageBase>(); _arrivedResponses = new ProducerConsumerQueue<byte[]>(); } @@ -521,7 +312,7 @@ namespace Tango.Transport State = TransportComponentState.Connected; StartThreads(); - LogManager.Log($"{GetExtendedComponentName()}: Transporter Connected..."); + LogManager.Log("Transporter Connected..."); } /// <summary> @@ -530,323 +321,255 @@ namespace Tango.Transport /// <returns></returns> public virtual async Task Disconnect() { - if (State == TransportComponentState.Connected) - { - State = TransportComponentState.Disconnected; + State = TransportComponentState.Disconnected; - await OnPostDisconnection(); + try + { + if (_pullThread != null) + { + _pullThread.Abort(); + _pushThread.Abort(); + _keepAliveThread.Abort(); + } + } + catch { } - LogManager.Log($"{GetExtendedComponentName()}: Transporter Disconnected..."); + if (Adapter != null) + { + await Adapter.Disconnect(); } + LogManager.Log("Transporter Disconnected..."); } - #endregion - - #region Public Request Methods - /// <summary> /// Sends a request. /// </summary> /// <param name="request">The request.</param> - /// <param name="config">Request configuration.</param> + /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param> /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public Task<IMessage> SendRequest(IMessage request, TransportRequestConfig config = null) + public Task<IMessage> SendRequest(IMessage request, TimeSpan? timeout = null) { + String requestName = request.GetType().Name; + String responseName = requestName.Replace("Request", "Response"); + MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); - container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - container.Type = MessageFactory.ParseMessageType(request.GetType().Name); + container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + container.Type = MessageFactory.ParseMessageType(requestName); - return SendRequestInternal<IMessage>(container.Token, container, config).Task; - } + LogManager.Log("Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); + LogManager.Log("Expected response: " + responseName, LogCategory.Debug); - /// <summary> - /// Sends the request. - /// </summary> - /// <param name="container"></param> - /// <param name="config">Request configuration.</param> - /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public Task<MessageContainer> SendRequest(MessageContainer container, TransportRequestConfig config = null) - { - return SendRequestInternal<MessageContainer>(container.Token, container, config).Task; - } + if (State != TransportComponentState.Connected) + { + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); + } - /// <summary> - /// Sends a request. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="request">The request.</param> - /// <param name="config">Request configuration.</param> - /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TransportRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> - { - return SendRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config).Task; - } + TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>(); + TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source); - private TaskCompletionSource<T> SendRequestInternal<T>(String token, Object request, TransportRequestConfig config = null) - { - config = config ?? new TransportRequestConfig(); + message.ActivateTimeout = () => + { + TimeoutTask.StartNew(() => + { - TaskCompletionSource<T> source = new TaskCompletionSource<T>(); + if (!source.Task.IsCompleted) + { + TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request task exception...", LogCategory.Debug); + source.SetException(ex); + } - String requestName = TransportMessageBase.GetActualMessageTypeName(request); + }, timeout != null ? timeout.Value : RequestTimeout); + }; - Func<byte[]> toBytes = null; + EnqueueMessageOut(message); - if (request is ITangoMessage tangoMessage) - { - toBytes = () => Encoder.Encode(tangoMessage); - } - else if (request is IMessage protoMessage) - { - toBytes = () => protoMessage.ToByteArray(); - } + return source.Task; + } - TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - message.ThreadingMode = config.ThreadingMode; + /// <summary> + /// Sends the response. + /// </summary> + /// <param name="response">The response.</param> + /// <param name="token">The token.</param> + /// <param name="completed">The completed.</param> + /// <param name="errorCode">The error code.</param> + /// <param name="errorMessage">The error message.</param> + /// <returns></returns> + /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception> + public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null) + { + String responseName = response.GetType().Name; - TimeSpan? timeout = config.Timeout; + MessageContainer container = new MessageContainer(); + container.Token = token; + container.Data = response.ToByteString(); + container.Type = MessageFactory.ParseMessageType(responseName); - if (request is MessageContainer container) + if (errorCode.HasValue) { - timeout = GetContainerTimeoutOrDefault(container); + container.Error = errorCode.Value; } - if (timeout == null) + if (errorMessage != null) { - timeout = RequestTimeout; + container.ErrorMessage = errorMessage; } - if (request is ITangoMessage tanMessage) + if (completed.HasValue) { - tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; + container.Completed = completed.Value; } - String responseName = requestName.Replace("Request", "Response"); + return SendResponse(container); + } - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); + /// <summary> + /// Sends the request. + /// </summary> + /// <param name="container">The container.</param> + /// <returns></returns> + public Task<MessageContainer> SendRequest(MessageContainer container) + { + String responseName = container.Type.ToString().Replace("Request", "Response"); + TimeSpan? timeout = GetContainerTimeoutOrDefault(container); + + LogManager.Log("Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug); + LogManager.Log("Expected response: " + responseName, LogCategory.Debug); if (State != TransportComponentState.Connected) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); } + TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>(); + TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source); - Action timeoutAction = () => + message.ActivateTimeout = () => { - if (!source.Task.IsCompleted) + TimeoutTask.StartNew(() => { - TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); - OnRequestFailed(message, ex); - message.SetException(ex); - } - }; - if (config.ThreadingMode == TransportThreadingMode.NewThread) - { - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => + if (!source.Task.IsCompleted) { - timeoutAction(); - }, timeout.Value); - }; - } - else - { - message.ActivateTimeout = () => - { - Task.Delay(timeout.Value).ContinueWith((x) => - { - timeoutAction(); - }); - }; - } - - EnqueueMessageOut(message); - - return source; - } - - /// <summary> - /// Sends a request and expecting multiple response messages. - /// </summary> - /// <param name="request"></param> - /// <param name="config">Request configuration.</param> - /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public IObservable<IMessage> SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null) - { - MessageContainer container = new MessageContainer(); - container.Token = Guid.NewGuid().ToString(); - container.Data = request.ToByteString(); - container.Type = MessageFactory.ParseMessageType(request.GetType().Name); + TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request task exception...", LogCategory.Debug); + source.SetException(ex); + } - //We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment. - container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; + }, timeout != null ? timeout.Value : RequestTimeout); - container.Continuous = true; + }; - return SendContinuousRequestInternal<IMessage>(container.Token, container, config); - } + EnqueueMessageOut(message); - /// <summary> - /// Sends a request and expecting multiple response messages. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="request"></param> - /// <param name="config">Request configuration.</param> - /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TransportContinuousRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> - { - request.Container.Continuous = true; - request.Container.Completed = false; - return SendContinuousRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config); + return source.Task; } /// <summary> - /// Sends a continuous request. + /// Sends the response. /// </summary> /// <param name="container">The container.</param> - /// <param name="config">Request configuration.</param> /// <returns></returns> - /// <exception cref="InvalidOperationException"></exception> - public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null) + /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception> + public Task SendResponse(MessageContainer container) { - return SendContinuousRequestInternal<MessageContainer>(container.Token, container, config); - } - - private IObservable<T> SendContinuousRequestInternal<T>(String token, Object request, TransportContinuousRequestConfig config = null) - { - config = config ?? new TransportContinuousRequestConfig(); + String token = container.Token; - String requestName = TransportMessageBase.GetActualMessageTypeName(request); + LogManager.Log("Queuing response message: " + container.Type, LogCategory.Debug); - Func<byte[]> toBytes = null; + PendingResponse pendingResponse = null; - if (request is ITangoMessage tangoMessage) - { - toBytes = () => Encoder.Encode(tangoMessage); - } - else if (request is IMessage protoMessage) + if (State != TransportComponentState.Connected) { - toBytes = () => protoMessage.ToByteArray(); + throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}.")); } - Subject<T> subject = new Subject<T>(); - - TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, null); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - message.IsContinuous = true; - message.ContinuesResponseSubject = subject; - - TimeSpan? timeout = config.Timeout; - TimeSpan? continuousTimeout = config.ContinuousTimeout; + LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug); - if (request is MessageContainer container) + if (_pendingResponses.TryGetValue(token, out pendingResponse)) { - timeout = GetContainerTimeoutOrDefault(container); + LogManager.Log("Found matching request token: " + token, LogCategory.Debug); - if (timeout == null) + if (!pendingResponse.IsContinuous) { - timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout; - container.Timeout = (uint)timeout.Value.TotalMilliseconds; + LogManager.Log("Removing matching request token.", LogCategory.Debug); + _pendingResponses.Remove(token); + } + else if (container.Completed) + { + LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug); + _pendingResponses.Remove(token); } - - continuousTimeout = GetContainerContinuousTimeoutOrDefault(container); } - - if (timeout == null) + else { - timeout = RequestTimeout; + //This should never happen. + throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical); } - if (request is ITangoMessage tanMessage) - { - tanMessage.Container.Continuous = true; - tanMessage.Container.Completed = false; - tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; - tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0; - } + TaskCompletionSource<object> source = new TaskCompletionSource<object>(); + TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source); + EnqueueMessageOut(message); + return source.Task; + } + /// <summary> + /// Sends a request and expecting multiple response messages. + /// </summary> + /// <param name="request">The request.</param> + /// <param name="timeout"></param> + /// <returns></returns> + public IObservable<IMessage> SendContinuousRequest(IMessage request, TimeSpan? timeout = default(TimeSpan?)) + { + String requestName = request.GetType().Name; String responseName = requestName.Replace("Request", "Response"); - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); + MessageContainer container = new MessageContainer(); + container.Token = Guid.NewGuid().ToString(); + container.Data = request.ToByteString(); + container.Type = MessageFactory.ParseMessageType(requestName); + container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + container.Continuous = true; + + LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); if (State != TransportComponentState.Connected) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); } - Action timeoutAction = () => - { - if (!message.AtLeastOneResponseReceived) - { - TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); - OnRequestFailed(message, ex); - message.SetException(ex); - } + Subject<IMessage> subject = new Subject<IMessage>(); - if (continuousTimeout != null) - { - Task.Factory.StartNew(async () => - { - while (!message.Completed) - { - await Task.Delay(continuousTimeout.Value).ContinueWith((y) => - { - if (!message.Completed) - { - if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) - { - TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out."); - OnRequestFailed(message, ex); - message.SetException(ex); - return; - } - } - }); - } - }); - } - }; + LogManager.Log("Expected response: " + responseName, LogCategory.Debug); - if (config.ThreadingMode == TransportThreadingMode.NewThread) + TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), null) { - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - timeoutAction(); - }, timeout.Value); - }; - } - else + IsContinuous = true, + ContinuesResponseSubject = subject, + }; + + message.ActivateTimeout = () => { - message.ActivateTimeout = () => + + TimeoutTask.StartNew(() => { - Task.Delay(timeout.Value).ContinueWith((x) => + + if (!message.AtLeastOneResponseReceived) { - timeoutAction(); - }); - }; - } + TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request exception...", LogCategory.Debug); + message.SetException(ex); + } + + }, timeout != null ? timeout.Value : RequestTimeout); + + }; EnqueueMessageOut(message); @@ -854,191 +577,200 @@ namespace Tango.Transport } /// <summary> - /// Sends a generic request of any type. + /// Sends a request. /// </summary> /// <typeparam name="Request">The type of the request.</typeparam> /// <typeparam name="Response">The type of the response.</typeparam> /// <param name="request">The request.</param> - /// <param name="config">The configuration.</param> - /// <returns></returns> - public async Task<Response> SendGenericRequest<Request, Response>(Request request, TransportRequestConfig config = null) where Request : class where Response : class - { - GenericRequest genericRequest = new GenericRequest(); - genericRequest.Type = request.GetType().AssemblyQualifiedName; - genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol); - - var response = await SendRequest<GenericRequest, GenericResponse>(genericRequest, config); - var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol); - return responseObject; - } - - /// <summary> - /// Sends a generic request and expecting multiple generic response messages. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="config">Request configuration.</param> + /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param> /// <returns></returns> - public IObservable<Response> SendGenericContinuousRequest<Request, Response>(Request request, TransportContinuousRequestConfig config = null) where Request : class where Response : class + public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response> { - GenericRequest genericRequest = new GenericRequest(); - genericRequest.Type = request.GetType().AssemblyQualifiedName; + LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); + LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug); - genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol); - - Subject<Response> subject = new Subject<Response>(); - - SendContinuousRequest<GenericRequest, GenericResponse>(genericRequest, config).Subscribe((response) => + if (State != TransportComponentState.Connected) { + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); + } - try - { - var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol); - subject.OnNext(responseObject); - } - catch (Exception ex) - { - Debugger.Break(); - System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request continuous handler.\n{ex.ToString()}"); - //Ignore exception at the client side. - } - - }, (ex) => - { + request.Container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - try - { - subject.OnError(ex); - } - catch (Exception xx) - { - Debugger.Break(); - System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request error handler.\n{xx.ToString()}"); - //Ignore exception at the client side. - } + TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>(); + TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); - }, () => + message.ActivateTimeout = () => { - try - { - subject.OnCompleted(); - } - catch (Exception ex) + TimeoutTask.StartNew(() => { - Debugger.Break(); - System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request completed handler.\n{ex.ToString()}"); - //Ignore exception at the client side. - } - }); - return subject.AsObservable(); - } + if (!source.Task.IsCompleted) + { + TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request task exception...", LogCategory.Debug); + source.SetException(ex); + } - #endregion + }, timeout != null ? timeout.Value : RequestTimeout); + }; - #region Public Response Methods + EnqueueMessageOut(message); - /// <summary> - /// Sends a response. - /// </summary> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="response">The response.</param> - /// <returns></returns> - public Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response> - { - return SendResponse<Response>(response, response.Container.Token); + return source.Task; } /// <summary> - /// Sends the response. + /// Sends a request and expecting multiple response messages. /// </summary> - /// <param name="response"></param> - /// <param name="token">Request token.</param> - /// <param name="config">Response configuration.</param> + /// <typeparam name="Request">The type of the request.</typeparam> + /// <typeparam name="Response">The type of the response.</typeparam> + /// <param name="request">The request.</param> + /// <param name="timeout"></param> /// <returns></returns> - public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null) + public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? firstTimeout = null, TimeSpan? continousTimeout = null) where Request : IMessage<Request> where Response : IMessage<Response> { - config = config ?? new TransportResponseConfig(); + LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - String responseName = response.GetType().Name; + Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>(); - MessageContainer container = new MessageContainer(); - container.Token = token; - container.Data = response.ToByteString(); - container.Type = MessageFactory.ParseMessageType(responseName); + LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug); - if (config.ErrorCode.HasValue) + if (State != TransportComponentState.Connected) { - container.Error = config.ErrorCode.Value; + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); } - if (config.ErrorMessage != null) + request.Container.Continuous = true; + request.Container.Completed = false; + + request.Container.Timeout = firstTimeout.HasValue ? (UInt32)firstTimeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + request.Container.ContinuousTimeout = continousTimeout.HasValue ? (UInt32)continousTimeout.Value.TotalMilliseconds : 0; + + TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null) { - container.ErrorMessage = config.ErrorMessage; - } + IsContinuous = true, + ContinuesResponseSubject = subject, + }; - container.Completed = config.Completed; + message.ActivateTimeout = () => + { + TimeoutTask.StartNew(() => + { - return SendResponse(container); + if (!message.AtLeastOneResponseReceived) + { + TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (firstTimeout != null ? firstTimeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request exception...", LogCategory.Debug); + message.SetException(ex); + } + + if (continousTimeout != null) + { + Task.Factory.StartNew(async () => + { + while (!message.Completed) + { + await Task.Delay(continousTimeout.Value).ContinueWith((y) => + { + if (!message.Completed) + { + if (DateTime.Now - message.LastResponseTime > continousTimeout.Value) + { + TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (continousTimeout.Value.TotalSeconds) + " seconds and has timed out."); + LogManager.Log(ex); + LogManager.Log("Setting request exception...", LogCategory.Debug); + message.SetException(ex); + return; + } + } + }); + } + }); + } + + }, firstTimeout != null ? firstTimeout.Value : RequestTimeout); + }; + + EnqueueMessageOut(message); + + return subject.AsObservable(); } /// <summary> - /// Sends the response. + /// Sends a continuous request. /// </summary> /// <param name="container">The container.</param> /// <returns></returns> - /// <exception cref="InvalidOperationException"> - /// </exception> - public Task SendResponse(MessageContainer container, TransportResponseConfig config = null) + public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container) { - config = config ?? new TransportResponseConfig(); - - if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted) - { - throw new InvalidOperationException("Transporter push thread is not in a running state."); - } + TimeSpan? timeout = GetContainerTimeoutOrDefault(container); + TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); - String token = container.Token; - - LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug); + String requestName = container.Type.ToString(); + String responseName = requestName.Replace("Request", "Response"); - PendingResponse pendingResponse = null; + LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); if (State != TransportComponentState.Connected) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); + throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}.")); } - LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); + Subject<MessageContainer> subject = new Subject<MessageContainer>(); - if (_pendingResponses.TryGetValue(token, out pendingResponse)) + LogManager.Log("Expected response: " + responseName, LogCategory.Debug); + + TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), null) { - LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); + IsContinuous = true, + ContinuesResponseSubject = subject, + }; - if (!pendingResponse.IsContinuous) - { - LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); - _pendingResponses.Remove(token); - } - else if (container.Completed) - { - LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); - _pendingResponses.Remove(token); - } - } - else + message.ActivateTimeout = () => { - //This should never happen. - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); - } + TimeoutTask.StartNew(() => + { + + if (!message.AtLeastOneResponseReceived) + { + TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log(ex); + LogManager.Log("Setting request exception...", LogCategory.Debug); + message.SetException(ex); + } + + if (continuousTimeout != null) + { + Task.Factory.StartNew(async () => + { + while (!message.Completed) + { + await Task.Delay(continuousTimeout.Value).ContinueWith((y) => + { + if (!message.Completed) + { + if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) + { + TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); + LogManager.Log(ex); + LogManager.Log("Setting request exception...", LogCategory.Debug); + message.SetException(ex); + return; + } + } + }); + } + }); + } + + }, timeout != null ? timeout.Value : RequestTimeout); + }; - TaskCompletionSource<object> source = new TaskCompletionSource<object>(); - TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; EnqueueMessageOut(message); - return source.Task; + + return subject.AsObservable(); } /// <summary> @@ -1046,156 +778,94 @@ namespace Tango.Transport /// </summary> /// <typeparam name="Response">The type of the response.</typeparam> /// <param name="response">The response.</param> - /// <param name="token">Request token.</param> - /// <param name="config">Response configuration.</param> /// <returns></returns> - /// <exception cref="InvalidOperationException"> - /// Transporter push thread is not in a running state. - /// </exception> - public Task SendResponse<Response>(TangoMessage<Response> response, String token, TransportResponseConfig config = null) where Response : IMessage<Response> + public Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response> { - config = config ?? new TransportResponseConfig(); + return SendResponse<Response>(response, response.Container.Token); + } - if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted) + /// <summary> + /// Sends a response for the specified token. + /// </summary> + /// <typeparam name="Response">The type of the response.</typeparam> + /// <param name="response">The response.</param> + /// <param name="token">The token.</param> + /// <param name="completed">The completed.</param> + /// <param name="errorCode">The error code.</param> + /// <param name="errorMessage">The error message.</param> + /// <returns></returns> + /// <exception cref="InvalidOperationException">Matching request token was not found!</exception> + public Task SendResponse<Response>(TangoMessage<Response> response, String token, bool? completed = null, ErrorCode? errorCode = null, String errorMessage = null) where Response : IMessage<Response> + { + if (_pushThread == null || _pushThread.ThreadState == ThreadState.Aborted) { throw new InvalidOperationException("Transporter push thread is not in a running state."); } response.Container.Token = token; - response.Container.Completed = config.Completed; + if (completed.HasValue) + { + response.Container.Completed = completed.Value; + } - if (config.ErrorCode.HasValue) + if (errorCode.HasValue) { - response.Container.Error = config.ErrorCode.Value; + response.Container.Error = errorCode.Value; } - if (!String.IsNullOrEmpty(config.ErrorMessage)) + if (!String.IsNullOrEmpty(errorMessage)) { - response.Container.ErrorMessage = config.ErrorMessage; + response.Container.ErrorMessage = errorMessage; } - LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + typeof(Response).Name, LogCategory.Debug); + LogManager.Log("Queuing response message: " + typeof(Response).Name, LogCategory.Debug); if (State != TransportComponentState.Connected) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); + throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}.")); } PendingResponse pendingResponse = null; - LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); + LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug); if (_pendingResponses.TryGetValue(token, out pendingResponse)) { - LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); + LogManager.Log("Found matching request token: " + token, LogCategory.Debug); if (!pendingResponse.IsContinuous) { - LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); + LogManager.Log("Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } else if (response.Container.Completed) { - LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); + LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug); _pendingResponses.Remove(token); } } else { //This should never happen. - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); + throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical); } TaskCompletionSource<object> source = new TaskCompletionSource<object>(); TransportMessage<object> message = new TransportMessage<object>(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; EnqueueMessageOut(message); return source.Task; } /// <summary> - /// Sends a generic response. - /// </summary> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="response">The response.</param> - /// <param name="token">The request token.</param> - /// <param name="config">The response configuration.</param> - /// <returns></returns> - public async Task SendGenericResponse<Response>(Response response, String token, TransportResponseConfig config = null) where Response : class - { - GenericResponse genericResponse = new GenericResponse(); - - genericResponse.Type = response.GetType().AssemblyQualifiedName; - genericResponse.Data = GenericMessageSerializer.SerializeToByteString<Response>(response, GenericProtocol); - await SendResponse<GenericResponse>(genericResponse, token, config); - } - - /// <summary> /// Sends a general error response agnostic to the type of request. /// </summary> /// <param name="exception">The exception.</param> - /// <param name="config">Response configuration.</param> + /// <param name="token">The token.</param> /// <returns></returns> - public Task SendErrorResponse(Exception exception, String token) + public Task SendErrorResponse(Exception exception, string token) { - return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, new TransportResponseConfig() - { - ErrorCode = ErrorCode.GeneralError, - Completed = true, - ErrorMessage = exception.FlattenMessage() - }); - } - - #endregion - - #region Request Handlers - - /// <summary> - /// Registers a custom request handler. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <param name="callback">The callback.</param> - public void RegisterRequestHandler<Request>(RequestHandlerCallbackDelegate<Request> callback) where Request : class - { - RequestHandler handler = new RequestHandler(); - handler.RequestType = typeof(Request); - handler.RegisteredCallback = callback; - handler.Callback = (transporter, obj, token) => - { - callback?.Invoke(transporter, obj as Request, token); - }; - - _requestHandlers.Add(handler); - } - - /// <summary> - /// Unregisters a custom request handler. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <param name="callback">The callback.</param> - public void UnregisterRequestHandler<Request>(RequestHandlerCallbackDelegate<Request> callback) where Request : class - { - var handler = _requestHandlers.FirstOrDefault(x => (x.RegisteredCallback as RequestHandlerCallbackDelegate<Request>) == callback); - if (handler != null) - { - _requestHandlers.Remove(handler); - } - } - - /// <summary> - /// Copies this instance request handlers to the specified instance. - /// </summary> - /// <param name="transporter">The transporter to copy the handlers to.</param> - public void CopyRequestHandlers(ITransporter transporter) - { - foreach (var handler in _requestHandlers.ToList()) - { - (transporter as TransporterBase)._requestHandlers.Add(handler); - _requestHandlers.Remove(handler); - } + return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, true, ErrorCode.GeneralError, exception.Message); } #endregion @@ -1207,27 +877,17 @@ namespace Tango.Transport /// </summary> protected void StartThreads() { - try - { - _pullThread = new Thread(PullThreadMethod); - _pullThread.Name = $"{GetExtendedComponentName()} Pull Thread"; - _pullThread.IsBackground = true; - _pullThread.Start(); + _pullThread = new Thread(PullThreadMethod); + _pullThread.IsBackground = true; + _pullThread.Start(); - _pushThread = new Thread(PushThreadMethod); - _pushThread.Name = $"{GetExtendedComponentName()} Push Thread"; - _pushThread.IsBackground = true; - _pushThread.Start(); + _pushThread = new Thread(PushThreadMethod); + _pushThread.IsBackground = true; + _pushThread.Start(); - _keepAliveThread = new Thread(KeepAliveThreadMethod); - _keepAliveThread.Name = $"{GetExtendedComponentName()} KeepAlive Thread"; - _keepAliveThread.IsBackground = true; - _keepAliveThread.Start(); - } - catch (Exception ex) - { - LogManager.Log(ex, "Error starting transporter threads."); - } + _keepAliveThread = new Thread(KeepAliveThreadMethod); + _keepAliveThread.IsBackground = true; + _keepAliveThread.Start(); } /// <summary> @@ -1241,22 +901,12 @@ namespace Tango.Transport } /// <summary> - /// Gets the container timeout or default. - /// </summary> - /// <param name="container">The container.</param> - /// <returns></returns> - private TimeSpan? GetContainerContinuousTimeoutOrDefault(MessageContainer container) - { - return container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); - } - - /// <summary> /// Enqueues the message and releases the push wait handle. /// </summary> /// <param name="message">The message.</param> private void EnqueueMessageOut(TransportMessageBase message) { - _sendingQueue.BlockEnqueue(message, message.Priority); + _sendingQueue.BlockEnqueue(message); } /// <summary> @@ -1289,46 +939,31 @@ namespace Tango.Transport { if (message.Token.Length != MESSAGE_TOKEN_LENGTH) { - var ex = LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token)); - OnRequestFailed(message, ex); - message.SetException(ex); + message.SetException(LogManager.Log(new InvalidOperationException("Invalid message token length: " + message.Token))); continue; } + LogManager.Log("Sending message on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message); + if (message.Direction == TransportMessageDirection.Request) { - if (message.ShouldLog) - { - LogManager.Log($"{GetExtendedComponentName()}: Sending request '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); - OnRequestSent(message); - } - lock (_pendingRequests) { _pendingRequests.Add(message); } } - else - { - if (message.ShouldLog) - { - LogManager.Log($"{GetExtendedComponentName()}: Sending response '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); - } - } - Adapter.Write(message.Serialize(), message.Immidiate); + Adapter.Write(message.Serialize()); message.ActivateTimeout?.Invoke(); - LogManager.Log($"{GetExtendedComponentName()}: Message sent...", LogCategory.Debug, message.Message); + LogManager.Log("Message sent on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message); } else { if (message.Direction == TransportMessageDirection.Request) { - var ex = new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.GetActualMessageTypeName() + ". Adapter is disconnected."); - OnRequestFailed(message, ex); - message.SetException(ex); + message.SetException(LogManager.Log(new InvalidOperationException("Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected."))); } } @@ -1337,22 +972,15 @@ namespace Tango.Transport message.SetResult(true, true); } } - catch (ThreadAbortException) - { - Exception requestException = FailedStateException != null ? FailedStateException : new TransporterDisconnectedException("The transporter push thread has been aborted."); - OnRequestFailed(message, requestException); - message.SetException(requestException); - } catch (Exception ex) { - OnRequestFailed(message, ex); message.SetException(ex); } } } catch (ThreadAbortException) { - LogManager.Log($"{GetExtendedComponentName()}: Push thread has been aborted."); + LogManager.Log("Push thread has been aborted."); } catch (Exception ex) { @@ -1375,66 +1003,34 @@ namespace Tango.Transport { byte[] data = _arrivedResponses.BlockDequeue(); - LogManager.Log($"{GetExtendedComponentName()}: Message received...", LogCategory.Debug); + LogManager.Log("Message received on adapter: " + Adapter.Address, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Parsing message container...", LogCategory.Debug); + LogManager.Log("Parsing message container...", LogCategory.Debug); + MessageContainer container = Encoder.DecodeContainer(data); - MessageContainer container = null; - - try - { - container = Encoder.DecodeContainer(data); - } - catch (Exception ex) - { - LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing message container. Skipping incoming message..."); - continue; - } - - LogManager.Log($"{GetExtendedComponentName()}: Message was identified as " + container.Type + ".", LogCategory.Debug); + LogManager.Log("Message was identified as " + container.Type + ".", LogCategory.Debug); if (container.Token.Length != MESSAGE_TOKEN_LENGTH) { - LogManager.Log($"{GetExtendedComponentName()}: Invalid message token length received: " + container.Token, LogCategory.Error); + LogManager.Log("Invalid message token length received: " + container.Token, LogCategory.Error); continue; } - LogManager.Log($"{GetExtendedComponentName()}: Searching for pending request token: " + container.Token, LogCategory.Debug); + LogManager.Log("Searching for pending request token: " + container.Token, LogCategory.Debug); TransportMessageBase request = null; lock (_pendingRequests) { - try - { - var requests = _pendingRequests.ToList().Where(x => x.Token == container.Token).ToList(); - - try - { - if (requests.Count > 1) - { - LogManager.Log($"{GetExtendedComponentName()}: {requests.Count} requests with the same token were detected - {requests.First().GetActualMessageTypeName()}.", LogCategory.Warning); - } - } - catch (Exception ex) - { - LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate."); - } - - request = requests.LastOrDefault(); - } - catch (Exception ex) - { - LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate."); - } + request = _pendingRequests.ToList().SingleOrDefault(x => x.Token == container.Token); } if (request != null) { - LogManager.Log($"{GetExtendedComponentName()}: Found pending request: " + request.GetActualMessageTypeName(), LogCategory.Debug); + LogManager.Log("Found pending request: " + (request.Message.GetType().IsGenericType ? request.Message.GetType().GetGenericArguments()[0].Name : request.Message.GetType().Name), LogCategory.Debug); if (!request.IsContinuous) { - LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug); + LogManager.Log("Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug); _pendingRequests.Remove(request); @@ -1443,54 +1039,23 @@ namespace Tango.Transport if (container.Error == ErrorCode.None) { var message = Encoder.Decode(data); - - if (request.ShouldLog) - { - try - { - String responseType = message.Type.ToString(); - var messageContent = message.GetType().GetProperty("Message").GetValue(message); - - try - { - if (messageContent.GetType() == typeof(GenericResponse)) - { - Type genericType = Type.GetType((messageContent as GenericResponse).Type); - responseType = genericType.Name; - messageContent = GenericMessageSerializer.DeserializeFromByteString(genericType, (messageContent as GenericResponse).Data, GenericProtocol); - } - } - catch { } - - LogManager.Log($"{GetExtendedComponentName()}: Response received '{responseType}'...\n{messageContent.ToJsonString()}", LogCategory.Info); - OnResponseReceived(messageContent as IMessage); - } - catch - { - LogManager.Log("Error logging response received.", LogCategory.Warning); - } - } - - LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and setting pending request task result...", LogCategory.Debug, message); + LogManager.Log("Parsing inner response message and setting pending request task result...", LogCategory.Debug, message); request.SetResult(message, true); - LogManager.Log($"{GetExtendedComponentName()}: Message enquirer released...", LogCategory.Debug); + LogManager.Log("Message enquirer released...", LogCategory.Debug); } else { - var ex = new ResponseErrorException(container, request.GetActualMessageTypeName()); - OnRequestFailed(request, ex); - request.SetException(ex); + request.SetException(LogManager.Log(new ResponseErrorException(container), LogCategory.Warning)); } } catch (Exception ex) { - OnRequestFailed(request, ex); - request.SetException(LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message.")); + request.SetException(LogManager.Log(ex, "Error parsing response message.")); } } else { - LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug); + LogManager.Log("Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug); try { @@ -1498,50 +1063,32 @@ namespace Tango.Transport { var message = Encoder.Decode(data); - if (request.ShouldLog && !request.AtLeastOneResponseReceived) - { - try - { - var messageContent = message.GetType().GetProperty("Message").GetValue(message); - LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info); - OnResponseReceived(messageContent as IMessage); - } - catch - { - LogManager.Log("Error logging response received.", LogCategory.Warning); - } - } - - LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message); + LogManager.Log("Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message); if (container.Completed) { - LogManager.Log($"{GetExtendedComponentName()}: Continuous sequence completed.", LogCategory.Debug); + LogManager.Log("Continuous sequence completed.", LogCategory.Debug); _pendingRequests.Remove(request); } request.SetResult(message, container.Completed); } else if (container.Error == ErrorCode.ContinuousResponseAborted) { - String m = $"{GetExtendedComponentName()}: Continuous response " + container.Type + " has been aborted: " + container.Error.ToString(); + String m = "Continuous response " + container.Type + " has been aborted: " + container.Error.ToString(); LogManager.Log(m, LogCategory.Info); _pendingRequests.Remove(request); - OnRequestFailed(request, new ContinuousResponseAbortedException(container, m)); - request.SetException(new ContinuousResponseAbortedException(container, m)); + request.SetException(new ContinuousResponseAbortedException(m)); } else { - LogManager.Log($"{GetExtendedComponentName()}: Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning); + LogManager.Log("Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning); _pendingRequests.Remove(request); - - var exx = new ResponseErrorException(container, request.GetActualMessageTypeName()); - OnRequestFailed(request, exx); - request.SetException(exx); + request.SetException(new ResponseErrorException(container)); } } catch (Exception ex) { - LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message."); + LogManager.Log(ex, "Error parsing response message."); } } @@ -1558,36 +1105,29 @@ namespace Tango.Transport { if (container.Type.ToString().EndsWith("Response")) { - LogManager.Log($"{GetExtendedComponentName()}: A response message with no awaiting request was identified. {container.Type}, Token: {container.Token}. Message ignored.", LogCategory.Warning); + LogManager.Log(String.Format("A response message with no awaiting request was identified. {0}, Token: {1}. Message ignored.", container.Type, container.Token), LogCategory.Warning); continue; } - LogManager.Log($"{GetExtendedComponentName()}: Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug); + LogManager.Log("Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug); try { - LogManager.Log($"{GetExtendedComponentName()}: Saving request token: " + container.Token, LogCategory.Debug); + LogManager.Log("Saving request token: " + container.Token, LogCategory.Debug); _pendingResponses.Add(container.Token, new PendingResponse(container.Continuous)); if (container.Type == MessageType.KeepAliveRequest && EnableKeepAliveAutoResponse) { - LogManager.Log($"{GetExtendedComponentName()}: Submitting keep alive response...", LogCategory.Debug); - try - { - SendResponse<KeepAliveResponse>(new KeepAliveResponse(), container.Token, new TransportResponseConfig() - { - Priority = QueuePriority.High - }); - } - catch { } + LogManager.Log("Submitting keep alive response...", LogCategory.Debug); + SendResponse<KeepAliveResponse>(new KeepAliveResponse(), container.Token); } else { - LogManager.Log($"{GetExtendedComponentName()}: Invoking RequestReceived event...", LogCategory.Debug, container); + LogManager.Log("Invoking RequestReceived event...", LogCategory.Debug, container); try { - Task.Factory.StartNew(() => OnRequestReceived(new RequestReceivedEventArgs(container))); + Task.Factory.StartNew(() => OnRequestReceived(container)); } catch { @@ -1604,7 +1144,7 @@ namespace Tango.Transport } catch (ThreadAbortException) { - LogManager.Log($"{GetExtendedComponentName()}: Pull thread has been aborted."); + LogManager.Log("Pull thread has been aborted."); } catch (Exception ex) { @@ -1640,24 +1180,12 @@ namespace Tango.Transport if (_arrivedResponses.Count == 0) { retryCounter--; - - if (State == TransportComponentState.Connected) - { - var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig() - { - Timeout = KeepAliveTimeout, - Priority = QueuePriority.High - }).Result; - retryCounter = KeepAliveRetries; - } - else - { - continue; - } + var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), KeepAliveTimeout).Result; + retryCounter = KeepAliveRetries; } else { - LogManager.Log($"{GetExtendedComponentName()}: Keep alive request was skipped due to busy response queue.", LogCategory.Debug); + LogManager.Log("Keep alive request was skipped due to busy response queue.", LogCategory.Debug); } } } @@ -1682,14 +1210,14 @@ namespace Tango.Transport else { retryCounter = KeepAliveRetries; - LogManager.Log($"{GetExtendedComponentName()}: The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning); + LogManager.Log("The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning); } } } catch (ThreadAbortException) { aborted = true; - LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted."); + LogManager.Log("KeepAlive thread has been aborted."); return; } catch (Exception ex) @@ -1706,7 +1234,7 @@ namespace Tango.Transport } catch (ThreadAbortException) { - LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted."); + LogManager.Log("KeepAlive thread has been aborted."); } } |
