aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Transport
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2020-02-28 19:46:34 +0200
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2020-02-28 19:46:34 +0200
commit18dafa9e98e171321d3847a208c0af5be6f57ef6 (patch)
tree94d44edbe93ad2c95e62179ec8e2b1da9ce4dd13 /Software/Visual_Studio/Tango.Transport
parent5f45be5bae69be7b7e916f02fb6d69b2db60e529 (diff)
downloadTango-18dafa9e98e171321d3847a208c0af5be6f57ef6.tar.gz
Tango-18dafa9e98e171321d3847a208c0af5be6f57ef6.zip
Implemented Transport Immediate mode on TCP and SignalR.
Implemented Tango.Console components.
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport')
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs2
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs2
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs29
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs17
-rw-r--r--Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs3
-rw-r--r--Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs3
-rw-r--r--Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs1
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs3
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs2
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs1
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs2
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransporterBase.cs10
12 files changed, 62 insertions, 13 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs
index c23fe3b03..fa365f805 100644
--- a/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs
@@ -98,7 +98,7 @@ namespace Tango.Transport.Adapters
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- public override void Write(byte[] data)
+ public override void Write(byte[] data, bool immidiate = false)
{
ThrowIfDisposed();
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs
index b944d4ab3..f4f7eb9cf 100644
--- a/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs
@@ -36,7 +36,7 @@ namespace Tango.Transport.Adapters
ComponentName = "Multi Transport Adapter";
}
- public override void Write(byte[] data)
+ public override void Write(byte[] data, bool immidiate = false)
{
Adapters.ToList().ForEach(x => x.Write(data));
}
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
index 433fad712..f1d3e10aa 100644
--- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs
@@ -17,6 +17,7 @@ namespace Tango.Transport.Adapters
private HubConnection _connection;
private Thread _pushThread;
private ProducerConsumerQueue<byte[]> _pushQueue;
+ private object _writeSyncObject = new object();
/// <summary>
/// Gets or sets the URL of the SignalR service.
@@ -85,14 +86,33 @@ namespace Tango.Transport.Adapters
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- public override void Write(byte[] data)
+ /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
+ public override void Write(byte[] data, bool immidiate = false)
{
TotalBytesSent += data.Length;
_totalBytes += data.Length;
AppendTransferRateBytes(data.Length);
- _pushQueue.BlockEnqueue(data);
+ if (!immidiate)
+ {
+ _pushQueue.BlockEnqueue(data);
+ }
+ else
+ {
+ try
+ {
+ lock (_writeSyncObject)
+ {
+ _proxy.Invoke("Write", new List<byte[]>() { data }).GetAwaiter().GetResult();
+ }
+ }
+ catch (Exception ex)
+ {
+ OnFailed(LogManager.Log(ex, $"Error writing to SignalR adapter ({Address})."));
+ return;
+ }
+ }
}
/// <summary>
@@ -274,7 +294,10 @@ namespace Tango.Transport.Adapters
{
try
{
- _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult();
+ lock (_writeSyncObject)
+ {
+ _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult();
+ }
}
catch (Exception ex)
{
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
index 9f726f3bf..38c577724 100644
--- a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
@@ -27,6 +27,7 @@ namespace Tango.Transport.Adapters
private Thread _pushThread;
private ProducerConsumerQueue<byte[]> _pushQueue;
private byte[] _size_buffer;
+ private object _writeSyncObject = new object();
#region Properties
@@ -168,7 +169,8 @@ namespace Tango.Transport.Adapters
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- public override void Write(byte[] data)
+ /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
+ public override void Write(byte[] data, bool immidiate = false)
{
ThrowIfDisposed();
@@ -176,9 +178,12 @@ namespace Tango.Transport.Adapters
{
data = PostProcessBuffer(data);
- if (WriteMode == TcpTransportAdapterWriteMode.Direct)
+ if (WriteMode == TcpTransportAdapterWriteMode.Direct || immidiate)
{
- _socket.GetStream().Write(data, 0, data.Length);
+ lock (_writeSyncObject)
+ {
+ _socket.GetStream().Write(data, 0, data.Length);
+ }
}
else
{
@@ -286,7 +291,11 @@ namespace Tango.Transport.Adapters
try
{
byte[] allData = dataCollection.SelectMany(a => a).ToArray();
- _socket.GetStream().Write(allData, 0, allData.Length);
+
+ lock (_writeSyncObject)
+ {
+ _socket.GetStream().Write(allData, 0, allData.Length);
+ }
}
catch (Exception ex)
{
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
index 084c7b5ef..485eda628 100644
--- a/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
@@ -202,7 +202,8 @@ namespace Tango.Transport.Adapters
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- public override void Write(byte[] data)
+ /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
+ public override void Write(byte[] data, bool immidiate = false)
{
ThrowIfDisposed();
diff --git a/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs
index 5e6b528c2..3cfd1ca48 100644
--- a/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs
+++ b/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs
@@ -33,7 +33,8 @@ namespace Tango.Transport
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- void Write(byte[] data);
+ /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
+ void Write(byte[] data, bool immidiate = false);
/// <summary>
/// Occurs when new data is available.
diff --git a/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs b/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs
index 29cc3c7e8..8dcf2d661 100644
--- a/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs
+++ b/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs
@@ -58,6 +58,7 @@ namespace Tango.Transport.Servers
if (!IsStarted)
{
Listener = new TcpListener(System.Net.IPAddress.Any, Port);
+ Listener.ExclusiveAddressUse = false;
Listener.Start();
IsStarted = true;
LogManager.Log($"TCP started on port {Port}.");
diff --git a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs
index d17bde261..2696f5698 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs
@@ -219,7 +219,8 @@ namespace Tango.Transport
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
- public abstract void Write(byte[] data);
+ /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
+ public abstract void Write(byte[] data, bool immidiate = false);
/// <summary>
/// Connects the transport component.
diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
index 849317b9b..7e22c7b46 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
@@ -52,6 +52,8 @@ namespace Tango.Transport
/// </summary>
public bool ShouldLog { get; set; }
+ public bool Immidiate { get; set; }
+
/// <summary>
/// Notifies the message observer of the new result.
/// </summary>
diff --git a/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs b/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs
index 6250cd881..255a566a7 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs
@@ -10,5 +10,6 @@ namespace Tango.Transport
{
public TimeSpan? Timeout { get; set; }
public bool ShouldLog { get; set; }
+ public bool Immediate { get; set; }
}
}
diff --git a/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs b/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs
index 449482e8a..44115d99a 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs
@@ -12,5 +12,7 @@ namespace Tango.Transport
public bool Completed { get; set; }
public ErrorCode? ErrorCode { get; set; }
public String ErrorMessage { get; set; }
+ public bool ShouldLog { get; set; }
+ public bool Immediate { get; set; }
}
}
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
index e0f9d46dd..c9388678d 100644
--- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
@@ -499,6 +499,7 @@ namespace Tango.Transport
TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>();
TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source);
message.ShouldLog = config.ShouldLog;
+ message.Immidiate = config.Immediate;
message.TransportComponentName = GetExtendedComponentName();
message.ActivateTimeout = () =>
@@ -547,6 +548,7 @@ namespace Tango.Transport
TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>();
TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source);
message.ShouldLog = config.ShouldLog;
+ message.Immidiate = config.Immediate;
message.TransportComponentName = GetExtendedComponentName();
message.ActivateTimeout = () =>
@@ -597,6 +599,7 @@ namespace Tango.Transport
TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>();
TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source);
message.ShouldLog = config.ShouldLog;
+ message.Immidiate = config.Immediate;
message.TransportComponentName = GetExtendedComponentName();
message.ActivateTimeout = () =>
@@ -656,6 +659,7 @@ namespace Tango.Transport
IsContinuous = true,
ContinuesResponseSubject = subject,
ShouldLog = config.ShouldLog,
+ Immidiate = config.Immediate,
TransportComponentName = GetExtendedComponentName(),
};
@@ -717,6 +721,7 @@ namespace Tango.Transport
IsContinuous = true,
ContinuesResponseSubject = subject,
ShouldLog = config.ShouldLog,
+ Immidiate = config.Immediate,
TransportComponentName = GetExtendedComponentName(),
};
@@ -798,6 +803,7 @@ namespace Tango.Transport
IsContinuous = true,
ContinuesResponseSubject = subject,
ShouldLog = config.ShouldLog,
+ Immidiate = config.Immediate,
TransportComponentName = GetExtendedComponentName(),
};
@@ -1044,6 +1050,8 @@ namespace Tango.Transport
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
TransportMessage<object> message = new TransportMessage<object>(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source);
+ message.ShouldLog = config.ShouldLog;
+ message.Immidiate = config.Immediate;
EnqueueMessageOut(message);
return source.Task;
}
@@ -1172,7 +1180,7 @@ namespace Tango.Transport
}
}
- Adapter.Write(message.Serialize());
+ Adapter.Write(message.Serialize(), message.Immidiate);
message.ActivateTimeout?.Invoke();