using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
namespace Tango.Transport
{
///
/// Represents a generic transport message.
///
///
///
internal class TransportMessage : TransportMessageBase
{
private TaskCompletionSource _completionSource;
public Subject ContinuesResponseSubject { get; set; }
public bool AtLeastOneResponseReceived { get; set; }
public DateTime LastResponseTime { get; set; }
public bool Completed { get; set; }
///
/// Initializes a new instance of the class.
///
/// The token.
/// The message.
/// The direction.
/// To bytes.
/// The completion source.
public TransportMessage(string token, object message, TransportMessageDirection direction, Func toBytes, TaskCompletionSource completionSource) : base(token, message, direction, toBytes)
{
_completionSource = completionSource;
}
///
/// Notifies the message observer of the new result.
///
/// The result.
public override void SetResult(object result, bool completed)
{
Completed = completed;
Task.Factory.StartNew(() =>
{
try
{
if (!IsContinuous)
{
if (!_completionSource.Task.IsCompleted)
{
if (_completionSource.GetType() == typeof(TaskCompletionSource))
{
_completionSource.SetResult((T)result.GetType().GetProperty("Message").GetValue(result));
}
else if (_completionSource.GetType() == typeof(TaskCompletionSource))
{
_completionSource.SetResult((T)result.GetType().GetProperty("Container").GetValue(result));
}
else
{
_completionSource.SetResult((T)result);
}
}
}
else
{
LastResponseTime = DateTime.Now;
AtLeastOneResponseReceived = true;
if (ContinuesResponseSubject.GetType() == typeof(Subject))
{
ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Message").GetValue(result));
}
else if (ContinuesResponseSubject.GetType() == typeof(Subject))
{
ContinuesResponseSubject.OnNext((T)result.GetType().GetProperty("Container").GetValue(result));
}
else
{
ContinuesResponseSubject.OnNext((T)result);
}
if (completed)
{
ContinuesResponseSubject.OnCompleted();
}
}
}
catch (Exception ex)
{
LogManager.Log(ex, $"Error while settings result for message.");
try
{
SetException(ex);
}
catch (Exception e)
{
LogManager.Log(e, $"Error while settings exception for message.");
}
}
});
}
///
/// Notifies the message observer of an exception.
///
/// The ex.
public override void SetException(Exception ex)
{
Completed = true;
Task.Factory.StartNew(() =>
{
if (!IsContinuous)
{
if (!_completionSource.Task.IsCompleted)
{
_completionSource.SetException(ex);
}
}
else
{
AtLeastOneResponseReceived = true;
ContinuesResponseSubject.OnError(ex);
}
});
}
}
}