aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
commit4222eddece906d6f0877022c06b853deb5068472 (patch)
treea29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
parenta802fe75f9538371004f1833e69a69b798892d0c (diff)
downloadTango-4222eddece906d6f0877022c06b853deb5068472.tar.gz
Tango-4222eddece906d6f0877022c06b853deb5068472.zip
Telemetry source.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs287
1 files changed, 223 insertions, 64 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 1314d3346..99d96edff 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
+using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
@@ -9,11 +10,13 @@ using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Tango.Core;
+using Tango.Core.ExtensionMethods;
using Tango.Insights;
using Tango.Integration.Operation;
+using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
-using Tango.Telemetry.TelemetryObjects;
+using Tango.Telemetry.Telemetries;
namespace Tango.Telemetry
{
@@ -26,9 +29,9 @@ namespace Tango.Telemetry
private System.Timers.Timer _pendingStorageCheckTimer;
private System.Timers.Timer _historicalDataTimer;
- private bool _isDisposed;
+ protected bool _isDisposed;
private Thread _publishThread;
- private TelemetryPendingStorageModule _pendingStorageModule;
+ private TelemetryPendingStorageSource _pendingStorageSource;
#region Properties
@@ -38,8 +41,8 @@ namespace Tango.Telemetry
public ITelemetryStorageManager StorageManager { get; private set; }
- private List<ITelemetryModule> InnerModules { get; }
- public ReadOnlyCollection<ITelemetryModule> Modules { get; }
+ private List<ITelemetrySource> InnerSources { get; }
+ public ReadOnlyCollection<ITelemetrySource> Sources { get; }
private List<ITelemetryDestination> InnerDestinations { get; }
public ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
@@ -54,10 +57,10 @@ namespace Tango.Telemetry
{
Config = config ?? new TelemetryPublisherConfiguration();
- _pendingStorageModule = new TelemetryPendingStorageModule();
+ _pendingStorageSource = new TelemetryPendingStorageSource();
- InnerModules = new List<ITelemetryModule>();
- Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules);
+ InnerSources = new List<ITelemetrySource>();
+ Sources = new ReadOnlyCollection<ITelemetrySource>(InnerSources);
InnerDestinations = new List<ITelemetryDestination>();
Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations);
@@ -77,31 +80,48 @@ namespace Tango.Telemetry
#endregion
- #region Modules
+ #region Sources
- public void RegisterModule(ITelemetryModule module)
+ public void RegisterSource(ITelemetrySource source)
{
- if (InnerModules.Exists(x => x.GetType() == module.GetType()))
+ if (source == null) return;
+
+ if (InnerSources.Exists(x => x.GetType() == source.GetType()))
{
- throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered.");
+ LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning);
+ return;
}
- InnerModules.Add(module);
+ InnerSources.Add(source);
- if (module is ITelemetryStreamingModule streamingModule)
+ if (source is ITelemetryStreamingSource streamingSource)
{
- streamingModule.TelemetryAvailable += Module_TelemetryAvailable;
+ streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable;
if (IsStarted)
{
- streamingModule.Start();
+ try
+ {
+ streamingSource.Start();
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error starting telemetry source {source.Name}.");
+ }
}
}
+
+ LogManager.Log($"Telemetry source {source.Name} registered.");
}
- private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
+ private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
{
- PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source);
+ var source = sender as ITelemetrySource;
+ if (source != null)
+ {
+ LogManager.Log($"Telemetry stream received {source.Name} -> {e.SourceType} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug);
+ PushTelemetryPackage(source, e.TelemetryObject, e.SourceType);
+ }
}
#endregion
@@ -110,7 +130,17 @@ namespace Tango.Telemetry
public void RegisterDestination(ITelemetryDestination destination)
{
+ if (destination == null) return;
+
+ if (InnerDestinations.Exists(x => x.Name == destination.Name))
+ {
+ LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning);
+ return;
+ }
+
InnerDestinations.Add(destination);
+
+ LogManager.Log($"Telemetry destination {destination.Name} registered.");
}
#endregion
@@ -121,31 +151,45 @@ namespace Tango.Telemetry
{
if (!IsStarted)
{
- Config.Validate();
+ try
+ {
+ LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}");
- IsStarted = true;
+ Config.Validate();
+ Validate();
- if (_pendingStorageCheckTimer == null)
- {
- _pendingStorageCheckTimer = new System.Timers.Timer();
- _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds;
- _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
- }
+ IsStarted = true;
- _pendingStorageCheckTimer.Start();
+ if (_pendingStorageCheckTimer == null)
+ {
+ _pendingStorageCheckTimer = new System.Timers.Timer();
+ _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds;
+ _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
+ }
- if (_historicalDataTimer == null)
- {
- _historicalDataTimer = new System.Timers.Timer();
- _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds;
- _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
- }
+ _pendingStorageCheckTimer.Start();
+
+ if (_historicalDataTimer == null)
+ {
+ _historicalDataTimer = new System.Timers.Timer();
+ _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds;
+ _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
+ }
+
+ _historicalDataTimer.Start();
- _historicalDataTimer.Start();
+ _publishThread.Start();
- _publishThread.Start();
+ InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start());
- InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start());
+ LogManager.Log($"Telemetry publisher started.");
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error starting telemetry publisher.");
+ Stop();
+ throw ex;
+ }
}
}
@@ -154,10 +198,63 @@ namespace Tango.Telemetry
if (IsStarted)
{
IsStarted = false;
- InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop());
+
+ LogManager.Log("Stopping telemetry publisher...");
+
+ InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x =>
+ {
+ try
+ {
+ x.Stop();
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}.");
+ }
+ });
_pendingStorageCheckTimer.Stop();
_historicalDataTimer.Stop();
QueueManager.Enqueue(null);
+
+ LogManager.Log("Telemetry publisher stopped.");
+ }
+ }
+
+ public void Validate()
+ {
+ // Validate all registered sources
+ foreach (var source in InnerSources)
+ {
+ if (string.IsNullOrWhiteSpace(source.Name))
+ {
+ throw new ArgumentException("A registered telemetry source has an invalid or missing Name.");
+ }
+ }
+
+ // Validate all registered destinations
+ foreach (var destination in InnerDestinations)
+ {
+ if (string.IsNullOrWhiteSpace(destination.Name))
+ {
+ throw new ArgumentException("A registered telemetry destination has an invalid or missing Name.");
+ }
+
+ if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any())
+ {
+ throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source.");
+ }
+ }
+
+ // Validate StorageManager
+ if (StorageManager == null)
+ {
+ throw new NullReferenceException("StorageManager is not configured.");
+ }
+
+ // Validate QueueManager
+ if (QueueManager == null)
+ {
+ throw new NullReferenceException("QueueManager is not configured.");
}
}
@@ -169,20 +266,39 @@ namespace Tango.Telemetry
{
_pendingStorageCheckTimer.Stop();
+ LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})...");
+
var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+ LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing...");
+
+ Dictionary<String, int> destinationsPasses = new Dictionary<string, int>();
+ List<TelemetryPublishResult> results = new List<TelemetryPublishResult>();
+
foreach (var pendingTelemetry in batch)
{
- await PublishTelemetryPackage(new TelemetryPublishPackage()
+ var result = await PublishTelemetryPackage(new TelemetryPublishPackage()
{
- Module = _pendingStorageModule,
+ Source = _pendingStorageSource,
PendingTelemetry = pendingTelemetry,
- Source = TelemetrySource.PendingStorage
+ SourceType = TelemetrySourceTypes.PendingStorage
});
+ results.Add(result);
+
+ foreach (var d in result.DestinationsResults)
+ {
+ if (d.Status == TelemetryPublishResult.DestinationStatus.Passed)
+ {
+ destinationsPasses[d.Destination.Name] += 1;
+ }
+ }
+
if (!IsStarted || _isDisposed) return;
}
+ LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}");
+
_pendingStorageCheckTimer.Start();
}
@@ -190,23 +306,25 @@ namespace Tango.Telemetry
{
_historicalDataTimer.Stop();
+ LogManager.Log("");
+
if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries)
{
- foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList())
+ foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList())
{
- TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module);
- if (await module.CanRequestHistory(checkPoint.Time))
+ TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source);
+ if (await source.CanRequestHistory(checkPoint.Time))
{
- var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
+ var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
foreach (var telemetry in historyTelemetries)
{
- PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage);
+ PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage);
checkPoint.Time = telemetry.Time;
checkPoint.TotalCount++;
}
- StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount);
+ StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount);
}
}
}
@@ -218,16 +336,16 @@ namespace Tango.Telemetry
#region Push
- private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source)
+ private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
{
PendingTelemetry pendingTelemetry = new PendingTelemetry();
pendingTelemetry.Created = DateTime.UtcNow;
- //pendingTelemetry.Expires = module.GetExpiration();
- pendingTelemetry.Module = module.Name;
- pendingTelemetry.Source = source;
+ //pendingTelemetry.Expires = source.GetExpiration();
+ pendingTelemetry.Source = source.Name;
+ pendingTelemetry.SourceType = sourceType;
pendingTelemetry.TelemetryObject = telemetry;
- PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source });
+ PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType });
}
private void PushTelemetryPackage(TelemetryPublishPackage package)
@@ -261,9 +379,13 @@ namespace Tango.Telemetry
}
}
- protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
- if (!IsStarted || _isDisposed) return;
+ Stopwatch totalWatch = Stopwatch.StartNew();
+
+ var result = new TelemetryPublishResult();
+
+ if (!IsStarted || _isDisposed) return result;
List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
@@ -275,7 +397,7 @@ namespace Tango.Telemetry
//Add all destinations if streaming or external (They will be remove later if successfull)
//If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
- if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var destination in Destinations)
{
@@ -286,26 +408,58 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType)))
{
if (pendingDestinations.Exists(x => x.Name == destination.Name))
{
+ Stopwatch destinationWatch = Stopwatch.StartNew();
+
+ var destinationResult = new TelemetryPublishResult.DestinationResult();
+ destinationResult.Destination = destination;
+ result.DestinationsResults.Add(destinationResult);
+
try
{
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
if (OnPublishingPackage(package, destination))
{
- await destination.Publish(package, properties);
- OnPackagePublished(package, destination);
+ if (await destination.IsAvailable())
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
+
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+ }
+ else
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ if (!pendingDestinations.Exists(x => x.Name == destination.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name });
+ }
+ }
+ }
}
}
catch (Exception ex)
{
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
+ destinationResult.Error = ex;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
OnPackagePublishFailed(package, destination, ex);
- if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -318,7 +472,7 @@ namespace Tango.Telemetry
package.PendingTelemetry.PendingDestinations = pendingDestinations;
- if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
+ if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
@@ -326,6 +480,11 @@ namespace Tango.Telemetry
{
StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
+
+ totalWatch.Stop();
+ result.TotalElapsedTime = totalWatch.Elapsed;
+
+ return result;
}
#endregion
@@ -373,14 +532,14 @@ namespace Tango.Telemetry
if (!_isDisposed)
{
_isDisposed = true;
- foreach (var module in InnerModules)
+ foreach (var source in InnerSources)
{
- if (module is ITelemetryStreamingModule streamingModule)
+ if (source is ITelemetryStreamingSource streamingSource)
{
- streamingModule.Stop();
- streamingModule.TelemetryAvailable -= Module_TelemetryAvailable;
+ streamingSource.Stop();
+ streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable;
}
- module.Dispose();
+ source.Dispose();
}
if (IsStarted)