From 4222eddece906d6f0877022c06b853deb5068472 Mon Sep 17 00:00:00 2001 From: Roy Ben Shabat Date: Wed, 30 Jul 2025 12:36:30 +0300 Subject: Telemetry source. --- .../Destinations/AzureHubTelemetryDestination.cs | 26 +- .../Destinations/MqttTelemetryDestination.cs | 36 ++- .../Tango.Telemetry/ITelemetryDestination.cs | 5 +- .../Tango.Telemetry/ITelemetryHistoryModule.cs | 14 - .../Tango.Telemetry/ITelemetryHistorySource.cs | 14 + .../Tango.Telemetry/ITelemetryModule.cs | 13 - .../Tango.Telemetry/ITelemetryPublisher.cs | 4 +- .../Tango.Telemetry/ITelemetrySource.cs | 13 + .../Tango.Telemetry/ITelemetryStorageManager.cs | 4 +- .../Tango.Telemetry/ITelemetryStreamingModule.cs | 15 -- .../Tango.Telemetry/ITelemetryStreamingSource.cs | 15 ++ .../Modules/TelemetryDiagnosticsModule.cs | 134 ---------- .../Modules/TelemetryDiagnosticsModuleConfig.cs | 19 -- .../Modules/TelemetryJobRunsHistoryModule.cs | 61 ----- .../Modules/TelemetryJobRunsHistoryModuleConfig.cs | 19 -- .../Tango.Telemetry/PendingTelemetry.cs | 4 +- .../Sources/TelemetryDiagnosticsSource.cs | 134 ++++++++++ .../Sources/TelemetryDiagnosticsSourceConfig.cs | 19 ++ .../Sources/TelemetryJobRunsHistoryModule.cs | 61 +++++ .../Sources/TelemetryJobRunsHistorySourceConfig.cs | 19 ++ .../Tango.Telemetry/Tango.Telemetry.csproj | 27 +- .../Telemetries/TelemetryDiagnosticsFrame.cs | 18 ++ .../Tango.Telemetry/Telemetries/TelemetryJobRun.cs | 14 + .../Tango.Telemetry/TelemetryAvailableEventArgs.cs | 6 +- .../Visual_Studio/Tango.Telemetry/TelemetryBase.cs | 9 +- .../Tango.Telemetry/TelemetryConfigurableModule.cs | 20 -- .../Tango.Telemetry/TelemetryConfigurableSource.cs | 20 ++ .../TelemetryHistoryModuleCheckPoint.cs | 17 -- .../TelemetryHistorySourceCheckPoint.cs | 17 ++ .../TelemetryLiteDBStorageManager.cs | 16 +- .../TelemetryObjects/TelemetryDiagnosticsFrame.cs | 18 -- .../TelemetryObjects/TelemetryJobRun.cs | 14 - .../TelemetryPendingStorageModule.cs | 18 -- .../TelemetryPendingStorageSource.cs | 18 ++ .../Tango.Telemetry/TelemetryPublishPackage.cs | 4 +- .../Tango.Telemetry/TelemetryPublishResult.cs | 41 +++ .../Tango.Telemetry/TelemetryPublisher.cs | 287 ++++++++++++++++----- .../Tango.Telemetry/TelemetryPublisherAdvanced.cs | 81 +++++- .../TelemetryPublisherConfiguration.cs | 4 +- .../Tango.Telemetry/TelemetrySource.cs | 15 -- .../Tango.Telemetry/TelemetrySourceTypes.cs | 15 ++ 41 files changed, 814 insertions(+), 494 deletions(-) delete mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs delete mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs create mode 100644 Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs (limited to 'Software/Visual_Studio') diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs index 75307f844..7a9cc9f7a 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs @@ -12,16 +12,30 @@ namespace Tango.Telemetry.Destinations { private DeviceClient _hubClient; - public string Name { get; private set; } = "Azure IoT Hub"; + public string Name { get; set; } = "Azure IoT Hub"; public bool Enable { get; set; } = true; public String ConnectionString { get; private set; } - public IReadOnlyList SupportedSources { get; private set; } + public ConnectionStatus HubConnectionStatus { get; private set; } + public IReadOnlyList SupportedSourceTypes { get; private set; } public AzureHubTelemetryDestination(String connectionString) { + HubConnectionStatus = ConnectionStatus.Connected; ConnectionString = connectionString; - SupportedSources = new List() { TelemetrySource.PendingStorage, TelemetrySource.Streaming, TelemetrySource.ExternalStorage }; + SupportedSourceTypes = new List() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage }; + } + + public Task IsAvailable() + { + if (_hubClient == null) + { + return Task.FromResult(true); + } + else + { + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } } public async Task Publish(TelemetryPublishPackage package, List> properties) @@ -31,6 +45,7 @@ namespace Tango.Telemetry.Destinations _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); _hubClient.SetConnectionStatusChangesHandler((status, reason) => { + HubConnectionStatus = status; LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}."); }); } @@ -53,5 +68,10 @@ namespace Tango.Telemetry.Destinations { _hubClient?.Dispose(); } + + public override string ToString() + { + return Name; + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs index b22849f3c..b5ff05c29 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs @@ -18,14 +18,15 @@ namespace Tango.Telemetry.Destinations { private IMqttClient _mqttClient; private IMqttClientOptions _mqttOptions; + private DateTime _nextRealAvailabilityCheck; - public string Name { get; private set; } = "MQTT"; + public string Name { get; set; } = "MQTT"; public bool Enable { get; set; } = true; public String Address { get; private set; } public int Port { get; private set; } public String Topic { get; private set; } - public IReadOnlyList SupportedSources { get; private set; } + public IReadOnlyList SupportedSourceTypes { get; private set; } /// /// @@ -35,12 +36,34 @@ namespace Tango.Telemetry.Destinations /// Default 1883 public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883) { + _nextRealAvailabilityCheck = DateTime.Now; Topic = topic; + Address = address; Port = port; - SupportedSources = new List() { TelemetrySource.Streaming }; + SupportedSourceTypes = new List() { TelemetrySourceTypes.Streaming }; } - public async Task EnsureConnection() + public async Task IsAvailable() + { + if (_mqttClient == null) + { + return await EnsureConnection(); + } + else + { + if (DateTime.Now > _nextRealAvailabilityCheck) + { + _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5); + return await EnsureConnection(); + } + else + { + return _mqttClient.IsConnected; + } + } + } + + private async Task EnsureConnection() { if (_mqttClient == null || !_mqttClient.IsConnected) { @@ -98,5 +121,10 @@ namespace Tango.Telemetry.Destinations { _mqttClient?.Dispose(); } + + public override string ToString() + { + return $"{Name} -> {Address}:{Port}"; + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs index a8cc46fd5..10424531b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs @@ -9,8 +9,9 @@ namespace Tango.Telemetry public interface ITelemetryDestination : IDisposable { bool Enable { get; set; } - String Name { get; } - IReadOnlyList SupportedSources { get; } + String Name { get; set; } + Task IsAvailable(); + IReadOnlyList SupportedSourceTypes { get; } Task Publish(TelemetryPublishPackage package, List> properties); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs deleted file mode 100644 index 314b99046..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public interface ITelemetryHistoryModule : ITelemetryModule - { - Task CanRequestHistory(DateTime from); - Task> RequestHistory(DateTime from); - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs new file mode 100644 index 000000000..0b118d85c --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryHistorySource : ITelemetrySource + { + Task CanRequestHistory(DateTime from); + Task> RequestHistory(DateTime from); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs deleted file mode 100644 index a27a03fc8..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public interface ITelemetryModule : IDisposable - { - String Name { get; } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs index 0c63dc906..b44f567da 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs @@ -14,8 +14,8 @@ namespace Tango.Telemetry event EventHandler PublishPackageFailed; ITelemetryStorageManager StorageManager { get; } ITelemetryQueueManager QueueManager { get; } - ReadOnlyCollection Modules { get; } - void RegisterModule(ITelemetryModule module); + ReadOnlyCollection Sources { get; } + void RegisterSource(ITelemetrySource source); ReadOnlyCollection Destinations { get; } void RegisterDestination(ITelemetryDestination destination); bool IsStarted { get; } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs new file mode 100644 index 000000000..74d58ed4a --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetrySource : IDisposable + { + String Name { get; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs index 2c54a2a51..ae63e41cd 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs @@ -12,7 +12,7 @@ namespace Tango.Telemetry void DeletePendingTelemetry(PendingTelemetry pendingTelemetry); List GetPendingTelemetries(int maxCount); int GetPendingTelemetriesCount(); - TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule); - void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount); + TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source); + void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs deleted file mode 100644 index 12b576b1b..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public interface ITelemetryStreamingModule : ITelemetryModule - { - event EventHandler TelemetryAvailable; - void Start(); - void Stop(); - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs new file mode 100644 index 000000000..3c60c35ee --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryStreamingSource : ITelemetrySource + { + event EventHandler TelemetryAvailable; + void Start(); + void Stop(); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs deleted file mode 100644 index a35e08328..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs +++ /dev/null @@ -1,134 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using System.Timers; -using Tango.Core; -using Tango.Insights; -using Tango.Integration.Operation; -using Tango.Logging; -using Tango.PMR.Diagnostics; -using Tango.PMR.Insights; -using Tango.Telemetry.TelemetryObjects; - -namespace Tango.Telemetry.Modules -{ - public class TelemetryDiagnosticsModule : TelemetryConfigurableModule, ITelemetryModule - { - public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; - - private IMachineOperator _machineOperator; - private bool _isStarted; - - private Timer _diagnosticsSamplingTimer; - private List _diagnosticsQueue; - private bool _writing; - private bool _emptyWritten; - - public event EventHandler TelemetryAvailable; - - public string Name { get; private set; } = "Diagnostics"; - - private TelemetryDiagnosticsModule() : base() - { - _diagnosticsQueue = new List(); - } - - public TelemetryDiagnosticsModule(IMachineOperator machineOperator) : base() - { - _machineOperator = machineOperator; - _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable; - } - - private void DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics) - { - if (_isStarted && diagnostics.Monitors != null) - { - _diagnosticsQueue.Add(diagnostics); - } - } - - public void Start() - { - _isStarted = true; - - if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS) - { - Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS); - } - - if (_diagnosticsSamplingTimer == null) - { - _diagnosticsSamplingTimer = new Timer(); - _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds; - _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed; - } - - - _diagnosticsQueue.Clear(); - - _writing = false; - _diagnosticsSamplingTimer.Start(); - } - - public void Stop() - { - _isStarted = false; - } - - private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e) - { - if (!_isStarted || _writing) return; - - try - { - _diagnosticsSamplingTimer.Stop(); - - _writing = true; - - if (_diagnosticsQueue.Count > 0) - { - var queue = _diagnosticsQueue.ToList(); - _diagnosticsQueue.Clear(); - _emptyWritten = false; - - var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList()); - queue.Clear(); - - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = monitorsAvg; - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - - - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); - } - else - { - if (!_emptyWritten) - { - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = new InsightsMonitors(); - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); - _emptyWritten = true; - } - } - } - catch (Exception ex) - { - LogManager.Log(ex, "Error occurred on insights frame insertion."); - } - finally - { - _writing = false; - _diagnosticsSamplingTimer.Start(); - } - } - - public void Dispose() - { - _machineOperator.DiagnosticsDataAvailable -= DiagnosticsDataAvailable; - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs deleted file mode 100644 index 8d13b3cca..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.Settings; - -namespace Tango.Telemetry.Modules -{ - public class TelemetryDiagnosticsModuleConfig : SettingsBase - { - public TimeSpan DiagnosticsSamplingInterval { get; set; } - - public TelemetryDiagnosticsModuleConfig() - { - DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs deleted file mode 100644 index 0ba2935c1..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs +++ /dev/null @@ -1,61 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Data.Entity; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.BL; -using Tango.Telemetry.TelemetryObjects; - -namespace Tango.Telemetry.Modules -{ - public class TelemetryJobRunsHistoryModule : TelemetryConfigurableModule, ITelemetryHistoryModule - { - private bool _isBusy; - - public string Name { get; private set; } = "JobRuns History"; - - public Task CanRequestHistory(DateTime from) - { - return Task.FromResult(!_isBusy); - } - - public async Task> RequestHistory(DateTime from) - { - try - { - _isBusy = true; - - using (ObservablesContext db = ObservablesContext.CreateDefault()) - { - var runs = await db.JobRuns - .Where(x => x.LastUpdated > from) - .OrderBy(x => x.LastUpdated) - .Take(Config.MaxJobRunsPerRequest) - .ToListAsync(); - - List tRuns = new List(); - - foreach (var run in runs) - { - TelemetryJobRun tRun = new TelemetryJobRun(); - tRun.Time = run.LastUpdated; - //Fill the object.. - tRuns.Add(tRun); - } - - return tRuns; - } - } - finally - { - _isBusy = false; - } - } - - public void Dispose() - { - - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs deleted file mode 100644 index 412dea5f9..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.Settings; - -namespace Tango.Telemetry.Modules -{ - public class TelemetryJobRunsHistoryModuleConfig : SettingsBase - { - public int MaxJobRunsPerRequest { get; set; } - - public TelemetryJobRunsHistoryModuleConfig() - { - MaxJobRunsPerRequest = 100; - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs index 2be3b1c45..5e1b5778d 100644 --- a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs +++ b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs @@ -14,8 +14,8 @@ namespace Tango.Telemetry public DateTime Created { get; set; } public DateTime Expires { get; set; } public ITelemetry TelemetryObject { get; set; } - public String Module { get; set; } - public TelemetrySource Source { get; set; } + public String Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public List PendingDestinations { get; set; } public PendingTelemetry() diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs new file mode 100644 index 000000000..22fac087b --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Timers; +using Tango.Core; +using Tango.Insights; +using Tango.Integration.Operation; +using Tango.Logging; +using Tango.PMR.Diagnostics; +using Tango.PMR.Insights; +using Tango.Telemetry.Telemetries; + +namespace Tango.Telemetry.Sources +{ + public class TelemetryDiagnosticsSource : TelemetryConfigurableSource, ITelemetrySource + { + public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; + + private IMachineOperator _machineOperator; + private bool _isStarted; + + private Timer _diagnosticsSamplingTimer; + private List _diagnosticsQueue; + private bool _writing; + private bool _emptyWritten; + + public event EventHandler TelemetryAvailable; + + public string Name { get; private set; } = "Diagnostics"; + + private TelemetryDiagnosticsSource() : base() + { + _diagnosticsQueue = new List(); + } + + public TelemetryDiagnosticsSource(IMachineOperator machineOperator) : base() + { + _machineOperator = machineOperator; + _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable; + } + + private void DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics) + { + if (_isStarted && diagnostics.Monitors != null) + { + _diagnosticsQueue.Add(diagnostics); + } + } + + public void Start() + { + _isStarted = true; + + if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS) + { + Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS); + } + + if (_diagnosticsSamplingTimer == null) + { + _diagnosticsSamplingTimer = new Timer(); + _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds; + _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed; + } + + + _diagnosticsQueue.Clear(); + + _writing = false; + _diagnosticsSamplingTimer.Start(); + } + + public void Stop() + { + _isStarted = false; + } + + private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e) + { + if (!_isStarted || _writing) return; + + try + { + _diagnosticsSamplingTimer.Stop(); + + _writing = true; + + if (_diagnosticsQueue.Count > 0) + { + var queue = _diagnosticsQueue.ToList(); + _diagnosticsQueue.Clear(); + _emptyWritten = false; + + var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList()); + queue.Clear(); + + TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); + frame.Monitors = monitorsAvg; + frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + + + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming)); + } + else + { + if (!_emptyWritten) + { + TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); + frame.Monitors = new InsightsMonitors(); + frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming)); + _emptyWritten = true; + } + } + } + catch (Exception ex) + { + LogManager.Log(ex, "Error occurred on insights frame insertion."); + } + finally + { + _writing = false; + _diagnosticsSamplingTimer.Start(); + } + } + + public void Dispose() + { + _machineOperator.DiagnosticsDataAvailable -= DiagnosticsDataAvailable; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs new file mode 100644 index 000000000..806277d5b --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Settings; + +namespace Tango.Telemetry.Sources +{ + public class TelemetryDiagnosticsSourceConfig : SettingsBase + { + public TimeSpan DiagnosticsSamplingInterval { get; set; } + + public TelemetryDiagnosticsSourceConfig() + { + DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs new file mode 100644 index 000000000..6cb597e4b --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Data.Entity; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.BL; +using Tango.Telemetry.Telemetries; + +namespace Tango.Telemetry.Sources +{ + public class TelemetryJobRunsHistorySource : TelemetryConfigurableSource, ITelemetryHistorySource + { + private bool _isBusy; + + public string Name { get; private set; } = "JobRuns History"; + + public Task CanRequestHistory(DateTime from) + { + return Task.FromResult(!_isBusy); + } + + public async Task> RequestHistory(DateTime from) + { + try + { + _isBusy = true; + + using (ObservablesContext db = ObservablesContext.CreateDefault()) + { + var runs = await db.JobRuns + .Where(x => x.LastUpdated > from) + .OrderBy(x => x.LastUpdated) + .Take(Config.MaxJobRunsPerRequest) + .ToListAsync(); + + List tRuns = new List(); + + foreach (var run in runs) + { + TelemetryJobRun tRun = new TelemetryJobRun(); + tRun.Time = run.LastUpdated; + //Fill the object.. + tRuns.Add(tRun); + } + + return tRuns; + } + } + finally + { + _isBusy = false; + } + } + + public void Dispose() + { + + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs new file mode 100644 index 000000000..da2043941 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Settings; + +namespace Tango.Telemetry.Sources +{ + public class TelemetryJobRunsHistorySourceConfig : SettingsBase + { + public int MaxJobRunsPerRequest { get; set; } + + public TelemetryJobRunsHistorySourceConfig() + { + MaxJobRunsPerRequest = 100; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 27ac76783..02f8417f1 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -207,25 +207,25 @@ - - + + - - - - - + + + + + - - + + - + @@ -233,14 +233,15 @@ - + + - - + + diff --git a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs new file mode 100644 index 000000000..da64c50a0 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs @@ -0,0 +1,18 @@ +using LiteDB; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.PMR.Insights; + +namespace Tango.Telemetry.Telemetries +{ + [TelemetryName("DiagnosticsFrame")] + public class TelemetryDiagnosticsFrame : TelemetryBase + { + public InsightsMonitors Monitors { get; set; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs new file mode 100644 index 000000000..7b1ca3a1c --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry.Telemetries +{ + [TelemetryName("JobRun")] + public class TelemetryJobRun : TelemetryBase + { + + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs index 163b72f7a..a9ddf890c 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs @@ -8,13 +8,13 @@ namespace Tango.Telemetry { public class TelemetryAvailableEventArgs : EventArgs { - public TelemetrySource Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public ITelemetry TelemetryObject { get; set; } - public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySource source) + public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySourceTypes source) { TelemetryObject = telemetryObject; - Source = source; + SourceType = source; } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index 133409ff9..20d5211e7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -1,4 +1,5 @@ -using Newtonsoft.Json; +using LiteDB; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; @@ -9,6 +10,12 @@ namespace Tango.Telemetry { public class TelemetryBase : ITelemetry { + [BsonIgnore] //This will be used for column mapping in ADX + public String Type + { + get { return this.ToTelemetryName(); } + } + public DateTime Time { get; set; } public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs deleted file mode 100644 index 8ffb96af5..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.Core; -using Tango.Settings; - -namespace Tango.Telemetry -{ - public abstract class TelemetryConfigurableModule : ExtendedObject where T : SettingsBase - { - public T Config { get; set; } - - public TelemetryConfigurableModule() - { - Config = SettingsManager.Default.GetOrCreate(); - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs new file mode 100644 index 000000000..c1068582b --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Settings; + +namespace Tango.Telemetry +{ + public abstract class TelemetryConfigurableSource : ExtendedObject where T : SettingsBase + { + public T Config { get; set; } + + public TelemetryConfigurableSource() + { + Config = SettingsManager.Default.GetOrCreate(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs deleted file mode 100644 index 2f57a1926..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs +++ /dev/null @@ -1,17 +0,0 @@ -using LiteDB; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public class TelemetryHistoryModuleCheckPoint - { - [BsonId] - public String ModuleName { get; set; } - public DateTime Time { get; set; } - public int TotalCount { get; set; } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs new file mode 100644 index 000000000..ee82de05a --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs @@ -0,0 +1,17 @@ +using LiteDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryHistorySourceCheckPoint + { + [BsonId] + public String SourceName { get; set; } + public DateTime Time { get; set; } + public int TotalCount { get; set; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index 781b3f3e6..2ceb95298 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -51,9 +51,9 @@ namespace Tango.Telemetry return _database.GetCollection("PendingTelemetries"); } - private ILiteCollection GetModulesCheckpointCollection() + private ILiteCollection GetSourcesCheckpointCollection() { - return _database.GetCollection("ModuleCheckPoints"); + return _database.GetCollection("SourcesCheckPoints"); } public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry) @@ -83,21 +83,21 @@ namespace Tango.Telemetry } } - public TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule) + public TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source) { lock (_lock) { - var collection = GetModulesCheckpointCollection(); - return collection.FindOne(x => x.ModuleName == historyModule.Name); + var collection = GetSourcesCheckpointCollection(); + return collection.FindOne(x => x.SourceName == source.Name); } } - public void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount) + public void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount) { lock (_lock) { - var collection = GetModulesCheckpointCollection(); - collection.Upsert(new TelemetryHistoryModuleCheckPoint() { ModuleName = historyModule.Name, Time = time, TotalCount = totalCount }); + var collection = GetSourcesCheckpointCollection(); + collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount }); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs deleted file mode 100644 index 86762596e..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs +++ /dev/null @@ -1,18 +0,0 @@ -using LiteDB; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.PMR.Insights; - -namespace Tango.Telemetry.TelemetryObjects -{ - [TelemetryName("Diagnostics")] - public class TelemetryDiagnosticsFrame : TelemetryBase - { - public InsightsMonitors Monitors { get; set; } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs deleted file mode 100644 index 828f61360..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry.TelemetryObjects -{ - [TelemetryName("JobRuns")] - public class TelemetryJobRun : TelemetryBase - { - - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs deleted file mode 100644 index 6aae43f67..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public class TelemetryPendingStorageModule : ITelemetryModule - { - public string Name { get; private set; } = "Pending Storage"; - - public void Dispose() - { - - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs new file mode 100644 index 000000000..6aa1f5527 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryPendingStorageSource : ITelemetrySource + { + public string Name { get; private set; } = "Pending Storage"; + + public void Dispose() + { + + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs index 5bd207e9f..baafb70a7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs @@ -10,9 +10,9 @@ namespace Tango.Telemetry { private String _payload; - public ITelemetryModule Module { get; set; } + public ITelemetrySource Source { get; set; } public PendingTelemetry PendingTelemetry { get; set; } - public TelemetrySource Source { get; set; } + public TelemetrySourceTypes SourceType { get; set; } public String ToPayload() { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs new file mode 100644 index 000000000..37c6af412 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryPublishResult + { + public enum DestinationStatus + { + None, + Passed, + Unavailable, + Failed, + Postponed + } + + public class DestinationResult + { + public ITelemetryDestination Destination { get; set; } + public DestinationStatus Status { get; set; } + public Exception Error { get; set; } + public TimeSpan ElapsedTime { get; set; } + } + + public List DestinationsResults { get; set; } + public TimeSpan TotalElapsedTime { get; set; } + + public TimeSpan OverheadTime + { + get { return TimeSpan.FromMilliseconds(TotalElapsedTime.TotalMilliseconds - DestinationsResults.Sum(x => x.ElapsedTime.TotalMilliseconds)); } + } + + public TelemetryPublishResult() + { + DestinationsResults = new List(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 1314d3346..99d96edff 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Text; @@ -9,11 +10,13 @@ using System.Threading; using System.Threading.Tasks; using System.Timers; using Tango.Core; +using Tango.Core.ExtensionMethods; using Tango.Insights; using Tango.Integration.Operation; +using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; -using Tango.Telemetry.TelemetryObjects; +using Tango.Telemetry.Telemetries; namespace Tango.Telemetry { @@ -26,9 +29,9 @@ namespace Tango.Telemetry private System.Timers.Timer _pendingStorageCheckTimer; private System.Timers.Timer _historicalDataTimer; - private bool _isDisposed; + protected bool _isDisposed; private Thread _publishThread; - private TelemetryPendingStorageModule _pendingStorageModule; + private TelemetryPendingStorageSource _pendingStorageSource; #region Properties @@ -38,8 +41,8 @@ namespace Tango.Telemetry public ITelemetryStorageManager StorageManager { get; private set; } - private List InnerModules { get; } - public ReadOnlyCollection Modules { get; } + private List InnerSources { get; } + public ReadOnlyCollection Sources { get; } private List InnerDestinations { get; } public ReadOnlyCollection Destinations { get; } @@ -54,10 +57,10 @@ namespace Tango.Telemetry { Config = config ?? new TelemetryPublisherConfiguration(); - _pendingStorageModule = new TelemetryPendingStorageModule(); + _pendingStorageSource = new TelemetryPendingStorageSource(); - InnerModules = new List(); - Modules = new ReadOnlyCollection(InnerModules); + InnerSources = new List(); + Sources = new ReadOnlyCollection(InnerSources); InnerDestinations = new List(); Destinations = new ReadOnlyCollection(InnerDestinations); @@ -77,31 +80,48 @@ namespace Tango.Telemetry #endregion - #region Modules + #region Sources - public void RegisterModule(ITelemetryModule module) + public void RegisterSource(ITelemetrySource source) { - if (InnerModules.Exists(x => x.GetType() == module.GetType())) + if (source == null) return; + + if (InnerSources.Exists(x => x.GetType() == source.GetType())) { - throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered."); + LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; } - InnerModules.Add(module); + InnerSources.Add(source); - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.TelemetryAvailable += Module_TelemetryAvailable; + streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable; if (IsStarted) { - streamingModule.Start(); + try + { + streamingSource.Start(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error starting telemetry source {source.Name}."); + } } } + + LogManager.Log($"Telemetry source {source.Name} registered."); } - private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) + private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) { - PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source); + var source = sender as ITelemetrySource; + if (source != null) + { + LogManager.Log($"Telemetry stream received {source.Name} -> {e.SourceType} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug); + PushTelemetryPackage(source, e.TelemetryObject, e.SourceType); + } } #endregion @@ -110,7 +130,17 @@ namespace Tango.Telemetry public void RegisterDestination(ITelemetryDestination destination) { + if (destination == null) return; + + if (InnerDestinations.Exists(x => x.Name == destination.Name)) + { + LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; + } + InnerDestinations.Add(destination); + + LogManager.Log($"Telemetry destination {destination.Name} registered."); } #endregion @@ -121,31 +151,45 @@ namespace Tango.Telemetry { if (!IsStarted) { - Config.Validate(); + try + { + LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}"); - IsStarted = true; + Config.Validate(); + Validate(); - if (_pendingStorageCheckTimer == null) - { - _pendingStorageCheckTimer = new System.Timers.Timer(); - _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; - _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; - } + IsStarted = true; - _pendingStorageCheckTimer.Start(); + if (_pendingStorageCheckTimer == null) + { + _pendingStorageCheckTimer = new System.Timers.Timer(); + _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; + _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; + } - if (_historicalDataTimer == null) - { - _historicalDataTimer = new System.Timers.Timer(); - _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds; - _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; - } + _pendingStorageCheckTimer.Start(); + + if (_historicalDataTimer == null) + { + _historicalDataTimer = new System.Timers.Timer(); + _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds; + _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; + } + + _historicalDataTimer.Start(); - _historicalDataTimer.Start(); + _publishThread.Start(); - _publishThread.Start(); + InnerSources.OfType().ToList().ForEach(x => x.Start()); - InnerModules.OfType().ToList().ForEach(x => x.Start()); + LogManager.Log($"Telemetry publisher started."); + } + catch (Exception ex) + { + LogManager.Log(ex, "Error starting telemetry publisher."); + Stop(); + throw ex; + } } } @@ -154,10 +198,63 @@ namespace Tango.Telemetry if (IsStarted) { IsStarted = false; - InnerModules.OfType().ToList().ForEach(x => x.Stop()); + + LogManager.Log("Stopping telemetry publisher..."); + + InnerSources.OfType().ToList().ForEach(x => + { + try + { + x.Stop(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}."); + } + }); _pendingStorageCheckTimer.Stop(); _historicalDataTimer.Stop(); QueueManager.Enqueue(null); + + LogManager.Log("Telemetry publisher stopped."); + } + } + + public void Validate() + { + // Validate all registered sources + foreach (var source in InnerSources) + { + if (string.IsNullOrWhiteSpace(source.Name)) + { + throw new ArgumentException("A registered telemetry source has an invalid or missing Name."); + } + } + + // Validate all registered destinations + foreach (var destination in InnerDestinations) + { + if (string.IsNullOrWhiteSpace(destination.Name)) + { + throw new ArgumentException("A registered telemetry destination has an invalid or missing Name."); + } + + if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any()) + { + throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source."); + } + } + + // Validate StorageManager + if (StorageManager == null) + { + throw new NullReferenceException("StorageManager is not configured."); + } + + // Validate QueueManager + if (QueueManager == null) + { + throw new NullReferenceException("QueueManager is not configured."); } } @@ -169,20 +266,39 @@ namespace Tango.Telemetry { _pendingStorageCheckTimer.Stop(); + LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})..."); + var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing..."); + + Dictionary destinationsPasses = new Dictionary(); + List results = new List(); + foreach (var pendingTelemetry in batch) { - await PublishTelemetryPackage(new TelemetryPublishPackage() + var result = await PublishTelemetryPackage(new TelemetryPublishPackage() { - Module = _pendingStorageModule, + Source = _pendingStorageSource, PendingTelemetry = pendingTelemetry, - Source = TelemetrySource.PendingStorage + SourceType = TelemetrySourceTypes.PendingStorage }); + results.Add(result); + + foreach (var d in result.DestinationsResults) + { + if (d.Status == TelemetryPublishResult.DestinationStatus.Passed) + { + destinationsPasses[d.Destination.Name] += 1; + } + } + if (!IsStarted || _isDisposed) return; } + LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}"); + _pendingStorageCheckTimer.Start(); } @@ -190,23 +306,25 @@ namespace Tango.Telemetry { _historicalDataTimer.Stop(); + LogManager.Log(""); + if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) { - foreach (var module in InnerModules.OfType().ToList()) + foreach (var source in InnerSources.OfType().ToList()) { - TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module); - if (await module.CanRequestHistory(checkPoint.Time)) + TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source); + if (await source.CanRequestHistory(checkPoint.Time)) { - var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); + var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); foreach (var telemetry in historyTelemetries) { - PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage); + PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage); checkPoint.Time = telemetry.Time; checkPoint.TotalCount++; } - StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount); + StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount); } } } @@ -218,16 +336,16 @@ namespace Tango.Telemetry #region Push - private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source) + private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) { PendingTelemetry pendingTelemetry = new PendingTelemetry(); pendingTelemetry.Created = DateTime.UtcNow; - //pendingTelemetry.Expires = module.GetExpiration(); - pendingTelemetry.Module = module.Name; - pendingTelemetry.Source = source; + //pendingTelemetry.Expires = source.GetExpiration(); + pendingTelemetry.Source = source.Name; + pendingTelemetry.SourceType = sourceType; pendingTelemetry.TelemetryObject = telemetry; - PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source }); + PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType }); } private void PushTelemetryPackage(TelemetryPublishPackage package) @@ -261,9 +379,13 @@ namespace Tango.Telemetry } } - protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) { - if (!IsStarted || _isDisposed) return; + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; List> properties = new List>(); @@ -275,7 +397,7 @@ namespace Tango.Telemetry //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var destination in Destinations) { @@ -286,26 +408,58 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType))) { if (pendingDestinations.Exists(x => x.Name == destination.Name)) { + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + try { pendingDestinations.RemoveAll(x => x.Name == destination.Name); if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); + + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); + } + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -318,7 +472,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -326,6 +480,11 @@ namespace Tango.Telemetry { StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } #endregion @@ -373,14 +532,14 @@ namespace Tango.Telemetry if (!_isDisposed) { _isDisposed = true; - foreach (var module in InnerModules) + foreach (var source in InnerSources) { - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.Stop(); - streamingModule.TelemetryAvailable -= Module_TelemetryAvailable; + streamingSource.Stop(); + streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable; } - module.Dispose(); + source.Dispose(); } if (IsStarted) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index 0183de6c2..c4e7d3d3e 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -20,8 +21,14 @@ namespace Tango.Telemetry } - protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package) { + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; + List> properties = new List> { new KeyValuePair("MachineID", Config.MachineID), @@ -33,7 +40,7 @@ namespace Tango.Telemetry var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); // For Streaming/External: initialize pending destinations list (used if publishing fails) - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var dest in Destinations) { @@ -50,35 +57,86 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); if (pendingEntry == null) continue; + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + // Respect backoff timing if (now < pendingEntry.NextEligibleAttempt) + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; continue; + } try { if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); - // On success: remove entry from pending list - pendingDestinations.RemoveAll(x => x.Name == destination.Name); + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // On success: remove entry from pending list + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // Only track retry state if retry is supported + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (pendingEntry == null) + { + pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; + pendingDestinations.Add(pendingEntry); + } + + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + + // Apply exponential backoff + int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + } + else + { + // Remove if not retryable + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); // Only track retry state if retry is supported - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (pendingEntry == null) { @@ -103,7 +161,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = new List(pendingDestinations); - if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any()) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any()) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -111,6 +169,11 @@ namespace Tango.Telemetry { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs index 424249f7d..4a3776b87 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs @@ -14,14 +14,14 @@ namespace Tango.Telemetry public String Environment { get; set; } public TimeSpan PendingStorageCheckInterval { get; set; } public int MaxPendingStorageTelemetriesPerCycle { get; set; } - public TimeSpan HistoryModulesRequestInterval { get; set; } + public TimeSpan HistorySourcesRequestInterval { get; set; } public int MaxPendingTelemetries { get; set; } public TelemetryPublisherConfiguration() { PendingStorageCheckInterval = TimeSpan.FromMinutes(1); MaxPendingStorageTelemetriesPerCycle = 100; - HistoryModulesRequestInterval = TimeSpan.FromMinutes(1); + HistorySourcesRequestInterval = TimeSpan.FromMinutes(1); MaxPendingTelemetries = 200; } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs deleted file mode 100644 index 09bbdc539..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public enum TelemetrySource - { - Streaming, - ExternalStorage, - PendingStorage, - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs new file mode 100644 index 000000000..1ed80d5c9 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public enum TelemetrySourceTypes + { + Streaming, + ExternalStorage, + PendingStorage, + } +} -- cgit v1.3.1