aboutsummaryrefslogtreecommitdiffstats
path: root/Software
diff options
context:
space:
mode:
authorRoy <roy.mail.net@gmail.com>2017-11-10 23:22:30 +0200
committerRoy <roy.mail.net@gmail.com>2017-11-10 23:22:30 +0200
commit22ba0c01b8ada9a23221d73a3997ca37bb9d1cf9 (patch)
tree3c8cf8af98794d44a2a48f226d37874288bcdb76 /Software
parent070aa53b6763fd2b5f7b482bf0f5070711f91ef6 (diff)
downloadTango-22ba0c01b8ada9a23221d73a3997ca37bb9d1cf9.tar.gz
Tango-22ba0c01b8ada9a23221d73a3997ca37bb9d1cf9.zip
Implemented Transporters, Adapters!!!
Diffstat (limited to 'Software')
-rw-r--r--Software/Visual Studio/Tango.Emulator/MainWindow.xaml.cs49
-rw-r--r--Software/Visual Studio/Tango.Emulator/Tango.Emulator.csproj4
-rw-r--r--Software/Visual Studio/Tango.Integration/Adapters/ISerialAdapter.cs20
-rw-r--r--Software/Visual Studio/Tango.Integration/Adapters/TcpSerialAdapter.cs187
-rw-r--r--Software/Visual Studio/Tango.Integration/Emulators/MachineEmulator.cs54
-rw-r--r--Software/Visual Studio/Tango.Integration/TCP/TcpServer.cs3
-rw-r--r--Software/Visual Studio/Tango.Integration/Tango.Integration.csproj11
-rw-r--r--Software/Visual Studio/Tango.Integration/Transport/ITransporter.cs66
-rw-r--r--Software/Visual Studio/Tango.Integration/Transport/MessageBase.cs33
-rw-r--r--Software/Visual Studio/Tango.Integration/Transport/ProtoTransporter.cs255
-rw-r--r--Software/Visual Studio/Tango.Integration/Transport/RequestMessage.cs17
-rw-r--r--Software/Visual Studio/Tango.Integration/Transport/ResponseMessage.cs17
-rw-r--r--Software/Visual Studio/Tango.PMR/MessageFactory.cs12
-rw-r--r--Software/Visual Studio/Tango.PMR/TangoMessage.cs10
-rw-r--r--Software/Visual Studio/Tango.Transport/Adapters/SplitterAdapter.cs26
-rw-r--r--Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs201
-rw-r--r--Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs150
-rw-r--r--Software/Visual Studio/Tango.Transport/ITransportAdapter.cs33
-rw-r--r--Software/Visual Studio/Tango.Transport/ITransportComponent.cs33
-rw-r--r--Software/Visual Studio/Tango.Transport/ITransporter.cs64
-rw-r--r--Software/Visual Studio/Tango.Transport/Properties/AssemblyInfo.cs6
-rw-r--r--Software/Visual Studio/Tango.Transport/Tango.Transport.csproj104
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs132
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportComponentState.cs31
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportMessage.cs49
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportMessageBase.cs67
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportMessageDirection.cs23
-rw-r--r--Software/Visual Studio/Tango.Transport/TransporterBase.cs454
-rw-r--r--Software/Visual Studio/Tango.Transport/Transporters/JsonTransporter.cs60
-rw-r--r--Software/Visual Studio/Tango.Transport/Transporters/ProtoTransporter.cs47
-rw-r--r--Software/Visual Studio/Tango.Transport/packages.config10
-rw-r--r--Software/Visual Studio/Tango.sln8
32 files changed, 1578 insertions, 658 deletions
diff --git a/Software/Visual Studio/Tango.Emulator/MainWindow.xaml.cs b/Software/Visual Studio/Tango.Emulator/MainWindow.xaml.cs
index 43e3563bf..b7a827813 100644
--- a/Software/Visual Studio/Tango.Emulator/MainWindow.xaml.cs
+++ b/Software/Visual Studio/Tango.Emulator/MainWindow.xaml.cs
@@ -13,10 +13,10 @@ using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
-using Tango.Integration.Adapters;
+using Tango.Transport.Adapters;
using Tango.Integration.Emulators;
using Tango.Integration.TCP;
-using Tango.Integration.Transport;
+using Tango.Transport.Transporters;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Stubs;
@@ -29,7 +29,7 @@ namespace Tango.Emulator
/// </summary>
public partial class MainWindow : MetroWindow
{
- private ProtoTransporter mobileTransporter;
+ private JsonTransporter mobileTransporter;
#region Properties
@@ -105,22 +105,24 @@ namespace Tango.Emulator
TcpServer = new TcpServer(9999);
TcpServer.ClientConnected += TcpServer_ClientConnected;
- mobileTransporter = new ProtoTransporter();
+ mobileTransporter = new JsonTransporter();
- StartCommand = new RelayCommand(() =>
+ StartCommand = new RelayCommand(async () =>
{
- TcpServer.Start();
+ //TcpServer.Start();
+ Emulator = new MachineEmulator(new JsonTransporter(new UsbTransportAdapter("COM1")));
+ await Emulator.Start();
StopCommand.RaiseCanExecuteChanged();
StartCommand.RaiseCanExecuteChanged();
}, x => !TcpServer.IsStarted);
- StopCommand = new RelayCommand(() =>
+ StopCommand = new RelayCommand(async () =>
{
TcpServer.Stop();
if (Emulator != null)
{
- Emulator.Stop();
+ await Emulator.Stop();
}
StartCommand.RaiseCanExecuteChanged();
StopCommand.RaiseCanExecuteChanged();
@@ -129,49 +131,44 @@ namespace Tango.Emulator
return TcpServer.IsStarted;
});
- ConnectCommand = new RelayCommand(() =>
+ ConnectCommand = new RelayCommand(async () =>
{
- mobileTransporter.Adapter = new TcpSerialAdapter("127.0.0.1", 9999);
- mobileTransporter.Connect().Subscribe();
+ mobileTransporter.Adapters.Add(new UsbTransportAdapter("COM2"));
+ await mobileTransporter.Connect();
ConnectCommand.RaiseCanExecuteChanged();
DisconnectCommand.RaiseCanExecuteChanged();
- }, (x) => !mobileTransporter.IsConnected);
+ }, (x) => mobileTransporter.State != Transport.TransportComponentState.Connected);
- DisconnectCommand = new RelayCommand(() =>
+ DisconnectCommand = new RelayCommand(async () =>
{
- mobileTransporter.Disconnect().Subscribe();
+ await mobileTransporter.Disconnect();
ConnectCommand.RaiseCanExecuteChanged();
DisconnectCommand.RaiseCanExecuteChanged();
- }, (x) => mobileTransporter.IsConnected);
+ }, (x) => mobileTransporter.State == Transport.TransportComponentState.Connected);
InitializeComponent();
}
- private void TcpServer_ClientConnected(object sender, ClientConnectedEventArgs e)
+ private async void TcpServer_ClientConnected(object sender, ClientConnectedEventArgs e)
{
- Emulator = new MachineEmulator(new ProtoTransporter(new TcpSerialAdapter(e.Socket)));
- Emulator.Start();
+ Emulator = new MachineEmulator(new JsonTransporter(new TcpTransportAdapter(e.Socket)));
+ await Emulator.Start();
}
#endregion
- private void Button_Click(object sender, RoutedEventArgs e)
+ private async void Button_Click(object sender, RoutedEventArgs e)
{
var request = MessageFactory.CreateContainer<Stub1Request>();
request.Message.A = 5;
request.Message.B = 10;
+ var result = await mobileTransporter.SendRequest<Stub1Request, Stub1Response>(request,mobileTransporter.Adapters.First());
- mobileTransporter.SendRequest<Stub1Request, Stub1Response>(request).Subscribe((response) =>
- {
-
- }, (error) =>
- {
-
- });
+ var a = result;
}
}
}
diff --git a/Software/Visual Studio/Tango.Emulator/Tango.Emulator.csproj b/Software/Visual Studio/Tango.Emulator/Tango.Emulator.csproj
index 0b3cfabb1..696486c3e 100644
--- a/Software/Visual Studio/Tango.Emulator/Tango.Emulator.csproj
+++ b/Software/Visual Studio/Tango.Emulator/Tango.Emulator.csproj
@@ -151,6 +151,10 @@
<Project>{ac489889-6e50-4f16-9dba-ff4c6f9ec72b}</Project>
<Name>Tango.SharedUI</Name>
</ProjectReference>
+ <ProjectReference Include="..\Tango.Transport\Tango.Transport.csproj">
+ <Project>{74e700b0-1156-4126-be40-ee450d3c3026}</Project>
+ <Name>Tango.Transport</Name>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
<Resource Include="Images\machine-trans.png" />
diff --git a/Software/Visual Studio/Tango.Integration/Adapters/ISerialAdapter.cs b/Software/Visual Studio/Tango.Integration/Adapters/ISerialAdapter.cs
deleted file mode 100644
index 0da75704c..000000000
--- a/Software/Visual Studio/Tango.Integration/Adapters/ISerialAdapter.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive;
-using System.Reactive.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Integration.Adapters
-{
- public interface ISerialAdapter : IDisposable
- {
- IObservable<Unit> Open();
- IObservable<Unit> Close();
- void Write(byte[] data);
- IObservable<byte[]> ReceiveData();
- bool IsOpened { get; }
- IObservable<Unit> SubscribeForDisconnection();
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Adapters/TcpSerialAdapter.cs b/Software/Visual Studio/Tango.Integration/Adapters/TcpSerialAdapter.cs
deleted file mode 100644
index 92d0894e1..000000000
--- a/Software/Visual Studio/Tango.Integration/Adapters/TcpSerialAdapter.cs
+++ /dev/null
@@ -1,187 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net.Sockets;
-using System.Reactive;
-using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
-using System.Reactive.Linq;
-using System.Reactive.Subjects;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Tango.Logging;
-
-namespace Tango.Integration.Adapters
-{
- public class TcpSerialAdapter : ISerialAdapter
- {
- private TcpClient _socket;
- private Thread _receiveThread;
- private Subject<byte[]> _receiveSubject;
- private Subject<Unit> _disconnectionSubject;
-
- public bool IsOpened { get; private set; }
-
- public String Address { get; set; }
-
- public int Port { get; set; }
-
- public TcpSerialAdapter()
- {
- _receiveSubject = new Subject<byte[]>();
- _disconnectionSubject = new Subject<Unit>();
- }
-
- public TcpSerialAdapter(String address, int port) : this()
- {
- Address = address;
- Port = port;
- }
-
- public TcpSerialAdapter(TcpClient socket) : this()
- {
- _socket = socket;
- Start();
- }
-
- public IObservable<Unit> Open()
- {
- return Observable.Create<Unit>((x) =>
- {
- try
- {
- if (!IsOpened)
- {
- _socket = new TcpClient(Address, Port);
- Start();
- LogManager.Log("Adapter connected.");
- }
-
- x.OnNext(Unit.Default);
- }
- catch (Exception ex)
- {
- x.OnError(LogManager.Log(ex, "Could not connect the adapter."));
- }
- finally
- {
- x.OnCompleted();
- }
-
- return Disposable.Empty;
-
- }).ObserveOn(Scheduler.Default);
- }
-
- private void Start()
- {
- IsOpened = true;
- _receiveThread = new Thread(ReceiveThreadMethod);
- _receiveThread.IsBackground = true;
- _receiveThread.Start();
- }
-
- public IObservable<Unit> Close()
- {
- return Observable.Create<Unit>((x) =>
- {
- try
- {
- if (IsOpened)
- {
- IsOpened = false;
- _socket.GetStream().Close();
- _socket.Close();
- LogManager.Log("Adapter disconnected.");
- }
-
- x.OnNext(Unit.Default);
- }
- catch (Exception ex)
- {
- x.OnError(LogManager.Log(ex, "Could not disconnect the adapter."));
- }
- finally
- {
- x.OnCompleted();
- }
-
- return Disposable.Empty;
-
- }).ObserveOn(Scheduler.Default);
- }
-
- public void Write(byte[] data)
- {
- _socket.GetStream().Write(data, 0, data.Length);
- }
-
- public IObservable<byte[]> ReceiveData()
- {
- return _receiveSubject;
- }
-
- private void ReceiveThreadMethod()
- {
- int counter = 0;
-
- try
- {
- while (IsOpened)
- {
- if (_socket.Available > 0)
- {
- byte[] data = new byte[_socket.Available];
- _socket.GetStream().Read(data, 0, data.Length);
-
- if (_receiveSubject != null)
- {
- _receiveSubject.OnNext(data);
- }
- }
-
- Thread.Sleep(10);
- counter++;
-
- if (counter >= 200)
- {
- try
- {
- if (_socket.Client.Poll(1, SelectMode.SelectRead) && _socket.Client.Available == 0)
- {
- Close().Subscribe();
- _disconnectionSubject.OnError(LogManager.Log(new Exception("Client disconnected.")));
- return;
- }
- }
- catch (SocketException ex)
- {
- Close().Subscribe();
- _disconnectionSubject.OnError(LogManager.Log(ex));
- return;
- }
-
- counter = 0;
- }
- }
-
- _receiveSubject.OnCompleted();
- }
- catch (Exception ex)
- {
- _receiveSubject.OnError(LogManager.Log(ex));
- }
- }
-
- public void Dispose()
- {
- Close();
- }
-
- public IObservable<Unit> SubscribeForDisconnection()
- {
- return _disconnectionSubject;
- }
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Emulators/MachineEmulator.cs b/Software/Visual Studio/Tango.Integration/Emulators/MachineEmulator.cs
index cbb5f00f9..31113151f 100644
--- a/Software/Visual Studio/Tango.Integration/Emulators/MachineEmulator.cs
+++ b/Software/Visual Studio/Tango.Integration/Emulators/MachineEmulator.cs
@@ -6,15 +6,14 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-using Tango.Integration.Adapters;
using Tango.Integration.TCP;
-using Tango.Integration.Transport;
using Tango.Logging;
using Tango.PMR;
using Tango.PMR.Common;
using Tango.PMR.Stubs;
using Tango.SharedUI;
using Tango.SharedUI.Commands;
+using Tango.Transport;
namespace Tango.Integration.Emulators
{
@@ -57,16 +56,21 @@ namespace Tango.Integration.Emulators
/// </summary>
public MachineEmulator(ITransporter transporter)
{
- _transporter = transporter;
- _transporter.SubscribeForRequests().Subscribe(HandleRequest);
- _transporter.SubscribeForDisconnection().Subscribe((x) => {}, (ex) =>
+ if (_transporter != null)
{
- LogManager.Log("Transporter disconnected.");
- });
- StartCommand = new RelayCommand(Start, (x) => !IsStarted);
- StopCommand = new RelayCommand(Stop, (x) => IsStarted);
+ _transporter.StateChanged -= OnTransporterStateChanged;
+ }
+
+ _transporter = transporter;
+ _transporter.RequestReceived += OnTransporterRequestReceived;
+
+ _transporter.StateChanged += OnTransporterStateChanged;
+ StartCommand = new RelayCommand(async () => { await Start(); }, (x) => !IsStarted);
+ StopCommand = new RelayCommand(async () => { await Stop(); }, (x) => IsStarted);
}
+
+
#endregion
#region Public Methods
@@ -74,28 +78,24 @@ namespace Tango.Integration.Emulators
/// <summary>
/// Stops this instance.
/// </summary>
- public void Stop()
+ public async Task Stop()
{
if (IsStarted)
{
- _transporter.Disconnect().Subscribe(x =>
- {
- IsStarted = false;
- }, ex => { throw ex; });
+ await _transporter.Disconnect();
+ IsStarted = false;
}
}
/// <summary>
/// Starts this instance.
/// </summary>
- public void Start()
+ public async Task Start()
{
if (!IsStarted)
{
- _transporter.Connect().Subscribe(x =>
- {
- IsStarted = true;
- }, ex => { throw ex; });
+ await _transporter.Connect();
+ IsStarted = true;
}
}
@@ -103,19 +103,19 @@ namespace Tango.Integration.Emulators
#region Virtual Methods
- /// <summary>
- /// Handles the request.
- /// </summary>
- /// <param name="container">The container.</param>
- protected void HandleRequest(MessageContainer container)
+ protected virtual void OnTransporterStateChanged(object sender, TransportComponentState e)
+ {
+ LogManager.Log("Transporter state changed: " + e.ToString());
+ }
+
+ protected virtual void OnTransporterRequestReceived(object sender, MessageContainer container)
{
switch (container.Type)
{
case MessageType.Stub1Request:
- Task.Factory.StartNew(() =>
+ Task.Factory.StartNew(() =>
{
- Thread.Sleep(5000);
-
+ Thread.Sleep(1000);
var request = MessageFactory.ParseContainer<Stub1Request>(container);
var response = MessageFactory.CreateContainer<Stub1Response>(container.Token);
response.Message.Sum = request.A + request.B;
diff --git a/Software/Visual Studio/Tango.Integration/TCP/TcpServer.cs b/Software/Visual Studio/Tango.Integration/TCP/TcpServer.cs
index 6f24646ae..94e9bcbbc 100644
--- a/Software/Visual Studio/Tango.Integration/TCP/TcpServer.cs
+++ b/Software/Visual Studio/Tango.Integration/TCP/TcpServer.cs
@@ -91,7 +91,6 @@ namespace Tango.Integration.TCP
try
{
OnClientConnected(Listener.EndAcceptTcpClient(ar));
- Debug.WriteLine("New Client Connected");
WaitForConnection();
}
catch (ObjectDisposedException ex)
@@ -107,7 +106,7 @@ namespace Tango.Integration.TCP
protected virtual void OnClientConnected(TcpClient socket)
{
- LogManager.Log("Client Connected.");
+ LogManager.Log("New client connected.");
Task.Factory.StartNew(() =>
{
diff --git a/Software/Visual Studio/Tango.Integration/Tango.Integration.csproj b/Software/Visual Studio/Tango.Integration/Tango.Integration.csproj
index da3ee255e..f5624e0db 100644
--- a/Software/Visual Studio/Tango.Integration/Tango.Integration.csproj
+++ b/Software/Visual Studio/Tango.Integration/Tango.Integration.csproj
@@ -63,17 +63,10 @@
<Compile Include="..\Versioning\GlobalVersionInfo.cs">
<Link>GlobalVersionInfo.cs</Link>
</Compile>
- <Compile Include="Adapters\ISerialAdapter.cs" />
- <Compile Include="Adapters\TcpSerialAdapter.cs" />
<Compile Include="Emulators\MachineEmulator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TCP\ClientConnectedEventArgs.cs" />
<Compile Include="TCP\TcpServer.cs" />
- <Compile Include="Transport\ITransporter.cs" />
- <Compile Include="Transport\MessageBase.cs" />
- <Compile Include="Transport\ProtoTransporter.cs" />
- <Compile Include="Transport\RequestMessage.cs" />
- <Compile Include="Transport\ResponseMessage.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tango.Core\Tango.Core.csproj">
@@ -96,6 +89,10 @@
<Project>{ac489889-6e50-4f16-9dba-ff4c6f9ec72b}</Project>
<Name>Tango.SharedUI</Name>
</ProjectReference>
+ <ProjectReference Include="..\Tango.Transport\Tango.Transport.csproj">
+ <Project>{74e700b0-1156-4126-be40-ee450d3c3026}</Project>
+ <Name>Tango.Transport</Name>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
diff --git a/Software/Visual Studio/Tango.Integration/Transport/ITransporter.cs b/Software/Visual Studio/Tango.Integration/Transport/ITransporter.cs
deleted file mode 100644
index cb4693efa..000000000
--- a/Software/Visual Studio/Tango.Integration/Transport/ITransporter.cs
+++ /dev/null
@@ -1,66 +0,0 @@
-using Google.Protobuf;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive;
-using System.Reactive.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Tango.Integration.Adapters;
-using Tango.PMR;
-using Tango.PMR.Common;
-
-namespace Tango.Integration.Transport
-{
- public interface ITransporter : IDisposable
- {
- /// <summary>
- /// Gets the serial adapter.
- /// </summary>
- ISerialAdapter Adapter { get; set; }
-
- /// <summary>
- /// Gets a value indicating whether this transporter is connected.
- /// </summary>
- bool IsConnected { get; }
-
- /// <summary>
- /// Sends a request.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <returns></returns>
- IObservable<Response> SendRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>;
-
- /// <summary>
- /// Sends a response.
- /// </summary>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="response">The response.</param>
- /// <returns></returns>
- IObservable<object> SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>;
-
- /// <summary>
- /// Subscribes for requests.
- /// </summary>
- /// <returns></returns>
- IObservable<MessageContainer> SubscribeForRequests();
-
- /// <summary>
- /// Opens a connection on the <see cref="Adapter"/>.
- /// </summary>
- IObservable<Unit> Connect();
-
- /// <summary>
- /// Closes the connection on the <see cref="Adapter"/>.
- /// </summary>
- IObservable<Unit> Disconnect();
-
- /// <summary>
- /// Subscribes for disconnection.
- /// </summary>
- /// <returns></returns>
- IObservable<Unit> SubscribeForDisconnection();
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Transport/MessageBase.cs b/Software/Visual Studio/Tango.Integration/Transport/MessageBase.cs
deleted file mode 100644
index 46d5b3109..000000000
--- a/Software/Visual Studio/Tango.Integration/Transport/MessageBase.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive.Subjects;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Integration.Transport
-{
- internal class MessageBase
- {
- public String Token { get; set; }
-
- public Subject<object> Subject { get; set; }
-
- private Func<byte[]> _toBytes;
-
- public object Message { get; set; }
-
- public byte[] ToBytes()
- {
- return _toBytes();
- }
-
- public MessageBase(String token, Subject<object> subject, object message, Func<byte[]> toBytes)
- {
- Token = token;
- Subject = subject;
- Message = message;
- _toBytes = toBytes;
- }
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Transport/ProtoTransporter.cs b/Software/Visual Studio/Tango.Integration/Transport/ProtoTransporter.cs
deleted file mode 100644
index aaaa5a5ce..000000000
--- a/Software/Visual Studio/Tango.Integration/Transport/ProtoTransporter.cs
+++ /dev/null
@@ -1,255 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Google.Protobuf;
-using Tango.Integration.Adapters;
-using Tango.PMR;
-using Tango.PMR.Common;
-using System.Collections.Concurrent;
-using System.Reactive.Subjects;
-using System.Threading;
-using System.Reactive;
-using System.Reactive.Disposables;
-using Tango.Logging;
-
-namespace Tango.Integration.Transport
-{
- public class ProtoTransporter : ITransporter
- {
- private ConcurrentQueue<MessageBase> _sendingQueue;
- private List<RequestMessage> _pendingRequests;
- private ConcurrentQueue<byte[]> _arrivedResponses;
-
- private Subject<MessageContainer> _requestSubject;
-
- private Thread _pushThread;
- private Thread _pullThread;
-
- private ISerialAdapter _adapter;
- /// <summary>
- /// Gets the serial adapter.
- /// </summary>
- public ISerialAdapter Adapter
- {
- get { return _adapter; }
- set
- {
- _adapter = value;
-
- if (IsConnected)
- {
- Connect();
- }
- }
- }
-
- /// <summary>
- /// Gets a value indicating whether this transporter is connected.
- /// </summary>
- public bool IsConnected { get; private set; }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ProtoTransporter"/> class.
- /// </summary>
- public ProtoTransporter()
- {
- _sendingQueue = new ConcurrentQueue<MessageBase>();
- _pendingRequests = new List<RequestMessage>();
- _arrivedResponses = new ConcurrentQueue<byte[]>();
- _requestSubject = new Subject<MessageContainer>();
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ProtoTransporter"/> class.
- /// </summary>
- /// <param name="adapter">The adapter.</param>
- public ProtoTransporter(ISerialAdapter adapter) : this()
- {
- Adapter = adapter;
- }
-
- /// <summary>
- /// Opens a connection on the <see cref="Adapter" />.
- /// </summary>
- public IObservable<Unit> Connect()
- {
- IsConnected = true;
- Adapter.ReceiveData().Subscribe((data) => { _arrivedResponses.Enqueue(data); });
- StartThreads();
- return Adapter.Open();
- }
-
- /// <summary>
- /// Closes the connection on the <see cref="Adapter" />.
- /// </summary>
- public IObservable<Unit> Disconnect()
- {
- IsConnected = false;
- return Adapter.Close();
- }
-
- /// <summary>
- /// Starts the threads.
- /// </summary>
- private void StartThreads()
- {
- _pullThread = new Thread(PullThreadMethod);
- _pullThread.IsBackground = true;
- _pullThread.Start();
-
- _pushThread = new Thread(PushThreadMethod);
- _pushThread.IsBackground = true;
- _pushThread.Start();
- }
-
- /// <summary>
- /// Push thread loop.
- /// </summary>
- private void PushThreadMethod()
- {
- while (IsConnected)
- {
- if (_sendingQueue.Count > 0)
- {
- MessageBase message;
- if (_sendingQueue.TryDequeue(out message))
- {
- try
- {
- Adapter.Write(message.ToBytes());
-
- if (message.GetType() == typeof(RequestMessage))
- {
- _pendingRequests.Add(message as RequestMessage);
- }
- else
- {
- message.Subject.OnNext(new object());
- message.Subject.OnCompleted();
- }
- }
- catch (Exception ex)
- {
- message.Subject.OnError(ex);
- }
- }
- }
-
- Thread.Sleep(10);
- }
- }
-
- /// <summary>
- /// Pull thread loop.
- /// </summary>
- private void PullThreadMethod()
- {
- while (IsConnected)
- {
- byte[] data;
-
- if (_arrivedResponses.Count > 0)
- {
- if (_arrivedResponses.TryDequeue(out data))
- {
- MessageContainer container = MessageFactory.ParseContainerPartial(data);
- RequestMessage request = _pendingRequests.SingleOrDefault(x => x.Token == container.Token);
-
- if (request != null)
- {
- _pendingRequests.Remove(request);
-
- try
- {
- request.Subject.OnNext(MessageFactory.ParseContainer(container));
- }
- catch (Exception ex)
- {
- request.Subject.OnError(ex);
- }
- }
- else
- {
- try
- {
- _requestSubject.OnNext(container);
- }
- catch (Exception ex)
- {
- _requestSubject.OnError(ex);
- }
- }
- }
- }
-
- Thread.Sleep(10);
- }
- }
-
- /// <summary>
- /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
- /// </summary>
- public void Dispose()
- {
- Disconnect();
- }
-
- /// <summary>
- /// Sends a request.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <returns></returns>
- public IObservable<Response> SendRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>
- {
- Subject<object> subject = new Subject<object>();
- RequestMessage message = new RequestMessage(request.Container.Token, subject, request, request.ToBytes);
- _sendingQueue.Enqueue(message);
- IObservable<Response> observer = subject.Select(x => (Response)x);
- return observer;
- }
-
- /// <summary>
- /// Sends a response.
- /// </summary>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="response">The response.</param>
- /// <returns></returns>
- public IObservable<object> SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>
- {
- Subject<object> subject = new Subject<object>();
- ResponseMessage message = new ResponseMessage(Guid.NewGuid().ToString(), subject, response, response.ToBytes);
- _sendingQueue.Enqueue(message);
- return subject;
- }
-
- /// <summary>
- /// Subscribes for requests.
- /// </summary>
- /// <returns></returns>
- public IObservable<MessageContainer> SubscribeForRequests()
- {
- return _requestSubject;
- }
-
- /// <summary>
- /// Subscribes for disconnection.
- /// </summary>
- /// <returns></returns>
- public IObservable<Unit> SubscribeForDisconnection()
- {
- var observable = Adapter.SubscribeForDisconnection();
- observable.Subscribe((x) => { }, (ex) =>
- {
- LogManager.Log("Adapter closed. Disconnecting...");
- Disconnect().Subscribe();
- });
-
- return observable;
- }
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Transport/RequestMessage.cs b/Software/Visual Studio/Tango.Integration/Transport/RequestMessage.cs
deleted file mode 100644
index 149712bee..000000000
--- a/Software/Visual Studio/Tango.Integration/Transport/RequestMessage.cs
+++ /dev/null
@@ -1,17 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive.Subjects;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Integration.Transport
-{
- internal class RequestMessage : MessageBase
- {
- public RequestMessage(String token, Subject<object> subject, object message, Func<byte[]> toBytes) : base(token, subject, message, toBytes)
- {
-
- }
- }
-}
diff --git a/Software/Visual Studio/Tango.Integration/Transport/ResponseMessage.cs b/Software/Visual Studio/Tango.Integration/Transport/ResponseMessage.cs
deleted file mode 100644
index 8eb956eec..000000000
--- a/Software/Visual Studio/Tango.Integration/Transport/ResponseMessage.cs
+++ /dev/null
@@ -1,17 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive.Subjects;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Integration.Transport
-{
- internal class ResponseMessage : MessageBase
- {
- internal ResponseMessage(String token, Subject<object> subject, object message, Func<byte[]> toBytes) : base(token, subject, message, toBytes)
- {
-
- }
- }
-}
diff --git a/Software/Visual Studio/Tango.PMR/MessageFactory.cs b/Software/Visual Studio/Tango.PMR/MessageFactory.cs
index b507b5c4e..1cc84acd8 100644
--- a/Software/Visual Studio/Tango.PMR/MessageFactory.cs
+++ b/Software/Visual Studio/Tango.PMR/MessageFactory.cs
@@ -81,5 +81,17 @@ namespace Tango.PMR
MessageContainer container = MessageContainer.Parser.ParseFrom(data);
return container;
}
+
+ /// <summary>
+ /// Parses the container partial.
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="data">The data.</param>
+ /// <returns></returns>
+ public static MessageContainer ParseContainerPartialJson(byte[] data)
+ {
+ MessageContainer container = MessageContainer.Parser.ParseJson(Encoding.UTF8.GetString(data));
+ return container;
+ }
}
}
diff --git a/Software/Visual Studio/Tango.PMR/TangoMessage.cs b/Software/Visual Studio/Tango.PMR/TangoMessage.cs
index 71d4a046b..ef420a502 100644
--- a/Software/Visual Studio/Tango.PMR/TangoMessage.cs
+++ b/Software/Visual Studio/Tango.PMR/TangoMessage.cs
@@ -59,5 +59,15 @@ namespace Tango.PMR
return ms.ToArray();
}
}
+
+ /// <summary>
+ /// Generates a new <see cref="MessageContainer"/> containing the message of type <see cref="T"/> and returns a byte array.
+ /// </summary>
+ /// <returns></returns>
+ public byte[] ToJsonBytes()
+ {
+ Container.Data = Message.ToByteString();
+ return Encoding.UTF8.GetBytes(Container.ToString());
+ }
}
}
diff --git a/Software/Visual Studio/Tango.Transport/Adapters/SplitterAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/SplitterAdapter.cs
new file mode 100644
index 000000000..48b477c39
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Adapters/SplitterAdapter.cs
@@ -0,0 +1,26 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport.Adapters
+{
+ public class SplitterAdapter : TransportAdapterBase
+ {
+ public override Task Connect()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override Task Disconnect()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(byte[] data)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
new file mode 100644
index 000000000..d3bc4a855
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
@@ -0,0 +1,201 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Sockets;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Tango.Logging;
+
+namespace Tango.Transport.Adapters
+{
+ /// <summary>
+ /// Represents an <see cref="ITransportAdapter"/> which communicates over TCP/IP.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.TransportAdapterBase" />
+ public class TcpTransportAdapter : TransportAdapterBase
+ {
+ private TcpClient _socket;
+ private Thread _pullThread;
+ private bool _initializedFromConstructor;
+
+ #region Properties
+
+ /// <summary>
+ /// Gets or sets the TCP listener port.
+ /// </summary>
+ public int Port { get; set; }
+
+ #endregion
+
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class.
+ /// </summary>
+ public TcpTransportAdapter()
+ {
+ Address = "127.0.0.1";
+ Port = 9999;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class.
+ /// </summary>
+ /// <param name="address">The address.</param>
+ /// <param name="port">The port.</param>
+ public TcpTransportAdapter(String address, int port) : this()
+ {
+ Address = address;
+ Port = port;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TcpTransportAdapter"/> class.
+ /// </summary>
+ /// <param name="socket">The socket.</param>
+ public TcpTransportAdapter(TcpClient socket) : this()
+ {
+ _initializedFromConstructor = true;
+ _socket = socket;
+ }
+
+ #endregion
+
+ #region Public Methods
+
+ /// <summary>
+ /// Connects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public override Task Connect()
+ {
+ ThrowFailedOrDisposed();
+
+ return Task.Factory.StartNew(() =>
+ {
+ try
+ {
+ if (State != TransportComponentState.Connected)
+ {
+ if (!_initializedFromConstructor)
+ {
+ _socket = new TcpClient(Address, Port);
+ }
+
+ State = TransportComponentState.Connected;
+ _pullThread = new Thread(PullThreadMethod);
+ _pullThread.IsBackground = true;
+ _pullThread.Start();
+ LogManager.Log("TCP adapter Connected...");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw LogManager.Log(ex, "Could not connect the TCP adapter.");
+ }
+ });
+ }
+
+ /// <summary>
+ /// Disconnects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public override Task Disconnect()
+ {
+ ThrowFailedOrDisposed();
+
+ return Task.Factory.StartNew((Action)(() =>
+ {
+ try
+ {
+ if (State == TransportComponentState.Connected)
+ {
+ State = TransportComponentState.Disconnected;
+ _socket.GetStream().Close();
+ _socket.Close();
+ LogManager.Log("TCP adapter disconnected.");
+ Dispose();
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Could not disconnect the TCP adapter.");
+ }
+ }));
+ }
+
+ /// <summary>
+ /// Writes the specified data to the stream.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ public override void Write(byte[] data)
+ {
+ ThrowFailedOrDisposed();
+
+ try
+ {
+ _socket.GetStream().Write(data, 0, data.Length);
+ }
+ catch (Exception ex)
+ {
+ OnFailed(LogManager.Log(ex));
+ }
+ }
+
+ #endregion
+
+ #region Pull Thread
+
+ private void PullThreadMethod()
+ {
+ int counter = 0;
+
+ try
+ {
+ while (State == TransportComponentState.Connected)
+ {
+ if (_socket.Available > 0)
+ {
+ byte[] data = new byte[_socket.Available];
+ _socket.GetStream().Read(data, 0, data.Length);
+ OnDataAvailable(data);
+ }
+
+ Thread.Sleep(10);
+ counter++;
+
+ if (counter >= 200)
+ {
+ try
+ {
+ if (_socket.Client.Poll(1, SelectMode.SelectRead) && _socket.Client.Available == 0)
+ {
+ OnFailed(LogManager.Log(new Exception("Client disconnected.")));
+ return;
+ }
+ }
+ catch (SocketException ex)
+ {
+ OnFailed(LogManager.Log(ex));
+ return;
+ }
+
+ counter = 0;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ OnFailed(ex);
+ }
+ }
+
+ #endregion
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
new file mode 100644
index 000000000..eb7820697
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
@@ -0,0 +1,150 @@
+using System;
+using System.Collections.Generic;
+using System.IO.Ports;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Tango.Logging;
+
+namespace Tango.Transport.Adapters
+{
+ /// <summary>
+ /// Represents an <see cref="ITransportAdapter"/> which communicates over USB serial port.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.TransportAdapterBase" />
+ public class UsbTransportAdapter : TransportAdapterBase
+ {
+ private SerialPort _serialPort; //Serial port instance used to communicate over the serial port.
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="UsbTransportAdapter"/> class.
+ /// </summary>
+ public UsbTransportAdapter()
+ {
+ Address = "COM1";
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="UsbTransportAdapter"/> class.
+ /// </summary>
+ /// <param name="portName">The COM.</param>
+ public UsbTransportAdapter(String portName) : base()
+ {
+ Address = portName;
+ }
+
+ /// <summary>
+ /// Connects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public override Task Connect()
+ {
+ ThrowFailedOrDisposed();
+
+ return Task.Factory.StartNew(() =>
+ {
+ try
+ {
+ if (State != TransportComponentState.Connected)
+ {
+ LogManager.Log("Connecting USB adapter on port " + Address + "...");
+
+ if (_serialPort != null)
+ {
+ _serialPort.DataReceived -= OnSerialPortDataReceived;
+ }
+
+ _serialPort = new SerialPort();
+ _serialPort.DataReceived += OnSerialPortDataReceived;
+ _serialPort.PortName = Address;
+ _serialPort.Open();
+ _serialPort.DiscardInBuffer();
+ _serialPort.DiscardOutBuffer();
+ State = TransportComponentState.Connected;
+
+ LogManager.Log("USB adapter connected.");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw LogManager.Log(ex, "Could not open serial port on " + Address + ".");
+ }
+ });
+ }
+
+ /// <summary>
+ /// Disconnects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public override Task Disconnect()
+ {
+ ThrowFailedOrDisposed();
+
+ return Task.Factory.StartNew(() =>
+ {
+ try
+ {
+ if (State == TransportComponentState.Connected)
+ {
+ LogManager.Log("Disconnecting USB adapter on port " + Address + "...");
+
+ if (_serialPort != null)
+ {
+ _serialPort.DataReceived -= OnSerialPortDataReceived;
+ }
+
+ _serialPort.DiscardOutBuffer();
+ _serialPort.DiscardInBuffer();
+ _serialPort.Close();
+ State = TransportComponentState.Disconnected;
+ LogManager.Log("USB adapter disconnected.");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw LogManager.Log(ex, "Could not close serial port on " + Address + ".");
+ }
+ });
+ }
+
+ /// <summary>
+ /// Writes the specified data to the stream.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ public override void Write(byte[] data)
+ {
+ ThrowFailedOrDisposed();
+
+ try
+ {
+ _serialPort.Write(data, 0, data.Length);
+ }
+ catch (Exception ex)
+ {
+ OnFailed(LogManager.Log(ex));
+ }
+ }
+
+ /// <summary>
+ /// Called when internal serial port has received data.
+ /// </summary>
+ /// <param name="sender">The sender.</param>
+ /// <param name="e">The <see cref="SerialDataReceivedEventArgs"/> instance containing the event data.</param>
+ protected virtual void OnSerialPortDataReceived(object sender, SerialDataReceivedEventArgs e)
+ {
+ try
+ {
+ if (e.EventType == SerialData.Eof) return;
+ Thread.Sleep(10);
+ byte[] data = new byte[_serialPort.BytesToRead];
+ _serialPort.Read(data, 0, data.Length);
+ OnDataAvailable(data);
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error occurred while trying to read from serial port.");
+ }
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/ITransportAdapter.cs b/Software/Visual Studio/Tango.Transport/ITransportAdapter.cs
new file mode 100644
index 000000000..b0e055cde
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/ITransportAdapter.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a transport adapter capable of connecting, writing and receiving data from a serial stream.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.ITransportComponent" />
+ public interface ITransportAdapter : ITransportComponent
+ {
+ /// <summary>
+ /// Writes the specified data to the stream.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ void Write(byte[] data);
+
+ /// <summary>
+ /// Occurs when new data is available.
+ /// </summary>
+ event EventHandler<byte[]> DataAvailable;
+
+ /// <summary>
+ /// Gets or sets the channel address.
+ /// </summary>
+ String Address { get; set; }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/ITransportComponent.cs b/Software/Visual Studio/Tango.Transport/ITransportComponent.cs
new file mode 100644
index 000000000..06207ddf9
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/ITransportComponent.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a transport component.
+ /// </summary>
+ public interface ITransportComponent : IDisposable
+ {
+ /// <summary>
+ /// Connects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ Task Connect();
+ /// <summary>
+ /// Disconnects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ Task Disconnect();
+ /// <summary>
+ /// Occurs when component state changes.
+ /// </summary>
+ event EventHandler<TransportComponentState> StateChanged;
+ /// <summary>
+ /// Gets the component state.
+ /// </summary>
+ TransportComponentState State { get; }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/ITransporter.cs b/Software/Visual Studio/Tango.Transport/ITransporter.cs
new file mode 100644
index 000000000..5142b68f1
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/ITransporter.cs
@@ -0,0 +1,64 @@
+using Google.Protobuf;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Transport.Adapters;
+using Tango.PMR;
+using Tango.PMR.Common;
+using System.Collections.ObjectModel;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a transportation engine which can send and receive <see cref="TangoMessage{T}"/> message using one or many <see cref="ITransportAdapter">Transport adapters</see>.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.ITransportComponent" />
+ public interface ITransporter : ITransportComponent
+ {
+ /// <summary>
+ /// Gets the serial adapter.
+ /// </summary>
+ ObservableCollection<ITransportAdapter> Adapters { get; }
+
+ /// <summary>
+ /// Sends a broadcast request to through all adapters.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <returns></returns>
+ Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>;
+
+ /// <summary>
+ /// Sends a request through the specified adapter.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <param name="adapter">Transport adapter</param>
+ /// <returns></returns>
+ Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response>;
+
+ /// <summary>
+ /// Sends a response.
+ /// </summary>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="response">The response.</param>
+ /// <returns></returns>
+ Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>;
+
+ /// <summary>
+ /// Occurs when a new request message has been received.
+ /// </summary>
+ event EventHandler<MessageContainer> RequestReceived;
+
+ /// <summary>
+ /// Gets or sets the request timeout.
+ /// </summary>
+ TimeSpan RequestTimeout { get; set; }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Properties/AssemblyInfo.cs b/Software/Visual Studio/Tango.Transport/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..9a309793f
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Properties/AssemblyInfo.cs
@@ -0,0 +1,6 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+[assembly: AssemblyTitle("Tango - Transport Components")]
+[assembly: ComVisible(false)] \ No newline at end of file
diff --git a/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj
new file mode 100644
index 000000000..f17a85752
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{74E700B0-1156-4126-BE40-EE450D3C3026}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Tango.Transport</RootNamespace>
+ <AssemblyName>Tango.Transport</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\Build\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Google.Protobuf, Version=3.4.1.0, Culture=neutral, PublicKeyToken=a7d26565bac4d604, processorArchitecture=MSIL">
+ <HintPath>..\packages\Google.Protobuf.3.4.1\lib\net45\Google.Protobuf.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Reactive.Core, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Reactive.Core.3.1.1\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Reactive.Interfaces.3.1.1\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Linq, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Reactive.Linq.3.1.1\lib\net45\System.Reactive.Linq.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.PlatformServices, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Reactive.PlatformServices.3.1.1\lib\net45\System.Reactive.PlatformServices.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Windows.Threading, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Reactive.Windows.Threading.3.1.1\lib\net45\System.Reactive.Windows.Threading.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Windows" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Xml" />
+ <Reference Include="WindowsBase" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="..\Versioning\GlobalVersionInfo.cs">
+ <Link>GlobalVersionInfo.cs</Link>
+ </Compile>
+ <Compile Include="Adapters\SplitterAdapter.cs" />
+ <Compile Include="Adapters\UsbTransportAdapter.cs" />
+ <Compile Include="ITransportComponent.cs" />
+ <Compile Include="ITransportAdapter.cs" />
+ <Compile Include="Adapters\TcpTransportAdapter.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="ITransporter.cs" />
+ <Compile Include="TransportAdapterBase.cs" />
+ <Compile Include="TransportComponentState.cs" />
+ <Compile Include="TransporterBase.cs" />
+ <Compile Include="Transporters\JsonTransporter.cs" />
+ <Compile Include="Transporters\ProtoTransporter.cs" />
+ <Compile Include="TransportMessage.cs" />
+ <Compile Include="TransportMessageBase.cs" />
+ <Compile Include="TransportMessageDirection.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Tango.Core\Tango.Core.csproj">
+ <Project>{a34ee0f0-649d-41c8-8489-b6f1cc6924ee}</Project>
+ <Name>Tango.Core</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Tango.Logging\Tango.Logging.csproj">
+ <Project>{bc932dbd-7cdb-488c-99e4-f02cf441f55e}</Project>
+ <Name>Tango.Logging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Tango.PMR\Tango.PMR.csproj">
+ <Project>{e4927038-348d-4295-aaf4-861c58cb3943}</Project>
+ <Name>Tango.PMR</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Tango.Protobuf\Tango.Protobuf.csproj">
+ <Project>{40073806-914e-4e78-97ab-fa9639308ebe}</Project>
+ <Name>Tango.Protobuf</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+</Project> \ No newline at end of file
diff --git a/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs
new file mode 100644
index 000000000..fb442577d
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs
@@ -0,0 +1,132 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Logging;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents an <see cref="ITransportAdapter"/> base class.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.ITransportAdapter" />
+ public abstract class TransportAdapterBase : ITransportAdapter
+ {
+ #region Events
+
+ /// <summary>
+ /// Occurs when component state changes.
+ /// </summary>
+ public event EventHandler<TransportComponentState> StateChanged;
+
+ /// <summary>
+ /// Occurs when new data is available.
+ /// </summary>
+ public event EventHandler<byte[]> DataAvailable;
+
+ #endregion
+
+ #region Properties
+
+ /// <summary>
+ /// Gets or sets the channel address.
+ /// </summary>
+ public String Address { get; set; }
+
+ private TransportComponentState _state;
+ /// <summary>
+ /// Gets the component state.
+ /// </summary>
+ public TransportComponentState State
+ {
+ get { return _state; }
+ protected set
+ {
+ _state = value;
+ OnStateChanged(_state);
+ }
+ }
+
+ #endregion
+
+ #region Virtual Methods
+
+ /// <summary>
+ /// Called when the adapter has failed.
+ /// </summary>
+ /// <param name="ex">The ex.</param>
+ protected virtual void OnFailed(Exception ex)
+ {
+ Disconnect().Wait();
+ State = TransportComponentState.Failed;
+ LogManager.Log(ex, "Adapter failed.");
+ }
+
+ /// <summary>
+ /// Called when there is new data available.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ protected virtual void OnDataAvailable(byte[] data)
+ {
+ DataAvailable?.Invoke(this, data);
+ }
+
+ /// <summary>
+ /// Called when the adapter state has changed.
+ /// </summary>
+ /// <param name="state">The state.</param>
+ protected virtual void OnStateChanged(TransportComponentState state)
+ {
+ StateChanged?.Invoke(this, state);
+ }
+
+ /// <summary>
+ /// Throws an exception if adapter is in a failed or disposed state.
+ /// </summary>
+ protected virtual void ThrowFailedOrDisposed()
+ {
+ if (State == TransportComponentState.Failed || State == TransportComponentState.Disposed)
+ {
+ throw new ObjectDisposedException("The adapter is in a " + State + " state.");
+ }
+ }
+
+ #endregion
+
+ #region Dispose
+
+ /// <summary>
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+ /// </summary>
+ public virtual void Dispose()
+ {
+ Disconnect().Wait();
+ State = TransportComponentState.Disposed;
+ }
+
+ #endregion
+
+ #region Abstract Methods
+
+ /// <summary>
+ /// Writes the specified data to the stream.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ public abstract void Write(byte[] data);
+
+ /// <summary>
+ /// Connects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public abstract Task Connect();
+
+ /// <summary>
+ /// Disconnects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public abstract Task Disconnect();
+
+ #endregion
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/TransportComponentState.cs b/Software/Visual Studio/Tango.Transport/TransportComponentState.cs
new file mode 100644
index 000000000..6764876a0
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransportComponentState.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a <see cref="ITransportComponent"/> state.
+ /// </summary>
+ public enum TransportComponentState
+ {
+ /// <summary>
+ /// Disconnected.
+ /// </summary>
+ Disconnected,
+ /// <summary>
+ /// Started.
+ /// </summary>
+ Connected,
+ /// <summary>
+ /// Failed.
+ /// </summary>
+ Failed,
+ /// <summary>
+ /// Disposed.
+ /// </summary>
+ Disposed,
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/TransportMessage.cs b/Software/Visual Studio/Tango.Transport/TransportMessage.cs
new file mode 100644
index 000000000..e10c736a5
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransportMessage.cs
@@ -0,0 +1,49 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a generic transport message.
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <seealso cref="Tango.Transport.TransportMessageBase" />
+ public class TransportMessage<T> : TransportMessageBase
+ {
+ private TaskCompletionSource<T> _completionSource;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransportMessage{T}"/> class.
+ /// </summary>
+ /// <param name="token">The token.</param>
+ /// <param name="message">The message.</param>
+ /// <param name="direction">The direction.</param>
+ /// <param name="toBytes">To bytes.</param>
+ /// <param name="completionSource">The completion source.</param>
+ public TransportMessage(ITransportAdapter adapter, string token, object message, TransportMessageDirection direction, Func<byte[]> toBytes, TaskCompletionSource<T> completionSource) : base(adapter, token, message, direction, toBytes)
+ {
+ _completionSource = completionSource;
+ }
+
+ /// <summary>
+ /// Notifies the message observer of the new result.
+ /// </summary>
+ /// <param name="result">The result.</param>
+ public override void SetResult(object result)
+ {
+ _completionSource.SetResult((T)result);
+ }
+
+ /// <summary>
+ /// Notifies the message observer of an exception.
+ /// </summary>
+ /// <param name="ex">The ex.</param>
+ public override void SetException(Exception ex)
+ {
+ _completionSource.SetException(ex);
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs
new file mode 100644
index 000000000..c20b11555
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs
@@ -0,0 +1,67 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a base transport message.
+ /// </summary>
+ public abstract class TransportMessageBase
+ {
+ /// <summary>
+ /// Gets or sets the source/destination adapter.
+ /// </summary>
+ public ITransportAdapter Adapter { get; set; }
+
+ /// <summary>
+ /// Gets or sets the message token.
+ /// </summary>
+ public String Token { get; set; }
+
+ /// <summary>
+ /// Gets or sets the message direction.
+ /// </summary>
+ public TransportMessageDirection Direction { get; set; }
+
+ /// <summary>
+ /// Gets or sets method to serial the message to byte array.
+ /// </summary>
+ public Func<byte[]> Serialize { get; set; }
+
+ /// <summary>
+ /// Gets or sets the message.
+ /// </summary>
+ public Object Message { get; set; }
+
+ /// <summary>
+ /// Notifies the message observer of the new result.
+ /// </summary>
+ /// <param name="result">The result.</param>
+ public abstract void SetResult(object result);
+
+ /// <summary>
+ /// Notifies the message observer of an exception.
+ /// </summary>
+ /// <param name="ex">The ex.</param>
+ public abstract void SetException(Exception ex);
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransportMessageBase"/> class.
+ /// </summary>
+ /// <param name="token">The token.</param>
+ /// <param name="message">The message.</param>
+ /// <param name="direction">The direction.</param>
+ /// <param name="toBytes">To bytes.</param>
+ public TransportMessageBase(ITransportAdapter adapter, String token, object message, TransportMessageDirection direction, Func<byte[]> toBytes)
+ {
+ Adapter = adapter;
+ Token = token;
+ Message = message;
+ Direction = direction;
+ Serialize = toBytes;
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/TransportMessageDirection.cs b/Software/Visual Studio/Tango.Transport/TransportMessageDirection.cs
new file mode 100644
index 000000000..6802afbdf
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransportMessageDirection.cs
@@ -0,0 +1,23 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents <see cref="TransportMessageBase"/> direction.
+ /// </summary>
+ public enum TransportMessageDirection
+ {
+ /// <summary>
+ /// Request.
+ /// </summary>
+ Request,
+ /// <summary>
+ /// Response.
+ /// </summary>
+ Response,
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/TransporterBase.cs b/Software/Visual Studio/Tango.Transport/TransporterBase.cs
new file mode 100644
index 000000000..9b27b860d
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/TransporterBase.cs
@@ -0,0 +1,454 @@
+using Google.Protobuf;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Tango.Logging;
+using Tango.PMR;
+using Tango.PMR.Common;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents an <see cref="ITransporter"/> base class.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.ITransporter" />
+ public abstract class TransporterBase : ITransporter
+ {
+ private ConcurrentQueue<TransportMessageBase> _sendingQueue;
+ private List<TransportMessageBase> _pendingRequests;
+ private ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>> _arrivedResponses;
+ private Thread _pushThread;
+ private Thread _pullThread;
+ private ObservableCollection<ITransportAdapter> _adapters;
+ private Dictionary<String, ITransportAdapter> _tokenAdapters;
+
+ #region Events
+
+ /// <summary>
+ /// Occurs when a new request message has been received.
+ /// </summary>
+ public event EventHandler<MessageContainer> RequestReceived;
+
+ /// <summary>
+ /// Occurs when component state changes.
+ /// </summary>
+ public event EventHandler<TransportComponentState> StateChanged;
+
+ #endregion
+
+ #region Properties
+
+ /// <summary>
+ /// Gets the serial adapter.
+ /// </summary>
+ public ObservableCollection<ITransportAdapter> Adapters
+ {
+ get { return _adapters; }
+ set
+ {
+ if (_adapters != null)
+ {
+ _adapters.CollectionChanged -= OnAdaptersCollectionChanged;
+ }
+
+ _adapters = value;
+
+ if (_adapters != null)
+ {
+ _adapters.CollectionChanged += OnAdaptersCollectionChanged;
+ }
+ }
+ }
+
+ private TransportComponentState _state;
+ /// <summary>
+ /// Gets the component state.
+ /// </summary>
+ public TransportComponentState State
+ {
+ get { return _state; }
+ set
+ {
+ _state = value;
+ OnStateChanged(_state);
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the request timeout.
+ /// </summary>
+ public TimeSpan RequestTimeout { get; set; }
+
+ #endregion
+
+ #region Virtual Methods
+
+ /// <summary>
+ /// Called when the collection of adapters has changed.
+ /// </summary>
+ /// <param name="sender">The sender.</param>
+ /// <param name="e">The <see cref="System.Collections.Specialized.NotifyCollectionChangedEventArgs"/> instance containing the event data.</param>
+ protected virtual void OnAdaptersCollectionChanged(object sender, System.Collections.Specialized.NotifyCollectionChangedEventArgs e)
+ {
+ foreach (var ad in _adapters)
+ {
+ ad.StateChanged -= OnAdapterStateChanged;
+ ad.StateChanged += OnAdapterStateChanged;
+ ad.DataAvailable -= OnAdapterDataAvailable;
+ ad.DataAvailable += OnAdapterDataAvailable;
+ }
+ }
+
+ /// <summary>
+ /// Called when the adapter has changed.
+ /// </summary>
+ /// <param name="adapter">The adapter.</param>
+ protected virtual void OnAdapterChanged(ITransportAdapter adapter)
+ {
+ if (State == TransportComponentState.Connected)
+ {
+ Connect().Wait();
+ }
+ }
+
+ /// <summary>
+ /// Called when the current adapter state has changed.
+ /// </summary>
+ /// <param name="sender">The sender.</param>
+ /// <param name="e">The e.</param>
+ protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
+ {
+ if (e == TransportComponentState.Failed)
+ {
+ Disconnect().Wait();
+ }
+ }
+
+ /// <summary>
+ /// Called when there is data available from the adapter.
+ /// </summary>
+ /// <param name="sender">The sender.</param>
+ /// <param name="data">The data.</param>
+ protected virtual void OnAdapterDataAvailable(object sender, byte[] data)
+ {
+ _arrivedResponses.Enqueue(new KeyValuePair<ITransportAdapter, byte[]>(sender as ITransportAdapter, data));
+ }
+
+ /// <summary>
+ /// Called when the component has failed.
+ /// </summary>
+ /// <param name="ex">The ex.</param>
+ protected virtual void OnFailed(Exception ex)
+ {
+ Disconnect().Wait();
+ State = TransportComponentState.Failed;
+ LogManager.Log(ex, "Transporter failed.");
+ }
+
+ /// <summary>
+ /// Called when a new request has been received.
+ /// </summary>
+ /// <param name="request">The request.</param>
+ protected virtual void OnRequestReceived(MessageContainer request)
+ {
+ RequestReceived?.Invoke(this, request);
+ }
+
+ /// <summary>
+ /// Called when the component state has changed.
+ /// </summary>
+ /// <param name="state">The state.</param>
+ protected virtual void OnStateChanged(TransportComponentState state)
+ {
+ StateChanged?.Invoke(this, state);
+ }
+
+ /// <summary>
+ /// Override in order to provide a method serializer for <see cref="TangoMessage{T}"/>.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <param name="message">The message.</param>
+ /// <returns></returns>
+ protected virtual Func<byte[]> OnSerializeingMessage<Request>(TangoMessage<Request> message) where Request : IMessage<Request>
+ {
+ return message.ToBytes;
+ }
+
+ /// <summary>
+ /// Override in order to provide a deserialized message container part of the message.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ /// <returns></returns>
+ protected virtual MessageContainer OnParseContainerPartial(byte[] data)
+ {
+ return MessageFactory.ParseContainerPartial(data);
+ }
+
+ /// <summary>
+ /// Override in order to provide a deserialized message from a container.
+ /// </summary>
+ /// <param name="container">The container.</param>
+ /// <returns></returns>
+ protected virtual IMessage OnParseContainer(MessageContainer container)
+ {
+ return MessageFactory.ParseContainer(container);
+ }
+
+ #endregion
+
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransporterBase"/> class.
+ /// </summary>
+ public TransporterBase()
+ {
+ Adapters = new ObservableCollection<ITransportAdapter>();
+ _tokenAdapters = new Dictionary<string, ITransportAdapter>();
+ _sendingQueue = new ConcurrentQueue<TransportMessageBase>();
+ _pendingRequests = new List<TransportMessageBase>();
+ _arrivedResponses = new ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>>();
+ RequestTimeout = TimeSpan.FromSeconds(5);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransporterBase"/> class.
+ /// </summary>
+ /// <param name="adapter">The transport adapter.</param>
+ public TransporterBase(ITransportAdapter adapter) : this()
+ {
+ Adapters.Add(adapter);
+ }
+
+ #endregion
+
+ #region Public Methods
+
+ /// <summary>
+ /// Connects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public async Task Connect()
+ {
+ State = TransportComponentState.Connected;
+ StartThreads();
+ await Task.WhenAll(Adapters.Select(x => x.Connect()));
+ LogManager.Log("Transporter Connected...");
+ }
+
+ /// <summary>
+ /// Disconnects the transport component.
+ /// </summary>
+ /// <returns></returns>
+ public async Task Disconnect()
+ {
+ State = TransportComponentState.Disconnected;
+ await Task.WhenAll(Adapters.Select(x => x.Disconnect()));
+ LogManager.Log("Transporter Disconnected...");
+ }
+
+ /// <summary>
+ /// Sends a broadcast request to through all adapters.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <returns></returns>
+ public Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>
+ {
+ return SendRequest<Request, Response>(request, null);
+ }
+
+ /// <summary>
+ /// Sends a request through the specified adapter.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="request">The request.</param>
+ /// <param name="adapter">Transport adapter</param>
+ /// <returns></returns>
+ public Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response>
+ {
+ TaskCompletionSource<Response> source = new TaskCompletionSource<Response>();
+ TransportMessage<Response> message = new TransportMessage<Response>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), source);
+ _sendingQueue.Enqueue(message);
+ Task.Delay(RequestTimeout).ContinueWith((x) => { if (!source.Task.IsCompleted) source.SetException(new TimeoutException()); });
+ return source.Task;
+ }
+
+ /// <summary>
+ /// Sends a response.
+ /// </summary>
+ /// <typeparam name="Response">The type of the response.</typeparam>
+ /// <param name="response">The response.</param>
+ /// <returns></returns>
+ public Task SendResponse<Response>(TangoMessage<Response> response) where Response : IMessage<Response>
+ {
+ ITransportAdapter adapter = null;
+
+ if (_tokenAdapters.TryGetValue(response.Container.Token, out adapter))
+ {
+ _tokenAdapters.Remove(response.Container.Token);
+ }
+
+ TaskCompletionSource<object> source = new TaskCompletionSource<object>();
+ TransportMessage<object> message = new TransportMessage<object>(adapter, response.Container.Token, response, TransportMessageDirection.Response, OnSerializeingMessage(response), source);
+ _sendingQueue.Enqueue(message);
+ return source.Task;
+ }
+
+ #endregion
+
+ #region Private Methods
+
+ /// <summary>
+ /// Starts the pull and push threads.
+ /// </summary>
+ private void StartThreads()
+ {
+ _pullThread = new Thread(PullThreadMethod);
+ _pullThread.IsBackground = true;
+ _pullThread.Start();
+
+ _pushThread = new Thread(PushThreadMethod);
+ _pushThread.IsBackground = true;
+ _pushThread.Start();
+ }
+
+ #endregion
+
+ #region Push Thread
+
+ /// <summary>
+ /// Push thread loop.
+ /// </summary>
+ private void PushThreadMethod()
+ {
+ try
+ {
+ while (State == TransportComponentState.Connected)
+ {
+ if (_sendingQueue.Count > 0)
+ {
+ TransportMessageBase message;
+ if (_sendingQueue.TryDequeue(out message))
+ {
+ try
+ {
+ if (message.Adapter == null)
+ {
+ foreach (var adapter in Adapters.Where(x => x.State == TransportComponentState.Connected))
+ {
+ adapter.Write(message.Serialize());
+ }
+ }
+ else
+ {
+ message.Adapter.Write(message.Serialize());
+ }
+
+ if (message.Direction == TransportMessageDirection.Request)
+ {
+ _pendingRequests.Add(message);
+ }
+ else
+ {
+ message.SetResult(true);
+ }
+ }
+ catch (Exception ex)
+ {
+ message.SetException(ex);
+ }
+ }
+ }
+
+ Thread.Sleep(10);
+ }
+ }
+ catch (Exception ex)
+ {
+ OnFailed(ex);
+ }
+ }
+
+ #endregion
+
+ #region Pull Thread
+
+ /// <summary>
+ /// Pull thread loop.
+ /// </summary>
+ private void PullThreadMethod()
+ {
+ try
+ {
+ while (State == TransportComponentState.Connected)
+ {
+ KeyValuePair<ITransportAdapter, byte[]> data;
+
+ if (_arrivedResponses.Count > 0)
+ {
+ if (_arrivedResponses.TryDequeue(out data))
+ {
+ MessageContainer container = OnParseContainerPartial(data.Value);
+ TransportMessageBase request = _pendingRequests.SingleOrDefault(x => x.Token == container.Token);
+
+ if (request != null)
+ {
+ _pendingRequests.Remove(request);
+
+ try
+ {
+ request.SetResult(OnParseContainer(container));
+ }
+ catch (Exception ex)
+ {
+ request.SetException(ex);
+ }
+ }
+ else
+ {
+ try
+ {
+ _tokenAdapters.Add(container.Token, data.Key);
+ OnRequestReceived(container);
+ }
+ catch (Exception ex)
+ {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ Thread.Sleep(10);
+ }
+ }
+ catch (Exception ex)
+ {
+ OnFailed(ex);
+ }
+ }
+
+ #endregion
+
+ #region Dispose
+
+ /// <summary>
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+ /// </summary>
+ public void Dispose()
+ {
+ Disconnect().Wait();
+ State = TransportComponentState.Disposed;
+ }
+
+ #endregion
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Transporters/JsonTransporter.cs b/Software/Visual Studio/Tango.Transport/Transporters/JsonTransporter.cs
new file mode 100644
index 000000000..67756d39c
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Transporters/JsonTransporter.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Tango.PMR;
+using Tango.PMR.Common;
+
+namespace Tango.Transport.Transporters
+{
+ /// <summary>
+ /// Represents an <see cref="ITransporter"/> which send and receive <see cref="TangoMessage{T}"/> messages using JSON formatted strings.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.TransporterBase" />
+ public class JsonTransporter : TransporterBase
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="JsonTransporter"/> class.
+ /// </summary>
+ public JsonTransporter() : base()
+ {
+
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="JsonTransporter"/> class.
+ /// </summary>
+ /// <param name="adapter">The transport adapter.</param>
+ public JsonTransporter(ITransportAdapter adapter) : base(adapter)
+ {
+
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Override in order to provide a method serializer for <see cref="TangoMessage{T}" />.
+ /// </summary>
+ /// <typeparam name="Request">The type of the request.</typeparam>
+ /// <param name="message">The message.</param>
+ /// <returns></returns>
+ protected override Func<byte[]> OnSerializeingMessage<Request>(TangoMessage<Request> message)
+ {
+ return message.ToJsonBytes;
+ }
+
+ /// <summary>
+ /// Override in order to provide a deserialized message container part of the message.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ /// <returns></returns>
+ protected override MessageContainer OnParseContainerPartial(byte[] data)
+ {
+ return MessageFactory.ParseContainerPartialJson(data);
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Transporters/ProtoTransporter.cs b/Software/Visual Studio/Tango.Transport/Transporters/ProtoTransporter.cs
new file mode 100644
index 000000000..7c82639d2
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/Transporters/ProtoTransporter.cs
@@ -0,0 +1,47 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Tango.Transport.Adapters;
+using Tango.PMR;
+using Tango.PMR.Common;
+using System.Collections.Concurrent;
+using System.Reactive.Subjects;
+using System.Threading;
+using System.Reactive;
+using System.Reactive.Disposables;
+using Tango.Logging;
+
+namespace Tango.Transport.Transporters
+{
+ /// <summary>
+ /// Represents an <see cref="ITransporter"/> which send and receive <see cref="TangoMessage{T}"/> messages using Protobuf binary data.
+ /// </summary>
+ /// <seealso cref="Tango.Transport.TransporterBase" />
+ public class ProtoTransporter : TransporterBase
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProtoTransporter"/> class.
+ /// </summary>
+ public ProtoTransporter() : base()
+ {
+
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProtoTransporter"/> class.
+ /// </summary>
+ /// <param name="adapter">The transport adapter.</param>
+ public ProtoTransporter(ITransportAdapter adapter) : base(adapter)
+ {
+
+ }
+
+ #endregion
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/packages.config b/Software/Visual Studio/Tango.Transport/packages.config
new file mode 100644
index 000000000..5a7de5f4e
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/packages.config
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="utf-8"?>
+<packages>
+ <package id="Google.Protobuf" version="3.4.1" targetFramework="net45" />
+ <package id="System.Reactive" version="3.1.1" targetFramework="net45" />
+ <package id="System.Reactive.Core" version="3.1.1" targetFramework="net45" />
+ <package id="System.Reactive.Interfaces" version="3.1.1" targetFramework="net45" />
+ <package id="System.Reactive.Linq" version="3.1.1" targetFramework="net45" />
+ <package id="System.Reactive.PlatformServices" version="3.1.1" targetFramework="net45" />
+ <package id="System.Reactive.Windows.Threading" version="3.1.1" targetFramework="net45" />
+</packages> \ No newline at end of file
diff --git a/Software/Visual Studio/Tango.sln b/Software/Visual Studio/Tango.sln
index e1d820bea..d217cf754 100644
--- a/Software/Visual Studio/Tango.sln
+++ b/Software/Visual Studio/Tango.sln
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
-VisualStudioVersion = 15.0.26430.16
+VisualStudioVersion = 15.0.26430.14
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tango.Protobuf", "Tango.Protobuf\Tango.Protobuf.csproj", "{40073806-914E-4E78-97AB-FA9639308EBE}"
EndProject
@@ -35,6 +35,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tango.Emulator", "Tango.Emu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tango.Integration", "Tango.Integration\Tango.Integration.csproj", "{B6182925-8864-401F-A390-6EFF737C4FC7}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tango.Transport", "Tango.Transport\Tango.Transport.csproj", "{74E700B0-1156-4126-BE40-EE450D3C3026}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -81,6 +83,10 @@ Global
{B6182925-8864-401F-A390-6EFF737C4FC7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6182925-8864-401F-A390-6EFF737C4FC7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6182925-8864-401F-A390-6EFF737C4FC7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {74E700B0-1156-4126-BE40-EE450D3C3026}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {74E700B0-1156-4126-BE40-EE450D3C3026}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {74E700B0-1156-4126-BE40-EE450D3C3026}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {74E700B0-1156-4126-BE40-EE450D3C3026}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE