diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-01-26 22:06:58 +0200 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-01-26 22:06:58 +0200 |
| commit | 9419d4d513dd03b431fdc14f3cd7694ae11e89c5 (patch) | |
| tree | 34c4a8311fcae258434162b97dd91a8323fcd794 /Software/Visual_Studio/Tango.Transport | |
| parent | 2367c43d732aea2ea169f6fa5844fc877d96632d (diff) | |
| download | Tango-9419d4d513dd03b431fdc14f3cd7694ae11e89c5.tar.gz Tango-9419d4d513dd03b431fdc14f3cd7694ae11e89c5.zip | |
Implemented TCP transport adapter Interval write mode.
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport')
4 files changed, 117 insertions, 2 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs index 1cc90396b..699b172a0 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs @@ -261,7 +261,7 @@ namespace Tango.Transport.Adapters } catch (Exception ex) { - OnFailed(ex); + OnFailed(LogManager.Log(ex, $"Error writing to SignalR adapter ({Address}).")); return; } } diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs index 138b1c2e1..b855c8f1d 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs @@ -10,6 +10,7 @@ using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; +using Tango.Core; using Tango.Logging; namespace Tango.Transport.Adapters @@ -23,6 +24,8 @@ namespace Tango.Transport.Adapters private TcpClient _socket; private Thread _pullThread; private bool _initializedFromConstructor; + private Thread _pushThread; + private ProducerConsumerQueue<byte[]> _pushQueue; private byte[] _size_buffer; #region Properties @@ -32,6 +35,16 @@ namespace Tango.Transport.Adapters /// </summary> public int Port { get; set; } + /// <summary> + /// Gets or sets the adapter write mode. + /// </summary> + public TcpTransportAdapterWriteMode WriteMode { get; set; } + + /// <summary> + /// Gets or sets the write interval when using <see cref="TcpTransportAdapterWriteMode.Interval"/> mode. + /// </summary> + public TimeSpan WriteInterval { get; set; } + #endregion #region Constructors @@ -44,6 +57,8 @@ namespace Tango.Transport.Adapters ComponentName = $"TCP Adapter {_component_counter++}"; Address = "127.0.0.1"; Port = 9999; + WriteMode = TcpTransportAdapterWriteMode.Interval; + WriteInterval = TimeSpan.FromSeconds(1); } /// <summary> @@ -95,8 +110,19 @@ namespace Tango.Transport.Adapters State = TransportComponentState.Connected; _pullThread = new Thread(PullThreadMethod); + _pullThread.Name = $"{ComponentName} Pull Thread"; _pullThread.IsBackground = true; _pullThread.Start(); + + if (WriteMode == TcpTransportAdapterWriteMode.Interval) + { + _pushThread = new Thread(PushThreadMethod); + _pushThread.IsBackground = true; + _pushThread.Name = $"{ComponentName} Push Thread"; + _pushQueue = new ProducerConsumerQueue<byte[]>(); + _pushThread.Start(); + } + LogManager.Log($"TCP adapter ({Address}) Connected..."); } } @@ -121,6 +147,13 @@ namespace Tango.Transport.Adapters { State = TransportComponentState.Disconnected; _socket.Close(); + + try + { + _pushThread.Abort(); + } + catch { } + LogManager.Log($"TCP adapter ({Address}) disconnected."); } } @@ -142,7 +175,15 @@ namespace Tango.Transport.Adapters try { data = PostProcessBuffer(data); - _socket.GetStream().Write(data, 0, data.Length); + + if (WriteMode == TcpTransportAdapterWriteMode.Direct) + { + _socket.GetStream().Write(data, 0, data.Length); + } + else + { + _pushQueue.BlockEnqueue(data); + } } catch (Exception ex) { @@ -213,6 +254,55 @@ namespace Tango.Transport.Adapters #endregion + #region Push Thread + + private void PushThreadMethod() + { + try + { + while (State == TransportComponentState.Connected) + { + List<byte[]> dataCollection = new List<byte[]>(); + + var data = _pushQueue.BlockDequeue(); + var first = true; + + while (_pushQueue.Count > 0 || first) + { + if (!first) + { + data = _pushQueue.BlockDequeue(); + } + else + { + first = false; + } + + dataCollection.Add(data); + } + + if (dataCollection.Count > 0) + { + try + { + byte[] allData = dataCollection.SelectMany(a => a).ToArray(); + _socket.GetStream().Write(allData, 0, allData.Length); + } + catch (Exception ex) + { + OnFailed(LogManager.Log(ex, $"Error writing to TCP adapter ({Address}).")); + return; + } + } + + Thread.Sleep(WriteInterval); + } + } + catch (ThreadAbortException) { } + } + + #endregion + #region Private Methods private void SetSocketProperties() diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapterWriteMode.cs b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapterWriteMode.cs new file mode 100644 index 000000000..89665e2d2 --- /dev/null +++ b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapterWriteMode.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Transport.Adapters +{ + /// <summary> + /// Represents a <see cref="TcpTransportAdapter"/> write mode. + /// </summary> + public enum TcpTransportAdapterWriteMode + { + /// <summary> + /// Writes the data directly from the Write(); method (default). + /// </summary> + Direct, + + /// <summary> + /// Accumulates the data being written from the Write(); method and writes a block of messages on intervals. + /// </summary> + Interval + } +} diff --git a/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj b/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj index b38058b98..066faea91 100644 --- a/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj +++ b/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj @@ -80,6 +80,7 @@ <Compile Include="Adapters\MultiTransportAdapter.cs" /> <Compile Include="Adapters\SignalRTransportAdapter.cs" /> <Compile Include="Adapters\SignalRTransportAdapterMode.cs" /> + <Compile Include="Adapters\TcpTransportAdapterWriteMode.cs" /> <Compile Include="Adapters\UsbSerialBaudRates.cs" /> <Compile Include="Adapters\UsbTransportAdapter.cs" /> <Compile Include="Components\ComPortEnumerator.cs" /> |
