aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
commit4222eddece906d6f0877022c06b853deb5068472 (patch)
treea29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software/Visual_Studio
parenta802fe75f9538371004f1833e69a69b798892d0c (diff)
downloadTango-4222eddece906d6f0877022c06b853deb5068472.tar.gz
Tango-4222eddece906d6f0877022c06b853deb5068472.zip
Telemetry source.
Diffstat (limited to 'Software/Visual_Studio')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs26
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs36
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs5
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs)2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs)2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs)2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs)14
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs)6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs)6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs)6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj27
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs)4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs)4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs9
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs)4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs)4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs16
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs)2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs41
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs287
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs81
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs)2
28 files changed, 466 insertions, 146 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
index 75307f844..7a9cc9f7a 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
@@ -12,16 +12,30 @@ namespace Tango.Telemetry.Destinations
{
private DeviceClient _hubClient;
- public string Name { get; private set; } = "Azure IoT Hub";
+ public string Name { get; set; } = "Azure IoT Hub";
public bool Enable { get; set; } = true;
public String ConnectionString { get; private set; }
- public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; }
+ public ConnectionStatus HubConnectionStatus { get; private set; }
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
public AzureHubTelemetryDestination(String connectionString)
{
+ HubConnectionStatus = ConnectionStatus.Connected;
ConnectionString = connectionString;
- SupportedSources = new List<TelemetrySource>() { TelemetrySource.PendingStorage, TelemetrySource.Streaming, TelemetrySource.ExternalStorage };
+ SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage };
+ }
+
+ public Task<bool> IsAvailable()
+ {
+ if (_hubClient == null)
+ {
+ return Task.FromResult(true);
+ }
+ else
+ {
+ return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
+ }
}
public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties)
@@ -31,6 +45,7 @@ namespace Tango.Telemetry.Destinations
_hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
_hubClient.SetConnectionStatusChangesHandler((status, reason) =>
{
+ HubConnectionStatus = status;
LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.");
});
}
@@ -53,5 +68,10 @@ namespace Tango.Telemetry.Destinations
{
_hubClient?.Dispose();
}
+
+ public override string ToString()
+ {
+ return Name;
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
index b22849f3c..b5ff05c29 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
@@ -18,14 +18,15 @@ namespace Tango.Telemetry.Destinations
{
private IMqttClient _mqttClient;
private IMqttClientOptions _mqttOptions;
+ private DateTime _nextRealAvailabilityCheck;
- public string Name { get; private set; } = "MQTT";
+ public string Name { get; set; } = "MQTT";
public bool Enable { get; set; } = true;
public String Address { get; private set; }
public int Port { get; private set; }
public String Topic { get; private set; }
- public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; }
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
/// <summary>
///
@@ -35,12 +36,34 @@ namespace Tango.Telemetry.Destinations
/// <param name="port">Default 1883</param>
public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883)
{
+ _nextRealAvailabilityCheck = DateTime.Now;
Topic = topic;
+ Address = address;
Port = port;
- SupportedSources = new List<TelemetrySource>() { TelemetrySource.Streaming };
+ SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.Streaming };
}
- public async Task<bool> EnsureConnection()
+ public async Task<bool> IsAvailable()
+ {
+ if (_mqttClient == null)
+ {
+ return await EnsureConnection();
+ }
+ else
+ {
+ if (DateTime.Now > _nextRealAvailabilityCheck)
+ {
+ _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5);
+ return await EnsureConnection();
+ }
+ else
+ {
+ return _mqttClient.IsConnected;
+ }
+ }
+ }
+
+ private async Task<bool> EnsureConnection()
{
if (_mqttClient == null || !_mqttClient.IsConnected)
{
@@ -98,5 +121,10 @@ namespace Tango.Telemetry.Destinations
{
_mqttClient?.Dispose();
}
+
+ public override string ToString()
+ {
+ return $"{Name} -> {Address}:{Port}";
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
index a8cc46fd5..10424531b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
@@ -9,8 +9,9 @@ namespace Tango.Telemetry
public interface ITelemetryDestination : IDisposable
{
bool Enable { get; set; }
- String Name { get; }
- IReadOnlyList<TelemetrySource> SupportedSources { get; }
+ String Name { get; set; }
+ Task<bool> IsAvailable();
+ IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; }
Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties);
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs
index 314b99046..0b118d85c 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistoryModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryHistorySource.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public interface ITelemetryHistoryModule : ITelemetryModule
+ public interface ITelemetryHistorySource : ITelemetrySource
{
Task<bool> CanRequestHistory(DateTime from);
Task<IEnumerable<ITelemetry>> RequestHistory(DateTime from);
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
index 0c63dc906..b44f567da 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
@@ -14,8 +14,8 @@ namespace Tango.Telemetry
event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
ITelemetryStorageManager StorageManager { get; }
ITelemetryQueueManager QueueManager { get; }
- ReadOnlyCollection<ITelemetryModule> Modules { get; }
- void RegisterModule(ITelemetryModule module);
+ ReadOnlyCollection<ITelemetrySource> Sources { get; }
+ void RegisterSource(ITelemetrySource source);
ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
void RegisterDestination(ITelemetryDestination destination);
bool IsStarted { get; }
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs
index a27a03fc8..74d58ed4a 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public interface ITelemetryModule : IDisposable
+ public interface ITelemetrySource : IDisposable
{
String Name { get; }
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
index 2c54a2a51..ae63e41cd 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
@@ -12,7 +12,7 @@ namespace Tango.Telemetry
void DeletePendingTelemetry(PendingTelemetry pendingTelemetry);
List<PendingTelemetry> GetPendingTelemetries(int maxCount);
int GetPendingTelemetriesCount();
- TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule);
- void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount);
+ TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source);
+ void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount);
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs
index 12b576b1b..3c60c35ee 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStreamingSource.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public interface ITelemetryStreamingModule : ITelemetryModule
+ public interface ITelemetryStreamingSource : ITelemetrySource
{
event EventHandler<TelemetryAvailableEventArgs> TelemetryAvailable;
void Start();
diff --git a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs
index 2be3b1c45..5e1b5778d 100644
--- a/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/PendingTelemetry.cs
@@ -14,8 +14,8 @@ namespace Tango.Telemetry
public DateTime Created { get; set; }
public DateTime Expires { get; set; }
public ITelemetry TelemetryObject { get; set; }
- public String Module { get; set; }
- public TelemetrySource Source { get; set; }
+ public String Source { get; set; }
+ public TelemetrySourceTypes SourceType { get; set; }
public List<TelemetryPendingDestination> PendingDestinations { get; set; }
public PendingTelemetry()
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs
index a35e08328..22fac087b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs
@@ -10,11 +10,11 @@ using Tango.Integration.Operation;
using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
-using Tango.Telemetry.TelemetryObjects;
+using Tango.Telemetry.Telemetries;
-namespace Tango.Telemetry.Modules
+namespace Tango.Telemetry.Sources
{
- public class TelemetryDiagnosticsModule : TelemetryConfigurableModule<TelemetryDiagnosticsModuleConfig>, ITelemetryModule
+ public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetrySource
{
public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;
@@ -30,12 +30,12 @@ namespace Tango.Telemetry.Modules
public string Name { get; private set; } = "Diagnostics";
- private TelemetryDiagnosticsModule() : base()
+ private TelemetryDiagnosticsSource() : base()
{
_diagnosticsQueue = new List<StartDiagnosticsResponse>();
}
- public TelemetryDiagnosticsModule(IMachineOperator machineOperator) : base()
+ public TelemetryDiagnosticsSource(IMachineOperator machineOperator) : base()
{
_machineOperator = machineOperator;
_machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable;
@@ -101,7 +101,7 @@ namespace Tango.Telemetry.Modules
frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
- TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming));
+ TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming));
}
else
{
@@ -110,7 +110,7 @@ namespace Tango.Telemetry.Modules
TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
frame.Monitors = new InsightsMonitors();
frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
- TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySource.Streaming));
+ TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs(frame, TelemetrySourceTypes.Streaming));
_emptyWritten = true;
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs
index 8d13b3cca..806277d5b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryDiagnosticsModuleConfig.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSourceConfig.cs
@@ -5,13 +5,13 @@ using System.Text;
using System.Threading.Tasks;
using Tango.Settings;
-namespace Tango.Telemetry.Modules
+namespace Tango.Telemetry.Sources
{
- public class TelemetryDiagnosticsModuleConfig : SettingsBase
+ public class TelemetryDiagnosticsSourceConfig : SettingsBase
{
public TimeSpan DiagnosticsSamplingInterval { get; set; }
- public TelemetryDiagnosticsModuleConfig()
+ public TelemetryDiagnosticsSourceConfig()
{
DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10);
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs
index 0ba2935c1..6cb597e4b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs
@@ -5,11 +5,11 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.BL;
-using Tango.Telemetry.TelemetryObjects;
+using Tango.Telemetry.Telemetries;
-namespace Tango.Telemetry.Modules
+namespace Tango.Telemetry.Sources
{
- public class TelemetryJobRunsHistoryModule : TelemetryConfigurableModule<TelemetryJobRunsHistoryModuleConfig>, ITelemetryHistoryModule
+ public class TelemetryJobRunsHistorySource : TelemetryConfigurableSource<TelemetryJobRunsHistorySourceConfig>, ITelemetryHistorySource
{
private bool _isBusy;
diff --git a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs
index 412dea5f9..da2043941 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Modules/TelemetryJobRunsHistoryModuleConfig.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySourceConfig.cs
@@ -5,13 +5,13 @@ using System.Text;
using System.Threading.Tasks;
using Tango.Settings;
-namespace Tango.Telemetry.Modules
+namespace Tango.Telemetry.Sources
{
- public class TelemetryJobRunsHistoryModuleConfig : SettingsBase
+ public class TelemetryJobRunsHistorySourceConfig : SettingsBase
{
public int MaxJobRunsPerRequest { get; set; }
- public TelemetryJobRunsHistoryModuleConfig()
+ public TelemetryJobRunsHistorySourceConfig()
{
MaxJobRunsPerRequest = 100;
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 27ac76783..02f8417f1 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -207,25 +207,25 @@
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
<Compile Include="ITelemetryDestination.cs" />
<Compile Include="Destinations\MqttTelemetryDestination.cs" />
- <Compile Include="ITelemetryHistoryModule.cs" />
- <Compile Include="ITelemetryModule.cs" />
+ <Compile Include="ITelemetryHistorySource.cs" />
+ <Compile Include="ITelemetrySource.cs" />
<Compile Include="ITelemetryQueueManager.cs" />
<Compile Include="ITelemetryStorageManager.cs" />
<Compile Include="ITelemetryPublisher.cs" />
- <Compile Include="ITelemetryStreamingModule.cs" />
- <Compile Include="Modules\TelemetryDiagnosticsModule.cs" />
- <Compile Include="Modules\TelemetryDiagnosticsModuleConfig.cs" />
- <Compile Include="Modules\TelemetryJobRunsHistoryModule.cs" />
- <Compile Include="Modules\TelemetryJobRunsHistoryModuleConfig.cs" />
+ <Compile Include="ITelemetryStreamingSource.cs" />
+ <Compile Include="Sources\TelemetryDiagnosticsSource.cs" />
+ <Compile Include="Sources\TelemetryDiagnosticsSourceConfig.cs" />
+ <Compile Include="Sources\TelemetryJobRunsHistoryModule.cs" />
+ <Compile Include="Sources\TelemetryJobRunsHistorySourceConfig.cs" />
<Compile Include="PendingTelemetry.cs" />
- <Compile Include="TelemetryConfigurableModule.cs" />
- <Compile Include="TelemetryHistoryModuleCheckPoint.cs" />
+ <Compile Include="TelemetryConfigurableSource.cs" />
+ <Compile Include="TelemetryHistorySourceCheckPoint.cs" />
<Compile Include="TelemetryNameAttribute.cs" />
<Compile Include="TelemetryAvailableEventArgs.cs" />
<Compile Include="TelemetryPackagePublishedEventArgs.cs" />
<Compile Include="TelemetryPackagePublishFailedEventArgs.cs" />
<Compile Include="TelemetryPendingDestination.cs" />
- <Compile Include="TelemetryPendingStorageModule.cs" />
+ <Compile Include="TelemetryPendingStorageSource.cs" />
<Compile Include="TelemetryInMemoryQueueManager.cs" />
<Compile Include="TelemetryPublisherAdvanced.cs" />
<Compile Include="TelemetryPublisherEventArgs.cs" />
@@ -233,14 +233,15 @@
<Compile Include="TelemetryPublishPackage.cs" />
<Compile Include="TelemetryPublisher.cs" />
<Compile Include="TelemetryPublisherConfiguration.cs" />
- <Compile Include="TelemetrySource.cs" />
+ <Compile Include="TelemetryPublishResult.cs" />
+ <Compile Include="TelemetrySourceTypes.cs" />
<Compile Include="TelemetryLiteDBStorageManager.cs" />
<Compile Include="ITelemetry.cs" />
<Compile Include="JsonFlattener.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TelemetryBase.cs" />
- <Compile Include="TelemetryObjects\TelemetryDiagnosticsFrame.cs" />
- <Compile Include="TelemetryObjects\TelemetryJobRun.cs" />
+ <Compile Include="Telemetries\TelemetryDiagnosticsFrame.cs" />
+ <Compile Include="Telemetries\TelemetryJobRun.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs
index 86762596e..da64c50a0 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryDiagnosticsFrame.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryDiagnosticsFrame.cs
@@ -8,9 +8,9 @@ using System.Text;
using System.Threading.Tasks;
using Tango.PMR.Insights;
-namespace Tango.Telemetry.TelemetryObjects
+namespace Tango.Telemetry.Telemetries
{
- [TelemetryName("Diagnostics")]
+ [TelemetryName("DiagnosticsFrame")]
public class TelemetryDiagnosticsFrame : TelemetryBase
{
public InsightsMonitors Monitors { get; set; }
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs
index 828f61360..7b1ca3a1c 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryObjects/TelemetryJobRun.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs
@@ -4,9 +4,9 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
-namespace Tango.Telemetry.TelemetryObjects
+namespace Tango.Telemetry.Telemetries
{
- [TelemetryName("JobRuns")]
+ [TelemetryName("JobRun")]
public class TelemetryJobRun : TelemetryBase
{
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs
index 163b72f7a..a9ddf890c 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryAvailableEventArgs.cs
@@ -8,13 +8,13 @@ namespace Tango.Telemetry
{
public class TelemetryAvailableEventArgs : EventArgs
{
- public TelemetrySource Source { get; set; }
+ public TelemetrySourceTypes SourceType { get; set; }
public ITelemetry TelemetryObject { get; set; }
- public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySource source)
+ public TelemetryAvailableEventArgs(ITelemetry telemetryObject, TelemetrySourceTypes source)
{
TelemetryObject = telemetryObject;
- Source = source;
+ SourceType = source;
}
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
index 133409ff9..20d5211e7 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
@@ -1,4 +1,5 @@
-using Newtonsoft.Json;
+using LiteDB;
+using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -9,6 +10,12 @@ namespace Tango.Telemetry
{
public class TelemetryBase : ITelemetry
{
+ [BsonIgnore] //This will be used for column mapping in ADX
+ public String Type
+ {
+ get { return this.ToTelemetryName(); }
+ }
+
public DateTime Time { get; set; }
public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true)
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs
index 8ffb96af5..c1068582b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryConfigurableSource.cs
@@ -8,11 +8,11 @@ using Tango.Settings;
namespace Tango.Telemetry
{
- public abstract class TelemetryConfigurableModule<T> : ExtendedObject where T : SettingsBase
+ public abstract class TelemetryConfigurableSource<T> : ExtendedObject where T : SettingsBase
{
public T Config { get; set; }
- public TelemetryConfigurableModule()
+ public TelemetryConfigurableSource()
{
Config = SettingsManager.Default.GetOrCreate<T>();
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs
index 2f57a1926..ee82de05a 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryHistoryModuleCheckPoint.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryHistorySourceCheckPoint.cs
@@ -7,10 +7,10 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public class TelemetryHistoryModuleCheckPoint
+ public class TelemetryHistorySourceCheckPoint
{
[BsonId]
- public String ModuleName { get; set; }
+ public String SourceName { get; set; }
public DateTime Time { get; set; }
public int TotalCount { get; set; }
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
index 781b3f3e6..2ceb95298 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
@@ -51,9 +51,9 @@ namespace Tango.Telemetry
return _database.GetCollection<PendingTelemetry>("PendingTelemetries");
}
- private ILiteCollection<TelemetryHistoryModuleCheckPoint> GetModulesCheckpointCollection()
+ private ILiteCollection<TelemetryHistorySourceCheckPoint> GetSourcesCheckpointCollection()
{
- return _database.GetCollection<TelemetryHistoryModuleCheckPoint>("ModuleCheckPoints");
+ return _database.GetCollection<TelemetryHistorySourceCheckPoint>("SourcesCheckPoints");
}
public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry)
@@ -83,21 +83,21 @@ namespace Tango.Telemetry
}
}
- public TelemetryHistoryModuleCheckPoint GetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule)
+ public TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source)
{
lock (_lock)
{
- var collection = GetModulesCheckpointCollection();
- return collection.FindOne(x => x.ModuleName == historyModule.Name);
+ var collection = GetSourcesCheckpointCollection();
+ return collection.FindOne(x => x.SourceName == source.Name);
}
}
- public void SetHistoryModuleCheckPoint(ITelemetryHistoryModule historyModule, DateTime time, int totalCount)
+ public void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount)
{
lock (_lock)
{
- var collection = GetModulesCheckpointCollection();
- collection.Upsert(new TelemetryHistoryModuleCheckPoint() { ModuleName = historyModule.Name, Time = time, TotalCount = totalCount });
+ var collection = GetSourcesCheckpointCollection();
+ collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount });
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs
index 6aae43f67..6aa1f5527 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public class TelemetryPendingStorageModule : ITelemetryModule
+ public class TelemetryPendingStorageSource : ITelemetrySource
{
public string Name { get; private set; } = "Pending Storage";
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
index 5bd207e9f..baafb70a7 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
@@ -10,9 +10,9 @@ namespace Tango.Telemetry
{
private String _payload;
- public ITelemetryModule Module { get; set; }
+ public ITelemetrySource Source { get; set; }
public PendingTelemetry PendingTelemetry { get; set; }
- public TelemetrySource Source { get; set; }
+ public TelemetrySourceTypes SourceType { get; set; }
public String ToPayload()
{
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs
new file mode 100644
index 000000000..37c6af412
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPublishResult
+ {
+ public enum DestinationStatus
+ {
+ None,
+ Passed,
+ Unavailable,
+ Failed,
+ Postponed
+ }
+
+ public class DestinationResult
+ {
+ public ITelemetryDestination Destination { get; set; }
+ public DestinationStatus Status { get; set; }
+ public Exception Error { get; set; }
+ public TimeSpan ElapsedTime { get; set; }
+ }
+
+ public List<DestinationResult> DestinationsResults { get; set; }
+ public TimeSpan TotalElapsedTime { get; set; }
+
+ public TimeSpan OverheadTime
+ {
+ get { return TimeSpan.FromMilliseconds(TotalElapsedTime.TotalMilliseconds - DestinationsResults.Sum(x => x.ElapsedTime.TotalMilliseconds)); }
+ }
+
+ public TelemetryPublishResult()
+ {
+ DestinationsResults = new List<DestinationResult>();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 1314d3346..99d96edff 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
+using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
@@ -9,11 +10,13 @@ using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Tango.Core;
+using Tango.Core.ExtensionMethods;
using Tango.Insights;
using Tango.Integration.Operation;
+using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
-using Tango.Telemetry.TelemetryObjects;
+using Tango.Telemetry.Telemetries;
namespace Tango.Telemetry
{
@@ -26,9 +29,9 @@ namespace Tango.Telemetry
private System.Timers.Timer _pendingStorageCheckTimer;
private System.Timers.Timer _historicalDataTimer;
- private bool _isDisposed;
+ protected bool _isDisposed;
private Thread _publishThread;
- private TelemetryPendingStorageModule _pendingStorageModule;
+ private TelemetryPendingStorageSource _pendingStorageSource;
#region Properties
@@ -38,8 +41,8 @@ namespace Tango.Telemetry
public ITelemetryStorageManager StorageManager { get; private set; }
- private List<ITelemetryModule> InnerModules { get; }
- public ReadOnlyCollection<ITelemetryModule> Modules { get; }
+ private List<ITelemetrySource> InnerSources { get; }
+ public ReadOnlyCollection<ITelemetrySource> Sources { get; }
private List<ITelemetryDestination> InnerDestinations { get; }
public ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
@@ -54,10 +57,10 @@ namespace Tango.Telemetry
{
Config = config ?? new TelemetryPublisherConfiguration();
- _pendingStorageModule = new TelemetryPendingStorageModule();
+ _pendingStorageSource = new TelemetryPendingStorageSource();
- InnerModules = new List<ITelemetryModule>();
- Modules = new ReadOnlyCollection<ITelemetryModule>(InnerModules);
+ InnerSources = new List<ITelemetrySource>();
+ Sources = new ReadOnlyCollection<ITelemetrySource>(InnerSources);
InnerDestinations = new List<ITelemetryDestination>();
Destinations = new ReadOnlyCollection<ITelemetryDestination>(InnerDestinations);
@@ -77,31 +80,48 @@ namespace Tango.Telemetry
#endregion
- #region Modules
+ #region Sources
- public void RegisterModule(ITelemetryModule module)
+ public void RegisterSource(ITelemetrySource source)
{
- if (InnerModules.Exists(x => x.GetType() == module.GetType()))
+ if (source == null) return;
+
+ if (InnerSources.Exists(x => x.GetType() == source.GetType()))
{
- throw new InvalidOperationException($"Module {module.GetType().Name} has already been registered.");
+ LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning);
+ return;
}
- InnerModules.Add(module);
+ InnerSources.Add(source);
- if (module is ITelemetryStreamingModule streamingModule)
+ if (source is ITelemetryStreamingSource streamingSource)
{
- streamingModule.TelemetryAvailable += Module_TelemetryAvailable;
+ streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable;
if (IsStarted)
{
- streamingModule.Start();
+ try
+ {
+ streamingSource.Start();
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error starting telemetry source {source.Name}.");
+ }
}
}
+
+ LogManager.Log($"Telemetry source {source.Name} registered.");
}
- private void Module_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
+ private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
{
- PushTelemetryPackage(sender as ITelemetryModule, e.TelemetryObject, e.Source);
+ var source = sender as ITelemetrySource;
+ if (source != null)
+ {
+ LogManager.Log($"Telemetry stream received {source.Name} -> {e.SourceType} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug);
+ PushTelemetryPackage(source, e.TelemetryObject, e.SourceType);
+ }
}
#endregion
@@ -110,7 +130,17 @@ namespace Tango.Telemetry
public void RegisterDestination(ITelemetryDestination destination)
{
+ if (destination == null) return;
+
+ if (InnerDestinations.Exists(x => x.Name == destination.Name))
+ {
+ LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning);
+ return;
+ }
+
InnerDestinations.Add(destination);
+
+ LogManager.Log($"Telemetry destination {destination.Name} registered.");
}
#endregion
@@ -121,31 +151,45 @@ namespace Tango.Telemetry
{
if (!IsStarted)
{
- Config.Validate();
+ try
+ {
+ LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}");
- IsStarted = true;
+ Config.Validate();
+ Validate();
- if (_pendingStorageCheckTimer == null)
- {
- _pendingStorageCheckTimer = new System.Timers.Timer();
- _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds;
- _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
- }
+ IsStarted = true;
- _pendingStorageCheckTimer.Start();
+ if (_pendingStorageCheckTimer == null)
+ {
+ _pendingStorageCheckTimer = new System.Timers.Timer();
+ _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds;
+ _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
+ }
- if (_historicalDataTimer == null)
- {
- _historicalDataTimer = new System.Timers.Timer();
- _historicalDataTimer.Interval = Config.HistoryModulesRequestInterval.TotalMilliseconds;
- _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
- }
+ _pendingStorageCheckTimer.Start();
+
+ if (_historicalDataTimer == null)
+ {
+ _historicalDataTimer = new System.Timers.Timer();
+ _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds;
+ _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
+ }
+
+ _historicalDataTimer.Start();
- _historicalDataTimer.Start();
+ _publishThread.Start();
- _publishThread.Start();
+ InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start());
- InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Start());
+ LogManager.Log($"Telemetry publisher started.");
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error starting telemetry publisher.");
+ Stop();
+ throw ex;
+ }
}
}
@@ -154,10 +198,63 @@ namespace Tango.Telemetry
if (IsStarted)
{
IsStarted = false;
- InnerModules.OfType<ITelemetryStreamingModule>().ToList().ForEach(x => x.Stop());
+
+ LogManager.Log("Stopping telemetry publisher...");
+
+ InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x =>
+ {
+ try
+ {
+ x.Stop();
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}.");
+ }
+ });
_pendingStorageCheckTimer.Stop();
_historicalDataTimer.Stop();
QueueManager.Enqueue(null);
+
+ LogManager.Log("Telemetry publisher stopped.");
+ }
+ }
+
+ public void Validate()
+ {
+ // Validate all registered sources
+ foreach (var source in InnerSources)
+ {
+ if (string.IsNullOrWhiteSpace(source.Name))
+ {
+ throw new ArgumentException("A registered telemetry source has an invalid or missing Name.");
+ }
+ }
+
+ // Validate all registered destinations
+ foreach (var destination in InnerDestinations)
+ {
+ if (string.IsNullOrWhiteSpace(destination.Name))
+ {
+ throw new ArgumentException("A registered telemetry destination has an invalid or missing Name.");
+ }
+
+ if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any())
+ {
+ throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source.");
+ }
+ }
+
+ // Validate StorageManager
+ if (StorageManager == null)
+ {
+ throw new NullReferenceException("StorageManager is not configured.");
+ }
+
+ // Validate QueueManager
+ if (QueueManager == null)
+ {
+ throw new NullReferenceException("QueueManager is not configured.");
}
}
@@ -169,20 +266,39 @@ namespace Tango.Telemetry
{
_pendingStorageCheckTimer.Stop();
+ LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})...");
+
var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+ LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing...");
+
+ Dictionary<String, int> destinationsPasses = new Dictionary<string, int>();
+ List<TelemetryPublishResult> results = new List<TelemetryPublishResult>();
+
foreach (var pendingTelemetry in batch)
{
- await PublishTelemetryPackage(new TelemetryPublishPackage()
+ var result = await PublishTelemetryPackage(new TelemetryPublishPackage()
{
- Module = _pendingStorageModule,
+ Source = _pendingStorageSource,
PendingTelemetry = pendingTelemetry,
- Source = TelemetrySource.PendingStorage
+ SourceType = TelemetrySourceTypes.PendingStorage
});
+ results.Add(result);
+
+ foreach (var d in result.DestinationsResults)
+ {
+ if (d.Status == TelemetryPublishResult.DestinationStatus.Passed)
+ {
+ destinationsPasses[d.Destination.Name] += 1;
+ }
+ }
+
if (!IsStarted || _isDisposed) return;
}
+ LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}");
+
_pendingStorageCheckTimer.Start();
}
@@ -190,23 +306,25 @@ namespace Tango.Telemetry
{
_historicalDataTimer.Stop();
+ LogManager.Log("");
+
if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries)
{
- foreach (var module in InnerModules.OfType<ITelemetryHistoryModule>().ToList())
+ foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList())
{
- TelemetryHistoryModuleCheckPoint checkPoint = StorageManager.GetHistoryModuleCheckPoint(module);
- if (await module.CanRequestHistory(checkPoint.Time))
+ TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source);
+ if (await source.CanRequestHistory(checkPoint.Time))
{
- var historyTelemetries = (await module.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
+ var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
foreach (var telemetry in historyTelemetries)
{
- PushTelemetryPackage(module, telemetry, TelemetrySource.ExternalStorage);
+ PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage);
checkPoint.Time = telemetry.Time;
checkPoint.TotalCount++;
}
- StorageManager.SetHistoryModuleCheckPoint(module, checkPoint.Time, checkPoint.TotalCount);
+ StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount);
}
}
}
@@ -218,16 +336,16 @@ namespace Tango.Telemetry
#region Push
- private void PushTelemetryPackage(ITelemetryModule module, ITelemetry telemetry, TelemetrySource source)
+ private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
{
PendingTelemetry pendingTelemetry = new PendingTelemetry();
pendingTelemetry.Created = DateTime.UtcNow;
- //pendingTelemetry.Expires = module.GetExpiration();
- pendingTelemetry.Module = module.Name;
- pendingTelemetry.Source = source;
+ //pendingTelemetry.Expires = source.GetExpiration();
+ pendingTelemetry.Source = source.Name;
+ pendingTelemetry.SourceType = sourceType;
pendingTelemetry.TelemetryObject = telemetry;
- PushTelemetryPackage(new TelemetryPublishPackage() { Module = module, PendingTelemetry = pendingTelemetry, Source = source });
+ PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType });
}
private void PushTelemetryPackage(TelemetryPublishPackage package)
@@ -261,9 +379,13 @@ namespace Tango.Telemetry
}
}
- protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
- if (!IsStarted || _isDisposed) return;
+ Stopwatch totalWatch = Stopwatch.StartNew();
+
+ var result = new TelemetryPublishResult();
+
+ if (!IsStarted || _isDisposed) return result;
List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
@@ -275,7 +397,7 @@ namespace Tango.Telemetry
//Add all destinations if streaming or external (They will be remove later if successfull)
//If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
- if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var destination in Destinations)
{
@@ -286,26 +408,58 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType)))
{
if (pendingDestinations.Exists(x => x.Name == destination.Name))
{
+ Stopwatch destinationWatch = Stopwatch.StartNew();
+
+ var destinationResult = new TelemetryPublishResult.DestinationResult();
+ destinationResult.Destination = destination;
+ result.DestinationsResults.Add(destinationResult);
+
try
{
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
if (OnPublishingPackage(package, destination))
{
- await destination.Publish(package, properties);
- OnPackagePublished(package, destination);
+ if (await destination.IsAvailable())
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
+
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+ }
+ else
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ if (!pendingDestinations.Exists(x => x.Name == destination.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name });
+ }
+ }
+ }
}
}
catch (Exception ex)
{
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
+ destinationResult.Error = ex;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
OnPackagePublishFailed(package, destination, ex);
- if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
@@ -318,7 +472,7 @@ namespace Tango.Telemetry
package.PendingTelemetry.PendingDestinations = pendingDestinations;
- if (package.Source == TelemetrySource.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
+ if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
@@ -326,6 +480,11 @@ namespace Tango.Telemetry
{
StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
+
+ totalWatch.Stop();
+ result.TotalElapsedTime = totalWatch.Elapsed;
+
+ return result;
}
#endregion
@@ -373,14 +532,14 @@ namespace Tango.Telemetry
if (!_isDisposed)
{
_isDisposed = true;
- foreach (var module in InnerModules)
+ foreach (var source in InnerSources)
{
- if (module is ITelemetryStreamingModule streamingModule)
+ if (source is ITelemetryStreamingSource streamingSource)
{
- streamingModule.Stop();
- streamingModule.TelemetryAvailable -= Module_TelemetryAvailable;
+ streamingSource.Stop();
+ streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable;
}
- module.Dispose();
+ source.Dispose();
}
if (IsStarted)
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
index 0183de6c2..c4e7d3d3e 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@@ -20,8 +21,14 @@ namespace Tango.Telemetry
}
- protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
+ Stopwatch totalWatch = Stopwatch.StartNew();
+
+ var result = new TelemetryPublishResult();
+
+ if (!IsStarted || _isDisposed) return result;
+
List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("MachineID", Config.MachineID),
@@ -33,7 +40,7 @@ namespace Tango.Telemetry
var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
// For Streaming/External: initialize pending destinations list (used if publishing fails)
- if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var dest in Destinations)
{
@@ -50,35 +57,86 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
if (pendingEntry == null)
continue;
+ Stopwatch destinationWatch = Stopwatch.StartNew();
+
+ var destinationResult = new TelemetryPublishResult.DestinationResult();
+ destinationResult.Destination = destination;
+ result.DestinationsResults.Add(destinationResult);
+
// Respect backoff timing
if (now < pendingEntry.NextEligibleAttempt)
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
continue;
+ }
try
{
if (OnPublishingPackage(package, destination))
{
- await destination.Publish(package, properties);
- OnPackagePublished(package, destination);
+ if (await destination.IsAvailable())
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
- // On success: remove entry from pending list
- pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ // On success: remove entry from pending list
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ else
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ // Only track retry state if retry is supported
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ if (pendingEntry == null)
+ {
+ pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
+ pendingDestinations.Add(pendingEntry);
+ }
+
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+
+ // Apply exponential backoff
+ int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ }
+ else
+ {
+ // Remove if not retryable
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
}
}
catch (Exception ex)
{
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
+ destinationResult.Error = ex;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
OnPackagePublishFailed(package, destination, ex);
// Only track retry state if retry is supported
- if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
if (pendingEntry == null)
{
@@ -103,7 +161,7 @@ namespace Tango.Telemetry
package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
- if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any())
+ if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
@@ -111,6 +169,11 @@ namespace Tango.Telemetry
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
+
+ totalWatch.Stop();
+ result.TotalElapsedTime = totalWatch.Elapsed;
+
+ return result;
}
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
index 424249f7d..4a3776b87 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
@@ -14,14 +14,14 @@ namespace Tango.Telemetry
public String Environment { get; set; }
public TimeSpan PendingStorageCheckInterval { get; set; }
public int MaxPendingStorageTelemetriesPerCycle { get; set; }
- public TimeSpan HistoryModulesRequestInterval { get; set; }
+ public TimeSpan HistorySourcesRequestInterval { get; set; }
public int MaxPendingTelemetries { get; set; }
public TelemetryPublisherConfiguration()
{
PendingStorageCheckInterval = TimeSpan.FromMinutes(1);
MaxPendingStorageTelemetriesPerCycle = 100;
- HistoryModulesRequestInterval = TimeSpan.FromMinutes(1);
+ HistorySourcesRequestInterval = TimeSpan.FromMinutes(1);
MaxPendingTelemetries = 200;
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs
index 09bbdc539..1ed80d5c9 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetrySource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public enum TelemetrySource
+ public enum TelemetrySourceTypes
{
Streaming,
ExternalStorage,