diff options
| author | Roy <roy.mail.net@gmail.com> | 2017-12-26 21:16:15 +0200 |
|---|---|---|
| committer | Roy <roy.mail.net@gmail.com> | 2017-12-26 21:16:15 +0200 |
| commit | 2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc (patch) | |
| tree | a21ff27fff08876e835df82c5242def1f0d09c17 /Software/Android_Studio/Tango.Transport | |
| parent | 6450fc175114a6f8d0b75cb21386d1bb0c902711 (diff) | |
| download | Tango-2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc.tar.gz Tango-2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc.zip | |
MERGE
Diffstat (limited to 'Software/Android_Studio/Tango.Transport')
8 files changed, 229 insertions, 43 deletions
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java index abd12e0f0..2e076b1c8 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java @@ -40,6 +40,20 @@ public interface ITransportAdapter extends ITransportComponent { */ void setAddress(String address); + /** + * Sets the adapter mode. + * + * @param mode the mode + */ + void setAdapterMode(TransportAdapterMode mode); + + /** + * Gets the adapter mode. + * + * @return the adapter mode + */ + TransportAdapterMode getAdapterMode(); + /** * Gets whether this adapter is authenticated. diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java index 9d2bce329..1ad704829 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java @@ -8,6 +8,7 @@ import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; import org.joda.time.Period; +import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.Single; @@ -73,7 +74,7 @@ public interface ITransporter extends ITransportComponent { * @param response the response * @return the single */ - <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response); + <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response); /** * Sends a response with the specified request token. @@ -83,7 +84,7 @@ public interface ITransporter extends ITransportComponent { * @param token the token * @return the single */ - <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response,String token); + <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response,String token); /** * Sets request received listener. diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java index 465412f68..896e92b38 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java @@ -1,5 +1,6 @@ -package com.twine.tango.transport.transporters; +package com.twine.tango.transport; +import com.twine.tango.pmr.TangoMessage; import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode; @@ -10,6 +11,18 @@ public class ResponseErrorException extends Exception { private ErrorCode error; + private TangoMessage response; + + public TangoMessage getResponse() + { + return response; + } + + public void setResponse(TangoMessage response) + { + this.response = response; + } + /** * Gets the error. * @@ -35,9 +48,10 @@ public class ResponseErrorException extends Exception * * @param error the error */ - public ResponseErrorException(ErrorCode error) + public ResponseErrorException(ErrorCode error,TangoMessage response) { super("Response received with error " + error.toString()); setError(error); + setResponse(response); } } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java index b9221ea27..6d5667ada 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java @@ -6,6 +6,9 @@ import com.twine.tango.core.IEventHandler; import com.twine.tango.core.ObjectDisposedException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; import io.reactivex.Completable; @@ -15,18 +18,20 @@ import io.reactivex.Completable; */ public abstract class TransportAdapterBase implements ITransportAdapter { - + protected static final int MAX_BUFFER_SIZE = 1024 * 1024 * 10; //10 MB. private boolean isAuthenticated; private String address; private Event<TransportComponentState> stateChangedEvent; private IEventHandler<byte[]> dataAvailableListener; private TransportComponentState state; + private TransportAdapterMode adapterMode; /** * Instantiates a new Transport adapter base. */ public TransportAdapterBase() { + setAdapterMode(TransportAdapterMode.NO_HEADER); stateChangedEvent = new Event<>(); setAuthenticated(true); } @@ -56,6 +61,18 @@ public abstract class TransportAdapterBase implements ITransportAdapter } @Override + public void setAdapterMode(TransportAdapterMode mode) + { + adapterMode = mode; + } + + @Override + public TransportAdapterMode getAdapterMode() + { + return adapterMode; + } + + @Override public boolean isAuthenticated() { return isAuthenticated; @@ -155,4 +172,36 @@ public abstract class TransportAdapterBase implements ITransportAdapter setState(TransportComponentState.Failed); XLog.e(ex); } + + /** + * Applies any additional headers if required. + * + * @param data the data + * @return the byte [ ] + */ + protected byte[] postProcessBuffer(byte[] data) + { + if (adapterMode == TransportAdapterMode.NO_HEADER) + { + return data; + } else + { + byte[] size = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(data.length).array(); + return concatByteArrays(size, data); + } + } + + /** + * Concat byte arrays byte [ ]. + * + * @param first the first + * @param second the second + * @return the byte [ ] + */ + protected byte[] concatByteArrays(byte[] first, byte[] second) + { + byte[] result = Arrays.copyOf(first, first.length + second.length); + System.arraycopy(second, 0, result, first.length, second.length); + return result; + } } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java new file mode 100644 index 000000000..699be758d --- /dev/null +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java @@ -0,0 +1,17 @@ +package com.twine.tango.transport; + + +/** + * Represents an {@link com.twine.tango.transport.ITransportAdapter} read/write mode. + */ +public enum TransportAdapterMode +{ + /** + * Pure protobuf message with no message size. + */ + NO_HEADER, + /** + * Prepends and truncates the message size before writing and reading. + */ + PREPEND_HEADER +} diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java index 3c09d547f..ff192bc3b 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java @@ -16,7 +16,6 @@ import com.twine.tango.pmr.MessageFactory; import com.twine.tango.pmr.TangoMessage; import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode; import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer; -import com.twine.tango.transport.transporters.ResponseErrorException; import org.joda.time.Period; @@ -414,13 +413,13 @@ public abstract class TransporterBase implements ITransporter } @Override - public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response) + public <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response) { return sendResponse(response, response.getContainer().getToken()); } @Override - public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token) + public <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response, String token) { response.getContainer().setToken(token); @@ -459,7 +458,7 @@ public abstract class TransporterBase implements ITransporter TransportMessage<Response> message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject); sendingQueue.add(message); - return subject.singleOrError(); + return subject.singleOrError().toCompletable(); } //endregion @@ -537,7 +536,7 @@ public abstract class TransporterBase implements ITransporter pendingRequests.add(message); } else { - message.setResult(true, true); + message.setResult(new Object(), true); } } catch (Exception ex) { @@ -613,8 +612,7 @@ public abstract class TransporterBase implements ITransporter } else { XLog.i("Parsing inner response message and setting pending request task result..."); - request.setException(new ResponseErrorException(container.getError())); - + request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second))); } } catch (Exception ex) { @@ -638,7 +636,7 @@ public abstract class TransporterBase implements ITransporter } else { XLog.i("Response has returned with error: " + container.getError().toString()); - request.setException(new ResponseErrorException(container.getError())); + request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second))); } } catch (Exception ex) { diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java index b72d1669b..f2e7a83a8 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java @@ -5,6 +5,7 @@ import android.os.SystemClock; import com.elvishew.xlog.XLog; import com.twine.tango.core.ObjectDisposedException; import com.twine.tango.transport.TransportAdapterBase; +import com.twine.tango.transport.TransportAdapterMode; import com.twine.tango.transport.TransportComponentState; import java.io.IOException; @@ -12,6 +13,8 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Selector; import io.reactivex.Completable; @@ -36,6 +39,7 @@ public class TcpTransportAdapter extends TransportAdapterBase super(); setAddress("127.0.0.1"); setPort(9999); + setAdapterMode(TransportAdapterMode.PREPEND_HEADER); } public TcpTransportAdapter(String address, int port) @@ -144,7 +148,7 @@ public class TcpTransportAdapter extends TransportAdapterBase try { - socket.getOutputStream().write(data); + socket.getOutputStream().write(postProcessBuffer(data)); } catch (IOException e) { onFailed(e); @@ -167,39 +171,59 @@ public class TcpTransportAdapter extends TransportAdapterBase { InputStream stream = socket.getInputStream(); - if (stream.available() > 0) + if (getAdapterMode() == TransportAdapterMode.NO_HEADER) { - byte[] data = new byte[stream.available()]; - int read = stream.read(data); - - if (read == -1) + if (stream.available() > 0) { - throw new SocketException("Error reading from TCP adapter."); + byte[] data = new byte[stream.available()]; + int read = stream.read(data); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + onDataAvailable(data); + } + } + else + { + if (stream.available() > 4) + { + byte[] size = new byte[4]; + + int read = stream.read(size, 0, size.length); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt(); + + while (stream.available() < expectedSize) + { + SystemClock.sleep(10); + + if (getState() != TransportComponentState.Connected) + { + break; + } + } + + byte[] data = new byte[expectedSize]; + read = stream.read(data); + + if (read == -1) + { + throw new SocketException("Error reading from TCP adapter."); + } + + onDataAvailable(data); } - - onDataAvailable(data); } SystemClock.sleep(10); -// counter++; - -// if (counter > 200) -// { -// try -// { -// if (socket.getChannel().keyFor(selector).isWritable() && socket.getChannel().keyFor(selector).isReadable()) -// { -// onFailed(new SocketTimeoutException("Client disconnected.")); -// return; -// } -// } catch (Exception e) -// { -// onFailed(e); -// return; -// } -// -// counter = 0; -// } } } catch (Exception e) @@ -210,4 +234,20 @@ public class TcpTransportAdapter extends TransportAdapterBase } //endregion + + //region Private Methods + + private void setSocketProperties() + { + try + { + socket.setReceiveBufferSize(MAX_BUFFER_SIZE); + socket.setSendBufferSize(MAX_BUFFER_SIZE); + } catch (SocketException e) + { + e.printStackTrace(); + } + } + + //endregion } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java index 8ad55708c..60efe976a 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java @@ -15,9 +15,13 @@ import com.felhr.usbserial.UsbSerialInterface; import com.twine.tango.core.ContextHelper; import com.twine.tango.core.ObjectDisposedException; import com.twine.tango.transport.TransportAdapterBase; +import com.twine.tango.transport.TransportAdapterMode; import com.twine.tango.transport.TransportComponentState; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; import java.util.Map; import io.reactivex.Completable; @@ -37,6 +41,8 @@ public class UsbTransportAdapter extends TransportAdapterBase private UsbManager usbManager; private boolean hasPermission; private PublishSubject connectionSubject; + private byte[] current_buffer; + private int current_buffer_bar; public UsbTransportAdapter() { @@ -51,7 +57,7 @@ public class UsbTransportAdapter extends TransportAdapterBase @Override public void write(byte[] data) throws ObjectDisposedException, IOException { - serial.write(data); + serial.write(postProcessBuffer(data)); } @Override @@ -126,7 +132,54 @@ public class UsbTransportAdapter extends TransportAdapterBase private void onUsbReceiveData(byte[] data) { - onDataAvailable(data); + if (getAdapterMode() == TransportAdapterMode.NO_HEADER) + { + onDataAvailable(data); + } else + { + if (current_buffer == null) + { + byte[] size = Arrays.copyOfRange(data, 0, 4); + int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt(); + current_buffer = new byte[expectedSize]; + byte[] rest = Arrays.copyOfRange(data, 4, data.length); + if (rest.length == expectedSize) + { + current_buffer = null; + onDataAvailable(rest); + } else + { + current_buffer_bar = rest.length; + ByteBuffer.wrap(current_buffer).put(rest); + } + } else + { + int remaining = current_buffer.length - current_buffer_bar; + + if (data.length < remaining) + { + current_buffer_bar += data.length; + ByteBuffer.wrap(current_buffer).put(data); + } else if (data.length > remaining) + { + byte[] end = Arrays.copyOfRange(data, 0, remaining); + ByteBuffer.wrap(current_buffer).put(end); + onDataAvailable(current_buffer); + current_buffer = null; + current_buffer_bar = 0; + byte[] rest = Arrays.copyOfRange(data, remaining, data.length); + onUsbReceiveData(rest); + } + else //Equals + { + byte[] end = Arrays.copyOfRange(data, 0, remaining); + ByteBuffer.wrap(current_buffer).put(end); + onDataAvailable(current_buffer); + current_buffer = null; + current_buffer_bar = 0; + } + } + } } @Override |
