aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-29 19:53:35 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-29 19:53:35 +0300
commita802fe75f9538371004f1833e69a69b798892d0c (patch)
tree9d4612cf4dd6c543650b9ee10599db4b30782391 /Software/Visual_Studio
parent72c6399ec345ec26bd7f79651667ffa585474919 (diff)
downloadTango-a802fe75f9538371004f1833e69a69b798892d0c.tar.gz
Tango-a802fe75f9538371004f1833e69a69b798892d0c.zip
Telemetry
Diffstat (limited to 'Software/Visual_Studio')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs3
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs14
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs13
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs15
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs25
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs16
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs18
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs15
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs134
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs19
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs61
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs19
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs26
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj34
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs20
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs11
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs20
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs17
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs36
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs113
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs80
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs18
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs5
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs272
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs14
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs26
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/packages.config1
30 files changed, 784 insertions, 267 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
index 08412446c..b22849f3c 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
@@ -78,7 +78,7 @@ namespace Tango.Telemetry.Destinations
if (await EnsureConnection())
{
var message = new MqttApplicationMessageBuilder()
- .WithTopic($"{Topic}/{package.TelemetryObject.TelemetryName()}")
+ .WithTopic($"{Topic}/{package.PendingTelemetry.TelemetryObject.ToTelemetryName()}")
.WithPayload(package.ToPayload())
.WithExactlyOnceQoS()
.WithRetainFlag(false)
diff --git a/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs b/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs
index 6130f332e..ba016ed72 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs
@@ -7,7 +7,7 @@ using Tango.Telemetry;
public static class ITelemetryObjectExtensions
{
- public static String TelemetryName(this ITelemetry obj)
+ public static String ToTelemetryName(this ITelemetry obj)
{
var att = obj.GetType().GetCustomAttributes(typeof(TelemetryNameAttribute), false).Cast<TelemetryNameAttribute>().FirstOrDefault();
return att?.Name ?? obj.GetType().Name;
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
index 7059a3c75..67bb8f5d2 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
@@ -10,10 +10,7 @@ namespace Tango.Telemetry
{
public interface ITelemetry
{
- [BsonId(true)]
- int Id { get; set; }
DateTime Time { get; set; }
- List<TelemetryPendingDestination> PendingDestinations { get; set; }
String ToJson(Formatting format = Formatting.None, bool flatten = true);
byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true);
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs
new file mode 100644
index 000000000..314b99046
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryHistoryModule : ITelemetryModule
+ {
+ Task<bool> CanRequestHistory(DateTime from);
+ Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from);
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs
new file mode 100644
index 000000000..a27a03fc8
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryModule : IDisposable
+ {
+ String Name { get; }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs
deleted file mode 100644
index 139600af0..000000000
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Telemetry
-{
- public interface ITelemetryPendingStorageManager
- {
- void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry;
- void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry;
- List<ITelemetry> GetTelemetryAll();
- }
-}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
new file mode 100644
index 000000000..0c63dc906
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
@@ -0,0 +1,25 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryPublisher : IDisposable
+ {
+ event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage;
+ event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished;
+ event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
+ ITelemetryStorageManager StorageManager { get; }
+ ITelemetryQueueManager QueueManager { get; }
+ ReadOnlyCollection<ITelemetryModule> Modules { get; }
+ void RegisterModule(ITelemetryModule module);
+ ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
+ void RegisterDestination(ITelemetryDestination destination);
+ bool IsStarted { get; }
+ void Start();
+ void Stop();
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs
new file mode 100644
index 000000000..bb06bdd54
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryQueueManager
+ {
+ void Enqueue(TelemetryPublishPackage package);
+ TelemetryPublishPackage Dequeue();
+ int Count { get; }
+ void Clear();
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
new file mode 100644
index 000000000..2c54a2a51
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryStorageManager
+ {
+ void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry);
+ void DeletePendingTelemetry(PendingTelemetry pendingTelemetry);
+ List<PendingTelemetry> GetPendingTelemetries(int maxCount);
+ int GetPendingTelemetriesCount();
+ TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule);
+ void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount);
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs
new file mode 100644
index 000000000..12b576b1b
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryStreamingModule : ITelemetryModule
+ {
+ event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable;
+ void Start();
+ void Stop();
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs b/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs
index 230a4612c..6658dcd9f 100644
--- a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs
@@ -7,7 +7,7 @@ using System.Reflection;
namespace Tango.Telemetry
{
- public static class JsonFlattener
+ internal static class JsonFlattener
{
public static string FlattenObjectToFlatJson(object obj, Formatting format)
{
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs
new file mode 100644
index 000000000..a35e08328
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs
@@ -0,0 +1,134 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Timers;
+using Tango.Core;
+using Tango.Insights;
+using Tango.Integration.Operation;
+using Tango.Logging;
+using Tango.PMR.Diagnostics;
+using Tango.PMR.Insights;
+using Tango.Telemetry.TelemetryObjects;
+
+namespace Tango.Telemetry.Modules
+{
+ public class TelemetryDiagnosticsModule : TelemetryConfigurableModule<TelemetryDiagnosticsModuleConfig>, ITelemetryModule
+ {
+ public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;
+
+ private IMachineOperator _machineOperator;
+ private bool _isStarted;
+
+ private Timer _diagnosticsSamplingTimer;
+ private List<StartDiagnosticsResponse> _diagnosticsQueue;
+ private bool _writing;
+ private bool _emptyWritten;
+
+ public event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable;
+
+ public string Name { get; private set; } = "Diagnostics";
+
+ private TelemetryDiagnosticsModule() : base()
+ {
+ _diagnosticsQueue = new List<StartDiagnosticsResponse>();
+ }
+
+ public TelemetryDiagnosticsModule(IMachineOperator machineOperator) : base()
+ {
+ _machineOperator = machineOperator;
+ _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable;
+ }
+
+ private void DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics)
+ {
+ if (_isStarted && diagnostics.Monitors != null)
+ {
+ _diagnosticsQueue.Add(diagnostics);
+ }
+ }
+
+ public void Start()
+ {
+ _isStarted = true;
+
+ if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS)
+ {
+ Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS);
+ }
+
+ if (_diagnosticsSamplingTimer == null)
+ {
+ _diagnosticsSamplingTimer = new Timer();
+ _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds;
+ _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed;
+ }
+
+
+ _diagnosticsQueue.Clear();
+
+ _writing = false;
+ _diagnosticsSamplingTimer.Start();
+ }
+
+ public void Stop()
+ {
+ _isStarted = false;
+ }
+
+ private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e)
+ {
+ if (!_isStarted || _writing) return;
+
+ try
+ {
+ _diagnosticsSamplingTimer.Stop();
+
+ _writing = true;
+
+ if (_diagnosticsQueue.Count > 0)
+ {
+ var queue = _diagnosticsQueue.ToList();
+ _diagnosticsQueue.Clear();
+ _emptyWritten = false;
+
+ var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList());
+ queue.Clear();
+
+ TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
+ frame.Monitors = monitorsAvg;
+ frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
+
+
+ TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming));
+ }
+ else
+ {
+ if (!_emptyWritten)
+ {
+ TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
+ frame.Monitors = new InsightsMonitors();
+ frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
+ TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming));
+ _emptyWritten = true;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error occurred on insights frame insertion.");
+ }
+ finally
+ {
+ _writing = false;
+ _diagnosticsSamplingTimer.Start();
+ }
+ }
+
+ public void Dispose()
+ {
+ _machineOperator.DiagnosticsDataAvailable -= DiagnosticsDataAvailable;
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs
new file mode 100644
index 000000000..8d13b3cca
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Settings;
+
+namespace Tango.Telemetry.Modules
+{
+ public class TelemetryDiagnosticsModuleConfig : SettingsBase
+ {
+ public TimeSpan DiagnosticsSamplingInterval { get; set; }
+
+ public TelemetryDiagnosticsModuleConfig()
+ {
+ DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10);
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs
new file mode 100644
index 000000000..0ba2935c1
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs
@@ -0,0 +1,61 @@
+using System;
+using System.Collections.Generic;
+using System.Data.Entity;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.BL;
+using Tango.Telemetry.TelemetryObjects;
+
+namespace Tango.Telemetry.Modules
+{
+ public class TelemetryJobRunsHistoryModule : TelemetryConfigurableModule<TelemetryJobRunsHistoryModuleConfig>, ITelemetryHistoryModule
+ {
+ private bool _isBusy;
+
+ public string Name { get; private set; } = "JobRuns History";
+
+ public Task<bool> CanRequestHistory(DateTime from)
+ {
+ return Task.FromResult(!_isBusy);
+ }
+
+ public async Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from)
+ {
+ try
+ {
+ _isBusy = true;
+
+ using (ObservablesContext db = ObservablesContext.CreateDefault())
+ {
+ var runs = await db.JobRuns
+ .Where(x => x.LastUpdated > from)
+ .OrderBy(x => x.LastUpdated)
+ .Take(Config.MaxJobRunsPerRequest)
+ .ToListAsync();
+
+ List<TelemetryJobRun> tRuns = new List<TelemetryJobRun>();
+
+ foreach (var run in runs)
+ {
+ TelemetryJobRun tRun = new TelemetryJobRun();
+ tRun.Time = run.LastUpdated;
+ //Fill the object..
+ tRuns.Add(tRun);
+ }
+
+ return tRuns;
+ }
+ }
+ finally
+ {
+ _isBusy = false;
+ }
+ }
+
+ public void Dispose()
+ {
+
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs
new file mode 100644
index 000000000..412dea5f9
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Settings;
+
+namespace Tango.Telemetry.Modules
+{
+ public class TelemetryJobRunsHistoryModuleConfig : SettingsBase
+ {
+ public int MaxJobRunsPerRequest { get; set; }
+
+ public TelemetryJobRunsHistoryModuleConfig()
+ {
+ MaxJobRunsPerRequest = 100;
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs
new file mode 100644
index 000000000..2be3b1c45
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs
@@ -0,0 +1,26 @@
+using LiteDB;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class PendingTelemetry
+ {
+ [BsonId(true)]
+ public int Id { get; set; }
+ public DateTime Created { get; set; }
+ public DateTime Expires { get; set; }
+ public ITelemetry TelemetryObject { get; set; }
+ public String Module { get; set; }
+ public TelemetrySource Source { get; set; }
+ public List<TelemetryPendingDestination> PendingDestinations { get; set; }
+
+ public PendingTelemetry()
+ {
+ PendingDestinations = new List<TelemetryPendingDestination>();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 98df17cb8..27ac76783 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -49,6 +49,12 @@
<Reference Include="DotNetty.Transport, Version=0.6.0.0, Culture=neutral, PublicKeyToken=bc13ca065fa06c29, processorArchitecture=MSIL">
<HintPath>..\packages\DotNetty.Transport.0.6.0\lib\net45\DotNetty.Transport.dll</HintPath>
</Reference>
+ <Reference Include="EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL">
+ <HintPath>..\packages\EntityFramework.6.2.0\lib\net45\EntityFramework.dll</HintPath>
+ </Reference>
+ <Reference Include="EntityFramework.SqlServer, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL">
+ <HintPath>..\packages\EntityFramework.6.2.0\lib\net45\EntityFramework.SqlServer.dll</HintPath>
+ </Reference>
<Reference Include="LiteDB, Version=5.0.4.0, Culture=neutral, PublicKeyToken=4ee40123013c9f27, processorArchitecture=MSIL">
<HintPath>..\packages\LiteDB.5.0.4\lib\net45\LiteDB.dll</HintPath>
</Reference>
@@ -99,6 +105,7 @@
<HintPath>..\packages\System.Collections.Immutable.1.5.0\lib\netstandard2.0\System.Collections.Immutable.dll</HintPath>
</Reference>
<Reference Include="System.ComponentModel.Composition" />
+ <Reference Include="System.ComponentModel.DataAnnotations" />
<Reference Include="System.Configuration" />
<Reference Include="System.Console, Version=4.0.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\System.Console.4.3.0\lib\net46\System.Console.dll</HintPath>
@@ -200,11 +207,26 @@
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
<Compile Include="ITelemetryDestination.cs" />
<Compile Include="Destinations\MqttTelemetryDestination.cs" />
- <Compile Include="ITelemetryPendingStorageManager.cs" />
+ <Compile Include="ITelemetryHistoryModule.cs" />
+ <Compile Include="ITelemetryModule.cs" />
+ <Compile Include="ITelemetryQueueManager.cs" />
+ <Compile Include="ITelemetryStorageManager.cs" />
+ <Compile Include="ITelemetryPublisher.cs" />
+ <Compile Include="ITelemetryStreamingModule.cs" />
+ <Compile Include="Modules\TelemetryDiagnosticsModule.cs" />
+ <Compile Include="Modules\TelemetryDiagnosticsModuleConfig.cs" />
+ <Compile Include="Modules\TelemetryJobRunsHistoryModule.cs" />
+ <Compile Include="Modules\TelemetryJobRunsHistoryModuleConfig.cs" />
+ <Compile Include="PendingTelemetry.cs" />
+ <Compile Include="TelemetryConfigurableModule.cs" />
+ <Compile Include="TelemetryHistoryModuleCheckPoint.cs" />
<Compile Include="TelemetryNameAttribute.cs" />
+ <Compile Include="TelemetryAvailableEventArgs.cs" />
<Compile Include="TelemetryPackagePublishedEventArgs.cs" />
<Compile Include="TelemetryPackagePublishFailedEventArgs.cs" />
<Compile Include="TelemetryPendingDestination.cs" />
+ <Compile Include="TelemetryPendingStorageModule.cs" />
+ <Compile Include="TelemetryInMemoryQueueManager.cs" />
<Compile Include="TelemetryPublisherAdvanced.cs" />
<Compile Include="TelemetryPublisherEventArgs.cs" />
<Compile Include="TelemetryPackagePublishingEventArgs.cs" />
@@ -212,7 +234,7 @@
<Compile Include="TelemetryPublisher.cs" />
<Compile Include="TelemetryPublisherConfiguration.cs" />
<Compile Include="TelemetrySource.cs" />
- <Compile Include="TelemetryLiteDbPendingStorageManager.cs" />
+ <Compile Include="TelemetryLiteDBStorageManager.cs" />
<Compile Include="ITelemetry.cs" />
<Compile Include="JsonFlattener.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
@@ -248,13 +270,15 @@
<Project>{e4927038-348d-4295-aaf4-861c58cb3943}</Project>
<Name>Tango.PMR</Name>
</ProjectReference>
+ <ProjectReference Include="..\Tango.Settings\Tango.Settings.csproj">
+ <Project>{d8f1ad85-526a-4f50-b6dc-d437af63d8d8}</Project>
+ <Name>Tango.Settings</Name>
+ </ProjectReference>
<ProjectReference Include="..\Tango.Transport\Tango.Transport.csproj">
<Project>{74E700B0-1156-4126-BE40-EE450D3C3026}</Project>
<Name>Tango.Transport</Name>
</ProjectReference>
</ItemGroup>
- <ItemGroup>
- <Folder Include="EventArgs\" />
- </ItemGroup>
+ <ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project> \ No newline at end of file
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs
new file mode 100644
index 000000000..163b72f7a
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryAvailableEventArgs : EventArgs
+ {
+ public TelemetrySource Source { get; set; }
+ public ITelemetry TelemetryObject { get; set; }
+
+ public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySource source)
+ {
+ TelemetryObject = telemetryObject;
+ Source = source;
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
index ca6c2925e..133409ff9 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
@@ -1,5 +1,4 @@
-using LiteDB;
-using Newtonsoft.Json;
+using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -10,15 +9,7 @@ namespace Tango.Telemetry
{
public class TelemetryBase : ITelemetry
{
- [BsonId(true)]
- public int Id { get; set; }
public DateTime Time { get; set; }
- public List<TelemetryPendingDestination> PendingDestinations { get; set; }
-
- public TelemetryBase()
- {
- PendingDestinations = new List<TelemetryPendingDestination>();
- }
public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true)
{
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs
new file mode 100644
index 000000000..8ffb96af5
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Settings;
+
+namespace Tango.Telemetry
+{
+ public abstract class TelemetryConfigurableModule<T> : ExtendedObject where T : SettingsBase
+ {
+ public T Config { get; set; }
+
+ public TelemetryConfigurableModule()
+ {
+ Config = SettingsManager.Default.GetOrCreate<T>();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs
new file mode 100644
index 000000000..2f57a1926
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs
@@ -0,0 +1,17 @@
+using LiteDB;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryHistoryModuleCheckPoint
+ {
+ [BsonId]
+ public String ModuleName { get; set; }
+ public DateTime Time { get; set; }
+ public int TotalCount { get; set; }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs
new file mode 100644
index 000000000..2197bac99
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryInMemoryQueueManager : ITelemetryQueueManager
+ {
+ private ProducerConsumerQueue<TelemetryPublishPackage> _queue;
+
+ public int Count { get { return _queue.Count; } }
+
+ public TelemetryInMemoryQueueManager()
+ {
+ _queue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ }
+
+ public TelemetryPublishPackage Dequeue()
+ {
+ return _queue.BlockDequeue();
+ }
+
+ public void Enqueue(TelemetryPublishPackage package)
+ {
+ _queue.BlockEnqueue(package);
+ }
+
+ public void Clear()
+ {
+ _queue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
new file mode 100644
index 000000000..781b3f3e6
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
@@ -0,0 +1,113 @@
+using LiteDB;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryLiteDBStorageManager : ITelemetryStorageManager
+ {
+ private bool _disposed;
+ private LiteDatabase _database;
+ private static Object _lock = new object();
+
+ public String DatabasePath { get; private set; }
+
+ public TelemetryLiteDBStorageManager()
+ {
+ DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry");
+ Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
+
+ _database = new LiteDatabase($"Filename={DatabasePath}");
+ _database.Pragma("TIMEOUT", 10); //Read Timeout
+ _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data
+ _database.Commit();
+ }
+
+ public virtual void Dispose()
+ {
+ if (_database != null && !_disposed)
+ {
+ try
+ {
+ _disposed = true;
+ _database.Dispose();
+ _database = null;
+ }
+ catch { }
+ }
+ }
+
+ ~TelemetryLiteDBStorageManager()
+ {
+ Dispose();
+ }
+
+ private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection()
+ {
+ return _database.GetCollection<PendingTelemetry>("PendingTelemetries");
+ }
+
+ private ILiteCollection<TelemetryHistoryModuleCheckPoint> GetModulesCheckpointCollection()
+ {
+ return _database.GetCollection<TelemetryHistoryModuleCheckPoint>("ModuleCheckPoints");
+ }
+
+ public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry)
+ {
+ lock (_lock)
+ {
+ var collection = GetPendingTelemetriesCollection();
+ collection.Upsert(pendingTelemetry);
+ }
+ }
+
+ public void DeletePendingTelemetry(PendingTelemetry pendingTelemetry)
+ {
+ lock (_lock)
+ {
+ var collection = GetPendingTelemetriesCollection();
+ collection.Delete(pendingTelemetry.Id);
+ }
+ }
+
+ public List<PendingTelemetry> GetPendingTelemetries(int maxCount)
+ {
+ lock (_lock)
+ {
+ var collection = GetPendingTelemetriesCollection();
+ return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList();
+ }
+ }
+
+ public TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule)
+ {
+ lock (_lock)
+ {
+ var collection = GetModulesCheckpointCollection();
+ return collection.FindOne(x => x.ModuleName == historyModule.Name);
+ }
+ }
+
+ public void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount)
+ {
+ lock (_lock)
+ {
+ var collection = GetModulesCheckpointCollection();
+ collection.Upsert(new TelemetryHistoryModuleCheckPoint() { ModuleName = historyModule.Name, Time = time, TotalCount = totalCount });
+ }
+ }
+
+ public int GetPendingTelemetriesCount()
+ {
+ lock (_lock)
+ {
+ var collection = GetPendingTelemetriesCollection();
+ return collection.Count();
+ }
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs
deleted file mode 100644
index 47b8db826..000000000
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs
+++ /dev/null
@@ -1,80 +0,0 @@
-using LiteDB;
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Telemetry
-{
- public class TelemetryLiteDbPendingStorageManager : ITelemetryPendingStorageManager
- {
- private bool _disposed;
- private LiteDatabase _database;
-
- public String DatabasePath { get; private set; }
-
- public TelemetryLiteDbPendingStorageManager()
- {
- DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry");
- Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
-
- _database = new LiteDatabase($"Filename={DatabasePath}");
- _database.Pragma("TIMEOUT", 10); //Read Timeout
- _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data
- _database.Commit();
- }
-
- public virtual void Dispose()
- {
- if (_database != null)
- {
- try
- {
- _disposed = true;
- _database.Dispose();
- _database = null;
- }
- catch { }
- }
- }
-
- ~TelemetryLiteDbPendingStorageManager()
- {
- Dispose();
- }
-
- private ILiteCollection<T> GetCollection<T>() where T : ITelemetry
- {
- return _database.GetCollection<T>();
- }
-
- public void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry
- {
- var collection = GetCollection<T>();
- collection.Upsert(telemetry);
- }
-
- public void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry
- {
- var collection = GetCollection<T>();
- collection.Delete(telemetry.Id);
- }
-
- public List<ITelemetry> GetTelemetryAll()
- {
- var names = _database.GetCollectionNames();
-
- List<ITelemetry> telemetryAll = new List<ITelemetry>();
-
- foreach (var name in names)
- {
- var collection = _database.GetCollection<ITelemetry>(name);
- telemetryAll.AddRange(collection.FindAll().ToList());
- }
-
- return telemetryAll.OrderBy(x => x.Time).ToList();
- }
- }
-}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs
new file mode 100644
index 000000000..6aae43f67
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPendingStorageModule : ITelemetryModule
+ {
+ public string Name { get; private set; } = "Pending Storage";
+
+ public void Dispose()
+ {
+
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
index a3ae6359c..5bd207e9f 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
@@ -10,14 +10,15 @@ namespace Tango.Telemetry
{
private String _payload;
- public ITelemetry TelemetryObject { get; set; }
+ public ITelemetryModule Module { get; set; }
+ public PendingTelemetry PendingTelemetry { get; set; }
public TelemetrySource Source { get; set; }
public String ToPayload()
{
if (_payload == null)
{
- _payload = TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true);
+ _payload = PendingTelemetry.TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true);
}
return _payload;
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 8891a6cc4..1314d3346 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Collections.ObjectModel;
using System.Linq;
using System.Reflection;
using System.Text;
@@ -16,64 +17,100 @@ using Tango.Telemetry.TelemetryObjects;
namespace Tango.Telemetry
{
- public class TelemetryPublisher : ExtendedObject, IDisposable
+ public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher
{
- public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;
-
public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage;
public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished;
public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
- private System.Timers.Timer _diagnosticsSamplingTimer;
private System.Timers.Timer _pendingStorageCheckTimer;
+ private System.Timers.Timer _historicalDataTimer;
+ private bool _isDisposed;
private Thread _publishThread;
- private ProducerConsumerQueue<TelemetryPublishPackage> _publishQueue;
-
- private List<StartDiagnosticsResponse> _diagnosticsQueue;
- private bool _writing;
- private bool _emptyWritten;
-
- private IMachineOperator _machineOperator;
+ private TelemetryPendingStorageModule _pendingStorageModule;
#region Properties
- public TelemetryPublisherConfiguration Config { get; private set; }
- public ITelemetryPendingStorageManager PendingStorageManager { get; private set; }
public bool IsStarted { get; private set; }
+ public TelemetryPublisherConfiguration Config { get; private set; }
+
+ public ITelemetryStorageManager StorageManager { get; private set; }
+
+ private List<ITelemetryModule> InnerModules { get; }
+ public ReadOnlyCollection<ITelemetryModule> Modules { get; }
+
+ private List<ITelemetryDestination> InnerDestinations { get; }
+ public ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
+
+ public ITelemetryQueueManager QueueManager { get; private set; }
+
#endregion
#region Constructor
- public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config)
+ public TelemetryPublisher(TelemetryPublisherConfiguration config)
{
- _machineOperator = machineOperator;
- PendingStorageManager = storageManager;
-
Config = config ?? new TelemetryPublisherConfiguration();
- _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ _pendingStorageModule = new TelemetryPendingStorageModule();
+
+ InnerModules = new List<ITelemetryModule>();
+ Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules);
+
+ InnerDestinations = new List<ITelemetryDestination>();
+ Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations);
+
_publishThread = new Thread(PublishThreadMethod);
_publishThread.IsBackground = true;
- _diagnosticsQueue = new List<StartDiagnosticsResponse>();
+ StorageManager = new TelemetryLiteDBStorageManager();
+ QueueManager = new TelemetryInMemoryQueueManager();
+ }
- RegisterForEvents();
+ public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config)
+ {
+ StorageManager = storageManager;
+ QueueManager = queueManager;
}
#endregion
- #region Register / Unregister Events
+ #region Modules
- private void RegisterForEvents()
+ public void RegisterModule(ITelemetryModule module)
{
- _machineOperator.DiagnosticsDataAvailable += MachineOperator_DiagnosticsDataAvailable;
+ if (InnerModules.Exists(x => x.GetType() == module.GetType()))
+ {
+ throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered.");
+ }
+
+ InnerModules.Add(module);
+
+ if (module is ITelemetryStreamingModule streamingModule)
+ {
+ streamingModule.TelemetryAvailable += Module_TelemetryAvailable;
+
+ if (IsStarted)
+ {
+ streamingModule.Start();
+ }
+ }
+ }
+
+ private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
+ {
+ PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source);
}
- private void UnregisterEvents()
+ #endregion
+
+ #region Destinations
+
+ public void RegisterDestination(ITelemetryDestination destination)
{
- _machineOperator.DiagnosticsDataAvailable -= MachineOperator_DiagnosticsDataAvailable;
+ InnerDestinations.Add(destination);
}
#endregion
@@ -88,18 +125,6 @@ namespace Tango.Telemetry
IsStarted = true;
- if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS)
- {
- Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS);
- }
-
- if (_diagnosticsSamplingTimer == null)
- {
- _diagnosticsSamplingTimer = new System.Timers.Timer();
- _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds;
- _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed;
- }
-
if (_pendingStorageCheckTimer == null)
{
_pendingStorageCheckTimer = new System.Timers.Timer();
@@ -107,13 +132,20 @@ namespace Tango.Telemetry
_pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
}
- _diagnosticsQueue.Clear();
-
- _writing = false;
- _diagnosticsSamplingTimer.Start();
_pendingStorageCheckTimer.Start();
+ if (_historicalDataTimer == null)
+ {
+ _historicalDataTimer = new System.Timers.Timer();
+ _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds;
+ _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
+ }
+
+ _historicalDataTimer.Start();
+
_publishThread.Start();
+
+ InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start());
}
}
@@ -122,103 +154,85 @@ namespace Tango.Telemetry
if (IsStarted)
{
IsStarted = false;
- _diagnosticsSamplingTimer.Stop();
+ InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop());
_pendingStorageCheckTimer.Stop();
- _diagnosticsQueue.Clear();
- _publishQueue.BlockEnqueue(null);
+ _historicalDataTimer.Stop();
+ QueueManager.Enqueue(null);
}
}
#endregion
- #region Incoming Data Event Handlers
+ #region Timers
- private void MachineOperator_DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics)
+ private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- if (IsStarted && diagnostics.Monitors != null)
+ _pendingStorageCheckTimer.Stop();
+
+ var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+
+ foreach (var pendingTelemetry in batch)
{
- _diagnosticsQueue.Add(diagnostics);
- }
- }
+ await PublishTelemetryPackage(new TelemetryPublishPackage()
+ {
+ Module = _pendingStorageModule,
+ PendingTelemetry = pendingTelemetry,
+ Source = TelemetrySource.PendingStorage
+ });
- #endregion
+ if (!IsStarted || _isDisposed) return;
+ }
- #region Timers
+ _pendingStorageCheckTimer.Start();
+ }
- private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e)
+ private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- if (!IsStarted || _writing) return;
+ _historicalDataTimer.Stop();
- try
+ if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries)
{
- _diagnosticsSamplingTimer.Stop();
-
- _writing = true;
-
- if (_diagnosticsQueue.Count > 0)
+ foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList())
{
- var queue = _diagnosticsQueue.ToList();
- _diagnosticsQueue.Clear();
- _emptyWritten = false;
-
- var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList());
- queue.Clear();
+ TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module);
+ if (await module.CanRequestHistory(checkPoint.Time))
+ {
+ var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
- TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
- frame.Monitors = monitorsAvg;
- frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
+ foreach (var telemetry in historyTelemetries)
+ {
+ PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage);
+ checkPoint.Time = telemetry.Time;
+ checkPoint.TotalCount++;
+ }
- PushTelemetryPackage(frame, TelemetrySource.Streaming);
- }
- else
- {
- if (!_emptyWritten)
- {
- TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
- frame.Monitors = new InsightsMonitors();
- frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
- PushTelemetryPackage(frame, TelemetrySource.Streaming);
- _emptyWritten = true;
+ StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount);
}
}
}
- catch (Exception ex)
- {
- LogManager.Log(ex, "Error occurred on insights frame insertion.");
- }
- finally
- {
- _writing = false;
- _diagnosticsSamplingTimer.Start();
- }
- }
-
- private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
- {
- _pendingStorageCheckTimer.Stop();
-
- var telemetryAll = PendingStorageManager.GetTelemetryAll();
-
- foreach (var t in telemetryAll)
- {
- PushTelemetryPackage(t, TelemetrySource.PendingStorage);
- }
- _pendingStorageCheckTimer.Start();
+ _historicalDataTimer.Start();
}
#endregion
#region Push
- private void PushTelemetryPackage(ITelemetry telemetryObject, TelemetrySource source)
+ private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source)
{
- _publishQueue.BlockEnqueue(new TelemetryPublishPackage() { TelemetryObject = telemetryObject, Source = source });
+ PendingTelemetry pendingTelemetry = new PendingTelemetry();
+ pendingTelemetry.Created = DateTime.UtcNow;
+ //pendingTelemetry.Expires = module.GetExpiration();
+ pendingTelemetry.Module = module.Name;
+ pendingTelemetry.Source = source;
+ pendingTelemetry.TelemetryObject = telemetry;
+
+ PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source });
}
private void PushTelemetryPackage(TelemetryPublishPackage package)
{
- _publishQueue.BlockEnqueue(package);
+ QueueManager.Enqueue(package);
}
#endregion
@@ -229,10 +243,10 @@ namespace Tango.Telemetry
{
while (IsStarted)
{
- TelemetryPublishPackage package = _publishQueue.BlockDequeue();
+ TelemetryPublishPackage package = QueueManager.Dequeue();
if (package == null)
{
- _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>();
+ QueueManager.Clear();
return;
}
@@ -249,19 +263,21 @@ namespace Tango.Telemetry
protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
{
+ if (!IsStarted || _isDisposed) return;
+
List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID));
properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()));
properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment));
- List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+ List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
//Add all destinations if streaming or external (They will be remove later if successfull)
//If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
{
- foreach (var destination in Config.TelemetryDestinations)
+ foreach (var destination in Destinations)
{
if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -270,7 +286,7 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
{
if (pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -300,15 +316,15 @@ namespace Tango.Telemetry
}
}
- package.TelemetryObject.PendingDestinations = pendingDestinations;
+ package.PendingTelemetry.PendingDestinations = pendingDestinations;
- if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0)
+ if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
{
- PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
- PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
}
@@ -354,16 +370,28 @@ namespace Tango.Telemetry
public void Dispose()
{
- UnregisterEvents();
-
- if (IsStarted)
+ if (!_isDisposed)
{
- Stop();
- }
+ _isDisposed = true;
+ foreach (var module in InnerModules)
+ {
+ if (module is ITelemetryStreamingModule streamingModule)
+ {
+ streamingModule.Stop();
+ streamingModule.TelemetryAvailable -= Module_TelemetryAvailable;
+ }
+ module.Dispose();
+ }
- foreach (var destination in Config.TelemetryDestinations)
- {
- destination.Dispose();
+ if (IsStarted)
+ {
+ Stop();
+ }
+
+ foreach (var destination in Destinations)
+ {
+ destination.Dispose();
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
index 2d0f48126..0183de6c2 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -15,7 +15,7 @@ namespace Tango.Telemetry
/// </summary>
public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);
- public TelemetryPublisherAdvanced(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) : base(machineOperator, storageManager, config)
+ public TelemetryPublisherAdvanced(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : base(storageManager, queueManager, config)
{
}
@@ -30,12 +30,12 @@ namespace Tango.Telemetry
};
var now = DateTime.UtcNow;
- var pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+ var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
// For Streaming/External: initialize pending destinations list (used if publishing fails)
if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
{
- foreach (var dest in Config.TelemetryDestinations)
+ foreach (var dest in Destinations)
{
if (!pendingDestinations.Any(x => x.Name == dest.Name))
{
@@ -50,7 +50,7 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
@@ -101,15 +101,15 @@ namespace Tango.Telemetry
}
}
- package.TelemetryObject.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
+ package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any())
{
- PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
- PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
index da5856848..424249f7d 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
@@ -12,16 +12,17 @@ namespace Tango.Telemetry
public String MachineID { get; set; }
public MachineTypes MachineType { get; set; }
public String Environment { get; set; }
- public TimeSpan DiagnosticsSamplingInterval { get; set; }
public TimeSpan PendingStorageCheckInterval { get; set; }
- public List<ITelemetryDestination> TelemetryDestinations { get; private set; }
+ public int MaxPendingStorageTelemetriesPerCycle { get; set; }
+ public TimeSpan HistoryModulesRequestInterval { get; set; }
+ public int MaxPendingTelemetries { get; set; }
public TelemetryPublisherConfiguration()
{
- TelemetryDestinations = new List<ITelemetryDestination>();
-
- DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10);
PendingStorageCheckInterval = TimeSpan.FromMinutes(1);
+ MaxPendingStorageTelemetriesPerCycle = 100;
+ HistoryModulesRequestInterval = TimeSpan.FromMinutes(1);
+ MaxPendingTelemetries = 200;
}
public void Validate()
@@ -35,23 +36,8 @@ namespace Tango.Telemetry
if (!Enum.IsDefined(typeof(MachineTypes), MachineType))
throw new ArgumentOutOfRangeException(nameof(MachineType), "MachineType is not a valid enum value.");
- if (DiagnosticsSamplingInterval.TotalSeconds < 1)
- throw new ArgumentOutOfRangeException(nameof(DiagnosticsSamplingInterval), "DiagnosticsSamplingInterval must be at least 1 second.");
-
if (PendingStorageCheckInterval.TotalSeconds < 5)
throw new ArgumentOutOfRangeException(nameof(PendingStorageCheckInterval), "PendingStorageCheckInterval must be at least 5 seconds.");
-
- if (TelemetryDestinations == null || TelemetryDestinations.Count == 0)
- throw new InvalidOperationException("At least one telemetry destination must be provided.");
-
- foreach (var destination in TelemetryDestinations)
- {
- if (destination == null)
- throw new InvalidOperationException("Telemetry destination list contains a null entry.");
-
- if (destination.SupportedSources == null || destination.SupportedSources.Count == 0)
- throw new InvalidOperationException($"Telemetry destination '{destination.Name}' has no supported sources defined.");
- }
}
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/packages.config b/Software/Visual_Studio/Tango.Telemetry/packages.config
index b94abaac9..1a4b2c3c8 100644
--- a/Software/Visual_Studio/Tango.Telemetry/packages.config
+++ b/Software/Visual_Studio/Tango.Telemetry/packages.config
@@ -6,6 +6,7 @@
<package id="DotNetty.Common" version="0.6.0" targetFramework="net461" />
<package id="DotNetty.Handlers" version="0.6.0" targetFramework="net461" />
<package id="DotNetty.Transport" version="0.6.0" targetFramework="net461" />
+ <package id="EntityFramework" version="6.2.0" targetFramework="net461" />
<package id="LiteDB" version="5.0.4" targetFramework="net461" />
<package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net461" />
<package id="Microsoft.Azure.Amqp" version="2.5.10" targetFramework="net461" />