using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; using Tango.Core; using Tango.Logging; using Tango.Transport.Compression; namespace Tango.Transport.Adapters { /// /// Represents an which communicates over TCP/IP. /// /// public class TcpTransportAdapter : TransportAdapterBase { private TcpClient _socket; private Thread _pullThread; private bool _initializedFromConstructor; private Thread _pushThread; private ProducerConsumerQueue _pushQueue; private byte[] _size_buffer; private object _writeSyncObject = new object(); #region Properties /// /// Gets or sets the TCP listener port. /// public int Port { get; set; } /// /// Gets or sets the adapter write mode. /// public TcpTransportAdapterWriteMode WriteMode { get; set; } /// /// Gets or sets the write interval when using mode. /// public TimeSpan WriteInterval { get; set; } #endregion #region Constructors /// /// Initializes a new instance of the class. /// public TcpTransportAdapter() { ComponentName = $"TCP Adapter {_component_counter++}"; Address = "127.0.0.1"; Port = 9999; WriteMode = TcpTransportAdapterWriteMode.Interval; WriteInterval = TimeSpan.FromMilliseconds(10); } /// /// Initializes a new instance of the class. /// /// The address. /// The port. public TcpTransportAdapter(String address, int port) : this() { Address = address; Port = port; } /// /// Initializes a new instance of the class. /// /// The socket. public TcpTransportAdapter(TcpClient socket) : this() { _initializedFromConstructor = true; _socket = socket; Address = socket.GetIPAddress().ToStringSafe(); SetSocketProperties(); } #endregion #region Public Methods /// /// Connects the transport component. /// /// public override Task Connect() { ThrowIfDisposed(); return Task.Factory.StartNew(() => { try { if (State != TransportComponentState.Connected) { if (!_initializedFromConstructor) { _socket = new TcpClient(Address, Port); SetSocketProperties(); } LogManager.Log($"TCP adapter ({Address}) Connected..."); State = TransportComponentState.Connected; _pullThread = new Thread(PullThreadMethod); _pullThread.Name = $"{ComponentName} Pull Thread"; _pullThread.IsBackground = true; _pullThread.Start(); if (WriteMode == TcpTransportAdapterWriteMode.Interval) { _pushThread = new Thread(PushThreadMethod); _pushThread.IsBackground = true; _pushThread.Name = $"{ComponentName} Push Thread"; _pushQueue = new ProducerConsumerQueue(); _pushThread.Start(); } } } catch (Exception ex) { throw LogManager.Log(ex, $"Could not connect the TCP adapter ({Address})."); } }); } /// /// Disconnects the transport component. /// /// public override Task Disconnect() { return Task.Factory.StartNew((Action)(() => { try { if (State == TransportComponentState.Connected) { State = TransportComponentState.Disconnected; _socket.Close(); try { _pushThread.Abort(); } catch { } LogManager.Log($"TCP adapter ({Address}) disconnected."); } } catch (Exception ex) { LogManager.Log(ex, $"Could not disconnect the TCP adapter ({Address})."); } })); } /// /// Writes the specified data to the stream. /// /// The data. /// Writes the data as soon as possible while ignoring any message queuing and batching. public override void Write(byte[] data, bool immidiate = false) { ThrowIfDisposed(); try { if (EnableCompression) { data = GZipHelper.Compress(data); } data = PostProcessBuffer(data); if (WriteMode == TcpTransportAdapterWriteMode.Direct || immidiate) { lock (_writeSyncObject) { _socket.GetStream().Write(data, 0, data.Length); } } else { _pushQueue.BlockEnqueue(data); } } catch (Exception ex) { OnFailed(LogManager.Log(ex, $"Error writing to TCP adapter ({Address}).")); } } #endregion #region Pull Thread private void PullThreadMethod() { try { if (State == TransportComponentState.Connected) { _size_buffer = new byte[4]; _socket.GetStream().BeginRead(_size_buffer, 0, _size_buffer.Length, EndReading, _socket.GetStream()); } } catch (Exception ex) { OnFailed(ex); } } private void EndReading(IAsyncResult ar) { try { if (State == TransportComponentState.Connected) { _socket.GetStream().EndRead(ar); int expectedSize = BitConverter.ToInt32(_size_buffer, 0); if (expectedSize > 0) { byte[] data = new byte[expectedSize]; int read = 0; while (read < expectedSize) { read += _socket.GetStream().Read(data, read, Math.Min(_socket.Available, expectedSize - read)); if (State != TransportComponentState.Connected) { break; } } if (EnableCompression) { try { data = GZipHelper.Decompress(data); } catch (Exception ex) { if (ex.Message.Contains("GZip")) { //Temporarily ignore, probably switching protocol definitions... } else { throw ex; } } } OnDataAvailable(data); } else { Thread.Sleep(2000); } PullThreadMethod(); } } catch (Exception ex) { OnFailed(ex); } } #endregion #region Push Thread private void PushThreadMethod() { try { while (State == TransportComponentState.Connected) { List dataCollection = new List(); var data = _pushQueue.BlockDequeue(); var first = true; while (_pushQueue.Count > 0 || first) { if (!first) { data = _pushQueue.BlockDequeue(); } else { first = false; } dataCollection.Add(data); } if (dataCollection.Count > 0) { try { byte[] allData = dataCollection.SelectMany(a => a).ToArray(); lock (_writeSyncObject) { _socket.GetStream().Write(allData, 0, allData.Length); } } catch (Exception ex) { OnFailed(LogManager.Log(ex, $"Error writing to TCP adapter ({Address}).")); return; } } Thread.Sleep(WriteInterval); } } catch (ThreadAbortException) { } } #endregion #region Private Methods private void SetSocketProperties() { _socket.SendBufferSize = MAX_BUFFER_SIZE; _socket.ReceiveBufferSize = MAX_BUFFER_SIZE; } #endregion } }