aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoy <roy.mail.net@gmail.com>2018-01-02 22:26:11 +0200
committerRoy <roy.mail.net@gmail.com>2018-01-02 22:26:11 +0200
commit0ab7e3d35c01eaaa6ebf03225971909bea365597 (patch)
tree03394eaeb59e077821da0859e1e4f184af209adb
parenta894d541baa4f89f1b0f31c88b773ea4b36db323 (diff)
downloadTango-0ab7e3d35c01eaaa6ebf03225971909bea365597.tar.gz
Tango-0ab7e3d35c01eaaa6ebf03225971909bea365597.zip
Refactored Transporter Adapters collection to single adapter !
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/services/ExternalBridgeService.java16
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java58
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java29
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java4
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java25
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java225
-rw-r--r--Software/DB/Tango.mdfbin75497472 -> 75497472 bytes
-rw-r--r--Software/DB/Tango_log.ldfbin8388608 -> 8388608 bytes
-rw-r--r--Software/Visual_Studio/MachineStudio/Modules/Tango.MachineStudio.Stubs/ViewModels/MainViewVM.cs2
-rw-r--r--Software/Visual_Studio/Tango.Integration/Services/ExternalBridgeClient.cs3
-rw-r--r--Software/Visual_Studio/Tango.Transport/ITransporter.cs41
-rw-r--r--Software/Visual_Studio/Tango.Transport/PendingResponse.cs8
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportMessage.cs2
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs8
-rw-r--r--Software/Visual_Studio/Tango.Transport/TransporterBase.cs164
-rw-r--r--Software/Visual_Studio/Utilities/Tango.MachineEM.UI/ViewModels/MainViewVM.cs4
-rw-r--r--Software/Visual_Studio/Web/Tango.MachineService/App_Data/Tango.dbbin557056 -> 602112 bytes
17 files changed, 151 insertions, 438 deletions
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/services/ExternalBridgeService.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/services/ExternalBridgeService.java
index 99e2293ae..4788a8082 100644
--- a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/services/ExternalBridgeService.java
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/services/ExternalBridgeService.java
@@ -5,8 +5,6 @@ import android.net.DhcpInfo;
import android.net.wifi.WifiManager;
import android.os.StrictMode;
import android.os.SystemClock;
-import android.provider.SyncStateContract.Constants;
-import android.util.Log;
import com.elvishew.xlog.XLog;
import com.google.protobuf.GeneratedMessageV3;
@@ -21,7 +19,6 @@ import com.twine.tango.pmr.common.MessageTypeOuterClass.MessageType;
import com.twine.tango.pmr.integration.ExternalBridgeUdpDiscoveryPacketOuterClass.ExternalBridgeUdpDiscoveryPacket;
import com.twine.tango.pmr.integration.ExternalClientLoginRequestOuterClass.ExternalClientLoginRequest;
import com.twine.tango.pmr.integration.ExternalClientLoginResponseOuterClass.ExternalClientLoginResponse;
-import com.twine.tango.transport.ITransportAdapter;
import com.twine.tango.transport.ResponseErrorException;
import com.twine.tango.transport.adapters.TcpTransportAdapter;
import com.twine.tango.transport.transporters.ProtoTransporter;
@@ -170,14 +167,13 @@ public class ExternalBridgeService extends ProtoTransporter implements IExternal
if (newSocket != null)
{
- for (ITransportAdapter adapter : getAdapters())
+ if (getAdapter() != null)
{
- adapter.disconnect().blockingAwait();
+ getAdapter().disconnect().blockingAwait();
}
- getAdapters().clear();
TcpTransportAdapter adapter = new TcpTransportAdapter(newSocket);
adapter.setAuthenticated(false);
- getAdapters().add(adapter);
+ setAdapter(adapter);
connect().blockingAwait();
}
@@ -205,7 +201,7 @@ public class ExternalBridgeService extends ProtoTransporter implements IExternal
ExternalClientLoginRequest request = MessageFactory.parseMessageFromContainer(container);
if (request.getPassword().equals("Aa123456")) //TODO: Compare with global machine settings password...
{
- getAdapters().get(0).setAuthenticated(true);
+ getAdapter().setAuthenticated(true);
sendResponse(MessageFactory.createTangoMessage(
ExternalClientLoginResponse.class,
ExternalClientLoginResponse.newBuilder().setAuthenticated(true).build(),
@@ -213,7 +209,7 @@ public class ExternalBridgeService extends ProtoTransporter implements IExternal
}
else
{
- getAdapters().get(0).setAuthenticated(false);
+ getAdapter().setAuthenticated(false);
sendResponse(MessageFactory.createTangoMessage(
ExternalClientLoginResponse.class,
ExternalClientLoginResponse.newBuilder().setAuthenticated(false).build(),
@@ -223,7 +219,7 @@ public class ExternalBridgeService extends ProtoTransporter implements IExternal
return;
}
- if (getAdapters().get(0).isAuthenticated())
+ if (getAdapter().isAuthenticated())
{
GeneratedMessageV3 message = MessageFactory.parseMessageFromContainerAgnostic(container);
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
index 82328ef18..5531f69c9 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
@@ -20,32 +20,28 @@ public interface ITransporter extends ITransportComponent
{
/**
- * Gets the collection of adapters.
+ * Gets the adapter.
*
- * @return the adapters
+ * @return the adapter
*/
- ObservableCollection<ITransportAdapter> getAdapters();
+ ITransportAdapter getAdapter();
/**
- * Send a request to all adapters.
+ * Sets the adapter.
*
- * @param <Request> the type parameter
- * @param <Response> the type parameter
- * @param request the request
- * @return the single
+ * @param adapter the adapter
*/
- <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request);
+ void setAdapter(ITransportAdapter adapter);
/**
- * Sends a request on the specified adapter.
+ * Send a request to all adapters.
*
* @param <Request> the type parameter
* @param <Response> the type parameter
* @param request the request
- * @param adapter the adapter
* @return the single
*/
- <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter);
+ <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request);
/**
* Send a request to all adapters.
@@ -59,18 +55,6 @@ public interface ITransporter extends ITransportComponent
<Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, Period timeout);
/**
- * Sends a request on the specified adapter.
- *
- * @param <Request> the type parameter
- * @param <Response> the type parameter
- * @param request the request
- * @param adapter the adapter
- * @param timeout the timeout
- * @return the single
- */
- <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter, Period timeout);
-
- /**
* Sends a continuous request on all adapters.
*
* @param <Request> the type parameter
@@ -81,17 +65,6 @@ public interface ITransporter extends ITransportComponent
<Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<TangoMessage<Response>> sendContinuousRequest(TangoMessage<Request> request);
/**
- * Sends a continuous request on the specified adapter.
- *
- * @param <Request> the type parameter
- * @param <Response> the type parameter
- * @param request the request
- * @param adapter the adapter
- * @return the observable
- */
- <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<TangoMessage<Response>> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter);
-
- /**
* Sends a response.
*
* @param <Response> the type parameter
@@ -158,19 +131,4 @@ public interface ITransporter extends ITransportComponent
* @return the fails with adapter
*/
boolean getFailsWithAdapter();
-
- /**
- * Sets a value indicating whether the transporter should remove an adapter when it fails.
- *
- * @param value the value
- */
- void setRemoveAdaptersOnFailed(boolean value);
-
- /**
- * Gets a value indicating whether the transporter should remove an adapter when it fails.
- *
- * @return the remove adapters on fail
- */
- boolean getRemoveAdaptersOnFail();
-
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java
index bb4f2505a..91106aba0 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java
@@ -4,30 +4,10 @@ package com.twine.tango.transport;
/**
* Represents a pending response waiting to be returned to the request sender.
*/
-public class PendingResponse {
-
- private ITransportAdapter adapter;
- private boolean isContinuous;
-
- /**
- * Gets adapter.
- *
- * @return the adapter
- */
- public ITransportAdapter getAdapter()
- {
- return adapter;
- }
+public class PendingResponse
+{
- /**
- * Sets adapter.
- *
- * @param adapter the adapter
- */
- public void setAdapter(ITransportAdapter adapter)
- {
- this.adapter = adapter;
- }
+ private boolean isContinuous;
/**
* Is continuous boolean.
@@ -55,9 +35,8 @@ public class PendingResponse {
* @param adapter the adapter
* @param isContinuous is continuous
*/
- public PendingResponse(ITransportAdapter adapter, boolean isContinuous)
+ public PendingResponse(boolean isContinuous)
{
- this.adapter = adapter;
this.isContinuous = isContinuous;
}
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
index 2acf2da85..a787d62ff 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
@@ -25,9 +25,9 @@ public class TransportMessage<T> extends TransportMessageBase
* @param serialize the serialize
* @param publishSubject the publish subject
*/
- public TransportMessage(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject)
+ public TransportMessage(String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject)
{
- super(adapter, token, message, direction, serialize);
+ super(token, message, direction, serialize);
this.publishSubject = publishSubject;
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
index 718a1c66b..49d2fd31a 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
@@ -8,8 +8,6 @@ import com.twine.tango.core.Func;
*/
public abstract class TransportMessageBase
{
-
- private ITransportAdapter adapter;
private boolean isContinuous;
private String token;
private TransportMessageDirection direction;
@@ -32,26 +30,6 @@ public abstract class TransportMessageBase
protected abstract void setException(Exception error);
/**
- * Gets adapter.
- *
- * @return the adapter
- */
- public ITransportAdapter getAdapter()
- {
- return adapter;
- }
-
- /**
- * Sets adapter.
- *
- * @param adapter the adapter
- */
- public void setAdapter(ITransportAdapter adapter)
- {
- this.adapter = adapter;
- }
-
- /**
* Is continuous boolean.
*
* @return the boolean
@@ -160,9 +138,8 @@ public abstract class TransportMessageBase
* @param direction the direction
* @param serialize the serialize
*/
- public TransportMessageBase(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize)
+ public TransportMessageBase(String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize)
{
- this.adapter = adapter;
this.token = token;
this.direction = direction;
this.serialize = serialize;
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
index 92ecf1220..8901b4c20 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
@@ -2,16 +2,14 @@ package com.twine.tango.transport;
import android.os.AsyncTask;
import android.os.SystemClock;
-import android.util.Pair;
import com.elvishew.xlog.XLog;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import com.twine.tango.core.Event;
-import com.twine.tango.core.IEventHandler;
import com.twine.tango.core.Func;
+import com.twine.tango.core.IEventHandler;
import com.twine.tango.core.ObjectDisposedException;
-import com.twine.tango.core.ObservableCollection;
import com.twine.tango.pmr.MessageFactory;
import com.twine.tango.pmr.TangoMessage;
import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode;
@@ -47,17 +45,16 @@ public abstract class TransporterBase implements ITransporter
private ConcurrentLinkedQueue<TransportMessageBase> sendingQueue;
private List<TransportMessageBase> pendingRequests;
- private ConcurrentLinkedQueue<Pair<ITransportAdapter, byte[]>> arrivedResponses;
+ private ConcurrentLinkedQueue<byte[]> arrivedResponses;
private Thread pushThread;
private Thread pullThread;
private Thread keepAliveThread;
- private ObservableCollection<ITransportAdapter> adapters;
+ private ITransportAdapter adapter;
private Map<String, PendingResponse> pendingResponses;
private TransportComponentState state;
private Period requestTimeout;
private boolean useKeepAlive;
private boolean failsWithAdapter;
- private boolean removeAdaptersOnFailed;
//region Events
@@ -69,29 +66,30 @@ public abstract class TransporterBase implements ITransporter
//region Properties
@Override
- public ObservableCollection<ITransportAdapter> getAdapters()
+ public ITransportAdapter getAdapter()
{
- return adapters;
+ return adapter;
}
- /**
- * Sets adapters.
- *
- * @param adapters the adapters
- */
- protected void setAdapters(ObservableCollection<ITransportAdapter> adapters)
+ @Override
+ public void setAdapter(ITransportAdapter adapter)
{
+ this.adapter = adapter;
- if (this.adapters != null)
- {
- adapters.clearOnChangeListener();
- }
-
- this.adapters = adapters;
+ XLog.i("Adapter changed:");
- if (this.adapters != null)
+ if (this.adapter != null)
{
- adapters.setOnChangeListener(this::onAdaptersChanged);
+ XLog.i(adapter.getClass().getName() + ", " + adapter.getAddress() + ", " + adapter.getState());
+
+ adapter.removeStateChangedListener(this::onAdapterStateChanged);
+ adapter.addStateChangedListener(this::onAdapterStateChanged);
+ adapter.setDataAvailableListener(this::onAdapterDataAvailable);
+
+ if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected)
+ {
+ adapter.connect().subscribe();
+ }
}
}
@@ -148,63 +146,21 @@ public abstract class TransporterBase implements ITransporter
return failsWithAdapter;
}
- @Override
- public void setRemoveAdaptersOnFailed(boolean value)
- {
- removeAdaptersOnFailed = value;
- }
-
- @Override
- public boolean getRemoveAdaptersOnFail()
- {
- return removeAdaptersOnFailed;
- }
-
//endregion
//region Protected Methods
- /**
- * On adapters changed.
- */
- protected void onAdaptersChanged()
- {
-
- XLog.i("Adapters collection changed, Listing adapters:");
-
- for (ITransportAdapter adapter : adapters)
- {
-
- XLog.i(adapter.getClass().getName() + ", " + adapter.getAddress() + ", " + adapter.getState());
-
- adapter.removeStateChangedListener(this::onAdapterStateChanged);
- adapter.addStateChangedListener(this::onAdapterStateChanged);
- adapter.setDataAvailableListener(this::onAdapterDataAvailable);
-
- if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected)
- {
- adapter.connect().subscribe();
- }
- }
- }
-
private void onAdapterDataAvailable(Object sender, byte[] data)
{
- arrivedResponses.add(new Pair<>((ITransportAdapter) sender, data));
+ arrivedResponses.add(data);
}
private void onAdapterStateChanged(Object sender, TransportComponentState state)
{
-
- if (state == TransportComponentState.Failed && removeAdaptersOnFailed)
- {
- adapters.remove((ITransportAdapter) sender);
- }
-
if (state == TransportComponentState.Failed && failsWithAdapter)
{
- onFailed(new ObjectDisposedException("One of the transporter adapters has failed. Going into a failed state..."));
+ onFailed(new ObjectDisposedException("The adapter has failed. Going into a failed state..."));
}
}
@@ -303,7 +259,6 @@ public abstract class TransporterBase implements ITransporter
public TransporterBase()
{
stateChangedEvent = new Event<>();
- setAdapters(new ObservableCollection<>());
pendingResponses = new HashMap<>();
sendingQueue = new ConcurrentLinkedQueue<>();
pendingRequests = new ArrayList<>();
@@ -319,7 +274,7 @@ public abstract class TransporterBase implements ITransporter
public TransporterBase(ITransportAdapter adapter)
{
this();
- adapters.add(adapter);
+ this.adapter = adapter;
}
//endregion
@@ -353,12 +308,7 @@ public abstract class TransporterBase implements ITransporter
{
throwIfDisposed();
- if (adapters.size() == 0)
- {
- throw new IllegalArgumentException("This transporter has zero adapters.");
- }
-
- for (ITransportAdapter adapter : adapters)
+ if (adapter != null)
{
adapter.connect().blockingAwait();
}
@@ -368,8 +318,7 @@ public abstract class TransporterBase implements ITransporter
XLog.i("Transporter connected...");
x.onComplete();
- }
- catch (Exception e)
+ } catch (Exception e)
{
XLog.e("Error connecting transporter", e);
x.onError(e);
@@ -386,7 +335,7 @@ public abstract class TransporterBase implements ITransporter
{
throwIfDisposed();
- for (ITransportAdapter adapter : adapters)
+ if (adapter != null)
{
adapter.disconnect().blockingAwait();
}
@@ -396,8 +345,7 @@ public abstract class TransporterBase implements ITransporter
XLog.i("Transporter disconnected...");
x.onComplete();
- }
- catch (Exception e)
+ } catch (Exception e)
{
x.onError(e);
}
@@ -408,29 +356,17 @@ public abstract class TransporterBase implements ITransporter
@Override
public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request)
{
- return sendRequest(request, null, getRequestTimeout());
- }
-
- @Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter)
- {
- return sendRequest(request, adapter, getRequestTimeout());
+ return sendRequest(request, getRequestTimeout());
}
@Override
public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, Period timeout)
{
- return sendRequest(request, null, timeout);
- }
-
- @Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<TangoMessage<Response>> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter, Period timeout)
- {
- XLog.i("Queuing request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL"));
+ XLog.i("Queuing request message: " + request.getContainer().getType() + " Token: " + request.getContainer().getToken());
XLog.i("Expected response: " + Response.Builder.class.getSimpleName());
PublishSubject<TangoMessage<Response>> subject = PublishSubject.create();
- TransportMessage<TangoMessage<Response>> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<>(request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
sendingQueue.add(message);
Completable.timer(getRequestTimeout().getSeconds(), TimeUnit.SECONDS)
@@ -451,20 +387,14 @@ public abstract class TransporterBase implements ITransporter
@Override
public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<TangoMessage<Response>> sendContinuousRequest(TangoMessage<Request> request)
{
- return sendContinuousRequest(request, null);
- }
-
- @Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<TangoMessage<Response>> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter)
- {
- XLog.i("Queuing continuous response request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL"));
+ XLog.i("Queuing continuous response request message: " + request.getContainer().getType() + " Token: " + request.getContainer().getToken());
XLog.i("Expected response: " + Response.Builder.class.getSimpleName());
request.getContainer().setContinuous(true);
request.getContainer().setCompleted(false);
PublishSubject<TangoMessage<Response>> subject = PublishSubject.create();
- TransportMessage<TangoMessage<Response>> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<>(request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
message.setContinuous(true);
sendingQueue.add(message);
@@ -486,7 +416,6 @@ public abstract class TransporterBase implements ITransporter
XLog.i("Queuing response message: " + response.getClass().getSimpleName());
PendingResponse pendingResponse = null;
- ITransportAdapter adapter = null;
XLog.i("Searching for matching request token: " + token);
@@ -494,21 +423,18 @@ public abstract class TransporterBase implements ITransporter
if (pendingResponse != null)
{
- adapter = pendingResponse.getAdapter();
XLog.i("Found matching request token: " + token + " on adapter: " + adapter.getAddress());
if (!pendingResponse.isContinuous())
{
XLog.i("Removing matching request token...");
pendingResponses.remove(token);
- }
- else if (response.getContainer().getCompleted())
+ } else if (response.getContainer().getCompleted())
{
XLog.i("Response completed. Removing matching request token...");
pendingResponses.remove(token);
}
- }
- else
+ } else
{
//This should never happen.
XLog.w("Matching request token was not found...");
@@ -516,7 +442,7 @@ public abstract class TransporterBase implements ITransporter
}
PublishSubject<Response> subject = PublishSubject.create();
- TransportMessage<Response> message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject);
+ TransportMessage<Response> message = new TransportMessage<>(token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject);
sendingQueue.add(message);
return subject.singleOrError().toCompletable();
@@ -571,7 +497,7 @@ public abstract class TransporterBase implements ITransporter
while (getState() == TransportComponentState.Connected)
{
- if (sendingQueue.size() > 0)
+ if (adapter != null && sendingQueue.size() > 0)
{
TransportMessageBase message = null;
message = sendingQueue.poll();
@@ -580,36 +506,20 @@ public abstract class TransporterBase implements ITransporter
{
try
{
- if (message.getAdapter() == null)
+ if (adapter.getState() == TransportComponentState.Connected)
{
- for (ITransportAdapter adapter : adapters)
- {
- if (adapter.getState() == TransportComponentState.Connected)
- {
- adapter.write(message.getSerialize().invoke());
- XLog.i("message sent on adapter: " + adapter.getAddress() + "...");
- }
- }
- }
- else
- {
- if (message.getAdapter().getState() == TransportComponentState.Connected)
- {
- message.getAdapter().write(message.getSerialize().invoke());
- XLog.i("message sent on adapter: " + message.getAdapter().getAddress() + "...");
- }
+ adapter.write(message.getSerialize().invoke());
+ XLog.i("message sent on adapter: " + adapter.getAddress() + "...");
}
if (message.getDirection() == TransportMessageDirection.Request)
{
pendingRequests.add(message);
- }
- else
+ } else
{
message.setResult(new Object(), true);
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
message.setException(ex);
}
@@ -619,8 +529,7 @@ public abstract class TransporterBase implements ITransporter
SystemClock.sleep(10);
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
onFailed(ex);
}
@@ -640,24 +549,23 @@ public abstract class TransporterBase implements ITransporter
while (getState() == TransportComponentState.Connected)
{
- Pair<ITransportAdapter, byte[]> data;
+ byte[] data;
- if (arrivedResponses.size() > 0)
+ if (adapter != null && arrivedResponses.size() > 0)
{
data = arrivedResponses.poll();
if (data != null)
{
- XLog.i("Message received on adapter: " + data.first.getAddress());
+ XLog.i("Message received on adapter: " + adapter.getAddress());
XLog.i("Parsing message container...");
MessageContainer container;
try
{
- container = onParseContainer(data.second);
- }
- catch (Exception e)
+ container = onParseContainer(data);
+ } catch (Exception e)
{
XLog.e("Invalid message container received!", e);
continue;
@@ -681,21 +589,18 @@ public abstract class TransporterBase implements ITransporter
if (container.getError() == ErrorCode.NONE)
{
XLog.i("Response has returned with error: " + container.getError().toString());
- request.setResult(onParseTangoMessage(data.second), true);
- }
- else
+ request.setResult(onParseTangoMessage(data), true);
+ } else
{
XLog.i("Parsing inner response message and setting pending request task result...");
- request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second)));
+ request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data)));
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
XLog.e("Error parsing inner message", ex);
request.setException(ex);
}
- }
- else
+ } else
{
XLog.i("Pending request was identified as 'continuous response'. keeping pending request.");
@@ -708,29 +613,26 @@ public abstract class TransporterBase implements ITransporter
{
XLog.i("Continuous sequence completed.");
}
- request.setResult(onParseTangoMessage(data.second), container.getCompleted());
- }
- else
+ request.setResult(onParseTangoMessage(data), container.getCompleted());
+ } else
{
XLog.i("Response has returned with error: " + container.getError().toString());
- request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second)));
+ request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data)));
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
XLog.e("Error parsing inner message", ex);
request.setException(ex);
}
}
- }
- else
+ } else
{
XLog.i("Message was identified as a new request message: " + container.getType().toString());
try
{
- XLog.i("Saving request token and adapter: " + container.getToken() + ", " + data.first.getAddress());
- pendingResponses.put(container.getToken(), new PendingResponse(data.first, container.getContinuous()));
+ XLog.i("Saving request token: " + container.getToken());
+ pendingResponses.put(container.getToken(), new PendingResponse(container.getContinuous()));
if (container.getType() == MessageType.KeepAliveRequest)
{
@@ -741,14 +643,12 @@ public abstract class TransporterBase implements ITransporter
{
//ex.printStackTrace();
});
- }
- else
+ } else
{
XLog.i("Invoking RequestReceived event...");
AsyncTask.execute(() -> onRequestReceived(container));
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
//Ignore any exception that might occur on the event handler side...
}
@@ -759,8 +659,7 @@ public abstract class TransporterBase implements ITransporter
SystemClock.sleep(10);
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
onFailed(ex);
}
diff --git a/Software/DB/Tango.mdf b/Software/DB/Tango.mdf
index b2ef9acc1..85651841e 100644
--- a/Software/DB/Tango.mdf
+++ b/Software/DB/Tango.mdf
Binary files differ
diff --git a/Software/DB/Tango_log.ldf b/Software/DB/Tango_log.ldf
index 5a79d2bcc..602120293 100644
--- a/Software/DB/Tango_log.ldf
+++ b/Software/DB/Tango_log.ldf
Binary files differ
diff --git a/Software/Visual_Studio/MachineStudio/Modules/Tango.MachineStudio.Stubs/ViewModels/MainViewVM.cs b/Software/Visual_Studio/MachineStudio/Modules/Tango.MachineStudio.Stubs/ViewModels/MainViewVM.cs
index 43b57bef3..48297c418 100644
--- a/Software/Visual_Studio/MachineStudio/Modules/Tango.MachineStudio.Stubs/ViewModels/MainViewVM.cs
+++ b/Software/Visual_Studio/MachineStudio/Modules/Tango.MachineStudio.Stubs/ViewModels/MainViewVM.cs
@@ -428,7 +428,7 @@ namespace Tango.MachineStudio.Stubs.ViewModels
if (ApplicationManager.IsMachineConnected && UseConnectedMachine)
{
- adapter = ApplicationManager.ConnectedMachine.Adapters.First();
+ adapter = ApplicationManager.ConnectedMachine.Adapter;
}
_stubManager = new StubManager(adapter);
diff --git a/Software/Visual_Studio/Tango.Integration/Services/ExternalBridgeClient.cs b/Software/Visual_Studio/Tango.Integration/Services/ExternalBridgeClient.cs
index cdc176d73..b783ec261 100644
--- a/Software/Visual_Studio/Tango.Integration/Services/ExternalBridgeClient.cs
+++ b/Software/Visual_Studio/Tango.Integration/Services/ExternalBridgeClient.cs
@@ -48,8 +48,7 @@ namespace Tango.Integration.Services
public override async Task Connect()
{
await Disconnect();
- Adapters.Clear();
- Adapters.Add(new TcpTransportAdapter(IPAddress, SettingsManager.Default.Integration.ExternalBridgeServicePort));
+ Adapter = new TcpTransportAdapter(IPAddress, SettingsManager.Default.Integration.ExternalBridgeServicePort);
await base.Connect();
}
diff --git a/Software/Visual_Studio/Tango.Transport/ITransporter.cs b/Software/Visual_Studio/Tango.Transport/ITransporter.cs
index 178991abf..9d6f13446 100644
--- a/Software/Visual_Studio/Tango.Transport/ITransporter.cs
+++ b/Software/Visual_Studio/Tango.Transport/ITransporter.cs
@@ -14,58 +14,36 @@ using System.Collections.ObjectModel;
namespace Tango.Transport
{
/// <summary>
- /// Represents a transportation engine which can send and receive <see cref="TangoMessage{T}"/> message using one or many <see cref="ITransportAdapter">Transport adapters</see>.
+ /// Represents a transportation engine which can send and receive <see cref="TangoMessage{T}"/> message using a <see cref="ITransportAdapter">Transport adapter</see>.
/// </summary>
/// <seealso cref="Tango.Transport.ITransportComponent" />
public interface ITransporter : ITransportComponent
{
/// <summary>
- /// Gets the serial adapter.
+ /// Gets or sets the <see cref="ITransportAdapter"/> used to read and write raw data.
/// </summary>
- ObservableCollection<ITransportAdapter> Adapters { get; }
+ ITransportAdapter Adapter { get; set; }
/// <summary>
- /// Sends a broadcast request to through all adapters.
+ /// Sends a request.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
+ /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout"/>.</param>
/// <returns></returns>
Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>;
/// <summary>
- /// Sends a request through the specified adapter.
+ /// Sends a request and expecting multiple response messages.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
- /// <param name="adapter">Transport adapter</param>
- /// <returns></returns>
- Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>;
-
- /// <summary>
- /// Sends a request through all adapters which is expected to return multiple response messages.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="adapter">Transport adapter</param>
- /// <param name="responseCallback">The response callback delegate.</param>
/// <returns></returns>
IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>;
/// <summary>
- /// Sends a request through the specified adapter which is expected to return multiple response messages.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="adapter">Transport adapter</param>
- /// <param name="responseCallback">The response callback delegate.</param>
- /// <returns></returns>
- IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response>;
-
- /// <summary>
/// Sends a response.
/// </summary>
/// <typeparam name="Response">The type of the response.</typeparam>
@@ -98,13 +76,8 @@ namespace Tango.Transport
bool UseKeepAlive { get; set; }
/// <summary>
- /// Gets or sets a value indicating whether the transporter will get in to a failed state if any adapter has failed.
+ /// Gets or sets a value indicating whether the transporter will get in to a failed state if the <see cref="Adapter"/> has failed.
/// </summary>
bool FailsWithAdapter { get; set; }
-
- /// <summary>
- /// Gets or sets a value indicating whether the transporter should remove an adapter when it fails.
- /// </summary>
- bool RemoveAdaptersOnFailed { get; set; }
}
}
diff --git a/Software/Visual_Studio/Tango.Transport/PendingResponse.cs b/Software/Visual_Studio/Tango.Transport/PendingResponse.cs
index bb9718b19..78c41e88a 100644
--- a/Software/Visual_Studio/Tango.Transport/PendingResponse.cs
+++ b/Software/Visual_Studio/Tango.Transport/PendingResponse.cs
@@ -12,11 +12,6 @@ namespace Tango.Transport
public class PendingResponse
{
/// <summary>
- /// Gets or sets the adapter target response adapter.
- /// </summary>
- public ITransportAdapter Adapter { get; set; }
-
- /// <summary>
/// Gets or sets a value indicating whether this is a continuous request.
/// </summary>
public bool IsContinuous { get; set; }
@@ -26,9 +21,8 @@ namespace Tango.Transport
/// </summary>
/// <param name="adapter">The adapter.</param>
/// <param name="isContinuous">if set to <c>true</c> [is continuous].</param>
- public PendingResponse(ITransportAdapter adapter,bool isContinuous)
+ public PendingResponse(bool isContinuous)
{
- Adapter = adapter;
IsContinuous = isContinuous;
}
}
diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs
index d4ba54272..c3b3e87b3 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportMessage.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportMessage.cs
@@ -28,7 +28,7 @@ namespace Tango.Transport
/// <param name="direction">The direction.</param>
/// <param name="toBytes">To bytes.</param>
/// <param name="completionSource">The completion source.</param>
- public TransportMessage(ITransportAdapter adapter, string token, object message, TransportMessageDirection direction, Func<byte[]> toBytes, TaskCompletionSource<T> completionSource) : base(adapter, token, message, direction, toBytes)
+ public TransportMessage(string token, object message, TransportMessageDirection direction, Func<byte[]> toBytes, TaskCompletionSource<T> completionSource) : base(token, message, direction, toBytes)
{
_completionSource = completionSource;
}
diff --git a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
index 4cc3823cf..27ad053be 100644
--- a/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransportMessageBase.cs
@@ -13,11 +13,6 @@ namespace Tango.Transport
public abstract class TransportMessageBase
{
/// <summary>
- /// Gets or sets the source/destination adapter.
- /// </summary>
- public ITransportAdapter Adapter { get; set; }
-
- /// <summary>
/// Gets or sets a value indicating whether this instance is multi response.
/// </summary>
public bool IsContinuous { get; set; }
@@ -61,9 +56,8 @@ namespace Tango.Transport
/// <param name="message">The message.</param>
/// <param name="direction">The direction.</param>
/// <param name="toBytes">To bytes.</param>
- public TransportMessageBase(ITransportAdapter adapter, String token, object message, TransportMessageDirection direction, Func<byte[]> toBytes)
+ public TransportMessageBase(String token, object message, TransportMessageDirection direction, Func<byte[]> toBytes)
{
- Adapter = adapter;
Token = token;
Message = message;
Direction = direction;
diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
index aa481007c..756738ee8 100644
--- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
+++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs
@@ -25,11 +25,11 @@ namespace Tango.Transport
{
private ConcurrentQueue<TransportMessageBase> _sendingQueue;
private List<TransportMessageBase> _pendingRequests;
- private ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>> _arrivedResponses;
+ private ConcurrentQueue<byte[]> _arrivedResponses;
private Thread _pushThread;
private Thread _pullThread;
private Thread _keepAliveThread;
- private ObservableCollection<ITransportAdapter> _adapters;
+ private ITransportAdapter _adapter;
private Dictionary<String, PendingResponse> _pendingResponses;
#region Events
@@ -49,25 +49,12 @@ namespace Tango.Transport
#region Properties
/// <summary>
- /// Gets the serial adapter.
+ /// Gets or sets the <see cref="ITransportAdapter" /> used to read and write raw data.
/// </summary>
- public ObservableCollection<ITransportAdapter> Adapters
+ public ITransportAdapter Adapter
{
- get { return _adapters; }
- protected set
- {
- if (_adapters != null)
- {
- _adapters.CollectionChanged -= OnAdaptersCollectionChanged;
- }
-
- _adapters = value;
-
- if (_adapters != null)
- {
- _adapters.CollectionChanged += OnAdaptersCollectionChanged;
- }
- }
+ get { return _adapter; }
+ set { _adapter = value; OnAdapterChanged(value); }
}
private TransportComponentState _state;
@@ -89,11 +76,6 @@ namespace Tango.Transport
/// </summary>
public TimeSpan RequestTimeout { get; set; }
- /// <summary>
- /// Gets or sets a value indicating whether the transporter should remove an adapter when it fails.
- /// </summary>
- public bool RemoveAdaptersOnFailed { get; set; }
-
private bool _useKeepAlive;
/// <summary>
/// Gets or sets a value indicating whether to use a keep alive mechanism.
@@ -126,26 +108,25 @@ namespace Tango.Transport
#region Virtual Methods
/// <summary>
- /// Called when the collection of adapters has changed.
+ /// Called when the <see cref="Adapter"/> has changed.
/// </summary>
- /// <param name="sender">The sender.</param>
- /// <param name="e">The <see cref="System.Collections.Specialized.NotifyCollectionChangedEventArgs"/> instance containing the event data.</param>
- protected virtual async void OnAdaptersCollectionChanged(object sender, System.Collections.Specialized.NotifyCollectionChangedEventArgs e)
+ /// <param name="adapter">The adapter.</param>
+ protected async virtual void OnAdapterChanged(ITransportAdapter adapter)
{
- LogManager.Log("Adapters collection changed, Listing adapters:");
+ LogManager.Log("Adapter Changed:");
- foreach (var ad in new ConcurrentList<ITransportAdapter>(_adapters))
+ if (adapter != null)
{
- LogManager.Log(ad.GetType().Name + ", " + ad.Address + ", " + ad.State.ToString());
+ LogManager.Log(adapter.GetType().Name + ", " + adapter.Address + ", " + adapter.State.ToString());
- ad.StateChanged -= OnAdapterStateChanged;
- ad.StateChanged += OnAdapterStateChanged;
- ad.DataAvailable -= OnAdapterDataAvailable;
- ad.DataAvailable += OnAdapterDataAvailable;
+ adapter.StateChanged -= OnAdapterStateChanged;
+ adapter.StateChanged += OnAdapterStateChanged;
+ adapter.DataAvailable -= OnAdapterDataAvailable;
+ adapter.DataAvailable += OnAdapterDataAvailable;
- if (State == TransportComponentState.Connected && ad.State == TransportComponentState.Disconnected)
+ if (State == TransportComponentState.Connected && adapter.State == TransportComponentState.Disconnected)
{
- await ad.Connect();
+ await adapter.Connect();
}
}
}
@@ -157,14 +138,9 @@ namespace Tango.Transport
/// <param name="e">The e.</param>
protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
{
- if (e == TransportComponentState.Failed && RemoveAdaptersOnFailed)
- {
- Adapters.Remove(sender as ITransportAdapter);
- }
-
if (e == TransportComponentState.Failed && FailsWithAdapter)
{
- OnFailed(new CommunicationException("One of the transporter adapters has failed. Going into a failed state..."));
+ OnFailed(new CommunicationException("The adapter has failed. Going into a failed state..."));
}
}
@@ -175,7 +151,7 @@ namespace Tango.Transport
/// <param name="data">The data.</param>
protected virtual void OnAdapterDataAvailable(object sender, byte[] data)
{
- _arrivedResponses.Enqueue(new KeyValuePair<ITransportAdapter, byte[]>(sender as ITransportAdapter, data));
+ _arrivedResponses.Enqueue(data);
}
/// <summary>
@@ -257,12 +233,10 @@ namespace Tango.Transport
/// </summary>
public TransporterBase()
{
- RemoveAdaptersOnFailed = true;
- Adapters = new ObservableCollection<ITransportAdapter>();
_pendingResponses = new Dictionary<string, PendingResponse>();
_sendingQueue = new ConcurrentQueue<TransportMessageBase>();
_pendingRequests = new List<TransportMessageBase>();
- _arrivedResponses = new ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>>();
+ _arrivedResponses = new ConcurrentQueue<byte[]>();
RequestTimeout = TimeSpan.FromSeconds(5);
}
@@ -272,7 +246,7 @@ namespace Tango.Transport
/// <param name="adapter">The transport adapter.</param>
public TransporterBase(ITransportAdapter adapter) : this()
{
- Adapters.Add(adapter);
+ Adapter = adapter;
}
#endregion
@@ -287,7 +261,11 @@ namespace Tango.Transport
{
State = TransportComponentState.Connected;
StartThreads();
- await Task.WhenAll(Adapters.Select(x => x.Connect()));
+
+ if (Adapter != null)
+ {
+ await Adapter.Connect();
+ }
LogManager.Log("Transporter Connected...");
}
@@ -298,37 +276,28 @@ namespace Tango.Transport
public virtual async Task Disconnect()
{
State = TransportComponentState.Disconnected;
- await Task.WhenAll(Adapters.Select(x => x.Disconnect()));
+ if (Adapter != null)
+ {
+ await Adapter.Disconnect();
+ }
LogManager.Log("Transporter Disconnected...");
}
/// <summary>
- /// Sends a broadcast request to through all adapters.
+ /// Sends a request.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
+ /// <param name="timeout">Optional timeout. If not specified will use the <see cref="RequestTimeout" />.</param>
/// <returns></returns>
public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
{
- return SendRequest<Request, Response>(request, null, timeout);
- }
-
- /// <summary>
- /// Sends a request through the specified adapter.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="adapter">Transport adapter</param>
- /// <returns></returns>
- public Task<TangoMessage<Response>> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, TimeSpan? timeout = null) where Request : IMessage<Request> where Response : IMessage<Response>
- {
- LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
+ LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token);
LogManager.Log("Expected response: " + typeof(Response).Name);
TaskCompletionSource<TangoMessage<Response>> source = new TaskCompletionSource<TangoMessage<Response>>();
- TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), source);
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), source);
_sendingQueue.Enqueue(message);
Task.Delay(timeout != null ? timeout.Value : RequestTimeout).ContinueWith((x) =>
{
@@ -343,36 +312,24 @@ namespace Tango.Transport
}
/// <summary>
- /// Sends a request through all adapters which is expected to return multiple response messages.
+ /// Sends a request and expecting multiple response messages.
/// </summary>
/// <typeparam name="Request">The type of the request.</typeparam>
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
- /// <param name="responseCallback">The response callback delegate.</param>
+ /// <returns></returns>
public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request) where Request : IMessage<Request> where Response : IMessage<Response>
{
- return SendContinuousRequest<Request, Response>(request, null);
- }
+ LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " Token: " + request.Container.Token);
- /// <summary>
- /// Sends a request through the specified adapter which is expected to return multiple response messages.
- /// </summary>
- /// <typeparam name="Request">The type of the request.</typeparam>
- /// <typeparam name="Response">The type of the response.</typeparam>
- /// <param name="request">The request.</param>
- /// <param name="adapter">Transport adapter</param>
- /// <param name="responseCallback">The response callback delegate.</param>
- public IObservable<TangoMessage<Response>> SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response>
- {
Subject<TangoMessage<Response>> subject = new Subject<TangoMessage<Response>>();
- LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
LogManager.Log("Expected response: " + typeof(Response).Name);
request.Container.Continuous = true;
request.Container.Completed = false;
- TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), null)
+ TransportMessage<TangoMessage<Response>> message = new TransportMessage<TangoMessage<Response>>(request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), null)
{
IsContinuous = true,
ContinuesResponseSubject = subject,
@@ -407,14 +364,12 @@ namespace Tango.Transport
LogManager.Log("Queuing response message: " + typeof(Response).Name);
PendingResponse pendingResponse = null;
- ITransportAdapter adapter = null;
LogManager.Log("Searching for matching request token: " + token);
if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
- adapter = pendingResponse.Adapter;
- LogManager.Log("Found matching request token: " + token + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
+ LogManager.Log("Found matching request token: " + token);
if (!pendingResponse.IsContinuous)
{
@@ -434,7 +389,7 @@ namespace Tango.Transport
}
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
- TransportMessage<object> message = new TransportMessage<object>(adapter, token, response, TransportMessageDirection.Response, OnSerializeingMessage(response), source);
+ TransportMessage<object> message = new TransportMessage<object>(token, response, TransportMessageDirection.Response, OnSerializeingMessage(response), source);
_sendingQueue.Enqueue(message);
return source.Task;
}
@@ -477,28 +432,17 @@ namespace Tango.Transport
{
while (State == TransportComponentState.Connected)
{
- if (_sendingQueue.Count > 0)
+ if (Adapter != null && _sendingQueue.Count > 0)
{
TransportMessageBase message;
if (_sendingQueue.TryDequeue(out message))
{
try
{
- if (message.Adapter == null)
+ if (Adapter.State == TransportComponentState.Connected)
{
- foreach (var adapter in Adapters.Where(x => x.State == TransportComponentState.Connected))
- {
- adapter.Write(message.Serialize());
- LogManager.Log("Message sent on adapter: " + adapter.Address + "...");
- }
- }
- else
- {
- if (message.Adapter.State == TransportComponentState.Connected)
- {
- message.Adapter.Write(message.Serialize());
- LogManager.Log("Message sent on adapter: " + message.Adapter.Address + "...");
- }
+ Adapter.Write(message.Serialize());
+ LogManager.Log("Message sent on adapter: " + Adapter.Address + "...");
}
if (message.Direction == TransportMessageDirection.Request)
@@ -539,16 +483,16 @@ namespace Tango.Transport
{
while (State == TransportComponentState.Connected)
{
- KeyValuePair<ITransportAdapter, byte[]> data;
+ byte[] data;
- if (_arrivedResponses.Count > 0)
+ if (Adapter != null && _arrivedResponses.Count > 0)
{
if (_arrivedResponses.TryDequeue(out data))
{
- LogManager.Log("Message received on adapter: " + data.Key.Address);
+ LogManager.Log("Message received on adapter: " + Adapter.Address);
LogManager.Log("Parsing message container...");
- MessageContainer container = OnParseContainer(data.Value);
+ MessageContainer container = OnParseContainer(data);
LogManager.Log("Searching for pending request token: " + container.Token);
TransportMessageBase request = _pendingRequests.SingleOrDefault(x => x.Token == container.Token);
@@ -567,7 +511,7 @@ namespace Tango.Transport
if (container.Error == ErrorCode.None)
{
LogManager.Log("Parsing inner response message and setting pending request task result...");
- request.SetResult(OnParseTangoMessage(data.Value), true);
+ request.SetResult(OnParseTangoMessage(data), true);
LogManager.Log("Message enquirer released...");
}
else
@@ -594,7 +538,7 @@ namespace Tango.Transport
{
LogManager.Log("Continuous sequence completed.");
}
- request.SetResult(OnParseTangoMessage(data.Value), container.Completed);
+ request.SetResult(OnParseTangoMessage(data), container.Completed);
}
else
{
@@ -614,8 +558,8 @@ namespace Tango.Transport
try
{
- LogManager.Log("Saving request token and adapter: " + container.Token + ", " + data.Key.Address);
- _pendingResponses.Add(container.Token, new PendingResponse(data.Key, container.Continuous));
+ LogManager.Log("Saving request token: " + container.Token);
+ _pendingResponses.Add(container.Token, new PendingResponse(container.Continuous));
if (container.Type == MessageType.KeepAliveRequest)
{
@@ -665,7 +609,7 @@ namespace Tango.Transport
var task = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), TimeSpan.FromSeconds(5));
task.Wait();
var response = task.Result;
-
+
}
}
catch (TimeoutException)
diff --git a/Software/Visual_Studio/Utilities/Tango.MachineEM.UI/ViewModels/MainViewVM.cs b/Software/Visual_Studio/Utilities/Tango.MachineEM.UI/ViewModels/MainViewVM.cs
index 24a0f6119..ad5c449c8 100644
--- a/Software/Visual_Studio/Utilities/Tango.MachineEM.UI/ViewModels/MainViewVM.cs
+++ b/Software/Visual_Studio/Utilities/Tango.MachineEM.UI/ViewModels/MainViewVM.cs
@@ -191,7 +191,7 @@ namespace Tango.MachineEM.UI.ViewModels
/// <param name="e">The <see cref="ClientConnectedEventArgs"/> instance containing the event data.</param>
private void TcpServer_ClientConnected(object sender, ClientConnectedEventArgs e)
{
- Emulator.Transporter.Adapters.Add(new TcpTransportAdapter(e.Socket));
+ Emulator.Transporter.Adapter = new TcpTransportAdapter(e.Socket);
}
#endregion
@@ -211,7 +211,7 @@ namespace Tango.MachineEM.UI.ViewModels
}
else
{
- Emulator.Transporter.Adapters.Add(new UsbTransportAdapter(SelectedPort));
+ Emulator.Transporter.Adapter = new UsbTransportAdapter(SelectedPort);
}
await Emulator.Start();
InvalidateRelayCommands();
diff --git a/Software/Visual_Studio/Web/Tango.MachineService/App_Data/Tango.db b/Software/Visual_Studio/Web/Tango.MachineService/App_Data/Tango.db
index f7af9b52f..d676fc8c3 100644
--- a/Software/Visual_Studio/Web/Tango.MachineService/App_Data/Tango.db
+++ b/Software/Visual_Studio/Web/Tango.MachineService/App_Data/Tango.db
Binary files differ