From 8b4e9bff64d712c3dc35a05278fddbbf7814c2ad Mon Sep 17 00:00:00 2001 From: Roy Ben-Shabat Date: Wed, 27 Dec 2017 19:13:23 +0200 Subject: Fixed an issue with keep alive blocking pulling thread. Implemented the new TCP reading for java. Modified accuracy of date time on SQLServer. --- .../tango/transport/TransportAdapterBase.java | 2 +- .../com/twine/tango/transport/TransporterBase.java | 8 ++-- .../transport/adapters/TcpTransportAdapter.java | 49 ++++++++++++---------- 3 files changed, 31 insertions(+), 28 deletions(-) (limited to 'Software/Android_Studio') diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java index 6d5667ada..e3335ad92 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransportAdapterBase.java @@ -18,7 +18,7 @@ import io.reactivex.Completable; */ public abstract class TransportAdapterBase implements ITransportAdapter { - protected static final int MAX_BUFFER_SIZE = 1024 * 1024 * 10; //10 MB. + protected static final int MAX_BUFFER_SIZE = 1024; //10 MB. private boolean isAuthenticated; private String address; private Event stateChangedEvent; diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java index 5b410d22c..92ecf1220 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/TransporterBase.java @@ -735,11 +735,11 @@ public abstract class TransporterBase implements ITransporter if (container.getType() == MessageType.KeepAliveRequest) { XLog.i("Submitting keep alive response..."); - sendResponse(MessageFactory.createTangoMessage(KeepAliveResponse.class, container.getToken())).subscribe(() -> + sendResponse(MessageFactory.createTangoMessage(KeepAliveResponse.class, container.getToken())).observeOn(Schedulers.newThread()).subscribe(() -> { }, (ex) -> { - ex.printStackTrace(); + //ex.printStackTrace(); }); } else @@ -775,7 +775,7 @@ public abstract class TransporterBase implements ITransporter while (state == TransportComponentState.Connected) { - this.sendRequest(MessageFactory.createTangoMessage(KeepAliveRequest.class), Period.seconds(10)).subscribe((x) -> + this.sendRequest(MessageFactory.createTangoMessage(KeepAliveRequest.class), Period.seconds(5)).subscribe((x) -> { }, (ex) -> @@ -787,7 +787,7 @@ public abstract class TransporterBase implements ITransporter }); - SystemClock.sleep(10000); + SystemClock.sleep(5000); } } diff --git a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java index f2e7a83a8..bbec3501a 100644 --- a/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java +++ b/Software/Android_Studio/Tango.Transport/src/main/java/com/twine/tango/transport/adapters/TcpTransportAdapter.java @@ -100,7 +100,8 @@ public class TcpTransportAdapter extends TransportAdapterBase XLog.i("TCP adapter connected..."); x.onComplete(); } - } catch (Exception e) + } + catch (Exception e) { XLog.e("Could not connect the TCP adapter."); x.onError(e); @@ -123,7 +124,8 @@ public class TcpTransportAdapter extends TransportAdapterBase { socket.getOutputStream().close(); socket.close(); - } catch (Exception e) + } + catch (Exception e) { //Ignore further disposing attempts failure. } @@ -131,7 +133,8 @@ public class TcpTransportAdapter extends TransportAdapterBase XLog.i("TCP adapter disconnected."); x.onComplete(); } - } catch (IOException e) + } + catch (IOException e) { XLog.e("Could not disconnect the TCP adapter."); XLog.e(e); @@ -149,7 +152,8 @@ public class TcpTransportAdapter extends TransportAdapterBase try { socket.getOutputStream().write(postProcessBuffer(data)); - } catch (IOException e) + } + catch (IOException e) { onFailed(e); } @@ -177,12 +181,12 @@ public class TcpTransportAdapter extends TransportAdapterBase { byte[] data = new byte[stream.available()]; int read = stream.read(data); - + if (read == -1) { throw new SocketException("Error reading from TCP adapter."); } - + onDataAvailable(data); } } @@ -191,34 +195,31 @@ public class TcpTransportAdapter extends TransportAdapterBase if (stream.available() > 4) { byte[] size = new byte[4]; - + int read = stream.read(size, 0, size.length); - + if (read == -1) { throw new SocketException("Error reading from TCP adapter."); } - + int expectedSize = ByteBuffer.wrap(size).order(ByteOrder.LITTLE_ENDIAN).getInt(); - - while (stream.available() < expectedSize) + + byte[] data = new byte[expectedSize]; + read = 0; + + while (read < expectedSize) { - SystemClock.sleep(10); + read += stream.read(data, read, Math.min(stream.available(), expectedSize - read)); + + SystemClock.sleep(2); if (getState() != TransportComponentState.Connected) { break; } } - - byte[] data = new byte[expectedSize]; - read = stream.read(data); - - if (read == -1) - { - throw new SocketException("Error reading from TCP adapter."); - } - + onDataAvailable(data); } } @@ -226,7 +227,8 @@ public class TcpTransportAdapter extends TransportAdapterBase SystemClock.sleep(10); } - } catch (Exception e) + } + catch (Exception e) { onFailed(e); } @@ -243,7 +245,8 @@ public class TcpTransportAdapter extends TransportAdapterBase { socket.setReceiveBufferSize(MAX_BUFFER_SIZE); socket.setSendBufferSize(MAX_BUFFER_SIZE); - } catch (SocketException e) + } + catch (SocketException e) { e.printStackTrace(); } -- cgit v1.3.1