using Google.Protobuf; using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading.Tasks; using Tango.Core.ExtensionMethods; using Tango.Core.Threading; using Tango.Logging; using Tango.PMR; using Tango.PMR.Common; using Tango.PMR.Integration; namespace Tango.Transport { /// /// Represents a generic transport message. /// /// /// internal class TransportMessage : TransportMessageBase { private bool exceptionRaised; private TaskCompletionSource _completionSource; private bool taskCompleted; public Subject ContinuesResponseSubject { get; set; } public DateTime LastResponseTime { get; set; } public bool Completed { get; set; } /// /// Initializes a new instance of the class. /// /// The token. /// The message. /// The direction. /// To bytes. /// The completion source. public TransportMessage(string token, object message, TransportMessageDirection direction, Func toBytes, TaskCompletionSource completionSource) : base(token, message, direction, toBytes) { _completionSource = completionSource; } /// /// Notifies the message observer of the new result. /// /// The result. public override void SetResult(object result, bool completed) { Completed = completed; Action setResultAction = () => { try { if (!IsContinuous) { if (!taskCompleted) { taskCompleted = true; if (_completionSource.GetType() == typeof(TaskCompletionSource)) { _completionSource.SetResult((T)result.GetType().GetProperty("Message").GetValue(result)); } else if (_completionSource.GetType() == typeof(TaskCompletionSource)) { _completionSource.SetResult((T)result.GetType().GetProperty("Container").GetValue(result)); } else { _completionSource.SetResult((T)result); } } } else { LastResponseTime = DateTime.Now; AtLeastOneResponseReceived = true; if (ContinuesResponseSubject.GetType() == typeof(Subject)) { ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Message").GetValue(result)); } else if (ContinuesResponseSubject.GetType() == typeof(Subject)) { ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Container").GetValue(result)); } else { ContinuesResponseSubject.OnNext((T)result); } if (completed) { ContinuesResponseSubject.OnCompleted(); } } } catch (Exception ex) { LogManager.Log(ex, $"Error while settings result for message."); try { SetException(ex); } catch (Exception e) { LogManager.Log(e, $"Error while settings exception for message."); } } }; if (ThreadingMode == TransportThreadingMode.NewThread) { ThreadFactory.StartNew(() => { setResultAction(); }); } else { Task.Factory.StartNew(() => { setResultAction(); }); } } /// /// Notifies the message observer of an exception. /// /// The ex. public override void SetException(Exception ex) { if (exceptionRaised || taskCompleted) { return; } Completed = true; exceptionRaised = true; Action setExceptionAction = () => { if (!IsContinuous) { if (!taskCompleted) { taskCompleted = true; if (!(ex is ContinuousResponseAbortedException) && !(ex is TransporterDisconnectedException)) { LogManager.Log($"{TransportComponentName}: Request failed '{GetActualMessageTypeName()}'...\n{ex.FlattenException()}", LogCategory.Error); } _completionSource.SetException(ex); } } else { if (!(ex is ContinuousResponseAbortedException) && !(ex is TransporterDisconnectedException)) { LogManager.Log($"{TransportComponentName}: Request failed '{GetActualMessageTypeName()}'...\n{ex.FlattenException()}", LogCategory.Error); } AtLeastOneResponseReceived = true; ContinuesResponseSubject.OnError(ex); } }; if (ThreadingMode == TransportThreadingMode.NewThread) { ThreadFactory.StartNew(() => { setExceptionAction(); }); } else { Task.Factory.StartNew(() => { setExceptionAction(); }); } } public override string GetActualMessageTypeName() { return GetActualMessageTypeName(Message); } public override object GetActualMessage() { object obj = null; if (Message is ITangoMessage) { obj = Message.GetType().GetProperty("Message").GetValue(Message); } else if (Message is MessageContainer) { obj = Message; } else { obj = Message; } return obj; } } }