diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-02-28 19:46:34 +0200 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-02-28 19:46:34 +0200 |
| commit | 18dafa9e98e171321d3847a208c0af5be6f57ef6 (patch) | |
| tree | 94d44edbe93ad2c95e62179ec8e2b1da9ce4dd13 /Software/Visual_Studio/Tango.Transport | |
| parent | 5f45be5bae69be7b7e916f02fb6d69b2db60e529 (diff) | |
| download | Tango-18dafa9e98e171321d3847a208c0af5be6f57ef6.tar.gz Tango-18dafa9e98e171321d3847a208c0af5be6f57ef6.zip | |
Implemented Transport Immediate mode on TCP and SignalR.
Implemented Tango.Console components.
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport')
12 files changed, 62 insertions, 13 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs index c23fe3b03..fa365f805 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/MemoryTransportAdapter.cs @@ -98,7 +98,7 @@ namespace Tango.Transport.Adapters /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - public override void Write(byte[] data) + public override void Write(byte[] data, bool immidiate = false) { ThrowIfDisposed(); diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs index b944d4ab3..f4f7eb9cf 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/MultiTransportAdapter.cs @@ -36,7 +36,7 @@ namespace Tango.Transport.Adapters ComponentName = "Multi Transport Adapter"; } - public override void Write(byte[] data) + public override void Write(byte[] data, bool immidiate = false) { Adapters.ToList().ForEach(x => x.Write(data)); } diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs index 433fad712..f1d3e10aa 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs @@ -17,6 +17,7 @@ namespace Tango.Transport.Adapters private HubConnection _connection; private Thread _pushThread; private ProducerConsumerQueue<byte[]> _pushQueue; + private object _writeSyncObject = new object(); /// <summary> /// Gets or sets the URL of the SignalR service. @@ -85,14 +86,33 @@ namespace Tango.Transport.Adapters /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - public override void Write(byte[] data) + /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> + public override void Write(byte[] data, bool immidiate = false) { TotalBytesSent += data.Length; _totalBytes += data.Length; AppendTransferRateBytes(data.Length); - _pushQueue.BlockEnqueue(data); + if (!immidiate) + { + _pushQueue.BlockEnqueue(data); + } + else + { + try + { + lock (_writeSyncObject) + { + _proxy.Invoke("Write", new List<byte[]>() { data }).GetAwaiter().GetResult(); + } + } + catch (Exception ex) + { + OnFailed(LogManager.Log(ex, $"Error writing to SignalR adapter ({Address}).")); + return; + } + } } /// <summary> @@ -274,7 +294,10 @@ namespace Tango.Transport.Adapters { try { - _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult(); + lock (_writeSyncObject) + { + _proxy.Invoke("Write", dataCollection).GetAwaiter().GetResult(); + } } catch (Exception ex) { diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs index 9f726f3bf..38c577724 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs @@ -27,6 +27,7 @@ namespace Tango.Transport.Adapters private Thread _pushThread; private ProducerConsumerQueue<byte[]> _pushQueue; private byte[] _size_buffer; + private object _writeSyncObject = new object(); #region Properties @@ -168,7 +169,8 @@ namespace Tango.Transport.Adapters /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - public override void Write(byte[] data) + /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> + public override void Write(byte[] data, bool immidiate = false) { ThrowIfDisposed(); @@ -176,9 +178,12 @@ namespace Tango.Transport.Adapters { data = PostProcessBuffer(data); - if (WriteMode == TcpTransportAdapterWriteMode.Direct) + if (WriteMode == TcpTransportAdapterWriteMode.Direct || immidiate) { - _socket.GetStream().Write(data, 0, data.Length); + lock (_writeSyncObject) + { + _socket.GetStream().Write(data, 0, data.Length); + } } else { @@ -286,7 +291,11 @@ namespace Tango.Transport.Adapters try { byte[] allData = dataCollection.SelectMany(a => a).ToArray(); - _socket.GetStream().Write(allData, 0, allData.Length); + + lock (_writeSyncObject) + { + _socket.GetStream().Write(allData, 0, allData.Length); + } } catch (Exception ex) { diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs index 084c7b5ef..485eda628 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs @@ -202,7 +202,8 @@ namespace Tango.Transport.Adapters /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - public override void Write(byte[] data) + /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> + public override void Write(byte[] data, bool immidiate = false) { ThrowIfDisposed(); diff --git a/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs index 5e6b528c2..3cfd1ca48 100644 --- a/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/ITransportAdapter.cs @@ -33,7 +33,8 @@ namespace Tango.Transport /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - void Write(byte[] data); + /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> + void Write(byte[] data, bool immidiate = false); /// <summary> /// Occurs when new data is available. diff --git a/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs b/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs index 29cc3c7e8..8dcf2d661 100644 --- a/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs +++ b/Software/Visual_Studio/Tango.Transport/Servers/TcpServer.cs @@ -58,6 +58,7 @@ namespace Tango.Transport.Servers if (!IsStarted) { Listener = new TcpListener(System.Net.IPAddress.Any, Port); + Listener.ExclusiveAddressUse = false; Listener.Start(); IsStarted = true; LogManager.Log($"TCP started on port {Port}."); diff --git a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs index d17bde261..2696f5698 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs @@ -219,7 +219,8 @@ namespace Tango.Transport /// Writes the specified data to the stream. /// </summary> /// <param name="data">The data.</param> - public abstract void Write(byte[] data); + /// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param> + public abstract void Write(byte[] data, bool immidiate = false); /// <summary> /// Connects the transport component. diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs index 849317b9b..7e22c7b46 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs @@ -52,6 +52,8 @@ namespace Tango.Transport /// </summary> public bool ShouldLog { get; set; } + public bool Immidiate { get; set; } + /// <summary> /// Notifies the message observer of the new result. /// </summary> diff --git a/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs b/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs index 6250cd881..255a566a7 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportRequestConfig.cs @@ -10,5 +10,6 @@ namespace Tango.Transport { public TimeSpan? Timeout { get; set; } public bool ShouldLog { get; set; } + public bool Immediate { get; set; } } } diff --git a/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs b/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs index 449482e8a..44115d99a 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportResponseConfig.cs @@ -12,5 +12,7 @@ namespace Tango.Transport public bool Completed { get; set; } public ErrorCode? ErrorCode { get; set; } public String ErrorMessage { get; set; } + public bool ShouldLog { get; set; } + public bool Immediate { get; set; } } } diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index e0f9d46dd..c9388678d 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -499,6 +499,7 @@ namespace Tango.Transport TaskCompletionSource<IMessage> source = new TaskCompletionSource<IMessage>(); TransportMessage<IMessage> message = new TransportMessage<IMessage>(container.Token, request, TransportMessageDirection.Request, () => container.ToByteArray(), source); message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; message.TransportComponentName = GetExtendedComponentName(); message.ActivateTimeout = () => @@ -547,6 +548,7 @@ namespace Tango.Transport TaskCompletionSource<MessageContainer> source = new TaskCompletionSource<MessageContainer>(); TransportMessage<MessageContainer> message = new TransportMessage<MessageContainer>(container.Token, container, TransportMessageDirection.Request, () => container.ToByteArray(), source); message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; message.TransportComponentName = GetExtendedComponentName(); message.ActivateTimeout = () => @@ -597,6 +599,7 @@ namespace Tango.Transport TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>(); TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, () => Encoder.Encode(request), source); message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; message.TransportComponentName = GetExtendedComponentName(); message.ActivateTimeout = () => @@ -656,6 +659,7 @@ namespace Tango.Transport IsContinuous = true, ContinuesResponseSubject = subject, ShouldLog = config.ShouldLog, + Immidiate = config.Immediate, TransportComponentName = GetExtendedComponentName(), }; @@ -717,6 +721,7 @@ namespace Tango.Transport IsContinuous = true, ContinuesResponseSubject = subject, ShouldLog = config.ShouldLog, + Immidiate = config.Immediate, TransportComponentName = GetExtendedComponentName(), }; @@ -798,6 +803,7 @@ namespace Tango.Transport IsContinuous = true, ContinuesResponseSubject = subject, ShouldLog = config.ShouldLog, + Immidiate = config.Immediate, TransportComponentName = GetExtendedComponentName(), }; @@ -1044,6 +1050,8 @@ namespace Tango.Transport TaskCompletionSource<object> source = new TaskCompletionSource<object>(); TransportMessage<object> message = new TransportMessage<object>(token, response, TransportMessageDirection.Response, () => Encoder.Encode(response), source); + message.ShouldLog = config.ShouldLog; + message.Immidiate = config.Immediate; EnqueueMessageOut(message); return source.Task; } @@ -1172,7 +1180,7 @@ namespace Tango.Transport } } - Adapter.Write(message.Serialize()); + Adapter.Write(message.Serialize(), message.Immidiate); message.ActivateTimeout?.Invoke(); |
