diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 19:53:35 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 19:53:35 +0300 |
| commit | a802fe75f9538371004f1833e69a69b798892d0c (patch) | |
| tree | 9d4612cf4dd6c543650b9ee10599db4b30782391 /Software | |
| parent | 72c6399ec345ec26bd7f79651667ffa585474919 (diff) | |
| download | Tango-a802fe75f9538371004f1833e69a69b798892d0c.tar.gz Tango-a802fe75f9538371004f1833e69a69b798892d0c.zip | |
Telemetry
Diffstat (limited to 'Software')
30 files changed, 784 insertions, 267 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs index 08412446c..b22849f3c 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs @@ -78,7 +78,7 @@ namespace Tango.Telemetry.Destinations if (await EnsureConnection()) { var message = new MqttApplicationMessageBuilder() - .WithTopic($"{Topic}/{package.TelemetryObject.TelemetryName()}") + .WithTopic($"{Topic}/{package.PendingTelemetry.TelemetryObject.ToTelemetryName()}") .WithPayload(package.ToPayload()) .WithExactlyOnceQoS() .WithRetainFlag(false) diff --git a/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs b/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs index 6130f332e..ba016ed72 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ExtensionMethods/ITelemetryExtensions.cs @@ -7,7 +7,7 @@ using Tango.Telemetry; public static class ITelemetryObjectExtensions { - public static String TelemetryName(this ITelemetry obj) + public static String ToTelemetryName(this ITelemetry obj) { var att = obj.GetType().GetCustomAttributes(typeof(TelemetryNameAttribute), false).Cast<TelemetryNameAttribute>().FirstOrDefault(); return att?.Name ?? obj.GetType().Name; diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs index 7059a3c75..67bb8f5d2 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs @@ -10,10 +10,7 @@ namespace Tango.Telemetry { public interface ITelemetry { - [BsonId(true)] - int Id { get; set; } DateTime Time { get; set; } - List<TelemetryPendingDestination> PendingDestinations { get; set; } String ToJson(Formatting format = Formatting.None, bool flatten = true); byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true); } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs new file mode 100644 index 000000000..314b99046 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryHistoryModule : ITelemetryModule + { + Task<bool> CanRequestHistory(DateTime from); + Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs new file mode 100644 index 000000000..a27a03fc8 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryModule : IDisposable + { + String Name { get; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs deleted file mode 100644 index 139600af0..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public interface ITelemetryPendingStorageManager - { - void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry; - void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry; - List<ITelemetry> GetTelemetryAll(); - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs new file mode 100644 index 000000000..0c63dc906 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryPublisher : IDisposable + { + event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage; + event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished; + event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; + ITelemetryStorageManager StorageManager { get; } + ITelemetryQueueManager QueueManager { get; } + ReadOnlyCollection<ITelemetryModule> Modules { get; } + void RegisterModule(ITelemetryModule module); + ReadOnlyCollection<ITelemetryDestination> Destinations { get; } + void RegisterDestination(ITelemetryDestination destination); + bool IsStarted { get; } + void Start(); + void Stop(); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs new file mode 100644 index 000000000..bb06bdd54 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryQueueManager.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryQueueManager + { + void Enqueue(TelemetryPublishPackage package); + TelemetryPublishPackage Dequeue(); + int Count { get; } + void Clear(); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs new file mode 100644 index 000000000..2c54a2a51 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryStorageManager + { + void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry); + void DeletePendingTelemetry(PendingTelemetry pendingTelemetry); + List<PendingTelemetry> GetPendingTelemetries(int maxCount); + int GetPendingTelemetriesCount(); + TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule); + void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs new file mode 100644 index 000000000..12b576b1b --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryStreamingModule : ITelemetryModule + { + event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable; + void Start(); + void Stop(); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs b/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs index 230a4612c..6658dcd9f 100644 --- a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs +++ b/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs @@ -7,7 +7,7 @@ using System.Reflection; namespace Tango.Telemetry { - public static class JsonFlattener + internal static class JsonFlattener { public static string FlattenObjectToFlatJson(object obj, Formatting format) { diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs new file mode 100644 index 000000000..a35e08328 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Timers; +using Tango.Core; +using Tango.Insights; +using Tango.Integration.Operation; +using Tango.Logging; +using Tango.PMR.Diagnostics; +using Tango.PMR.Insights; +using Tango.Telemetry.TelemetryObjects; + +namespace Tango.Telemetry.Modules +{ + public class TelemetryDiagnosticsModule : TelemetryConfigurableModule<TelemetryDiagnosticsModuleConfig>, ITelemetryModule + { + public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; + + private IMachineOperator _machineOperator; + private bool _isStarted; + + private Timer _diagnosticsSamplingTimer; + private List<StartDiagnosticsResponse> _diagnosticsQueue; + private bool _writing; + private bool _emptyWritten; + + public event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable; + + public string Name { get; private set; } = "Diagnostics"; + + private TelemetryDiagnosticsModule() : base() + { + _diagnosticsQueue = new List<StartDiagnosticsResponse>(); + } + + public TelemetryDiagnosticsModule(IMachineOperator machineOperator) : base() + { + _machineOperator = machineOperator; + _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable; + } + + private void DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics) + { + if (_isStarted && diagnostics.Monitors != null) + { + _diagnosticsQueue.Add(diagnostics); + } + } + + public void Start() + { + _isStarted = true; + + if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS) + { + Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS); + } + + if (_diagnosticsSamplingTimer == null) + { + _diagnosticsSamplingTimer = new Timer(); + _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds; + _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed; + } + + + _diagnosticsQueue.Clear(); + + _writing = false; + _diagnosticsSamplingTimer.Start(); + } + + public void Stop() + { + _isStarted = false; + } + + private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e) + { + if (!_isStarted || _writing) return; + + try + { + _diagnosticsSamplingTimer.Stop(); + + _writing = true; + + if (_diagnosticsQueue.Count > 0) + { + var queue = _diagnosticsQueue.ToList(); + _diagnosticsQueue.Clear(); + _emptyWritten = false; + + var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList()); + queue.Clear(); + + TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); + frame.Monitors = monitorsAvg; + frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + + + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); + } + else + { + if (!_emptyWritten) + { + TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); + frame.Monitors = new InsightsMonitors(); + frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming)); + _emptyWritten = true; + } + } + } + catch (Exception ex) + { + LogManager.Log(ex, "Error occurred on insights frame insertion."); + } + finally + { + _writing = false; + _diagnosticsSamplingTimer.Start(); + } + } + + public void Dispose() + { + _machineOperator.DiagnosticsDataAvailable -= DiagnosticsDataAvailable; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs new file mode 100644 index 000000000..8d13b3cca --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Settings; + +namespace Tango.Telemetry.Modules +{ + public class TelemetryDiagnosticsModuleConfig : SettingsBase + { + public TimeSpan DiagnosticsSamplingInterval { get; set; } + + public TelemetryDiagnosticsModuleConfig() + { + DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs new file mode 100644 index 000000000..0ba2935c1 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Data.Entity; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.BL; +using Tango.Telemetry.TelemetryObjects; + +namespace Tango.Telemetry.Modules +{ + public class TelemetryJobRunsHistoryModule : TelemetryConfigurableModule<TelemetryJobRunsHistoryModuleConfig>, ITelemetryHistoryModule + { + private bool _isBusy; + + public string Name { get; private set; } = "JobRuns History"; + + public Task<bool> CanRequestHistory(DateTime from) + { + return Task.FromResult(!_isBusy); + } + + public async Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from) + { + try + { + _isBusy = true; + + using (ObservablesContext db = ObservablesContext.CreateDefault()) + { + var runs = await db.JobRuns + .Where(x => x.LastUpdated > from) + .OrderBy(x => x.LastUpdated) + .Take(Config.MaxJobRunsPerRequest) + .ToListAsync(); + + List<TelemetryJobRun> tRuns = new List<TelemetryJobRun>(); + + foreach (var run in runs) + { + TelemetryJobRun tRun = new TelemetryJobRun(); + tRun.Time = run.LastUpdated; + //Fill the object.. + tRuns.Add(tRun); + } + + return tRuns; + } + } + finally + { + _isBusy = false; + } + } + + public void Dispose() + { + + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs new file mode 100644 index 000000000..412dea5f9 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Settings; + +namespace Tango.Telemetry.Modules +{ + public class TelemetryJobRunsHistoryModuleConfig : SettingsBase + { + public int MaxJobRunsPerRequest { get; set; } + + public TelemetryJobRunsHistoryModuleConfig() + { + MaxJobRunsPerRequest = 100; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs new file mode 100644 index 000000000..2be3b1c45 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs @@ -0,0 +1,26 @@ +using LiteDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class PendingTelemetry + { + [BsonId(true)] + public int Id { get; set; } + public DateTime Created { get; set; } + public DateTime Expires { get; set; } + public ITelemetry TelemetryObject { get; set; } + public String Module { get; set; } + public TelemetrySource Source { get; set; } + public List<TelemetryPendingDestination> PendingDestinations { get; set; } + + public PendingTelemetry() + { + PendingDestinations = new List<TelemetryPendingDestination>(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 98df17cb8..27ac76783 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -49,6 +49,12 @@ <Reference Include="DotNetty.Transport, Version=0.6.0.0, Culture=neutral, PublicKeyToken=bc13ca065fa06c29, processorArchitecture=MSIL"> <HintPath>..\packages\DotNetty.Transport.0.6.0\lib\net45\DotNetty.Transport.dll</HintPath> </Reference> + <Reference Include="EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"> + <HintPath>..\packages\EntityFramework.6.2.0\lib\net45\EntityFramework.dll</HintPath> + </Reference> + <Reference Include="EntityFramework.SqlServer, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"> + <HintPath>..\packages\EntityFramework.6.2.0\lib\net45\EntityFramework.SqlServer.dll</HintPath> + </Reference> <Reference Include="LiteDB, Version=5.0.4.0, Culture=neutral, PublicKeyToken=4ee40123013c9f27, processorArchitecture=MSIL"> <HintPath>..\packages\LiteDB.5.0.4\lib\net45\LiteDB.dll</HintPath> </Reference> @@ -99,6 +105,7 @@ <HintPath>..\packages\System.Collections.Immutable.1.5.0\lib\netstandard2.0\System.Collections.Immutable.dll</HintPath> </Reference> <Reference Include="System.ComponentModel.Composition" /> + <Reference Include="System.ComponentModel.DataAnnotations" /> <Reference Include="System.Configuration" /> <Reference Include="System.Console, Version=4.0.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <HintPath>..\packages\System.Console.4.3.0\lib\net46\System.Console.dll</HintPath> @@ -200,11 +207,26 @@ <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> <Compile Include="ITelemetryDestination.cs" /> <Compile Include="Destinations\MqttTelemetryDestination.cs" /> - <Compile Include="ITelemetryPendingStorageManager.cs" /> + <Compile Include="ITelemetryHistoryModule.cs" /> + <Compile Include="ITelemetryModule.cs" /> + <Compile Include="ITelemetryQueueManager.cs" /> + <Compile Include="ITelemetryStorageManager.cs" /> + <Compile Include="ITelemetryPublisher.cs" /> + <Compile Include="ITelemetryStreamingModule.cs" /> + <Compile Include="Modules\TelemetryDiagnosticsModule.cs" /> + <Compile Include="Modules\TelemetryDiagnosticsModuleConfig.cs" /> + <Compile Include="Modules\TelemetryJobRunsHistoryModule.cs" /> + <Compile Include="Modules\TelemetryJobRunsHistoryModuleConfig.cs" /> + <Compile Include="PendingTelemetry.cs" /> + <Compile Include="TelemetryConfigurableModule.cs" /> + <Compile Include="TelemetryHistoryModuleCheckPoint.cs" /> <Compile Include="TelemetryNameAttribute.cs" /> + <Compile Include="TelemetryAvailableEventArgs.cs" /> <Compile Include="TelemetryPackagePublishedEventArgs.cs" /> <Compile Include="TelemetryPackagePublishFailedEventArgs.cs" /> <Compile Include="TelemetryPendingDestination.cs" /> + <Compile Include="TelemetryPendingStorageModule.cs" /> + <Compile Include="TelemetryInMemoryQueueManager.cs" /> <Compile Include="TelemetryPublisherAdvanced.cs" /> <Compile Include="TelemetryPublisherEventArgs.cs" /> <Compile Include="TelemetryPackagePublishingEventArgs.cs" /> @@ -212,7 +234,7 @@ <Compile Include="TelemetryPublisher.cs" /> <Compile Include="TelemetryPublisherConfiguration.cs" /> <Compile Include="TelemetrySource.cs" /> - <Compile Include="TelemetryLiteDbPendingStorageManager.cs" /> + <Compile Include="TelemetryLiteDBStorageManager.cs" /> <Compile Include="ITelemetry.cs" /> <Compile Include="JsonFlattener.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> @@ -248,13 +270,15 @@ <Project>{e4927038-348d-4295-aaf4-861c58cb3943}</Project> <Name>Tango.PMR</Name> </ProjectReference> + <ProjectReference Include="..\Tango.Settings\Tango.Settings.csproj"> + <Project>{d8f1ad85-526a-4f50-b6dc-d437af63d8d8}</Project> + <Name>Tango.Settings</Name> + </ProjectReference> <ProjectReference Include="..\Tango.Transport\Tango.Transport.csproj"> <Project>{74E700B0-1156-4126-BE40-EE450D3C3026}</Project> <Name>Tango.Transport</Name> </ProjectReference> </ItemGroup> - <ItemGroup> - <Folder Include="EventArgs\" /> - </ItemGroup> + <ItemGroup /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> </Project>
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs new file mode 100644 index 000000000..163b72f7a --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryAvailableEventArgs : EventArgs + { + public TelemetrySource Source { get; set; } + public ITelemetry TelemetryObject { get; set; } + + public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySource source) + { + TelemetryObject = telemetryObject; + Source = source; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index ca6c2925e..133409ff9 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -1,5 +1,4 @@ -using LiteDB; -using Newtonsoft.Json; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; @@ -10,15 +9,7 @@ namespace Tango.Telemetry { public class TelemetryBase : ITelemetry { - [BsonId(true)] - public int Id { get; set; } public DateTime Time { get; set; } - public List<TelemetryPendingDestination> PendingDestinations { get; set; } - - public TelemetryBase() - { - PendingDestinations = new List<TelemetryPendingDestination>(); - } public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true) { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs new file mode 100644 index 000000000..8ffb96af5 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Settings; + +namespace Tango.Telemetry +{ + public abstract class TelemetryConfigurableModule<T> : ExtendedObject where T : SettingsBase + { + public T Config { get; set; } + + public TelemetryConfigurableModule() + { + Config = SettingsManager.Default.GetOrCreate<T>(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs new file mode 100644 index 000000000..2f57a1926 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs @@ -0,0 +1,17 @@ +using LiteDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryHistoryModuleCheckPoint + { + [BsonId] + public String ModuleName { get; set; } + public DateTime Time { get; set; } + public int TotalCount { get; set; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs new file mode 100644 index 000000000..2197bac99 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryInMemoryQueueManager.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; + +namespace Tango.Telemetry +{ + public class TelemetryInMemoryQueueManager : ITelemetryQueueManager + { + private ProducerConsumerQueue<TelemetryPublishPackage> _queue; + + public int Count { get { return _queue.Count; } } + + public TelemetryInMemoryQueueManager() + { + _queue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + } + + public TelemetryPublishPackage Dequeue() + { + return _queue.BlockDequeue(); + } + + public void Enqueue(TelemetryPublishPackage package) + { + _queue.BlockEnqueue(package); + } + + public void Clear() + { + _queue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs new file mode 100644 index 000000000..781b3f3e6 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -0,0 +1,113 @@ +using LiteDB; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryLiteDBStorageManager : ITelemetryStorageManager + { + private bool _disposed; + private LiteDatabase _database; + private static Object _lock = new object(); + + public String DatabasePath { get; private set; } + + public TelemetryLiteDBStorageManager() + { + DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); + Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); + + _database = new LiteDatabase($"Filename={DatabasePath}"); + _database.Pragma("TIMEOUT", 10); //Read Timeout + _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data + _database.Commit(); + } + + public virtual void Dispose() + { + if (_database != null && !_disposed) + { + try + { + _disposed = true; + _database.Dispose(); + _database = null; + } + catch { } + } + } + + ~TelemetryLiteDBStorageManager() + { + Dispose(); + } + + private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection() + { + return _database.GetCollection<PendingTelemetry>("PendingTelemetries"); + } + + private ILiteCollection<TelemetryHistoryModuleCheckPoint> GetModulesCheckpointCollection() + { + return _database.GetCollection<TelemetryHistoryModuleCheckPoint>("ModuleCheckPoints"); + } + + public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry) + { + lock (_lock) + { + var collection = GetPendingTelemetriesCollection(); + collection.Upsert(pendingTelemetry); + } + } + + public void DeletePendingTelemetry(PendingTelemetry pendingTelemetry) + { + lock (_lock) + { + var collection = GetPendingTelemetriesCollection(); + collection.Delete(pendingTelemetry.Id); + } + } + + public List<PendingTelemetry> GetPendingTelemetries(int maxCount) + { + lock (_lock) + { + var collection = GetPendingTelemetriesCollection(); + return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); + } + } + + public TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule) + { + lock (_lock) + { + var collection = GetModulesCheckpointCollection(); + return collection.FindOne(x => x.ModuleName == historyModule.Name); + } + } + + public void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount) + { + lock (_lock) + { + var collection = GetModulesCheckpointCollection(); + collection.Upsert(new TelemetryHistoryModuleCheckPoint() { ModuleName = historyModule.Name, Time = time, TotalCount = totalCount }); + } + } + + public int GetPendingTelemetriesCount() + { + lock (_lock) + { + var collection = GetPendingTelemetriesCollection(); + return collection.Count(); + } + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs deleted file mode 100644 index 47b8db826..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs +++ /dev/null @@ -1,80 +0,0 @@ -using LiteDB; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Telemetry -{ - public class TelemetryLiteDbPendingStorageManager : ITelemetryPendingStorageManager - { - private bool _disposed; - private LiteDatabase _database; - - public String DatabasePath { get; private set; } - - public TelemetryLiteDbPendingStorageManager() - { - DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); - Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); - - _database = new LiteDatabase($"Filename={DatabasePath}"); - _database.Pragma("TIMEOUT", 10); //Read Timeout - _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data - _database.Commit(); - } - - public virtual void Dispose() - { - if (_database != null) - { - try - { - _disposed = true; - _database.Dispose(); - _database = null; - } - catch { } - } - } - - ~TelemetryLiteDbPendingStorageManager() - { - Dispose(); - } - - private ILiteCollection<T> GetCollection<T>() where T : ITelemetry - { - return _database.GetCollection<T>(); - } - - public void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry - { - var collection = GetCollection<T>(); - collection.Upsert(telemetry); - } - - public void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry - { - var collection = GetCollection<T>(); - collection.Delete(telemetry.Id); - } - - public List<ITelemetry> GetTelemetryAll() - { - var names = _database.GetCollectionNames(); - - List<ITelemetry> telemetryAll = new List<ITelemetry>(); - - foreach (var name in names) - { - var collection = _database.GetCollection<ITelemetry>(name); - telemetryAll.AddRange(collection.FindAll().ToList()); - } - - return telemetryAll.OrderBy(x => x.Time).ToList(); - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs new file mode 100644 index 000000000..6aae43f67 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryPendingStorageModule : ITelemetryModule + { + public string Name { get; private set; } = "Pending Storage"; + + public void Dispose() + { + + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs index a3ae6359c..5bd207e9f 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs @@ -10,14 +10,15 @@ namespace Tango.Telemetry { private String _payload; - public ITelemetry TelemetryObject { get; set; } + public ITelemetryModule Module { get; set; } + public PendingTelemetry PendingTelemetry { get; set; } public TelemetrySource Source { get; set; } public String ToPayload() { if (_payload == null) { - _payload = TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true); + _payload = PendingTelemetry.TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true); } return _payload; diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 8891a6cc4..1314d3346 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Reflection; using System.Text; @@ -16,64 +17,100 @@ using Tango.Telemetry.TelemetryObjects; namespace Tango.Telemetry { - public class TelemetryPublisher : ExtendedObject, IDisposable + public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher { - public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; - public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage; public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished; public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; - private System.Timers.Timer _diagnosticsSamplingTimer; private System.Timers.Timer _pendingStorageCheckTimer; + private System.Timers.Timer _historicalDataTimer; + private bool _isDisposed; private Thread _publishThread; - private ProducerConsumerQueue<TelemetryPublishPackage> _publishQueue; - - private List<StartDiagnosticsResponse> _diagnosticsQueue; - private bool _writing; - private bool _emptyWritten; - - private IMachineOperator _machineOperator; + private TelemetryPendingStorageModule _pendingStorageModule; #region Properties - public TelemetryPublisherConfiguration Config { get; private set; } - public ITelemetryPendingStorageManager PendingStorageManager { get; private set; } public bool IsStarted { get; private set; } + public TelemetryPublisherConfiguration Config { get; private set; } + + public ITelemetryStorageManager StorageManager { get; private set; } + + private List<ITelemetryModule> InnerModules { get; } + public ReadOnlyCollection<ITelemetryModule> Modules { get; } + + private List<ITelemetryDestination> InnerDestinations { get; } + public ReadOnlyCollection<ITelemetryDestination> Destinations { get; } + + public ITelemetryQueueManager QueueManager { get; private set; } + #endregion #region Constructor - public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) + public TelemetryPublisher(TelemetryPublisherConfiguration config) { - _machineOperator = machineOperator; - PendingStorageManager = storageManager; - Config = config ?? new TelemetryPublisherConfiguration(); - _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + _pendingStorageModule = new TelemetryPendingStorageModule(); + + InnerModules = new List<ITelemetryModule>(); + Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules); + + InnerDestinations = new List<ITelemetryDestination>(); + Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations); + _publishThread = new Thread(PublishThreadMethod); _publishThread.IsBackground = true; - _diagnosticsQueue = new List<StartDiagnosticsResponse>(); + StorageManager = new TelemetryLiteDBStorageManager(); + QueueManager = new TelemetryInMemoryQueueManager(); + } - RegisterForEvents(); + public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config) + { + StorageManager = storageManager; + QueueManager = queueManager; } #endregion - #region Register / Unregister Events + #region Modules - private void RegisterForEvents() + public void RegisterModule(ITelemetryModule module) { - _machineOperator.DiagnosticsDataAvailable += MachineOperator_DiagnosticsDataAvailable; + if (InnerModules.Exists(x => x.GetType() == module.GetType())) + { + throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered."); + } + + InnerModules.Add(module); + + if (module is ITelemetryStreamingModule streamingModule) + { + streamingModule.TelemetryAvailable += Module_TelemetryAvailable; + + if (IsStarted) + { + streamingModule.Start(); + } + } + } + + private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) + { + PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source); } - private void UnregisterEvents() + #endregion + + #region Destinations + + public void RegisterDestination(ITelemetryDestination destination) { - _machineOperator.DiagnosticsDataAvailable -= MachineOperator_DiagnosticsDataAvailable; + InnerDestinations.Add(destination); } #endregion @@ -88,18 +125,6 @@ namespace Tango.Telemetry IsStarted = true; - if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS) - { - Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS); - } - - if (_diagnosticsSamplingTimer == null) - { - _diagnosticsSamplingTimer = new System.Timers.Timer(); - _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds; - _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed; - } - if (_pendingStorageCheckTimer == null) { _pendingStorageCheckTimer = new System.Timers.Timer(); @@ -107,13 +132,20 @@ namespace Tango.Telemetry _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; } - _diagnosticsQueue.Clear(); - - _writing = false; - _diagnosticsSamplingTimer.Start(); _pendingStorageCheckTimer.Start(); + if (_historicalDataTimer == null) + { + _historicalDataTimer = new System.Timers.Timer(); + _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds; + _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; + } + + _historicalDataTimer.Start(); + _publishThread.Start(); + + InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start()); } } @@ -122,103 +154,85 @@ namespace Tango.Telemetry if (IsStarted) { IsStarted = false; - _diagnosticsSamplingTimer.Stop(); + InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop()); _pendingStorageCheckTimer.Stop(); - _diagnosticsQueue.Clear(); - _publishQueue.BlockEnqueue(null); + _historicalDataTimer.Stop(); + QueueManager.Enqueue(null); } } #endregion - #region Incoming Data Event Handlers + #region Timers - private void MachineOperator_DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics) + private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) { - if (IsStarted && diagnostics.Monitors != null) + _pendingStorageCheckTimer.Stop(); + + var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + + foreach (var pendingTelemetry in batch) { - _diagnosticsQueue.Add(diagnostics); - } - } + await PublishTelemetryPackage(new TelemetryPublishPackage() + { + Module = _pendingStorageModule, + PendingTelemetry = pendingTelemetry, + Source = TelemetrySource.PendingStorage + }); - #endregion + if (!IsStarted || _isDisposed) return; + } - #region Timers + _pendingStorageCheckTimer.Start(); + } - private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e) + private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e) { - if (!IsStarted || _writing) return; + _historicalDataTimer.Stop(); - try + if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) { - _diagnosticsSamplingTimer.Stop(); - - _writing = true; - - if (_diagnosticsQueue.Count > 0) + foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList()) { - var queue = _diagnosticsQueue.ToList(); - _diagnosticsQueue.Clear(); - _emptyWritten = false; - - var monitorsAvg = InsightsHelper.AverageMonitors(queue.Select(x => x.Monitors).ToList()); - queue.Clear(); + TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module); + if (await module.CanRequestHistory(checkPoint.Time)) + { + var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = monitorsAvg; - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); + foreach (var telemetry in historyTelemetries) + { + PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage); + checkPoint.Time = telemetry.Time; + checkPoint.TotalCount++; + } - PushTelemetryPackage(frame, TelemetrySource.Streaming); - } - else - { - if (!_emptyWritten) - { - TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); - frame.Monitors = new InsightsMonitors(); - frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval); - PushTelemetryPackage(frame, TelemetrySource.Streaming); - _emptyWritten = true; + StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount); } } } - catch (Exception ex) - { - LogManager.Log(ex, "Error occurred on insights frame insertion."); - } - finally - { - _writing = false; - _diagnosticsSamplingTimer.Start(); - } - } - - private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) - { - _pendingStorageCheckTimer.Stop(); - - var telemetryAll = PendingStorageManager.GetTelemetryAll(); - - foreach (var t in telemetryAll) - { - PushTelemetryPackage(t, TelemetrySource.PendingStorage); - } - _pendingStorageCheckTimer.Start(); + _historicalDataTimer.Start(); } #endregion #region Push - private void PushTelemetryPackage(ITelemetry telemetryObject, TelemetrySource source) + private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source) { - _publishQueue.BlockEnqueue(new TelemetryPublishPackage() { TelemetryObject = telemetryObject, Source = source }); + PendingTelemetry pendingTelemetry = new PendingTelemetry(); + pendingTelemetry.Created = DateTime.UtcNow; + //pendingTelemetry.Expires = module.GetExpiration(); + pendingTelemetry.Module = module.Name; + pendingTelemetry.Source = source; + pendingTelemetry.TelemetryObject = telemetry; + + PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source }); } private void PushTelemetryPackage(TelemetryPublishPackage package) { - _publishQueue.BlockEnqueue(package); + QueueManager.Enqueue(package); } #endregion @@ -229,10 +243,10 @@ namespace Tango.Telemetry { while (IsStarted) { - TelemetryPublishPackage package = _publishQueue.BlockDequeue(); + TelemetryPublishPackage package = QueueManager.Dequeue(); if (package == null) { - _publishQueue = new ProducerConsumerQueue<TelemetryPublishPackage>(); + QueueManager.Clear(); return; } @@ -249,19 +263,21 @@ namespace Tango.Telemetry protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) { + if (!IsStarted || _isDisposed) return; + List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID)); properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName())); properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment)); - List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); + List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) { - foreach (var destination in Config.TelemetryDestinations) + foreach (var destination in Destinations) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -270,7 +286,7 @@ namespace Tango.Telemetry } } - foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) { if (pendingDestinations.Exists(x => x.Name == destination.Name)) { @@ -300,15 +316,15 @@ namespace Tango.Telemetry } } - package.TelemetryObject.PendingDestinations = pendingDestinations; + package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0) + if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) { - PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); + StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { - PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); + StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } } @@ -354,16 +370,28 @@ namespace Tango.Telemetry public void Dispose() { - UnregisterEvents(); - - if (IsStarted) + if (!_isDisposed) { - Stop(); - } + _isDisposed = true; + foreach (var module in InnerModules) + { + if (module is ITelemetryStreamingModule streamingModule) + { + streamingModule.Stop(); + streamingModule.TelemetryAvailable -= Module_TelemetryAvailable; + } + module.Dispose(); + } - foreach (var destination in Config.TelemetryDestinations) - { - destination.Dispose(); + if (IsStarted) + { + Stop(); + } + + foreach (var destination in Destinations) + { + destination.Dispose(); + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index 2d0f48126..0183de6c2 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -15,7 +15,7 @@ namespace Tango.Telemetry /// </summary> public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1); - public TelemetryPublisherAdvanced(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) : base(machineOperator, storageManager, config) + public TelemetryPublisherAdvanced(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : base(storageManager, queueManager, config) { } @@ -30,12 +30,12 @@ namespace Tango.Telemetry }; var now = DateTime.UtcNow; - var pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); + var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); // For Streaming/External: initialize pending destinations list (used if publishing fails) if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) { - foreach (var dest in Config.TelemetryDestinations) + foreach (var dest in Destinations) { if (!pendingDestinations.Any(x => x.Name == dest.Name)) { @@ -50,7 +50,7 @@ namespace Tango.Telemetry } } - foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); @@ -101,15 +101,15 @@ namespace Tango.Telemetry } } - package.TelemetryObject.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); + package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any()) { - PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); + StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { - PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); + StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs index da5856848..424249f7d 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs @@ -12,16 +12,17 @@ namespace Tango.Telemetry public String MachineID { get; set; } public MachineTypes MachineType { get; set; } public String Environment { get; set; } - public TimeSpan DiagnosticsSamplingInterval { get; set; } public TimeSpan PendingStorageCheckInterval { get; set; } - public List<ITelemetryDestination> TelemetryDestinations { get; private set; } + public int MaxPendingStorageTelemetriesPerCycle { get; set; } + public TimeSpan HistoryModulesRequestInterval { get; set; } + public int MaxPendingTelemetries { get; set; } public TelemetryPublisherConfiguration() { - TelemetryDestinations = new List<ITelemetryDestination>(); - - DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); PendingStorageCheckInterval = TimeSpan.FromMinutes(1); + MaxPendingStorageTelemetriesPerCycle = 100; + HistoryModulesRequestInterval = TimeSpan.FromMinutes(1); + MaxPendingTelemetries = 200; } public void Validate() @@ -35,23 +36,8 @@ namespace Tango.Telemetry if (!Enum.IsDefined(typeof(MachineTypes), MachineType)) throw new ArgumentOutOfRangeException(nameof(MachineType), "MachineType is not a valid enum value."); - if (DiagnosticsSamplingInterval.TotalSeconds < 1) - throw new ArgumentOutOfRangeException(nameof(DiagnosticsSamplingInterval), "DiagnosticsSamplingInterval must be at least 1 second."); - if (PendingStorageCheckInterval.TotalSeconds < 5) throw new ArgumentOutOfRangeException(nameof(PendingStorageCheckInterval), "PendingStorageCheckInterval must be at least 5 seconds."); - - if (TelemetryDestinations == null || TelemetryDestinations.Count == 0) - throw new InvalidOperationException("At least one telemetry destination must be provided."); - - foreach (var destination in TelemetryDestinations) - { - if (destination == null) - throw new InvalidOperationException("Telemetry destination list contains a null entry."); - - if (destination.SupportedSources == null || destination.SupportedSources.Count == 0) - throw new InvalidOperationException($"Telemetry destination '{destination.Name}' has no supported sources defined."); - } } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/packages.config b/Software/Visual_Studio/Tango.Telemetry/packages.config index b94abaac9..1a4b2c3c8 100644 --- a/Software/Visual_Studio/Tango.Telemetry/packages.config +++ b/Software/Visual_Studio/Tango.Telemetry/packages.config @@ -6,6 +6,7 @@ <package id="DotNetty.Common" version="0.6.0" targetFramework="net461" /> <package id="DotNetty.Handlers" version="0.6.0" targetFramework="net461" /> <package id="DotNetty.Transport" version="0.6.0" targetFramework="net461" /> + <package id="EntityFramework" version="6.2.0" targetFramework="net461" /> <package id="LiteDB" version="5.0.4" targetFramework="net461" /> <package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net461" /> <package id="Microsoft.Azure.Amqp" version="2.5.10" targetFramework="net461" /> |
