aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
diff options
context:
space:
mode:
authorMirta <mirta@twine-s.com>2020-12-30 16:39:52 +0200
committerMirta <mirta@twine-s.com>2020-12-30 16:39:52 +0200
commit00a491d93733d4625ad329b2ba8237f445364b3f (patch)
tree4b24c6fa78d7648f4bb7cefafa464bb0b063fec4 /Software/Visual_Studio/Tango.Transport/TransporterBase.cs
parent124ad4150f80c6846fdee41dbbda9848c105f6e5 (diff)
downloadTango-00a491d9.tar.gz
Tango-00a491d9.zip
merge
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/TransporterBase.cs')
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransporterBase.cs1294
1 files changed, 411 insertions, 883 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
index 4c88a0158..2824ed26a 100644
--- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
@@ -17,12 +17,6 @@ using System.ServiceModel;
using Tango.Transport.Encoders;
using Tango.PMR.Connection;
using Tango.Core.Threading;
-using System.IO;
-using Tango.Core.ExtensionMethods;
-using Tango.PMR.Integration;
-using Newtonsoft.Json;
-using System.Diagnostics;
-using System.Reactive.Concurrency;
namespace Tango.Transport
{
@@ -32,17 +26,8 @@ namespace Tango.Transport
/// <seealso cref="Tango.Transport.ITransporter" />
public abstract class TransporterBase : ExtendedObject, ITransporter
{
- protected static long _component_counter = 1;
-
- private class RequestHandler
- {
- public Type RequestType { get; set; }
- public Action<ITransporter, Object, String> Callback { get; set; }
- public object RegisteredCallback { get; set; }
- }
-
private const int MESSAGE_TOKEN_LENGTH = 36;
- private PriorityProducerConsumerQueue<TransportMessageBase> _sendingQueue;
+ private ProducerConsumerQueue<TransportMessageBase> _sendingQueue;
private ConcurrentList<TransportMessageBase> _pendingRequests;
private ProducerConsumerQueue<byte[]> _arrivedResponses;
private Thread _pushThread;
@@ -51,18 +36,13 @@ namespace Tango.Transport
private ITransportAdapter _adapter;
private Dictionary<String, PendingResponse> _pendingResponses;
private DateTime _lastKeepAliveTime;
- private List<RequestHandler> _requestHandlers;
- private static JsonSerializerSettings _genericMessageSettings = new JsonSerializerSettings()
- {
- TypeNameHandling = TypeNameHandling.All,
- };
#region Events
/// <summary>
/// Occurs when a new request message has been received.
/// </summary>
- public event EventHandler<RequestReceivedEventArgs> RequestReceived;
+ public event EventHandler<MessageContainer> RequestReceived;
/// <summary>
/// Occurs when a new response message has been received.
@@ -74,31 +54,11 @@ namespace Tango.Transport
/// </summary>
public event EventHandler<TransportComponentState> StateChanged;
- /// <summary>
- /// Occurs when a request has been sent.
- /// </summary>
- public event EventHandler<IMessage> RequestSent;
-
- /// <summary>
- /// Occurs when a request response has been received.
- /// </summary>
- public event EventHandler<IMessage> ResponseReceived;
-
- /// <summary>
- /// Occurs when a request has failed.
- /// </summary>
- public event EventHandler<RequestFailedEventArgs> RequestFailed;
-
#endregion
#region Properties
/// <summary>
- /// Gets or sets the name of the transport component.
- /// </summary>
- public String ComponentName { get; set; }
-
- /// <summary>
/// Gets or sets the <see cref="ITransportAdapter" /> used to read and write raw data.
/// </summary>
public ITransportAdapter Adapter
@@ -154,11 +114,11 @@ namespace Tango.Transport
if (_useKeepAlive)
{
- LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is activated...");
+ LogManager.Log("KeepAlive is activated...");
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is deactivated.");
+ LogManager.Log("KeepAlive is deactivated.");
}
}
}
@@ -188,16 +148,6 @@ namespace Tango.Transport
/// </summary>
public Exception FailedStateException { get; private set; }
- private GenericMessageProtocol _genericProtocol;
- /// <summary>
- /// Gets or sets the generic protocol used to serialize/deserialize generic messages.
- /// </summary>
- public GenericMessageProtocol GenericProtocol
- {
- get { return _genericProtocol; }
- set { _genericProtocol = value; RaisePropertyChangedAuto(); }
- }
-
#endregion
#region Virtual Methods
@@ -213,7 +163,7 @@ namespace Tango.Transport
_pendingRequests.Clear();
_pendingResponses.Clear();
_arrivedResponses = new ProducerConsumerQueue<byte[]>();
- _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>();
+ _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>();
}
if (oldAdapter != null)
@@ -224,7 +174,7 @@ namespace Tango.Transport
if (newAdapter != null)
{
- LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: Type = {newAdapter.GetType().Name}, Address = {newAdapter.Address}, State = {newAdapter.State}");
+ LogManager.Log(String.Format("Adapter Changed: Type = {0}, Address = {1}, State = {2}", newAdapter.GetType().Name, newAdapter.Address, newAdapter.State));
newAdapter.StateChanged -= OnAdapterStateChanged;
newAdapter.DataAvailable -= OnAdapterDataAvailable;
@@ -238,7 +188,7 @@ namespace Tango.Transport
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: null");
+ LogManager.Log("Adapter Changed: null");
}
}
@@ -251,7 +201,7 @@ namespace Tango.Transport
{
if (e == TransportComponentState.Failed && FailsWithAdapter)
{
- OnFailed(new CommunicationException($"The adapter has failed with exception '{Adapter.FailedStateException.Message}' and the transporter is configured to fail with the adapter."));
+ OnFailed(new CommunicationException("The adapter has failed. Going into a failed state..."));
}
}
@@ -270,104 +220,21 @@ namespace Tango.Transport
/// Called when the component has failed.
/// </summary>
/// <param name="ex">The ex.</param>
- protected virtual async void OnFailed(Exception ex)
- {
- if (State != TransportComponentState.Failed)
- {
- FailedStateException = ex;
- LogManager.Log(ex, $"{GetExtendedComponentName()}: Transporter failed.");
- State = TransportComponentState.Failed;
-
- await OnPostDisconnection();
- }
- else
- {
- LogManager.Log(ex, LogCategory.Warning, $"{GetExtendedComponentName()}: OnFailed called while state is already failed!");
- }
- }
-
- protected virtual async Task OnPostDisconnection()
+ protected virtual void OnFailed(Exception ex)
{
- try
- {
- if (_pullThread != null)
- {
- _pullThread.Abort();
- _pushThread.Abort();
- _keepAliveThread.Abort();
- }
- }
- catch { }
-
- if (Adapter != null)
- {
- await Adapter.Disconnect();
- }
-
- NotifyContinuousRequestMessagesDisconnection();
+ FailedStateException = ex;
+ State = TransportComponentState.Failed;
+ LogManager.Log(ex, "Transporter failed.");
+ Disconnect().Wait();
}
/// <summary>
/// Called when a new request has been received.
/// </summary>
/// <param name="container">The request.</param>
- protected virtual void OnRequestReceived(RequestReceivedEventArgs e)
+ protected virtual void OnRequestReceived(MessageContainer container)
{
- var container = e.Container;
-
- if (_requestHandlers.Count > 0)
- {
- if (container.Type != MessageType.GenericRequest)
- {
- var handlers = _requestHandlers.Where(x => x.RequestType.Name == container.Type.ToOriginalName()).ToList();
-
- if (handlers.Count > 0) //Handle
- {
- e.Handled = true;
-
- var request = MessageFactory.ExtractMessageFromContainer(container);
-
- foreach (var handler in handlers)
- {
- try
- {
- handler.Callback.Invoke(this, request, container.Token);
- }
- catch
- {
- //Ignore any exception on the client side.
- }
- }
- }
- }
- else
- {
- var genericRequest = MessageFactory.ExtractMessageFromContainer<GenericRequest>(container);
-
- var handlers = _requestHandlers.Where(x => x.RequestType.AssemblyQualifiedName == genericRequest.Type).ToList();
-
- if (handlers.Count > 0)
- {
- e.Handled = true;
-
- var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data, GenericProtocol);
-
- foreach (var handler in handlers)
- {
- try
- {
- handler.Callback.Invoke(this, innerRequest, container.Token);
- }
- catch
- {
- //Ignore any exception on the client side.
- }
- }
- }
- }
- }
-
- RequestReceived?.Invoke(this, e);
+ RequestReceived?.Invoke(this, container);
}
/// <summary>
@@ -388,80 +255,6 @@ namespace Tango.Transport
StateChanged?.Invoke(this, state);
}
- /// <summary>
- /// Returns a string representing the component name. If an adapter is attached it will be part of the string.
- /// </summary>
- /// <returns></returns>
- protected virtual String GetExtendedComponentName()
- {
- return Adapter != null ? $"{ComponentName} ({Adapter.Address})" : ComponentName;
- }
-
- /// <summary>
- /// Notifies all the continuous request messages about disconnection.
- /// </summary>
- protected virtual void NotifyContinuousRequestMessagesDisconnection()
- {
- LogManager.Log("Notifying all continuous request messages about disconnection...");
- foreach (var request in _pendingRequests.ToList().Where(x => x.Direction == TransportMessageDirection.Request && x.IsContinuous))
- {
- try
- {
- _pendingRequests.Remove(request);
- LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'...");
- OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected."));
- request.SetException(new TransporterDisconnectedException("Transporter disconnected."));
- }
- catch (Exception e)
- {
- System.Diagnostics.Debug.WriteLine(e.ToString());
- }
- }
- }
-
- /// <summary>
- /// Called when the request has been sent
- /// </summary>
- /// <param name="response">The request.</param>
- private void OnRequestSent(TransportMessageBase request)
- {
- if (request.ShouldLog)
- {
- IMessage message = request.GetActualMessage() as IMessage;
-
- if (message != null)
- {
- RequestSent?.Invoke(this, message);
- }
- }
- }
-
- /// <summary>
- /// Called when the response has been received
- /// </summary>
- /// <param name="response">The response.</param>
- private void OnResponseReceived(IMessage response)
- {
- if (response != null)
- {
- ResponseReceived?.Invoke(this, response);
- }
- }
-
- /// <summary>
- /// Called when the request has been failed
- /// </summary>
- /// <param name="request">The request.</param>
- private void OnRequestFailed(TransportMessageBase request, Exception exception)
- {
- IMessage message = request.GetActualMessage() as IMessage;
-
- if (message != null)
- {
- RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception));
- }
- }
-
#endregion
#region Constructors
@@ -471,13 +264,11 @@ namespace Tango.Transport
/// </summary>
public TransporterBase()
{
- ComponentName = "Not Set";
Encoder = new ProtoEncoder();
_pendingResponses = new Dictionary<string, PendingResponse>();
- _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>();
+ _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>();
_pendingRequests = new ConcurrentList<TransportMessageBase>();
_arrivedResponses = new ProducerConsumerQueue<byte[]>();
- _requestHandlers = new List<RequestHandler>();
RequestTimeout = TimeSpan.FromSeconds(5);
EnableKeepAliveAutoResponse = true;
KeepAliveTimeout = TimeSpan.FromSeconds(2);
@@ -502,7 +293,7 @@ namespace Tango.Transport
/// </summary>
public void ClearQueues()
{
- _sendingQueue = new PriorityProducerConsumerQueue<TransportMessageBase>();
+ _sendingQueue = new ProducerConsumerQueue<TransportMessageBase>();
_pendingRequests = new ConcurrentList<TransportMessageBase>();
_arrivedResponses = new ProducerConsumerQueue<byte[]>();
}
@@ -521,7 +312,7 @@ namespace Tango.Transport
State = TransportComponentState.Connected;
StartThreads();
- LogManager.Log($"{GetExtendedComponentName()}: Transporter Connected...");
+ LogManager.Log("Transporter Connected...");
}
/// <summary>
@@ -530,323 +321,255 @@ namespace Tango.Transport
/// <returns></returns>
public virtual async Task Disconnect()
{
- if (State == TransportComponentState.Connected)
- {
- State = TransportComponentState.Disconnected;
+ State = TransportComponentState.Disconnected;
- await OnPostDisconnection();
+ try
+ {
+ if (_pullThread != null)
+ {
+ _pullThread.Abort();
+ _pushThread.Abort();
+ _keepAliveThread.Abort();
+ }
+ }
+ catch { }
- LogManager.Log($"{GetExtendedComponentName()}: Transporter Disconnected...");
+ if (Adapter != null)
+ {
+ await Adapter.Disconnect();
}
+ LogManager.Log("Transporter Disconnected...");
}
- #endregion
-
- #region Public Request Methods
-
/// <summary>
/// Sends a request.
/// </summary>
/// <param name="request">The request.</param>
- /// <param name="config">Request configuration.</param>
+ /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param>
/// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public Task<IMessage> SendRequest(IMessage request, TransportRequestConfig config = null)
+ public Task<IMessage> SendRequest(IMessage request, TimeSpan? timeout = null)
{
+ String requestName = request.GetType().Name;
+ String responseName = requestName.Replace("Request", "Response");
+
MessageContainer container = new MessageContainer();
container.Token = Guid.NewGuid().ToString();
container.Data = request.ToByteString();
- container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
- container.Type = MessageFactory.ParseMessageType(request.GetType().Name);
+ container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ container.Type = MessageFactory.ParseMessageType(requestName);
- return SendRequestInternal<IMessage>(container.Token, container, config).Task;
- }
+ LogManager.Log("Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
+ LogManager.Log("Expected response: " + responseName, LogCategory.Debug);
- /// <summary>
- /// Sends the request.
- /// </summary>
- /// <param name="container"></param>
- /// <param name="config">Request configuration.</param>
- /// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public Task<MessageContainer> SendRequest(MessageContainer container, TransportRequestConfig config = null)
- {
- return SendRequestInternal<MessageContainer>(container.Token, container, config).Task;
- }
+ if (State != TransportComponentState.Connected)
+ {
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
+ }
- /// <summary>
- /// Sends a request.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="config">Request configuration.</param>
- /// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TransportRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response>
- {
- return SendRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config).Task;
- }
+ TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>();
+ TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source);
- private TaskCompletionSource<T> SendRequestInternal<T>(String token, Object request, TransportRequestConfig config = null)
- {
- config = config ?? new TransportRequestConfig();
+ message.ActivateTimeout = () =>
+ {
+ TimeoutTask.StartNew(() =>
+ {
- TaskCompletionSource<T> source = new TaskCompletionSource<T>();
+ if (!source.Task.IsCompleted)
+ {
+ TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request task exception...", LogCategory.Debug);
+ source.SetException(ex);
+ }
- String requestName = TransportMessageBase.GetActualMessageTypeName(request);
+ }, timeout != null ? timeout.Value : RequestTimeout);
+ };
- Func<byte[]> toBytes = null;
+ EnqueueMessageOut(message);
- if (request is ITangoMessage tangoMessage)
- {
- toBytes = () => Encoder.Encode(tangoMessage);
- }
- else if (request is IMessage protoMessage)
- {
- toBytes = () => protoMessage.ToByteArray();
- }
+ return source.Task;
+ }
- TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, source);
- message.ShouldLog = config.ShouldLog;
- message.Immidiate = config.Immediate;
- message.Priority = config.Priority;
- message.TransportComponentName = GetExtendedComponentName();
- message.ThreadingMode = config.ThreadingMode;
+ /// <summary>
+ /// Sends the response.
+ /// </summary>
+ /// <param name="response">The response.</param>
+ /// <param name="token">The token.</param>
+ /// <param name="completed">The completed.</param>
+ /// <param name="errorCode">The error code.</param>
+ /// <param name="errorMessage">The error message.</param>
+ /// <returns></returns>
+ /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception>
+ public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null)
+ {
+ String responseName = response.GetType().Name;
- TimeSpan? timeout = config.Timeout;
+ MessageContainer container = new MessageContainer();
+ container.Token = token;
+ container.Data = response.ToByteString();
+ container.Type = MessageFactory.ParseMessageType(responseName);
- if (request is MessageContainer container)
+ if (errorCode.HasValue)
{
- timeout = GetContainerTimeoutOrDefault(container);
+ container.Error = errorCode.Value;
}
- if (timeout == null)
+ if (errorMessage != null)
{
- timeout = RequestTimeout;
+ container.ErrorMessage = errorMessage;
}
- if (request is ITangoMessage tanMessage)
+ if (completed.HasValue)
{
- tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds;
+ container.Completed = completed.Value;
}
- String responseName = requestName.Replace("Request", "Response");
+ return SendResponse(container);
+ }
- LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug);
- LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug);
+ /// <summary>
+ /// Sends the request.
+ /// </summary>
+ /// <param name="container">The container.</param>
+ /// <returns></returns>
+ public Task<MessageContainer> SendRequest(MessageContainer container)
+ {
+ String responseName = container.Type.ToString().Replace("Request", "Response");
+ TimeSpan? timeout = GetContainerTimeoutOrDefault(container);
+
+ LogManager.Log("Queuing request message: " + container.Type + " Token: " + container.Token, LogCategory.Debug);
+ LogManager.Log("Expected response: " + responseName, LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
}
+ TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>();
+ TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source);
- Action timeoutAction = () =>
+ message.ActivateTimeout = () =>
{
- if (!source.Task.IsCompleted)
+ TimeoutTask.StartNew(() =>
{
- TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds.");
- OnRequestFailed(message, ex);
- message.SetException(ex);
- }
- };
- if (config.ThreadingMode == TransportThreadingMode.NewThread)
- {
- message.ActivateTimeout = () =>
- {
- TimeoutTask.StartNew(() =>
+ if (!source.Task.IsCompleted)
{
- timeoutAction();
- }, timeout.Value);
- };
- }
- else
- {
- message.ActivateTimeout = () =>
- {
- Task.Delay(timeout.Value).ContinueWith((x) =>
- {
- timeoutAction();
- });
- };
- }
-
- EnqueueMessageOut(message);
-
- return source;
- }
-
- /// <summary>
- /// Sends a request and expecting multiple response messages.
- /// </summary>
- /// <param name="request"></param>
- /// <param name="config">Request configuration.</param>
- /// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public IObservable<IMessage> SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null)
- {
- MessageContainer container = new MessageContainer();
- container.Token = Guid.NewGuid().ToString();
- container.Data = request.ToByteString();
- container.Type = MessageFactory.ParseMessageType(request.GetType().Name);
+ TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request task exception...", LogCategory.Debug);
+ source.SetException(ex);
+ }
- //We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment.
- container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0;
+ }, timeout != null ? timeout.Value : RequestTimeout);
- container.Continuous = true;
+ };
- return SendContinuousRequestInternal<IMessage>(container.Token, container, config);
- }
+ EnqueueMessageOut(message);
- /// <summary>
- /// Sends a request and expecting multiple response messages.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request"></param>
- /// <param name="config">Request configuration.</param>
- /// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TransportContinuousRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response>
- {
- request.Container.Continuous = true;
- request.Container.Completed = false;
- return SendContinuousRequestInternal<TangoMessage<Response>>(request.Container.Token, request, config);
+ return source.Task;
}
/// <summary>
- /// Sends a continuous request.
+ /// Sends the response.
/// </summary>
/// <param name="container">The container.</param>
- /// <param name="config">Request configuration.</param>
/// <returns></returns>
- /// <exception cref="InvalidOperationException"></exception>
- public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null)
+ /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception>
+ public Task SendResponse(MessageContainer container)
{
- return SendContinuousRequestInternal<MessageContainer>(container.Token, container, config);
- }
-
- private IObservable<T> SendContinuousRequestInternal<T>(String token, Object request, TransportContinuousRequestConfig config = null)
- {
- config = config ?? new TransportContinuousRequestConfig();
+ String token = container.Token;
- String requestName = TransportMessageBase.GetActualMessageTypeName(request);
+ LogManager.Log("Queuing response message: " + container.Type, LogCategory.Debug);
- Func<byte[]> toBytes = null;
+ PendingResponse pendingResponse = null;
- if (request is ITangoMessage tangoMessage)
- {
- toBytes = () => Encoder.Encode(tangoMessage);
- }
- else if (request is IMessage protoMessage)
+ if (State != TransportComponentState.Connected)
{
- toBytes = () => protoMessage.ToByteArray();
+ throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}."));
}
- Subject<T> subject = new Subject<T>();
-
- TransportMessage<T> message = new TransportMessage<T>(token, request, TransportMessageDirection.Request, toBytes, null);
- message.ShouldLog = config.ShouldLog;
- message.Immidiate = config.Immediate;
- message.Priority = config.Priority;
- message.TransportComponentName = GetExtendedComponentName();
- message.IsContinuous = true;
- message.ContinuesResponseSubject = subject;
-
- TimeSpan? timeout = config.Timeout;
- TimeSpan? continuousTimeout = config.ContinuousTimeout;
+ LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug);
- if (request is MessageContainer container)
+ if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
- timeout = GetContainerTimeoutOrDefault(container);
+ LogManager.Log("Found matching request token: " + token, LogCategory.Debug);
- if (timeout == null)
+ if (!pendingResponse.IsContinuous)
{
- timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout;
- container.Timeout = (uint)timeout.Value.TotalMilliseconds;
+ LogManager.Log("Removing matching request token.", LogCategory.Debug);
+ _pendingResponses.Remove(token);
+ }
+ else if (container.Completed)
+ {
+ LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug);
+ _pendingResponses.Remove(token);
}
-
- continuousTimeout = GetContainerContinuousTimeoutOrDefault(container);
}
-
- if (timeout == null)
+ else
{
- timeout = RequestTimeout;
+ //This should never happen.
+ throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical);
}
- if (request is ITangoMessage tanMessage)
- {
- tanMessage.Container.Continuous = true;
- tanMessage.Container.Completed = false;
- tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds;
- tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0;
- }
+ TaskCompletionSource<object> source = new TaskCompletionSource<object>();
+ TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source);
+ EnqueueMessageOut(message);
+ return source.Task;
+ }
+ /// <summary>
+ /// Sends a request and expecting multiple response messages.
+ /// </summary>
+ /// <param name="request">The request.</param>
+ /// <param name="timeout"></param>
+ /// <returns></returns>
+ public IObservable<IMessage> SendContinuousRequest(IMessage request, TimeSpan? timeout = default(TimeSpan?))
+ {
+ String requestName = request.GetType().Name;
String responseName = requestName.Replace("Request", "Response");
- LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug);
- LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug);
+ MessageContainer container = new MessageContainer();
+ container.Token = Guid.NewGuid().ToString();
+ container.Data = request.ToByteString();
+ container.Type = MessageFactory.ParseMessageType(requestName);
+ container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ container.Continuous = true;
+
+ LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
}
- Action timeoutAction = () =>
- {
- if (!message.AtLeastOneResponseReceived)
- {
- TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds.");
- OnRequestFailed(message, ex);
- message.SetException(ex);
- }
+ Subject<IMessage> subject = new Subject<IMessage>();
- if (continuousTimeout != null)
- {
- Task.Factory.StartNew(async () =>
- {
- while (!message.Completed)
- {
- await Task.Delay(continuousTimeout.Value).ContinueWith((y) =>
- {
- if (!message.Completed)
- {
- if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value)
- {
- TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out.");
- OnRequestFailed(message, ex);
- message.SetException(ex);
- return;
- }
- }
- });
- }
- });
- }
- };
+ LogManager.Log("Expected response: " + responseName, LogCategory.Debug);
- if (config.ThreadingMode == TransportThreadingMode.NewThread)
+ TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), null)
{
- message.ActivateTimeout = () =>
- {
- TimeoutTask.StartNew(() =>
- {
- timeoutAction();
- }, timeout.Value);
- };
- }
- else
+ IsContinuous = true,
+ ContinuesResponseSubject = subject,
+ };
+
+ message.ActivateTimeout = () =>
{
- message.ActivateTimeout = () =>
+
+ TimeoutTask.StartNew(() =>
{
- Task.Delay(timeout.Value).ContinueWith((x) =>
+
+ if (!message.AtLeastOneResponseReceived)
{
- timeoutAction();
- });
- };
- }
+ TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request exception...", LogCategory.Debug);
+ message.SetException(ex);
+ }
+
+ }, timeout != null ? timeout.Value : RequestTimeout);
+
+ };
EnqueueMessageOut(message);
@@ -854,191 +577,200 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends a generic request of any type.
+ /// Sends a request.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
- /// <param name="config">The configuration.</param>
- /// <returns></returns>
- public async Task<Response> SendGenericRequest<Request, Response>(Request request, TransportRequestConfig config = null) where Request : class where Response : class
- {
- GenericRequest genericRequest = new GenericRequest();
- genericRequest.Type = request.GetType().AssemblyQualifiedName;
- genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol);
-
- var response = await SendRequest<GenericRequest, GenericResponse>(genericRequest, config);
- var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol);
- return responseObject;
- }
-
- /// <summary>
- /// Sends a generic request and expecting multiple generic response messages.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="config">Request configuration.</param>
+ /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param>
/// <returns></returns>
- public IObservable<Response> SendGenericContinuousRequest<Request, Response>(Request request, TransportContinuousRequestConfig config = null) where Request : class where Response : class
+ public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
{
- GenericRequest genericRequest = new GenericRequest();
- genericRequest.Type = request.GetType().AssemblyQualifiedName;
+ LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
+ LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug);
- genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol);
-
- Subject<Response> subject = new Subject<Response>();
-
- SendContinuousRequest<GenericRequest, GenericResponse>(genericRequest, config).Subscribe((response) =>
+ if (State != TransportComponentState.Connected)
{
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
+ }
- try
- {
- var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol);
- subject.OnNext(responseObject);
- }
- catch (Exception ex)
- {
- Debugger.Break();
- System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request continuous handler.\n{ex.ToString()}");
- //Ignore exception at the client side.
- }
-
- }, (ex) =>
- {
+ request.Container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
- try
- {
- subject.OnError(ex);
- }
- catch (Exception xx)
- {
- Debugger.Break();
- System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request error handler.\n{xx.ToString()}");
- //Ignore exception at the client side.
- }
+ TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>();
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source);
- }, () =>
+ message.ActivateTimeout = () =>
{
- try
- {
- subject.OnCompleted();
- }
- catch (Exception ex)
+ TimeoutTask.StartNew(() =>
{
- Debugger.Break();
- System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request completed handler.\n{ex.ToString()}");
- //Ignore exception at the client side.
- }
- });
- return subject.AsObservable();
- }
+ if (!source.Task.IsCompleted)
+ {
+ TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request task exception...", LogCategory.Debug);
+ source.SetException(ex);
+ }
- #endregion
+ }, timeout != null ? timeout.Value : RequestTimeout);
+ };
- #region Public Response Methods
+ EnqueueMessageOut(message);
- /// <summary>
- /// Sends a response.
- /// </summary>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="response">The response.</param>
- /// <returns></returns>
- public Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>
- {
- return SendResponse<Response>(response, response.Container.Token);
+ return source.Task;
}
/// <summary>
- /// Sends the response.
+ /// Sends a request and expecting multiple response messages.
/// </summary>
- /// <param name="response"></param>
- /// <param name="token">Request token.</param>
- /// <param name="config">Response configuration.</param>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <param name="timeout"></param>
/// <returns></returns>
- public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null)
+ public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? firstTimeout = null, TimeSpan? continousTimeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
{
- config = config ?? new TransportResponseConfig();
+ LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
- String responseName = response.GetType().Name;
+ Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>();
- MessageContainer container = new MessageContainer();
- container.Token = token;
- container.Data = response.ToByteString();
- container.Type = MessageFactory.ParseMessageType(responseName);
+ LogManager.Log("Expected response: " + typeof(Response).Name, LogCategory.Debug);
- if (config.ErrorCode.HasValue)
+ if (State != TransportComponentState.Connected)
{
- container.Error = config.ErrorCode.Value;
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
}
- if (config.ErrorMessage != null)
+ request.Container.Continuous = true;
+ request.Container.Completed = false;
+
+ request.Container.Timeout = firstTimeout.HasValue ? (UInt32)firstTimeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ request.Container.ContinuousTimeout = continousTimeout.HasValue ? (UInt32)continousTimeout.Value.TotalMilliseconds : 0;
+
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null)
{
- container.ErrorMessage = config.ErrorMessage;
- }
+ IsContinuous = true,
+ ContinuesResponseSubject = subject,
+ };
- container.Completed = config.Completed;
+ message.ActivateTimeout = () =>
+ {
+ TimeoutTask.StartNew(() =>
+ {
- return SendResponse(container);
+ if (!message.AtLeastOneResponseReceived)
+ {
+ TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (firstTimeout != null ? firstTimeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request exception...", LogCategory.Debug);
+ message.SetException(ex);
+ }
+
+ if (continousTimeout != null)
+ {
+ Task.Factory.StartNew(async () =>
+ {
+ while (!message.Completed)
+ {
+ await Task.Delay(continousTimeout.Value).ContinueWith((y) =>
+ {
+ if (!message.Completed)
+ {
+ if (DateTime.Now - message.LastResponseTime > continousTimeout.Value)
+ {
+ TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (continousTimeout.Value.TotalSeconds) + " seconds and has timed out.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request exception...", LogCategory.Debug);
+ message.SetException(ex);
+ return;
+ }
+ }
+ });
+ }
+ });
+ }
+
+ }, firstTimeout != null ? firstTimeout.Value : RequestTimeout);
+ };
+
+ EnqueueMessageOut(message);
+
+ return subject.AsObservable();
}
/// <summary>
- /// Sends the response.
+ /// Sends a continuous request.
/// </summary>
/// <param name="container">The container.</param>
/// <returns></returns>
- /// <exception cref="InvalidOperationException">
- /// </exception>
- public Task SendResponse(MessageContainer container, TransportResponseConfig config = null)
+ public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container)
{
- config = config ?? new TransportResponseConfig();
-
- if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted)
- {
- throw new InvalidOperationException("Transporter push thread is not in a running state.");
- }
+ TimeSpan? timeout = GetContainerTimeoutOrDefault(container);
+ TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?);
- String token = container.Token;
-
- LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug);
+ String requestName = container.Type.ToString();
+ String responseName = requestName.Replace("Request", "Response");
- PendingResponse pendingResponse = null;
+ LogManager.Log("Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}."));
+ throw LogManager.Log(new InvalidOperationException($"Could not send the request while transporter state is {State}."));
}
- LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug);
+ Subject<MessageContainer> subject = new Subject<MessageContainer>();
- if (_pendingResponses.TryGetValue(token, out pendingResponse))
+ LogManager.Log("Expected response: " + responseName, LogCategory.Debug);
+
+ TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), null)
{
- LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug);
+ IsContinuous = true,
+ ContinuesResponseSubject = subject,
+ };
- if (!pendingResponse.IsContinuous)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug);
- _pendingResponses.Remove(token);
- }
- else if (container.Completed)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug);
- _pendingResponses.Remove(token);
- }
- }
- else
+ message.ActivateTimeout = () =>
{
- //This should never happen.
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical);
- }
+ TimeoutTask.StartNew(() =>
+ {
+
+ if (!message.AtLeastOneResponseReceived)
+ {
+ TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request exception...", LogCategory.Debug);
+ message.SetException(ex);
+ }
+
+ if (continuousTimeout != null)
+ {
+ Task.Factory.StartNew(async () =>
+ {
+ while (!message.Completed)
+ {
+ await Task.Delay(continuousTimeout.Value).ContinueWith((y) =>
+ {
+ if (!message.Completed)
+ {
+ if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value)
+ {
+ TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out.");
+ LogManager.Log(ex);
+ LogManager.Log("Setting request exception...", LogCategory.Debug);
+ message.SetException(ex);
+ return;
+ }
+ }
+ });
+ }
+ });
+ }
+
+ }, timeout != null ? timeout.Value : RequestTimeout);
+ };
- TaskCompletionSource<object> source = new TaskCompletionSource<object>();
- TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source);
- message.ShouldLog = config.ShouldLog;
- message.Immidiate = config.Immediate;
- message.Priority = config.Priority;
EnqueueMessageOut(message);
- return source.Task;
+
+ return subject.AsObservable();
}
/// <summary>
@@ -1046,156 +778,94 @@ namespace Tango.Transport
/// </summary>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="response">The response.</param>
- /// <param name="token">Request token.</param>
- /// <param name="config">Response configuration.</param>
/// <returns></returns>
- /// <exception cref="InvalidOperationException">
- /// Transporter push thread is not in a running state.
- /// </exception>
- public Task SendResponse<Response>(TangoMessage<Response> response, String token, TransportResponseConfig config = null) where Response : IMessage<Response>
+ public Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>
{
- config = config ?? new TransportResponseConfig();
+ return SendResponse<Response>(response, response.Container.Token);
+ }
- if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted)
+ /// <summary>
+ /// Sends a response for the specified token.
+ /// </summary>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="response">The response.</param>
+ /// <param name="token">The token.</param>
+ /// <param name="completed">The completed.</param>
+ /// <param name="errorCode">The error code.</param>
+ /// <param name="errorMessage">The error message.</param>
+ /// <returns></returns>
+ /// <exception cref="InvalidOperationException">Matching request token was not found!</exception>
+ public Task SendResponse<Response>(TangoMessage<Response> response, String token, bool? completed = null, ErrorCode? errorCode = null, String errorMessage = null) where Response : IMessage<Response>
+ {
+ if (_pushThread == null || _pushThread.ThreadState == ThreadState.Aborted)
{
throw new InvalidOperationException("Transporter push thread is not in a running state.");
}
response.Container.Token = token;
- response.Container.Completed = config.Completed;
+ if (completed.HasValue)
+ {
+ response.Container.Completed = completed.Value;
+ }
- if (config.ErrorCode.HasValue)
+ if (errorCode.HasValue)
{
- response.Container.Error = config.ErrorCode.Value;
+ response.Container.Error = errorCode.Value;
}
- if (!String.IsNullOrEmpty(config.ErrorMessage))
+ if (!String.IsNullOrEmpty(errorMessage))
{
- response.Container.ErrorMessage = config.ErrorMessage;
+ response.Container.ErrorMessage = errorMessage;
}
- LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + typeof(Response).Name, LogCategory.Debug);
+ LogManager.Log("Queuing response message: " + typeof(Response).Name, LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}."));
+ throw LogManager.Log(new InvalidOperationException($"Could not send the response while transporter state is {State}."));
}
PendingResponse pendingResponse = null;
- LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug);
+ LogManager.Log("Searching for matching request token: " + token, LogCategory.Debug);
if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
- LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug);
+ LogManager.Log("Found matching request token: " + token, LogCategory.Debug);
if (!pendingResponse.IsContinuous)
{
- LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug);
+ LogManager.Log("Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
else if (response.Container.Completed)
{
- LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug);
+ LogManager.Log("Response completed. Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
}
else
{
//This should never happen.
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical);
+ throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"), LogCategory.Critical);
}
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
TransportMessage<object> message = new TransportMessage<object>(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source);
- message.ShouldLog = config.ShouldLog;
- message.Immidiate = config.Immediate;
- message.Priority = config.Priority;
EnqueueMessageOut(message);
return source.Task;
}
/// <summary>
- /// Sends a generic response.
- /// </summary>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="response">The response.</param>
- /// <param name="token">The request token.</param>
- /// <param name="config">The response configuration.</param>
- /// <returns></returns>
- public async Task SendGenericResponse<Response>(Response response, String token, TransportResponseConfig config = null) where Response : class
- {
- GenericResponse genericResponse = new GenericResponse();
-
- genericResponse.Type = response.GetType().AssemblyQualifiedName;
- genericResponse.Data = GenericMessageSerializer.SerializeToByteString<Response>(response, GenericProtocol);
- await SendResponse<GenericResponse>(genericResponse, token, config);
- }
-
- /// <summary>
/// Sends a general error response agnostic to the type of request.
/// </summary>
/// <param name="exception">The exception.</param>
- /// <param name="config">Response configuration.</param>
+ /// <param name="token">The token.</param>
/// <returns></returns>
- public Task SendErrorResponse(Exception exception, String token)
+ public Task SendErrorResponse(Exception exception, string token)
{
- return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, new TransportResponseConfig()
- {
- ErrorCode = ErrorCode.GeneralError,
- Completed = true,
- ErrorMessage = exception.FlattenMessage()
- });
- }
-
- #endregion
-
- #region Request Handlers
-
- /// <summary>
- /// Registers a custom request handler.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <param name="callback">The callback.</param>
- public void RegisterRequestHandler<Request>(RequestHandlerCallbackDelegate<Request> callback) where Request : class
- {
- RequestHandler handler = new RequestHandler();
- handler.RequestType = typeof(Request);
- handler.RegisteredCallback = callback;
- handler.Callback = (transporter, obj, token) =>
- {
- callback?.Invoke(transporter, obj as Request, token);
- };
-
- _requestHandlers.Add(handler);
- }
-
- /// <summary>
- /// Unregisters a custom request handler.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <param name="callback">The callback.</param>
- public void UnregisterRequestHandler<Request>(RequestHandlerCallbackDelegate<Request> callback) where Request : class
- {
- var handler = _requestHandlers.FirstOrDefault(x => (x.RegisteredCallback as RequestHandlerCallbackDelegate<Request>) == callback);
- if (handler != null)
- {
- _requestHandlers.Remove(handler);
- }
- }
-
- /// <summary>
- /// Copies this instance request handlers to the specified instance.
- /// </summary>
- /// <param name="transporter">The transporter to copy the handlers to.</param>
- public void CopyRequestHandlers(ITransporter transporter)
- {
- foreach (var handler in _requestHandlers.ToList())
- {
- (transporter as TransporterBase)._requestHandlers.Add(handler);
- _requestHandlers.Remove(handler);
- }
+ return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, true, ErrorCode.GeneralError, exception.Message);
}
#endregion
@@ -1207,27 +877,17 @@ namespace Tango.Transport
/// </summary>
protected void StartThreads()
{
- try
- {
- _pullThread = new Thread(PullThreadMethod);
- _pullThread.Name = $"{GetExtendedComponentName()} Pull Thread";
- _pullThread.IsBackground = true;
- _pullThread.Start();
+ _pullThread = new Thread(PullThreadMethod);
+ _pullThread.IsBackground = true;
+ _pullThread.Start();
- _pushThread = new Thread(PushThreadMethod);
- _pushThread.Name = $"{GetExtendedComponentName()} Push Thread";
- _pushThread.IsBackground = true;
- _pushThread.Start();
+ _pushThread = new Thread(PushThreadMethod);
+ _pushThread.IsBackground = true;
+ _pushThread.Start();
- _keepAliveThread = new Thread(KeepAliveThreadMethod);
- _keepAliveThread.Name = $"{GetExtendedComponentName()} KeepAlive Thread";
- _keepAliveThread.IsBackground = true;
- _keepAliveThread.Start();
- }
- catch (Exception ex)
- {
- LogManager.Log(ex, "Error starting transporter threads.");
- }
+ _keepAliveThread = new Thread(KeepAliveThreadMethod);
+ _keepAliveThread.IsBackground = true;
+ _keepAliveThread.Start();
}
/// <summary>
@@ -1241,22 +901,12 @@ namespace Tango.Transport
}
/// <summary>
- /// Gets the container timeout or default.
- /// </summary>
- /// <param name="container">The container.</param>
- /// <returns></returns>
- private TimeSpan? GetContainerContinuousTimeoutOrDefault(MessageContainer container)
- {
- return container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?);
- }
-
- /// <summary>
/// Enqueues the message and releases the push wait handle.
/// </summary>
/// <param name="message">The message.</param>
private void EnqueueMessageOut(TransportMessageBase message)
{
- _sendingQueue.BlockEnqueue(message, message.Priority);
+ _sendingQueue.BlockEnqueue(message);
}
/// <summary>
@@ -1289,46 +939,31 @@ namespace Tango.Transport
{
if (message.Token.Length != MESSAGE_TOKEN_LENGTH)
{
- var ex = LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token));
- OnRequestFailed(message, ex);
- message.SetException(ex);
+ message.SetException(LogManager.Log(new InvalidOperationException("Invalid message token length: " + message.Token)));
continue;
}
+ LogManager.Log("Sending message on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message);
+
if (message.Direction == TransportMessageDirection.Request)
{
- if (message.ShouldLog)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Sending request '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info);
- OnRequestSent(message);
- }
-
lock (_pendingRequests)
{
_pendingRequests.Add(message);
}
}
- else
- {
- if (message.ShouldLog)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Sending response '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info);
- }
- }
- Adapter.Write(message.Serialize(), message.Immidiate);
+ Adapter.Write(message.Serialize());
message.ActivateTimeout?.Invoke();
- LogManager.Log($"{GetExtendedComponentName()}: Message sent...", LogCategory.Debug, message.Message);
+ LogManager.Log("Message sent on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message);
}
else
{
if (message.Direction == TransportMessageDirection.Request)
{
- var ex = new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.GetActualMessageTypeName() + ". Adapter is disconnected.");
- OnRequestFailed(message, ex);
- message.SetException(ex);
+ message.SetException(LogManager.Log(new InvalidOperationException("Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected.")));
}
}
@@ -1337,22 +972,15 @@ namespace Tango.Transport
message.SetResult(true, true);
}
}
- catch (ThreadAbortException)
- {
- Exception requestException = FailedStateException != null ? FailedStateException : new TransporterDisconnectedException("The transporter push thread has been aborted.");
- OnRequestFailed(message, requestException);
- message.SetException(requestException);
- }
catch (Exception ex)
{
- OnRequestFailed(message, ex);
message.SetException(ex);
}
}
}
catch (ThreadAbortException)
{
- LogManager.Log($"{GetExtendedComponentName()}: Push thread has been aborted.");
+ LogManager.Log("Push thread has been aborted.");
}
catch (Exception ex)
{
@@ -1375,66 +1003,34 @@ namespace Tango.Transport
{
byte[] data = _arrivedResponses.BlockDequeue();
- LogManager.Log($"{GetExtendedComponentName()}: Message received...", LogCategory.Debug);
+ LogManager.Log("Message received on adapter: " + Adapter.Address, LogCategory.Debug);
- LogManager.Log($"{GetExtendedComponentName()}: Parsing message container...", LogCategory.Debug);
+ LogManager.Log("Parsing message container...", LogCategory.Debug);
+ MessageContainer container = Encoder.DecodeContainer(data);
- MessageContainer container = null;
-
- try
- {
- container = Encoder.DecodeContainer(data);
- }
- catch (Exception ex)
- {
- LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing message container. Skipping incoming message...");
- continue;
- }
-
- LogManager.Log($"{GetExtendedComponentName()}: Message was identified as " + container.Type + ".", LogCategory.Debug);
+ LogManager.Log("Message was identified as " + container.Type + ".", LogCategory.Debug);
if (container.Token.Length != MESSAGE_TOKEN_LENGTH)
{
- LogManager.Log($"{GetExtendedComponentName()}: Invalid message token length received: " + container.Token, LogCategory.Error);
+ LogManager.Log("Invalid message token length received: " + container.Token, LogCategory.Error);
continue;
}
- LogManager.Log($"{GetExtendedComponentName()}: Searching for pending request token: " + container.Token, LogCategory.Debug);
+ LogManager.Log("Searching for pending request token: " + container.Token, LogCategory.Debug);
TransportMessageBase request = null;
lock (_pendingRequests)
{
- try
- {
- var requests = _pendingRequests.ToList().Where(x => x.Token == container.Token).ToList();
-
- try
- {
- if (requests.Count > 1)
- {
- LogManager.Log($"{GetExtendedComponentName()}: {requests.Count} requests with the same token were detected - {requests.First().GetActualMessageTypeName()}.", LogCategory.Warning);
- }
- }
- catch (Exception ex)
- {
- LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate.");
- }
-
- request = requests.LastOrDefault();
- }
- catch (Exception ex)
- {
- LogManager.Log(ex, LogCategory.Warning, "Something bad happened please investigate.");
- }
+ request = _pendingRequests.ToList().SingleOrDefault(x => x.Token == container.Token);
}
if (request != null)
{
- LogManager.Log($"{GetExtendedComponentName()}: Found pending request: " + request.GetActualMessageTypeName(), LogCategory.Debug);
+ LogManager.Log("Found pending request: " + (request.Message.GetType().IsGenericType ? request.Message.GetType().GetGenericArguments()[0].Name : request.Message.GetType().Name), LogCategory.Debug);
if (!request.IsContinuous)
{
- LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug);
+ LogManager.Log("Pending request was identified as 'single response'. Removing pending request.", LogCategory.Debug);
_pendingRequests.Remove(request);
@@ -1443,54 +1039,23 @@ namespace Tango.Transport
if (container.Error == ErrorCode.None)
{
var message = Encoder.Decode(data);
-
- if (request.ShouldLog)
- {
- try
- {
- String responseType = message.Type.ToString();
- var messageContent = message.GetType().GetProperty("Message").GetValue(message);
-
- try
- {
- if (messageContent.GetType() == typeof(GenericResponse))
- {
- Type genericType = Type.GetType((messageContent as GenericResponse).Type);
- responseType = genericType.Name;
- messageContent = GenericMessageSerializer.DeserializeFromByteString(genericType, (messageContent as GenericResponse).Data, GenericProtocol);
- }
- }
- catch { }
-
- LogManager.Log($"{GetExtendedComponentName()}: Response received '{responseType}'...\n{messageContent.ToJsonString()}", LogCategory.Info);
- OnResponseReceived(messageContent as IMessage);
- }
- catch
- {
- LogManager.Log("Error logging response received.", LogCategory.Warning);
- }
- }
-
- LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and setting pending request task result...", LogCategory.Debug, message);
+ LogManager.Log("Parsing inner response message and setting pending request task result...", LogCategory.Debug, message);
request.SetResult(message, true);
- LogManager.Log($"{GetExtendedComponentName()}: Message enquirer released...", LogCategory.Debug);
+ LogManager.Log("Message enquirer released...", LogCategory.Debug);
}
else
{
- var ex = new ResponseErrorException(container, request.GetActualMessageTypeName());
- OnRequestFailed(request, ex);
- request.SetException(ex);
+ request.SetException(LogManager.Log(new ResponseErrorException(container), LogCategory.Warning));
}
}
catch (Exception ex)
{
- OnRequestFailed(request, ex);
- request.SetException(LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message."));
+ request.SetException(LogManager.Log(ex, "Error parsing response message."));
}
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug);
+ LogManager.Log("Pending request was identified as 'continuous response'. keeping pending request.", LogCategory.Debug);
try
{
@@ -1498,50 +1063,32 @@ namespace Tango.Transport
{
var message = Encoder.Decode(data);
- if (request.ShouldLog && !request.AtLeastOneResponseReceived)
- {
- try
- {
- var messageContent = message.GetType().GetProperty("Message").GetValue(message);
- LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info);
- OnResponseReceived(messageContent as IMessage);
- }
- catch
- {
- LogManager.Log("Error logging response received.", LogCategory.Warning);
- }
- }
-
- LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message);
+ LogManager.Log("Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message);
if (container.Completed)
{
- LogManager.Log($"{GetExtendedComponentName()}: Continuous sequence completed.", LogCategory.Debug);
+ LogManager.Log("Continuous sequence completed.", LogCategory.Debug);
_pendingRequests.Remove(request);
}
request.SetResult(message, container.Completed);
}
else if (container.Error == ErrorCode.ContinuousResponseAborted)
{
- String m = $"{GetExtendedComponentName()}: Continuous response " + container.Type + " has been aborted: " + container.Error.ToString();
+ String m = "Continuous response " + container.Type + " has been aborted: " + container.Error.ToString();
LogManager.Log(m, LogCategory.Info);
_pendingRequests.Remove(request);
- OnRequestFailed(request, new ContinuousResponseAbortedException(container, m));
- request.SetException(new ContinuousResponseAbortedException(container, m));
+ request.SetException(new ContinuousResponseAbortedException(m));
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning);
+ LogManager.Log("Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning);
_pendingRequests.Remove(request);
-
- var exx = new ResponseErrorException(container, request.GetActualMessageTypeName());
- OnRequestFailed(request, exx);
- request.SetException(exx);
+ request.SetException(new ResponseErrorException(container));
}
}
catch (Exception ex)
{
- LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message.");
+ LogManager.Log(ex, "Error parsing response message.");
}
}
@@ -1558,36 +1105,29 @@ namespace Tango.Transport
{
if (container.Type.ToString().EndsWith("Response"))
{
- LogManager.Log($"{GetExtendedComponentName()}: A response message with no awaiting request was identified. {container.Type}, Token: {container.Token}. Message ignored.", LogCategory.Warning);
+ LogManager.Log(String.Format("A response message with no awaiting request was identified. {0}, Token: {1}. Message ignored.", container.Type, container.Token), LogCategory.Warning);
continue;
}
- LogManager.Log($"{GetExtendedComponentName()}: Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug);
+ LogManager.Log("Message was identified as a new request message: " + container.Type.ToString(), LogCategory.Debug);
try
{
- LogManager.Log($"{GetExtendedComponentName()}: Saving request token: " + container.Token, LogCategory.Debug);
+ LogManager.Log("Saving request token: " + container.Token, LogCategory.Debug);
_pendingResponses.Add(container.Token, new PendingResponse(container.Continuous));
if (container.Type == MessageType.KeepAliveRequest && EnableKeepAliveAutoResponse)
{
- LogManager.Log($"{GetExtendedComponentName()}: Submitting keep alive response...", LogCategory.Debug);
- try
- {
- SendResponse<KeepAliveResponse>(new KeepAliveResponse(), container.Token, new TransportResponseConfig()
- {
- Priority = QueuePriority.High
- });
- }
- catch { }
+ LogManager.Log("Submitting keep alive response...", LogCategory.Debug);
+ SendResponse<KeepAliveResponse>(new KeepAliveResponse(), container.Token);
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: Invoking RequestReceived event...", LogCategory.Debug, container);
+ LogManager.Log("Invoking RequestReceived event...", LogCategory.Debug, container);
try
{
- Task.Factory.StartNew(() => OnRequestReceived(new RequestReceivedEventArgs(container)));
+ Task.Factory.StartNew(() => OnRequestReceived(container));
}
catch
{
@@ -1604,7 +1144,7 @@ namespace Tango.Transport
}
catch (ThreadAbortException)
{
- LogManager.Log($"{GetExtendedComponentName()}: Pull thread has been aborted.");
+ LogManager.Log("Pull thread has been aborted.");
}
catch (Exception ex)
{
@@ -1640,24 +1180,12 @@ namespace Tango.Transport
if (_arrivedResponses.Count == 0)
{
retryCounter--;
-
- if (State == TransportComponentState.Connected)
- {
- var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig()
- {
- Timeout = KeepAliveTimeout,
- Priority = QueuePriority.High
- }).Result;
- retryCounter = KeepAliveRetries;
- }
- else
- {
- continue;
- }
+ var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), KeepAliveTimeout).Result;
+ retryCounter = KeepAliveRetries;
}
else
{
- LogManager.Log($"{GetExtendedComponentName()}: Keep alive request was skipped due to busy response queue.", LogCategory.Debug);
+ LogManager.Log("Keep alive request was skipped due to busy response queue.", LogCategory.Debug);
}
}
}
@@ -1682,14 +1210,14 @@ namespace Tango.Transport
else
{
retryCounter = KeepAliveRetries;
- LogManager.Log($"{GetExtendedComponentName()}: The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning);
+ LogManager.Log("The transporter has not received a KeepAlive response within the given time, but was rescued due to other message received within the given time.", LogCategory.Warning);
}
}
}
catch (ThreadAbortException)
{
aborted = true;
- LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted.");
+ LogManager.Log("KeepAlive thread has been aborted.");
return;
}
catch (Exception ex)
@@ -1706,7 +1234,7 @@ namespace Tango.Transport
}
catch (ThreadAbortException)
{
- LogManager.Log($"{GetExtendedComponentName()}: KeepAlive thread has been aborted.");
+ LogManager.Log("KeepAlive thread has been aborted.");
}
}