From 916d7f42b72eef37b590663b5964dcdec7065e5d Mon Sep 17 00:00:00 2001 From: Roy Ben Shabat Date: Fri, 4 Sep 2020 20:07:23 +0300 Subject: Refactored TransporterBase to unify all request messages ! --- .../Tango.Emulations/Emulators/MachineEmulator.cs | 2 +- .../Tango.Transport/TransportMessage.cs | 45 +-- .../Tango.Transport/TransportMessageBase.cs | 50 +++ .../Tango.Transport/TransporterBase.cs | 358 ++++++++------------- 4 files changed, 179 insertions(+), 276 deletions(-) (limited to 'Software/Visual_Studio') diff --git a/Software/Visual_Studio/Tango.Emulations/Emulators/MachineEmulator.cs b/Software/Visual_Studio/Tango.Emulations/Emulators/MachineEmulator.cs index 5e41146f0..dd82166d0 100644 --- a/Software/Visual_Studio/Tango.Emulations/Emulators/MachineEmulator.cs +++ b/Software/Visual_Studio/Tango.Emulations/Emulators/MachineEmulator.cs @@ -453,7 +453,7 @@ namespace Tango.Emulations.Emulators { for (int i = 0; i < request.Message.Amount; i++) { - Thread.Sleep(request.Message.Delay * 10); + Thread.Sleep(request.Message.Delay); var res = MessageFactory.CreateTangoMessage(request.Container.Token); res.Message.Progress = i; diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs index f7a0cf8d8..9710e19b8 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs @@ -160,49 +160,7 @@ namespace Tango.Transport public override string GetActualMessageTypeName() { - String name = String.Empty; - - if (Message is ITangoMessage) - { - var message = Message.GetType().GetProperty("Message").GetValue(Message); - - if (message.GetType() == typeof(GenericRequest)) - { - try - { - name = (message as GenericRequest).GetTypeName(); - } - catch - { - name = message.GetType().Name; - } - } - else if (message.GetType() == typeof(GenericResponse)) - { - try - { - name = (message as GenericResponse).GetTypeName(); - } - catch - { - name = message.GetType().Name; - } - } - else - { - name = message.GetType().Name; - } - } - else if (Message is MessageContainer) - { - name = (Message as MessageContainer).Type.ToString(); - } - else - { - name = Message.GetType().Name; - } - - return name; + return GetActualMessageTypeName(Message); } public override object GetActualMessage() @@ -224,5 +182,6 @@ namespace Tango.Transport return obj; } + } } diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs index d35ad108f..08ce403e1 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs @@ -5,6 +5,9 @@ using System.Reactive.Subjects; using System.Text; using System.Threading.Tasks; using Tango.Core; +using Tango.PMR; +using Tango.PMR.Common; +using Tango.PMR.Integration; namespace Tango.Transport { @@ -86,5 +89,52 @@ namespace Tango.Transport Direction = direction; Serialize = toBytes; } + + public static String GetActualMessageTypeName(Object obj) + { + String name = String.Empty; + + if (obj is ITangoMessage) + { + var message = obj.GetType().GetProperty("Message").GetValue(obj); + + if (message.GetType() == typeof(GenericRequest)) + { + try + { + name = (message as GenericRequest).GetTypeName(); + } + catch + { + name = message.GetType().Name; + } + } + else if (message.GetType() == typeof(GenericResponse)) + { + try + { + name = (message as GenericResponse).GetTypeName(); + } + catch + { + name = message.GetType().Name; + } + } + else + { + name = message.GetType().Name; + } + } + else if (obj is MessageContainer) + { + name = (obj as MessageContainer).Type.ToString(); + } + else + { + name = obj.GetType().Name; + } + + return name; + } } } diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index 435e66196..4f2e621b9 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -553,51 +553,13 @@ namespace Tango.Transport /// public Task SendRequest(IMessage request, TransportRequestConfig config = null) { - config = config ?? new TransportRequestConfig(); - - String requestName = request.GetType().Name; - String responseName = requestName.Replace("Request", "Response"); - MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - container.Type = MessageFactory.ParseMessageType(requestName); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - - TaskCompletionSource source = new TaskCompletionSource(); - TransportMessage message = new TransportMessage(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!source.Task.IsCompleted) - { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - }; - - EnqueueMessageOut(message); + container.Type = MessageFactory.ParseMessageType(request.GetType().Name); - return source.Task; + return SendRequestInternal(container.Token, container, config).Task; } /// @@ -609,46 +571,7 @@ namespace Tango.Transport /// public Task SendRequest(MessageContainer container, TransportRequestConfig config = null) { - config = config ?? new TransportRequestConfig(); - - String responseName = container.Type.ToString().Replace("Request", "Response"); - TimeSpan? timeout = GetContainerTimeoutOrDefault(container); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - - TaskCompletionSource source = new TaskCompletionSource(); - TransportMessage message = new TransportMessage(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!source.Task.IsCompleted) - { - TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, timeout != null ? timeout.Value : RequestTimeout); - - }; - - EnqueueMessageOut(message); - - return source.Task; + return SendRequestInternal(container.Token, container, config).Task; } /// @@ -661,44 +584,79 @@ namespace Tango.Transport /// /// public Task> SendRequest(TangoMessage request, TransportRequestConfig config = null) where Request : IMessage where Response : IMessage + { + return SendRequestInternal>(request.Container.Token, request, config).Task; + } + + private TaskCompletionSource SendRequestInternal(String token, Object request, TransportRequestConfig config = null) { config = config ?? new TransportRequestConfig(); - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); + TaskCompletionSource source = new TaskCompletionSource(); - if (State != TransportComponentState.Connected) + String requestName = TransportMessageBase.GetActualMessageTypeName(request); + + Func toBytes = null; + + if (request is ITangoMessage tangoMessage) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + toBytes = () => Encoder.Encode(tangoMessage); + } + else if (request is IMessage protoMessage) + { + toBytes = () => protoMessage.ToByteArray(); } - request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - - TaskCompletionSource> source = new TaskCompletionSource>(); - TransportMessage> message = new TransportMessage>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); + TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, source); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; message.TransportComponentName = GetExtendedComponentName(); + TimeSpan? timeout = config.Timeout; + + if (request is MessageContainer container) + { + timeout = GetContainerTimeoutOrDefault(container); + } + + if (timeout == null) + { + timeout = RequestTimeout; + } + + if (request is ITangoMessage tanMessage) + { + tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; + } + + String responseName = requestName.Replace("Request", "Response"); + + LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); + + if (State != TransportComponentState.Connected) + { + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + } + message.ActivateTimeout = () => { TimeoutTask.StartNew(() => { if (!source.Task.IsCompleted) { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); + }, timeout.Value); }; EnqueueMessageOut(message); - return source.Task; + return source; } /// @@ -710,60 +668,17 @@ namespace Tango.Transport /// public IObservable SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null) { - config = config ?? new TransportContinuousRequestConfig(); - - String requestName = request.GetType().Name; - String responseName = requestName.Replace("Request", "Response"); - MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); - container.Type = MessageFactory.ParseMessageType(requestName); - container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - container.Continuous = true; + container.Type = MessageFactory.ParseMessageType(request.GetType().Name); - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } + //We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment. + container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; - Subject subject = new Subject(); - - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - TransportMessage message = new TransportMessage(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), null) - { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; - - message.ActivateTimeout = () => - { - - TimeoutTask.StartNew(() => - { - - if (!message.AtLeastOneResponseReceived) - { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - - }; - - EnqueueMessageOut(message); + container.Continuous = true; - return subject.AsObservable(); + return SendContinuousRequestInternal(container.Token, container, config); } /// @@ -777,78 +692,9 @@ namespace Tango.Transport /// public IObservable> SendContinuousRequest(TangoMessage request, TransportContinuousRequestConfig config = null) where Request : IMessage where Response : IMessage { - config = config ?? new TransportContinuousRequestConfig(); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - - Subject> subject = new Subject>(); - - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - request.Container.Continuous = true; request.Container.Completed = false; - - request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - request.Container.ContinuousTimeout = config.ContinuousTimeout.HasValue ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; - - TransportMessage> message = new TransportMessage>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null) - { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!message.AtLeastOneResponseReceived) - { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - if (config.ContinuousTimeout != null) - { - Task.Factory.StartNew(async () => - { - while (!message.Completed) - { - await Task.Delay(config.ContinuousTimeout.Value).ContinueWith((y) => - { - if (!message.Completed) - { - if (DateTime.Now - message.LastResponseTime > config.ContinuousTimeout.Value) - { - TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (config.ContinuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - return; - } - } - }); - } - }); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - }; - - EnqueueMessageOut(message); - - return subject.AsObservable(); + return SendContinuousRequestInternal>(request.Container.Token, request, config); } /// @@ -859,35 +705,75 @@ namespace Tango.Transport /// /// public IObservable SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null) + { + return SendContinuousRequestInternal(container.Token, container, config); + } + + private IObservable SendContinuousRequestInternal(String token, Object request, TransportContinuousRequestConfig config = null) { config = config ?? new TransportContinuousRequestConfig(); - TimeSpan? timeout = GetContainerTimeoutOrDefault(container); - TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); + String requestName = TransportMessageBase.GetActualMessageTypeName(request); - String requestName = container.Type.ToString(); - String responseName = requestName.Replace("Request", "Response"); + Func toBytes = null; - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); + if (request is ITangoMessage tangoMessage) + { + toBytes = () => Encoder.Encode(tangoMessage); + } + else if (request is IMessage protoMessage) + { + toBytes = () => protoMessage.ToByteArray(); + } - if (State != TransportComponentState.Connected) + Subject subject = new Subject(); + + TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, null); + message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; + message.Priority = config.Priority; + message.TransportComponentName = GetExtendedComponentName(); + message.IsContinuous = true; + message.ContinuesResponseSubject = subject; + + TimeSpan? timeout = config.Timeout; + TimeSpan? continuousTimeout = config.ContinuousTimeout; + + if (request is MessageContainer container) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + timeout = GetContainerTimeoutOrDefault(container); + + if (timeout == null) + { + timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout; + container.Timeout = (uint)timeout.Value.TotalMilliseconds; + } + + continuousTimeout = GetContainerContinuousTimeoutOrDefault(container); + } + + if (timeout == null) + { + timeout = RequestTimeout; + } + + if (request is ITangoMessage tanMessage) + { + tanMessage.Container.Continuous = true; + tanMessage.Container.Completed = false; + tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; + tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0; } - Subject subject = new Subject(); + String responseName = requestName.Replace("Request", "Response"); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); - TransportMessage message = new TransportMessage(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), null) + if (State != TransportComponentState.Connected) { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + } message.ActivateTimeout = () => { @@ -896,8 +782,7 @@ namespace Tango.Transport if (!message.AtLeastOneResponseReceived) { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } @@ -914,8 +799,7 @@ namespace Tango.Transport { if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) { - TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out."); OnRequestFailed(message, ex); message.SetException(ex); return; @@ -926,7 +810,7 @@ namespace Tango.Transport }); } - }, timeout != null ? timeout.Value : RequestTimeout); + }, timeout.Value); }; EnqueueMessageOut(message); @@ -1321,6 +1205,16 @@ namespace Tango.Transport return container.Timeout > 0 ? TimeSpan.FromMilliseconds(container.Timeout) : default(TimeSpan?); } + /// + /// Gets the container timeout or default. + /// + /// The container. + /// + private TimeSpan? GetContainerContinuousTimeoutOrDefault(MessageContainer container) + { + return container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); + } + /// /// Enqueues the message and releases the push wait handle. /// -- cgit v1.3.1