using Google.Protobuf;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
using System.Reactive.Linq;
using System.ServiceModel;
using Tango.Transport.Encoders;
using Tango.PMR.Connection;
using Tango.Core.Threading;
using System.IO;
using Tango.Core.ExtensionMethods;
using Tango.PMR.Integration;
using Newtonsoft.Json;
using System.Diagnostics;
using System.Reactive.Concurrency;
namespace Tango.Transport
{
///
/// Represents an base class.
///
///
public abstract class TransporterBase : ExtendedObject, ITransporter
{
protected static long _component_counter = 1;
private class RequestHandler
{
public Type RequestType { get; set; }
public Action Callback { get; set; }
public object RegisteredCallback { get; set; }
}
private const int MESSAGE_TOKEN_LENGTH = 36;
private PriorityProducerConsumerQueue _sendingQueue;
private ConcurrentList _pendingRequests;
private ProducerConsumerQueue _arrivedResponses;
private Thread _pushThread;
private Thread _pullThread;
private Thread _keepAliveThread;
private ITransportAdapter _adapter;
private Dictionary _pendingResponses;
private DateTime _lastKeepAliveTime;
private List _requestHandlers;
private static JsonSerializerSettings _genericMessageSettings = new JsonSerializerSettings()
{
TypeNameHandling = TypeNameHandling.All,
};
#region Events
///
/// Occurs when a new request message has been received.
///
public event EventHandler RequestReceived;
///
/// Occurs when a new response message has been received.
///
public event EventHandler PendingResponseReceived;
///
/// Occurs when component state changes.
///
public event EventHandler StateChanged;
///
/// Occurs when a request has been sent.
///
public event EventHandler RequestSent;
///
/// Occurs when a request response has been received.
///
public event EventHandler ResponseReceived;
///
/// Occurs when a request has failed.
///
public event EventHandler RequestFailed;
#endregion
#region Properties
///
/// Gets or sets the name of the transport component.
///
public String ComponentName { get; set; }
///
/// Gets or sets the used to read and write raw data.
///
public ITransportAdapter Adapter
{
get { return _adapter; }
set
{
var previous = _adapter;
_adapter = value;
OnAdapterChanged(previous, value);
RaisePropertyChangedAuto();
}
}
///
/// Gets or sets the transport encoder used to encode and decode tango messages.
///
public ITransportEncoder Encoder { get; set; }
private TransportComponentState _state;
///
/// Gets the component state.
///
public TransportComponentState State
{
get { return _state; }
protected set
{
if (_state != value)
{
_state = value;
OnStateChanged(_state);
}
}
}
///
/// Gets or sets the request timeout.
///
public TimeSpan RequestTimeout { get; set; }
private bool _useKeepAlive;
///
/// Gets or sets a value indicating whether to use a keep alive mechanism.
///
public bool UseKeepAlive
{
get { return _useKeepAlive; }
set
{
_useKeepAlive = value;
RaisePropertyChangedAuto();
if (_useKeepAlive)
{
LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is activated...");
}
else
{
LogManager.Log($"{GetExtendedComponentName()}: KeepAlive is deactivated.");
}
}
}
///
/// Gets or sets the keep alive timeout.
///
public TimeSpan KeepAliveTimeout { get; set; }
///
/// Gets or sets the keep alive retries.
///
public int KeepAliveRetries { get; set; }
///
/// Gets or sets a value indicating whether to auto respond to keep alive requests.
///
public bool EnableKeepAliveAutoResponse { get; set; }
///
/// Gets or sets a value indicating whether the transporter will get in to a failed state if any adapter has failed.
///
public bool FailsWithAdapter { get; set; }
///
/// Gets the last failed state exception/reason.
///
public Exception FailedStateException { get; private set; }
private GenericMessageProtocol _genericProtocol;
///
/// Gets or sets the generic protocol used to serialize/deserialize generic messages.
///
public GenericMessageProtocol GenericProtocol
{
get { return _genericProtocol; }
set { _genericProtocol = value; RaisePropertyChangedAuto(); }
}
#endregion
#region Virtual Methods
///
/// Called when the has changed.
///
/// The adapter.
protected async virtual void OnAdapterChanged(ITransportAdapter oldAdapter, ITransportAdapter newAdapter)
{
if (oldAdapter != newAdapter)
{
_pendingRequests.Clear();
_pendingResponses.Clear();
_arrivedResponses = new ProducerConsumerQueue();
_sendingQueue = new PriorityProducerConsumerQueue();
}
if (oldAdapter != null)
{
oldAdapter.StateChanged -= OnAdapterStateChanged;
oldAdapter.DataAvailable -= OnAdapterDataAvailable;
}
if (newAdapter != null)
{
LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: Type = {newAdapter.GetType().Name}, Address = {newAdapter.Address}, State = {newAdapter.State}");
newAdapter.StateChanged -= OnAdapterStateChanged;
newAdapter.DataAvailable -= OnAdapterDataAvailable;
newAdapter.StateChanged += OnAdapterStateChanged;
newAdapter.DataAvailable += OnAdapterDataAvailable;
if (State == TransportComponentState.Connected && newAdapter.State == TransportComponentState.Disconnected)
{
await newAdapter.Connect();
}
}
else
{
LogManager.Log($"{GetExtendedComponentName()}: Adapter Changed: null");
}
}
///
/// Called when the current adapter state has changed.
///
/// The sender.
/// The e.
protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
{
if (e == TransportComponentState.Failed && FailsWithAdapter)
{
OnFailed(new CommunicationException($"The adapter has failed with exception '{Adapter.FailedStateException.Message}' and the transporter is configured to fail with the adapter."));
}
}
///
/// Called when there is data available from the adapter.
///
/// The sender.
/// The data.
protected virtual void OnAdapterDataAvailable(object sender, byte[] data)
{
_lastKeepAliveTime = DateTime.Now;
EnqueueMessageIn(data);
}
///
/// Called when the component has failed.
///
/// The ex.
protected virtual async void OnFailed(Exception ex)
{
if (State != TransportComponentState.Failed)
{
FailedStateException = ex;
LogManager.Log(ex, $"{GetExtendedComponentName()}: Transporter failed.");
State = TransportComponentState.Failed;
await OnPostDisconnection();
}
else
{
LogManager.Log(ex, LogCategory.Warning, $"{GetExtendedComponentName()}: OnFailed called while state is already failed!");
}
}
protected virtual async Task OnPostDisconnection()
{
try
{
if (_pullThread != null)
{
_pullThread.Abort();
_pushThread.Abort();
_keepAliveThread.Abort();
}
}
catch { }
if (Adapter != null)
{
await Adapter.Disconnect();
}
NotifyContinuousRequestMessagesDisconnection();
}
///
/// Called when a new request has been received.
///
/// The request.
protected virtual void OnRequestReceived(RequestReceivedEventArgs e)
{
var container = e.Container;
if (_requestHandlers.Count > 0)
{
if (container.Type != MessageType.GenericRequest)
{
var handlers = _requestHandlers.Where(x => x.RequestType.Name == container.Type.ToOriginalName()).ToList();
if (handlers.Count > 0) //Handle
{
e.Handled = true;
var request = MessageFactory.ExtractMessageFromContainer(container);
foreach (var handler in handlers)
{
try
{
handler.Callback.Invoke(this, request, container.Token);
}
catch
{
//Ignore any exception on the client side.
}
}
}
}
else
{
var genericRequest = MessageFactory.ExtractMessageFromContainer(container);
var handlers = _requestHandlers.Where(x => x.RequestType.AssemblyQualifiedName == genericRequest.Type).ToList();
if (handlers.Count > 0)
{
e.Handled = true;
var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data, GenericProtocol);
foreach (var handler in handlers)
{
try
{
handler.Callback.Invoke(this, innerRequest, container.Token);
}
catch
{
//Ignore any exception on the client side.
}
}
}
}
}
RequestReceived?.Invoke(this, e);
}
///
/// Called when a new response has been received.
///
/// The request.
protected virtual void OnResponseReceived(MessageContainer container)
{
PendingResponseReceived?.Invoke(this, container);
}
///
/// Called when the component state has changed.
///
/// The state.
protected virtual void OnStateChanged(TransportComponentState state)
{
StateChanged?.Invoke(this, state);
}
///
/// Returns a string representing the component name. If an adapter is attached it will be part of the string.
///
///
protected virtual String GetExtendedComponentName()
{
return Adapter != null ? $"{ComponentName} ({Adapter.Address})" : ComponentName;
}
///
/// Notifies all the continuous request messages about disconnection.
///
protected virtual void NotifyContinuousRequestMessagesDisconnection()
{
LogManager.Log("Notifying all continuous request messages about disconnection...");
foreach (var request in _pendingRequests.ToList().Where(x => x.Direction == TransportMessageDirection.Request && x.IsContinuous))
{
try
{
_pendingRequests.Remove(request);
LogManager.Log($"Notifying continuous request '{(request.Message as ITangoMessage).Type}'...");
OnRequestFailed(request, new TransporterDisconnectedException("Transporter disconnected."));
request.SetException(new TransporterDisconnectedException("Transporter disconnected."));
}
catch (Exception e)
{
System.Diagnostics.Debug.WriteLine(e.ToString());
}
}
}
///
/// Called when the request has been sent
///
/// The request.
private void OnRequestSent(TransportMessageBase request)
{
if (request.ShouldLog)
{
IMessage message = request.GetActualMessage() as IMessage;
if (message != null)
{
RequestSent?.Invoke(this, message);
}
}
}
///
/// Called when the response has been received
///
/// The response.
private void OnResponseReceived(IMessage response)
{
if (response != null)
{
ResponseReceived?.Invoke(this, response);
}
}
///
/// Called when the request has been failed
///
/// The request.
private void OnRequestFailed(TransportMessageBase request, Exception exception)
{
IMessage message = request.GetActualMessage() as IMessage;
if (message != null)
{
RequestFailed?.Invoke(this, new RequestFailedEventArgs(message, exception));
}
}
#endregion
#region Constructors
///
/// Initializes a new instance of the class.
///
public TransporterBase()
{
ComponentName = "Not Set";
Encoder = new ProtoEncoder();
_pendingResponses = new Dictionary();
_sendingQueue = new PriorityProducerConsumerQueue();
_pendingRequests = new ConcurrentList();
_arrivedResponses = new ProducerConsumerQueue();
_requestHandlers = new List();
RequestTimeout = TimeSpan.FromSeconds(5);
EnableKeepAliveAutoResponse = true;
KeepAliveTimeout = TimeSpan.FromSeconds(2);
KeepAliveRetries = 1;
}
///
/// Initializes a new instance of the class.
///
/// The transport adapter.
public TransporterBase(ITransportAdapter adapter) : this()
{
Adapter = adapter;
}
#endregion
#region Public Methods
///
/// Clears all message queues.
///
public void ClearQueues()
{
_sendingQueue = new PriorityProducerConsumerQueue();
_pendingRequests = new ConcurrentList();
_arrivedResponses = new ProducerConsumerQueue();
}
///
/// Connects the transport component.
///
///
public virtual async Task Connect()
{
if (Adapter != null)
{
await Adapter.Connect();
}
State = TransportComponentState.Connected;
StartThreads();
LogManager.Log($"{GetExtendedComponentName()}: Transporter Connected...");
}
///
/// Disconnects the transport component.
///
///
public virtual async Task Disconnect()
{
if (State == TransportComponentState.Connected)
{
State = TransportComponentState.Disconnected;
await OnPostDisconnection();
LogManager.Log($"{GetExtendedComponentName()}: Transporter Disconnected...");
}
}
#endregion
#region Public Request Methods
///
/// Sends a request.
///
/// The request.
/// Request configuration.
///
///
public Task SendRequest(IMessage request, TransportRequestConfig config = null)
{
MessageContainer container = new MessageContainer();
container.Token = Guid.NewGuid().ToString();
container.Data = request.ToByteString();
container.Timeout = config.Timeout.HasValue ? (UInt32)config.Timeout.Value.TotalMilliseconds : (UInt32)RequestTimeout.TotalMilliseconds;
container.Type = MessageFactory.ParseMessageType(request.GetType().Name);
return SendRequestInternal(container.Token, container, config).Task;
}
///
/// Sends the request.
///
///
/// Request configuration.
///
///
public Task SendRequest(MessageContainer container, TransportRequestConfig config = null)
{
return SendRequestInternal(container.Token, container, config).Task;
}
///
/// Sends a request.
///
/// The type of the request.
/// The type of the response.
/// The request.
/// Request configuration.
///
///
public Task> SendRequest(TangoMessage request, TransportRequestConfig config = null) where Request : IMessage where Response : IMessage
{
return SendRequestInternal>(request.Container.Token, request, config).Task;
}
private TaskCompletionSource SendRequestInternal(String token, Object request, TransportRequestConfig config = null)
{
config = config ?? new TransportRequestConfig();
TaskCompletionSource source = new TaskCompletionSource();
String requestName = TransportMessageBase.GetActualMessageTypeName(request);
Func toBytes = null;
if (request is ITangoMessage tangoMessage)
{
toBytes = () => Encoder.Encode(tangoMessage);
}
else if (request is IMessage protoMessage)
{
toBytes = () => protoMessage.ToByteArray();
}
TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, source);
message.ShouldLog = config.ShouldLog;
message.Immidiate = config.Immediate;
message.Priority = config.Priority;
message.TransportComponentName = GetExtendedComponentName();
message.ThreadingMode = config.ThreadingMode;
TimeSpan? timeout = config.Timeout;
if (request is MessageContainer container)
{
timeout = GetContainerTimeoutOrDefault(container);
}
if (timeout == null)
{
timeout = RequestTimeout;
}
if (request is ITangoMessage tanMessage)
{
tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds;
}
String responseName = requestName.Replace("Request", "Response");
LogManager.Log($"{GetExtendedComponentName()}: Queuing request message: {requestName} Token: {token}", LogCategory.Debug);
LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
}
Action timeoutAction = () =>
{
if (!source.Task.IsCompleted)
{
TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds.");
OnRequestFailed(message, ex);
message.SetException(ex);
}
};
if (config.ThreadingMode == TransportThreadingMode.NewThread)
{
message.ActivateTimeout = () =>
{
TimeoutTask.StartNew(() =>
{
timeoutAction();
}, timeout.Value);
};
}
else
{
message.ActivateTimeout = () =>
{
Task.Delay(timeout.Value).ContinueWith((x) =>
{
timeoutAction();
});
};
}
EnqueueMessageOut(message);
return source;
}
///
/// Sends a request and expecting multiple response messages.
///
///
/// Request configuration.
///
///
public IObservable SendContinuousRequest(IMessage request, TransportContinuousRequestConfig config = null)
{
MessageContainer container = new MessageContainer();
container.Token = Guid.NewGuid().ToString();
container.Data = request.ToByteString();
container.Type = MessageFactory.ParseMessageType(request.GetType().Name);
//We need to assign this timeout because when the internal method detects a MessageContainer it will bypass the container continuous timeout assignment.
container.ContinuousTimeout = config.ContinuousTimeout != null ? (UInt32)config.ContinuousTimeout.Value.TotalMilliseconds : 0;
container.Continuous = true;
return SendContinuousRequestInternal(container.Token, container, config);
}
///
/// Sends a request and expecting multiple response messages.
///
/// The type of the request.
/// The type of the response.
///
/// Request configuration.
///
///
public IObservable> SendContinuousRequest(TangoMessage request, TransportContinuousRequestConfig config = null) where Request : IMessage where Response : IMessage
{
request.Container.Continuous = true;
request.Container.Completed = false;
return SendContinuousRequestInternal>(request.Container.Token, request, config);
}
///
/// Sends a continuous request.
///
/// The container.
/// Request configuration.
///
///
public IObservable SendContinuousRequest(MessageContainer container, TransportContinuousRequestConfig config = null)
{
return SendContinuousRequestInternal(container.Token, container, config);
}
private IObservable SendContinuousRequestInternal(String token, Object request, TransportContinuousRequestConfig config = null)
{
config = config ?? new TransportContinuousRequestConfig();
String requestName = TransportMessageBase.GetActualMessageTypeName(request);
Func toBytes = null;
if (request is ITangoMessage tangoMessage)
{
toBytes = () => Encoder.Encode(tangoMessage);
}
else if (request is IMessage protoMessage)
{
toBytes = () => protoMessage.ToByteArray();
}
Subject subject = new Subject();
TransportMessage message = new TransportMessage(token, request, TransportMessageDirection.Request, toBytes, null);
message.ShouldLog = config.ShouldLog;
message.Immidiate = config.Immediate;
message.Priority = config.Priority;
message.TransportComponentName = GetExtendedComponentName();
message.IsContinuous = true;
message.ContinuesResponseSubject = subject;
TimeSpan? timeout = config.Timeout;
TimeSpan? continuousTimeout = config.ContinuousTimeout;
if (request is MessageContainer container)
{
timeout = GetContainerTimeoutOrDefault(container);
if (timeout == null)
{
timeout = config.Timeout != null ? config.Timeout.Value : RequestTimeout;
container.Timeout = (uint)timeout.Value.TotalMilliseconds;
}
continuousTimeout = GetContainerContinuousTimeoutOrDefault(container);
}
if (timeout == null)
{
timeout = RequestTimeout;
}
if (request is ITangoMessage tanMessage)
{
tanMessage.Container.Continuous = true;
tanMessage.Container.Completed = false;
tanMessage.Container.Timeout = (uint)timeout.Value.TotalMilliseconds;
tanMessage.Container.ContinuousTimeout = continuousTimeout != null ? (UInt32)continuousTimeout.Value.TotalMilliseconds : 0;
}
String responseName = requestName.Replace("Request", "Response");
LogManager.Log($"{GetExtendedComponentName()}: Queuing continuous request message: {requestName} Token: {token}", LogCategory.Debug);
LogManager.Log($"{GetExtendedComponentName()}: Expected response: {responseName}", LogCategory.Debug);
if (State != TransportComponentState.Connected)
{
throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the request while transporter state is {State}."));
}
Action timeoutAction = () =>
{
if (!message.AtLeastOneResponseReceived)
{
TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Request message '{requestName}' had timed out after {timeout.Value.TotalSeconds} seconds.");
OnRequestFailed(message, ex);
message.SetException(ex);
}
if (continuousTimeout != null)
{
Task.Factory.StartNew(async () =>
{
while (!message.Completed)
{
await Task.Delay(continuousTimeout.Value).ContinueWith((y) =>
{
if (!message.Completed)
{
if (DateTime.Now - message.LastResponseTime > continuousTimeout.Value)
{
TimeoutException ex = new TimeoutException($"{GetExtendedComponentName()}: Continuous request message '{requestName}' had failed to provide a response for a period of {continuousTimeout.Value.TotalSeconds} seconds and has timed out.");
OnRequestFailed(message, ex);
message.SetException(ex);
return;
}
}
});
}
});
}
};
if (config.ThreadingMode == TransportThreadingMode.NewThread)
{
message.ActivateTimeout = () =>
{
TimeoutTask.StartNew(() =>
{
timeoutAction();
}, timeout.Value);
};
}
else
{
message.ActivateTimeout = () =>
{
Task.Delay(timeout.Value).ContinueWith((x) =>
{
timeoutAction();
});
};
}
EnqueueMessageOut(message);
return subject.AsObservable();
}
///
/// Sends a generic request of any type.
///
/// The type of the request.
/// The type of the response.
/// The request.
/// The configuration.
///
public async Task SendGenericRequest(Request request, TransportRequestConfig config = null) where Request : class where Response : class
{
GenericRequest genericRequest = new GenericRequest();
genericRequest.Type = request.GetType().AssemblyQualifiedName;
genericRequest.Data = GenericMessageSerializer.SerializeToByteString(request, GenericProtocol);
var response = await SendRequest(genericRequest, config);
var responseObject = GenericMessageSerializer.DeserializeFromByteString(response.Message.Data, GenericProtocol);
return responseObject;
}
///
/// Sends a generic request and expecting multiple generic response messages.
///
/// The type of the request.
/// The type of the response.
/// Request configuration.
///
public IObservable SendGenericContinuousRequest(Request request, TransportContinuousRequestConfig config = null) where Request : class where Response : class
{
GenericRequest genericRequest = new GenericRequest();
genericRequest.Type = request.GetType().AssemblyQualifiedName;
genericRequest.Data = GenericMessageSerializer.SerializeToByteString(request, GenericProtocol);
Subject subject = new Subject();
SendContinuousRequest(genericRequest, config).Subscribe((response) =>
{
try
{
var responseObject = GenericMessageSerializer.DeserializeFromByteString(response.Message.Data, GenericProtocol);
subject.OnNext(responseObject);
}
catch (Exception ex)
{
Debugger.Break();
System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request continuous handler.\n{ex.ToString()}");
//Ignore exception at the client side.
}
}, (ex) =>
{
try
{
subject.OnError(ex);
}
catch (Exception xx)
{
Debugger.Break();
System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request error handler.\n{xx.ToString()}");
//Ignore exception at the client side.
}
}, () =>
{
try
{
subject.OnCompleted();
}
catch (Exception ex)
{
Debugger.Break();
System.Diagnostics.Debug.WriteLine($"Exception thrown by the generic request completed handler.\n{ex.ToString()}");
//Ignore exception at the client side.
}
});
return subject.AsObservable();
}
#endregion
#region Public Response Methods
///
/// Sends a response.
///
/// The type of the response.
/// The response.
///
public Task SendResponse(TangoMessage response) where Response : IMessage
{
return SendResponse(response, response.Container.Token);
}
///
/// Sends the response.
///
///
/// Request token.
/// Response configuration.
///
public Task SendResponse(IMessage response, String token, TransportResponseConfig config = null)
{
config = config ?? new TransportResponseConfig();
String responseName = response.GetType().Name;
MessageContainer container = new MessageContainer();
container.Token = token;
container.Data = response.ToByteString();
container.Type = MessageFactory.ParseMessageType(responseName);
if (config.ErrorCode.HasValue)
{
container.Error = config.ErrorCode.Value;
}
if (config.ErrorMessage != null)
{
container.ErrorMessage = config.ErrorMessage;
}
container.EventCode = (int)config.EventType;
container.Completed = config.Completed;
return SendResponse(container);
}
///
/// Sends the response.
///
/// The container.
///
///
///
public Task SendResponse(MessageContainer container, TransportResponseConfig config = null)
{
config = config ?? new TransportResponseConfig();
if (_pushThread == null || _pushThread.ThreadState == System.Threading.ThreadState.Aborted)
{
throw new InvalidOperationException("Transporter push thread is not in a running state.");
}
String token = container.Token;
LogManager.Log($"{GetExtendedComponentName()}: Queuing response message: " + container.Type, LogCategory.Debug);
PendingResponse pendingResponse = null;
if (State != TransportComponentState.Connected)
{
throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Could not send the response while transporter state is {State}."));
}
LogManager.Log($"{GetExtendedComponentName()}: Searching for matching request token: " + token, LogCategory.Debug);
if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
LogManager.Log($"{GetExtendedComponentName()}: Found matching request token: " + token, LogCategory.Debug);
if (!pendingResponse.IsContinuous)
{
LogManager.Log($"{GetExtendedComponentName()}: Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
else if (container.Completed)
{
LogManager.Log($"{GetExtendedComponentName()}: Response completed. Removing matching request token.", LogCategory.Debug);
_pendingResponses.Remove(token);
}
}
else
{
//This should never happen.
throw LogManager.Log(new InvalidOperationException($"{GetExtendedComponentName()}: Matching request token was not found!"), LogCategory.Critical);
}
TaskCompletionSource