diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-09-04 20:07:23 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-09-04 20:07:23 +0300 |
| commit | 916d7f42b72eef37b590663b5964dcdec7065e5d (patch) | |
| tree | 7c9fcb56e5bb8c8a174f9695bdc309f11c3365c4 /Software/Visual_Studio/Tango.Transport | |
| parent | a68e224066b0c39bafb811be86ea736418db59df (diff) | |
| download | Tango-916d7f42b72eef37b590663b5964dcdec7065e5d.tar.gz Tango-916d7f42b72eef37b590663b5964dcdec7065e5d.zip | |
Refactored TransporterBase to unify all request messages !
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport')
3 files changed, 178 insertions, 275 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs index f7a0cf8d8..9710e19b8 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs @@ -160,49 +160,7 @@ namespace Tango.Transport public override string GetActualMessageTypeName() { - String name = String.Empty; - - if (Message is ITangoMessage) - { - var message = Message.GetType().GetProperty("Message").GetValue(Message); - - if (message.GetType() == typeof(GenericRequest)) - { - try - { - name = (message as GenericRequest).GetTypeName(); - } - catch - { - name = message.GetType().Name; - } - } - else if (message.GetType() == typeof(GenericResponse)) - { - try - { - name = (message as GenericResponse).GetTypeName(); - } - catch - { - name = message.GetType().Name; - } - } - else - { - name = message.GetType().Name; - } - } - else if (Message is MessageContainer) - { - name = (Message as MessageContainer).Type.ToString(); - } - else - { - name = Message.GetType().Name; - } - - return name; + return GetActualMessageTypeName(Message); } public override object GetActualMessage() @@ -224,5 +182,6 @@ namespace Tango.Transport return obj; } + } } diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs index d35ad108f..08ce403e1 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs @@ -5,6 +5,9 @@ using System.Reactive.Subjects; using System.Text; using System.Threading.Tasks; using Tango.Core; +using Tango.PMR; +using Tango.PMR.Common; +using Tango.PMR.Integration; namespace Tango.Transport { @@ -86,5 +89,52 @@ namespace Tango.Transport Direction = direction; Serialize = toBytes; } + + public static String GetActualMessageTypeName(Object obj) + { + String name = String.Empty; + + if (obj is ITangoMessage) + { + var message = obj.GetType().GetProperty("Message").GetValue(obj); + + if (message.GetType() == typeof(GenericRequest)) + { + try + { + name = (message as GenericRequest).GetTypeName(); + } + catch + { + name = message.GetType().Name; + } + } + else if (message.GetType() == typeof(GenericResponse)) + { + try + { + name = (message as GenericResponse).GetTypeName(); + } + catch + { + name = message.GetType().Name; + } + } + else + { + name = message.GetType().Name; + } + } + else if (obj is MessageContainer) + { + name = (obj as MessageContainer).Type.ToString(); + } + else + { + name = obj.GetType().Name; + } + + return name; + } } } diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index 435e66196..4f2e621b9 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -553,51 +553,13 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public Task<IMessage> SendRequest(IMessage request, TransportRequestConfig config = null) { - config = config ?? new TransportRequestConfig(); - - String requestName = request.GetType().Name; - String responseName = requestName.Replace("Request", "Response"); - MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - container.Type = MessageFactory.ParseMessageType(requestName); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - - TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>(); - TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!source.Task.IsCompleted) - { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - }; - - EnqueueMessageOut(message); + container.Type = MessageFactory.ParseMessageType(request.GetType().Name); - return source.Task; + return SendRequestInternal<IMessage>(container.Token, container, config).Task; } /// <summary> @@ -609,46 +571,7 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public Task<MessageContainer> SendRequest(MessageContainer container, TransportRequestConfig config = null) { - config = config ?? new TransportRequestConfig(); - - String responseName = container.Type.ToString().Replace("Request", "Response"); - TimeSpan? timeout = GetContainerTimeoutOrDefault(container); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - - TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>(); - TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source); - message.ShouldLog = config.ShouldLog; - message.Immidiate = config.Immediate; - message.Priority = config.Priority; - message.TransportComponentName = GetExtendedComponentName(); - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!source.Task.IsCompleted) - { - TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, timeout != null ? timeout.Value : RequestTimeout); - - }; - - EnqueueMessageOut(message); - - return source.Task; + return SendRequestInternal<MessageContainer>(container.Token, container, config).Task; } /// <summary> @@ -662,43 +585,78 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TransportRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> { + return SendRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config).Task; + } + + private TaskCompletionSource<T> SendRequestInternal<T>(String token, Object request, TransportRequestConfig config = null) + { config = config ?? new TransportRequestConfig(); - LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); + TaskCompletionSource<T> source = new TaskCompletionSource<T>(); - if (State != TransportComponentState.Connected) + String requestName = TransportMessageBase.GetActualMessageTypeName(request); + + Func<byte[]> toBytes = null; + + if (request is ITangoMessage tangoMessage) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + toBytes = () => Encoder.Encode(tangoMessage); + } + else if (request is IMessage protoMessage) + { + toBytes = () => protoMessage.ToByteArray(); } - request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - - TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>(); - TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); + TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, source); message.ShouldLog = config.ShouldLog; message.Immidiate = config.Immediate; message.Priority = config.Priority; message.TransportComponentName = GetExtendedComponentName(); + TimeSpan? timeout = config.Timeout; + + if (request is MessageContainer container) + { + timeout = GetContainerTimeoutOrDefault(container); + } + + if (timeout == null) + { + timeout = RequestTimeout; + } + + if (request is ITangoMessage tanMessage) + { + tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; + } + + String responseName = requestName.Replace("Request", "Response"); + + LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); + + if (State != TransportComponentState.Connected) + { + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + } + message.ActivateTimeout = () => { TimeoutTask.StartNew(() => { if (!source.Task.IsCompleted) { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); + }, timeout.Value); }; EnqueueMessageOut(message); - return source.Task; + return source; } /// <summary> @@ -710,60 +668,17 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public IObservable<IMessage> SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null) { - config = config ?? new TransportContinuousRequestConfig(); - - String requestName = request.GetType().Name; - String responseName = requestName.Replace("Request", "Response"); - MessageContainer container = new MessageContainer(); container.Token = Guid.NewGuid().ToString(); container.Data = request.ToByteString(); - container.Type = MessageFactory.ParseMessageType(requestName); - container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - container.Continuous = true; + container.Type = MessageFactory.ParseMessageType(request.GetType().Name); - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } + //We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment. + container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; - Subject<IMessage> subject = new Subject<IMessage>(); - - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); - - TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), null) - { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; - - message.ActivateTimeout = () => - { - - TimeoutTask.StartNew(() => - { - - if (!message.AtLeastOneResponseReceived) - { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - - }; - - EnqueueMessageOut(message); + container.Continuous = true; - return subject.AsObservable(); + return SendContinuousRequestInternal<IMessage>(container.Token, container, config); } /// <summary> @@ -777,78 +692,9 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TransportContinuousRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response> { - config = config ?? new TransportContinuousRequestConfig(); - - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug); - - Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>(); - - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug); - - if (State != TransportComponentState.Connected) - { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); - } - request.Container.Continuous = true; request.Container.Completed = false; - - request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds; - request.Container.ContinuousTimeout = config.ContinuousTimeout.HasValue ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0; - - TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null) - { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; - - message.ActivateTimeout = () => - { - TimeoutTask.StartNew(() => - { - - if (!message.AtLeastOneResponseReceived) - { - TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - } - - if (config.ContinuousTimeout != null) - { - Task.Factory.StartNew(async () => - { - while (!message.Completed) - { - await Task.Delay(config.ContinuousTimeout.Value).ContinueWith((y) => - { - if (!message.Completed) - { - if (DateTime.Now - message.LastResponseTime > config.ContinuousTimeout.Value) - { - TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (config.ContinuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); - OnRequestFailed(message, ex); - message.SetException(ex); - return; - } - } - }); - } - }); - } - - }, config.Timeout != null ? config.Timeout.Value : RequestTimeout); - }; - - EnqueueMessageOut(message); - - return subject.AsObservable(); + return SendContinuousRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config); } /// <summary> @@ -860,34 +706,74 @@ namespace Tango.Transport /// <exception cref="InvalidOperationException"></exception> public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null) { + return SendContinuousRequestInternal<MessageContainer>(container.Token, container, config); + } + + private IObservable<T> SendContinuousRequestInternal<T>(String token, Object request, TransportContinuousRequestConfig config = null) + { config = config ?? new TransportContinuousRequestConfig(); - TimeSpan? timeout = GetContainerTimeoutOrDefault(container); - TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); + String requestName = TransportMessageBase.GetActualMessageTypeName(request); - String requestName = container.Type.ToString(); - String responseName = requestName.Replace("Request", "Response"); + Func<byte[]> toBytes = null; - LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug); + if (request is ITangoMessage tangoMessage) + { + toBytes = () => Encoder.Encode(tangoMessage); + } + else if (request is IMessage protoMessage) + { + toBytes = () => protoMessage.ToByteArray(); + } - if (State != TransportComponentState.Connected) + Subject<T> subject = new Subject<T>(); + + TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, null); + message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; + message.Priority = config.Priority; + message.TransportComponentName = GetExtendedComponentName(); + message.IsContinuous = true; + message.ContinuesResponseSubject = subject; + + TimeSpan? timeout = config.Timeout; + TimeSpan? continuousTimeout = config.ContinuousTimeout; + + if (request is MessageContainer container) { - throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + timeout = GetContainerTimeoutOrDefault(container); + + if (timeout == null) + { + timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout; + container.Timeout = (uint)timeout.Value.TotalMilliseconds; + } + + continuousTimeout = GetContainerContinuousTimeoutOrDefault(container); + } + + if (timeout == null) + { + timeout = RequestTimeout; + } + + if (request is ITangoMessage tanMessage) + { + tanMessage.Container.Continuous = true; + tanMessage.Container.Completed = false; + tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds; + tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0; } - Subject<MessageContainer> subject = new Subject<MessageContainer>(); + String responseName = requestName.Replace("Request", "Response"); - LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + responseName, LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug); + LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug); - TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), null) + if (State != TransportComponentState.Connected) { - IsContinuous = true, - ContinuesResponseSubject = subject, - ShouldLog = config.ShouldLog, - Immidiate = config.Immediate, - Priority = config.Priority, - TransportComponentName = GetExtendedComponentName(), - }; + throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}.")); + } message.ActivateTimeout = () => { @@ -896,8 +782,7 @@ namespace Tango.Transport if (!message.AtLeastOneResponseReceived) { - TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds."); OnRequestFailed(message, ex); message.SetException(ex); } @@ -914,8 +799,7 @@ namespace Tango.Transport { if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value) { - TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out."); - LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug); + TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out."); OnRequestFailed(message, ex); message.SetException(ex); return; @@ -926,7 +810,7 @@ namespace Tango.Transport }); } - }, timeout != null ? timeout.Value : RequestTimeout); + }, timeout.Value); }; EnqueueMessageOut(message); @@ -1322,6 +1206,16 @@ namespace Tango.Transport } /// <summary> + /// Gets the container timeout or default. + /// </summary> + /// <param name="container">The container.</param> + /// <returns></returns> + private TimeSpan? GetContainerContinuousTimeoutOrDefault(MessageContainer container) + { + return container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?); + } + + /// <summary> /// Enqueues the message and releases the push wait handle. /// </summary> /// <param name="message">The message.</param> |
