using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.Transport.Compression;
namespace Tango.Transport.Adapters
{
/// <summary>
/// Represents a transport adapter which communicates over TCP/IP.
/// </summary>
/// <seealso cref="TransportAdapterBase" />
public class TcpTransportAdapter : TransportAdapterBase
{
// TCP client used for all reads and writes; created by Connect or supplied through the constructor.
private TcpClient _socket;
// Thread that runs PullThreadMethod to start the asynchronous receive loop.
private Thread _pullThread;
// True when the socket was handed in through the constructor, so Connect must not create a new one.
private bool _initializedFromConstructor;
// Thread that runs PushThreadMethod; only created when WriteMode is Interval.
private Thread _pushThread;
// Buffers enqueued by Write and drained in batches by PushThreadMethod.
private ProducerConsumerQueue _pushQueue;
// Receives the 4-byte size header that precedes every incoming message.
private byte[] _size_buffer;
// Serializes writes to the socket stream between Write and PushThreadMethod.
// Read-only so the lock target can never be swapped while another thread holds it.
private readonly object _writeSyncObject = new object();
#region Properties
/// <summary>
/// Gets or sets the TCP listener port.
/// </summary>
/// <remarks>
/// Passed to the <see cref="TcpClient"/> created by <see cref="Connect"/>. The parameterless constructor sets it to 9999.
/// </remarks>
public int Port { get; set; }
/// <summary>
/// Gets or sets the adapter write mode.
/// </summary>
/// <remarks>
/// The parameterless constructor sets it to <see cref="TcpTransportAdapterWriteMode.Interval"/>.
/// </remarks>
public TcpTransportAdapterWriteMode WriteMode { get; set; }
/// <summary>
/// Gets or sets the write interval when using <see cref="TcpTransportAdapterWriteMode.Interval"/> mode.
/// </summary>
/// <remarks>
/// The push thread sleeps for this long after each batched write. The parameterless constructor sets it to 10 milliseconds.
/// </remarks>
public TimeSpan WriteInterval { get; set; }
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class with default settings:
/// address 127.0.0.1, port 9999 and interval write mode with a 10 millisecond write interval.
/// </summary>
public TcpTransportAdapter()
{
// NOTE(review): _component_counter is declared outside this file; the post-increment is not
// synchronized here - confirm whether adapters can be constructed from several threads.
ComponentName = $"TCP Adapter {_component_counter++}";
Address = "127.0.0.1";
Port = 9999;
WriteMode = TcpTransportAdapterWriteMode.Interval;
WriteInterval = TimeSpan.FromMilliseconds(10);
}
/// <summary>
/// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class.
/// </summary>
/// <param name="address">The address.</param>
/// <param name="port">The port.</param>
public TcpTransportAdapter(String address, int port) : this()
{
// Replace the default address and port assigned by the parameterless constructor.
Address = address;
Port = port;
}
/// <summary>
/// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class around an existing socket.
/// </summary>
/// <param name="socket">The socket.</param>
/// <exception cref="ArgumentNullException"><paramref name="socket"/> is <c>null</c>.</exception>
public TcpTransportAdapter(TcpClient socket) : this()
{
    // Fail fast with a descriptive exception instead of a NullReferenceException further down.
    if (socket == null)
    {
        throw new ArgumentNullException(nameof(socket));
    }

    // Remember that the socket is externally supplied so Connect does not create a new one.
    _initializedFromConstructor = true;
    _socket = socket;
    Address = socket.GetIPAddress().ToStringSafe();
    SetSocketProperties();
}
#endregion
#region Public Methods
/// <summary>
/// Connects the transport component.
/// </summary>
/// <remarks>
/// Does nothing when the adapter is already connected. Otherwise a socket is created (unless one was
/// supplied through the constructor), the state becomes Connected and the pull thread is started.
/// In <see cref="TcpTransportAdapterWriteMode.Interval"/> mode the push queue and push thread are started as well.
/// </remarks>
/// <returns>
/// A task that runs the connect sequence. Failures are logged and surface through the returned task.
/// </returns>
public override Task Connect()
{
    ThrowIfDisposed();

    return Task.Factory.StartNew(() =>
    {
        try
        {
            if (State == TransportComponentState.Connected)
            {
                return;
            }

            if (!_initializedFromConstructor)
            {
                _socket = new TcpClient(Address, Port);
                SetSocketProperties();
            }

            // Create the push queue BEFORE the Connected state is published. Otherwise a Write call
            // racing with Connect could observe the Connected state while _pushQueue is still null.
            bool useIntervalWrites = WriteMode == TcpTransportAdapterWriteMode.Interval;
            if (useIntervalWrites)
            {
                _pushQueue = new ProducerConsumerQueue();
            }

            LogManager.Log($"TCP adapter ({Address}) Connected...");

            // Both worker methods check State on entry, so it must be Connected before they start.
            State = TransportComponentState.Connected;

            _pullThread = new Thread(PullThreadMethod)
            {
                Name = $"{ComponentName} Pull Thread",
                IsBackground = true
            };
            _pullThread.Start();

            if (useIntervalWrites)
            {
                _pushThread = new Thread(PushThreadMethod)
                {
                    Name = $"{ComponentName} Push Thread",
                    IsBackground = true
                };
                _pushThread.Start();
            }
        }
        catch (Exception ex)
        {
            throw LogManager.Log(ex, $"Could not connect the TCP adapter ({Address}).");
        }
    });
}
/// <summary>
/// Disconnects the transport component.
/// </summary>
/// <remarks>
/// Does nothing unless the adapter is currently connected. The state is switched to Disconnected
/// before the socket is closed so the reader and writer loops stop on their next state check.
/// </remarks>
/// <returns>
/// A task that runs the disconnect sequence. Failures are logged and not rethrown.
/// </returns>
public override Task Disconnect()
{
    return Task.Factory.StartNew((Action)(() =>
    {
        try
        {
            if (State != TransportComponentState.Connected)
            {
                return;
            }

            State = TransportComponentState.Disconnected;
            _socket.Close();

            // The push thread only exists in Interval write mode. Check for it explicitly
            // instead of relying on a swallowed NullReferenceException.
            Thread pushThread = _pushThread;
            if (pushThread != null)
            {
                try
                {
                    pushThread.Abort();
                }
                catch (Exception)
                {
                    // Best effort: Thread.Abort can throw (for example when the runtime does not
                    // support it). That must not prevent the disconnect from completing.
                }
            }

            LogManager.Log($"TCP adapter ({Address}) disconnected.");
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, $"Could not disconnect the TCP adapter ({Address}).");
        }
    }));
}
/// <summary>
/// Writes the specified data to the stream.
/// </summary>
/// <param name="data">The data.</param>
/// <param name="immidiate">Writes the data as soon as possible while ignoring any message queuing and batching.</param>
public override void Write(byte[] data, bool immidiate = false)
{
    ThrowIfDisposed();

    try
    {
        // Optional compression first, then the base class post-processing step.
        byte[] payload = EnableCompression ? GZipHelper.Compress(data) : data;
        payload = PostProcessBuffer(payload);

        bool queueForPushThread = !immidiate && WriteMode != TcpTransportAdapterWriteMode.Direct;
        if (queueForPushThread)
        {
            // Interval mode: hand the buffer to the push thread for batched sending.
            _pushQueue.BlockEnqueue(payload);
            return;
        }

        // Direct or immediate write: send right away, serialized with the push thread.
        lock (_writeSyncObject)
        {
            var stream = _socket.GetStream();
            stream.Write(payload, 0, payload.Length);
        }
    }
    catch (Exception error)
    {
        OnFailed(LogManager.Log(error, $"Error writing to TCP adapter ({Address})."));
    }
}
#endregion
#region Pull Thread
/// <summary>
/// Starts an asynchronous read of the 4-byte size header of the next message.
/// The read is completed in <see cref="EndReading"/>. Does nothing when the adapter is not connected.
/// </summary>
private void PullThreadMethod()
{
    try
    {
        if (State != TransportComponentState.Connected)
        {
            return;
        }

        // A fresh header buffer is allocated for every message.
        var header = new byte[4];
        _size_buffer = header;

        var stream = _socket.GetStream();
        stream.BeginRead(header, 0, header.Length, EndReading, stream);
    }
    catch (Exception failure)
    {
        OnFailed(failure);
    }
}
/// <summary>
/// Completes the asynchronous header read started by <see cref="PullThreadMethod"/>, reads the
/// message body announced by the header, optionally decompresses it, raises it through
/// OnDataAvailable and then arms the next header read.
/// </summary>
/// <remarks>
/// Any exception, including a connection closed by the remote host, is reported through OnFailed.
/// Nothing is delivered when the adapter leaves the Connected state while a message is being read.
/// </remarks>
/// <param name="ar">The asynchronous result of the pending header read.</param>
private void EndReading(IAsyncResult ar)
{
    try
    {
        if (State != TransportComponentState.Connected)
        {
            return;
        }

        NetworkStream stream = _socket.GetStream();

        // EndRead reports how many header bytes actually arrived. Zero means the remote side
        // closed the connection; fewer than four means the rest of the header is still in flight.
        int headerBytes = stream.EndRead(ar);
        if (headerBytes == 0)
        {
            throw new System.IO.EndOfStreamException($"TCP adapter ({Address}) connection was closed by the remote host.");
        }

        if (!ReadRemaining(stream, _size_buffer, headerBytes))
        {
            return;
        }

        // NOTE(review): the size comes straight off the wire and is not bounded here - confirm
        // whether an upper limit should be enforced before allocating.
        int expectedSize = BitConverter.ToInt32(_size_buffer, 0);
        if (expectedSize > 0)
        {
            byte[] data = new byte[expectedSize];
            if (!ReadRemaining(stream, data, 0))
            {
                // Disconnected in the middle of a message: never deliver a partial buffer.
                return;
            }

            if (EnableCompression)
            {
                try
                {
                    data = GZipHelper.Decompress(data);
                }
                catch (Exception ex) when (ex.Message.Contains("GZip"))
                {
                    //Temporarily ignore, probably switching protocol definitions...
                    // Other exceptions are not caught here, so they keep their original stack trace.
                }
            }

            OnDataAvailable(data);
        }
        else
        {
            // Non-positive size header: back off before trying to read the next one.
            Thread.Sleep(2000);
        }

        PullThreadMethod();
    }
    catch (Exception ex)
    {
        OnFailed(ex);
    }
}

/// <summary>
/// Blocks until <paramref name="buffer"/> has been filled from <paramref name="offset"/> to its end.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="buffer">The buffer to fill.</param>
/// <param name="offset">Index of the first byte that still has to be read.</param>
/// <returns>
/// <c>true</c> when the buffer is completely filled; <c>false</c> when the adapter left the
/// Connected state before that happened.
/// </returns>
/// <exception cref="System.IO.EndOfStreamException">The remote host closed the connection before the buffer was filled.</exception>
private bool ReadRemaining(NetworkStream stream, byte[] buffer, int offset)
{
    while (offset < buffer.Length)
    {
        if (State != TransportComponentState.Connected)
        {
            return false;
        }

        int received;
        try
        {
            // Ask for everything that is still missing: Read blocks until at least one byte is
            // available, so there is no busy-wait while the rest of the message is in flight.
            received = stream.Read(buffer, offset, buffer.Length - offset);
        }
        catch (Exception) when (State != TransportComponentState.Connected)
        {
            // The socket was closed by a local disconnect while this thread was blocked in Read.
            return false;
        }

        if (received == 0)
        {
            if (State != TransportComponentState.Connected)
            {
                return false;
            }

            throw new System.IO.EndOfStreamException($"TCP adapter ({Address}) connection was closed by the remote host.");
        }

        offset += received;
    }

    return true;
}
#endregion
#region Push Thread
/// <summary>
/// Body of the push thread used in interval write mode. While the adapter is connected it waits
/// for queued buffers, sends everything that is currently queued as a single write and then
/// sleeps for <see cref="WriteInterval"/>.
/// </summary>
/// <remarks>
/// A failed write is reported through OnFailed and ends the thread.
/// </remarks>
private void PushThreadMethod()
{
    try
    {
        while (State == TransportComponentState.Connected)
        {
            // Block until at least one buffer is available, then drain whatever else is queued
            // so the whole batch goes out in one write.
            var pending = new List<byte[]> { _pushQueue.BlockDequeue() };
            while (_pushQueue.Count > 0)
            {
                pending.Add(_pushQueue.BlockDequeue());
            }

            try
            {
                byte[] allData = Concatenate(pending);
                lock (_writeSyncObject)
                {
                    _socket.GetStream().Write(allData, 0, allData.Length);
                }
            }
            catch (Exception ex)
            {
                OnFailed(LogManager.Log(ex, $"Error writing to TCP adapter ({Address})."));
                return;
            }

            Thread.Sleep(WriteInterval);
        }
    }
    catch (ThreadAbortException) { }
}

/// <summary>
/// Joins the given buffers, in order, into one contiguous array using block copies.
/// </summary>
/// <param name="chunks">The buffers to join.</param>
/// <returns>A new array holding the bytes of every buffer back to back.</returns>
private static byte[] Concatenate(List<byte[]> chunks)
{
    int totalLength = 0;
    foreach (byte[] chunk in chunks)
    {
        totalLength += chunk.Length;
    }

    byte[] combined = new byte[totalLength];
    int position = 0;
    foreach (byte[] chunk in chunks)
    {
        Buffer.BlockCopy(chunk, 0, combined, position, chunk.Length);
        position += chunk.Length;
    }

    return combined;
}
#endregion
#region Private Methods
/// <summary>
/// Applies MAX_BUFFER_SIZE as both the send and the receive buffer size of the current socket.
/// </summary>
private void SetSocketProperties()
{
    int bufferSize = MAX_BUFFER_SIZE;

    _socket.SendBufferSize = bufferSize;
    _socket.ReceiveBufferSize = bufferSize;
}
#endregion
}
}