using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Tango.Core.ExtensionMethods;
using Tango.Core.Threading;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
using Tango.PMR.Integration;
namespace Tango.Transport
{
///
/// Represents a generic transport message.
///
///
///
internal class TransportMessage : TransportMessageBase
{
private bool exceptionRaised;
private TaskCompletionSource _completionSource;
private bool taskCompleted;
public Subject ContinuesResponseSubject { get; set; }
public DateTime LastResponseTime { get; set; }
public bool Completed { get; set; }
///
/// Initializes a new instance of the class.
///
/// The token.
/// The message.
/// The direction.
/// To bytes.
/// The completion source.
public TransportMessage(string token, object message, TransportMessageDirection direction, Func toBytes, TaskCompletionSource completionSource) : base(token, message, direction, toBytes)
{
_completionSource = completionSource;
}
///
/// Notifies the message observer of the new result.
///
/// The result.
public override void SetResult(object result, bool completed)
{
Completed = completed;
Action setResultAction = () =>
{
try
{
if (!IsContinuous)
{
if (!taskCompleted)
{
taskCompleted = true;
if (_completionSource.GetType() == typeof(TaskCompletionSource))
{
_completionSource.SetResult((T)result.GetType().GetProperty("Message").GetValue(result));
}
else if (_completionSource.GetType() == typeof(TaskCompletionSource))
{
_completionSource.SetResult((T)result.GetType().GetProperty("Container").GetValue(result));
}
else
{
_completionSource.SetResult((T)result);
}
}
}
else
{
LastResponseTime = DateTime.Now;
AtLeastOneResponseReceived = true;
if (ContinuesResponseSubject.GetType() == typeof(Subject))
{
ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Message").GetValue(result));
}
else if (ContinuesResponseSubject.GetType() == typeof(Subject))
{
ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Container").GetValue(result));
}
else
{
ContinuesResponseSubject.OnNext((T)result);
}
if (completed)
{
ContinuesResponseSubject.OnCompleted();
}
}
}
catch (Exception ex)
{
LogManager.Log(ex, $"Error while settings result for message.");
try
{
SetException(ex);
}
catch (Exception e)
{
LogManager.Log(e, $"Error while settings exception for message.");
}
}
};
if (ThreadingMode == TransportThreadingMode.NewThread)
{
ThreadFactory.StartNew(() =>
{
setResultAction();
});
}
else
{
Task.Factory.StartNew(() =>
{
setResultAction();
});
}
}
///
/// Notifies the message observer of an exception.
///
/// The ex.
public override void SetException(Exception ex)
{
if (exceptionRaised || taskCompleted)
{
return;
}
Completed = true;
exceptionRaised = true;
Action setExceptionAction = () =>
{
if (!IsContinuous)
{
if (!taskCompleted)
{
taskCompleted = true;
if (!(ex is ContinuousResponseAbortedException) && !(ex is TransporterDisconnectedException))
{
LogManager.Log($"{TransportComponentName}: Request failed '{GetActualMessageTypeName()}'...\n{ex.FlattenException()}", LogCategory.Error);
}
_completionSource.SetException(ex);
}
}
else
{
if (!(ex is ContinuousResponseAbortedException) && !(ex is TransporterDisconnectedException))
{
LogManager.Log($"{TransportComponentName}: Request failed '{GetActualMessageTypeName()}'...\n{ex.FlattenException()}", LogCategory.Error);
}
AtLeastOneResponseReceived = true;
ContinuesResponseSubject.OnError(ex);
}
};
if (ThreadingMode == TransportThreadingMode.NewThread)
{
ThreadFactory.StartNew(() =>
{
setExceptionAction();
});
}
else
{
Task.Factory.StartNew(() =>
{
setExceptionAction();
});
}
}
public override string GetActualMessageTypeName()
{
return GetActualMessageTypeName(Message);
}
public override object GetActualMessage()
{
object obj = null;
if (Message is ITangoMessage)
{
obj = Message.GetType().GetProperty("Message").GetValue(Message);
}
else if (Message is MessageContainer)
{
obj = Message;
}
else
{
obj = Message;
}
return obj;
}
}
}