diff options
| author | Roy Ben-Shabat <Roy@Twine-s.com> | 2017-11-16 13:38:56 +0200 |
|---|---|---|
| committer | Roy Ben-Shabat <Roy@Twine-s.com> | 2017-11-16 13:38:56 +0200 |
| commit | 914f4db513477d9aff726546bac47545195a3e37 (patch) | |
| tree | d2ff190fd84b1dfaa03eec76563c431592ece7ff /Software/Android_Studio/Tango.Integration/src/main/java | |
| parent | 65d01ff549d80fbe13ff5e966df216c9f7c03653 (diff) | |
| download | Tango-914f4db513477d9aff726546bac47545195a3e37.tar.gz Tango-914f4db513477d9aff726546bac47545195a3e37.zip | |
Rename "Visual Studio" to "Visual_Studio"
Rename "External Repositories" to "External_Repositories".
Diffstat (limited to 'Software/Android_Studio/Tango.Integration/src/main/java')
7 files changed, 495 insertions, 0 deletions
diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java new file mode 100644 index 000000000..5224a0729 --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/SerialAdapter.java @@ -0,0 +1,20 @@ +package com.twine.tango.integration.adapters; + +import java.io.IOException; + +import io.reactivex.Observable; + +/** + * Created by Roy on 11/8/2017. + */ + +public interface SerialAdapter { + + void setAddress(String address); + String getAddress(); + void open() throws IOException; + void close() throws IOException; + void write(byte[] data) throws IOException; + Observable<byte[]> registerReceiveListener(); + boolean getIsOpen(); +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java new file mode 100644 index 000000000..3858b2c5f --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/adapters/TcpAdapter.java @@ -0,0 +1,92 @@ +package com.twine.tango.integration.adapters; + +import android.os.SystemClock; +import com.elvishew.xlog.XLog; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +/** + * Created by Roy on 11/8/2017. + */ + +public class TcpAdapter implements SerialAdapter { + + private Socket socket; + private String address; + private boolean isOpen; + private static final int port = 9999; + private Thread receiveThread; + private PublishSubject<byte[]> subject; + + public TcpAdapter(String address) { + this.address = address; + } + + @Override + public void setAddress(String address) { + this.address = address; + } + + @Override + public String getAddress() { + return this.address; + } + + @Override + public void open() throws IOException { + socket = new Socket(address, port); + isOpen = true; + receiveThread = new Thread(this::receiveThreadMethod); + receiveThread.start(); + } + + @Override + public void close() throws IOException { + socket.close(); + isOpen = false; + } + + @Override + public void write(byte[] data) throws IOException { + socket.getOutputStream().write(data); + } + + @Override + public Observable<byte[]> registerReceiveListener() { + subject = PublishSubject.create(); + return subject.observeOn(Schedulers.io()); + } + + @Override + public boolean getIsOpen() { + return isOpen; + } + + private void receiveThreadMethod() { + while (isOpen) { + + try { + + InputStream stream = socket.getInputStream(); + + if (stream.available() > 0) + { + byte[] data = new byte[stream.available()]; + int read = stream.read(data); + if (subject != null && read > 0) subject.onNext(data); + } + + } catch (Exception e) { + isOpen = false; + XLog.e(e); + if (subject != null) subject.onError(e); + } + + SystemClock.sleep(10); + } + } +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java new file mode 100644 index 000000000..33a7d40ff --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperator.java @@ -0,0 +1,244 @@ +package com.twine.tango.integration.machine; + +import android.os.SystemClock; +import com.elvishew.xlog.XLog; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.InvalidProtocolBufferException; +import com.twine.tango.integration.adapters.SerialAdapter; +import com.twine.tango.pmr.MessageFactory; +import com.twine.tango.pmr.TangoMessage; +import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.inject.Inject; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +/** + * Created by Roy on 11/8/2017. + */ + +public class MachineOperator implements MachineOperatorInterface { + + private SerialAdapter channel; + private Calendar calendar; + private DateFormat dateFormatter; + private ConcurrentLinkedQueue<RequestMessage> requestsQueue; + private List<RequestMessage> pendingRequests; + private ConcurrentLinkedQueue<ResponseMessage> responsesQueue; + private PublishSubject<byte[]> receiveDataSubject; + private Thread pushThread; + private Thread pullThread; + private boolean isConnected; + private Scheduler ioScheduler; + private Scheduler uiScheduler; + + @Inject + public MachineOperator(SerialAdapter channel) { + + ioScheduler = Schedulers.io(); + uiScheduler = AndroidSchedulers.mainThread(); + calendar = Calendar.getInstance(); + dateFormatter = new SimpleDateFormat("HH:mm:ss.SSS", Locale.US); + requestsQueue = new ConcurrentLinkedQueue<>(); + pendingRequests = new ArrayList<>(); + responsesQueue = new ConcurrentLinkedQueue<>(); + + setCommunicationChannel(channel); + + connect().subscribe(); + } + + private void setCommunicationChannel(SerialAdapter channel) { + if (channel.getIsOpen()) { + try { + channel.close(); + } catch (IOException e) { + XLog.e(e); + } + } + + boolean reconnect = false; + + if (isConnected) { + reconnect = true; + disconnect(); + } + + this.channel = channel; + this.channel.registerReceiveListener().subscribe((data) -> + { + + try { + MessageContainer container = MessageContainer.parseFrom(data); + + RequestMessage pendingRequest = Observable.fromIterable(pendingRequests).filter(x -> x.getToken().equals(container.getToken())).blockingFirst(null); + + if (pendingRequest != null) { + pendingRequests.remove(pendingRequest); + responsesQueue.add(new ResponseMessage(pendingRequest.getSubject(), pendingRequest.getResponseClass(), data)); + } else { + XLog.w("Could not find matching request for response: %s", container.getType().toString()); + } + } catch (InvalidProtocolBufferException e) { + XLog.e(e); + } + + if (receiveDataSubject != null) + { + receiveDataSubject.onNext(data); + } + + }); + + if (reconnect) { + connect(); + } + } + + public <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> send(TangoMessage<Request> message, Class<Response> className) { + + PublishSubject<Response> subject = PublishSubject.create(); + requestsQueue.add(new RequestMessage<>(subject, message, className, UUID.randomUUID().toString(), calendar.getTime())); + return subject.subscribeOn(ioScheduler).observeOn(uiScheduler); + } + + + @Override + public Observable connect() { + + return Observable.create((x) -> + { + try { + channel.open(); + isConnected = true; + startThreads(); + x.onComplete(); + } catch (Exception e) { + XLog.e("Could not connect to Tango machine.", e); + x.onError(e); + } + + }).subscribeOn(ioScheduler).observeOn(uiScheduler); + + } + + @Override + public Observable disconnect() { + + return Observable.create((x) -> + { + try { + channel.close(); + isConnected = false; + x.onComplete(); + stopThreads(); + } catch (Exception e) { + XLog.e("Could not disconnect from Tango machine.", e); + x.onError(e); + } + + }).subscribeOn(ioScheduler).observeOn(uiScheduler); + + } + + private void startThreads() { + pushThread = new Thread(this::pushThreadMethod, "Push Thread"); + pushThread.start(); + + pullThread = new Thread(this::pullThreadMethod, "Pull Thread"); + pullThread.start(); + } + + private void stopThreads() { + isConnected = false; + pullThread.interrupt(); + pushThread.interrupt(); + } + + public void writeData(byte[] data) + { + try { + channel.write(data); + } catch (IOException e) { + XLog.e(e); + } + } + + @Override + public Observable<byte[]> receiveData() { + receiveDataSubject = PublishSubject.create(); + return receiveDataSubject.observeOn(uiScheduler).subscribeOn(ioScheduler); + } + + private void pushThreadMethod() { + + while (isConnected) { + + if (requestsQueue.size() > 0) { + RequestMessage request = requestsQueue.poll(); + if (request != null) { + pendingRequests.add(request); + try { + channel.write(request.getMessage().toBytes()); + } catch (IOException e) { + isConnected = false; + XLog.e(e); + } + } + } + + SystemClock.sleep(10); + } + } + + private void pullThreadMethod() { + while (isConnected) { + + if (responsesQueue.size() > 0) { + ResponseMessage response = responsesQueue.poll(); + if (response != null) { + + try { + TangoMessage message = MessageFactory.parseTangoMessage(response.getData()); + response.getSubject().onNext(message); + response.getSubject().onComplete(); + } catch (Exception e) { + response.getSubject().onError(e); + XLog.e("Could not parse response.", e); + } + + } + } + + SystemClock.sleep(10); + } + } + + private void postRequest(RequestMessage request) { + +// SystemClock.sleep(5000); +// +// TangoMessage<Job> jobTangoMessage = MessageFactory.createTangoMessage(Job.class); +// +// jobTangoMessage.setMessage(Job.newBuilder().setName("Result Job").build()); +// +// XLog.d("Posting result for request: %s TIME: %s", request.token, dateFormatter.format(request.dateTime)); +// +// request.subject.onNext(jobTangoMessage.getMessage()); +// +// request.subject.onComplete(); + } +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java new file mode 100644 index 000000000..39e7e31d7 --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MachineOperatorInterface.java @@ -0,0 +1,19 @@ +package com.twine.tango.integration.machine; + +import com.google.protobuf.GeneratedMessageV3; +import com.twine.tango.pmr.TangoMessage; + +import io.reactivex.Observable; + +/** + * Created by Roy on 11/8/2017. + */ + +public interface MachineOperatorInterface { + + <Request extends GeneratedMessageV3, Response extends GeneratedMessageV3> Observable<Response> send(TangoMessage<Request> message,Class<Response> className); + Observable connect(); + Observable disconnect(); + void writeData(byte[] data); + Observable<byte[]> receiveData(); +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java new file mode 100644 index 000000000..c34e3653a --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/MessageBase.java @@ -0,0 +1,24 @@ +package com.twine.tango.integration.machine; + +import io.reactivex.subjects.PublishSubject; + +/** + * Created by Roy on 11/8/2017. + */ + +public class MessageBase { + + private PublishSubject subject; + + public PublishSubject getSubject() { + return subject; + } + + public void setSubject(PublishSubject subject) { + this.subject = subject; + } + + public MessageBase(PublishSubject subject) { + this.subject = subject; + } +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java new file mode 100644 index 000000000..dde1ff765 --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/RequestMessage.java @@ -0,0 +1,59 @@ +package com.twine.tango.integration.machine; + +import com.google.protobuf.GeneratedMessageV3; +import com.twine.tango.pmr.TangoMessage; + +import java.util.Date; + +import io.reactivex.subjects.PublishSubject; + +/** + * Created by Roy on 11/8/2017. + */ +public class RequestMessage<T extends GeneratedMessageV3,Response extends GeneratedMessageV3> extends MessageBase { + + private String token; + private Date dateTime; + private TangoMessage<T> message; + private Class<Response> responseClass; + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public Date getDateTime() { + return dateTime; + } + + public void setDateTime(Date dateTime) { + this.dateTime = dateTime; + } + + public TangoMessage<T> getMessage() { + return message; + } + + public void setMessage(TangoMessage<T> message) { + this.message = message; + } + + public Class<Response> getResponseClass() { + return responseClass; + } + + public void setResponseClass(Class<Response> responseClass) { + this.responseClass = responseClass; + } + + public RequestMessage(PublishSubject subject, TangoMessage<T> message, Class<Response> responseClass, String token, Date time) { + super(subject); + this.message = message; + this.token = token; + this.dateTime = time; + this.responseClass = responseClass; + } +} diff --git a/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java new file mode 100644 index 000000000..13de24286 --- /dev/null +++ b/Software/Android_Studio/Tango.Integration/src/main/java/com/twine/tango/integration/machine/ResponseMessage.java @@ -0,0 +1,37 @@ +package com.twine.tango.integration.machine; + +import com.twine.tango.pmr.TangoMessage; + +import io.reactivex.subjects.PublishSubject; + +/** + * Created by Roy on 11/8/2017. + */ + +public class ResponseMessage extends MessageBase { + + private byte[] data; + private Class<?> responseClass; + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public Class<?> getResponseClass() { + return responseClass; + } + + public void setResponseClass(Class<?> responseClass) { + this.responseClass = responseClass; + } + + public ResponseMessage(PublishSubject subject, Class<?> responseClass, byte[] data) { + super(subject); + this.data = data; + this.responseClass = responseClass; + } +} |
