aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Android_Studio/Tango.Transport
diff options
context:
space:
mode:
authorRoy <roy.mail.net@gmail.com>2017-12-26 21:16:15 +0200
committerRoy <roy.mail.net@gmail.com>2017-12-26 21:16:15 +0200
commit2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc (patch)
treea21ff27fff08876e835df82c5242def1f0d09c17 /Software/Android_Studio/Tango.Transport
parent6450fc175114a6f8d0b75cb21386d1bb0c902711 (diff)
downloadTango-2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc.tar.gz
Tango-2ea2bb5bcd96045f1bd6cb4c3d8b8416dbaa05dc.zip
MERGE
Diffstat (limited to 'Software/Android_Studio/Tango.Transport')
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java14
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java5
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java (renamed from Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java)18
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java51
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java17
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java14
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java96
-rw-r--r--Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java57
8 files changed, 229 insertions, 43 deletions
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java
index abd12e0f0..2e076b1c8 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransportAdapter.java
@@ -40,6 +40,20 @@ public interface ITransportAdapter extends ITransportComponent {
*/
void setAddress(String address);
+ /**
+ * Sets the adapter mode.
+ *
+ * @param mode the mode
+ */
+ void setAdapterMode(TransportAdapterMode mode);
+
+ /**
+ * Gets the adapter mode.
+ *
+ * @return the adapter mode
+ */
+ TransportAdapterMode getAdapterMode();
+
/**
* Gets whether this adapter is authenticated.
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
index 9d2bce329..1ad704829 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ITransporter.java
@@ -8,6 +8,7 @@ import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer;
import org.joda.time.Period;
+import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
@@ -73,7 +74,7 @@ public interface ITransporter extends ITransportComponent {
* @param response the response
* @return the single
*/
- <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response);
+ <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response);
/**
* Sends a response with the specified request token.
@@ -83,7 +84,7 @@ public interface ITransporter extends ITransportComponent {
* @param token the token
* @return the single
*/
- <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response,String token);
+ <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response,String token);
/**
* Sets request received listener.
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java
index 465412f68..896e92b38 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/transporters/ResponseErrorException.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/ResponseErrorException.java
@@ -1,5 +1,6 @@
-package com.twine.tango.transport.transporters;
+package com.twine.tango.transport;
+import com.twine.tango.pmr.TangoMessage;
import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode;
@@ -10,6 +11,18 @@ public class ResponseErrorException extends Exception
{
private ErrorCode error;
+ private TangoMessage response;
+
+ public TangoMessage getResponse()
+ {
+ return response;
+ }
+
+ public void setResponse(TangoMessage response)
+ {
+ this.response = response;
+ }
+
/**
* Gets the error.
*
@@ -35,9 +48,10 @@ public class ResponseErrorException extends Exception
*
* @param error the error
*/
- public ResponseErrorException(ErrorCode error)
+ public ResponseErrorException(ErrorCode error,TangoMessage response)
{
super("Response received with error " + error.toString());
setError(error);
+ setResponse(response);
}
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
index b9221ea27..6d5667ada 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java
@@ -6,6 +6,9 @@ import com.twine.tango.core.IEventHandler;
import com.twine.tango.core.ObjectDisposedException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
import io.reactivex.Completable;
@@ -15,18 +18,20 @@ import io.reactivex.Completable;
*/
public abstract class TransportAdapterBase implements ITransportAdapter
{
-
+ protected static final int MAX_BUFFER_SIZE = 1024 * 1024 * 10; //10 MB.
private boolean isAuthenticated;
private String address;
private Event<TransportComponentState> stateChangedEvent;
private IEventHandler<byte[]> dataAvailableListener;
private TransportComponentState state;
+ private TransportAdapterMode adapterMode;
/**
* Instantiates a new Transport adapter base.
*/
public TransportAdapterBase()
{
+ setAdapterMode(TransportAdapterMode.NO_HEADER);
stateChangedEvent = new Event<>();
setAuthenticated(true);
}
@@ -56,6 +61,18 @@ public abstract class TransportAdapterBase implements ITransportAdapter
}
@Override
+ public void setAdapterMode(TransportAdapterMode mode)
+ {
+ adapterMode = mode;
+ }
+
+ @Override
+ public TransportAdapterMode getAdapterMode()
+ {
+ return adapterMode;
+ }
+
+ @Override
public boolean isAuthenticated()
{
return isAuthenticated;
@@ -155,4 +172,36 @@ public abstract class TransportAdapterBase implements ITransportAdapter
setState(TransportComponentState.Failed);
XLog.e(ex);
}
+
+ /**
+ * Applies any additional headers if required.
+ *
+ * @param data the data
+ * @return the byte [ ]
+ */
+ protected byte[] postProcessBuffer(byte[] data)
+ {
+ if (adapterMode == TransportAdapterMode.NO_HEADER)
+ {
+ return data;
+ } else
+ {
+ byte[] size = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(data.length).array();
+ return concatByteArrays(size, data);
+ }
+ }
+
+ /**
+ * Concat byte arrays byte [ ].
+ *
+ * @param first the first
+ * @param second the second
+ * @return the byte [ ]
+ */
+ protected byte[] concatByteArrays(byte[] first, byte[] second)
+ {
+ byte[] result = Arrays.copyOf(first, first.length + second.length);
+ System.arraycopy(second, 0, result, first.length, second.length);
+ return result;
+ }
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java
new file mode 100644
index 000000000..699be758d
--- /dev/null
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterMode.java
@@ -0,0 +1,17 @@
+package com.twine.tango.transport;
+
+
+/**
+ * Represents an {@link com.twine.tango.transport.ITransportAdapter} read/write mode.
+ */
+public enum TransportAdapterMode
+{
+ /**
+ * Pure protobuf message with no message size.
+ */
+ NO_HEADER,
+ /**
+ * Prepends and truncates the message size before writing and reading.
+ */
+ PREPEND_HEADER
+}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
index 3c09d547f..ff192bc3b 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java
@@ -16,7 +16,6 @@ import com.twine.tango.pmr.MessageFactory;
import com.twine.tango.pmr.TangoMessage;
import com.twine.tango.pmr.common.ErrorCodeOuterClass.ErrorCode;
import com.twine.tango.pmr.common.MessageContainerOuterClass.MessageContainer;
-import com.twine.tango.transport.transporters.ResponseErrorException;
import org.joda.time.Period;
@@ -414,13 +413,13 @@ public abstract class TransporterBase implements ITransporter
}
@Override
- public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response)
+ public <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response)
{
return sendResponse(response, response.getContainer().getToken());
}
@Override
- public <Response extends GeneratedMessageV3> Single<Response> sendResponse(TangoMessage<Response> response, String token)
+ public <Response extends GeneratedMessageV3> Completable sendResponse(TangoMessage<Response> response, String token)
{
response.getContainer().setToken(token);
@@ -459,7 +458,7 @@ public abstract class TransporterBase implements ITransporter
TransportMessage<Response> message = new TransportMessage<>(adapter, token, response, TransportMessageDirection.Response, onSerializingMessage(response), subject);
sendingQueue.add(message);
- return subject.singleOrError();
+ return subject.singleOrError().toCompletable();
}
//endregion
@@ -537,7 +536,7 @@ public abstract class TransporterBase implements ITransporter
pendingRequests.add(message);
} else
{
- message.setResult(true, true);
+ message.setResult(new Object(), true);
}
} catch (Exception ex)
{
@@ -613,8 +612,7 @@ public abstract class TransporterBase implements ITransporter
} else
{
XLog.i("Parsing inner response message and setting pending request task result...");
- request.setException(new ResponseErrorException(container.getError()));
-
+ request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second)));
}
} catch (Exception ex)
{
@@ -638,7 +636,7 @@ public abstract class TransporterBase implements ITransporter
} else
{
XLog.i("Response has returned with error: " + container.getError().toString());
- request.setException(new ResponseErrorException(container.getError()));
+ request.setException(new ResponseErrorException(container.getError(), onParseTangoMessage(data.second)));
}
} catch (Exception ex)
{
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
index b72d1669b..f2e7a83a8 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java
@@ -5,6 +5,7 @@ import android.os.SystemClock;
import com.elvishew.xlog.XLog;
import com.twine.tango.core.ObjectDisposedException;
import com.twine.tango.transport.TransportAdapterBase;
+import com.twine.tango.transport.TransportAdapterMode;
import com.twine.tango.transport.TransportComponentState;
import java.io.IOException;
@@ -12,6 +13,8 @@ import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.Selector;
import io.reactivex.Completable;
@@ -36,6 +39,7 @@ public class TcpTransportAdapter extends TransportAdapterBase
super();
setAddress("127.0.0.1");
setPort(9999);
+ setAdapterMode(TransportAdapterMode.PREPEND_HEADER);
}
public TcpTransportAdapter(String address, int port)
@@ -144,7 +148,7 @@ public class TcpTransportAdapter extends TransportAdapterBase
try
{
- socket.getOutputStream().write(data);
+ socket.getOutputStream().write(postProcessBuffer(data));
} catch (IOException e)
{
onFailed(e);
@@ -167,39 +171,59 @@ public class TcpTransportAdapter extends TransportAdapterBase
{
InputStream stream = socket.getInputStream();
- if (stream.available() > 0)
+ if (getAdapterMode() == TransportAdapterMode.NO_HEADER)
{
- byte[] data = new byte[stream.available()];
- int read = stream.read(data);
-
- if (read == -1)
+ if (stream.available() > 0)
{
- throw new SocketException("Error reading from TCP adapter.");
+ byte[] data = new byte[stream.available()];
+ int read = stream.read(data);
+
+ if (read == -1)
+ {
+ throw new SocketException("Error reading from TCP adapter.");
+ }
+
+ onDataAvailable(data);
+ }
+ }
+ else
+ {
+ if (stream.available() > 4)
+ {
+ byte[] size = new byte[4];
+
+ int read = stream.read(size, 0, size.length);
+
+ if (read == -1)
+ {
+ throw new SocketException("Error reading from TCP adapter.");
+ }
+
+ int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt();
+
+ while (stream.available() < expectedSize)
+ {
+ SystemClock.sleep(10);
+
+ if (getState() != TransportComponentState.Connected)
+ {
+ break;
+ }
+ }
+
+ byte[] data = new byte[expectedSize];
+ read = stream.read(data);
+
+ if (read == -1)
+ {
+ throw new SocketException("Error reading from TCP adapter.");
+ }
+
+ onDataAvailable(data);
}
-
- onDataAvailable(data);
}
SystemClock.sleep(10);
-// counter++;
-
-// if (counter > 200)
-// {
-// try
-// {
-// if (socket.getChannel().keyFor(selector).isWritable() && socket.getChannel().keyFor(selector).isReadable())
-// {
-// onFailed(new SocketTimeoutException("Client disconnected."));
-// return;
-// }
-// } catch (Exception e)
-// {
-// onFailed(e);
-// return;
-// }
-//
-// counter = 0;
-// }
}
} catch (Exception e)
@@ -210,4 +234,20 @@ public class TcpTransportAdapter extends TransportAdapterBase
}
//endregion
+
+ //region Private Methods
+
+ private void setSocketProperties()
+ {
+ try
+ {
+ socket.setReceiveBufferSize(MAX_BUFFER_SIZE);
+ socket.setSendBufferSize(MAX_BUFFER_SIZE);
+ } catch (SocketException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ //endregion
}
diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java
index 8ad55708c..60efe976a 100644
--- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java
+++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/UsbTransportAdapter.java
@@ -15,9 +15,13 @@ import com.felhr.usbserial.UsbSerialInterface;
import com.twine.tango.core.ContextHelper;
import com.twine.tango.core.ObjectDisposedException;
import com.twine.tango.transport.TransportAdapterBase;
+import com.twine.tango.transport.TransportAdapterMode;
import com.twine.tango.transport.TransportComponentState;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
import java.util.Map;
import io.reactivex.Completable;
@@ -37,6 +41,8 @@ public class UsbTransportAdapter extends TransportAdapterBase
private UsbManager usbManager;
private boolean hasPermission;
private PublishSubject connectionSubject;
+ private byte[] current_buffer;
+ private int current_buffer_bar;
public UsbTransportAdapter()
{
@@ -51,7 +57,7 @@ public class UsbTransportAdapter extends TransportAdapterBase
@Override
public void write(byte[] data) throws ObjectDisposedException, IOException
{
- serial.write(data);
+ serial.write(postProcessBuffer(data));
}
@Override
@@ -126,7 +132,54 @@ public class UsbTransportAdapter extends TransportAdapterBase
private void onUsbReceiveData(byte[] data)
{
- onDataAvailable(data);
+ if (getAdapterMode() == TransportAdapterMode.NO_HEADER)
+ {
+ onDataAvailable(data);
+ } else
+ {
+ if (current_buffer == null)
+ {
+ byte[] size = Arrays.copyOfRange(data, 0, 4);
+ int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ current_buffer = new byte[expectedSize];
+ byte[] rest = Arrays.copyOfRange(data, 4, data.length);
+ if (rest.length == expectedSize)
+ {
+ current_buffer = null;
+ onDataAvailable(rest);
+ } else
+ {
+ current_buffer_bar = rest.length;
+ ByteBuffer.wrap(current_buffer).put(rest);
+ }
+ } else
+ {
+ int remaining = current_buffer.length - current_buffer_bar;
+
+ if (data.length < remaining)
+ {
+ current_buffer_bar += data.length;
+ ByteBuffer.wrap(current_buffer).put(data);
+ } else if (data.length > remaining)
+ {
+ byte[] end = Arrays.copyOfRange(data, 0, remaining);
+ ByteBuffer.wrap(current_buffer).put(end);
+ onDataAvailable(current_buffer);
+ current_buffer = null;
+ current_buffer_bar = 0;
+ byte[] rest = Arrays.copyOfRange(data, remaining, data.length);
+ onUsbReceiveData(rest);
+ }
+ else //Equals
+ {
+ byte[] end = Arrays.copyOfRange(data, 0, remaining);
+ ByteBuffer.wrap(current_buffer).put(end);
+ onDataAvailable(current_buffer);
+ current_buffer = null;
+ current_buffer_bar = 0;
+ }
+ }
+ }
}
@Override