diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-08-22 05:15:57 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-08-22 05:15:57 +0300 |
| commit | a9b1c4cb5c4d945565e72e80ac0f938a49da3e5f (patch) | |
| tree | 739307651882bce538653dc76c58c546b2d87699 /Software/Visual_Studio/Tango.Transport | |
| parent | 7bd70fcb311c808b65b62e774755dcbd6b0d63cd (diff) | |
| download | Tango-a9b1c4cb5c4d945565e72e80ac0f938a49da3e5f.tar.gz Tango-a9b1c4cb5c4d945565e72e80ac0f938a49da3e5f.zip | |
Implemented pre-connection protocol configuration.
Improved error handling across transport layer.
Improved unexpected app crash for PPC.
Improved transporter request received handlers tunneling.
Diffstat (limited to 'Software/Visual_Studio/Tango.Transport')
8 files changed, 164 insertions, 77 deletions
diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs index f2692426f..c9a2453f0 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/SignalRTransportAdapter.cs @@ -115,7 +115,7 @@ namespace Tango.Transport.Adapters } catch (Exception ex) { - OnFailed(LogManager.Log(ex, $"Error writing to SignalR adapter ({Address}).")); + OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address}).")); return; } } @@ -129,7 +129,7 @@ namespace Tango.Transport.Adapters { if (State != TransportComponentState.Connected) { - LogManager.Log("Connecting SignalR adapter..."); + LogManager.Log($"{ComponentName}: Connecting SignalR adapter..."); bool completed = false; @@ -158,8 +158,8 @@ namespace Tango.Transport.Adapters { completed = true; - LogManager.Log($"SignalR adapter session created ({SessionID})..."); - LogManager.Log("SingalR adapter connected."); + LogManager.Log($"{ComponentName}: SignalR adapter session created ({SessionID})..."); + LogManager.Log($"{ComponentName}: SingalR adapter connected."); State = TransportComponentState.Connected; StartPushThread(); @@ -171,7 +171,7 @@ namespace Tango.Transport.Adapters { if (!completed) { - LogManager.Log(ex, "Error occurred after session created."); + LogManager.Log(ex, $"{ComponentName}: Error occurred after session created."); completed = true; completionSource.SetException(ex); } @@ -187,14 +187,14 @@ namespace Tango.Transport.Adapters { if (Mode == SignalRTransportAdapterMode.CreateSession) { - LogManager.Log("Creating SignalR adapter Session..."); + LogManager.Log($"{ComponentName}: Creating SignalR adapter Session..."); SessionID = await _proxy.Invoke<String>("CreateSession", SerialNumber); } else { - LogManager.Log($"Joining SignalR adapter session ({SessionID})..."); + LogManager.Log($"{ComponentName}: Joining SignalR adapter session ({SessionID})..."); await _proxy.Invoke("JoinSession", SessionID); - LogManager.Log("SingalR adapter connected."); + LogManager.Log($"{ComponentName}: SingalR adapter connected."); } if (Mode == SignalRTransportAdapterMode.JoinSession) @@ -214,7 +214,7 @@ namespace Tango.Transport.Adapters if (!completed) { completed = true; - LogManager.Log(ex, "Error occurred on connection state changed event."); + LogManager.Log(ex, $"{ComponentName}: Error occurred on connection state changed event."); completionSource.SetException(ex); } } @@ -239,7 +239,7 @@ namespace Tango.Transport.Adapters { if (State == TransportComponentState.Connected) { - LogManager.Log("Disconnecting SignalR adapter..."); + LogManager.Log($"{ComponentName}: Disconnecting SignalR adapter..."); Core.Threading.TimeoutTask.StartNew(() => { try @@ -258,11 +258,11 @@ namespace Tango.Transport.Adapters } catch (Exception ex) { - LogManager.Log(ex, "Error disposing SignalR adapter connection."); + LogManager.Log(ex, $"{ComponentName}: Error disposing SignalR adapter connection."); } }, TimeSpan.FromSeconds(5)); - LogManager.Log("SignalR adapter disconnected."); + LogManager.Log($"{ComponentName}: SignalR adapter disconnected."); State = TransportComponentState.Disconnected; } }); @@ -315,7 +315,7 @@ namespace Tango.Transport.Adapters } catch (Exception ex) { - OnFailed(LogManager.Log(ex, $"Error writing to SignalR adapter ({Address}).")); + OnFailed(LogManager.Log(ex, $"{ComponentName}: Error writing to SignalR adapter ({Address}).")); return; } } @@ -332,17 +332,40 @@ namespace Tango.Transport.Adapters /// <param name="dataCollection">The data collection.</param> private void OnDataAvailable(List<byte[]> dataCollection) { - foreach (var data in dataCollection) + try { - if (EnableCompression) - { - OnDataAvailable(Compression.GZipHelper.Decompress(data)); - } - else + foreach (var data in dataCollection) { - OnDataAvailable(data); + if (EnableCompression) + { + try + { + var decompressed = Compression.GZipHelper.Decompress(data); + OnDataAvailable(decompressed); + } + catch (Exception ex) + { + if (ex.Message.Contains("GZip")) + { + //Temporarily ignore, probably switching protocol definitions... + OnDataAvailable(data); + } + else + { + throw ex; + } + } + } + else + { + OnDataAvailable(data); + } } } + catch (Exception ex) + { + OnFailed(ex); + } } private void StartPushThread() diff --git a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs index f4bfb0ba8..427c335ff 100644 --- a/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs +++ b/Software/Visual_Studio/Tango.Transport/Adapters/TcpTransportAdapter.cs @@ -249,7 +249,21 @@ namespace Tango.Transport.Adapters if (EnableCompression) { - data = GZipHelper.Decompress(data); + try + { + data = GZipHelper.Decompress(data); + } + catch (Exception ex) + { + if (ex.Message.Contains("GZip")) + { + //Temporarily ignore, probably switching protocol definitions... + } + else + { + throw ex; + } + } } OnDataAvailable(data); diff --git a/Software/Visual_Studio/Tango.Transport/GenericMessageSerializer.cs b/Software/Visual_Studio/Tango.Transport/GenericMessageSerializer.cs index 55d07ef37..50c4221ab 100644 --- a/Software/Visual_Studio/Tango.Transport/GenericMessageSerializer.cs +++ b/Software/Visual_Studio/Tango.Transport/GenericMessageSerializer.cs @@ -16,31 +16,21 @@ namespace Tango.Transport { public static class GenericMessageSerializer { - public enum GenericMessageSerializerMode - { - Json, - Bson, - Protobuf - } - - public static GenericMessageSerializerMode Mode { get; set; } - static GenericMessageSerializer() { - Mode = GenericMessageSerializerMode.Bson; ProtoBuf.Meta.RuntimeTypeModel.Default.AutoAddMissingTypes = true; ProtoBuf.Meta.RuntimeTypeModel.Default.AutoAddProtoContractTypesOnly = false; ProtoBuf.Meta.RuntimeTypeModel.Default.InferTagFromNameDefault = true; ProtoBuf.Meta.RuntimeTypeModel.Default.UseImplicitZeroDefaults = true; } - public static object Deserialize(Type type, byte[] array) + public static object Deserialize(Type type, byte[] array, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(array), type); } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { return DeserializeFromBson(array, type); } @@ -55,32 +45,32 @@ namespace Tango.Transport } } - public static object DeserializeFromByteString(Type type, ByteString byteString) + public static object DeserializeFromByteString(Type type, ByteString byteString, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { return JsonConvert.DeserializeObject(byteString.ToStringUtf8(), type); } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { return DeserializeFromBson(byteString.ToByteArray(), type); } else { AutoProtobuf.Build(type); - return Deserialize(type, byteString.ToByteArray()); + return Deserialize(type, byteString.ToByteArray(), mode); } } //--------------------------------------------------------------------- - public static byte[] Serialize<T>(T message) + public static byte[] Serialize<T>(T message, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { return SerializeToBson(message); } @@ -96,13 +86,13 @@ namespace Tango.Transport } } - public static T Deserialize<T>(byte[] array) + public static T Deserialize<T>(byte[] array, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { - return (T)Deserialize(typeof(T), array); + return (T)Deserialize(typeof(T), array, mode); } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { return DeserializeFromBson<T>(array); } @@ -117,33 +107,33 @@ namespace Tango.Transport } } - public static ByteString SerializeToByteString<T>(T message) + public static ByteString SerializeToByteString<T>(T message, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { - var byteString = ByteString.CopyFromUtf8(JsonConvert.SerializeObject(message)); + var byteString = ByteString.CopyFromUtf8(JsonConvert.SerializeObject(message)); return byteString; } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { var byteString = ByteString.CopyFrom(SerializeToBson(message)); return byteString; - } + } else { AutoProtobuf.Build<T>(); - return ByteString.CopyFrom(Serialize<T>(message)); + return ByteString.CopyFrom(Serialize<T>(message, mode)); } } - public static T DeserializeFromByteString<T>(ByteString byteString) + public static T DeserializeFromByteString<T>(ByteString byteString, GenericMessageProtocol mode) { - if (Mode == GenericMessageSerializerMode.Json) + if (mode == GenericMessageProtocol.Json) { return JsonConvert.DeserializeObject<T>(byteString.ToStringUtf8()); } - else if (Mode == GenericMessageSerializerMode.Bson) + else if (mode == GenericMessageProtocol.Bson) { return DeserializeFromBson<T>(byteString.ToByteArray()); } @@ -151,15 +141,15 @@ namespace Tango.Transport { AutoProtobuf.Build<T>(); - return Deserialize<T>(byteString.ToByteArray()); + return Deserialize<T>(byteString.ToByteArray(), mode); } } - public static T ExtractGenericRequestFromContainer<T>(MessageContainer container) where T : class + public static T ExtractGenericRequestFromContainer<T>(MessageContainer container, GenericMessageProtocol mode) where T : class { var message = MessageFactory.ExtractMessageFromContainer(container); var genericType = Type.GetType((message as GenericRequest).Type); - var innerMessage = DeserializeFromByteString(genericType, (message as GenericRequest).Data); + var innerMessage = DeserializeFromByteString(genericType, (message as GenericRequest).Data, mode); return innerMessage as T; } diff --git a/Software/Visual_Studio/Tango.Transport/ITransporter.cs b/Software/Visual_Studio/Tango.Transport/ITransporter.cs index 1b0391ff5..777b97e19 100644 --- a/Software/Visual_Studio/Tango.Transport/ITransporter.cs +++ b/Software/Visual_Studio/Tango.Transport/ITransporter.cs @@ -10,6 +10,7 @@ using Tango.Transport.Adapters; using Tango.PMR; using Tango.PMR.Common; using System.Collections.ObjectModel; +using Tango.PMR.Integration; namespace Tango.Transport { @@ -37,6 +38,11 @@ namespace Tango.Transport Exception FailedStateException { get; } /// <summary> + /// Gets or sets the generic protocol used to serialize/deserialize generic messages. + /// </summary> + GenericMessageProtocol GenericProtocol { get; set; } + + /// <summary> /// Registers a custom request handler. /// </summary> /// <typeparam name="Request">The type of the request.</typeparam> @@ -183,7 +189,7 @@ namespace Tango.Transport /// <summary> /// Occurs when a new request message has been received. /// </summary> - event EventHandler<MessageContainer> RequestReceived; + event EventHandler<RequestReceivedEventArgs> RequestReceived; /// <summary> /// Occurs when a new response message has been received. diff --git a/Software/Visual_Studio/Tango.Transport/RequestReceivedEventArgs.cs b/Software/Visual_Studio/Tango.Transport/RequestReceivedEventArgs.cs new file mode 100644 index 000000000..b7a406fd6 --- /dev/null +++ b/Software/Visual_Studio/Tango.Transport/RequestReceivedEventArgs.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.PMR.Common; + +namespace Tango.Transport +{ + public class RequestReceivedEventArgs + { + public MessageContainer Container { get; set; } + public bool Handled { get; set; } + + public RequestReceivedEventArgs() + { + + } + + public RequestReceivedEventArgs(MessageContainer container) : this() + { + Container = container; + } + } +} diff --git a/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj b/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj index 2521daa2f..054b4420c 100644 --- a/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj +++ b/Software/Visual_Studio/Tango.Transport/Tango.Transport.csproj @@ -128,6 +128,7 @@ <Compile Include="ITransporter.cs" /> <Compile Include="GenericMessageSerializer.cs" /> <Compile Include="RequestFailedEventArgs.cs" /> + <Compile Include="RequestReceivedEventArgs.cs" /> <Compile Include="ResponseErrorException.cs" /> <Compile Include="Routing\SimpleTransportRouter.cs" /> <Compile Include="Servers\ClientConnectedEventArgs.cs" /> @@ -188,7 +189,7 @@ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <ProjectExtensions> <VisualStudio> - <UserProperties BuildVersion_AssemblyInfoFilename="Properties\AssemblyInfo.cs" BuildVersion_UpdateAssemblyVersion="True" BuildVersion_BuildVersioningStyle="None.None.Increment.TimeStamp" BuildVersion_UseGlobalSettings="False" BuildVersion_StartDate="2000/1/1" /> + <UserProperties BuildVersion_StartDate="2000/1/1" BuildVersion_UseGlobalSettings="False" BuildVersion_BuildVersioningStyle="None.None.Increment.TimeStamp" BuildVersion_UpdateAssemblyVersion="True" BuildVersion_AssemblyInfoFilename="Properties\AssemblyInfo.cs" /> </VisualStudio> </ProjectExtensions> </Project>
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs index 09acc4ea5..065b9dc41 100644 --- a/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransportAdapterBase.cs @@ -109,10 +109,15 @@ namespace Tango.Transport } } + private bool _enableCompression; /// <summary> /// Gets or sets a value indicating whether to enable compression/decompression of data. /// </summary> - public bool EnableCompression { get; set; } + public bool EnableCompression + { + get { return _enableCompression; } + set { _enableCompression = value; RaisePropertyChangedAuto(); } + } #endregion diff --git a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs index cf5eeda7f..435e66196 100644 --- a/Software/Visual_Studio/Tango.Transport/TransporterBase.cs +++ b/Software/Visual_Studio/Tango.Transport/TransporterBase.cs @@ -62,7 +62,7 @@ namespace Tango.Transport /// <summary> /// Occurs when a new request message has been received. /// </summary> - public event EventHandler<MessageContainer> RequestReceived; + public event EventHandler<RequestReceivedEventArgs> RequestReceived; /// <summary> /// Occurs when a new response message has been received. @@ -188,6 +188,16 @@ namespace Tango.Transport /// </summary> public Exception FailedStateException { get; private set; } + private GenericMessageProtocol _genericProtocol; + /// <summary> + /// Gets or sets the generic protocol used to serialize/deserialize generic messages. + /// </summary> + public GenericMessageProtocol GenericProtocol + { + get { return _genericProtocol; } + set { _genericProtocol = value; RaisePropertyChangedAuto(); } + } + #endregion #region Virtual Methods @@ -301,8 +311,10 @@ namespace Tango.Transport /// Called when a new request has been received. /// </summary> /// <param name="container">The request.</param> - protected virtual void OnRequestReceived(MessageContainer container) + protected virtual void OnRequestReceived(RequestReceivedEventArgs e) { + var container = e.Container; + if (_requestHandlers.Count > 0) { if (container.Type != MessageType.GenericRequest) @@ -311,6 +323,8 @@ namespace Tango.Transport if (handlers.Count > 0) //Handle { + e.Handled = true; + var request = MessageFactory.ExtractMessageFromContainer(container); foreach (var handler in handlers) @@ -334,7 +348,9 @@ namespace Tango.Transport if (handlers.Count > 0) { - var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data); + e.Handled = true; + + var innerRequest = GenericMessageSerializer.DeserializeFromByteString(handlers[0].RequestType, genericRequest.Data, GenericProtocol); foreach (var handler in handlers) { @@ -351,7 +367,7 @@ namespace Tango.Transport } } - RequestReceived?.Invoke(this, container); + RequestReceived?.Invoke(this, e); } /// <summary> @@ -930,10 +946,10 @@ namespace Tango.Transport { GenericRequest genericRequest = new GenericRequest(); genericRequest.Type = request.GetType().AssemblyQualifiedName; - genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request); + genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol); var response = await SendRequest<GenericRequest, GenericResponse>(genericRequest, config); - var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data); + var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol); return responseObject; } @@ -949,7 +965,7 @@ namespace Tango.Transport GenericRequest genericRequest = new GenericRequest(); genericRequest.Type = request.GetType().AssemblyQualifiedName; - genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request); + genericRequest.Data = GenericMessageSerializer.SerializeToByteString<Request>(request, GenericProtocol); Subject<Response> subject = new Subject<Response>(); @@ -958,7 +974,7 @@ namespace Tango.Transport try { - var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data); + var responseObject = GenericMessageSerializer.DeserializeFromByteString<Response>(response.Message.Data, GenericProtocol); subject.OnNext(responseObject); } catch (Exception ex) @@ -1194,8 +1210,7 @@ namespace Tango.Transport GenericResponse genericResponse = new GenericResponse(); genericResponse.Type = response.GetType().AssemblyQualifiedName; - genericResponse.Data = GenericMessageSerializer.SerializeToByteString<Response>(response); - + genericResponse.Data = GenericMessageSerializer.SerializeToByteString<Response>(response, GenericProtocol); await SendResponse<GenericResponse>(genericResponse, token, config); } @@ -1513,7 +1528,7 @@ namespace Tango.Transport { Type genericType = Type.GetType((messageContent as GenericResponse).Type); responseType = genericType.Name; - messageContent = GenericMessageSerializer.DeserializeFromByteString(genericType, (messageContent as GenericResponse).Data); + messageContent = GenericMessageSerializer.DeserializeFromByteString(genericType, (messageContent as GenericResponse).Data, GenericProtocol); } } catch { } @@ -1643,7 +1658,7 @@ namespace Tango.Transport try { - Task.Factory.StartNew(() => OnRequestReceived(container)); + Task.Factory.StartNew(() => OnRequestReceived(new RequestReceivedEventArgs(container))); } catch { @@ -1696,12 +1711,20 @@ namespace Tango.Transport if (_arrivedResponses.Count == 0) { retryCounter--; - var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig() + + if (State == TransportComponentState.Connected) { - Timeout = KeepAliveTimeout, - Priority = QueuePriority.High - }).Result; - retryCounter = KeepAliveRetries; + var response = SendRequest<KeepAliveRequest, KeepAliveResponse>(new KeepAliveRequest(), new TransportRequestConfig() + { + Timeout = KeepAliveTimeout, + Priority = QueuePriority.High + }).Result; + retryCounter = KeepAliveRetries; + } + else + { + continue; + } } else { |
