aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoy <roy.mail.net@gmail.com>2017-11-15 01:13:28 +0200
committerRoy <roy.mail.net@gmail.com>2017-11-15 01:13:28 +0200
commit382c28941990c2cbf99c2bb21ed4ec34d9ed28eb (patch)
tree43a4822b15561347e217ea8255d4b18b18c7ea92
parentef781529f0ca3de76f1ab449b0773d7bb4b1aedc (diff)
downloadTango-382c28941990c2cbf99c2bb21ed4ec34d9ed28eb.tar.gz
Tango-382c28941990c2cbf99c2bb21ed4ec34d9ed28eb.zip
Added continuous definition and completed for MessageContainer.
Implemented auto continuous request completion. Implemented some proofing for TransportAdapter failure.
-rw-r--r--Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java155
-rw-r--r--Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java12
-rw-r--r--Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java4
-rw-r--r--Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java2
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java4
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java63
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java4
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java27
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java83
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java453
-rw-r--r--Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java6
-rw-r--r--Software/PMR/Messages/Common/MessageContainer.proto4
-rw-r--r--Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs3
-rw-r--r--Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs73
-rw-r--r--Software/Visual Studio/Tango.PMR/Stubs/CalculateRequest.cs (renamed from Software/Visual Studio/Tango.PMR/Stubs/Calculate.cs)139
-rw-r--r--Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs159
-rw-r--r--Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs131
-rw-r--r--Software/Visual Studio/Tango.PMR/Stubs/ProgressResponse.cs (renamed from Software/Visual Studio/Tango.PMR/Stubs/Progress.cs)108
-rw-r--r--Software/Visual Studio/Tango.PMR/Tango.PMR.csproj6
-rw-r--r--Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs2
-rw-r--r--Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs4
-rw-r--r--Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs6
-rw-r--r--Software/Visual Studio/Tango.Transport/ITransporter.cs4
-rw-r--r--Software/Visual Studio/Tango.Transport/PendingResponse.cs35
-rw-r--r--Software/Visual Studio/Tango.Transport/Tango.Transport.csproj1
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs4
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportMessage.cs4
-rw-r--r--Software/Visual Studio/Tango.Transport/TransportMessageBase.cs6
-rw-r--r--Software/Visual Studio/Tango.Transport/TransporterBase.cs77
29 files changed, 1029 insertions, 550 deletions
diff --git a/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java b/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java
index 24e2fc7f9..c4919dae3 100644
--- a/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java
+++ b/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java
@@ -38,7 +38,17 @@ public final class MessageContainerOuterClass {
getTokenBytes();
/**
- * <code>bytes Data = 3;</code>
+ * <code>bool Continuous = 3;</code>
+ */
+ boolean getContinuous();
+
+ /**
+ * <code>bool Completed = 4;</code>
+ */
+ boolean getCompleted();
+
+ /**
+ * <code>bytes Data = 5;</code>
*/
com.google.protobuf.ByteString getData();
}
@@ -57,6 +67,8 @@ public final class MessageContainerOuterClass {
private MessageContainer() {
type_ = 0;
token_ = "";
+ continuous_ = false;
+ completed_ = false;
data_ = com.google.protobuf.ByteString.EMPTY;
}
@@ -100,7 +112,17 @@ public final class MessageContainerOuterClass {
token_ = s;
break;
}
- case 26: {
+ case 24: {
+
+ continuous_ = input.readBool();
+ break;
+ }
+ case 32: {
+
+ completed_ = input.readBool();
+ break;
+ }
+ case 42: {
data_ = input.readBytes();
break;
@@ -179,10 +201,28 @@ public final class MessageContainerOuterClass {
}
}
- public static final int DATA_FIELD_NUMBER = 3;
+ public static final int CONTINUOUS_FIELD_NUMBER = 3;
+ private boolean continuous_;
+ /**
+ * <code>bool Continuous = 3;</code>
+ */
+ public boolean getContinuous() {
+ return continuous_;
+ }
+
+ public static final int COMPLETED_FIELD_NUMBER = 4;
+ private boolean completed_;
+ /**
+ * <code>bool Completed = 4;</code>
+ */
+ public boolean getCompleted() {
+ return completed_;
+ }
+
+ public static final int DATA_FIELD_NUMBER = 5;
private com.google.protobuf.ByteString data_;
/**
- * <code>bytes Data = 3;</code>
+ * <code>bytes Data = 5;</code>
*/
public com.google.protobuf.ByteString getData() {
return data_;
@@ -206,8 +246,14 @@ public final class MessageContainerOuterClass {
if (!getTokenBytes().isEmpty()) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 2, token_);
}
+ if (continuous_ != false) {
+ output.writeBool(3, continuous_);
+ }
+ if (completed_ != false) {
+ output.writeBool(4, completed_);
+ }
if (!data_.isEmpty()) {
- output.writeBytes(3, data_);
+ output.writeBytes(5, data_);
}
unknownFields.writeTo(output);
}
@@ -224,9 +270,17 @@ public final class MessageContainerOuterClass {
if (!getTokenBytes().isEmpty()) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, token_);
}
+ if (continuous_ != false) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(3, continuous_);
+ }
+ if (completed_ != false) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(4, completed_);
+ }
if (!data_.isEmpty()) {
size += com.google.protobuf.CodedOutputStream
- .computeBytesSize(3, data_);
+ .computeBytesSize(5, data_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
@@ -247,6 +301,10 @@ public final class MessageContainerOuterClass {
result = result && type_ == other.type_;
result = result && getToken()
.equals(other.getToken());
+ result = result && (getContinuous()
+ == other.getContinuous());
+ result = result && (getCompleted()
+ == other.getCompleted());
result = result && getData()
.equals(other.getData());
result = result && unknownFields.equals(other.unknownFields);
@@ -264,6 +322,12 @@ public final class MessageContainerOuterClass {
hash = (53 * hash) + type_;
hash = (37 * hash) + TOKEN_FIELD_NUMBER;
hash = (53 * hash) + getToken().hashCode();
+ hash = (37 * hash) + CONTINUOUS_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean(
+ getContinuous());
+ hash = (37 * hash) + COMPLETED_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean(
+ getCompleted());
hash = (37 * hash) + DATA_FIELD_NUMBER;
hash = (53 * hash) + getData().hashCode();
hash = (29 * hash) + unknownFields.hashCode();
@@ -399,6 +463,10 @@ public final class MessageContainerOuterClass {
token_ = "";
+ continuous_ = false;
+
+ completed_ = false;
+
data_ = com.google.protobuf.ByteString.EMPTY;
return this;
@@ -425,6 +493,8 @@ public final class MessageContainerOuterClass {
com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer result = new com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer(this);
result.type_ = type_;
result.token_ = token_;
+ result.continuous_ = continuous_;
+ result.completed_ = completed_;
result.data_ = data_;
onBuilt();
return result;
@@ -474,6 +544,12 @@ public final class MessageContainerOuterClass {
token_ = other.token_;
onChanged();
}
+ if (other.getContinuous() != false) {
+ setContinuous(other.getContinuous());
+ }
+ if (other.getCompleted() != false) {
+ setCompleted(other.getCompleted());
+ }
if (other.getData() != com.google.protobuf.ByteString.EMPTY) {
setData(other.getData());
}
@@ -617,15 +693,67 @@ public final class MessageContainerOuterClass {
return this;
}
+ private boolean continuous_ ;
+ /**
+ * <code>bool Continuous = 3;</code>
+ */
+ public boolean getContinuous() {
+ return continuous_;
+ }
+ /**
+ * <code>bool Continuous = 3;</code>
+ */
+ public Builder setContinuous(boolean value) {
+
+ continuous_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>bool Continuous = 3;</code>
+ */
+ public Builder clearContinuous() {
+
+ continuous_ = false;
+ onChanged();
+ return this;
+ }
+
+ private boolean completed_ ;
+ /**
+ * <code>bool Completed = 4;</code>
+ */
+ public boolean getCompleted() {
+ return completed_;
+ }
+ /**
+ * <code>bool Completed = 4;</code>
+ */
+ public Builder setCompleted(boolean value) {
+
+ completed_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>bool Completed = 4;</code>
+ */
+ public Builder clearCompleted() {
+
+ completed_ = false;
+ onChanged();
+ return this;
+ }
+
private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
/**
- * <code>bytes Data = 3;</code>
+ * <code>bytes Data = 5;</code>
*/
public com.google.protobuf.ByteString getData() {
return data_;
}
/**
- * <code>bytes Data = 3;</code>
+ * <code>bytes Data = 5;</code>
*/
public Builder setData(com.google.protobuf.ByteString value) {
if (value == null) {
@@ -637,7 +765,7 @@ public final class MessageContainerOuterClass {
return this;
}
/**
- * <code>bytes Data = 3;</code>
+ * <code>bytes Data = 5;</code>
*/
public Builder clearData() {
@@ -709,9 +837,10 @@ public final class MessageContainerOuterClass {
static {
java.lang.String[] descriptorData = {
"\n\026MessageContainer.proto\022\020Tango.PMR.Comm" +
- "on\032\021MessageType.proto\"\\\n\020MessageContaine" +
- "r\022+\n\004Type\030\001 \001(\0162\035.Tango.PMR.Common.Messa" +
- "geType\022\r\n\005Token\030\002 \001(\t\022\014\n\004Data\030\003 \001(\014B\034\n\032c" +
+ "on\032\021MessageType.proto\"\203\001\n\020MessageContain" +
+ "er\022+\n\004Type\030\001 \001(\0162\035.Tango.PMR.Common.Mess" +
+ "ageType\022\r\n\005Token\030\002 \001(\t\022\022\n\nContinuous\030\003 \001" +
+ "(\010\022\021\n\tCompleted\030\004 \001(\010\022\014\n\004Data\030\005 \001(\014B\034\n\032c" +
"om.twine.tango.pmr.commonb\006proto3"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
@@ -732,7 +861,7 @@ public final class MessageContainerOuterClass {
internal_static_Tango_PMR_Common_MessageContainer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_Tango_PMR_Common_MessageContainer_descriptor,
- new java.lang.String[] { "Type", "Token", "Data", });
+ new java.lang.String[] { "Type", "Token", "Continuous", "Completed", "Data", });
com.twine.tango.pmr.common.MessageTypeOuterClass.getDescriptor();
}
diff --git a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java
index d26d62dbc..f0c71477f 100644
--- a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java
+++ b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java
@@ -60,8 +60,8 @@ public class App extends Application {
// .b()
.addObjectFormatter(GeneratedMessageV3.class, msg -> msg.toString()).build();
- File sdcard = Environment.getExternalStorageDirectory();
- File dir = new File(sdcard.getAbsolutePath() + "/twine/tango/logs");
+ File sdcard = context.getFilesDir();
+ File dir = new File(sdcard.getAbsolutePath() + "/logs");
boolean b = dir.mkdirs();
Printer androidPrinter = new AndroidPrinter();
@@ -71,6 +71,14 @@ public class App extends Application {
.build();
XLog.init(config, androidPrinter, filePrinter);
+ XLog.i("Logger Initialized. logs will be saved to: " + dir.getAbsolutePath());
+
+
+ Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
+
+ XLog.e(throwable);
+ System.exit(1);
+ });
}
public static ApplicationComponent getComponent() {
diff --git a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java
index 6cc1ee32a..d95420dd5 100644
--- a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java
+++ b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java
@@ -18,6 +18,8 @@ import com.twine.tango.transport.TransportComponentState;
import javax.inject.Inject;
+import io.reactivex.android.schedulers.AndroidSchedulers;
+
/**
* Created by Roy on 11/7/2017.
*/
@@ -65,7 +67,7 @@ public class StubActivityVM extends ViewModelBase<StubActivityContract>
private void runSelectedStub()
{
StubBase stub = this.stub.get().createInstance(transporter);
- stub.run().subscribe((response) ->
+ stub.run().observeOn(AndroidSchedulers.mainThread()).subscribe((response) ->
{
XLog.i(response);
}, (ex) ->
diff --git a/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java b/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java
index 25243afe3..4bfe84609 100644
--- a/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java
+++ b/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java
@@ -28,7 +28,7 @@ public class Progress extends StubBase
{
PublishSubject<String> subject = PublishSubject.create();
- getTransporter().<ProgressRequest, ProgressResponse>sendRequestMulti(
+ getTransporter().<ProgressRequest, ProgressResponse>sendContinuousRequest(
MessageFactory.createTangoMessage(
ProgressRequest.class,
ProgressRequest.newBuilder().build())).subscribe((response) ->
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
index 7ae5579de..0ad375cba 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
@@ -27,9 +27,9 @@ public interface ITransporter extends ITransportComponent {
<Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request,ITransportAdapter adapter);
- <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request);
+ <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request);
- <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request,ITransportAdapter adapter);
+ <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter);
<Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response);
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java
new file mode 100644
index 000000000..bb4f2505a
--- /dev/null
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java
@@ -0,0 +1,63 @@
+package com.twine.tango.transport;
+
+
+/**
+ * Represents a pending response waiting to be returned to the request sender.
+ */
+public class PendingResponse {
+
+ private ITransportAdapter adapter;
+ private boolean isContinuous;
+
+ /**
+ * Gets adapter.
+ *
+ * @return the adapter
+ */
+ public ITransportAdapter getAdapter()
+ {
+ return adapter;
+ }
+
+ /**
+ * Sets adapter.
+ *
+ * @param adapter the adapter
+ */
+ public void setAdapter(ITransportAdapter adapter)
+ {
+ this.adapter = adapter;
+ }
+
+ /**
+ * Is continuous boolean.
+ *
+ * @return the boolean
+ */
+ public boolean isContinuous()
+ {
+ return isContinuous;
+ }
+
+ /**
+ * Sets continuous.
+ *
+ * @param continuous the continuous
+ */
+ public void setContinuous(boolean continuous)
+ {
+ isContinuous = continuous;
+ }
+
+ /**
+ * Instantiates a new Pending response.
+ *
+ * @param adapter the adapter
+ * @param isContinuous is continuous
+ */
+ public PendingResponse(ITransportAdapter adapter, boolean isContinuous)
+ {
+ this.adapter = adapter;
+ this.isContinuous = isContinuous;
+ }
+}
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
index 1d28a25ad..faa50dadf 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
@@ -70,8 +70,8 @@ public abstract class TransportAdapterBase implements ITransportAdapter {
setState(TransportComponentState.Disposed);
}
- protected void throwFailedOrDisposed() throws ObjectDisposedException {
- if (state == TransportComponentState.Failed || state == TransportComponentState.Disposed) {
+ protected void throwIfDisposed() throws ObjectDisposedException {
+ if (state == TransportComponentState.Disposed) {
throw new ObjectDisposedException("The adapter is in a " + state + " state.");
}
}
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
index e625e4b0d..4ed894dba 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java
@@ -1,35 +1,36 @@
package com.twine.tango.transport;
-import com.twine.tango.core.Action;
import com.twine.tango.core.Func;
import io.reactivex.subjects.PublishSubject;
-import java8.util.function.Consumer;
-import java8.util.function.Supplier;
/**
* Created by Roy on 11/13/2017.
*/
-public class TransportMessage<T> extends TransportMessageBase {
-
+public class TransportMessage<T> extends TransportMessageBase
+{
+
private PublishSubject<T> publishSubject;
-
- public TransportMessage(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject) {
+
+ public TransportMessage(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject)
+ {
super(adapter, token, message, direction, serialize);
this.publishSubject = publishSubject;
}
-
+
@SuppressWarnings("unchecked")
@Override
- protected void setResult(Object result) {
+ protected void setResult(Object result, boolean completed)
+ {
- publishSubject.onNext((T)result);
- if (!isMultiResponse()) publishSubject.onComplete();
+ publishSubject.onNext((T) result);
+ if (completed) publishSubject.onComplete();
}
-
+
@Override
- protected void setException(Exception error) {
+ protected void setException(Exception error)
+ {
publishSubject.onError(error);
publishSubject.onComplete();
}
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
index 6d0af57c7..e236fc44f 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java
@@ -2,73 +2,86 @@ package com.twine.tango.transport;
import com.twine.tango.core.Func;
-import java8.util.function.Supplier;
-
/**
* Created by Roy on 11/13/2017.
*/
-public abstract class TransportMessageBase {
-
+public abstract class TransportMessageBase
+{
+
private ITransportAdapter adapter;
- private boolean isMultiResponse;
+ private boolean isContinuous;
private String token;
private TransportMessageDirection direction;
private Func<byte[]> serialize;
private Object message;
-
- protected abstract void setResult(Object result);
+
+ protected abstract void setResult(Object result, boolean completed);
+
protected abstract void setException(Exception error);
-
- public ITransportAdapter getAdapter() {
+
+ public ITransportAdapter getAdapter()
+ {
return adapter;
}
-
- public void setAdapter(ITransportAdapter adapter) {
+
+ public void setAdapter(ITransportAdapter adapter)
+ {
this.adapter = adapter;
}
-
- public boolean isMultiResponse() {
- return isMultiResponse;
+
+ public boolean isContinuous()
+ {
+ return isContinuous;
}
-
- public void setMultiResponse(boolean multiResponse) {
- isMultiResponse = multiResponse;
+
+ public void setContinuous(boolean continuous)
+ {
+ isContinuous = continuous;
}
-
- public String getToken() {
+
+ public String getToken()
+ {
return token;
}
-
- public void setToken(String token) {
+
+ public void setToken(String token)
+ {
this.token = token;
}
-
- public TransportMessageDirection getDirection() {
+
+ public TransportMessageDirection getDirection()
+ {
return direction;
}
-
- public void setDirection(TransportMessageDirection direction) {
+
+ public void setDirection(TransportMessageDirection direction)
+ {
this.direction = direction;
}
-
- public Func<byte[]> getSerialize() {
+
+ public Func<byte[]> getSerialize()
+ {
return serialize;
}
-
- public void setSerialize(Func<byte[]> serialize) {
+
+ public void setSerialize(Func<byte[]> serialize)
+ {
this.serialize = serialize;
}
-
- public Object getMessage() {
+
+ public Object getMessage()
+ {
return message;
}
-
- public void setMessage(Object message) {
+
+ public void setMessage(Object message)
+ {
this.message = message;
}
-
- public TransportMessageBase(ITransportAdapter adapter, String token, Object message,TransportMessageDirection direction, Func<byte[]> serialize) {
+
+ public TransportMessageBase(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize)
+ {
this.adapter = adapter;
this.token = token;
this.direction = direction;
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
index c1bd4f503..83ce58b97 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
@@ -7,14 +7,12 @@ import android.util.Pair;
import com.elvishew.xlog.XLog;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.twine.tango.core.Action;
import com.twine.tango.core.EventHandler;
import com.twine.tango.core.Func;
import com.twine.tango.core.ObjectDisposedException;
import com.twine.tango.core.ObservableCollection;
import com.twine.tango.pmr.MessageFactory;
import com.twine.tango.pmr.TangoMessage;
-import com.twine.tango.pmr.common.MessageContainerOuterClass;
import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer;
import org.joda.time.Period;
@@ -27,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
@@ -38,169 +37,203 @@ import static br.com.zbra.androidlinq.Linq.stream;
/**
* Represents an {@link ITransporter} base class.
*/
-public abstract class TransporterBase implements ITransporter {
-
+public abstract class TransporterBase implements ITransporter
+{
+
private ConcurrentLinkedQueue<TransportMessageBase> sendingQueue;
private List<TransportMessageBase> pendingRequests;
private ConcurrentLinkedQueue<Pair<ITransportAdapter, byte[]>> arrivedResponses;
private Thread pushThread;
private Thread pullThread;
private ObservableCollection<ITransportAdapter> adapters;
- private Map<String, ITransportAdapter> tokenAdapters;
+ private Map<String, PendingResponse> pendingResponses;
private TransportComponentState state;
private Period requestTimeout;
-
+
//region Events
-
+
private EventHandler<MessageContainer> requestReceivedListener;
private EventHandler<TransportComponentState> stateChangedListener;
-
+
//endregion
-
+
//region Properties
-
+
@Override
- public ObservableCollection<ITransportAdapter> getAdapters() {
+ public ObservableCollection<ITransportAdapter> getAdapters()
+ {
return adapters;
}
-
- protected void setAdapters(ObservableCollection<ITransportAdapter> adapters) {
-
- if (this.adapters != null) {
+
+ protected void setAdapters(ObservableCollection<ITransportAdapter> adapters)
+ {
+
+ if (this.adapters != null)
+ {
adapters.clearOnChangeListener();
}
-
+
this.adapters = adapters;
-
- if (this.adapters != null) {
+
+ if (this.adapters != null)
+ {
adapters.setOnChangeListener(this::onAdaptersChanged);
}
}
-
+
@Override
- public TransportComponentState getState() {
+ public TransportComponentState getState()
+ {
return state;
}
-
- protected void setState(TransportComponentState state) {
+
+ protected void setState(TransportComponentState state)
+ {
this.state = state;
onStateChanged(this.state);
}
-
+
@Override
- public Period getRequestTimeout() {
+ public Period getRequestTimeout()
+ {
return requestTimeout;
}
-
+
@Override
- public void setRequestTimeout(Period requestTimeout) {
+ public void setRequestTimeout(Period requestTimeout)
+ {
this.requestTimeout = requestTimeout;
}
-
+
//endregion
-
+
//region Protected Methods
-
- protected void onAdaptersChanged() {
-
+
+ protected void onAdaptersChanged()
+ {
+
XLog.i("Adapters collection changed, Listing adapters:");
-
- for (ITransportAdapter adapter : adapters) {
-
+
+ for (ITransportAdapter adapter : adapters)
+ {
+
XLog.i(adapter.getClass().getName() + ", " + adapter.getAddress() + ", " + adapter.getState());
-
+
adapter.setStateChangedListener(this::onAdapterStateChanged);
adapter.setDataAvailableListener(this::onAdapterDataAvailable);
-
- if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected) {
- try {
+
+ if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected)
+ {
+ try
+ {
adapter.connect().subscribe();
- } catch (ObjectDisposedException e) {
+ } catch (ObjectDisposedException e)
+ {
XLog.e(e);
}
}
}
}
-
- private void onAdapterDataAvailable(Object sender, byte[] data) {
+
+ private void onAdapterDataAvailable(Object sender, byte[] data)
+ {
arrivedResponses.add(new Pair<>((ITransportAdapter) sender, data));
}
-
-
- private void onAdapterStateChanged(Object sender, TransportComponentState state) {
-
- if (state == TransportComponentState.Failed) {
- disconnect().blockingAwait();
+
+
+ private void onAdapterStateChanged(Object sender, TransportComponentState state)
+ {
+
+ if (state == TransportComponentState.Disposed)
+ {
+ adapters.remove((ITransportAdapter) sender);
+ } else if (state == TransportComponentState.Failed)
+ {
+ //TODO: decide what to do here...
}
}
-
- protected void onStateChanged(TransportComponentState state) {
+
+ protected void onStateChanged(TransportComponentState state)
+ {
if (stateChangedListener != null) stateChangedListener.invoke(this, state);
}
-
- protected <T extends GeneratedMessageV3> Func<byte[]> onSerializingMessage(TangoMessage<T> message) {
- return new Func<byte[]>() {
+
+ protected <T extends GeneratedMessageV3> Func<byte[]> onSerializingMessage(TangoMessage<T> message)
+ {
+ return new Func<byte[]>()
+ {
@Override
- public byte[] invoke() {
+ public byte[] invoke()
+ {
return message.toBytes();
}
};
}
-
- protected void onFailed(Exception ex) {
+
+ protected void onFailed(Exception ex)
+ {
disconnect().blockingAwait();
setState(TransportComponentState.Failed);
XLog.e("Transporter failed.", ex);
}
-
- protected void onRequestReceived(MessageContainer request) {
+
+ protected void onRequestReceived(MessageContainer request)
+ {
if (requestReceivedListener != null) requestReceivedListener.invoke(this, request);
}
-
- protected MessageContainer onParseContainer(byte[] data) throws InvalidProtocolBufferException {
+
+ protected MessageContainer onParseContainer(byte[] data) throws InvalidProtocolBufferException
+ {
return MessageFactory.parseContainer(data);
}
-
- protected GeneratedMessageV3 onParseMessage(MessageContainer container) throws InvalidProtocolBufferException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+
+ protected GeneratedMessageV3 onParseMessage(MessageContainer container) throws InvalidProtocolBufferException, NoSuchMethodException, IllegalAccessException, InvocationTargetException
+ {
return MessageFactory.parseMessageFromContainerAgnostic(container);
}
-
+
//endregion
-
+
//region Constructors
-
- public TransporterBase() {
+
+ public TransporterBase()
+ {
setAdapters(new ObservableCollection<>());
- tokenAdapters = new HashMap<>();
+ pendingResponses = new HashMap<>();
sendingQueue = new ConcurrentLinkedQueue<>();
pendingRequests = new ArrayList<>();
arrivedResponses = new ConcurrentLinkedQueue<>();
setRequestTimeout(Period.seconds(10));
}
-
- public TransporterBase(ITransportAdapter adapter) {
+
+ public TransporterBase(ITransportAdapter adapter)
+ {
this();
adapters.add(adapter);
}
-
+
//endregion
-
+
//region Public Methods
-
+
@Override
- public void setStateChangedListener(EventHandler<TransportComponentState> stateChangedListener) {
+ public void setStateChangedListener(EventHandler<TransportComponentState> stateChangedListener)
+ {
this.stateChangedListener = stateChangedListener;
}
-
+
@Override
- public void setRequestReceiverListener(EventHandler<MessageContainer> listener) {
+ public void setRequestReceiverListener(EventHandler<MessageContainer> listener)
+ {
this.requestReceivedListener = listener;
}
-
- @Override
- public Completable connect() throws ObjectDisposedException {
- return Completable.create((x) -> {
+ @Override
+ public Completable connect() throws ObjectDisposedException
+ {
+ return Completable.create((x) ->
+ {
+
try
{
if (adapters.size() == 0)
@@ -208,300 +241,326 @@ public abstract class TransporterBase implements ITransporter {
throw new IllegalArgumentException("This transporter has zero adapters.");
}
- for (ITransportAdapter adapter : adapters) {
-
+ for (ITransportAdapter adapter : adapters)
+ {
+
adapter.connect().blockingAwait();
}
-
+
setState(TransportComponentState.Connected);
startThreads();
-
+
XLog.i("Transporter connected...");
x.onComplete();
} catch (Exception e)
{
- XLog.e("Error connecting transporter",e);
+ XLog.e("Error connecting transporter", e);
}
}).subscribeOn(Schedulers.io());
}
-
+
@Override
- public Completable disconnect() {
+ public Completable disconnect()
+ {
setState(TransportComponentState.Disconnected);
-
- return Completable.create((x) -> {
-
+
+ return Completable.create((x) ->
+ {
+
try
{
- for (ITransportAdapter adapter : adapters) {
-
+ for (ITransportAdapter adapter : adapters)
+ {
+
adapter.disconnect().blockingAwait();
}
-
+
XLog.i("Transporter disconnected...");
-
+
x.onComplete();
} catch (Exception e)
{
x.onError(e);
}
-
+
}).subscribeOn(Schedulers.io());
}
-
+
@Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request) {
+ public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request)
+ {
return sendRequest(request, null);
}
-
+
@Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter) {
+ public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter)
+ {
XLog.i("Queuing request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL"));
XLog.i("Expected response: " + Response.Builder.class.getSimpleName());
-
+
PublishSubject<Response> subject = PublishSubject.create();
TransportMessage<Response> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
sendingQueue.add(message);
-
+
Completable.timer(getRequestTimeout().getSeconds(), TimeUnit.SECONDS)
.subscribe(() ->
{
- if (!subject.hasComplete()) {
+ if (!subject.hasComplete())
+ {
XLog.i("Request message " + request.getClass().getSimpleName() + " had timed out after " + getRequestTimeout().getSeconds() + " seconds.");
XLog.i("Setting request task exception...");
subject.onError(new TimeoutException());
}
});
-
+
return subject.singleOrError();
}
-
+
@Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request) {
- return sendRequestMulti(request, null);
+ public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request)
+ {
+ return sendContinuousRequest(request, null);
}
-
+
@Override
- public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request, ITransportAdapter adapter) {
- XLog.i("Queuing multi response request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL"));
+ public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter)
+ {
+ XLog.i("Queuing continuous response request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL"));
XLog.i("Expected response: " + Response.Builder.class.getSimpleName());
-
+
+ request.getContainer().setContinuous(true);
+ request.getContainer().setCompleted(false);
+
PublishSubject<Response> subject = PublishSubject.create();
- TransportMessage<Response> message = new TransportMessage<>(adapter,request.getContainer().getToken(),request,TransportMessageDirection.Request,onSerializingMessage(request),subject);
- message.setMultiResponse(true);
+ TransportMessage<Response> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject);
+ message.setContinuous(true);
sendingQueue.add(message);
-
+
return subject;
}
-
+
@Override
- public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response) {
- return sendResponse(response,response.getContainer().getToken());
+ public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response)
+ {
+ return sendResponse(response, response.getContainer().getToken());
}
-
+
@Override
- public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token) {
-
+ public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token)
+ {
+
response.getContainer().setToken(token);
-
+
XLog.i("Queuing response message: " + response.getClass().getSimpleName());
-
+
+ PendingResponse pendingResponse = null;
ITransportAdapter adapter = null;
-
+
XLog.i("Searching for matching request token: " + token);
-
- adapter = tokenAdapters.get(token);
-
- if (adapter != null)
+
+ pendingResponse = pendingResponses.get(token);
+
+ if (pendingResponse != null)
{
+ adapter = pendingResponse.getAdapter();
XLog.i("Found matching request token: " + token + " on adapter: " + adapter.getAddress());
- XLog.i("Removing matching request token...");
- tokenAdapters.remove(token);
- }
- else
+
+ if (!pendingResponse.isContinuous())
+ {
+ XLog.i("Removing matching request token...");
+ pendingResponses.remove(token);
+ } else if (response.getContainer().getCompleted())
+ {
+ XLog.i("Response completed. Removing matching request token...");
+ pendingResponses.remove(token);
+ }
+ } else
{
+ //This should never happen.
XLog.w("Matching request token was not found...");
+ throw new RuntimeException();
}
-
+
PublishSubject<Response> subject = PublishSubject.create();
- TransportMessage<Response> message = new TransportMessage<>(adapter,token,response,TransportMessageDirection.Response,onSerializingMessage(response),subject);
+ TransportMessage<Response> message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject);
sendingQueue.add(message);
-
+
return subject.singleOrError();
}
//endregion
-
+
//region Private Methods
-
- private void startThreads() {
+
+ private void startThreads()
+ {
pullThread = new Thread(this::pullThreadMethod);
pullThread.setName("Pull Thread");
pullThread.start();
-
+
pushThread = new Thread(this::pushThreadMethod);
pushThread.setName("Push Thread");
pushThread.start();
}
-
+
//endregion
-
+
//region Push Thread
-
+
public void pushThreadMethod()
{
- try {
-
+ try
+ {
+
while (getState() == TransportComponentState.Connected)
{
if (sendingQueue.size() > 0)
{
TransportMessageBase message = null;
message = sendingQueue.poll();
-
+
if (message != null)
{
- try {
-
+ try
+ {
if (message.getAdapter() == null)
{
- for (ITransportAdapter adapter : adapters) {
+ for (ITransportAdapter adapter : adapters)
+ {
if (adapter.getState() == TransportComponentState.Connected)
{
adapter.write(message.getSerialize().invoke());
+ XLog.i("message sent on adapter: " + adapter.getAddress() + "...");
}
}
- }
- else
+ } else
{
message.getAdapter().write(message.getSerialize().invoke());
+ XLog.i("message sent on adapter: " + message.getAdapter().getAddress() + "...");
}
-
+
if (message.getDirection() == TransportMessageDirection.Request)
{
pendingRequests.add(message);
- }
- else
+ } else
{
- message.setResult(true);
+ message.setResult(true, true);
}
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
message.setException(ex);
}
}
}
-
+
SystemClock.sleep(10);
}
-
- }
- catch (Exception ex)
+
+ } catch (Exception ex)
{
onFailed(ex);
}
}
-
+
//endregion
-
+
//region Pull Thread
-
+
public void pullThreadMethod()
{
- try {
-
+ try
+ {
+
while (getState() == TransportComponentState.Connected)
{
- Pair<ITransportAdapter,byte[]> data;
-
+ Pair<ITransportAdapter, byte[]> data;
+
if (arrivedResponses.size() > 0)
{
data = arrivedResponses.poll();
-
+
if (data != null)
{
XLog.i("Message received on adapter: " + data.first.getAddress());
XLog.i("Parsing message container...");
MessageContainer container = onParseContainer(data.second);
XLog.i("Searching for pending request token: " + container.getToken());
- TransportMessageBase request = stream(pendingRequests).singleOrDefault(x -> x.getToken().equals(container.getToken()),null);
-
+ TransportMessageBase request = stream(pendingRequests).singleOrDefault(x -> x.getToken().equals(container.getToken()), null);
+
if (request != null)
{
XLog.i("Found pending request: " + request.getMessage().getClass().getSimpleName());
-
- if (!request.isMultiResponse())
+
+ if (!request.isContinuous())
{
XLog.i("Pending request was identified as 'single response'. Removing pending request.");
-
+
pendingRequests.remove(request);
-
+
try
{
XLog.i("Parsing inner response message and setting pending request task result...");
- request.setResult(onParseMessage(container));
- }
- catch(Exception ex)
+ request.setResult(onParseMessage(container), true);
+ } catch (Exception ex)
{
- XLog.e("Error parsing inner message",ex);
+ XLog.e("Error parsing inner message", ex);
request.setException(ex);
}
- }
- else
+ } else
{
- XLog.i("Pending request was identified as 'multi response'. keeping pending request.");
-
+ XLog.i("Pending request was identified as 'continuous response'. keeping pending request.");
+
try
{
- XLog.i("Parsing inner response message and invoking multi response callback...");
- request.setResult(onParseMessage(container));
- }
- catch (Exception ex)
+ XLog.i("Parsing inner response message and invoking continuous response callback...");
+ if (container.getCompleted())
+ {
+ XLog.i("Continuous sequence completed.");
+ }
+ request.setResult(onParseMessage(container), container.getCompleted());
+ } catch (Exception ex)
{
XLog.e("Error parsing inner message", ex);
request.setException(ex);
}
}
- }
- else
+ } else
{
XLog.i("Message was identified as a new request message: " + container.getType().toString());
-
+
try
{
XLog.i("Saving request token and adapter: " + container.getToken() + ", " + data.first.getAddress());
- tokenAdapters.put(container.getToken(),data.first);
+ pendingResponses.put(container.getToken(), new PendingResponse(data.first, container.getContinuous()));
XLog.i("Invoking RequestReceived event...");
AsyncTask.execute(() -> onRequestReceived(container));
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
//Ignore any exception that might occur on the event handler side...
}
}
}
}
-
+
SystemClock.sleep(10);
}
-
- }
- catch (Exception ex)
+
+ } catch (Exception ex)
{
onFailed(ex);
}
}
-
+
//endregion
-
+
//region Dispose
-
+
@Override
- public void dispose() {
+ public void dispose()
+ {
disconnect().blockingAwait();
setState(TransportComponentState.Disposed);
}
-
+
//endregion
}
diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
index b47e12594..25c23919f 100644
--- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
+++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
@@ -11,8 +11,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import io.reactivex.Completable;
@@ -74,7 +72,7 @@ public class TcpTransportAdapter extends TransportAdapterBase
@Override
public Completable connect() throws ObjectDisposedException
{
- throwFailedOrDisposed();
+ throwIfDisposed();
return Completable.create((x) ->
{
@@ -134,7 +132,7 @@ public class TcpTransportAdapter extends TransportAdapterBase
@Override
public void write(byte[] data) throws ObjectDisposedException, IOException
{
- throwFailedOrDisposed();
+ throwIfDisposed();
try
{
diff --git a/Software/PMR/Messages/Common/MessageContainer.proto b/Software/PMR/Messages/Common/MessageContainer.proto
index ae507822f..c65796b76 100644
--- a/Software/PMR/Messages/Common/MessageContainer.proto
+++ b/Software/PMR/Messages/Common/MessageContainer.proto
@@ -9,6 +9,8 @@ message MessageContainer
{
MessageType Type = 1;
string Token = 2;
- bytes Data = 3;
+ bool Continuous = 3;
+ bool Completed = 4;
+ bytes Data = 5;
}
diff --git a/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs b/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs
index 9e4337b9f..b41fd1804 100644
--- a/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs
+++ b/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs
@@ -71,6 +71,9 @@ namespace Tango.Emulations.Emulators
Thread.Sleep(500);
var res = MessageFactory.CreateTangoMessage<ProgressResponse>(container.Token);
res.Message.Progress = i;
+
+ if (i == 9) res.Container.Completed = true;
+
Transporter.SendResponse(res);
}
});
diff --git a/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs b/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs
index 4b4a437a9..3939e53c7 100644
--- a/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs
+++ b/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs
@@ -23,14 +23,15 @@ namespace Tango.PMR.Common {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"ChZNZXNzYWdlQ29udGFpbmVyLnByb3RvEhBUYW5nby5QTVIuQ29tbW9uGhFN",
- "ZXNzYWdlVHlwZS5wcm90byJcChBNZXNzYWdlQ29udGFpbmVyEisKBFR5cGUY",
- "ASABKA4yHS5UYW5nby5QTVIuQ29tbW9uLk1lc3NhZ2VUeXBlEg0KBVRva2Vu",
- "GAIgASgJEgwKBERhdGEYAyABKAxCHAoaY29tLnR3aW5lLnRhbmdvLnBtci5j",
- "b21tb25iBnByb3RvMw=="));
+ "ZXNzYWdlVHlwZS5wcm90byKDAQoQTWVzc2FnZUNvbnRhaW5lchIrCgRUeXBl",
+ "GAEgASgOMh0uVGFuZ28uUE1SLkNvbW1vbi5NZXNzYWdlVHlwZRINCgVUb2tl",
+ "bhgCIAEoCRISCgpDb250aW51b3VzGAMgASgIEhEKCUNvbXBsZXRlZBgEIAEo",
+ "CBIMCgREYXRhGAUgASgMQhwKGmNvbS50d2luZS50YW5nby5wbXIuY29tbW9u",
+ "YgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Tango.PMR.Common.MessageTypeReflection.Descriptor, },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
- new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Common.MessageContainer), global::Tango.PMR.Common.MessageContainer.Parser, new[]{ "Type", "Token", "Data" }, null, null, null)
+ new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Common.MessageContainer), global::Tango.PMR.Common.MessageContainer.Parser, new[]{ "Type", "Token", "Continuous", "Completed", "Data" }, null, null, null)
}));
}
#endregion
@@ -63,6 +64,8 @@ namespace Tango.PMR.Common {
public MessageContainer(MessageContainer other) : this() {
type_ = other.type_;
token_ = other.token_;
+ continuous_ = other.continuous_;
+ completed_ = other.completed_;
data_ = other.data_;
}
@@ -93,8 +96,30 @@ namespace Tango.PMR.Common {
}
}
+ /// <summary>Field number for the "Continuous" field.</summary>
+ public const int ContinuousFieldNumber = 3;
+ private bool continuous_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Continuous {
+ get { return continuous_; }
+ set {
+ continuous_ = value;
+ }
+ }
+
+ /// <summary>Field number for the "Completed" field.</summary>
+ public const int CompletedFieldNumber = 4;
+ private bool completed_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Completed {
+ get { return completed_; }
+ set {
+ completed_ = value;
+ }
+ }
+
/// <summary>Field number for the "Data" field.</summary>
- public const int DataFieldNumber = 3;
+ public const int DataFieldNumber = 5;
private pb::ByteString data_ = pb::ByteString.Empty;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public pb::ByteString Data {
@@ -119,6 +144,8 @@ namespace Tango.PMR.Common {
}
if (Type != other.Type) return false;
if (Token != other.Token) return false;
+ if (Continuous != other.Continuous) return false;
+ if (Completed != other.Completed) return false;
if (Data != other.Data) return false;
return true;
}
@@ -128,6 +155,8 @@ namespace Tango.PMR.Common {
int hash = 1;
if (Type != 0) hash ^= Type.GetHashCode();
if (Token.Length != 0) hash ^= Token.GetHashCode();
+ if (Continuous != false) hash ^= Continuous.GetHashCode();
+ if (Completed != false) hash ^= Completed.GetHashCode();
if (Data.Length != 0) hash ^= Data.GetHashCode();
return hash;
}
@@ -147,8 +176,16 @@ namespace Tango.PMR.Common {
output.WriteRawTag(18);
output.WriteString(Token);
}
+ if (Continuous != false) {
+ output.WriteRawTag(24);
+ output.WriteBool(Continuous);
+ }
+ if (Completed != false) {
+ output.WriteRawTag(32);
+ output.WriteBool(Completed);
+ }
if (Data.Length != 0) {
- output.WriteRawTag(26);
+ output.WriteRawTag(42);
output.WriteBytes(Data);
}
}
@@ -162,6 +199,12 @@ namespace Tango.PMR.Common {
if (Token.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Token);
}
+ if (Continuous != false) {
+ size += 1 + 1;
+ }
+ if (Completed != false) {
+ size += 1 + 1;
+ }
if (Data.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeBytesSize(Data);
}
@@ -179,6 +222,12 @@ namespace Tango.PMR.Common {
if (other.Token.Length != 0) {
Token = other.Token;
}
+ if (other.Continuous != false) {
+ Continuous = other.Continuous;
+ }
+ if (other.Completed != false) {
+ Completed = other.Completed;
+ }
if (other.Data.Length != 0) {
Data = other.Data;
}
@@ -200,7 +249,15 @@ namespace Tango.PMR.Common {
Token = input.ReadString();
break;
}
- case 26: {
+ case 24: {
+ Continuous = input.ReadBool();
+ break;
+ }
+ case 32: {
+ Completed = input.ReadBool();
+ break;
+ }
+ case 42: {
Data = input.ReadBytes();
break;
}
diff --git a/Software/Visual Studio/Tango.PMR/Stubs/Calculate.cs b/Software/Visual Studio/Tango.PMR/Stubs/CalculateRequest.cs
index 75aa074e1..36003965f 100644
--- a/Software/Visual Studio/Tango.PMR/Stubs/Calculate.cs
+++ b/Software/Visual Studio/Tango.PMR/Stubs/CalculateRequest.cs
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: Calculate.proto
+// source: CalculateRequest.proto
#pragma warning disable 1591, 0612, 3021
#region Designer generated code
@@ -9,28 +9,26 @@ using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace Tango.PMR.Stubs {
- /// <summary>Holder for reflection information generated from Calculate.proto</summary>
- public static partial class CalculateReflection {
+ /// <summary>Holder for reflection information generated from CalculateRequest.proto</summary>
+ public static partial class CalculateRequestReflection {
#region Descriptor
- /// <summary>File descriptor for Calculate.proto</summary>
+ /// <summary>File descriptor for CalculateRequest.proto</summary>
public static pbr::FileDescriptor Descriptor {
get { return descriptor; }
}
private static pbr::FileDescriptor descriptor;
- static CalculateReflection() {
+ static CalculateRequestReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
- "Cg9DYWxjdWxhdGUucHJvdG8SD1RhbmdvLlBNUi5TdHVicyIoChBDYWxjdWxh",
- "dGVSZXF1ZXN0EgkKAUEYASABKAESCQoBQhgCIAEoASIgChFDYWxjdWxhdGVS",
- "ZXNwb25zZRILCgNTdW0YASABKAFCGwoZY29tLnR3aW5lLnRhbmdvLnBtci5z",
- "dHVic2IGcHJvdG8z"));
+ "ChZDYWxjdWxhdGVSZXF1ZXN0LnByb3RvEg9UYW5nby5QTVIuU3R1YnMiKAoQ",
+ "Q2FsY3VsYXRlUmVxdWVzdBIJCgFBGAEgASgBEgkKAUIYAiABKAFCGwoZY29t",
+ "LnR3aW5lLnRhbmdvLnBtci5zdHVic2IGcHJvdG8z"));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
- new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateRequest), global::Tango.PMR.Stubs.CalculateRequest.Parser, new[]{ "A", "B" }, null, null, null),
- new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateResponse), global::Tango.PMR.Stubs.CalculateResponse.Parser, new[]{ "Sum" }, null, null, null)
+ new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateRequest), global::Tango.PMR.Stubs.CalculateRequest.Parser, new[]{ "A", "B" }, null, null, null)
}));
}
#endregion
@@ -44,7 +42,7 @@ namespace Tango.PMR.Stubs {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Tango.PMR.Stubs.CalculateReflection.Descriptor.MessageTypes[0]; }
+ get { return global::Tango.PMR.Stubs.CalculateRequestReflection.Descriptor.MessageTypes[0]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -182,123 +180,6 @@ namespace Tango.PMR.Stubs {
}
- public sealed partial class CalculateResponse : pb::IMessage<CalculateResponse> {
- private static readonly pb::MessageParser<CalculateResponse> _parser = new pb::MessageParser<CalculateResponse>(() => new CalculateResponse());
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public static pb::MessageParser<CalculateResponse> Parser { get { return _parser; } }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public static pbr::MessageDescriptor Descriptor {
- get { return global::Tango.PMR.Stubs.CalculateReflection.Descriptor.MessageTypes[1]; }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- pbr::MessageDescriptor pb::IMessage.Descriptor {
- get { return Descriptor; }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public CalculateResponse() {
- OnConstruction();
- }
-
- partial void OnConstruction();
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public CalculateResponse(CalculateResponse other) : this() {
- sum_ = other.sum_;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public CalculateResponse Clone() {
- return new CalculateResponse(this);
- }
-
- /// <summary>Field number for the "Sum" field.</summary>
- public const int SumFieldNumber = 1;
- private double sum_;
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public double Sum {
- get { return sum_; }
- set {
- sum_ = value;
- }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override bool Equals(object other) {
- return Equals(other as CalculateResponse);
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public bool Equals(CalculateResponse other) {
- if (ReferenceEquals(other, null)) {
- return false;
- }
- if (ReferenceEquals(other, this)) {
- return true;
- }
- if (Sum != other.Sum) return false;
- return true;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override int GetHashCode() {
- int hash = 1;
- if (Sum != 0D) hash ^= Sum.GetHashCode();
- return hash;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override string ToString() {
- return pb::JsonFormatter.ToDiagnosticString(this);
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void WriteTo(pb::CodedOutputStream output) {
- if (Sum != 0D) {
- output.WriteRawTag(9);
- output.WriteDouble(Sum);
- }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public int CalculateSize() {
- int size = 0;
- if (Sum != 0D) {
- size += 1 + 8;
- }
- return size;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void MergeFrom(CalculateResponse other) {
- if (other == null) {
- return;
- }
- if (other.Sum != 0D) {
- Sum = other.Sum;
- }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void MergeFrom(pb::CodedInputStream input) {
- uint tag;
- while ((tag = input.ReadTag()) != 0) {
- switch(tag) {
- default:
- input.SkipLastField();
- break;
- case 9: {
- Sum = input.ReadDouble();
- break;
- }
- }
- }
- }
-
- }
-
#endregion
}
diff --git a/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs b/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs
new file mode 100644
index 000000000..8d8ca7c44
--- /dev/null
+++ b/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs
@@ -0,0 +1,159 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: CalculateResponse.proto
+#pragma warning disable 1591, 0612, 3021
+#region Designer generated code
+
+using pb = global::Google.Protobuf;
+using pbc = global::Google.Protobuf.Collections;
+using pbr = global::Google.Protobuf.Reflection;
+using scg = global::System.Collections.Generic;
+namespace Tango.PMR.Stubs {
+
+ /// <summary>Holder for reflection information generated from CalculateResponse.proto</summary>
+ public static partial class CalculateResponseReflection {
+
+ #region Descriptor
+ /// <summary>File descriptor for CalculateResponse.proto</summary>
+ public static pbr::FileDescriptor Descriptor {
+ get { return descriptor; }
+ }
+ private static pbr::FileDescriptor descriptor;
+
+ static CalculateResponseReflection() {
+ byte[] descriptorData = global::System.Convert.FromBase64String(
+ string.Concat(
+ "ChdDYWxjdWxhdGVSZXNwb25zZS5wcm90bxIPVGFuZ28uUE1SLlN0dWJzIiAK",
+ "EUNhbGN1bGF0ZVJlc3BvbnNlEgsKA1N1bRgBIAEoAUIbChljb20udHdpbmUu",
+ "dGFuZ28ucG1yLnN0dWJzYgZwcm90bzM="));
+ descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
+ new pbr::FileDescriptor[] { },
+ new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
+ new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateResponse), global::Tango.PMR.Stubs.CalculateResponse.Parser, new[]{ "Sum" }, null, null, null)
+ }));
+ }
+ #endregion
+
+ }
+ #region Messages
+ public sealed partial class CalculateResponse : pb::IMessage<CalculateResponse> {
+ private static readonly pb::MessageParser<CalculateResponse> _parser = new pb::MessageParser<CalculateResponse>(() => new CalculateResponse());
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pb::MessageParser<CalculateResponse> Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::Tango.PMR.Stubs.CalculateResponseReflection.Descriptor.MessageTypes[0]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public CalculateResponse() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public CalculateResponse(CalculateResponse other) : this() {
+ sum_ = other.sum_;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public CalculateResponse Clone() {
+ return new CalculateResponse(this);
+ }
+
+ /// <summary>Field number for the "Sum" field.</summary>
+ public const int SumFieldNumber = 1;
+ private double sum_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public double Sum {
+ get { return sum_; }
+ set {
+ sum_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override bool Equals(object other) {
+ return Equals(other as CalculateResponse);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Equals(CalculateResponse other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if (Sum != other.Sum) return false;
+ return true;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override int GetHashCode() {
+ int hash = 1;
+ if (Sum != 0D) hash ^= Sum.GetHashCode();
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void WriteTo(pb::CodedOutputStream output) {
+ if (Sum != 0D) {
+ output.WriteRawTag(9);
+ output.WriteDouble(Sum);
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int CalculateSize() {
+ int size = 0;
+ if (Sum != 0D) {
+ size += 1 + 8;
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(CalculateResponse other) {
+ if (other == null) {
+ return;
+ }
+ if (other.Sum != 0D) {
+ Sum = other.Sum;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(pb::CodedInputStream input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ switch(tag) {
+ default:
+ input.SkipLastField();
+ break;
+ case 9: {
+ Sum = input.ReadDouble();
+ break;
+ }
+ }
+ }
+ }
+
+ }
+
+ #endregion
+
+}
+
+#endregion Designer generated code
diff --git a/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs b/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs
new file mode 100644
index 000000000..1eda06217
--- /dev/null
+++ b/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs
@@ -0,0 +1,131 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: ProgressRequest.proto
+#pragma warning disable 1591, 0612, 3021
+#region Designer generated code
+
+using pb = global::Google.Protobuf;
+using pbc = global::Google.Protobuf.Collections;
+using pbr = global::Google.Protobuf.Reflection;
+using scg = global::System.Collections.Generic;
+namespace Tango.PMR.Stubs {
+
+ /// <summary>Holder for reflection information generated from ProgressRequest.proto</summary>
+ public static partial class ProgressRequestReflection {
+
+ #region Descriptor
+ /// <summary>File descriptor for ProgressRequest.proto</summary>
+ public static pbr::FileDescriptor Descriptor {
+ get { return descriptor; }
+ }
+ private static pbr::FileDescriptor descriptor;
+
+ static ProgressRequestReflection() {
+ byte[] descriptorData = global::System.Convert.FromBase64String(
+ string.Concat(
+ "ChVQcm9ncmVzc1JlcXVlc3QucHJvdG8SD1RhbmdvLlBNUi5TdHVicyIRCg9Q",
+ "cm9ncmVzc1JlcXVlc3RCGwoZY29tLnR3aW5lLnRhbmdvLnBtci5zdHVic2IG",
+ "cHJvdG8z"));
+ descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
+ new pbr::FileDescriptor[] { },
+ new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
+ new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressRequest), global::Tango.PMR.Stubs.ProgressRequest.Parser, null, null, null, null)
+ }));
+ }
+ #endregion
+
+ }
+ #region Messages
+ public sealed partial class ProgressRequest : pb::IMessage<ProgressRequest> {
+ private static readonly pb::MessageParser<ProgressRequest> _parser = new pb::MessageParser<ProgressRequest>(() => new ProgressRequest());
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pb::MessageParser<ProgressRequest> Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::Tango.PMR.Stubs.ProgressRequestReflection.Descriptor.MessageTypes[0]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public ProgressRequest() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public ProgressRequest(ProgressRequest other) : this() {
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public ProgressRequest Clone() {
+ return new ProgressRequest(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override bool Equals(object other) {
+ return Equals(other as ProgressRequest);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Equals(ProgressRequest other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ return true;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override int GetHashCode() {
+ int hash = 1;
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void WriteTo(pb::CodedOutputStream output) {
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int CalculateSize() {
+ int size = 0;
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(ProgressRequest other) {
+ if (other == null) {
+ return;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(pb::CodedInputStream input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ switch(tag) {
+ default:
+ input.SkipLastField();
+ break;
+ }
+ }
+ }
+
+ }
+
+ #endregion
+
+}
+
+#endregion Designer generated code
diff --git a/Software/Visual Studio/Tango.PMR/Stubs/Progress.cs b/Software/Visual Studio/Tango.PMR/Stubs/ProgressResponse.cs
index 2a00e15de..7ffc4a41d 100644
--- a/Software/Visual Studio/Tango.PMR/Stubs/Progress.cs
+++ b/Software/Visual Studio/Tango.PMR/Stubs/ProgressResponse.cs
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: Progress.proto
+// source: ProgressResponse.proto
#pragma warning disable 1591, 0612, 3021
#region Designer generated code
@@ -9,26 +9,25 @@ using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace Tango.PMR.Stubs {
- /// <summary>Holder for reflection information generated from Progress.proto</summary>
- public static partial class ProgressReflection {
+ /// <summary>Holder for reflection information generated from ProgressResponse.proto</summary>
+ public static partial class ProgressResponseReflection {
#region Descriptor
- /// <summary>File descriptor for Progress.proto</summary>
+ /// <summary>File descriptor for ProgressResponse.proto</summary>
public static pbr::FileDescriptor Descriptor {
get { return descriptor; }
}
private static pbr::FileDescriptor descriptor;
- static ProgressReflection() {
+ static ProgressResponseReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
- "Cg5Qcm9ncmVzcy5wcm90bxIPVGFuZ28uUE1SLlN0dWJzIhEKD1Byb2dyZXNz",
- "UmVxdWVzdCIkChBQcm9ncmVzc1Jlc3BvbnNlEhAKCFByb2dyZXNzGAEgASgB",
- "QhsKGWNvbS50d2luZS50YW5nby5wbXIuc3R1YnNiBnByb3RvMw=="));
+ "ChZQcm9ncmVzc1Jlc3BvbnNlLnByb3RvEg9UYW5nby5QTVIuU3R1YnMiJAoQ",
+ "UHJvZ3Jlc3NSZXNwb25zZRIQCghQcm9ncmVzcxgBIAEoAUIbChljb20udHdp",
+ "bmUudGFuZ28ucG1yLnN0dWJzYgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
- new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressRequest), global::Tango.PMR.Stubs.ProgressRequest.Parser, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressResponse), global::Tango.PMR.Stubs.ProgressResponse.Parser, new[]{ "Progress" }, null, null, null)
}));
}
@@ -36,95 +35,6 @@ namespace Tango.PMR.Stubs {
}
#region Messages
- public sealed partial class ProgressRequest : pb::IMessage<ProgressRequest> {
- private static readonly pb::MessageParser<ProgressRequest> _parser = new pb::MessageParser<ProgressRequest>(() => new ProgressRequest());
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public static pb::MessageParser<ProgressRequest> Parser { get { return _parser; } }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public static pbr::MessageDescriptor Descriptor {
- get { return global::Tango.PMR.Stubs.ProgressReflection.Descriptor.MessageTypes[0]; }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- pbr::MessageDescriptor pb::IMessage.Descriptor {
- get { return Descriptor; }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public ProgressRequest() {
- OnConstruction();
- }
-
- partial void OnConstruction();
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public ProgressRequest(ProgressRequest other) : this() {
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public ProgressRequest Clone() {
- return new ProgressRequest(this);
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override bool Equals(object other) {
- return Equals(other as ProgressRequest);
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public bool Equals(ProgressRequest other) {
- if (ReferenceEquals(other, null)) {
- return false;
- }
- if (ReferenceEquals(other, this)) {
- return true;
- }
- return true;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override int GetHashCode() {
- int hash = 1;
- return hash;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public override string ToString() {
- return pb::JsonFormatter.ToDiagnosticString(this);
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void WriteTo(pb::CodedOutputStream output) {
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public int CalculateSize() {
- int size = 0;
- return size;
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void MergeFrom(ProgressRequest other) {
- if (other == null) {
- return;
- }
- }
-
- [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
- public void MergeFrom(pb::CodedInputStream input) {
- uint tag;
- while ((tag = input.ReadTag()) != 0) {
- switch(tag) {
- default:
- input.SkipLastField();
- break;
- }
- }
- }
-
- }
-
public sealed partial class ProgressResponse : pb::IMessage<ProgressResponse> {
private static readonly pb::MessageParser<ProgressResponse> _parser = new pb::MessageParser<ProgressResponse>(() => new ProgressResponse());
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -132,7 +42,7 @@ namespace Tango.PMR.Stubs {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Tango.PMR.Stubs.ProgressReflection.Descriptor.MessageTypes[1]; }
+ get { return global::Tango.PMR.Stubs.ProgressResponseReflection.Descriptor.MessageTypes[0]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
diff --git a/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj b/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj
index dd873fe90..10a735256 100644
--- a/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj
+++ b/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj
@@ -53,8 +53,10 @@
<Compile Include="Jobs\Segment.cs" />
<Compile Include="MessageFactory.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="Stubs\Calculate.cs" />
- <Compile Include="Stubs\Progress.cs" />
+ <Compile Include="Stubs\CalculateRequest.cs" />
+ <Compile Include="Stubs\CalculateResponse.cs" />
+ <Compile Include="Stubs\ProgressRequest.cs" />
+ <Compile Include="Stubs\ProgressResponse.cs" />
<Compile Include="TangoMessage.cs" />
</ItemGroup>
<ItemGroup>
diff --git a/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs b/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs
index 91cd1ab6a..e378791d2 100644
--- a/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs
+++ b/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs
@@ -19,7 +19,7 @@ namespace Tango.Stubs.Stubs
protected override Task<string> OnRun(Action<String> multiResponseCallback)
{
- Transporter.SendMultiRequest<ProgressRequest, ProgressResponse>(MessageFactory.CreateTangoMessage<ProgressRequest>(), (response) =>
+ Transporter.SendContinuousRequest<ProgressRequest, ProgressResponse>(MessageFactory.CreateTangoMessage<ProgressRequest>(), (response) =>
{
multiResponseCallback(response.Progress.ToString());
});
diff --git a/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
index b2fd3fd10..97757deac 100644
--- a/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
+++ b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs
@@ -75,7 +75,7 @@ namespace Tango.Transport.Adapters
/// <returns></returns>
public override Task Connect()
{
- ThrowFailedOrDisposed();
+ ThrowIfDisposed();
return Task.Factory.StartNew(() =>
{
@@ -133,7 +133,7 @@ namespace Tango.Transport.Adapters
/// <param name="data">The data.</param>
public override void Write(byte[] data)
{
- ThrowFailedOrDisposed();
+ ThrowIfDisposed();
try
{
diff --git a/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
index eb7820697..665f2d779 100644
--- a/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
+++ b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs
@@ -40,7 +40,7 @@ namespace Tango.Transport.Adapters
/// <returns></returns>
public override Task Connect()
{
- ThrowFailedOrDisposed();
+ ThrowIfDisposed();
return Task.Factory.StartNew(() =>
{
@@ -79,7 +79,7 @@ namespace Tango.Transport.Adapters
/// <returns></returns>
public override Task Disconnect()
{
- ThrowFailedOrDisposed();
+ ThrowIfDisposed();
return Task.Factory.StartNew(() =>
{
@@ -114,7 +114,7 @@ namespace Tango.Transport.Adapters
/// <param name="data">The data.</param>
public override void Write(byte[] data)
{
- ThrowFailedOrDisposed();
+ ThrowIfDisposed();
try
{
diff --git a/Software/Visual Studio/Tango.Transport/ITransporter.cs b/Software/Visual Studio/Tango.Transport/ITransporter.cs
index 834c49818..07b7dd554 100644
--- a/Software/Visual Studio/Tango.Transport/ITransporter.cs
+++ b/Software/Visual Studio/Tango.Transport/ITransporter.cs
@@ -52,7 +52,7 @@ namespace Tango.Transport
/// <param name="adapter">Transport adapter</param>
/// <param name="responseCallback">The response callback delegate.</param>
/// <returns></returns>
- void SendMultiRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>;
+ void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>;
/// <summary>
/// Sends a request through the specified adapter which is expected to return multiple response messages.
@@ -63,7 +63,7 @@ namespace Tango.Transport
/// <param name="adapter">Transport adapter</param>
/// <param name="responseCallback">The response callback delegate.</param>
/// <returns></returns>
- void SendMultiRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>;
+ void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>;
/// <summary>
/// Sends a response.
diff --git a/Software/Visual Studio/Tango.Transport/PendingResponse.cs b/Software/Visual Studio/Tango.Transport/PendingResponse.cs
new file mode 100644
index 000000000..bb9718b19
--- /dev/null
+++ b/Software/Visual Studio/Tango.Transport/PendingResponse.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Transport
+{
+ /// <summary>
+ /// Represents a pending response waiting to be returned to the request sender.
+ /// </summary>
+ public class PendingResponse
+ {
+ /// <summary>
+ /// Gets or sets the adapter target response adapter.
+ /// </summary>
+ public ITransportAdapter Adapter { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this is a continuous request.
+ /// </summary>
+ public bool IsContinuous { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PendingResponse"/> class.
+ /// </summary>
+ /// <param name="adapter">The adapter.</param>
+ /// <param name="isContinuous">if set to <c>true</c> [is continuous].</param>
+ public PendingResponse(ITransportAdapter adapter,bool isContinuous)
+ {
+ Adapter = adapter;
+ IsContinuous = isContinuous;
+ }
+ }
+}
diff --git a/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj
index 0f383b699..54c7565cf 100644
--- a/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj
+++ b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj
@@ -68,6 +68,7 @@
<Compile Include="ITransportComponent.cs" />
<Compile Include="ITransportAdapter.cs" />
<Compile Include="Adapters\TcpTransportAdapter.cs" />
+ <Compile Include="PendingResponse.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ITransporter.cs" />
<Compile Include="Servers\ClientConnectedEventArgs.cs" />
diff --git a/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs
index 9f2fb3262..670ef20c3 100644
--- a/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs
+++ b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs
@@ -84,9 +84,9 @@ namespace Tango.Transport
/// <summary>
/// Throws an exception if adapter is in a failed or disposed state.
/// </summary>
- protected virtual void ThrowFailedOrDisposed()
+ protected virtual void ThrowIfDisposed()
{
- if (State == TransportComponentState.Failed || State == TransportComponentState.Disposed)
+ if (State == TransportComponentState.Disposed)
{
throw LogManager.Log(new ObjectDisposedException("The adapter is in a " + State + " state."));
}
diff --git a/Software/Visual Studio/Tango.Transport/TransportMessage.cs b/Software/Visual Studio/Tango.Transport/TransportMessage.cs
index 0f8414333..90e600a06 100644
--- a/Software/Visual Studio/Tango.Transport/TransportMessage.cs
+++ b/Software/Visual Studio/Tango.Transport/TransportMessage.cs
@@ -36,7 +36,7 @@ namespace Tango.Transport
/// Notifies the message observer of the new result.
/// </summary>
/// <param name="result">The result.</param>
- public override void SetResult(object result)
+ public override void SetResult(object result, bool completed)
{
_completionSource.SetResult((T)result);
}
@@ -54,7 +54,7 @@ namespace Tango.Transport
/// Invokes the response callback.
/// </summary>
/// <param name="response">The response.</param>
- public override void InvokeResponseCallback(object response)
+ public override void InvokeResponseCallback(object response, bool completed)
{
ResponseCallback((T)response);
}
diff --git a/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs
index d15e17755..c9202efc6 100644
--- a/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs
+++ b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs
@@ -19,7 +19,7 @@ namespace Tango.Transport
/// <summary>
/// Gets or sets a value indicating whether this instance is multi response.
/// </summary>
- public bool IsMultiResponse { get; set; }
+ public bool IsContinuous { get; set; }
/// <summary>
/// Gets or sets the message token.
@@ -45,7 +45,7 @@ namespace Tango.Transport
/// Notifies the message observer of the new result.
/// </summary>
/// <param name="result">The result.</param>
- public abstract void SetResult(object result);
+ public abstract void SetResult(object result, bool completed);
/// <summary>
/// Notifies the message observer of an exception.
@@ -57,7 +57,7 @@ namespace Tango.Transport
/// Invokes the response callback.
/// </summary>
/// <param name="response">The response.</param>
- public abstract void InvokeResponseCallback(object response);
+ public abstract void InvokeResponseCallback(object response, bool completed);
/// <summary>
/// Initializes a new instance of the <see cref="TransportMessageBase"/> class.
diff --git a/Software/Visual Studio/Tango.Transport/TransporterBase.cs b/Software/Visual Studio/Tango.Transport/TransporterBase.cs
index 5c07ed72a..3bfa019f3 100644
--- a/Software/Visual Studio/Tango.Transport/TransporterBase.cs
+++ b/Software/Visual Studio/Tango.Transport/TransporterBase.cs
@@ -25,7 +25,7 @@ namespace Tango.Transport
private Thread _pushThread;
private Thread _pullThread;
private ObservableCollection<ITransportAdapter> _adapters;
- private Dictionary<String, ITransportAdapter> _tokenAdapters;
+ private Dictionary<String, PendingResponse> _pendingResponses;
#region Events
@@ -120,9 +120,13 @@ namespace Tango.Transport
/// <param name="e">The e.</param>
protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e)
{
- if (e == TransportComponentState.Failed)
+ if (e == TransportComponentState.Disposed)
{
- Disconnect().Wait();
+ Adapters.Remove(sender as ITransportAdapter);
+ }
+ else if (e == TransportComponentState.Failed)
+ {
+ //TODO: decide what to do in this case..
}
}
@@ -206,7 +210,7 @@ namespace Tango.Transport
public TransporterBase()
{
Adapters = new ObservableCollection<ITransportAdapter>();
- _tokenAdapters = new Dictionary<string, ITransportAdapter>();
+ _pendingResponses = new Dictionary<string, PendingResponse>();
_sendingQueue = new ConcurrentQueue<TransportMessageBase>();
_pendingRequests = new List<TransportMessageBase>();
_arrivedResponses = new ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>>();
@@ -271,13 +275,13 @@ namespace Tango.Transport
/// <returns></returns>
public Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response>
{
- LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
+ LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
LogManager.Log("Expected response: " + typeof(Response).Name);
TaskCompletionSource<Response> source = new TaskCompletionSource<Response>();
TransportMessage<Response> message = new TransportMessage<Response>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), source);
_sendingQueue.Enqueue(message);
- Task.Delay(RequestTimeout).ContinueWith((x) =>
+ Task.Delay(RequestTimeout).ContinueWith((x) =>
{
if (!source.Task.IsCompleted)
{
@@ -296,9 +300,9 @@ namespace Tango.Transport
/// <typeparam name="Response">The type of the response.</typeparam>
/// <param name="request">The request.</param>
/// <param name="responseCallback">The response callback delegate.</param>
- public void SendMultiRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>
+ public void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>
{
- SendMultiRequest(request, null, responseCallback);
+ SendContinuousRequest(request, null, responseCallback);
}
/// <summary>
@@ -309,14 +313,17 @@ namespace Tango.Transport
/// <param name="request">The request.</param>
/// <param name="adapter">Transport adapter</param>
/// <param name="responseCallback">The response callback delegate.</param>
- public void SendMultiRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>
+ public void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>
{
- LogManager.Log("Queuing multi response request message: " + typeof(Request).Name + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
+ LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
LogManager.Log("Expected response: " + typeof(Response).Name);
+ request.Container.Continuous = true;
+ request.Container.Completed = false;
+
TransportMessage<Response> message = new TransportMessage<Response>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), null)
{
- IsMultiResponse = true,
+ IsContinuous = true,
ResponseCallback = responseCallback,
};
_sendingQueue.Enqueue(message);
@@ -346,19 +353,31 @@ namespace Tango.Transport
LogManager.Log("Queuing response message: " + typeof(Response).Name);
+ PendingResponse pendingResponse = null;
ITransportAdapter adapter = null;
LogManager.Log("Searching for matching request token: " + token);
- if (_tokenAdapters.TryGetValue(token, out adapter))
+ if (_pendingResponses.TryGetValue(token, out pendingResponse))
{
+ adapter = pendingResponse.Adapter;
LogManager.Log("Found matching request token: " + token + " on adapter: " + (adapter != null ? adapter.Address : "ALL"));
- LogManager.Log("Removing matching request token.");
- _tokenAdapters.Remove(token);
+
+ if (!pendingResponse.IsContinuous)
+ {
+ LogManager.Log("Removing matching request token.");
+ _pendingResponses.Remove(token);
+ }
+ else if (response.Container.Completed)
+ {
+ LogManager.Log("Response completed. Removing matching request token.");
+ _pendingResponses.Remove(token);
+ }
}
else
{
- LogManager.Log("Matching request token was not found...");
+ //This should never happen.
+ throw LogManager.Log(new InvalidOperationException("Matching request token was not found!"));
}
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
@@ -371,9 +390,9 @@ namespace Tango.Transport
#region Private Methods
- /// <summary>
- /// Starts the pull and push threads.
- /// </summary>
+ /// <summary>
+ /// Starts the pull and push threads.
+ /// </summary>
private void StartThreads()
{
_pullThread = new Thread(PullThreadMethod);
@@ -410,11 +429,13 @@ namespace Tango.Transport
foreach (var adapter in Adapters.Where(x => x.State == TransportComponentState.Connected))
{
adapter.Write(message.Serialize());
+ LogManager.Log("Message sent on adapter: " + adapter.Address + "...");
}
}
else
{
message.Adapter.Write(message.Serialize());
+ LogManager.Log("Message sent on adapter: " + message.Adapter.Address + "...");
}
if (message.Direction == TransportMessageDirection.Request)
@@ -423,7 +444,7 @@ namespace Tango.Transport
}
else
{
- message.SetResult(true);
+ message.SetResult(true, true);
}
}
catch (Exception ex)
@@ -472,7 +493,7 @@ namespace Tango.Transport
{
LogManager.Log("Found pending request: " + request.Message.GetType().GetGenericArguments()[0].Name);
- if (!request.IsMultiResponse)
+ if (!request.IsContinuous)
{
LogManager.Log("Pending request was identified as 'single response'. Removing pending request.");
@@ -481,21 +502,25 @@ namespace Tango.Transport
try
{
LogManager.Log("Parsing inner response message and setting pending request task result...");
- request.SetResult(OnParseMessage(container));
+ request.SetResult(OnParseMessage(container), true);
}
catch (Exception ex)
{
- request.SetException(LogManager.Log(ex,"Error parsing inner message."));
+ request.SetException(LogManager.Log(ex, "Error parsing inner message."));
}
}
else
{
- LogManager.Log("Pending request was identified as 'multi response'. keeping pending request.");
+ LogManager.Log("Pending request was identified as 'continuous response'. keeping pending request.");
try
{
- LogManager.Log("Parsing inner response message and invoking multi response callback...");
- request.InvokeResponseCallback(OnParseMessage(container));
+ LogManager.Log("Parsing inner response message and invoking continuous response callback...");
+ if (container.Completed)
+ {
+ LogManager.Log("Continuous sequence completed.");
+ }
+ request.InvokeResponseCallback(OnParseMessage(container), container.Completed);
}
catch (Exception ex)
{
@@ -510,7 +535,7 @@ namespace Tango.Transport
try
{
LogManager.Log("Saving request token and adapter: " + container.Token + ", " + data.Key.Address);
- _tokenAdapters.Add(container.Token, data.Key);
+ _pendingResponses.Add(container.Token, new PendingResponse(data.Key, container.Continuous));
LogManager.Log("Invoking RequestReceived event...");
Task.Factory.StartNew(() => OnRequestReceived(container));
}