diff options
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/TransporterBase.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Transport/TransporterBase.cs | 495 |
1 files changed, 323 insertions, 172 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index 263e78933..ba220797e 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -18,6 +18,7 @@ using Tango.Transport.Encoders; using Tango.PMR.Connection; using Tango.Core.Threading; using System.IO; +using Tango.Core.ExtensionMethods; namespace Tango.Transport { @@ -57,6 +58,21 @@ namespace Tango.Transport /// </summary> public event EventHandler<TransportComponentState> StateChanged; + /// <summary> + /// Occurs when a request has been sent. + /// </summary> + public event EventHandler<IMessage> RequestSent; + + /// <summary> + /// Occurs when a request response has been received. + /// </summary> + public event EventHandler<IMessage> ResponseReceived; + + /// <summary> + /// Occurs when a request has failed. + /// </summary> + public event EventHandler<RequestFailedEventArgs> RequestFailed; + #endregion #region Properties @@ -283,6 +299,7 @@ namespace Tango.Transport try { LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'..."); + OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected.")); request.SetException(new TransporterDisconnectedException("Transporter disconnected.")); } catch (Exception e) @@ -292,6 +309,49 @@ namespace Tango.Transport } } + /// <summary> + /// Called when the request has been sent + /// </summary> + /// <param name="response">The request.</param> + private void OnRequestSent(TransportMessageBase request) + { + if (request.ShouldLog) + { + IMessage message = request.GetActualMessage() as IMessage; + + if (message != null) + { + RequestSent?.Invoke(this, message); + } + } + } + + /// <summary> + /// Called when the response has been received + /// </summary> + /// <param name="response">The response.</param> + private void OnResponseReceived(IMessage response) + { + if (response != null) + { + ResponseReceived?.Invoke(this, response); + } + } + + /// <summary> + /// Called when the request has been failed + /// </summary> + /// <param name="request">The request.</param> + private void OnRequestFailed(TransportMessageBase request, Exception exception) + { + IMessage message = request.GetActualMessage() as IMessage; + + if (message != null) + { + RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception)); + } + } + #endregion #region Constructors @@ -381,21 +441,28 @@ namespace Tango.Transport NotifyContinuousRequestMessagesDisconnection(); } + #endregion + + #region Public Request Methods + /// <summary> /// Sends a request. /// </summary> /// <param name="request">The request.</param> - /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - public Task<IMessage> SendRequest(IMessage request, TimeSpan? timeout = null) + /// <exception cref="InvalidOperationException"></exception> + public Task<IMessage> SendRequest(IMessage request, TransportRequestConfig config = null) { + config = config ?? new TransportRequestConfig(); + String requestName = request.GetType().Name; String responseName = requestName.Replace("Request", "Response"); MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); - container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; container.Type = MessageFactory.ParseMessageType(requestName); LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); @@ -408,6 +475,8 @@ namespace Tango.Transport TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>(); TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source); + message.ShouldLog = config.ShouldLog; + message.TransportComponentName = GetExtendedComponentName(); message.ActivateTimeout = () => { @@ -416,13 +485,13 @@ namespace Tango.Transport if (!source.Task.IsCompleted) { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); + TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - source.SetException(ex); + OnRequestFailed(message, ex); + message.SetException(ex); } - }, timeout != null ? timeout.Value : RequestTimeout); + }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); }; EnqueueMessageOut(message); @@ -431,49 +500,16 @@ namespace Tango.Transport } /// <summary> - /// Sends the response. - /// </summary> - /// <param name="response">The response.</param> - /// <param name="token">The token.</param> - /// <param name="completed">The completed.</param> - /// <param name="errorCode">The error code.</param> - /// <param name="errorMessage">The error message.</param> - /// <returns></returns> - /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception> - public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null) - { - String responseName = response.GetType().Name; - - MessageContainer container = new MessageContainer(); - container.Token = token; - container.Data = response.ToByteString(); - container.Type = MessageFactory.ParseMessageType(responseName); - - if (errorCode.HasValue) - { - container.Error = errorCode.Value; - } - - if (errorMessage != null) - { - container.ErrorMessage = errorMessage; - } - - if (completed.HasValue) - { - container.Completed = completed.Value; - } - - return SendResponse(container); - } - - /// <summary> /// Sends the request. /// </summary> - /// <param name="container">The container.</param> + /// <param name="container"></param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - public Task<MessageContainer> SendRequest(MessageContainer container) + /// <exception cref="InvalidOperationException"></exception> + public Task<MessageContainer> SendRequest(MessageContainer container, TransportRequestConfig config = null) { + config = config ?? new TransportRequestConfig(); + String responseName = container.Type.ToString().Replace("Request", "Response"); TimeSpan? timeout = GetContainerTimeoutOrDefault(container); @@ -487,6 +523,8 @@ namespace Tango.Transport TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>(); TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source); + message.ShouldLog = config.ShouldLog; + message.TransportComponentName = GetExtendedComponentName(); message.ActivateTimeout = () => { @@ -496,9 +534,9 @@ namespace Tango.Transport if (!source.Task.IsCompleted) { TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - source.SetException(ex); + OnRequestFailed(message, ex); + message.SetException(ex); } }, timeout != null ? timeout.Value : RequestTimeout); @@ -511,61 +549,65 @@ namespace Tango.Transport } /// <summary> - /// Sends the response. + /// Sends a request. /// </summary> - /// <param name="container">The container.</param> + /// <typeparam name="Request">The type of the request.</typeparam> + /// <typeparam name="Response">The type of the response.</typeparam> + /// <param name="request">The request.</param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception> - public Task SendResponse(MessageContainer container) + /// <exception cref="InvalidOperationException"></exception> + public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TransportRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> { - String token = container.Token; - - LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug); + config = config ?? new TransportRequestConfig(); - PendingResponse pendingResponse = null; + LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); if (State != TransportComponentState.Connected) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); } - LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); + request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - if (_pendingResponses.TryGetValue(token, out pendingResponse)) - { - LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); + TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>(); + TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); + message.ShouldLog = config.ShouldLog; + message.TransportComponentName = GetExtendedComponentName(); - if (!pendingResponse.IsContinuous) - { - LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); - _pendingResponses.Remove(token); - } - else if (container.Completed) - { - LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); - _pendingResponses.Remove(token); - } - } - else + message.ActivateTimeout = () => { - //This should never happen. - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); - } + TimeoutTask.StartNew(() => + { + + if (!source.Task.IsCompleted) + { + TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); + LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); + OnRequestFailed(message, ex); + message.SetException(ex); + } + + }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); + }; - TaskCompletionSource<object> source = new TaskCompletionSource<object>(); - TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source); EnqueueMessageOut(message); + return source.Task; } /// <summary> /// Sends a request and expecting multiple response messages. /// </summary> - /// <param name="request">The request.</param> - /// <param name="timeout"></param> + /// <param name="request"></param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - public IObservable<IMessage> SendContinuousRequest(IMessage request, TimeSpan? timeout = default(TimeSpan?)) + /// <exception cref="InvalidOperationException"></exception> + public IObservable<IMessage> SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null) { + config = config ?? new TransportContinuousRequestConfig(); + String requestName = request.GetType().Name; String responseName = requestName.Replace("Request", "Response"); @@ -573,7 +615,7 @@ namespace Tango.Transport container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); container.Type = MessageFactory.ParseMessageType(requestName); - container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; container.Continuous = true; LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); @@ -591,6 +633,8 @@ namespace Tango.Transport { IsContinuous = true, ContinuesResponseSubject = subject, + ShouldLog = config.ShouldLog, + TransportComponentName = GetExtendedComponentName(), }; message.ActivateTimeout = () => @@ -601,13 +645,13 @@ namespace Tango.Transport if (!message.AtLeastOneResponseReceived) { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); + TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + OnRequestFailed(message, ex); message.SetException(ex); } - }, timeout != null ? timeout.Value : RequestTimeout); + }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); }; @@ -617,59 +661,18 @@ namespace Tango.Transport } /// <summary> - /// Sends a request. - /// </summary> - /// <typeparam name="Request">The type of the request.</typeparam> - /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="request">The request.</param> - /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param> - /// <returns></returns> - public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response> - { - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - - request.Container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - - TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>(); - TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!source.Task.IsCompleted) - { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - source.SetException(ex); - } - - }, timeout != null ? timeout.Value : RequestTimeout); - }; - - EnqueueMessageOut(message); - - return source.Task; - } - - /// <summary> /// Sends a request and expecting multiple response messages. /// </summary> /// <typeparam name="Request">The type of the request.</typeparam> /// <typeparam name="Response">The type of the response.</typeparam> - /// <param name="request">The request.</param> - /// <param name="timeout"></param> + /// <param name="request"></param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? firstTimeout = null, TimeSpan? continousTimeout = null) where Request : IMessage<Request> where Response : IMessage<Response> + /// <exception cref="InvalidOperationException"></exception> + public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TransportContinuousRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> { + config = config ?? new TransportContinuousRequestConfig(); + LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>(); @@ -684,13 +687,15 @@ namespace Tango.Transport request.Container.Continuous = true; request.Container.Completed = false; - request.Container.Timeout = firstTimeout.HasValue ? (UInt32)firstTimeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - request.Container.ContinuousTimeout = continousTimeout.HasValue ? (UInt32)continousTimeout.Value.TotalMilliseconds : 0; + request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; + request.Container.ContinuousTimeout = config.ContinuousTimeout.HasValue ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null) { IsContinuous = true, ContinuesResponseSubject = subject, + ShouldLog = config.ShouldLog, + TransportComponentName = GetExtendedComponentName(), }; message.ActivateTimeout = () => @@ -700,27 +705,27 @@ namespace Tango.Transport if (!message.AtLeastOneResponseReceived) { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (firstTimeout != null ? firstTimeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); + TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + OnRequestFailed(message, ex); message.SetException(ex); } - if (continousTimeout != null) + if (config.ContinuousTimeout != null) { Task.Factory.StartNew(async () => { while (!message.Completed) { - await Task.Delay(continousTimeout.Value).ContinueWith((y) => + await Task.Delay(config.ContinuousTimeout.Value).ContinueWith((y) => { if (!message.Completed) { - if (DateTime.Now - message.LastResponseTime > continousTimeout.Value) + if (DateTime.Now - message.LastResponseTime > config.ContinuousTimeout.Value) { - TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (continousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log(ex); + TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (config.ContinuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + OnRequestFailed(message, ex); message.SetException(ex); return; } @@ -730,7 +735,7 @@ namespace Tango.Transport }); } - }, firstTimeout != null ? firstTimeout.Value : RequestTimeout); + }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); }; EnqueueMessageOut(message); @@ -742,9 +747,13 @@ namespace Tango.Transport /// Sends a continuous request. /// </summary> /// <param name="container">The container.</param> + /// <param name="config">Request configuration.</param> /// <returns></returns> - public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container) + /// <exception cref="InvalidOperationException"></exception> + public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null) { + config = config ?? new TransportContinuousRequestConfig(); + TimeSpan? timeout = GetContainerTimeoutOrDefault(container); TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); @@ -766,6 +775,8 @@ namespace Tango.Transport { IsContinuous = true, ContinuesResponseSubject = subject, + ShouldLog = config.ShouldLog, + TransportComponentName = GetExtendedComponentName(), }; message.ActivateTimeout = () => @@ -776,8 +787,8 @@ namespace Tango.Transport if (!message.AtLeastOneResponseReceived) { TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log(ex); LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + OnRequestFailed(message, ex); message.SetException(ex); } @@ -794,8 +805,8 @@ namespace Tango.Transport if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) { TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log(ex); LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + OnRequestFailed(message, ex); message.SetException(ex); return; } @@ -813,6 +824,10 @@ namespace Tango.Transport return subject.AsObservable(); } + #endregion + + #region Public Response Methods + /// <summary> /// Sends a response. /// </summary> @@ -825,18 +840,102 @@ namespace Tango.Transport } /// <summary> - /// Sends a response for the specified token. + /// Sends the response. + /// </summary> + /// <param name="response"></param> + /// <param name="token">Request token.</param> + /// <param name="config">Response configuration.</param> + /// <returns></returns> + public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null) + { + config = config ?? new TransportResponseConfig(); + + String responseName = response.GetType().Name; + + MessageContainer container = new MessageContainer(); + container.Token = token; + container.Data = response.ToByteString(); + container.Type = MessageFactory.ParseMessageType(responseName); + + if (config.ErrorCode.HasValue) + { + container.Error = config.ErrorCode.Value; + } + + if (config.ErrorMessage != null) + { + container.ErrorMessage = config.ErrorMessage; + } + + container.Completed = config.Completed; + + return SendResponse(container); + } + + /// <summary> + /// Sends the response. + /// </summary> + /// <param name="container">The container.</param> + /// <returns></returns> + /// <exception cref="InvalidOperationException"> + /// </exception> + public Task SendResponse(MessageContainer container) + { + String token = container.Token; + + LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug); + + PendingResponse pendingResponse = null; + + if (State != TransportComponentState.Connected) + { + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}.")); + } + + LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug); + + if (_pendingResponses.TryGetValue(token, out pendingResponse)) + { + LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug); + + if (!pendingResponse.IsContinuous) + { + LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug); + _pendingResponses.Remove(token); + } + else if (container.Completed) + { + LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug); + _pendingResponses.Remove(token); + } + } + else + { + //This should never happen. + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical); + } + + TaskCompletionSource<object> source = new TaskCompletionSource<object>(); + TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source); + EnqueueMessageOut(message); + return source.Task; + } + + /// <summary> + /// Sends a response. /// </summary> /// <typeparam name="Response">The type of the response.</typeparam> /// <param name="response">The response.</param> - /// <param name="token">The token.</param> - /// <param name="completed">The completed.</param> - /// <param name="errorCode">The error code.</param> - /// <param name="errorMessage">The error message.</param> + /// <param name="token">Request token.</param> + /// <param name="config">Response configuration.</param> /// <returns></returns> - /// <exception cref="InvalidOperationException">Matching request token was not found!</exception> - public Task SendResponse<Response>(TangoMessage<Response> response, String token, bool? completed = null, ErrorCode? errorCode = null, String errorMessage = null) where Response : IMessage<Response> + /// <exception cref="InvalidOperationException"> + /// Transporter push thread is not in a running state. + /// </exception> + public Task SendResponse<Response>(TangoMessage<Response> response, String token, TransportResponseConfig config = null) where Response : IMessage<Response> { + config = config ?? new TransportResponseConfig(); + if (_pushThread == null || _pushThread.ThreadState == ThreadState.Aborted) { throw new InvalidOperationException("Transporter push thread is not in a running state."); @@ -844,19 +943,16 @@ namespace Tango.Transport response.Container.Token = token; - if (completed.HasValue) - { - response.Container.Completed = completed.Value; - } + response.Container.Completed = config.Completed; - if (errorCode.HasValue) + if (config.ErrorCode.HasValue) { - response.Container.Error = errorCode.Value; + response.Container.Error = config.ErrorCode.Value; } - if (!String.IsNullOrEmpty(errorMessage)) + if (!String.IsNullOrEmpty(config.ErrorMessage)) { - response.Container.ErrorMessage = errorMessage; + response.Container.ErrorMessage = config.ErrorMessage; } LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + typeof(Response).Name, LogCategory.Debug); @@ -901,11 +997,16 @@ namespace Tango.Transport /// Sends a general error response agnostic to the type of request. /// </summary> /// <param name="exception">The exception.</param> - /// <param name="token">The token.</param> + /// <param name="config">Response configuration.</param> /// <returns></returns> - public Task SendErrorResponse(Exception exception, string token) + public Task SendErrorResponse(Exception exception, String token) { - return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, true, ErrorCode.GeneralError, exception.Message); + return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, new TransportResponseConfig() + { + ErrorCode = ErrorCode.GeneralError, + Completed = true, + ErrorMessage = exception.Message + }); } #endregion @@ -989,31 +1090,46 @@ namespace Tango.Transport { if (message.Token.Length != MESSAGE_TOKEN_LENGTH) { - message.SetException(LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token))); + var ex = LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token)); + OnRequestFailed(message, ex); + message.SetException(ex); continue; } - LogManager.Log($"{GetExtendedComponentName()}: Sending message on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message); - if (message.Direction == TransportMessageDirection.Request) { + if (message.ShouldLog) + { + LogManager.Log($"{GetExtendedComponentName()}: Sending request '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); + OnRequestSent(message); + } + lock (_pendingRequests) { _pendingRequests.Add(message); } } + else + { + if (message.ShouldLog) + { + LogManager.Log($"{GetExtendedComponentName()}: Sending response '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info); + } + } Adapter.Write(message.Serialize()); message.ActivateTimeout?.Invoke(); - LogManager.Log($"{GetExtendedComponentName()}: Message sent on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message); + LogManager.Log($"{GetExtendedComponentName()}: Message sent...", LogCategory.Debug, message.Message); } else { if (message.Direction == TransportMessageDirection.Request) { - message.SetException(LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected."))); + var ex = new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected."); + OnRequestFailed(message, ex); + message.SetException(ex); } } @@ -1024,6 +1140,7 @@ namespace Tango.Transport } catch (Exception ex) { + OnRequestFailed(message, ex); message.SetException(ex); } } @@ -1053,7 +1170,7 @@ namespace Tango.Transport { byte[] data = _arrivedResponses.BlockDequeue(); - LogManager.Log($"{GetExtendedComponentName()}: Message received on adapter: " + Adapter.Address, LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Message received...", LogCategory.Debug); LogManager.Log($"{GetExtendedComponentName()}: Parsing message container...", LogCategory.Debug); MessageContainer container = Encoder.DecodeContainer(data); @@ -1110,17 +1227,35 @@ namespace Tango.Transport if (container.Error == ErrorCode.None) { var message = Encoder.Decode(data); + + if (request.ShouldLog) + { + try + { + var messageContent = message.GetType().GetProperty("Message").GetValue(message); + LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info); + OnResponseReceived(messageContent as IMessage); + } + catch + { + LogManager.Log("Error logging response received.", LogCategory.Warning); + } + } + LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and setting pending request task result...", LogCategory.Debug, message); request.SetResult(message, true); LogManager.Log($"{GetExtendedComponentName()}: Message enquirer released...", LogCategory.Debug); } else { - request.SetException(LogManager.Log(new ResponseErrorException(container), LogCategory.Warning)); + var ex = LogManager.Log(new ResponseErrorException(container)); + OnRequestFailed(request, ex); + request.SetException(ex); } } catch (Exception ex) { + OnRequestFailed(request, ex); request.SetException(LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message.")); } } @@ -1134,6 +1269,20 @@ namespace Tango.Transport { var message = Encoder.Decode(data); + if (request.ShouldLog && !request.AtLeastOneResponseReceived) + { + try + { + var messageContent = message.GetType().GetProperty("Message").GetValue(message); + LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info); + OnResponseReceived(messageContent as IMessage); + } + catch + { + LogManager.Log("Error logging response received.", LogCategory.Warning); + } + } + LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message); if (container.Completed) @@ -1148,12 +1297,14 @@ namespace Tango.Transport String m = $"{GetExtendedComponentName()}: Continuous response " + container.Type + " has been aborted: " + container.Error.ToString(); LogManager.Log(m, LogCategory.Info); _pendingRequests.Remove(request); - request.SetException(new ContinuousResponseAbortedException(m)); + OnRequestFailed(request, new ContinuousResponseAbortedException(container, m)); + request.SetException(new ContinuousResponseAbortedException(container, m)); } else { LogManager.Log($"{GetExtendedComponentName()}: Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning); _pendingRequests.Remove(request); + OnRequestFailed(request, new ResponseErrorException(container)); request.SetException(new ResponseErrorException(container)); } } @@ -1251,7 +1402,7 @@ namespace Tango.Transport if (_arrivedResponses.Count == 0) { retryCounter--; - var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), KeepAliveTimeout).Result; + var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig() { Timeout = KeepAliveTimeout }).Result; retryCounter = KeepAliveRetries; } else |
