diff options
| author | Roy <roy.mail.net@gmail.com> | 2017-11-15 01:13:28 +0200 |
|---|---|---|
| committer | Roy <roy.mail.net@gmail.com> | 2017-11-15 01:13:28 +0200 |
| commit | 382c28941990c2cbf99c2bb21ed4ec34d9ed28eb (patch) | |
| tree | 43a4822b15561347e217ea8255d4b18b18c7ea92 | |
| parent | ef781529f0ca3de76f1ab449b0773d7bb4b1aedc (diff) | |
| download | Tango-382c28941990c2cbf99c2bb21ed4ec34d9ed28eb.tar.gz Tango-382c28941990c2cbf99c2bb21ed4ec34d9ed28eb.zip | |
Added continuous definition and completed for MessageContainer.
Implemented auto continuous request completion.
Implemented some proofing for TransportAdapter failure.
29 files changed, 1029 insertions, 550 deletions
diff --git a/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java b/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java index 24e2fc7f9..c4919dae3 100644 --- a/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java +++ b/Software/Android-Studio/Tango.PMR/src/main/java/com/twine/tango/pmr/common/MessageContainerOuterClass.java @@ -38,7 +38,17 @@ public final class MessageContainerOuterClass { getTokenBytes(); /** - * <code>bytes Data = 3;</code> + * <code>bool Continuous = 3;</code> + */ + boolean getContinuous(); + + /** + * <code>bool Completed = 4;</code> + */ + boolean getCompleted(); + + /** + * <code>bytes Data = 5;</code> */ com.google.protobuf.ByteString getData(); } @@ -57,6 +67,8 @@ public final class MessageContainerOuterClass { private MessageContainer() { type_ = 0; token_ = ""; + continuous_ = false; + completed_ = false; data_ = com.google.protobuf.ByteString.EMPTY; } @@ -100,7 +112,17 @@ public final class MessageContainerOuterClass { token_ = s; break; } - case 26: { + case 24: { + + continuous_ = input.readBool(); + break; + } + case 32: { + + completed_ = input.readBool(); + break; + } + case 42: { data_ = input.readBytes(); break; @@ -179,10 +201,28 @@ public final class MessageContainerOuterClass { } } - public static final int DATA_FIELD_NUMBER = 3; + public static final int CONTINUOUS_FIELD_NUMBER = 3; + private boolean continuous_; + /** + * <code>bool Continuous = 3;</code> + */ + public boolean getContinuous() { + return continuous_; + } + + public static final int COMPLETED_FIELD_NUMBER = 4; + private boolean completed_; + /** + * <code>bool Completed = 4;</code> + */ + public boolean getCompleted() { + return completed_; + } + + public static final int DATA_FIELD_NUMBER = 5; private com.google.protobuf.ByteString data_; /** - * <code>bytes Data = 3;</code> + * <code>bytes Data = 5;</code> */ public com.google.protobuf.ByteString getData() { return data_; @@ -206,8 +246,14 @@ public final class MessageContainerOuterClass { if (!getTokenBytes().isEmpty()) { com.google.protobuf.GeneratedMessageV3.writeString(output, 2, token_); } + if (continuous_ != false) { + output.writeBool(3, continuous_); + } + if (completed_ != false) { + output.writeBool(4, completed_); + } if (!data_.isEmpty()) { - output.writeBytes(3, data_); + output.writeBytes(5, data_); } unknownFields.writeTo(output); } @@ -224,9 +270,17 @@ public final class MessageContainerOuterClass { if (!getTokenBytes().isEmpty()) { size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, token_); } + if (continuous_ != false) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, continuous_); + } + if (completed_ != false) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, completed_); + } if (!data_.isEmpty()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, data_); + .computeBytesSize(5, data_); } size += unknownFields.getSerializedSize(); memoizedSize = size; @@ -247,6 +301,10 @@ public final class MessageContainerOuterClass { result = result && type_ == other.type_; result = result && getToken() .equals(other.getToken()); + result = result && (getContinuous() + == other.getContinuous()); + result = result && (getCompleted() + == other.getCompleted()); result = result && getData() .equals(other.getData()); result = result && unknownFields.equals(other.unknownFields); @@ -264,6 +322,12 @@ public final class MessageContainerOuterClass { hash = (53 * hash) + type_; hash = (37 * hash) + TOKEN_FIELD_NUMBER; hash = (53 * hash) + getToken().hashCode(); + hash = (37 * hash) + CONTINUOUS_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( + getContinuous()); + hash = (37 * hash) + COMPLETED_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( + getCompleted()); hash = (37 * hash) + DATA_FIELD_NUMBER; hash = (53 * hash) + getData().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); @@ -399,6 +463,10 @@ public final class MessageContainerOuterClass { token_ = ""; + continuous_ = false; + + completed_ = false; + data_ = com.google.protobuf.ByteString.EMPTY; return this; @@ -425,6 +493,8 @@ public final class MessageContainerOuterClass { com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer result = new com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer(this); result.type_ = type_; result.token_ = token_; + result.continuous_ = continuous_; + result.completed_ = completed_; result.data_ = data_; onBuilt(); return result; @@ -474,6 +544,12 @@ public final class MessageContainerOuterClass { token_ = other.token_; onChanged(); } + if (other.getContinuous() != false) { + setContinuous(other.getContinuous()); + } + if (other.getCompleted() != false) { + setCompleted(other.getCompleted()); + } if (other.getData() != com.google.protobuf.ByteString.EMPTY) { setData(other.getData()); } @@ -617,15 +693,67 @@ public final class MessageContainerOuterClass { return this; } + private boolean continuous_ ; + /** + * <code>bool Continuous = 3;</code> + */ + public boolean getContinuous() { + return continuous_; + } + /** + * <code>bool Continuous = 3;</code> + */ + public Builder setContinuous(boolean value) { + + continuous_ = value; + onChanged(); + return this; + } + /** + * <code>bool Continuous = 3;</code> + */ + public Builder clearContinuous() { + + continuous_ = false; + onChanged(); + return this; + } + + private boolean completed_ ; + /** + * <code>bool Completed = 4;</code> + */ + public boolean getCompleted() { + return completed_; + } + /** + * <code>bool Completed = 4;</code> + */ + public Builder setCompleted(boolean value) { + + completed_ = value; + onChanged(); + return this; + } + /** + * <code>bool Completed = 4;</code> + */ + public Builder clearCompleted() { + + completed_ = false; + onChanged(); + return this; + } + private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY; /** - * <code>bytes Data = 3;</code> + * <code>bytes Data = 5;</code> */ public com.google.protobuf.ByteString getData() { return data_; } /** - * <code>bytes Data = 3;</code> + * <code>bytes Data = 5;</code> */ public Builder setData(com.google.protobuf.ByteString value) { if (value == null) { @@ -637,7 +765,7 @@ public final class MessageContainerOuterClass { return this; } /** - * <code>bytes Data = 3;</code> + * <code>bytes Data = 5;</code> */ public Builder clearData() { @@ -709,9 +837,10 @@ public final class MessageContainerOuterClass { static { java.lang.String[] descriptorData = { "\n\026MessageContainer.proto\022\020Tango.PMR.Comm" + - "on\032\021MessageType.proto\"\\\n\020MessageContaine" + - "r\022+\n\004Type\030\001 \001(\0162\035.Tango.PMR.Common.Messa" + - "geType\022\r\n\005Token\030\002 \001(\t\022\014\n\004Data\030\003 \001(\014B\034\n\032c" + + "on\032\021MessageType.proto\"\203\001\n\020MessageContain" + + "er\022+\n\004Type\030\001 \001(\0162\035.Tango.PMR.Common.Mess" + + "ageType\022\r\n\005Token\030\002 \001(\t\022\022\n\nContinuous\030\003 \001" + + "(\010\022\021\n\tCompleted\030\004 \001(\010\022\014\n\004Data\030\005 \001(\014B\034\n\032c" + "om.twine.tango.pmr.commonb\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = @@ -732,7 +861,7 @@ public final class MessageContainerOuterClass { internal_static_Tango_PMR_Common_MessageContainer_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_Tango_PMR_Common_MessageContainer_descriptor, - new java.lang.String[] { "Type", "Token", "Data", }); + new java.lang.String[] { "Type", "Token", "Continuous", "Completed", "Data", }); com.twine.tango.pmr.common.MessageTypeOuterClass.getDescriptor(); } diff --git a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java index d26d62dbc..f0c71477f 100644 --- a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java +++ b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/App.java @@ -60,8 +60,8 @@ public class App extends Application { // .b() .addObjectFormatter(GeneratedMessageV3.class, msg -> msg.toString()).build(); - File sdcard = Environment.getExternalStorageDirectory(); - File dir = new File(sdcard.getAbsolutePath() + "/twine/tango/logs"); + File sdcard = context.getFilesDir(); + File dir = new File(sdcard.getAbsolutePath() + "/logs"); boolean b = dir.mkdirs(); Printer androidPrinter = new AndroidPrinter(); @@ -71,6 +71,14 @@ public class App extends Application { .build(); XLog.init(config, androidPrinter, filePrinter); + XLog.i("Logger Initialized. logs will be saved to: " + dir.getAbsolutePath()); + + + Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> { + + XLog.e(throwable); + System.exit(1); + }); } public static ApplicationComponent getComponent() { diff --git a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java index 6cc1ee32a..d95420dd5 100644 --- a/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java +++ b/Software/Android-Studio/Tango.Stubs.UI/src/main/java/com/twine/tango/stubs/ui/views/stub/StubActivityVM.java @@ -18,6 +18,8 @@ import com.twine.tango.transport.TransportComponentState; import javax.inject.Inject; +import io.reactivex.android.schedulers.AndroidSchedulers; + /** * Created by Roy on 11/7/2017. */ @@ -65,7 +67,7 @@ public class StubActivityVM extends ViewModelBase<StubActivityContract> private void runSelectedStub() { StubBase stub = this.stub.get().createInstance(transporter); - stub.run().subscribe((response) -> + stub.run().observeOn(AndroidSchedulers.mainThread()).subscribe((response) -> { XLog.i(response); }, (ex) -> diff --git a/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java b/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java index 25243afe3..4bfe84609 100644 --- a/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java +++ b/Software/Android-Studio/Tango.Stubs/src/main/java/com/twine/tango/stubs/stubs/Progress.java @@ -28,7 +28,7 @@ public class Progress extends StubBase { PublishSubject<String> subject = PublishSubject.create(); - getTransporter().<ProgressRequest, ProgressResponse>sendRequestMulti( + getTransporter().<ProgressRequest, ProgressResponse>sendContinuousRequest( MessageFactory.createTangoMessage( ProgressRequest.class, ProgressRequest.newBuilder().build())).subscribe((response) -> diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java index 7ae5579de..0ad375cba 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java @@ -27,9 +27,9 @@ public interface ITransporter extends ITransportComponent { <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request,ITransportAdapter adapter); - <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request); + <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request); - <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request,ITransportAdapter adapter); + <Request extends GeneratedMessageV3,Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter); <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response); diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java new file mode 100644 index 000000000..bb4f2505a --- /dev/null +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/PendingResponse.java @@ -0,0 +1,63 @@ +package com.twine.tango.transport; + + +/** + * Represents a pending response waiting to be returned to the request sender. + */ +public class PendingResponse { + + private ITransportAdapter adapter; + private boolean isContinuous; + + /** + * Gets adapter. + * + * @return the adapter + */ + public ITransportAdapter getAdapter() + { + return adapter; + } + + /** + * Sets adapter. + * + * @param adapter the adapter + */ + public void setAdapter(ITransportAdapter adapter) + { + this.adapter = adapter; + } + + /** + * Is continuous boolean. + * + * @return the boolean + */ + public boolean isContinuous() + { + return isContinuous; + } + + /** + * Sets continuous. + * + * @param continuous the continuous + */ + public void setContinuous(boolean continuous) + { + isContinuous = continuous; + } + + /** + * Instantiates a new Pending response. + * + * @param adapter the adapter + * @param isContinuous is continuous + */ + public PendingResponse(ITransportAdapter adapter, boolean isContinuous) + { + this.adapter = adapter; + this.isContinuous = isContinuous; + } +} diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java index 1d28a25ad..faa50dadf 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java @@ -70,8 +70,8 @@ public abstract class TransportAdapterBase implements ITransportAdapter { setState(TransportComponentState.Disposed); } - protected void throwFailedOrDisposed() throws ObjectDisposedException { - if (state == TransportComponentState.Failed || state == TransportComponentState.Disposed) { + protected void throwIfDisposed() throws ObjectDisposedException { + if (state == TransportComponentState.Disposed) { throw new ObjectDisposedException("The adapter is in a " + state + " state."); } } diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java index e625e4b0d..4ed894dba 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessage.java @@ -1,35 +1,36 @@ package com.twine.tango.transport; -import com.twine.tango.core.Action; import com.twine.tango.core.Func; import io.reactivex.subjects.PublishSubject; -import java8.util.function.Consumer; -import java8.util.function.Supplier; /** * Created by Roy on 11/13/2017. */ -public class TransportMessage<T> extends TransportMessageBase { - +public class TransportMessage<T> extends TransportMessageBase +{ + private PublishSubject<T> publishSubject; - - public TransportMessage(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject) { + + public TransportMessage(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize, PublishSubject<T> publishSubject) + { super(adapter, token, message, direction, serialize); this.publishSubject = publishSubject; } - + @SuppressWarnings("unchecked") @Override - protected void setResult(Object result) { + protected void setResult(Object result, boolean completed) + { - publishSubject.onNext((T)result); - if (!isMultiResponse()) publishSubject.onComplete(); + publishSubject.onNext((T) result); + if (completed) publishSubject.onComplete(); } - + @Override - protected void setException(Exception error) { + protected void setException(Exception error) + { publishSubject.onError(error); publishSubject.onComplete(); } diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java index 6d0af57c7..e236fc44f 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportMessageBase.java @@ -2,73 +2,86 @@ package com.twine.tango.transport; import com.twine.tango.core.Func; -import java8.util.function.Supplier; - /** * Created by Roy on 11/13/2017. */ -public abstract class TransportMessageBase { - +public abstract class TransportMessageBase +{ + private ITransportAdapter adapter; - private boolean isMultiResponse; + private boolean isContinuous; private String token; private TransportMessageDirection direction; private Func<byte[]> serialize; private Object message; - - protected abstract void setResult(Object result); + + protected abstract void setResult(Object result, boolean completed); + protected abstract void setException(Exception error); - - public ITransportAdapter getAdapter() { + + public ITransportAdapter getAdapter() + { return adapter; } - - public void setAdapter(ITransportAdapter adapter) { + + public void setAdapter(ITransportAdapter adapter) + { this.adapter = adapter; } - - public boolean isMultiResponse() { - return isMultiResponse; + + public boolean isContinuous() + { + return isContinuous; } - - public void setMultiResponse(boolean multiResponse) { - isMultiResponse = multiResponse; + + public void setContinuous(boolean continuous) + { + isContinuous = continuous; } - - public String getToken() { + + public String getToken() + { return token; } - - public void setToken(String token) { + + public void setToken(String token) + { this.token = token; } - - public TransportMessageDirection getDirection() { + + public TransportMessageDirection getDirection() + { return direction; } - - public void setDirection(TransportMessageDirection direction) { + + public void setDirection(TransportMessageDirection direction) + { this.direction = direction; } - - public Func<byte[]> getSerialize() { + + public Func<byte[]> getSerialize() + { return serialize; } - - public void setSerialize(Func<byte[]> serialize) { + + public void setSerialize(Func<byte[]> serialize) + { this.serialize = serialize; } - - public Object getMessage() { + + public Object getMessage() + { return message; } - - public void setMessage(Object message) { + + public void setMessage(Object message) + { this.message = message; } - - public TransportMessageBase(ITransportAdapter adapter, String token, Object message,TransportMessageDirection direction, Func<byte[]> serialize) { + + public TransportMessageBase(ITransportAdapter adapter, String token, Object message, TransportMessageDirection direction, Func<byte[]> serialize) + { this.adapter = adapter; this.token = token; this.direction = direction; diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java index c1bd4f503..83ce58b97 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java @@ -7,14 +7,12 @@ import android.util.Pair; import com.elvishew.xlog.XLog; import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.InvalidProtocolBufferException; -import com.twine.tango.core.Action; import com.twine.tango.core.EventHandler; import com.twine.tango.core.Func; import com.twine.tango.core.ObjectDisposedException; import com.twine.tango.core.ObservableCollection; import com.twine.tango.pmr.MessageFactory; import com.twine.tango.pmr.TangoMessage; -import com.twine.tango.pmr.common.MessageContainerOuterClass; import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; import org.joda.time.Period; @@ -27,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.Single; @@ -38,169 +37,203 @@ import static br.com.zbra.androidlinq.Linq.stream; /** * Represents an {@link ITransporter} base class. */ -public abstract class TransporterBase implements ITransporter { - +public abstract class TransporterBase implements ITransporter +{ + private ConcurrentLinkedQueue<TransportMessageBase> sendingQueue; private List<TransportMessageBase> pendingRequests; private ConcurrentLinkedQueue<Pair<ITransportAdapter, byte[]>> arrivedResponses; private Thread pushThread; private Thread pullThread; private ObservableCollection<ITransportAdapter> adapters; - private Map<String, ITransportAdapter> tokenAdapters; + private Map<String, PendingResponse> pendingResponses; private TransportComponentState state; private Period requestTimeout; - + //region Events - + private EventHandler<MessageContainer> requestReceivedListener; private EventHandler<TransportComponentState> stateChangedListener; - + //endregion - + //region Properties - + @Override - public ObservableCollection<ITransportAdapter> getAdapters() { + public ObservableCollection<ITransportAdapter> getAdapters() + { return adapters; } - - protected void setAdapters(ObservableCollection<ITransportAdapter> adapters) { - - if (this.adapters != null) { + + protected void setAdapters(ObservableCollection<ITransportAdapter> adapters) + { + + if (this.adapters != null) + { adapters.clearOnChangeListener(); } - + this.adapters = adapters; - - if (this.adapters != null) { + + if (this.adapters != null) + { adapters.setOnChangeListener(this::onAdaptersChanged); } } - + @Override - public TransportComponentState getState() { + public TransportComponentState getState() + { return state; } - - protected void setState(TransportComponentState state) { + + protected void setState(TransportComponentState state) + { this.state = state; onStateChanged(this.state); } - + @Override - public Period getRequestTimeout() { + public Period getRequestTimeout() + { return requestTimeout; } - + @Override - public void setRequestTimeout(Period requestTimeout) { + public void setRequestTimeout(Period requestTimeout) + { this.requestTimeout = requestTimeout; } - + //endregion - + //region Protected Methods - - protected void onAdaptersChanged() { - + + protected void onAdaptersChanged() + { + XLog.i("Adapters collection changed, Listing adapters:"); - - for (ITransportAdapter adapter : adapters) { - + + for (ITransportAdapter adapter : adapters) + { + XLog.i(adapter.getClass().getName() + ", " + adapter.getAddress() + ", " + adapter.getState()); - + adapter.setStateChangedListener(this::onAdapterStateChanged); adapter.setDataAvailableListener(this::onAdapterDataAvailable); - - if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected) { - try { + + if (this.getState() == TransportComponentState.Connected && adapter.getState() == TransportComponentState.Disconnected) + { + try + { adapter.connect().subscribe(); - } catch (ObjectDisposedException e) { + } catch (ObjectDisposedException e) + { XLog.e(e); } } } } - - private void onAdapterDataAvailable(Object sender, byte[] data) { + + private void onAdapterDataAvailable(Object sender, byte[] data) + { arrivedResponses.add(new Pair<>((ITransportAdapter) sender, data)); } - - - private void onAdapterStateChanged(Object sender, TransportComponentState state) { - - if (state == TransportComponentState.Failed) { - disconnect().blockingAwait(); + + + private void onAdapterStateChanged(Object sender, TransportComponentState state) + { + + if (state == TransportComponentState.Disposed) + { + adapters.remove((ITransportAdapter) sender); + } else if (state == TransportComponentState.Failed) + { + //TODO: decide what to do here... } } - - protected void onStateChanged(TransportComponentState state) { + + protected void onStateChanged(TransportComponentState state) + { if (stateChangedListener != null) stateChangedListener.invoke(this, state); } - - protected <T extends GeneratedMessageV3> Func<byte[]> onSerializingMessage(TangoMessage<T> message) { - return new Func<byte[]>() { + + protected <T extends GeneratedMessageV3> Func<byte[]> onSerializingMessage(TangoMessage<T> message) + { + return new Func<byte[]>() + { @Override - public byte[] invoke() { + public byte[] invoke() + { return message.toBytes(); } }; } - - protected void onFailed(Exception ex) { + + protected void onFailed(Exception ex) + { disconnect().blockingAwait(); setState(TransportComponentState.Failed); XLog.e("Transporter failed.", ex); } - - protected void onRequestReceived(MessageContainer request) { + + protected void onRequestReceived(MessageContainer request) + { if (requestReceivedListener != null) requestReceivedListener.invoke(this, request); } - - protected MessageContainer onParseContainer(byte[] data) throws InvalidProtocolBufferException { + + protected MessageContainer onParseContainer(byte[] data) throws InvalidProtocolBufferException + { return MessageFactory.parseContainer(data); } - - protected GeneratedMessageV3 onParseMessage(MessageContainer container) throws InvalidProtocolBufferException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + protected GeneratedMessageV3 onParseMessage(MessageContainer container) throws InvalidProtocolBufferException, NoSuchMethodException, IllegalAccessException, InvocationTargetException + { return MessageFactory.parseMessageFromContainerAgnostic(container); } - + //endregion - + //region Constructors - - public TransporterBase() { + + public TransporterBase() + { setAdapters(new ObservableCollection<>()); - tokenAdapters = new HashMap<>(); + pendingResponses = new HashMap<>(); sendingQueue = new ConcurrentLinkedQueue<>(); pendingRequests = new ArrayList<>(); arrivedResponses = new ConcurrentLinkedQueue<>(); setRequestTimeout(Period.seconds(10)); } - - public TransporterBase(ITransportAdapter adapter) { + + public TransporterBase(ITransportAdapter adapter) + { this(); adapters.add(adapter); } - + //endregion - + //region Public Methods - + @Override - public void setStateChangedListener(EventHandler<TransportComponentState> stateChangedListener) { + public void setStateChangedListener(EventHandler<TransportComponentState> stateChangedListener) + { this.stateChangedListener = stateChangedListener; } - + @Override - public void setRequestReceiverListener(EventHandler<MessageContainer> listener) { + public void setRequestReceiverListener(EventHandler<MessageContainer> listener) + { this.requestReceivedListener = listener; } - - @Override - public Completable connect() throws ObjectDisposedException { - return Completable.create((x) -> { + @Override + public Completable connect() throws ObjectDisposedException + { + return Completable.create((x) -> + { + try { if (adapters.size() == 0) @@ -208,300 +241,326 @@ public abstract class TransporterBase implements ITransporter { throw new IllegalArgumentException("This transporter has zero adapters."); } - for (ITransportAdapter adapter : adapters) { - + for (ITransportAdapter adapter : adapters) + { + adapter.connect().blockingAwait(); } - + setState(TransportComponentState.Connected); startThreads(); - + XLog.i("Transporter connected..."); x.onComplete(); } catch (Exception e) { - XLog.e("Error connecting transporter",e); + XLog.e("Error connecting transporter", e); } }).subscribeOn(Schedulers.io()); } - + @Override - public Completable disconnect() { + public Completable disconnect() + { setState(TransportComponentState.Disconnected); - - return Completable.create((x) -> { - + + return Completable.create((x) -> + { + try { - for (ITransportAdapter adapter : adapters) { - + for (ITransportAdapter adapter : adapters) + { + adapter.disconnect().blockingAwait(); } - + XLog.i("Transporter disconnected..."); - + x.onComplete(); } catch (Exception e) { x.onError(e); } - + }).subscribeOn(Schedulers.io()); } - + @Override - public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request) { + public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request) + { return sendRequest(request, null); } - + @Override - public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter) { + public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Single<Response> sendRequest(TangoMessage<Request> request, ITransportAdapter adapter) + { XLog.i("Queuing request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL")); XLog.i("Expected response: " + Response.Builder.class.getSimpleName()); - + PublishSubject<Response> subject = PublishSubject.create(); TransportMessage<Response> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject); sendingQueue.add(message); - + Completable.timer(getRequestTimeout().getSeconds(), TimeUnit.SECONDS) .subscribe(() -> { - if (!subject.hasComplete()) { + if (!subject.hasComplete()) + { XLog.i("Request message " + request.getClass().getSimpleName() + " had timed out after " + getRequestTimeout().getSeconds() + " seconds."); XLog.i("Setting request task exception..."); subject.onError(new TimeoutException()); } }); - + return subject.singleOrError(); } - + @Override - public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request) { - return sendRequestMulti(request, null); + public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request) + { + return sendContinuousRequest(request, null); } - + @Override - public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendRequestMulti(TangoMessage<Request> request, ITransportAdapter adapter) { - XLog.i("Queuing multi response request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL")); + public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> sendContinuousRequest(TangoMessage<Request> request, ITransportAdapter adapter) + { + XLog.i("Queuing continuous response request message: " + request.getClass().getSimpleName() + " Token: " + request.getContainer().getToken() + " on adapter: " + (adapter != null ? adapter.getAddress() : "ALL")); XLog.i("Expected response: " + Response.Builder.class.getSimpleName()); - + + request.getContainer().setContinuous(true); + request.getContainer().setCompleted(false); + PublishSubject<Response> subject = PublishSubject.create(); - TransportMessage<Response> message = new TransportMessage<>(adapter,request.getContainer().getToken(),request,TransportMessageDirection.Request,onSerializingMessage(request),subject); - message.setMultiResponse(true); + TransportMessage<Response> message = new TransportMessage<>(adapter, request.getContainer().getToken(), request, TransportMessageDirection.Request, onSerializingMessage(request), subject); + message.setContinuous(true); sendingQueue.add(message); - + return subject; } - + @Override - public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response) { - return sendResponse(response,response.getContainer().getToken()); + public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response) + { + return sendResponse(response, response.getContainer().getToken()); } - + @Override - public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token) { - + public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token) + { + response.getContainer().setToken(token); - + XLog.i("Queuing response message: " + response.getClass().getSimpleName()); - + + PendingResponse pendingResponse = null; ITransportAdapter adapter = null; - + XLog.i("Searching for matching request token: " + token); - - adapter = tokenAdapters.get(token); - - if (adapter != null) + + pendingResponse = pendingResponses.get(token); + + if (pendingResponse != null) { + adapter = pendingResponse.getAdapter(); XLog.i("Found matching request token: " + token + " on adapter: " + adapter.getAddress()); - XLog.i("Removing matching request token..."); - tokenAdapters.remove(token); - } - else + + if (!pendingResponse.isContinuous()) + { + XLog.i("Removing matching request token..."); + pendingResponses.remove(token); + } else if (response.getContainer().getCompleted()) + { + XLog.i("Response completed. Removing matching request token..."); + pendingResponses.remove(token); + } + } else { + //This should never happen. XLog.w("Matching request token was not found..."); + throw new RuntimeException(); } - + PublishSubject<Response> subject = PublishSubject.create(); - TransportMessage<Response> message = new TransportMessage<>(adapter,token,response,TransportMessageDirection.Response,onSerializingMessage(response),subject); + TransportMessage<Response> message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject); sendingQueue.add(message); - + return subject.singleOrError(); } //endregion - + //region Private Methods - - private void startThreads() { + + private void startThreads() + { pullThread = new Thread(this::pullThreadMethod); pullThread.setName("Pull Thread"); pullThread.start(); - + pushThread = new Thread(this::pushThreadMethod); pushThread.setName("Push Thread"); pushThread.start(); } - + //endregion - + //region Push Thread - + public void pushThreadMethod() { - try { - + try + { + while (getState() == TransportComponentState.Connected) { if (sendingQueue.size() > 0) { TransportMessageBase message = null; message = sendingQueue.poll(); - + if (message != null) { - try { - + try + { if (message.getAdapter() == null) { - for (ITransportAdapter adapter : adapters) { + for (ITransportAdapter adapter : adapters) + { if (adapter.getState() == TransportComponentState.Connected) { adapter.write(message.getSerialize().invoke()); + XLog.i("message sent on adapter: " + adapter.getAddress() + "..."); } } - } - else + } else { message.getAdapter().write(message.getSerialize().invoke()); + XLog.i("message sent on adapter: " + message.getAdapter().getAddress() + "..."); } - + if (message.getDirection() == TransportMessageDirection.Request) { pendingRequests.add(message); - } - else + } else { - message.setResult(true); + message.setResult(true, true); } - } - catch (Exception ex) + } catch (Exception ex) { message.setException(ex); } } } - + SystemClock.sleep(10); } - - } - catch (Exception ex) + + } catch (Exception ex) { onFailed(ex); } } - + //endregion - + //region Pull Thread - + public void pullThreadMethod() { - try { - + try + { + while (getState() == TransportComponentState.Connected) { - Pair<ITransportAdapter,byte[]> data; - + Pair<ITransportAdapter, byte[]> data; + if (arrivedResponses.size() > 0) { data = arrivedResponses.poll(); - + if (data != null) { XLog.i("Message received on adapter: " + data.first.getAddress()); XLog.i("Parsing message container..."); MessageContainer container = onParseContainer(data.second); XLog.i("Searching for pending request token: " + container.getToken()); - TransportMessageBase request = stream(pendingRequests).singleOrDefault(x -> x.getToken().equals(container.getToken()),null); - + TransportMessageBase request = stream(pendingRequests).singleOrDefault(x -> x.getToken().equals(container.getToken()), null); + if (request != null) { XLog.i("Found pending request: " + request.getMessage().getClass().getSimpleName()); - - if (!request.isMultiResponse()) + + if (!request.isContinuous()) { XLog.i("Pending request was identified as 'single response'. Removing pending request."); - + pendingRequests.remove(request); - + try { XLog.i("Parsing inner response message and setting pending request task result..."); - request.setResult(onParseMessage(container)); - } - catch(Exception ex) + request.setResult(onParseMessage(container), true); + } catch (Exception ex) { - XLog.e("Error parsing inner message",ex); + XLog.e("Error parsing inner message", ex); request.setException(ex); } - } - else + } else { - XLog.i("Pending request was identified as 'multi response'. keeping pending request."); - + XLog.i("Pending request was identified as 'continuous response'. keeping pending request."); + try { - XLog.i("Parsing inner response message and invoking multi response callback..."); - request.setResult(onParseMessage(container)); - } - catch (Exception ex) + XLog.i("Parsing inner response message and invoking continuous response callback..."); + if (container.getCompleted()) + { + XLog.i("Continuous sequence completed."); + } + request.setResult(onParseMessage(container), container.getCompleted()); + } catch (Exception ex) { XLog.e("Error parsing inner message", ex); request.setException(ex); } } - } - else + } else { XLog.i("Message was identified as a new request message: " + container.getType().toString()); - + try { XLog.i("Saving request token and adapter: " + container.getToken() + ", " + data.first.getAddress()); - tokenAdapters.put(container.getToken(),data.first); + pendingResponses.put(container.getToken(), new PendingResponse(data.first, container.getContinuous())); XLog.i("Invoking RequestReceived event..."); AsyncTask.execute(() -> onRequestReceived(container)); - } - catch (Exception ex) + } catch (Exception ex) { //Ignore any exception that might occur on the event handler side... } } } } - + SystemClock.sleep(10); } - - } - catch (Exception ex) + + } catch (Exception ex) { onFailed(ex); } } - + //endregion - + //region Dispose - + @Override - public void dispose() { + public void dispose() + { disconnect().blockingAwait(); setState(TransportComponentState.Disposed); } - + //endregion } diff --git a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java index b47e12594..25c23919f 100644 --- a/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java +++ b/Software/Android-Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java @@ -11,8 +11,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import io.reactivex.Completable; @@ -74,7 +72,7 @@ public class TcpTransportAdapter extends TransportAdapterBase @Override public Completable connect() throws ObjectDisposedException { - throwFailedOrDisposed(); + throwIfDisposed(); return Completable.create((x) -> { @@ -134,7 +132,7 @@ public class TcpTransportAdapter extends TransportAdapterBase @Override public void write(byte[] data) throws ObjectDisposedException, IOException { - throwFailedOrDisposed(); + throwIfDisposed(); try { diff --git a/Software/PMR/Messages/Common/MessageContainer.proto b/Software/PMR/Messages/Common/MessageContainer.proto index ae507822f..c65796b76 100644 --- a/Software/PMR/Messages/Common/MessageContainer.proto +++ b/Software/PMR/Messages/Common/MessageContainer.proto @@ -9,6 +9,8 @@ message MessageContainer { MessageType Type = 1; string Token = 2; - bytes Data = 3; + bool Continuous = 3; + bool Completed = 4; + bytes Data = 5; } diff --git a/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs b/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs index 9e4337b9f..b41fd1804 100644 --- a/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs +++ b/Software/Visual Studio/Tango.Emulations/Emulators/MachineEmulator.cs @@ -71,6 +71,9 @@ namespace Tango.Emulations.Emulators Thread.Sleep(500); var res = MessageFactory.CreateTangoMessage<ProgressResponse>(container.Token); res.Message.Progress = i; + + if (i == 9) res.Container.Completed = true; + Transporter.SendResponse(res); } }); diff --git a/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs b/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs index 4b4a437a9..3939e53c7 100644 --- a/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs +++ b/Software/Visual Studio/Tango.PMR/Common/MessageContainer.cs @@ -23,14 +23,15 @@ namespace Tango.PMR.Common { byte[] descriptorData = global::System.Convert.FromBase64String( string.Concat( "ChZNZXNzYWdlQ29udGFpbmVyLnByb3RvEhBUYW5nby5QTVIuQ29tbW9uGhFN", - "ZXNzYWdlVHlwZS5wcm90byJcChBNZXNzYWdlQ29udGFpbmVyEisKBFR5cGUY", - "ASABKA4yHS5UYW5nby5QTVIuQ29tbW9uLk1lc3NhZ2VUeXBlEg0KBVRva2Vu", - "GAIgASgJEgwKBERhdGEYAyABKAxCHAoaY29tLnR3aW5lLnRhbmdvLnBtci5j", - "b21tb25iBnByb3RvMw==")); + "ZXNzYWdlVHlwZS5wcm90byKDAQoQTWVzc2FnZUNvbnRhaW5lchIrCgRUeXBl", + "GAEgASgOMh0uVGFuZ28uUE1SLkNvbW1vbi5NZXNzYWdlVHlwZRINCgVUb2tl", + "bhgCIAEoCRISCgpDb250aW51b3VzGAMgASgIEhEKCUNvbXBsZXRlZBgEIAEo", + "CBIMCgREYXRhGAUgASgMQhwKGmNvbS50d2luZS50YW5nby5wbXIuY29tbW9u", + "YgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Tango.PMR.Common.MessageTypeReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Common.MessageContainer), global::Tango.PMR.Common.MessageContainer.Parser, new[]{ "Type", "Token", "Data" }, null, null, null) + new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Common.MessageContainer), global::Tango.PMR.Common.MessageContainer.Parser, new[]{ "Type", "Token", "Continuous", "Completed", "Data" }, null, null, null) })); } #endregion @@ -63,6 +64,8 @@ namespace Tango.PMR.Common { public MessageContainer(MessageContainer other) : this() { type_ = other.type_; token_ = other.token_; + continuous_ = other.continuous_; + completed_ = other.completed_; data_ = other.data_; } @@ -93,8 +96,30 @@ namespace Tango.PMR.Common { } } + /// <summary>Field number for the "Continuous" field.</summary> + public const int ContinuousFieldNumber = 3; + private bool continuous_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Continuous { + get { return continuous_; } + set { + continuous_ = value; + } + } + + /// <summary>Field number for the "Completed" field.</summary> + public const int CompletedFieldNumber = 4; + private bool completed_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Completed { + get { return completed_; } + set { + completed_ = value; + } + } + /// <summary>Field number for the "Data" field.</summary> - public const int DataFieldNumber = 3; + public const int DataFieldNumber = 5; private pb::ByteString data_ = pb::ByteString.Empty; [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public pb::ByteString Data { @@ -119,6 +144,8 @@ namespace Tango.PMR.Common { } if (Type != other.Type) return false; if (Token != other.Token) return false; + if (Continuous != other.Continuous) return false; + if (Completed != other.Completed) return false; if (Data != other.Data) return false; return true; } @@ -128,6 +155,8 @@ namespace Tango.PMR.Common { int hash = 1; if (Type != 0) hash ^= Type.GetHashCode(); if (Token.Length != 0) hash ^= Token.GetHashCode(); + if (Continuous != false) hash ^= Continuous.GetHashCode(); + if (Completed != false) hash ^= Completed.GetHashCode(); if (Data.Length != 0) hash ^= Data.GetHashCode(); return hash; } @@ -147,8 +176,16 @@ namespace Tango.PMR.Common { output.WriteRawTag(18); output.WriteString(Token); } + if (Continuous != false) { + output.WriteRawTag(24); + output.WriteBool(Continuous); + } + if (Completed != false) { + output.WriteRawTag(32); + output.WriteBool(Completed); + } if (Data.Length != 0) { - output.WriteRawTag(26); + output.WriteRawTag(42); output.WriteBytes(Data); } } @@ -162,6 +199,12 @@ namespace Tango.PMR.Common { if (Token.Length != 0) { size += 1 + pb::CodedOutputStream.ComputeStringSize(Token); } + if (Continuous != false) { + size += 1 + 1; + } + if (Completed != false) { + size += 1 + 1; + } if (Data.Length != 0) { size += 1 + pb::CodedOutputStream.ComputeBytesSize(Data); } @@ -179,6 +222,12 @@ namespace Tango.PMR.Common { if (other.Token.Length != 0) { Token = other.Token; } + if (other.Continuous != false) { + Continuous = other.Continuous; + } + if (other.Completed != false) { + Completed = other.Completed; + } if (other.Data.Length != 0) { Data = other.Data; } @@ -200,7 +249,15 @@ namespace Tango.PMR.Common { Token = input.ReadString(); break; } - case 26: { + case 24: { + Continuous = input.ReadBool(); + break; + } + case 32: { + Completed = input.ReadBool(); + break; + } + case 42: { Data = input.ReadBytes(); break; } diff --git a/Software/Visual Studio/Tango.PMR/Stubs/Calculate.cs b/Software/Visual Studio/Tango.PMR/Stubs/CalculateRequest.cs index 75aa074e1..36003965f 100644 --- a/Software/Visual Studio/Tango.PMR/Stubs/Calculate.cs +++ b/Software/Visual Studio/Tango.PMR/Stubs/CalculateRequest.cs @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: Calculate.proto +// source: CalculateRequest.proto #pragma warning disable 1591, 0612, 3021 #region Designer generated code @@ -9,28 +9,26 @@ using pbr = global::Google.Protobuf.Reflection; using scg = global::System.Collections.Generic; namespace Tango.PMR.Stubs { - /// <summary>Holder for reflection information generated from Calculate.proto</summary> - public static partial class CalculateReflection { + /// <summary>Holder for reflection information generated from CalculateRequest.proto</summary> + public static partial class CalculateRequestReflection { #region Descriptor - /// <summary>File descriptor for Calculate.proto</summary> + /// <summary>File descriptor for CalculateRequest.proto</summary> public static pbr::FileDescriptor Descriptor { get { return descriptor; } } private static pbr::FileDescriptor descriptor; - static CalculateReflection() { + static CalculateRequestReflection() { byte[] descriptorData = global::System.Convert.FromBase64String( string.Concat( - "Cg9DYWxjdWxhdGUucHJvdG8SD1RhbmdvLlBNUi5TdHVicyIoChBDYWxjdWxh", - "dGVSZXF1ZXN0EgkKAUEYASABKAESCQoBQhgCIAEoASIgChFDYWxjdWxhdGVS", - "ZXNwb25zZRILCgNTdW0YASABKAFCGwoZY29tLnR3aW5lLnRhbmdvLnBtci5z", - "dHVic2IGcHJvdG8z")); + "ChZDYWxjdWxhdGVSZXF1ZXN0LnByb3RvEg9UYW5nby5QTVIuU3R1YnMiKAoQ", + "Q2FsY3VsYXRlUmVxdWVzdBIJCgFBGAEgASgBEgkKAUIYAiABKAFCGwoZY29t", + "LnR3aW5lLnRhbmdvLnBtci5zdHVic2IGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateRequest), global::Tango.PMR.Stubs.CalculateRequest.Parser, new[]{ "A", "B" }, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateResponse), global::Tango.PMR.Stubs.CalculateResponse.Parser, new[]{ "Sum" }, null, null, null) + new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateRequest), global::Tango.PMR.Stubs.CalculateRequest.Parser, new[]{ "A", "B" }, null, null, null) })); } #endregion @@ -44,7 +42,7 @@ namespace Tango.PMR.Stubs { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Tango.PMR.Stubs.CalculateReflection.Descriptor.MessageTypes[0]; } + get { return global::Tango.PMR.Stubs.CalculateRequestReflection.Descriptor.MessageTypes[0]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -182,123 +180,6 @@ namespace Tango.PMR.Stubs { } - public sealed partial class CalculateResponse : pb::IMessage<CalculateResponse> { - private static readonly pb::MessageParser<CalculateResponse> _parser = new pb::MessageParser<CalculateResponse>(() => new CalculateResponse()); - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pb::MessageParser<CalculateResponse> Parser { get { return _parser; } } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pbr::MessageDescriptor Descriptor { - get { return global::Tango.PMR.Stubs.CalculateReflection.Descriptor.MessageTypes[1]; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - pbr::MessageDescriptor pb::IMessage.Descriptor { - get { return Descriptor; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public CalculateResponse() { - OnConstruction(); - } - - partial void OnConstruction(); - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public CalculateResponse(CalculateResponse other) : this() { - sum_ = other.sum_; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public CalculateResponse Clone() { - return new CalculateResponse(this); - } - - /// <summary>Field number for the "Sum" field.</summary> - public const int SumFieldNumber = 1; - private double sum_; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public double Sum { - get { return sum_; } - set { - sum_ = value; - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override bool Equals(object other) { - return Equals(other as CalculateResponse); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public bool Equals(CalculateResponse other) { - if (ReferenceEquals(other, null)) { - return false; - } - if (ReferenceEquals(other, this)) { - return true; - } - if (Sum != other.Sum) return false; - return true; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override int GetHashCode() { - int hash = 1; - if (Sum != 0D) hash ^= Sum.GetHashCode(); - return hash; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override string ToString() { - return pb::JsonFormatter.ToDiagnosticString(this); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void WriteTo(pb::CodedOutputStream output) { - if (Sum != 0D) { - output.WriteRawTag(9); - output.WriteDouble(Sum); - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public int CalculateSize() { - int size = 0; - if (Sum != 0D) { - size += 1 + 8; - } - return size; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(CalculateResponse other) { - if (other == null) { - return; - } - if (other.Sum != 0D) { - Sum = other.Sum; - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(pb::CodedInputStream input) { - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - input.SkipLastField(); - break; - case 9: { - Sum = input.ReadDouble(); - break; - } - } - } - } - - } - #endregion } diff --git a/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs b/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs new file mode 100644 index 000000000..8d8ca7c44 --- /dev/null +++ b/Software/Visual Studio/Tango.PMR/Stubs/CalculateResponse.cs @@ -0,0 +1,159 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: CalculateResponse.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace Tango.PMR.Stubs { + + /// <summary>Holder for reflection information generated from CalculateResponse.proto</summary> + public static partial class CalculateResponseReflection { + + #region Descriptor + /// <summary>File descriptor for CalculateResponse.proto</summary> + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static CalculateResponseReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "ChdDYWxjdWxhdGVSZXNwb25zZS5wcm90bxIPVGFuZ28uUE1SLlN0dWJzIiAK", + "EUNhbGN1bGF0ZVJlc3BvbnNlEgsKA1N1bRgBIAEoAUIbChljb20udHdpbmUu", + "dGFuZ28ucG1yLnN0dWJzYgZwcm90bzM=")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.CalculateResponse), global::Tango.PMR.Stubs.CalculateResponse.Parser, new[]{ "Sum" }, null, null, null) + })); + } + #endregion + + } + #region Messages + public sealed partial class CalculateResponse : pb::IMessage<CalculateResponse> { + private static readonly pb::MessageParser<CalculateResponse> _parser = new pb::MessageParser<CalculateResponse>(() => new CalculateResponse()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser<CalculateResponse> Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::Tango.PMR.Stubs.CalculateResponseReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public CalculateResponse() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public CalculateResponse(CalculateResponse other) : this() { + sum_ = other.sum_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public CalculateResponse Clone() { + return new CalculateResponse(this); + } + + /// <summary>Field number for the "Sum" field.</summary> + public const int SumFieldNumber = 1; + private double sum_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public double Sum { + get { return sum_; } + set { + sum_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as CalculateResponse); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(CalculateResponse other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Sum != other.Sum) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Sum != 0D) hash ^= Sum.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Sum != 0D) { + output.WriteRawTag(9); + output.WriteDouble(Sum); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Sum != 0D) { + size += 1 + 8; + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(CalculateResponse other) { + if (other == null) { + return; + } + if (other.Sum != 0D) { + Sum = other.Sum; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 9: { + Sum = input.ReadDouble(); + break; + } + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs b/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs new file mode 100644 index 000000000..1eda06217 --- /dev/null +++ b/Software/Visual Studio/Tango.PMR/Stubs/ProgressRequest.cs @@ -0,0 +1,131 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: ProgressRequest.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace Tango.PMR.Stubs { + + /// <summary>Holder for reflection information generated from ProgressRequest.proto</summary> + public static partial class ProgressRequestReflection { + + #region Descriptor + /// <summary>File descriptor for ProgressRequest.proto</summary> + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static ProgressRequestReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "ChVQcm9ncmVzc1JlcXVlc3QucHJvdG8SD1RhbmdvLlBNUi5TdHVicyIRCg9Q", + "cm9ncmVzc1JlcXVlc3RCGwoZY29tLnR3aW5lLnRhbmdvLnBtci5zdHVic2IG", + "cHJvdG8z")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressRequest), global::Tango.PMR.Stubs.ProgressRequest.Parser, null, null, null, null) + })); + } + #endregion + + } + #region Messages + public sealed partial class ProgressRequest : pb::IMessage<ProgressRequest> { + private static readonly pb::MessageParser<ProgressRequest> _parser = new pb::MessageParser<ProgressRequest>(() => new ProgressRequest()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser<ProgressRequest> Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::Tango.PMR.Stubs.ProgressRequestReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public ProgressRequest() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public ProgressRequest(ProgressRequest other) : this() { + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public ProgressRequest Clone() { + return new ProgressRequest(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as ProgressRequest); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(ProgressRequest other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(ProgressRequest other) { + if (other == null) { + return; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/Software/Visual Studio/Tango.PMR/Stubs/Progress.cs b/Software/Visual Studio/Tango.PMR/Stubs/ProgressResponse.cs index 2a00e15de..7ffc4a41d 100644 --- a/Software/Visual Studio/Tango.PMR/Stubs/Progress.cs +++ b/Software/Visual Studio/Tango.PMR/Stubs/ProgressResponse.cs @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: Progress.proto +// source: ProgressResponse.proto #pragma warning disable 1591, 0612, 3021 #region Designer generated code @@ -9,26 +9,25 @@ using pbr = global::Google.Protobuf.Reflection; using scg = global::System.Collections.Generic; namespace Tango.PMR.Stubs { - /// <summary>Holder for reflection information generated from Progress.proto</summary> - public static partial class ProgressReflection { + /// <summary>Holder for reflection information generated from ProgressResponse.proto</summary> + public static partial class ProgressResponseReflection { #region Descriptor - /// <summary>File descriptor for Progress.proto</summary> + /// <summary>File descriptor for ProgressResponse.proto</summary> public static pbr::FileDescriptor Descriptor { get { return descriptor; } } private static pbr::FileDescriptor descriptor; - static ProgressReflection() { + static ProgressResponseReflection() { byte[] descriptorData = global::System.Convert.FromBase64String( string.Concat( - "Cg5Qcm9ncmVzcy5wcm90bxIPVGFuZ28uUE1SLlN0dWJzIhEKD1Byb2dyZXNz", - "UmVxdWVzdCIkChBQcm9ncmVzc1Jlc3BvbnNlEhAKCFByb2dyZXNzGAEgASgB", - "QhsKGWNvbS50d2luZS50YW5nby5wbXIuc3R1YnNiBnByb3RvMw==")); + "ChZQcm9ncmVzc1Jlc3BvbnNlLnByb3RvEg9UYW5nby5QTVIuU3R1YnMiJAoQ", + "UHJvZ3Jlc3NSZXNwb25zZRIQCghQcm9ncmVzcxgBIAEoAUIbChljb20udHdp", + "bmUudGFuZ28ucG1yLnN0dWJzYgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressRequest), global::Tango.PMR.Stubs.ProgressRequest.Parser, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Tango.PMR.Stubs.ProgressResponse), global::Tango.PMR.Stubs.ProgressResponse.Parser, new[]{ "Progress" }, null, null, null) })); } @@ -36,95 +35,6 @@ namespace Tango.PMR.Stubs { } #region Messages - public sealed partial class ProgressRequest : pb::IMessage<ProgressRequest> { - private static readonly pb::MessageParser<ProgressRequest> _parser = new pb::MessageParser<ProgressRequest>(() => new ProgressRequest()); - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pb::MessageParser<ProgressRequest> Parser { get { return _parser; } } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pbr::MessageDescriptor Descriptor { - get { return global::Tango.PMR.Stubs.ProgressReflection.Descriptor.MessageTypes[0]; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - pbr::MessageDescriptor pb::IMessage.Descriptor { - get { return Descriptor; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public ProgressRequest() { - OnConstruction(); - } - - partial void OnConstruction(); - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public ProgressRequest(ProgressRequest other) : this() { - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public ProgressRequest Clone() { - return new ProgressRequest(this); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override bool Equals(object other) { - return Equals(other as ProgressRequest); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public bool Equals(ProgressRequest other) { - if (ReferenceEquals(other, null)) { - return false; - } - if (ReferenceEquals(other, this)) { - return true; - } - return true; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override int GetHashCode() { - int hash = 1; - return hash; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public override string ToString() { - return pb::JsonFormatter.ToDiagnosticString(this); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void WriteTo(pb::CodedOutputStream output) { - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public int CalculateSize() { - int size = 0; - return size; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(ProgressRequest other) { - if (other == null) { - return; - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(pb::CodedInputStream input) { - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - input.SkipLastField(); - break; - } - } - } - - } - public sealed partial class ProgressResponse : pb::IMessage<ProgressResponse> { private static readonly pb::MessageParser<ProgressResponse> _parser = new pb::MessageParser<ProgressResponse>(() => new ProgressResponse()); [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -132,7 +42,7 @@ namespace Tango.PMR.Stubs { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Tango.PMR.Stubs.ProgressReflection.Descriptor.MessageTypes[1]; } + get { return global::Tango.PMR.Stubs.ProgressResponseReflection.Descriptor.MessageTypes[0]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] diff --git a/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj b/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj index dd873fe90..10a735256 100644 --- a/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj +++ b/Software/Visual Studio/Tango.PMR/Tango.PMR.csproj @@ -53,8 +53,10 @@ <Compile Include="Jobs\Segment.cs" /> <Compile Include="MessageFactory.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> - <Compile Include="Stubs\Calculate.cs" /> - <Compile Include="Stubs\Progress.cs" /> + <Compile Include="Stubs\CalculateRequest.cs" /> + <Compile Include="Stubs\CalculateResponse.cs" /> + <Compile Include="Stubs\ProgressRequest.cs" /> + <Compile Include="Stubs\ProgressResponse.cs" /> <Compile Include="TangoMessage.cs" /> </ItemGroup> <ItemGroup> diff --git a/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs b/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs index 91cd1ab6a..e378791d2 100644 --- a/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs +++ b/Software/Visual Studio/Tango.Stubs/Stubs/Progress.cs @@ -19,7 +19,7 @@ namespace Tango.Stubs.Stubs protected override Task<string> OnRun(Action<String> multiResponseCallback) { - Transporter.SendMultiRequest<ProgressRequest, ProgressResponse>(MessageFactory.CreateTangoMessage<ProgressRequest>(), (response) => + Transporter.SendContinuousRequest<ProgressRequest, ProgressResponse>(MessageFactory.CreateTangoMessage<ProgressRequest>(), (response) => { multiResponseCallback(response.Progress.ToString()); }); diff --git a/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs index b2fd3fd10..97757deac 100644 --- a/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs +++ b/Software/Visual Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs @@ -75,7 +75,7 @@ namespace Tango.Transport.Adapters /// <returns></returns> public override Task Connect() { - ThrowFailedOrDisposed(); + ThrowIfDisposed(); return Task.Factory.StartNew(() => { @@ -133,7 +133,7 @@ namespace Tango.Transport.Adapters /// <param name="data">The data.</param> public override void Write(byte[] data) { - ThrowFailedOrDisposed(); + ThrowIfDisposed(); try { diff --git a/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs index eb7820697..665f2d779 100644 --- a/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs +++ b/Software/Visual Studio/Tango.Transport/Adapters/UsbTransportAdapter.cs @@ -40,7 +40,7 @@ namespace Tango.Transport.Adapters /// <returns></returns> public override Task Connect() { - ThrowFailedOrDisposed(); + ThrowIfDisposed(); return Task.Factory.StartNew(() => { @@ -79,7 +79,7 @@ namespace Tango.Transport.Adapters /// <returns></returns> public override Task Disconnect() { - ThrowFailedOrDisposed(); + ThrowIfDisposed(); return Task.Factory.StartNew(() => { @@ -114,7 +114,7 @@ namespace Tango.Transport.Adapters /// <param name="data">The data.</param> public override void Write(byte[] data) { - ThrowFailedOrDisposed(); + ThrowIfDisposed(); try { diff --git a/Software/Visual Studio/Tango.Transport/ITransporter.cs b/Software/Visual Studio/Tango.Transport/ITransporter.cs index 834c49818..07b7dd554 100644 --- a/Software/Visual Studio/Tango.Transport/ITransporter.cs +++ b/Software/Visual Studio/Tango.Transport/ITransporter.cs @@ -52,7 +52,7 @@ namespace Tango.Transport /// <param name="adapter">Transport adapter</param> /// <param name="responseCallback">The response callback delegate.</param> /// <returns></returns> - void SendMultiRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>; + void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>; /// <summary> /// Sends a request through the specified adapter which is expected to return multiple response messages. @@ -63,7 +63,7 @@ namespace Tango.Transport /// <param name="adapter">Transport adapter</param> /// <param name="responseCallback">The response callback delegate.</param> /// <returns></returns> - void SendMultiRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>; + void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response>; /// <summary> /// Sends a response. diff --git a/Software/Visual Studio/Tango.Transport/PendingResponse.cs b/Software/Visual Studio/Tango.Transport/PendingResponse.cs new file mode 100644 index 000000000..bb9718b19 --- /dev/null +++ b/Software/Visual Studio/Tango.Transport/PendingResponse.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Transport +{ + /// <summary> + /// Represents a pending response waiting to be returned to the request sender. + /// </summary> + public class PendingResponse + { + /// <summary> + /// Gets or sets the adapter target response adapter. + /// </summary> + public ITransportAdapter Adapter { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether this is a continuous request. + /// </summary> + public bool IsContinuous { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="PendingResponse"/> class. + /// </summary> + /// <param name="adapter">The adapter.</param> + /// <param name="isContinuous">if set to <c>true</c> [is continuous].</param> + public PendingResponse(ITransportAdapter adapter,bool isContinuous) + { + Adapter = adapter; + IsContinuous = isContinuous; + } + } +} diff --git a/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj index 0f383b699..54c7565cf 100644 --- a/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj +++ b/Software/Visual Studio/Tango.Transport/Tango.Transport.csproj @@ -68,6 +68,7 @@ <Compile Include="ITransportComponent.cs" /> <Compile Include="ITransportAdapter.cs" /> <Compile Include="Adapters\TcpTransportAdapter.cs" /> + <Compile Include="PendingResponse.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="ITransporter.cs" /> <Compile Include="Servers\ClientConnectedEventArgs.cs" /> diff --git a/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs index 9f2fb3262..670ef20c3 100644 --- a/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs +++ b/Software/Visual Studio/Tango.Transport/TransportAdapterBase.cs @@ -84,9 +84,9 @@ namespace Tango.Transport /// <summary> /// Throws an exception if adapter is in a failed or disposed state. /// </summary> - protected virtual void ThrowFailedOrDisposed() + protected virtual void ThrowIfDisposed() { - if (State == TransportComponentState.Failed || State == TransportComponentState.Disposed) + if (State == TransportComponentState.Disposed) { throw LogManager.Log(new ObjectDisposedException("The adapter is in a " + State + " state.")); } diff --git a/Software/Visual Studio/Tango.Transport/TransportMessage.cs b/Software/Visual Studio/Tango.Transport/TransportMessage.cs index 0f8414333..90e600a06 100644 --- a/Software/Visual Studio/Tango.Transport/TransportMessage.cs +++ b/Software/Visual Studio/Tango.Transport/TransportMessage.cs @@ -36,7 +36,7 @@ namespace Tango.Transport /// Notifies the message observer of the new result. /// </summary> /// <param name="result">The result.</param> - public override void SetResult(object result) + public override void SetResult(object result, bool completed) { _completionSource.SetResult((T)result); } @@ -54,7 +54,7 @@ namespace Tango.Transport /// Invokes the response callback. /// </summary> /// <param name="response">The response.</param> - public override void InvokeResponseCallback(object response) + public override void InvokeResponseCallback(object response, bool completed) { ResponseCallback((T)response); } diff --git a/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs index d15e17755..c9202efc6 100644 --- a/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs +++ b/Software/Visual Studio/Tango.Transport/TransportMessageBase.cs @@ -19,7 +19,7 @@ namespace Tango.Transport /// <summary> /// Gets or sets a value indicating whether this instance is multi response. /// </summary> - public bool IsMultiResponse { get; set; } + public bool IsContinuous { get; set; } /// <summary> /// Gets or sets the message token. @@ -45,7 +45,7 @@ namespace Tango.Transport /// Notifies the message observer of the new result. /// </summary> /// <param name="result">The result.</param> - public abstract void SetResult(object result); + public abstract void SetResult(object result, bool completed); /// <summary> /// Notifies the message observer of an exception. @@ -57,7 +57,7 @@ namespace Tango.Transport /// Invokes the response callback. /// </summary> /// <param name="response">The response.</param> - public abstract void InvokeResponseCallback(object response); + public abstract void InvokeResponseCallback(object response, bool completed); /// <summary> /// Initializes a new instance of the <see cref="TransportMessageBase"/> class. diff --git a/Software/Visual Studio/Tango.Transport/TransporterBase.cs b/Software/Visual Studio/Tango.Transport/TransporterBase.cs index 5c07ed72a..3bfa019f3 100644 --- a/Software/Visual Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual Studio/Tango.Transport/TransporterBase.cs @@ -25,7 +25,7 @@ namespace Tango.Transport private Thread _pushThread; private Thread _pullThread; private ObservableCollection<ITransportAdapter> _adapters; - private Dictionary<String, ITransportAdapter> _tokenAdapters; + private Dictionary<String, PendingResponse> _pendingResponses; #region Events @@ -120,9 +120,13 @@ namespace Tango.Transport /// <param name="e">The e.</param> protected virtual void OnAdapterStateChanged(object sender, TransportComponentState e) { - if (e == TransportComponentState.Failed) + if (e == TransportComponentState.Disposed) { - Disconnect().Wait(); + Adapters.Remove(sender as ITransportAdapter); + } + else if (e == TransportComponentState.Failed) + { + //TODO: decide what to do in this case.. } } @@ -206,7 +210,7 @@ namespace Tango.Transport public TransporterBase() { Adapters = new ObservableCollection<ITransportAdapter>(); - _tokenAdapters = new Dictionary<string, ITransportAdapter>(); + _pendingResponses = new Dictionary<string, PendingResponse>(); _sendingQueue = new ConcurrentQueue<TransportMessageBase>(); _pendingRequests = new List<TransportMessageBase>(); _arrivedResponses = new ConcurrentQueue<KeyValuePair<ITransportAdapter, byte[]>>(); @@ -271,13 +275,13 @@ namespace Tango.Transport /// <returns></returns> public Task<Response> SendRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter) where Request : IMessage<Request> where Response : IMessage<Response> { - LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token + " on adapter: " + (adapter != null ? adapter.Address : "ALL")); + LogManager.Log("Queuing request message: " + typeof(Request).Name + " Token: " + request.Container.Token + " on adapter: " + (adapter != null ? adapter.Address : "ALL")); LogManager.Log("Expected response: " + typeof(Response).Name); TaskCompletionSource<Response> source = new TaskCompletionSource<Response>(); TransportMessage<Response> message = new TransportMessage<Response>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), source); _sendingQueue.Enqueue(message); - Task.Delay(RequestTimeout).ContinueWith((x) => + Task.Delay(RequestTimeout).ContinueWith((x) => { if (!source.Task.IsCompleted) { @@ -296,9 +300,9 @@ namespace Tango.Transport /// <typeparam name="Response">The type of the response.</typeparam> /// <param name="request">The request.</param> /// <param name="responseCallback">The response callback delegate.</param> - public void SendMultiRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response> + public void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response> { - SendMultiRequest(request, null, responseCallback); + SendContinuousRequest(request, null, responseCallback); } /// <summary> @@ -309,14 +313,17 @@ namespace Tango.Transport /// <param name="request">The request.</param> /// <param name="adapter">Transport adapter</param> /// <param name="responseCallback">The response callback delegate.</param> - public void SendMultiRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response> + public void SendContinuousRequest<Request, Response>(TangoMessage<Request> request, ITransportAdapter adapter, Action<Response> responseCallback) where Request : IMessage<Request> where Response : IMessage<Response> { - LogManager.Log("Queuing multi response request message: " + typeof(Request).Name + " on adapter: " + (adapter != null ? adapter.Address : "ALL")); + LogManager.Log("Queuing continuous request message: " + typeof(Request).Name + " on adapter: " + (adapter != null ? adapter.Address : "ALL")); LogManager.Log("Expected response: " + typeof(Response).Name); + request.Container.Continuous = true; + request.Container.Completed = false; + TransportMessage<Response> message = new TransportMessage<Response>(adapter, request.Container.Token, request, TransportMessageDirection.Request, OnSerializeingMessage(request), null) { - IsMultiResponse = true, + IsContinuous = true, ResponseCallback = responseCallback, }; _sendingQueue.Enqueue(message); @@ -346,19 +353,31 @@ namespace Tango.Transport LogManager.Log("Queuing response message: " + typeof(Response).Name); + PendingResponse pendingResponse = null; ITransportAdapter adapter = null; LogManager.Log("Searching for matching request token: " + token); - if (_tokenAdapters.TryGetValue(token, out adapter)) + if (_pendingResponses.TryGetValue(token, out pendingResponse)) { + adapter = pendingResponse.Adapter; LogManager.Log("Found matching request token: " + token + " on adapter: " + (adapter != null ? adapter.Address : "ALL")); - LogManager.Log("Removing matching request token."); - _tokenAdapters.Remove(token); + + if (!pendingResponse.IsContinuous) + { + LogManager.Log("Removing matching request token."); + _pendingResponses.Remove(token); + } + else if (response.Container.Completed) + { + LogManager.Log("Response completed. Removing matching request token."); + _pendingResponses.Remove(token); + } } else { - LogManager.Log("Matching request token was not found..."); + //This should never happen. + throw LogManager.Log(new InvalidOperationException("Matching request token was not found!")); } TaskCompletionSource<object> source = new TaskCompletionSource<object>(); @@ -371,9 +390,9 @@ namespace Tango.Transport #region Private Methods - /// <summary> - /// Starts the pull and push threads. - /// </summary> + /// <summary> + /// Starts the pull and push threads. + /// </summary> private void StartThreads() { _pullThread = new Thread(PullThreadMethod); @@ -410,11 +429,13 @@ namespace Tango.Transport foreach (var adapter in Adapters.Where(x => x.State == TransportComponentState.Connected)) { adapter.Write(message.Serialize()); + LogManager.Log("Message sent on adapter: " + adapter.Address + "..."); } } else { message.Adapter.Write(message.Serialize()); + LogManager.Log("Message sent on adapter: " + message.Adapter.Address + "..."); } if (message.Direction == TransportMessageDirection.Request) @@ -423,7 +444,7 @@ namespace Tango.Transport } else { - message.SetResult(true); + message.SetResult(true, true); } } catch (Exception ex) @@ -472,7 +493,7 @@ namespace Tango.Transport { LogManager.Log("Found pending request: " + request.Message.GetType().GetGenericArguments()[0].Name); - if (!request.IsMultiResponse) + if (!request.IsContinuous) { LogManager.Log("Pending request was identified as 'single response'. Removing pending request."); @@ -481,21 +502,25 @@ namespace Tango.Transport try { LogManager.Log("Parsing inner response message and setting pending request task result..."); - request.SetResult(OnParseMessage(container)); + request.SetResult(OnParseMessage(container), true); } catch (Exception ex) { - request.SetException(LogManager.Log(ex,"Error parsing inner message.")); + request.SetException(LogManager.Log(ex, "Error parsing inner message.")); } } else { - LogManager.Log("Pending request was identified as 'multi response'. keeping pending request."); + LogManager.Log("Pending request was identified as 'continuous response'. keeping pending request."); try { - LogManager.Log("Parsing inner response message and invoking multi response callback..."); - request.InvokeResponseCallback(OnParseMessage(container)); + LogManager.Log("Parsing inner response message and invoking continuous response callback..."); + if (container.Completed) + { + LogManager.Log("Continuous sequence completed."); + } + request.InvokeResponseCallback(OnParseMessage(container), container.Completed); } catch (Exception ex) { @@ -510,7 +535,7 @@ namespace Tango.Transport try { LogManager.Log("Saving request token and adapter: " + container.Token + ", " + data.Key.Address); - _tokenAdapters.Add(container.Token, data.Key); + _pendingResponses.Add(container.Token, new PendingResponse(data.Key, container.Continuous)); LogManager.Log("Invoking RequestReceived event..."); Task.Factory.StartNew(() => OnRequestReceived(container)); } |
