diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
| commit | 4222eddece906d6f0877022c06b853deb5068472 (patch) | |
| tree | a29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software | |
| parent | a802fe75f9538371004f1833e69a69b798892d0c (diff) | |
| download | Tango-4222eddece906d6f0877022c06b853deb5068472.tar.gz Tango-4222eddece906d6f0877022c06b853deb5068472.zip | |
Telemetry source.
Diffstat (limited to 'Software')
28 files changed, 466 insertions, 146 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs index 75307f844..7a9cc9f7a 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs @@ -12,16 +12,30 @@ namespace Tango.Telemetry.Destinations { private DeviceClient _hubClient; - public string Name { get; private set; } = "Azure IoT Hub"; + public string Name { get; set; } = "Azure IoT Hub"; public bool Enable { get; set; } = true; public String ConnectionString { get; private set; } - public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; } + public ConnectionStatus HubConnectionStatus { get; private set; } + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } public AzureHubTelemetryDestination(String connectionString) { + HubConnectionStatus = ConnectionStatus.Connected; ConnectionString = connectionString; - SupportedSources = new List<TelemetrySource>() { TelemetrySource.PendingStorage, TelemetrySource.Streaming, TelemetrySource.ExternalStorage }; + SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage }; + } + + public Task<bool> IsAvailable() + { + if (_hubClient == null) + { + return Task.FromResult(true); + } + else + { + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } } public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties) @@ -31,6 +45,7 @@ namespace Tango.Telemetry.Destinations _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); _hubClient.SetConnectionStatusChangesHandler((status, reason) => { + HubConnectionStatus = status; LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}."); }); } @@ -53,5 +68,10 @@ namespace Tango.Telemetry.Destinations { _hubClient?.Dispose(); } + + public override string ToString() + { + return Name; + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs index b22849f3c..b5ff05c29 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs @@ -18,14 +18,15 @@ namespace Tango.Telemetry.Destinations { private IMqttClient _mqttClient; private IMqttClientOptions _mqttOptions; + private DateTime _nextRealAvailabilityCheck; - public string Name { get; private set; } = "MQTT"; + public string Name { get; set; } = "MQTT"; public bool Enable { get; set; } = true; public String Address { get; private set; } public int Port { get; private set; } public String Topic { get; private set; } - public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; } + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } /// <summary> /// @@ -35,12 +36,34 @@ namespace Tango.Telemetry.Destinations /// <param name="port">Default 1883</param> public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883) { + _nextRealAvailabilityCheck = DateTime.Now; Topic = topic; + Address = address; Port = port; - SupportedSources = new List<TelemetrySource>() { TelemetrySource.Streaming }; + SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.Streaming }; } - public async Task<bool> EnsureConnection() + public async Task<bool> IsAvailable() + { + if (_mqttClient == null) + { + return await EnsureConnection(); + } + else + { + if (DateTime.Now > _nextRealAvailabilityCheck) + { + _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5); + return await EnsureConnection(); + } + else + { + return _mqttClient.IsConnected; + } + } + } + + private async Task<bool> EnsureConnection() { if (_mqttClient == null || !_mqttClient.IsConnected) { @@ -98,5 +121,10 @@ namespace Tango.Telemetry.Destinations { _mqttClient?.Dispose(); } + + public override string ToString() + { + return $"{Name} -> {Address}:{Port}"; + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs index a8cc46fd5..10424531b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs @@ -9,8 +9,9 @@ namespace Tango.Telemetry public interface ITelemetryDestination : IDisposable { bool Enable { get; set; } - String Name { get; } - IReadOnlyList<TelemetrySource> SupportedSources { get; } + String Name { get; set; } + Task<bool> IsAvailable(); + IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; } Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs index 314b99046..0b118d85c 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public interface ITelemetryHistoryModule : ITelemetryModule + public interface ITelemetryHistorySource : ITelemetrySource { Task<bool> CanRequestHistory(DateTime from); Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from); diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs index 0c63dc906..b44f567da 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs @@ -14,8 +14,8 @@ namespace Tango.Telemetry event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; ITelemetryStorageManager StorageManager { get; } ITelemetryQueueManager QueueManager { get; } - ReadOnlyCollection<ITelemetryModule> Modules { get; } - void RegisterModule(ITelemetryModule module); + ReadOnlyCollection<ITelemetrySource> Sources { get; } + void RegisterSource(ITelemetrySource source); ReadOnlyCollection<ITelemetryDestination> Destinations { get; } void RegisterDestination(ITelemetryDestination destination); bool IsStarted { get; } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs index a27a03fc8..74d58ed4a 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public interface ITelemetryModule : IDisposable + public interface ITelemetrySource : IDisposable { String Name { get; } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs index 2c54a2a51..ae63e41cd 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs @@ -12,7 +12,7 @@ namespace Tango.Telemetry void DeletePendingTelemetry(PendingTelemetry pendingTelemetry); List<PendingTelemetry> GetPendingTelemetries(int maxCount); int GetPendingTelemetriesCount(); - TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule); - void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount); + TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source); + void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs index 12b576b1b..3c60c35ee 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public interface ITelemetryStreamingModule : ITelemetryModule + public interface ITelemetryStreamingSource : ITelemetrySource { event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable; void Start(); diff --git a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs index 2be3b1c45..5e1b5778d 100644 --- a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs +++ b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs @@ -14,8 +14,8 @@ namespace Tango.Telemetry public DateTime Created { get; set; } public DateTime Expires { get; set; } public ITelemetry TelemetryObject { get; set; } - public String Module { get; set; } - public TelemetrySource Source { get; set; } + public String Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public List<TelemetryPendingDestination> PendingDestinations { get; set; } public PendingTelemetry() diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs index a35e08328..22fac087b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs @@ -10,11 +10,11 @@ using Tango.Integration.Operation; using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; -using Tango.Telemetry.TelemetryObjects; +using Tango.Telemetry.Telemetries; -namespace Tango.Telemetry.Modules +namespace Tango.Telemetry.Sources { - public class TelemetryDiagnosticsModule : TelemetryConfigurableModule<TelemetryDiagnosticsModuleConfig>, ITelemetryModule + public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetrySource { public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; @@ -30,12 +30,12 @@ namespace Tango.Telemetry.Modules public string Name { get; private set; } = "Diagnostics"; - private TelemetryDiagnosticsModule() : base() + private TelemetryDiagnosticsSource() : base() { _diagnosticsQueue = new List<StartDiagnosticsResponse>(); } - public TelemetryDiagnosticsModule(IMachineOperator machineOperator) : base() + public TelemetryDiagnosticsSource(IMachineOperator machineOperator) : base() { _machineOperator = machineOperator; _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable; @@ -101,7 +101,7 @@ namespace Tango.Telemetry.Modules frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming)); } else { @@ -110,7 +110,7 @@ namespace Tango.Telemetry.Modules TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); frame.Monitors = new InsightsMonitors(); frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming)); _emptyWritten = true; } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs index 8d13b3cca..806277d5b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs @@ -5,13 +5,13 @@ using System.Text; using System.Threading.Tasks; using Tango.Settings; -namespace Tango.Telemetry.Modules +namespace Tango.Telemetry.Sources { - public class TelemetryDiagnosticsModuleConfig : SettingsBase + public class TelemetryDiagnosticsSourceConfig : SettingsBase { public TimeSpan DiagnosticsSamplingInterval { get; set; } - public TelemetryDiagnosticsModuleConfig() + public TelemetryDiagnosticsSourceConfig() { DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); } diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs index 0ba2935c1..6cb597e4b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs @@ -5,11 +5,11 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.BL; -using Tango.Telemetry.TelemetryObjects; +using Tango.Telemetry.Telemetries; -namespace Tango.Telemetry.Modules +namespace Tango.Telemetry.Sources { - public class TelemetryJobRunsHistoryModule : TelemetryConfigurableModule<TelemetryJobRunsHistoryModuleConfig>, ITelemetryHistoryModule + public class TelemetryJobRunsHistorySource : TelemetryConfigurableSource<TelemetryJobRunsHistorySourceConfig>, ITelemetryHistorySource { private bool _isBusy; diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs index 412dea5f9..da2043941 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs @@ -5,13 +5,13 @@ using System.Text; using System.Threading.Tasks; using Tango.Settings; -namespace Tango.Telemetry.Modules +namespace Tango.Telemetry.Sources { - public class TelemetryJobRunsHistoryModuleConfig : SettingsBase + public class TelemetryJobRunsHistorySourceConfig : SettingsBase { public int MaxJobRunsPerRequest { get; set; } - public TelemetryJobRunsHistoryModuleConfig() + public TelemetryJobRunsHistorySourceConfig() { MaxJobRunsPerRequest = 100; } diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 27ac76783..02f8417f1 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -207,25 +207,25 @@ <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> <Compile Include="ITelemetryDestination.cs" /> <Compile Include="Destinations\MqttTelemetryDestination.cs" /> - <Compile Include="ITelemetryHistoryModule.cs" /> - <Compile Include="ITelemetryModule.cs" /> + <Compile Include="ITelemetryHistorySource.cs" /> + <Compile Include="ITelemetrySource.cs" /> <Compile Include="ITelemetryQueueManager.cs" /> <Compile Include="ITelemetryStorageManager.cs" /> <Compile Include="ITelemetryPublisher.cs" /> - <Compile Include="ITelemetryStreamingModule.cs" /> - <Compile Include="Modules\TelemetryDiagnosticsModule.cs" /> - <Compile Include="Modules\TelemetryDiagnosticsModuleConfig.cs" /> - <Compile Include="Modules\TelemetryJobRunsHistoryModule.cs" /> - <Compile Include="Modules\TelemetryJobRunsHistoryModuleConfig.cs" /> + <Compile Include="ITelemetryStreamingSource.cs" /> + <Compile Include="Sources\TelemetryDiagnosticsSource.cs" /> + <Compile Include="Sources\TelemetryDiagnosticsSourceConfig.cs" /> + <Compile Include="Sources\TelemetryJobRunsHistoryModule.cs" /> + <Compile Include="Sources\TelemetryJobRunsHistorySourceConfig.cs" /> <Compile Include="PendingTelemetry.cs" /> - <Compile Include="TelemetryConfigurableModule.cs" /> - <Compile Include="TelemetryHistoryModuleCheckPoint.cs" /> + <Compile Include="TelemetryConfigurableSource.cs" /> + <Compile Include="TelemetryHistorySourceCheckPoint.cs" /> <Compile Include="TelemetryNameAttribute.cs" /> <Compile Include="TelemetryAvailableEventArgs.cs" /> <Compile Include="TelemetryPackagePublishedEventArgs.cs" /> <Compile Include="TelemetryPackagePublishFailedEventArgs.cs" /> <Compile Include="TelemetryPendingDestination.cs" /> - <Compile Include="TelemetryPendingStorageModule.cs" /> + <Compile Include="TelemetryPendingStorageSource.cs" /> <Compile Include="TelemetryInMemoryQueueManager.cs" /> <Compile Include="TelemetryPublisherAdvanced.cs" /> <Compile Include="TelemetryPublisherEventArgs.cs" /> @@ -233,14 +233,15 @@ <Compile Include="TelemetryPublishPackage.cs" /> <Compile Include="TelemetryPublisher.cs" /> <Compile Include="TelemetryPublisherConfiguration.cs" /> - <Compile Include="TelemetrySource.cs" /> + <Compile Include="TelemetryPublishResult.cs" /> + <Compile Include="TelemetrySourceTypes.cs" /> <Compile Include="TelemetryLiteDBStorageManager.cs" /> <Compile Include="ITelemetry.cs" /> <Compile Include="JsonFlattener.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="TelemetryBase.cs" /> - <Compile Include="TelemetryObjects\TelemetryDiagnosticsFrame.cs" /> - <Compile Include="TelemetryObjects\TelemetryJobRun.cs" /> + <Compile Include="Telemetries\TelemetryDiagnosticsFrame.cs" /> + <Compile Include="Telemetries\TelemetryJobRun.cs" /> </ItemGroup> <ItemGroup> <None Include="packages.config" /> diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs index 86762596e..da64c50a0 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs @@ -8,9 +8,9 @@ using System.Text; using System.Threading.Tasks; using Tango.PMR.Insights; -namespace Tango.Telemetry.TelemetryObjects +namespace Tango.Telemetry.Telemetries { - [TelemetryName("Diagnostics")] + [TelemetryName("DiagnosticsFrame")] public class TelemetryDiagnosticsFrame : TelemetryBase { public InsightsMonitors Monitors { get; set; } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs index 828f61360..7b1ca3a1c 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs @@ -4,9 +4,9 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace Tango.Telemetry.TelemetryObjects +namespace Tango.Telemetry.Telemetries { - [TelemetryName("JobRuns")] + [TelemetryName("JobRun")] public class TelemetryJobRun : TelemetryBase { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs index 163b72f7a..a9ddf890c 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs @@ -8,13 +8,13 @@ namespace Tango.Telemetry { public class TelemetryAvailableEventArgs : EventArgs { - public TelemetrySource Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public ITelemetry TelemetryObject { get; set; } - public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySource source) + public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySourceTypes source) { TelemetryObject = telemetryObject; - Source = source; + SourceType = source; } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index 133409ff9..20d5211e7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -1,4 +1,5 @@ -using Newtonsoft.Json; +using LiteDB; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; @@ -9,6 +10,12 @@ namespace Tango.Telemetry { public class TelemetryBase : ITelemetry { + [BsonIgnore] //This will be used for column mapping in ADX + public String Type + { + get { return this.ToTelemetryName(); } + } + public DateTime Time { get; set; } public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs index 8ffb96af5..c1068582b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs @@ -8,11 +8,11 @@ using Tango.Settings; namespace Tango.Telemetry { - public abstract class TelemetryConfigurableModule<T> : ExtendedObject where T : SettingsBase + public abstract class TelemetryConfigurableSource<T> : ExtendedObject where T : SettingsBase { public T Config { get; set; } - public TelemetryConfigurableModule() + public TelemetryConfigurableSource() { Config = SettingsManager.Default.GetOrCreate<T>(); } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs index 2f57a1926..ee82de05a 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs @@ -7,10 +7,10 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public class TelemetryHistoryModuleCheckPoint + public class TelemetryHistorySourceCheckPoint { [BsonId] - public String ModuleName { get; set; } + public String SourceName { get; set; } public DateTime Time { get; set; } public int TotalCount { get; set; } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index 781b3f3e6..2ceb95298 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -51,9 +51,9 @@ namespace Tango.Telemetry return _database.GetCollection<PendingTelemetry>("PendingTelemetries"); } - private ILiteCollection<TelemetryHistoryModuleCheckPoint> GetModulesCheckpointCollection() + private ILiteCollection<TelemetryHistorySourceCheckPoint> GetSourcesCheckpointCollection() { - return _database.GetCollection<TelemetryHistoryModuleCheckPoint>("ModuleCheckPoints"); + return _database.GetCollection<TelemetryHistorySourceCheckPoint>("SourcesCheckPoints"); } public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry) @@ -83,21 +83,21 @@ namespace Tango.Telemetry } } - public TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule) + public TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source) { lock (_lock) { - var collection = GetModulesCheckpointCollection(); - return collection.FindOne(x => x.ModuleName == historyModule.Name); + var collection = GetSourcesCheckpointCollection(); + return collection.FindOne(x => x.SourceName == source.Name); } } - public void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount) + public void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount) { lock (_lock) { - var collection = GetModulesCheckpointCollection(); - collection.Upsert(new TelemetryHistoryModuleCheckPoint() { ModuleName = historyModule.Name, Time = time, TotalCount = totalCount }); + var collection = GetSourcesCheckpointCollection(); + collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount }); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs index 6aae43f67..6aa1f5527 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public class TelemetryPendingStorageModule : ITelemetryModule + public class TelemetryPendingStorageSource : ITelemetrySource { public string Name { get; private set; } = "Pending Storage"; diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs index 5bd207e9f..baafb70a7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs @@ -10,9 +10,9 @@ namespace Tango.Telemetry { private String _payload; - public ITelemetryModule Module { get; set; } + public ITelemetrySource Source { get; set; } public PendingTelemetry PendingTelemetry { get; set; } - public TelemetrySource Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public String ToPayload() { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs new file mode 100644 index 000000000..37c6af412 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryPublishResult + { + public enum DestinationStatus + { + None, + Passed, + Unavailable, + Failed, + Postponed + } + + public class DestinationResult + { + public ITelemetryDestination Destination { get; set; } + public DestinationStatus Status { get; set; } + public Exception Error { get; set; } + public TimeSpan ElapsedTime { get; set; } + } + + public List<DestinationResult> DestinationsResults { get; set; } + public TimeSpan TotalElapsedTime { get; set; } + + public TimeSpan OverheadTime + { + get { return TimeSpan.FromMilliseconds(TotalElapsedTime.TotalMilliseconds - DestinationsResults.Sum(x => x.ElapsedTime.TotalMilliseconds)); } + } + + public TelemetryPublishResult() + { + DestinationsResults = new List<DestinationResult>(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 1314d3346..99d96edff 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Text; @@ -9,11 +10,13 @@ using System.Threading; using System.Threading.Tasks; using System.Timers; using Tango.Core; +using Tango.Core.ExtensionMethods; using Tango.Insights; using Tango.Integration.Operation; +using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; -using Tango.Telemetry.TelemetryObjects; +using Tango.Telemetry.Telemetries; namespace Tango.Telemetry { @@ -26,9 +29,9 @@ namespace Tango.Telemetry private System.Timers.Timer _pendingStorageCheckTimer; private System.Timers.Timer _historicalDataTimer; - private bool _isDisposed; + protected bool _isDisposed; private Thread _publishThread; - private TelemetryPendingStorageModule _pendingStorageModule; + private TelemetryPendingStorageSource _pendingStorageSource; #region Properties @@ -38,8 +41,8 @@ namespace Tango.Telemetry public ITelemetryStorageManager StorageManager { get; private set; } - private List<ITelemetryModule> InnerModules { get; } - public ReadOnlyCollection<ITelemetryModule> Modules { get; } + private List<ITelemetrySource> InnerSources { get; } + public ReadOnlyCollection<ITelemetrySource> Sources { get; } private List<ITelemetryDestination> InnerDestinations { get; } public ReadOnlyCollection<ITelemetryDestination> Destinations { get; } @@ -54,10 +57,10 @@ namespace Tango.Telemetry { Config = config ?? new TelemetryPublisherConfiguration(); - _pendingStorageModule = new TelemetryPendingStorageModule(); + _pendingStorageSource = new TelemetryPendingStorageSource(); - InnerModules = new List<ITelemetryModule>(); - Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules); + InnerSources = new List<ITelemetrySource>(); + Sources = new ReadOnlyCollection<ITelemetrySource>(InnerSources); InnerDestinations = new List<ITelemetryDestination>(); Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations); @@ -77,31 +80,48 @@ namespace Tango.Telemetry #endregion - #region Modules + #region Sources - public void RegisterModule(ITelemetryModule module) + public void RegisterSource(ITelemetrySource source) { - if (InnerModules.Exists(x => x.GetType() == module.GetType())) + if (source == null) return; + + if (InnerSources.Exists(x => x.GetType() == source.GetType())) { - throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered."); + LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; } - InnerModules.Add(module); + InnerSources.Add(source); - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.TelemetryAvailable += Module_TelemetryAvailable; + streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable; if (IsStarted) { - streamingModule.Start(); + try + { + streamingSource.Start(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error starting telemetry source {source.Name}."); + } } } + + LogManager.Log($"Telemetry source {source.Name} registered."); } - private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) + private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) { - PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source); + var source = sender as ITelemetrySource; + if (source != null) + { + LogManager.Log($"Telemetry stream received {source.Name} -> {e.SourceType} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug); + PushTelemetryPackage(source, e.TelemetryObject, e.SourceType); + } } #endregion @@ -110,7 +130,17 @@ namespace Tango.Telemetry public void RegisterDestination(ITelemetryDestination destination) { + if (destination == null) return; + + if (InnerDestinations.Exists(x => x.Name == destination.Name)) + { + LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; + } + InnerDestinations.Add(destination); + + LogManager.Log($"Telemetry destination {destination.Name} registered."); } #endregion @@ -121,31 +151,45 @@ namespace Tango.Telemetry { if (!IsStarted) { - Config.Validate(); + try + { + LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}"); - IsStarted = true; + Config.Validate(); + Validate(); - if (_pendingStorageCheckTimer == null) - { - _pendingStorageCheckTimer = new System.Timers.Timer(); - _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; - _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; - } + IsStarted = true; - _pendingStorageCheckTimer.Start(); + if (_pendingStorageCheckTimer == null) + { + _pendingStorageCheckTimer = new System.Timers.Timer(); + _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; + _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; + } - if (_historicalDataTimer == null) - { - _historicalDataTimer = new System.Timers.Timer(); - _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds; - _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; - } + _pendingStorageCheckTimer.Start(); + + if (_historicalDataTimer == null) + { + _historicalDataTimer = new System.Timers.Timer(); + _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds; + _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; + } + + _historicalDataTimer.Start(); - _historicalDataTimer.Start(); + _publishThread.Start(); - _publishThread.Start(); + InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start()); - InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start()); + LogManager.Log($"Telemetry publisher started."); + } + catch (Exception ex) + { + LogManager.Log(ex, "Error starting telemetry publisher."); + Stop(); + throw ex; + } } } @@ -154,10 +198,63 @@ namespace Tango.Telemetry if (IsStarted) { IsStarted = false; - InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop()); + + LogManager.Log("Stopping telemetry publisher..."); + + InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => + { + try + { + x.Stop(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}."); + } + }); _pendingStorageCheckTimer.Stop(); _historicalDataTimer.Stop(); QueueManager.Enqueue(null); + + LogManager.Log("Telemetry publisher stopped."); + } + } + + public void Validate() + { + // Validate all registered sources + foreach (var source in InnerSources) + { + if (string.IsNullOrWhiteSpace(source.Name)) + { + throw new ArgumentException("A registered telemetry source has an invalid or missing Name."); + } + } + + // Validate all registered destinations + foreach (var destination in InnerDestinations) + { + if (string.IsNullOrWhiteSpace(destination.Name)) + { + throw new ArgumentException("A registered telemetry destination has an invalid or missing Name."); + } + + if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any()) + { + throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source."); + } + } + + // Validate StorageManager + if (StorageManager == null) + { + throw new NullReferenceException("StorageManager is not configured."); + } + + // Validate QueueManager + if (QueueManager == null) + { + throw new NullReferenceException("QueueManager is not configured."); } } @@ -169,20 +266,39 @@ namespace Tango.Telemetry { _pendingStorageCheckTimer.Stop(); + LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})..."); + var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing..."); + + Dictionary<String, int> destinationsPasses = new Dictionary<string, int>(); + List<TelemetryPublishResult> results = new List<TelemetryPublishResult>(); + foreach (var pendingTelemetry in batch) { - await PublishTelemetryPackage(new TelemetryPublishPackage() + var result = await PublishTelemetryPackage(new TelemetryPublishPackage() { - Module = _pendingStorageModule, + Source = _pendingStorageSource, PendingTelemetry = pendingTelemetry, - Source = TelemetrySource.PendingStorage + SourceType = TelemetrySourceTypes.PendingStorage }); + results.Add(result); + + foreach (var d in result.DestinationsResults) + { + if (d.Status == TelemetryPublishResult.DestinationStatus.Passed) + { + destinationsPasses[d.Destination.Name] += 1; + } + } + if (!IsStarted || _isDisposed) return; } + LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}"); + _pendingStorageCheckTimer.Start(); } @@ -190,23 +306,25 @@ namespace Tango.Telemetry { _historicalDataTimer.Stop(); + LogManager.Log(""); + if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) { - foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList()) + foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList()) { - TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module); - if (await module.CanRequestHistory(checkPoint.Time)) + TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source); + if (await source.CanRequestHistory(checkPoint.Time)) { - var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); + var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); foreach (var telemetry in historyTelemetries) { - PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage); + PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage); checkPoint.Time = telemetry.Time; checkPoint.TotalCount++; } - StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount); + StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount); } } } @@ -218,16 +336,16 @@ namespace Tango.Telemetry #region Push - private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source) + private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) { PendingTelemetry pendingTelemetry = new PendingTelemetry(); pendingTelemetry.Created = DateTime.UtcNow; - //pendingTelemetry.Expires = module.GetExpiration(); - pendingTelemetry.Module = module.Name; - pendingTelemetry.Source = source; + //pendingTelemetry.Expires = source.GetExpiration(); + pendingTelemetry.Source = source.Name; + pendingTelemetry.SourceType = sourceType; pendingTelemetry.TelemetryObject = telemetry; - PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source }); + PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType }); } private void PushTelemetryPackage(TelemetryPublishPackage package) @@ -261,9 +379,13 @@ namespace Tango.Telemetry } } - protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { - if (!IsStarted || _isDisposed) return; + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); @@ -275,7 +397,7 @@ namespace Tango.Telemetry //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var destination in Destinations) { @@ -286,26 +408,58 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType))) { if (pendingDestinations.Exists(x => x.Name == destination.Name)) { + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + try { pendingDestinations.RemoveAll(x => x.Name == destination.Name); if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); + + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); + } + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -318,7 +472,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -326,6 +480,11 @@ namespace Tango.Telemetry { StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } #endregion @@ -373,14 +532,14 @@ namespace Tango.Telemetry if (!_isDisposed) { _isDisposed = true; - foreach (var module in InnerModules) + foreach (var source in InnerSources) { - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.Stop(); - streamingModule.TelemetryAvailable -= Module_TelemetryAvailable; + streamingSource.Stop(); + streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable; } - module.Dispose(); + source.Dispose(); } if (IsStarted) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index 0183de6c2..c4e7d3d3e 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -20,8 +21,14 @@ namespace Tango.Telemetry } - protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; + List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>> { new KeyValuePair<string, string>("MachineID", Config.MachineID), @@ -33,7 +40,7 @@ namespace Tango.Telemetry var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); // For Streaming/External: initialize pending destinations list (used if publishing fails) - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var dest in Destinations) { @@ -50,35 +57,86 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); if (pendingEntry == null) continue; + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + // Respect backoff timing if (now < pendingEntry.NextEligibleAttempt) + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; continue; + } try { if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); - // On success: remove entry from pending list - pendingDestinations.RemoveAll(x => x.Name == destination.Name); + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // On success: remove entry from pending list + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // Only track retry state if retry is supported + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (pendingEntry == null) + { + pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; + pendingDestinations.Add(pendingEntry); + } + + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + + // Apply exponential backoff + int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + } + else + { + // Remove if not retryable + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); // Only track retry state if retry is supported - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (pendingEntry == null) { @@ -103,7 +161,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); - if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any()) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any()) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -111,6 +169,11 @@ namespace Tango.Telemetry { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs index 424249f7d..4a3776b87 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs @@ -14,14 +14,14 @@ namespace Tango.Telemetry public String Environment { get; set; } public TimeSpan PendingStorageCheckInterval { get; set; } public int MaxPendingStorageTelemetriesPerCycle { get; set; } - public TimeSpan HistoryModulesRequestInterval { get; set; } + public TimeSpan HistorySourcesRequestInterval { get; set; } public int MaxPendingTelemetries { get; set; } public TelemetryPublisherConfiguration() { PendingStorageCheckInterval = TimeSpan.FromMinutes(1); MaxPendingStorageTelemetriesPerCycle = 100; - HistoryModulesRequestInterval = TimeSpan.FromMinutes(1); + HistorySourcesRequestInterval = TimeSpan.FromMinutes(1); MaxPendingTelemetries = 200; } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs index 09bbdc539..1ed80d5c9 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public enum TelemetrySource + public enum TelemetrySourceTypes { Streaming, ExternalStorage, |
