aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport/TransporterBase.cs')
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransporterBase.cs495
1 files changed, 323 insertions, 172 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
index 263e78933..ba220797e 100644
--- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
@@ -18,6 +18,7 @@ using Tango.Transport.Encoders;
using Tango.PMR.Connection;
using Tango.Core.Threading;
using System.IO;
+using Tango.Core.ExtensionMethods;
namespace Tango.Transport
{
@@ -57,6 +58,21 @@ namespace Tango.Transport
/// </summary>
public event EventHandler<TransportComponentState> StateChanged;
+ /// <summary>
+ /// Occurs when a request has been sent.
+ /// </summary>
+ public event EventHandler<IMessage> RequestSent;
+
+ /// <summary>
+ /// Occurs when a request response has been received.
+ /// </summary>
+ public event EventHandler<IMessage> ResponseReceived;
+
+ /// <summary>
+ /// Occurs when a request has failed.
+ /// </summary>
+ public event EventHandler<RequestFailedEventArgs> RequestFailed;
+
#endregion
#region Properties
@@ -283,6 +299,7 @@ namespace Tango.Transport
try
{
LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'...");
+ OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected."));
request.SetException(new TransporterDisconnectedException("Transporter disconnected."));
}
catch (Exception e)
@@ -292,6 +309,49 @@ namespace Tango.Transport
}
}
+ /// <summary>
+ /// Called when the request has been sent
+ /// </summary>
+ /// <param name="response">The request.</param>
+ private void OnRequestSent(TransportMessageBase request)
+ {
+ if (request.ShouldLog)
+ {
+ IMessage message = request.GetActualMessage() as IMessage;
+
+ if (message != null)
+ {
+ RequestSent?.Invoke(this, message);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Called when the response has been received
+ /// </summary>
+ /// <param name="response">The response.</param>
+ private void OnResponseReceived(IMessage response)
+ {
+ if (response != null)
+ {
+ ResponseReceived?.Invoke(this, response);
+ }
+ }
+
+ /// <summary>
+ /// Called when the request has been failed
+ /// </summary>
+ /// <param name="request">The request.</param>
+ private void OnRequestFailed(TransportMessageBase request, Exception exception)
+ {
+ IMessage message = request.GetActualMessage() as IMessage;
+
+ if (message != null)
+ {
+ RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception));
+ }
+ }
+
#endregion
#region Constructors
@@ -381,21 +441,28 @@ namespace Tango.Transport
NotifyContinuousRequestMessagesDisconnection();
}
+ #endregion
+
+ #region Public Request Methods
+
/// <summary>
/// Sends a request.
/// </summary>
/// <param name="request">The request.</param>
- /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- public Task<IMessage> SendRequest(IMessage request, TimeSpan? timeout = null)
+ /// <exception cref="InvalidOperationException"></exception>
+ public Task<IMessage> SendRequest(IMessage request, TransportRequestConfig config = null)
{
+ config = config ?? new TransportRequestConfig();
+
String requestName = request.GetType().Name;
String responseName = requestName.Replace("Request", "Response");
MessageContainer container = new MessageContainer();
container.Token = Guid.NewGuid().ToString();
container.Data = request.ToByteString();
- container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
container.Type = MessageFactory.ParseMessageType(requestName);
LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
@@ -408,6 +475,8 @@ namespace Tango.Transport
TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>();
TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source);
+ message.ShouldLog = config.ShouldLog;
+ message.TransportComponentName = GetExtendedComponentName();
message.ActivateTimeout = () =>
{
@@ -416,13 +485,13 @@ namespace Tango.Transport
if (!source.Task.IsCompleted)
{
- TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
+ TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug);
- source.SetException(ex);
+ OnRequestFailed(message, ex);
+ message.SetException(ex);
}
- }, timeout != null ? timeout.Value : RequestTimeout);
+ }, config.Timeout != null ? config.Timeout.Value : RequestTimeout);
};
EnqueueMessageOut(message);
@@ -431,49 +500,16 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends the response.
- /// </summary>
- /// <param name="response">The response.</param>
- /// <param name="token">The token.</param>
- /// <param name="completed">The completed.</param>
- /// <param name="errorCode">The error code.</param>
- /// <param name="errorMessage">The error message.</param>
- /// <returns></returns>
- /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception>
- public Task SendResponse(IMessage response, string token, bool? completed = default(bool?), ErrorCode? errorCode = default(ErrorCode?), string errorMessage = null)
- {
- String responseName = response.GetType().Name;
-
- MessageContainer container = new MessageContainer();
- container.Token = token;
- container.Data = response.ToByteString();
- container.Type = MessageFactory.ParseMessageType(responseName);
-
- if (errorCode.HasValue)
- {
- container.Error = errorCode.Value;
- }
-
- if (errorMessage != null)
- {
- container.ErrorMessage = errorMessage;
- }
-
- if (completed.HasValue)
- {
- container.Completed = completed.Value;
- }
-
- return SendResponse(container);
- }
-
- /// <summary>
/// Sends the request.
/// </summary>
- /// <param name="container">The container.</param>
+ /// <param name="container"></param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- public Task<MessageContainer> SendRequest(MessageContainer container)
+ /// <exception cref="InvalidOperationException"></exception>
+ public Task<MessageContainer> SendRequest(MessageContainer container, TransportRequestConfig config = null)
{
+ config = config ?? new TransportRequestConfig();
+
String responseName = container.Type.ToString().Replace("Request", "Response");
TimeSpan? timeout = GetContainerTimeoutOrDefault(container);
@@ -487,6 +523,8 @@ namespace Tango.Transport
TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>();
TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source);
+ message.ShouldLog = config.ShouldLog;
+ message.TransportComponentName = GetExtendedComponentName();
message.ActivateTimeout = () =>
{
@@ -496,9 +534,9 @@ namespace Tango.Transport
if (!source.Task.IsCompleted)
{
TimeoutException ex = new TimeoutException("Request message: " + container.Type + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug);
- source.SetException(ex);
+ OnRequestFailed(message, ex);
+ message.SetException(ex);
}
}, timeout != null ? timeout.Value : RequestTimeout);
@@ -511,61 +549,65 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends the response.
+ /// Sends a request.
/// </summary>
- /// <param name="container">The container.</param>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- /// <exception cref="System.InvalidOperationException">Matching request token was not found!</exception>
- public Task SendResponse(MessageContainer container)
+ /// <exception cref="InvalidOperationException"></exception>
+ public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TransportRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response>
{
- String token = container.Token;
-
- LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug);
+ config = config ?? new TransportRequestConfig();
- PendingResponse pendingResponse = null;
+ LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
+ LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}."));
+ throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
}
- LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug);
+ request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
- if (_pendingResponses.TryGetValue(token, out pendingResponse))
- {
- LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug);
+ TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>();
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source);
+ message.ShouldLog = config.ShouldLog;
+ message.TransportComponentName = GetExtendedComponentName();
- if (!pendingResponse.IsContinuous)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug);
- _pendingResponses.Remove(token);
- }
- else if (container.Completed)
- {
- LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug);
- _pendingResponses.Remove(token);
- }
- }
- else
+ message.ActivateTimeout = () =>
{
- //This should never happen.
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical);
- }
+ TimeoutTask.StartNew(() =>
+ {
+
+ if (!source.Task.IsCompleted)
+ {
+ TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
+ LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
+ message.SetException(ex);
+ }
+
+ }, config.Timeout != null ? config.Timeout.Value : RequestTimeout);
+ };
- TaskCompletionSource<object> source = new TaskCompletionSource<object>();
- TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source);
EnqueueMessageOut(message);
+
return source.Task;
}
/// <summary>
/// Sends a request and expecting multiple response messages.
/// </summary>
- /// <param name="request">The request.</param>
- /// <param name="timeout"></param>
+ /// <param name="request"></param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- public IObservable<IMessage> SendContinuousRequest(IMessage request, TimeSpan? timeout = default(TimeSpan?))
+ /// <exception cref="InvalidOperationException"></exception>
+ public IObservable<IMessage> SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null)
{
+ config = config ?? new TransportContinuousRequestConfig();
+
String requestName = request.GetType().Name;
String responseName = requestName.Replace("Request", "Response");
@@ -573,7 +615,7 @@ namespace Tango.Transport
container.Token = Guid.NewGuid().ToString();
container.Data = request.ToByteString();
container.Type = MessageFactory.ParseMessageType(requestName);
- container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
container.Continuous = true;
LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + requestName + " Token: " + container.Token, LogCategory.Debug);
@@ -591,6 +633,8 @@ namespace Tango.Transport
{
IsContinuous = true,
ContinuesResponseSubject = subject,
+ ShouldLog = config.ShouldLog,
+ TransportComponentName = GetExtendedComponentName(),
};
message.ActivateTimeout = () =>
@@ -601,13 +645,13 @@ namespace Tango.Transport
if (!message.AtLeastOneResponseReceived)
{
- TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
+ TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
message.SetException(ex);
}
- }, timeout != null ? timeout.Value : RequestTimeout);
+ }, config.Timeout != null ? config.Timeout.Value : RequestTimeout);
};
@@ -617,59 +661,18 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends a request.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param>
- /// <returns></returns>
- public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
- {
- LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
- LogManager.Log($"{GetExtendedComponentName()}: Expected response: " + typeof(Response).Name, LogCategory.Debug);
-
- if (State != TransportComponentState.Connected)
- {
- throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
- }
-
- request.Container.Timeout = timeout.HasValue ? (UInt32)timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
-
- TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>();
- TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source);
-
- message.ActivateTimeout = () =>
- {
- TimeoutTask.StartNew(() =>
- {
-
- if (!source.Task.IsCompleted)
- {
- TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
- LogManager.Log($"{GetExtendedComponentName()}: Setting request task exception...", LogCategory.Debug);
- source.SetException(ex);
- }
-
- }, timeout != null ? timeout.Value : RequestTimeout);
- };
-
- EnqueueMessageOut(message);
-
- return source.Task;
- }
-
- /// <summary>
/// Sends a request and expecting multiple response messages.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="timeout"></param>
+ /// <param name="request"></param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? firstTimeout = null, TimeSpan? continousTimeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
+ /// <exception cref="InvalidOperationException"></exception>
+ public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, TransportContinuousRequestConfig config = null) where Request : IMessage<Request> where Response : IMessage<Response>
{
+ config = config ?? new TransportContinuousRequestConfig();
+
LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token, LogCategory.Debug);
Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>();
@@ -684,13 +687,15 @@ namespace Tango.Transport
request.Container.Continuous = true;
request.Container.Completed = false;
- request.Container.Timeout = firstTimeout.HasValue ? (UInt32)firstTimeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
- request.Container.ContinuousTimeout = continousTimeout.HasValue ? (UInt32)continousTimeout.Value.TotalMilliseconds : 0;
+ request.Container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
+ request.Container.ContinuousTimeout = config.ContinuousTimeout.HasValue ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0;
TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), null)
{
IsContinuous = true,
ContinuesResponseSubject = subject,
+ ShouldLog = config.ShouldLog,
+ TransportComponentName = GetExtendedComponentName(),
};
message.ActivateTimeout = () =>
@@ -700,27 +705,27 @@ namespace Tango.Transport
if (!message.AtLeastOneResponseReceived)
{
- TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (firstTimeout != null ? firstTimeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
+ TimeoutException ex = new TimeoutException("Request message: " + typeof(Request).Name + " had timed out after " + (config.Timeout != null ? config.Timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
message.SetException(ex);
}
- if (continousTimeout != null)
+ if (config.ContinuousTimeout != null)
{
Task.Factory.StartNew(async () =>
{
while (!message.Completed)
{
- await Task.Delay(continousTimeout.Value).ContinueWith((y) =>
+ await Task.Delay(config.ContinuousTimeout.Value).ContinueWith((y) =>
{
if (!message.Completed)
{
- if (DateTime.Now - message.LastResponseTime > continousTimeout.Value)
+ if (DateTime.Now - message.LastResponseTime > config.ContinuousTimeout.Value)
{
- TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (continousTimeout.Value.TotalSeconds) + " seconds and has timed out.");
- LogManager.Log(ex);
+ TimeoutException ex = new TimeoutException("Continuous request message: " + typeof(Request).Name + " had failed to provide a response for a period of " + (config.ContinuousTimeout.Value.TotalSeconds) + " seconds and has timed out.");
LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
message.SetException(ex);
return;
}
@@ -730,7 +735,7 @@ namespace Tango.Transport
});
}
- }, firstTimeout != null ? firstTimeout.Value : RequestTimeout);
+ }, config.Timeout != null ? config.Timeout.Value : RequestTimeout);
};
EnqueueMessageOut(message);
@@ -742,9 +747,13 @@ namespace Tango.Transport
/// Sends a continuous request.
/// </summary>
/// <param name="container">The container.</param>
+ /// <param name="config">Request configuration.</param>
/// <returns></returns>
- public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container)
+ /// <exception cref="InvalidOperationException"></exception>
+ public IObservable<MessageContainer> SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null)
{
+ config = config ?? new TransportContinuousRequestConfig();
+
TimeSpan? timeout = GetContainerTimeoutOrDefault(container);
TimeSpan? continuousTimeout = container.ContinuousTimeout > 0 ? TimeSpan.FromMilliseconds(container.ContinuousTimeout) : default(TimeSpan?);
@@ -766,6 +775,8 @@ namespace Tango.Transport
{
IsContinuous = true,
ContinuesResponseSubject = subject,
+ ShouldLog = config.ShouldLog,
+ TransportComponentName = GetExtendedComponentName(),
};
message.ActivateTimeout = () =>
@@ -776,8 +787,8 @@ namespace Tango.Transport
if (!message.AtLeastOneResponseReceived)
{
TimeoutException ex = new TimeoutException("Request message: " + requestName + " had timed out after " + (timeout != null ? timeout.Value.TotalSeconds : RequestTimeout.TotalSeconds) + " seconds.");
- LogManager.Log(ex);
LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
message.SetException(ex);
}
@@ -794,8 +805,8 @@ namespace Tango.Transport
if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value)
{
TimeoutException ex = new TimeoutException("Continuous request message: " + requestName + " had failed to provide a response for a period of " + (continuousTimeout.Value.TotalSeconds) + " seconds and has timed out.");
- LogManager.Log(ex);
LogManager.Log($"{GetExtendedComponentName()}: Setting request exception...", LogCategory.Debug);
+ OnRequestFailed(message, ex);
message.SetException(ex);
return;
}
@@ -813,6 +824,10 @@ namespace Tango.Transport
return subject.AsObservable();
}
+ #endregion
+
+ #region Public Response Methods
+
/// <summary>
/// Sends a response.
/// </summary>
@@ -825,18 +840,102 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends a response for the specified token.
+ /// Sends the response.
+ /// </summary>
+ /// <param name="response"></param>
+ /// <param name="token">Request token.</param>
+ /// <param name="config">Response configuration.</param>
+ /// <returns></returns>
+ public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null)
+ {
+ config = config ?? new TransportResponseConfig();
+
+ String responseName = response.GetType().Name;
+
+ MessageContainer container = new MessageContainer();
+ container.Token = token;
+ container.Data = response.ToByteString();
+ container.Type = MessageFactory.ParseMessageType(responseName);
+
+ if (config.ErrorCode.HasValue)
+ {
+ container.Error = config.ErrorCode.Value;
+ }
+
+ if (config.ErrorMessage != null)
+ {
+ container.ErrorMessage = config.ErrorMessage;
+ }
+
+ container.Completed = config.Completed;
+
+ return SendResponse(container);
+ }
+
+ /// <summary>
+ /// Sends the response.
+ /// </summary>
+ /// <param name="container">The container.</param>
+ /// <returns></returns>
+ /// <exception cref="InvalidOperationException">
+ /// </exception>
+ public Task SendResponse(MessageContainer container)
+ {
+ String token = container.Token;
+
+ LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug);
+
+ PendingResponse pendingResponse = null;
+
+ if (State != TransportComponentState.Connected)
+ {
+ throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}."));
+ }
+
+ LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug);
+
+ if (_pendingResponses.TryGetValue(token, out pendingResponse))
+ {
+ LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug);
+
+ if (!pendingResponse.IsContinuous)
+ {
+ LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug);
+ _pendingResponses.Remove(token);
+ }
+ else if (container.Completed)
+ {
+ LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug);
+ _pendingResponses.Remove(token);
+ }
+ }
+ else
+ {
+ //This should never happen.
+ throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical);
+ }
+
+ TaskCompletionSource<object> source = new TaskCompletionSource<object>();
+ TransportMessage<object> message = new TransportMessage<object>(token, container, TransportMessageDirection.Response, () => container.ToByteArray(), source);
+ EnqueueMessageOut(message);
+ return source.Task;
+ }
+
+ /// <summary>
+ /// Sends a response.
/// </summary>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="response">The response.</param>
- /// <param name="token">The token.</param>
- /// <param name="completed">The completed.</param>
- /// <param name="errorCode">The error code.</param>
- /// <param name="errorMessage">The error message.</param>
+ /// <param name="token">Request token.</param>
+ /// <param name="config">Response configuration.</param>
/// <returns></returns>
- /// <exception cref="InvalidOperationException">Matching request token was not found!</exception>
- public Task SendResponse<Response>(TangoMessage<Response> response, String token, bool? completed = null, ErrorCode? errorCode = null, String errorMessage = null) where Response : IMessage<Response>
+ /// <exception cref="InvalidOperationException">
+ /// Transporter push thread is not in a running state.
+ /// </exception>
+ public Task SendResponse<Response>(TangoMessage<Response> response, String token, TransportResponseConfig config = null) where Response : IMessage<Response>
{
+ config = config ?? new TransportResponseConfig();
+
if (_pushThread == null || _pushThread.ThreadState == ThreadState.Aborted)
{
throw new InvalidOperationException("Transporter push thread is not in a running state.");
@@ -844,19 +943,16 @@ namespace Tango.Transport
response.Container.Token = token;
- if (completed.HasValue)
- {
- response.Container.Completed = completed.Value;
- }
+ response.Container.Completed = config.Completed;
- if (errorCode.HasValue)
+ if (config.ErrorCode.HasValue)
{
- response.Container.Error = errorCode.Value;
+ response.Container.Error = config.ErrorCode.Value;
}
- if (!String.IsNullOrEmpty(errorMessage))
+ if (!String.IsNullOrEmpty(config.ErrorMessage))
{
- response.Container.ErrorMessage = errorMessage;
+ response.Container.ErrorMessage = config.ErrorMessage;
}
LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + typeof(Response).Name, LogCategory.Debug);
@@ -901,11 +997,16 @@ namespace Tango.Transport
/// Sends a general error response agnostic to the type of request.
/// </summary>
/// <param name="exception">The exception.</param>
- /// <param name="token">The token.</param>
+ /// <param name="config">Response configuration.</param>
/// <returns></returns>
- public Task SendErrorResponse(Exception exception, string token)
+ public Task SendErrorResponse(Exception exception, String token)
{
- return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, true, ErrorCode.GeneralError, exception.Message);
+ return SendResponse<ErrorResponse>(new ErrorResponse() { }, token, new TransportResponseConfig()
+ {
+ ErrorCode = ErrorCode.GeneralError,
+ Completed = true,
+ ErrorMessage = exception.Message
+ });
}
#endregion
@@ -989,31 +1090,46 @@ namespace Tango.Transport
{
if (message.Token.Length != MESSAGE_TOKEN_LENGTH)
{
- message.SetException(LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token)));
+ var ex = LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Invalid message token length: " + message.Token));
+ OnRequestFailed(message, ex);
+ message.SetException(ex);
continue;
}
- LogManager.Log($"{GetExtendedComponentName()}: Sending message on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message);
-
if (message.Direction == TransportMessageDirection.Request)
{
+ if (message.ShouldLog)
+ {
+ LogManager.Log($"{GetExtendedComponentName()}: Sending request '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info);
+ OnRequestSent(message);
+ }
+
lock (_pendingRequests)
{
_pendingRequests.Add(message);
}
}
+ else
+ {
+ if (message.ShouldLog)
+ {
+ LogManager.Log($"{GetExtendedComponentName()}: Sending response '{message.GetActualMessageTypeName()}'...\n{message.GetActualMessage().ToJsonString()}", LogCategory.Info);
+ }
+ }
Adapter.Write(message.Serialize());
message.ActivateTimeout?.Invoke();
- LogManager.Log($"{GetExtendedComponentName()}: Message sent on adapter: " + Adapter.Address + "...", LogCategory.Debug, message.Message);
+ LogManager.Log($"{GetExtendedComponentName()}: Message sent...", LogCategory.Debug, message.Message);
}
else
{
if (message.Direction == TransportMessageDirection.Request)
{
- message.SetException(LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected.")));
+ var ex = new InvalidOperationException($"{GetExtendedComponentName()}: Could not send message " + message.Message.GetType().Name + ". Adapter is disconnected.");
+ OnRequestFailed(message, ex);
+ message.SetException(ex);
}
}
@@ -1024,6 +1140,7 @@ namespace Tango.Transport
}
catch (Exception ex)
{
+ OnRequestFailed(message, ex);
message.SetException(ex);
}
}
@@ -1053,7 +1170,7 @@ namespace Tango.Transport
{
byte[] data = _arrivedResponses.BlockDequeue();
- LogManager.Log($"{GetExtendedComponentName()}: Message received on adapter: " + Adapter.Address, LogCategory.Debug);
+ LogManager.Log($"{GetExtendedComponentName()}: Message received...", LogCategory.Debug);
LogManager.Log($"{GetExtendedComponentName()}: Parsing message container...", LogCategory.Debug);
MessageContainer container = Encoder.DecodeContainer(data);
@@ -1110,17 +1227,35 @@ namespace Tango.Transport
if (container.Error == ErrorCode.None)
{
var message = Encoder.Decode(data);
+
+ if (request.ShouldLog)
+ {
+ try
+ {
+ var messageContent = message.GetType().GetProperty("Message").GetValue(message);
+ LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info);
+ OnResponseReceived(messageContent as IMessage);
+ }
+ catch
+ {
+ LogManager.Log("Error logging response received.", LogCategory.Warning);
+ }
+ }
+
LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and setting pending request task result...", LogCategory.Debug, message);
request.SetResult(message, true);
LogManager.Log($"{GetExtendedComponentName()}: Message enquirer released...", LogCategory.Debug);
}
else
{
- request.SetException(LogManager.Log(new ResponseErrorException(container), LogCategory.Warning));
+ var ex = LogManager.Log(new ResponseErrorException(container));
+ OnRequestFailed(request, ex);
+ request.SetException(ex);
}
}
catch (Exception ex)
{
+ OnRequestFailed(request, ex);
request.SetException(LogManager.Log(ex, $"{GetExtendedComponentName()}: Error parsing response message."));
}
}
@@ -1134,6 +1269,20 @@ namespace Tango.Transport
{
var message = Encoder.Decode(data);
+ if (request.ShouldLog && !request.AtLeastOneResponseReceived)
+ {
+ try
+ {
+ var messageContent = message.GetType().GetProperty("Message").GetValue(message);
+ LogManager.Log($"{GetExtendedComponentName()}: Response received '{message.Type}'...\n{messageContent.ToJsonString()}", LogCategory.Info);
+ OnResponseReceived(messageContent as IMessage);
+ }
+ catch
+ {
+ LogManager.Log("Error logging response received.", LogCategory.Warning);
+ }
+ }
+
LogManager.Log($"{GetExtendedComponentName()}: Parsing inner response message and invoking continuous response callback...", LogCategory.Debug, message);
if (container.Completed)
@@ -1148,12 +1297,14 @@ namespace Tango.Transport
String m = $"{GetExtendedComponentName()}: Continuous response " + container.Type + " has been aborted: " + container.Error.ToString();
LogManager.Log(m, LogCategory.Info);
_pendingRequests.Remove(request);
- request.SetException(new ContinuousResponseAbortedException(m));
+ OnRequestFailed(request, new ContinuousResponseAbortedException(container, m));
+ request.SetException(new ContinuousResponseAbortedException(container, m));
}
else
{
LogManager.Log($"{GetExtendedComponentName()}: Continuous response has returned with error: " + container.Error.ToString(), LogCategory.Warning);
_pendingRequests.Remove(request);
+ OnRequestFailed(request, new ResponseErrorException(container));
request.SetException(new ResponseErrorException(container));
}
}
@@ -1251,7 +1402,7 @@ namespace Tango.Transport
if (_arrivedResponses.Count == 0)
{
retryCounter--;
- var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), KeepAliveTimeout).Result;
+ var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig() { Timeout = KeepAliveTimeout }).Result;
retryCounter = KeepAliveRetries;
}
else