diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
| commit | 4222eddece906d6f0877022c06b853deb5068472 (patch) | |
| tree | a29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs | |
| parent | a802fe75f9538371004f1833e69a69b798892d0c (diff) | |
| download | Tango-4222eddece906d6f0877022c06b853deb5068472.tar.gz Tango-4222eddece906d6f0877022c06b853deb5068472.zip | |
Telemetry source.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs | 287 |
1 files changed, 223 insertions, 64 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 1314d3346..99d96edff 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Text; @@ -9,11 +10,13 @@ using System.Threading; using System.Threading.Tasks; using System.Timers; using Tango.Core; +using Tango.Core.ExtensionMethods; using Tango.Insights; using Tango.Integration.Operation; +using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; -using Tango.Telemetry.TelemetryObjects; +using Tango.Telemetry.Telemetries; namespace Tango.Telemetry { @@ -26,9 +29,9 @@ namespace Tango.Telemetry private System.Timers.Timer _pendingStorageCheckTimer; private System.Timers.Timer _historicalDataTimer; - private bool _isDisposed; + protected bool _isDisposed; private Thread _publishThread; - private TelemetryPendingStorageModule _pendingStorageModule; + private TelemetryPendingStorageSource _pendingStorageSource; #region Properties @@ -38,8 +41,8 @@ namespace Tango.Telemetry public ITelemetryStorageManager StorageManager { get; private set; } - private List<ITelemetryModule> InnerModules { get; } - public ReadOnlyCollection<ITelemetryModule> Modules { get; } + private List<ITelemetrySource> InnerSources { get; } + public ReadOnlyCollection<ITelemetrySource> Sources { get; } private List<ITelemetryDestination> InnerDestinations { get; } public ReadOnlyCollection<ITelemetryDestination> Destinations { get; } @@ -54,10 +57,10 @@ namespace Tango.Telemetry { Config = config ?? new TelemetryPublisherConfiguration(); - _pendingStorageModule = new TelemetryPendingStorageModule(); + _pendingStorageSource = new TelemetryPendingStorageSource(); - InnerModules = new List<ITelemetryModule>(); - Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules); + InnerSources = new List<ITelemetrySource>(); + Sources = new ReadOnlyCollection<ITelemetrySource>(InnerSources); InnerDestinations = new List<ITelemetryDestination>(); Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations); @@ -77,31 +80,48 @@ namespace Tango.Telemetry #endregion - #region Modules + #region Sources - public void RegisterModule(ITelemetryModule module) + public void RegisterSource(ITelemetrySource source) { - if (InnerModules.Exists(x => x.GetType() == module.GetType())) + if (source == null) return; + + if (InnerSources.Exists(x => x.GetType() == source.GetType())) { - throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered."); + LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; } - InnerModules.Add(module); + InnerSources.Add(source); - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.TelemetryAvailable += Module_TelemetryAvailable; + streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable; if (IsStarted) { - streamingModule.Start(); + try + { + streamingSource.Start(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error starting telemetry source {source.Name}."); + } } } + + LogManager.Log($"Telemetry source {source.Name} registered."); } - private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) + private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) { - PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source); + var source = sender as ITelemetrySource; + if (source != null) + { + LogManager.Log($"Telemetry stream received {source.Name} -> {e.SourceType} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug); + PushTelemetryPackage(source, e.TelemetryObject, e.SourceType); + } } #endregion @@ -110,7 +130,17 @@ namespace Tango.Telemetry public void RegisterDestination(ITelemetryDestination destination) { + if (destination == null) return; + + if (InnerDestinations.Exists(x => x.Name == destination.Name)) + { + LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning); + return; + } + InnerDestinations.Add(destination); + + LogManager.Log($"Telemetry destination {destination.Name} registered."); } #endregion @@ -121,31 +151,45 @@ namespace Tango.Telemetry { if (!IsStarted) { - Config.Validate(); + try + { + LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}"); - IsStarted = true; + Config.Validate(); + Validate(); - if (_pendingStorageCheckTimer == null) - { - _pendingStorageCheckTimer = new System.Timers.Timer(); - _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; - _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; - } + IsStarted = true; - _pendingStorageCheckTimer.Start(); + if (_pendingStorageCheckTimer == null) + { + _pendingStorageCheckTimer = new System.Timers.Timer(); + _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; + _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; + } - if (_historicalDataTimer == null) - { - _historicalDataTimer = new System.Timers.Timer(); - _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds; - _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; - } + _pendingStorageCheckTimer.Start(); + + if (_historicalDataTimer == null) + { + _historicalDataTimer = new System.Timers.Timer(); + _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds; + _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; + } + + _historicalDataTimer.Start(); - _historicalDataTimer.Start(); + _publishThread.Start(); - _publishThread.Start(); + InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start()); - InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start()); + LogManager.Log($"Telemetry publisher started."); + } + catch (Exception ex) + { + LogManager.Log(ex, "Error starting telemetry publisher."); + Stop(); + throw ex; + } } } @@ -154,10 +198,63 @@ namespace Tango.Telemetry if (IsStarted) { IsStarted = false; - InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop()); + + LogManager.Log("Stopping telemetry publisher..."); + + InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => + { + try + { + x.Stop(); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}."); + } + }); _pendingStorageCheckTimer.Stop(); _historicalDataTimer.Stop(); QueueManager.Enqueue(null); + + LogManager.Log("Telemetry publisher stopped."); + } + } + + public void Validate() + { + // Validate all registered sources + foreach (var source in InnerSources) + { + if (string.IsNullOrWhiteSpace(source.Name)) + { + throw new ArgumentException("A registered telemetry source has an invalid or missing Name."); + } + } + + // Validate all registered destinations + foreach (var destination in InnerDestinations) + { + if (string.IsNullOrWhiteSpace(destination.Name)) + { + throw new ArgumentException("A registered telemetry destination has an invalid or missing Name."); + } + + if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any()) + { + throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source."); + } + } + + // Validate StorageManager + if (StorageManager == null) + { + throw new NullReferenceException("StorageManager is not configured."); + } + + // Validate QueueManager + if (QueueManager == null) + { + throw new NullReferenceException("QueueManager is not configured."); } } @@ -169,20 +266,39 @@ namespace Tango.Telemetry { _pendingStorageCheckTimer.Stop(); + LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})..."); + var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing..."); + + Dictionary<String, int> destinationsPasses = new Dictionary<string, int>(); + List<TelemetryPublishResult> results = new List<TelemetryPublishResult>(); + foreach (var pendingTelemetry in batch) { - await PublishTelemetryPackage(new TelemetryPublishPackage() + var result = await PublishTelemetryPackage(new TelemetryPublishPackage() { - Module = _pendingStorageModule, + Source = _pendingStorageSource, PendingTelemetry = pendingTelemetry, - Source = TelemetrySource.PendingStorage + SourceType = TelemetrySourceTypes.PendingStorage }); + results.Add(result); + + foreach (var d in result.DestinationsResults) + { + if (d.Status == TelemetryPublishResult.DestinationStatus.Passed) + { + destinationsPasses[d.Destination.Name] += 1; + } + } + if (!IsStarted || _isDisposed) return; } + LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}"); + _pendingStorageCheckTimer.Start(); } @@ -190,23 +306,25 @@ namespace Tango.Telemetry { _historicalDataTimer.Stop(); + LogManager.Log(""); + if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) { - foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList()) + foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList()) { - TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module); - if (await module.CanRequestHistory(checkPoint.Time)) + TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source); + if (await source.CanRequestHistory(checkPoint.Time)) { - var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); + var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); foreach (var telemetry in historyTelemetries) { - PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage); + PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage); checkPoint.Time = telemetry.Time; checkPoint.TotalCount++; } - StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount); + StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount); } } } @@ -218,16 +336,16 @@ namespace Tango.Telemetry #region Push - private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source) + private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) { PendingTelemetry pendingTelemetry = new PendingTelemetry(); pendingTelemetry.Created = DateTime.UtcNow; - //pendingTelemetry.Expires = module.GetExpiration(); - pendingTelemetry.Module = module.Name; - pendingTelemetry.Source = source; + //pendingTelemetry.Expires = source.GetExpiration(); + pendingTelemetry.Source = source.Name; + pendingTelemetry.SourceType = sourceType; pendingTelemetry.TelemetryObject = telemetry; - PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source }); + PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType }); } private void PushTelemetryPackage(TelemetryPublishPackage package) @@ -261,9 +379,13 @@ namespace Tango.Telemetry } } - protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { - if (!IsStarted || _isDisposed) return; + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); @@ -275,7 +397,7 @@ namespace Tango.Telemetry //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var destination in Destinations) { @@ -286,26 +408,58 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType))) { if (pendingDestinations.Exists(x => x.Name == destination.Name)) { + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + try { pendingDestinations.RemoveAll(x => x.Name == destination.Name); if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); + + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); + } + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -318,7 +472,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -326,6 +480,11 @@ namespace Tango.Telemetry { StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } #endregion @@ -373,14 +532,14 @@ namespace Tango.Telemetry if (!_isDisposed) { _isDisposed = true; - foreach (var module in InnerModules) + foreach (var source in InnerSources) { - if (module is ITelemetryStreamingModule streamingModule) + if (source is ITelemetryStreamingSource streamingSource) { - streamingModule.Stop(); - streamingModule.TelemetryAvailable -= Module_TelemetryAvailable; + streamingSource.Stop(); + streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable; } - module.Dispose(); + source.Dispose(); } if (IsStarted) |
