From 2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc Mon Sep 17 00:00:00 2001 From: Roy Date: Tue, 26 Dec 2017 21:16:15 +0200 Subject: MERGE --- .../twine/tango/transport/ITransportAdapter.java | 14 ++++ .../com/twine/tango/transport/ITransporter.java | 5 +- .../tango/transport/ResponseErrorException.java | 57 +++++++++++++ .../tango/transport/TransportAdapterBase.java | 51 +++++++++++- .../tango/transport/TransportAdapterMode.java | 17 ++++ .../com/twine/tango/transport/TransporterBase.java | 14 ++-- .../transport/adapters/TcpTransportAdapter.java | 96 +++++++++++++++------- .../transport/adapters/UsbTransportAdapter.java | 57 ++++++++++++- .../transporters/ResponseErrorException.java | 43 ---------- 9 files changed, 270 insertions(+), 84 deletions(-) create mode 100644 Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java create mode 100644 Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java delete mode 100644 Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java (limited to 'Software/Android_Studio/Tango.Transport') diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java index abd12e0f0..2e076b1c8 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java @@ -40,6 +40,20 @@ public interface ITransportAdapter extends ITransportComponent { */ void setAddress(String address); + /** + * Sets the adapter mode. + * + * @param mode the mode + */ + void setAdapterMode(TransportAdapterMode mode); + + /** + * Gets the adapter mode. + * + * @return the adapter mode + */ + TransportAdapterMode getAdapterMode(); + /** * Gets whether this adapter is authenticated. diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java index 9d2bce329..1ad704829 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java @@ -8,6 +8,7 @@ import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; import org.joda.time.Period; +import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.Single; @@ -73,7 +74,7 @@ public interface ITransporter extends ITransportComponent { * @param response the response * @return the single */ - Single sendResponse(TangoMessage response); + Completable sendResponse(TangoMessage response); /** * Sends a response with the specified request token. @@ -83,7 +84,7 @@ public interface ITransporter extends ITransportComponent { * @param token the token * @return the single */ - Single sendResponse(TangoMessage response,String token); + Completable sendResponse(TangoMessage response,String token); /** * Sets request received listener. diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java new file mode 100644 index 000000000..896e92b38 --- /dev/null +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java @@ -0,0 +1,57 @@ +package com.twine.tango.transport; + +import com.twine.tango.pmr.TangoMessage; +import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode; + + +/** + * Represents a tango message response exception which indicates a response container with {@link ErrorCode} different from None. + */ +public class ResponseErrorException extends Exception +{ + private ErrorCode error; + + private TangoMessage response; + + public TangoMessage getResponse() + { + return response; + } + + public void setResponse(TangoMessage response) + { + this.response = response; + } + + /** + * Gets the error. + * + * @return the error + */ + public ErrorCode getError() + { + return error; + } + + /** + * Sets the error. + * + * @param error the error + */ + public void setError(ErrorCode error) + { + this.error = error; + } + + /** + * Instantiates a new Response error exception. + * + * @param error the error + */ + public ResponseErrorException(ErrorCode error,TangoMessage response) + { + super("Response received with error " + error.toString()); + setError(error); + setResponse(response); + } +} diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java index b9221ea27..6d5667ada 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java @@ -6,6 +6,9 @@ import com.twine.tango.core.IEventHandler; import com.twine.tango.core.ObjectDisposedException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; import io.reactivex.Completable; @@ -15,18 +18,20 @@ import io.reactivex.Completable; */ public abstract class TransportAdapterBase implements ITransportAdapter { - + protected static final int MAX_BUFFER_SIZE = 1024 * 1024 * 10; //10 MB. private boolean isAuthenticated; private String address; private Event stateChangedEvent; private IEventHandler dataAvailableListener; private TransportComponentState state; + private TransportAdapterMode adapterMode; /** * Instantiates a new Transport adapter base. */ public TransportAdapterBase() { + setAdapterMode(TransportAdapterMode.NO_HEADER); stateChangedEvent = new Event<>(); setAuthenticated(true); } @@ -55,6 +60,18 @@ public abstract class TransportAdapterBase implements ITransportAdapter this.address = address; } + @Override + public void setAdapterMode(TransportAdapterMode mode) + { + adapterMode = mode; + } + + @Override + public TransportAdapterMode getAdapterMode() + { + return adapterMode; + } + @Override public boolean isAuthenticated() { @@ -155,4 +172,36 @@ public abstract class TransportAdapterBase implements ITransportAdapter setState(TransportComponentState.Failed); XLog.e(ex); } + + /** + * Applies any additional headers if required. + * + * @param data the data + * @return the byte [ ] + */ + protected byte[] postProcessBuffer(byte[] data) + { + if (adapterMode == TransportAdapterMode.NO_HEADER) + { + return data; + } else + { + byte[] size = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(data.length).array(); + return concatByteArrays(size, data); + } + } + + /** + * Concat byte arrays byte [ ]. + * + * @param first the first + * @param second the second + * @return the byte [ ] + */ + protected byte[] concatByteArrays(byte[] first, byte[] second) + { + byte[] result = Arrays.copyOf(first, first.length + second.length); + System.arraycopy(second, 0, result, first.length, second.length); + return result; + } } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java new file mode 100644 index 000000000..699be758d --- /dev/null +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java @@ -0,0 +1,17 @@ +package com.twine.tango.transport; + + +/** + * Represents an {@link com.twine.tango.transport.ITransportAdapter} read/write mode. + */ +public enum TransportAdapterMode +{ + /** + * Pure protobuf message with no message size. + */ + NO_HEADER, + /** + * Prepends and truncates the message size before writing and reading. + */ + PREPEND_HEADER +} diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java index 3c09d547f..ff192bc3b 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java @@ -16,7 +16,6 @@ import com.twine.tango.pmr.MessageFactory; import com.twine.tango.pmr.TangoMessage; import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode; import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; -import com.twine.tango.transport.transporters.ResponseErrorException; import org.joda.time.Period; @@ -414,13 +413,13 @@ public abstract class TransporterBase implements ITransporter } @Override - public Single sendResponse(TangoMessage response) + public Completable sendResponse(TangoMessage response) { return sendResponse(response, response.getContainer().getToken()); } @Override - public Single sendResponse(TangoMessage response, String token) + public Completable sendResponse(TangoMessage response, String token) { response.getContainer().setToken(token); @@ -459,7 +458,7 @@ public abstract class TransporterBase implements ITransporter TransportMessage message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject); sendingQueue.add(message); - return subject.singleOrError(); + return subject.singleOrError().toCompletable(); } //endregion @@ -537,7 +536,7 @@ public abstract class TransporterBase implements ITransporter pendingRequests.add(message); } else { - message.setResult(true, true); + message.setResult(new Object(), true); } } catch (Exception ex) { @@ -613,8 +612,7 @@ public abstract class TransporterBase implements ITransporter } else { XLog.i("Parsing inner response message and setting pending request task result..."); - request.setException(new ResponseErrorException(container.getError())); - + request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second))); } } catch (Exception ex) { @@ -638,7 +636,7 @@ public abstract class TransporterBase implements ITransporter } else { XLog.i("Response has returned with error: " + container.getError().toString()); - request.setException(new ResponseErrorException(container.getError())); + request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second))); } } catch (Exception ex) { diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java index b72d1669b..f2e7a83a8 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java @@ -5,6 +5,7 @@ import android.os.SystemClock; import com.elvishew.xlog.XLog; import com.twine.tango.core.ObjectDisposedException; import com.twine.tango.transport.TransportAdapterBase; +import com.twine.tango.transport.TransportAdapterMode; import com.twine.tango.transport.TransportComponentState; import java.io.IOException; @@ -12,6 +13,8 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Selector; import io.reactivex.Completable; @@ -36,6 +39,7 @@ public class TcpTransportAdapter extends TransportAdapterBase super(); setAddress("127.0.0.1"); setPort(9999); + setAdapterMode(TransportAdapterMode.PREPEND_HEADER); } public TcpTransportAdapter(String address, int port) @@ -144,7 +148,7 @@ public class TcpTransportAdapter extends TransportAdapterBase try { - socket.getOutputStream().write(data); + socket.getOutputStream().write(postProcessBuffer(data)); } catch (IOException e) { onFailed(e); @@ -167,39 +171,59 @@ public class TcpTransportAdapter extends TransportAdapterBase { InputStream stream = socket.getInputStream(); - if (stream.available() > 0) + if (getAdapterMode() == TransportAdapterMode.NO_HEADER) { - byte[] data = new byte[stream.available()]; - int read = stream.read(data); - - if (read == -1) + if (stream.available() > 0) { - throw new SocketException("Error reading from TCP adapter."); + byte[] data = new byte[stream.available()]; + int read = stream.read(data); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + onDataAvailable(data); + } + } + else + { + if (stream.available() > 4) + { + byte[] size = new byte[4]; + + int read = stream.read(size, 0, size.length); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt(); + + while (stream.available() < expectedSize) + { + SystemClock.sleep(10); + + if (getState() != TransportComponentState.Connected) + { + break; + } + } + + byte[] data = new byte[expectedSize]; + read = stream.read(data); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + onDataAvailable(data); } - - onDataAvailable(data); } SystemClock.sleep(10); -// counter++; - -// if (counter > 200) -// { -// try -// { -// if (socket.getChannel().keyFor(selector).isWritable() && socket.getChannel().keyFor(selector).isReadable()) -// { -// onFailed(new SocketTimeoutException("Client disconnected.")); -// return; -// } -// } catch (Exception e) -// { -// onFailed(e); -// return; -// } -// -// counter = 0; -// } } } catch (Exception e) @@ -210,4 +234,20 @@ public class TcpTransportAdapter extends TransportAdapterBase } //endregion + + //region Private Methods + + private void setSocketProperties() + { + try + { + socket.setReceiveBufferSize(MAX_BUFFER_SIZE); + socket.setSendBufferSize(MAX_BUFFER_SIZE); + } catch (SocketException e) + { + e.printStackTrace(); + } + } + + //endregion } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java index 8ad55708c..60efe976a 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java @@ -15,9 +15,13 @@ import com.felhr.usbserial.UsbSerialInterface; import com.twine.tango.core.ContextHelper; import com.twine.tango.core.ObjectDisposedException; import com.twine.tango.transport.TransportAdapterBase; +import com.twine.tango.transport.TransportAdapterMode; import com.twine.tango.transport.TransportComponentState; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; import java.util.Map; import io.reactivex.Completable; @@ -37,6 +41,8 @@ public class UsbTransportAdapter extends TransportAdapterBase private UsbManager usbManager; private boolean hasPermission; private PublishSubject connectionSubject; + private byte[] current_buffer; + private int current_buffer_bar; public UsbTransportAdapter() { @@ -51,7 +57,7 @@ public class UsbTransportAdapter extends TransportAdapterBase @Override public void write(byte[] data) throws ObjectDisposedException, IOException { - serial.write(data); + serial.write(postProcessBuffer(data)); } @Override @@ -126,7 +132,54 @@ public class UsbTransportAdapter extends TransportAdapterBase private void onUsbReceiveData(byte[] data) { - onDataAvailable(data); + if (getAdapterMode() == TransportAdapterMode.NO_HEADER) + { + onDataAvailable(data); + } else + { + if (current_buffer == null) + { + byte[] size = Arrays.copyOfRange(data, 0, 4); + int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt(); + current_buffer = new byte[expectedSize]; + byte[] rest = Arrays.copyOfRange(data, 4, data.length); + if (rest.length == expectedSize) + { + current_buffer = null; + onDataAvailable(rest); + } else + { + current_buffer_bar = rest.length; + ByteBuffer.wrap(current_buffer).put(rest); + } + } else + { + int remaining = current_buffer.length - current_buffer_bar; + + if (data.length < remaining) + { + current_buffer_bar += data.length; + ByteBuffer.wrap(current_buffer).put(data); + } else if (data.length > remaining) + { + byte[] end = Arrays.copyOfRange(data, 0, remaining); + ByteBuffer.wrap(current_buffer).put(end); + onDataAvailable(current_buffer); + current_buffer = null; + current_buffer_bar = 0; + byte[] rest = Arrays.copyOfRange(data, remaining, data.length); + onUsbReceiveData(rest); + } + else //Equals + { + byte[] end = Arrays.copyOfRange(data, 0, remaining); + ByteBuffer.wrap(current_buffer).put(end); + onDataAvailable(current_buffer); + current_buffer = null; + current_buffer_bar = 0; + } + } + } } @Override diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java deleted file mode 100644 index 465412f68..000000000 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.twine.tango.transport.transporters; - -import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode; - - -/** - * Represents a tango message response exception which indicates a response container with {@link ErrorCode} different from None. - */ -public class ResponseErrorException extends Exception -{ - private ErrorCode error; - - /** - * Gets the error. - * - * @return the error - */ - public ErrorCode getError() - { - return error; - } - - /** - * Sets the error. - * - * @param error the error - */ - public void setError(ErrorCode error) - { - this.error = error; - } - - /** - * Instantiates a new Response error exception. - * - * @param error the error - */ - public ResponseErrorException(ErrorCode error) - { - super("Response received with error " + error.toString()); - setError(error); - } -} -- cgit v1.3.1