diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 19:53:35 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 19:53:35 +0300 |
| commit | a802fe75f9538371004f1833e69a69b798892d0c (patch) | |
| tree | 9d4612cf4dd6c543650b9ee10599db4b30782391 /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs | |
| parent | 72c6399ec345ec26bd7f79651667ffa585474919 (diff) | |
| download | Tango-a802fe75f9538371004f1833e69a69b798892d0c.tar.gz Tango-a802fe75f9538371004f1833e69a69b798892d0c.zip | |
Telemetry
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs | 272 |
1 files changed, 150 insertions, 122 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 8891a6cc4..1314d3346 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Reflection; using System.Text; @@ -16,64 +17,100 @@ using Tango.Telemetry.TelemetryObjects; namespace Tango.Telemetry { - public class TelemetryPublisher : ExtendedObject, IDisposable + public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher { - public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; - public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage; public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished; public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; - private System.Timers.Timer _diagnosticsSamplingTimer; private System.Timers.Timer _pendingStorageCheckTimer; + private System.Timers.Timer _historicalDataTimer; + private bool _isDisposed; private Thread _publishThread; - private ProducerConsumerQueue<TelemetryPublishPackage> _publishQueue; - - private List<StartDiagnosticsResponse> _diagnosticsQueue; - private bool _writing; - private bool _emptyWritten; - - private IMachineOperator _machineOperator; + private TelemetryPendingStorageModule _pendingStorageModule; #region Properties - public TelemetryPublisherConfiguration Config { get; private set; } - public ITelemetryPendingStorageManager PendingStorageManager { get; private set; } public bool IsStarted { get; private set; } + public TelemetryPublisherConfiguration Config { get; private set; } + + public ITelemetryStorageManager StorageManager { get; private set; } + + private List<ITelemetryModule> InnerModules { get; } + public ReadOnlyCollection<ITelemetryModule> Modules { get; } + + private List<ITelemetryDestination> InnerDestinations { get; } + public ReadOnlyCollection<ITelemetryDestination> Destinations { get; } + + public ITelemetryQueueManager QueueManager { get; private set; } + #endregion #region Constructor - public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) + public TelemetryPublisher(TelemetryPublisherConfiguration config) { - _machineOperator = machineOperator; - PendingStorageManager = storageManager; - Config = config ?? new TelemetryPublisherConfiguration(); - _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + _pendingStorageModule = new TelemetryPendingStorageModule(); + + InnerModules = new List<ITelemetryModule>(); + Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules); + + InnerDestinations = new List<ITelemetryDestination>(); + Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations); + _publishThread = new Thread(PublishThreadMethod); _publishThread.IsBackground = true; - _diagnosticsQueue = new List<StartDiagnosticsResponse>(); + StorageManager = new TelemetryLiteDBStorageManager(); + QueueManager = new TelemetryInMemoryQueueManager(); + } - RegisterForEvents(); + public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config) + { + StorageManager = storageManager; + QueueManager = queueManager; } #endregion - #region Register / Unregister Events + #region Modules - private void RegisterForEvents() + public void RegisterModule(ITelemetryModule module) { - _machineOperator.DiagnosticsDataAvailable += MachineOperator_DiagnosticsDataAvailable; + if (InnerModules.Exists(x => x.GetType() == module.GetType())) + { + throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered."); + } + + InnerModules.Add(module); + + if (module is ITelemetryStreamingModule streamingModule) + { + streamingModule.TelemetryAvailable += Module_TelemetryAvailable; + + if (IsStarted) + { + streamingModule.Start(); + } + } + } + + private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) + { + PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source); } - private void UnregisterEvents() + #endregion + + #region Destinations + + public void RegisterDestination(ITelemetryDestination destination) { - _machineOperator.DiagnosticsDataAvailable -= MachineOperator_DiagnosticsDataAvailable; + InnerDestinations.Add(destination); } #endregion @@ -88,18 +125,6 @@ namespace Tango.Telemetry IsStarted = true; - if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS) - { - Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS); - } - - if (_diagnosticsSamplingTimer == null) - { - _diagnosticsSamplingTimer = new System.Timers.Timer(); - _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds; - _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed; - } - if (_pendingStorageCheckTimer == null) { _pendingStorageCheckTimer = new System.Timers.Timer(); @@ -107,13 +132,20 @@ namespace Tango.Telemetry _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; } - _diagnosticsQueue.Clear(); - - _writing = false; - _diagnosticsSamplingTimer.Start(); _pendingStorageCheckTimer.Start(); + if (_historicalDataTimer == null) + { + _historicalDataTimer = new System.Timers.Timer(); + _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds; + _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; + } + + _historicalDataTimer.Start(); + _publishThread.Start(); + + InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start()); } } @@ -122,103 +154,85 @@ namespace Tango.Telemetry if (IsStarted) { IsStarted = false; - _diagnosticsSamplingTimer.Stop(); + InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop()); _pendingStorageCheckTimer.Stop(); - _diagnosticsQueue.Clear(); - _publishQueue.BlockEnqueue(null); + _historicalDataTimer.Stop(); + QueueManager.Enqueue(null); } } #endregion - #region Incoming Data Event Handlers + #region Timers - private void MachineOperator_DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics) + private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) { - if (IsStarted && diagnostics.Monitors != null) + _pendingStorageCheckTimer.Stop(); + + var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + + foreach (var pendingTelemetry in batch) { - _diagnosticsQueue.Add(diagnostics); - } - } + await PublishTelemetryPackage(new TelemetryPublishPackage() + { + Module = _pendingStorageModule, + PendingTelemetry = pendingTelemetry, + Source = TelemetrySource.PendingStorage + }); - #endregion + if (!IsStarted || _isDisposed) return; + } - #region Timers + _pendingStorageCheckTimer.Start(); + } - private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e) + private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e) { - if (!IsStarted || _writing) return; + _historicalDataTimer.Stop(); - try + if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) { - _diagnosticsSamplingTimer.Stop(); - - _writing = true; - - if (_diagnosticsQueue.Count > 0) + foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList()) { - var queue = _diagnosticsQueue.ToList(); - _diagnosticsQueue.Clear(); - _emptyWritten = false; - - var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList()); - queue.Clear(); + TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module); + if (await module.CanRequestHistory(checkPoint.Time)) + { + var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = monitorsAvg; - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + foreach (var telemetry in historyTelemetries) + { + PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage); + checkPoint.Time = telemetry.Time; + checkPoint.TotalCount++; + } - PushTelemetryPackage(frame, TelemetrySource.Streaming); - } - else - { - if (!_emptyWritten) - { - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = new InsightsMonitors(); - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - PushTelemetryPackage(frame, TelemetrySource.Streaming); - _emptyWritten = true; + StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount); } } } - catch (Exception ex) - { - LogManager.Log(ex, "Error occurred on insights frame insertion."); - } - finally - { - _writing = false; - _diagnosticsSamplingTimer.Start(); - } - } - - private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) - { - _pendingStorageCheckTimer.Stop(); - - var telemetryAll = PendingStorageManager.GetTelemetryAll(); - - foreach (var t in telemetryAll) - { - PushTelemetryPackage(t, TelemetrySource.PendingStorage); - } - _pendingStorageCheckTimer.Start(); + _historicalDataTimer.Start(); } #endregion #region Push - private void PushTelemetryPackage(ITelemetry telemetryObject, TelemetrySource source) + private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source) { - _publishQueue.BlockEnqueue(new TelemetryPublishPackage() { TelemetryObject = telemetryObject, Source = source }); + PendingTelemetry pendingTelemetry = new PendingTelemetry(); + pendingTelemetry.Created = DateTime.UtcNow; + //pendingTelemetry.Expires = module.GetExpiration(); + pendingTelemetry.Module = module.Name; + pendingTelemetry.Source = source; + pendingTelemetry.TelemetryObject = telemetry; + + PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source }); } private void PushTelemetryPackage(TelemetryPublishPackage package) { - _publishQueue.BlockEnqueue(package); + QueueManager.Enqueue(package); } #endregion @@ -229,10 +243,10 @@ namespace Tango.Telemetry { while (IsStarted) { - TelemetryPublishPackage package = _publishQueue.BlockDequeue(); + TelemetryPublishPackage package = QueueManager.Dequeue(); if (package == null) { - _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + QueueManager.Clear(); return; } @@ -249,19 +263,21 @@ namespace Tango.Telemetry protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) { + if (!IsStarted || _isDisposed) return; + List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID)); properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName())); properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment)); - List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); + List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) { - foreach (var destination in Config.TelemetryDestinations) + foreach (var destination in Destinations) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -270,7 +286,7 @@ namespace Tango.Telemetry } } - foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) { if (pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -300,15 +316,15 @@ namespace Tango.Telemetry } } - package.TelemetryObject.PendingDestinations = pendingDestinations; + package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0) + if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) { - PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); + StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { - PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); + StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } } @@ -354,16 +370,28 @@ namespace Tango.Telemetry public void Dispose() { - UnregisterEvents(); - - if (IsStarted) + if (!_isDisposed) { - Stop(); - } + _isDisposed = true; + foreach (var module in InnerModules) + { + if (module is ITelemetryStreamingModule streamingModule) + { + streamingModule.Stop(); + streamingModule.TelemetryAvailable -= Module_TelemetryAvailable; + } + module.Dispose(); + } - foreach (var destination in Config.TelemetryDestinations) - { - destination.Dispose(); + if (IsStarted) + { + Stop(); + } + + foreach (var destination in Destinations) + { + destination.Dispose(); + } } } |
