aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs272
1 files changed, 150 insertions, 122 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 8891a6cc4..1314d3346 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Collections.ObjectModel;
using System.Linq;
using System.Reflection;
using System.Text;
@@ -16,64 +17,100 @@ using Tango.Telemetry.TelemetryObjects;
namespace Tango.Telemetry
{
- public class TelemetryPublisher : ExtendedObject, IDisposable
+ public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher
{
- public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;
-
public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage;
public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished;
public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
- private System.Timers.Timer _diagnosticsSamplingTimer;
private System.Timers.Timer _pendingStorageCheckTimer;
+ private System.Timers.Timer _historicalDataTimer;
+ private bool _isDisposed;
private Thread _publishThread;
- private ProducerConsumerQueue<TelemetryPublishPackage> _publishQueue;
-
- private List<StartDiagnosticsResponse> _diagnosticsQueue;
- private bool _writing;
- private bool _emptyWritten;
-
- private IMachineOperator _machineOperator;
+ private TelemetryPendingStorageModule _pendingStorageModule;
#region Properties
- public TelemetryPublisherConfiguration Config { get; private set; }
- public ITelemetryPendingStorageManager PendingStorageManager { get; private set; }
public bool IsStarted { get; private set; }
+ public TelemetryPublisherConfiguration Config { get; private set; }
+
+ public ITelemetryStorageManager StorageManager { get; private set; }
+
+ private List<ITelemetryModule> InnerModules { get; }
+ public ReadOnlyCollection<ITelemetryModule> Modules { get; }
+
+ private List<ITelemetryDestination> InnerDestinations { get; }
+ public ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
+
+ public ITelemetryQueueManager QueueManager { get; private set; }
+
#endregion
#region Constructor
- public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config)
+ public TelemetryPublisher(TelemetryPublisherConfiguration config)
{
- _machineOperator = machineOperator;
- PendingStorageManager = storageManager;
-
Config = config ?? new TelemetryPublisherConfiguration();
- _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ _pendingStorageModule = new TelemetryPendingStorageModule();
+
+ InnerModules = new List<ITelemetryModule>();
+ Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules);
+
+ InnerDestinations = new List<ITelemetryDestination>();
+ Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations);
+
_publishThread = new Thread(PublishThreadMethod);
_publishThread.IsBackground = true;
- _diagnosticsQueue = new List<StartDiagnosticsResponse>();
+ StorageManager = new TelemetryLiteDBStorageManager();
+ QueueManager = new TelemetryInMemoryQueueManager();
+ }
- RegisterForEvents();
+ public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config)
+ {
+ StorageManager = storageManager;
+ QueueManager = queueManager;
}
#endregion
- #region Register / Unregister Events
+ #region Modules
- private void RegisterForEvents()
+ public void RegisterModule(ITelemetryModule module)
{
- _machineOperator.DiagnosticsDataAvailable += MachineOperator_DiagnosticsDataAvailable;
+ if (InnerModules.Exists(x => x.GetType() == module.GetType()))
+ {
+ throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered.");
+ }
+
+ InnerModules.Add(module);
+
+ if (module is ITelemetryStreamingModule streamingModule)
+ {
+ streamingModule.TelemetryAvailable += Module_TelemetryAvailable;
+
+ if (IsStarted)
+ {
+ streamingModule.Start();
+ }
+ }
+ }
+
+ private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
+ {
+ PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source);
}
- private void UnregisterEvents()
+ #endregion
+
+ #region Destinations
+
+ public void RegisterDestination(ITelemetryDestination destination)
{
- _machineOperator.DiagnosticsDataAvailable -= MachineOperator_DiagnosticsDataAvailable;
+ InnerDestinations.Add(destination);
}
#endregion
@@ -88,18 +125,6 @@ namespace Tango.Telemetry
IsStarted = true;
- if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS)
- {
- Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS);
- }
-
- if (_diagnosticsSamplingTimer == null)
- {
- _diagnosticsSamplingTimer = new System.Timers.Timer();
- _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds;
- _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed;
- }
-
if (_pendingStorageCheckTimer == null)
{
_pendingStorageCheckTimer = new System.Timers.Timer();
@@ -107,13 +132,20 @@ namespace Tango.Telemetry
_pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
}
- _diagnosticsQueue.Clear();
-
- _writing = false;
- _diagnosticsSamplingTimer.Start();
_pendingStorageCheckTimer.Start();
+ if (_historicalDataTimer == null)
+ {
+ _historicalDataTimer = new System.Timers.Timer();
+ _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds;
+ _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
+ }
+
+ _historicalDataTimer.Start();
+
_publishThread.Start();
+
+ InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start());
}
}
@@ -122,103 +154,85 @@ namespace Tango.Telemetry
if (IsStarted)
{
IsStarted = false;
- _diagnosticsSamplingTimer.Stop();
+ InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop());
_pendingStorageCheckTimer.Stop();
- _diagnosticsQueue.Clear();
- _publishQueue.BlockEnqueue(null);
+ _historicalDataTimer.Stop();
+ QueueManager.Enqueue(null);
}
}
#endregion
- #region Incoming Data Event Handlers
+ #region Timers
- private void MachineOperator_DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics)
+ private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- if (IsStarted && diagnostics.Monitors != null)
+ _pendingStorageCheckTimer.Stop();
+
+ var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+
+ foreach (var pendingTelemetry in batch)
{
- _diagnosticsQueue.Add(diagnostics);
- }
- }
+ await PublishTelemetryPackage(new TelemetryPublishPackage()
+ {
+ Module = _pendingStorageModule,
+ PendingTelemetry = pendingTelemetry,
+ Source = TelemetrySource.PendingStorage
+ });
- #endregion
+ if (!IsStarted || _isDisposed) return;
+ }
- #region Timers
+ _pendingStorageCheckTimer.Start();
+ }
- private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e)
+ private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- if (!IsStarted || _writing) return;
+ _historicalDataTimer.Stop();
- try
+ if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries)
{
- _diagnosticsSamplingTimer.Stop();
-
- _writing = true;
-
- if (_diagnosticsQueue.Count > 0)
+ foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList())
{
- var queue = _diagnosticsQueue.ToList();
- _diagnosticsQueue.Clear();
- _emptyWritten = false;
-
- var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList());
- queue.Clear();
+ TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module);
+ if (await module.CanRequestHistory(checkPoint.Time))
+ {
+ var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
- TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
- frame.Monitors = monitorsAvg;
- frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
+ foreach (var telemetry in historyTelemetries)
+ {
+ PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage);
+ checkPoint.Time = telemetry.Time;
+ checkPoint.TotalCount++;
+ }
- PushTelemetryPackage(frame, TelemetrySource.Streaming);
- }
- else
- {
- if (!_emptyWritten)
- {
- TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
- frame.Monitors = new InsightsMonitors();
- frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
- PushTelemetryPackage(frame, TelemetrySource.Streaming);
- _emptyWritten = true;
+ StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount);
}
}
}
- catch (Exception ex)
- {
- LogManager.Log(ex, "Error occurred on insights frame insertion.");
- }
- finally
- {
- _writing = false;
- _diagnosticsSamplingTimer.Start();
- }
- }
-
- private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
- {
- _pendingStorageCheckTimer.Stop();
-
- var telemetryAll = PendingStorageManager.GetTelemetryAll();
-
- foreach (var t in telemetryAll)
- {
- PushTelemetryPackage(t, TelemetrySource.PendingStorage);
- }
- _pendingStorageCheckTimer.Start();
+ _historicalDataTimer.Start();
}
#endregion
#region Push
- private void PushTelemetryPackage(ITelemetry telemetryObject, TelemetrySource source)
+ private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source)
{
- _publishQueue.BlockEnqueue(new TelemetryPublishPackage() { TelemetryObject = telemetryObject, Source = source });
+ PendingTelemetry pendingTelemetry = new PendingTelemetry();
+ pendingTelemetry.Created = DateTime.UtcNow;
+ //pendingTelemetry.Expires = module.GetExpiration();
+ pendingTelemetry.Module = module.Name;
+ pendingTelemetry.Source = source;
+ pendingTelemetry.TelemetryObject = telemetry;
+
+ PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source });
}
private void PushTelemetryPackage(TelemetryPublishPackage package)
{
- _publishQueue.BlockEnqueue(package);
+ QueueManager.Enqueue(package);
}
#endregion
@@ -229,10 +243,10 @@ namespace Tango.Telemetry
{
while (IsStarted)
{
- TelemetryPublishPackage package = _publishQueue.BlockDequeue();
+ TelemetryPublishPackage package = QueueManager.Dequeue();
if (package == null)
{
- _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ QueueManager.Clear();
return;
}
@@ -249,19 +263,21 @@ namespace Tango.Telemetry
protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
{
+ if (!IsStarted || _isDisposed) return;
+
List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID));
properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()));
properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment));
- List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+ List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
//Add all destinations if streaming or external (They will be remove later if successfull)
//If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
{
- foreach (var destination in Config.TelemetryDestinations)
+ foreach (var destination in Destinations)
{
if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -270,7 +286,7 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
{
if (pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -300,15 +316,15 @@ namespace Tango.Telemetry
}
}
- package.TelemetryObject.PendingDestinations = pendingDestinations;
+ package.PendingTelemetry.PendingDestinations = pendingDestinations;
- if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0)
+ if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
{
- PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
- PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
}
@@ -354,16 +370,28 @@ namespace Tango.Telemetry
public void Dispose()
{
- UnregisterEvents();
-
- if (IsStarted)
+ if (!_isDisposed)
{
- Stop();
- }
+ _isDisposed = true;
+ foreach (var module in InnerModules)
+ {
+ if (module is ITelemetryStreamingModule streamingModule)
+ {
+ streamingModule.Stop();
+ streamingModule.TelemetryAvailable -= Module_TelemetryAvailable;
+ }
+ module.Dispose();
+ }
- foreach (var destination in Config.TelemetryDestinations)
- {
- destination.Dispose();
+ if (IsStarted)
+ {
+ Stop();
+ }
+
+ foreach (var destination in Destinations)
+ {
+ destination.Dispose();
+ }
}
}