aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Android_Studio/Tango.Integration/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Android_Studio/Tango.Integration/src/main/java')
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java20
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java92
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java244
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java19
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java24
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java59
-rw-r--r--Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java37
7 files changed, 495 insertions, 0 deletions
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java
new file mode 100644
index 000000000..5224a0729
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java
@@ -0,0 +1,20 @@
+package com.twine.tango.integration.adapters;
+
+import java.io.IOException;
+
+import io.reactivex.Observable;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public interface SerialAdapter {
+
+ void setAddress(String address);
+ String getAddress();
+ void open() throws IOException;
+ void close() throws IOException;
+ void write(byte[] data) throws IOException;
+ Observable<byte[]> registerReceiveListener();
+ boolean getIsOpen();
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java
new file mode 100644
index 000000000..3858b2c5f
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java
@@ -0,0 +1,92 @@
+package com.twine.tango.integration.adapters;
+
+import android.os.SystemClock;
+import com.elvishew.xlog.XLog;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public class TcpAdapter implements SerialAdapter {
+
+ private Socket socket;
+ private String address;
+ private boolean isOpen;
+ private static final int port = 9999;
+ private Thread receiveThread;
+ private PublishSubject<byte[]> subject;
+
+ public TcpAdapter(String address) {
+ this.address = address;
+ }
+
+ @Override
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ @Override
+ public String getAddress() {
+ return this.address;
+ }
+
+ @Override
+ public void open() throws IOException {
+ socket = new Socket(address, port);
+ isOpen = true;
+ receiveThread = new Thread(this::receiveThreadMethod);
+ receiveThread.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ socket.close();
+ isOpen = false;
+ }
+
+ @Override
+ public void write(byte[] data) throws IOException {
+ socket.getOutputStream().write(data);
+ }
+
+ @Override
+ public Observable<byte[]> registerReceiveListener() {
+ subject = PublishSubject.create();
+ return subject.observeOn(Schedulers.io());
+ }
+
+ @Override
+ public boolean getIsOpen() {
+ return isOpen;
+ }
+
+ private void receiveThreadMethod() {
+ while (isOpen) {
+
+ try {
+
+ InputStream stream = socket.getInputStream();
+
+ if (stream.available() > 0)
+ {
+ byte[] data = new byte[stream.available()];
+ int read = stream.read(data);
+ if (subject != null && read > 0) subject.onNext(data);
+ }
+
+ } catch (Exception e) {
+ isOpen = false;
+ XLog.e(e);
+ if (subject != null) subject.onError(e);
+ }
+
+ SystemClock.sleep(10);
+ }
+ }
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java
new file mode 100644
index 000000000..33a7d40ff
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java
@@ -0,0 +1,244 @@
+package com.twine.tango.integration.machine;
+
+import android.os.SystemClock;
+import com.elvishew.xlog.XLog;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.twine.tango.integration.adapters.SerialAdapter;
+import com.twine.tango.pmr.MessageFactory;
+import com.twine.tango.pmr.TangoMessage;
+import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.inject.Inject;
+
+import io.reactivex.Observable;
+import io.reactivex.Scheduler;
+import io.reactivex.android.schedulers.AndroidSchedulers;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public class MachineOperator implements MachineOperatorInterface {
+
+ private SerialAdapter channel;
+ private Calendar calendar;
+ private DateFormat dateFormatter;
+ private ConcurrentLinkedQueue<RequestMessage> requestsQueue;
+ private List<RequestMessage> pendingRequests;
+ private ConcurrentLinkedQueue<ResponseMessage> responsesQueue;
+ private PublishSubject<byte[]> receiveDataSubject;
+ private Thread pushThread;
+ private Thread pullThread;
+ private boolean isConnected;
+ private Scheduler ioScheduler;
+ private Scheduler uiScheduler;
+
+ @Inject
+ public MachineOperator(SerialAdapter channel) {
+
+ ioScheduler = Schedulers.io();
+ uiScheduler = AndroidSchedulers.mainThread();
+ calendar = Calendar.getInstance();
+ dateFormatter = new SimpleDateFormat("HH:mm:ss.SSS", Locale.US);
+ requestsQueue = new ConcurrentLinkedQueue<>();
+ pendingRequests = new ArrayList<>();
+ responsesQueue = new ConcurrentLinkedQueue<>();
+
+ setCommunicationChannel(channel);
+
+ connect().subscribe();
+ }
+
+ private void setCommunicationChannel(SerialAdapter channel) {
+ if (channel.getIsOpen()) {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ XLog.e(e);
+ }
+ }
+
+ boolean reconnect = false;
+
+ if (isConnected) {
+ reconnect = true;
+ disconnect();
+ }
+
+ this.channel = channel;
+ this.channel.registerReceiveListener().subscribe((data) ->
+ {
+
+ try {
+ MessageContainer container = MessageContainer.parseFrom(data);
+
+ RequestMessage pendingRequest = Observable.fromIterable(pendingRequests).filter(x -> x.getToken().equals(container.getToken())).blockingFirst(null);
+
+ if (pendingRequest != null) {
+ pendingRequests.remove(pendingRequest);
+ responsesQueue.add(new ResponseMessage(pendingRequest.getSubject(), pendingRequest.getResponseClass(), data));
+ } else {
+ XLog.w("Could not find matching request for response: %s", container.getType().toString());
+ }
+ } catch (InvalidProtocolBufferException e) {
+ XLog.e(e);
+ }
+
+ if (receiveDataSubject != null)
+ {
+ receiveDataSubject.onNext(data);
+ }
+
+ });
+
+ if (reconnect) {
+ connect();
+ }
+ }
+
+ public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> send(TangoMessage<Request> message, Class<Response> className) {
+
+ PublishSubject<Response> subject = PublishSubject.create();
+ requestsQueue.add(new RequestMessage<>(subject, message, className, UUID.randomUUID().toString(), calendar.getTime()));
+ return subject.subscribeOn(ioScheduler).observeOn(uiScheduler);
+ }
+
+
+ @Override
+ public Observable connect() {
+
+ return Observable.create((x) ->
+ {
+ try {
+ channel.open();
+ isConnected = true;
+ startThreads();
+ x.onComplete();
+ } catch (Exception e) {
+ XLog.e("Could not connect to Tango machine.", e);
+ x.onError(e);
+ }
+
+ }).subscribeOn(ioScheduler).observeOn(uiScheduler);
+
+ }
+
+ @Override
+ public Observable disconnect() {
+
+ return Observable.create((x) ->
+ {
+ try {
+ channel.close();
+ isConnected = false;
+ x.onComplete();
+ stopThreads();
+ } catch (Exception e) {
+ XLog.e("Could not disconnect from Tango machine.", e);
+ x.onError(e);
+ }
+
+ }).subscribeOn(ioScheduler).observeOn(uiScheduler);
+
+ }
+
+ private void startThreads() {
+ pushThread = new Thread(this::pushThreadMethod, "Push Thread");
+ pushThread.start();
+
+ pullThread = new Thread(this::pullThreadMethod, "Pull Thread");
+ pullThread.start();
+ }
+
+ private void stopThreads() {
+ isConnected = false;
+ pullThread.interrupt();
+ pushThread.interrupt();
+ }
+
+ public void writeData(byte[] data)
+ {
+ try {
+ channel.write(data);
+ } catch (IOException e) {
+ XLog.e(e);
+ }
+ }
+
+ @Override
+ public Observable<byte[]> receiveData() {
+ receiveDataSubject = PublishSubject.create();
+ return receiveDataSubject.observeOn(uiScheduler).subscribeOn(ioScheduler);
+ }
+
+ private void pushThreadMethod() {
+
+ while (isConnected) {
+
+ if (requestsQueue.size() > 0) {
+ RequestMessage request = requestsQueue.poll();
+ if (request != null) {
+ pendingRequests.add(request);
+ try {
+ channel.write(request.getMessage().toBytes());
+ } catch (IOException e) {
+ isConnected = false;
+ XLog.e(e);
+ }
+ }
+ }
+
+ SystemClock.sleep(10);
+ }
+ }
+
+ private void pullThreadMethod() {
+ while (isConnected) {
+
+ if (responsesQueue.size() > 0) {
+ ResponseMessage response = responsesQueue.poll();
+ if (response != null) {
+
+ try {
+ TangoMessage message = MessageFactory.parseTangoMessage(response.getData());
+ response.getSubject().onNext(message);
+ response.getSubject().onComplete();
+ } catch (Exception e) {
+ response.getSubject().onError(e);
+ XLog.e("Could not parse response.", e);
+ }
+
+ }
+ }
+
+ SystemClock.sleep(10);
+ }
+ }
+
+ private void postRequest(RequestMessage request) {
+
+// SystemClock.sleep(5000);
+//
+// TangoMessage<Job> jobTangoMessage = MessageFactory.createTangoMessage(Job.class);
+//
+// jobTangoMessage.setMessage(Job.newBuilder().setName("Result Job").build());
+//
+// XLog.d("Posting result for request: %s TIME: %s", request.token, dateFormatter.format(request.dateTime));
+//
+// request.subject.onNext(jobTangoMessage.getMessage());
+//
+// request.subject.onComplete();
+ }
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java
new file mode 100644
index 000000000..39e7e31d7
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java
@@ -0,0 +1,19 @@
+package com.twine.tango.integration.machine;
+
+import com.google.protobuf.GeneratedMessageV3;
+import com.twine.tango.pmr.TangoMessage;
+
+import io.reactivex.Observable;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public interface MachineOperatorInterface {
+
+ <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> send(TangoMessage<Request> message,Class<Response> className);
+ Observable connect();
+ Observable disconnect();
+ void writeData(byte[] data);
+ Observable<byte[]> receiveData();
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java
new file mode 100644
index 000000000..c34e3653a
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java
@@ -0,0 +1,24 @@
+package com.twine.tango.integration.machine;
+
+import io.reactivex.subjects.PublishSubject;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public class MessageBase {
+
+ private PublishSubject subject;
+
+ public PublishSubject getSubject() {
+ return subject;
+ }
+
+ public void setSubject(PublishSubject subject) {
+ this.subject = subject;
+ }
+
+ public MessageBase(PublishSubject subject) {
+ this.subject = subject;
+ }
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java
new file mode 100644
index 000000000..dde1ff765
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java
@@ -0,0 +1,59 @@
+package com.twine.tango.integration.machine;
+
+import com.google.protobuf.GeneratedMessageV3;
+import com.twine.tango.pmr.TangoMessage;
+
+import java.util.Date;
+
+import io.reactivex.subjects.PublishSubject;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+public class RequestMessage<T extends GeneratedMessageV3,Response extends GeneratedMessageV3> extends MessageBase {
+
+ private String token;
+ private Date dateTime;
+ private TangoMessage<T> message;
+ private Class<Response> responseClass;
+
+ public String getToken() {
+ return token;
+ }
+
+ public void setToken(String token) {
+ this.token = token;
+ }
+
+ public Date getDateTime() {
+ return dateTime;
+ }
+
+ public void setDateTime(Date dateTime) {
+ this.dateTime = dateTime;
+ }
+
+ public TangoMessage<T> getMessage() {
+ return message;
+ }
+
+ public void setMessage(TangoMessage<T> message) {
+ this.message = message;
+ }
+
+ public Class<Response> getResponseClass() {
+ return responseClass;
+ }
+
+ public void setResponseClass(Class<Response> responseClass) {
+ this.responseClass = responseClass;
+ }
+
+ public RequestMessage(PublishSubject subject, TangoMessage<T> message, Class<Response> responseClass, String token, Date time) {
+ super(subject);
+ this.message = message;
+ this.token = token;
+ this.dateTime = time;
+ this.responseClass = responseClass;
+ }
+}
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java
new file mode 100644
index 000000000..13de24286
--- /dev/null
+++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java
@@ -0,0 +1,37 @@
+package com.twine.tango.integration.machine;
+
+import com.twine.tango.pmr.TangoMessage;
+
+import io.reactivex.subjects.PublishSubject;
+
+/**
+ * Created by Roy on 11/8/2017.
+ */
+
+public class ResponseMessage extends MessageBase {
+
+ private byte[] data;
+ private Class<?> responseClass;
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public Class<?> getResponseClass() {
+ return responseClass;
+ }
+
+ public void setResponseClass(Class<?> responseClass) {
+ this.responseClass = responseClass;
+ }
+
+ public ResponseMessage(PublishSubject subject, Class<?> responseClass, byte[] data) {
+ super(subject);
+ this.data = data;
+ this.responseClass = responseClass;
+ }
+}