aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
commit0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch)
tree5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry
parent4222eddece906d6f0877022c06b853deb5068472 (diff)
downloadTango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz
Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs77
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs188
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs)5
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md138
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs14
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs25
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs85
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs72
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs36
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs31
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs33
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs86
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySource.cs (renamed from Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs)4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj65
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs23
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs12
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs193
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs18
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs81
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs13
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs679
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs93
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs54
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs17
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/app.config59
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/packages.config23
30 files changed, 1854 insertions, 277 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
deleted file mode 100644
index 7a9cc9f7a..000000000
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-using Microsoft.Azure.Devices.Client;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Tango.Core;
-
-namespace Tango.Telemetry.Destinations
-{
- public class AzureHubTelemetryDestination : ExtendedObject, ITelemetryDestination
- {
- private DeviceClient _hubClient;
-
- public string Name { get; set; } = "Azure IoT Hub";
- public bool Enable { get; set; } = true;
- public String ConnectionString { get; private set; }
-
- public ConnectionStatus HubConnectionStatus { get; private set; }
- public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
-
- public AzureHubTelemetryDestination(String connectionString)
- {
- HubConnectionStatus = ConnectionStatus.Connected;
- ConnectionString = connectionString;
- SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage };
- }
-
- public Task<bool> IsAvailable()
- {
- if (_hubClient == null)
- {
- return Task.FromResult(true);
- }
- else
- {
- return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
- }
- }
-
- public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties)
- {
- if (_hubClient == null)
- {
- _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
- _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
- {
- HubConnectionStatus = status;
- LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.");
- });
- }
-
- var message = new Message(Encoding.UTF8.GetBytes(package.ToPayload()))
- {
- ContentType = "application/json",
- ContentEncoding = "utf-8"
- };
-
- foreach (var prop in properties)
- {
- message.Properties.Add(prop.Key, prop.Value);
- }
-
- await _hubClient.SendEventAsync(message);
- }
-
- public void Dispose()
- {
- _hubClient?.Dispose();
- }
-
- public override string ToString()
- {
- return Name;
- }
- }
-}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs
new file mode 100644
index 000000000..01ad31432
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs
@@ -0,0 +1,188 @@
+using Microsoft.Azure.Devices.Client;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Logging;
+
+namespace Tango.Telemetry.Destinations
+{
+ /// <summary>
+ /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
+ /// Supports batching, source type filtering, and connection status tracking.
+ /// </summary>
+ public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination
+ {
+ private DeviceClient _hubClient;
+ private int _batchSize;
+ private ConcurrentList<Message> _batch;
+
+ private class ADXPAckage
+ {
+ public String Type { get; set; }
+ public String Environment { get; set; }
+ public String MachineType { get; set; }
+ public String SerialNumber { get; set; }
+ public ITelemetry Telemetry { get; set; }
+ public DateTime UploadTime { get; set; }
+
+ public String ToPayload()
+ {
+ return JsonConvert.SerializeObject(this, Formatting.None);
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the name of the destination.
+ /// </summary>
+ public string Name { get; set; } = "Azure IoT Hub";
+
+ /// <summary>
+ /// Gets the connection string used to connect to Azure IoT Hub.
+ /// </summary>
+ public string ConnectionString { get; private set; }
+
+ /// <summary>
+ /// Gets the current connection status of the Azure IoT Hub client.
+ /// </summary>
+ public ConnectionStatus HubConnectionStatus { get; private set; }
+
+ /// <summary>
+ /// Gets the source types supported by this destination.
+ /// </summary>
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
+
+ /// <summary>
+ /// Gets or sets the maximum number of messages to send in a single batch.
+ /// The value is clamped between 1 and 100.
+ /// </summary>
+ public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
+
+ /// <summary>
+ /// Prevents a default instance of the <see cref="TelemetryAzureHubDestination"/> class from being created.
+ /// </summary>
+ private TelemetryAzureHubDestination()
+ {
+ _batch = new ConcurrentList<Message>();
+ HubConnectionStatus = ConnectionStatus.Connected;
+ SupportedSourceTypes = new List<TelemetrySourceTypes>()
+ {
+ TelemetrySourceTypes.PendingStorage,
+ TelemetrySourceTypes.Streaming,
+ TelemetrySourceTypes.ExternalStorage
+ };
+ BatchSize = 1;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TelemetryAzureHubDestination"/> class with the specified connection string.
+ /// </summary>
+ /// <param name="connectionString">The Azure IoT Hub connection string.</param>
+ public TelemetryAzureHubDestination(string connectionString) : this()
+ {
+ ConnectionString = connectionString;
+ }
+
+ /// <summary>
+ /// Determines whether the destination is currently available for publishing.
+ /// </summary>
+ /// <returns>True if the destination is available; otherwise, false.</returns>
+ public Task<bool> IsAvailable()
+ {
+ if (_hubClient == null)
+ {
+ return Task.FromResult(true);
+ }
+ else
+ {
+ return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
+ }
+ }
+
+ /// <summary>
+ /// Publishes a telemetry package to Azure IoT Hub.
+ /// Supports batching when <see cref="BatchSize"/> is greater than 1.
+ /// </summary>
+ /// <param name="package">The telemetry package to publish.</param>
+ /// <param name="properties">A list of properties to include with the message.</param>
+ public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
+ {
+ if (_hubClient == null)
+ {
+ _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
+ _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
+ {
+ HubConnectionStatus = status;
+ LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
+ });
+ }
+
+ ADXPAckage adxPackage = new ADXPAckage();
+ adxPackage.Type = package.TelemetryName;
+ adxPackage.Environment = package.Environment;
+ adxPackage.MachineType = package.MachineType;
+ adxPackage.SerialNumber = package.SerialNumber;
+ adxPackage.UploadTime = DateTime.UtcNow;
+ adxPackage.Telemetry = package.PendingTelemetry.TelemetryObject;
+
+ var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
+ {
+ ContentType = "application/json",
+ ContentEncoding = "utf-8"
+ };
+
+ foreach (var prop in properties)
+ {
+ message.Properties.Add(prop.Key, prop.Value);
+ }
+
+ if (BatchSize > 1)
+ {
+ _batch.Add(message);
+
+ if (_batch.Count >= BatchSize)
+ {
+ LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventBatchAsync(_batch.ToList());
+ _batch.Clear();
+ }
+ else
+ {
+ LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug);
+ }
+ }
+ else
+ {
+ LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventAsync(message);
+ }
+ }
+
+ /// <summary>
+ /// Disposes the destination and ensures any remaining batched messages are sent.
+ /// </summary>
+ public void Dispose()
+ {
+ if (_hubClient != null && _batch.Count > 0)
+ {
+ LogManager.Log($"Flushing {_batch.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
+ _hubClient.SendEventBatchAsync(_batch.ToList()).GetAwaiter().GetResult();
+ _batch.Clear();
+ }
+
+ _hubClient?.Dispose();
+ }
+
+ /// <summary>
+ /// Returns a string that represents the current destination instance.
+ /// </summary>
+ /// <returns>The name of the destination.</returns>
+ public override string ToString()
+ {
+ return Name;
+ }
+ }
+} \ No newline at end of file
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
index b5ff05c29..11787c834 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
@@ -14,14 +14,13 @@ using MQTTnet.Packets;
namespace Tango.Telemetry.Destinations
{
- public class MqttTelemetryDestination : ExtendedObject, ITelemetryDestination
+ public class TelemetryMqttDestination : ExtendedObject, ITelemetryDestination
{
private IMqttClient _mqttClient;
private IMqttClientOptions _mqttOptions;
private DateTime _nextRealAvailabilityCheck;
public string Name { get; set; } = "MQTT";
- public bool Enable { get; set; } = true;
public String Address { get; private set; }
public int Port { get; private set; }
public String Topic { get; private set; }
@@ -34,7 +33,7 @@ namespace Tango.Telemetry.Destinations
/// <param name="topic">e.g machie/telemetry/serial number</param>
/// <param name="address">Default localhost</param>
/// <param name="port">Default 1883</param>
- public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883)
+ public TelemetryMqttDestination(String topic, String address = "localhost", int port = 1883)
{
_nextRealAvailabilityCheck = DateTime.Now;
Topic = topic;
diff --git a/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md b/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md
new file mode 100644
index 000000000..c08e4f5d6
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md
@@ -0,0 +1,138 @@
+## ✅ Part 1: Backup and Recovery of LiteDB File
+
+### 🔧 Strategy
+
+* **On application startup or shutdown**, copy the main `.db` file to a backup file (e.g., `telemetry.db.bak`).
+* **On startup**, if LiteDB is missing or corrupted:
+
+ * Attempt to **restore from backup**.
+ * If both fail, log a critical failure or initialize an empty one (last resort).
+
+### 🧱 Suggested Implementation
+
+```csharp
+private const string DbFile = "telemetry.db";
+private const string BackupFile = "telemetry.db.bak";
+
+public void EnsureDatabaseIntegrity()
+{
+ if (!File.Exists(DbFile))
+ {
+ if (File.Exists(BackupFile))
+ {
+ Log.Warning("Main database file missing. Attempting to restore from backup...");
+ File.Copy(BackupFile, DbFile);
+ }
+ else
+ {
+ Log.Critical("No telemetry database or backup found. Initializing fresh DB.");
+ // Optionally throw or initialize empty
+ }
+ }
+}
+
+public void BackupDatabase()
+{
+ try
+ {
+ File.Copy(DbFile, BackupFile, overwrite: true);
+ Log.Info("Telemetry database backed up successfully.");
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"Failed to backup telemetry database: {ex}");
+ }
+}
+```
+
+Call `BackupDatabase()` on graceful shutdown or every X hours.
+
+---
+
+## ✅ Part 2: Mirror Source Checkpoints to the Cloud
+
+### 🧠 Why:
+
+* In PC reinstall or hardware swap scenarios, **LiteDB is lost**, but the machine serial number remains.
+* The application can download JobRuns from the server — but without the previous **checkpoints**, it doesn’t know what has already been published.
+* Replaying the **entire JobRuns table** results in telemetry flooding.
+
+---
+
+### 🔧 Strategy
+
+Store a minimal JSON blob to the cloud keyed by **machine serial number**, e.g.:
+
+```json
+{
+ "MachineSerial": "X4-3215",
+ "Checkpoints": {
+ "TelemetryJobRunsHistorySource": "2025-07-30T14:15:00Z",
+ "TelemetryEventsHistorySource": "2025-07-30T13:50:00Z"
+ }
+}
+```
+
+### 📦 Recommended Storage Location
+
+* Azure Blob Storage
+* OR Azure Table / CosmosDB if you already use structured cloud data
+* OR SFTP/S3-compatible endpoint if on-prem
+
+### 💡 Key Design Points
+
+* Mirror only **checkpoints** (very small, infrequent updates).
+* Use **machine serial number as the key**.
+* Update cloud **whenever a checkpoint advances** significantly or every X minutes.
+* Download cloud checkpoints on boot **only if LiteDB is missing**.
+
+---
+
+### 🧱 Example C# Interface
+
+```csharp
+public interface ICheckpointMirror
+{
+ Task<Dictionary<string, DateTime>> DownloadCheckpointsAsync(string machineSerial);
+ Task UploadCheckpointsAsync(string machineSerial, Dictionary<string, DateTime> checkpoints);
+}
+```
+
+---
+
+## 🧩 Final Integration Flow
+
+```mermaid
+graph TD
+A[App Startup] --> B{LiteDB Exists?}
+B -- Yes --> C[Continue Normally]
+B -- No --> D{Backup Exists?}
+D -- Yes --> E[Restore From Backup]
+D -- No --> F{Cloud Checkpoints Available?}
+F -- Yes --> G[Restore Checkpoints from Cloud]
+F -- No --> H[Start From Scratch]
+
+C --> Z[Load Published GUIDs + Checkpoints]
+E --> Z
+G --> Z
+H --> Z
+```
+
+---
+
+## ✅ Summary
+
+| Component | Purpose | When to Use |
+| ----------------------- | ----------------------------------------------- | ------------------------------- |
+| LiteDB Backup | Quick local restore from disk failure | Every boot/shutdown |
+| Cloud Checkpoint Mirror | Resilient recovery across reinstalls or new PCs | Only if LiteDB+backup missing |
+| Machine Serial Binding | Ensures uniqueness and portability | On X4, X1, or reimaged OS cases |
+
+---
+
+Would you like:
+
+* A suggested format for the cloud checkpoint JSON file?
+* A mock implementation of `ICheckpointMirror` that uploads/downloads to Azure Blob or local disk for testing?
+
+You're building this with an enterprise-level attention to failure domains. Very smart.
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
index 67bb8f5d2..4dc1aa3e6 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
@@ -10,6 +10,7 @@ namespace Tango.Telemetry
{
public interface ITelemetry
{
+ String ID { get; set; }
DateTime Time { get; set; }
String ToJson(Formatting format = Formatting.None, bool flatten = true);
byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true);
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs
new file mode 100644
index 000000000..e6cec73bf
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryCheckpointsRecoveryClient : IDisposable
+ {
+ Task<List<TelemetryHistorySourceCheckPoint>> GetCheckpointsBackup();
+ Task SaveCheckpointsBackup(List<TelemetryHistorySourceCheckPoint> checkPoints);
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
index 10424531b..aec3d164a 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
@@ -6,12 +6,33 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// Represents a target destination that can receive and process published telemetry data.
+ /// Implementations may include cloud services, databases, or custom sinks.
+ /// </summary>
public interface ITelemetryDestination : IDisposable
{
- bool Enable { get; set; }
+ /// <summary>
+ /// Gets or sets the unique name of the destination used for identification and logging.
+ /// </summary>
String Name { get; set; }
+
+ /// <summary>
+ /// Checks whether the destination is currently available to receive telemetry.
+ /// </summary>
+ /// <returns>True if the destination is available; otherwise, false.</returns>
Task<bool> IsAvailable();
+
+ /// <summary>
+ /// Gets a read-only list of source types that this destination supports.
+ /// </summary>
IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; }
+
+ /// <summary>
+ /// Publishes the given telemetry package along with machine metadata properties.
+ /// </summary>
+ /// <param name="package">The telemetry package to publish.</param>
+ /// <param name="properties">Metadata properties such as SerialNumber, MachineType, and Environment.</param>
Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties);
}
-}
+} \ No newline at end of file
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
index b44f567da..9ec7860fe 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs
@@ -4,22 +4,103 @@ using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Tango.Telemetry.Reporting;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// Defines the interface for a telemetry publisher responsible for managing sources, destinations,
+ /// storage, and the overall publishing lifecycle.
+ /// </summary>
public interface ITelemetryPublisher : IDisposable
{
+ /// <summary>
+ /// Occurs before a telemetry package is published to a destination.
+ /// </summary>
event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage;
+
+ /// <summary>
+ /// Occurs when a telemetry package has been successfully published to a destination.
+ /// </summary>
event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished;
+
+ /// <summary>
+ /// Occurs when a telemetry package fails to publish to a destination.
+ /// </summary>
event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
+
+ /// <summary>
+ /// Occurs when a telemetry publish operation has completed and a publish result is available,
+ /// indicating the success or failure status for each destination.
+ /// </summary>
+ event EventHandler<TelemetryPublishResultAvailableEventArgs> PublishResultAvailable;
+
+ /// <summary>
+ /// Gets the storage manager used for telemetry persistence and checkpoint handling.
+ /// </summary>
ITelemetryStorageManager StorageManager { get; }
+
+ /// <summary>
+ /// Gets the telemetry queue manager responsible for internal queuing and retry logic.
+ /// </summary>
ITelemetryQueueManager QueueManager { get; }
+
+ /// <summary>
+ /// Gets the client used for remote checkpoint recovery.
+ /// </summary>
+ ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; }
+
+ /// <summary>
+ /// Gets the registered telemetry sources.
+ /// </summary>
ReadOnlyCollection<ITelemetrySource> Sources { get; }
+
+ /// <summary>
+ /// Registers a telemetry source with the publisher.
+ /// </summary>
+ /// <param name="source">The telemetry source to register.</param>
void RegisterSource(ITelemetrySource source);
+
+ /// <summary>
+ /// Gets the registered telemetry destinations.
+ /// </summary>
ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
+
+ /// <summary>
+ /// Registers a telemetry destination with the publisher.
+ /// </summary>
+ /// <param name="destination">The telemetry destination to register.</param>
void RegisterDestination(ITelemetryDestination destination);
+
+ /// <summary>
+ /// Gets a value indicating whether the publisher is currently running.
+ /// </summary>
bool IsStarted { get; }
- void Start();
- void Stop();
+
+ /// <summary>
+ /// Starts the telemetry publishing pipeline, including sources and destinations.
+ /// </summary>
+ /// <returns>A task representing the asynchronous start operation.</returns>
+ Task Start();
+
+ /// <summary>
+ /// Stops the telemetry publishing pipeline and releases all resources.
+ /// </summary>
+ /// <returns>A task representing the asynchronous stop operation.</returns>
+ Task Stop();
+
+ /// <summary>
+ /// Flushes up to the specified number of pending telemetries from local storage,
+ /// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages.
+ /// </summary>
+ /// <param name="maxCount">The maximum number of pending telemetry packages to flush.</param>
+ /// <returns>A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages.</returns>
+ Task<List<TelemetryPublishResult>> FlushPendingTelemetries(int maxCount);
+
+ /// <summary>
+ /// Generates a detailed telemetry report summarizing the current state of the telemetry system.
+ /// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results.
+ /// </summary>
+ Task<TelemetryReport> GetTelemetryReport();
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs
index 74d58ed4a..739c6514b 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs
@@ -9,5 +9,6 @@ namespace Tango.Telemetry
public interface ITelemetrySource : IDisposable
{
String Name { get; }
+ bool RequiresTelemetryDuplicationTracking { get; }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
index ae63e41cd..882cc0411 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs
@@ -6,13 +6,85 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// Defines the contract for managing telemetry storage, including pending telemetries and history source checkpoints.
+ /// </summary>
public interface ITelemetryStorageManager
{
+ /// <summary>
+ /// Initializes the storage manager with the specified checkpoints recovery client.
+ /// Responsible for loading the database, restoring from backup if necessary, and recovering remote checkpoints.
+ /// </summary>
+ /// <param name="checkpointsRecoveryClient">An implementation of the checkpoint recovery client used for cloud fallback.</param>
+ Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient);
+
+ /// <summary>
+ /// Inserts or updates a pending telemetry record in the local storage.
+ /// </summary>
+ /// <param name="pendingTelemetry">The pending telemetry to be stored or updated.</param>
void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry);
+
+ /// <summary>
+ /// Deletes a pending telemetry record from the local storage.
+ /// </summary>
+ /// <param name="pendingTelemetry">The pending telemetry to be deleted.</param>
void DeletePendingTelemetry(PendingTelemetry pendingTelemetry);
+
+ /// <summary>
+ /// Retrieves a list of pending telemetry records, ordered by time, up to the specified maximum count.
+ /// </summary>
+ /// <param name="maxCount">The maximum number of pending telemetries to retrieve.</param>
+ /// <returns>A list of pending telemetry objects.</returns>
List<PendingTelemetry> GetPendingTelemetries(int maxCount);
+
+ /// <summary>
+ /// Gets the total number of pending telemetry records currently stored.
+ /// </summary>
+ /// <returns>The count of pending telemetry records.</returns>
int GetPendingTelemetriesCount();
+
+ /// <summary>
+ /// Retrieves the current checkpoint for the specified history source.
+ /// </summary>
+ /// <param name="source">The telemetry history source for which to retrieve the checkpoint.</param>
+ /// <returns>The stored checkpoint information for the given source.</returns>
TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source);
+
+ /// <summary>
+ /// Retrieves all stored history source checkpoints currently tracked by the storage system.
+ /// Each checkpoint represents the latest processed state of a specific telemetry history source.
+ /// </summary>
+ /// <returns>A list of <see cref="TelemetryHistorySourceCheckPoint"/> entries for all registered history sources.</returns>
+ List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints();
+
+ /// <summary>
+ /// Sets or updates the checkpoint for the specified history source.
+ /// </summary>
+ /// <param name="source">The telemetry history source for which to update the checkpoint.</param>
+ /// <param name="time">The latest timestamp of telemetry data processed for the source.</param>
+ /// <param name="totalCount">The total number of telemetry records processed for the source.</param>
void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount);
+
+ /// <summary>
+ /// Adds the specified telemetry item to the published telemetry cache,
+ /// ensuring it is tracked as already published by the system.
+ /// </summary>
+ /// <param name="telemetry">The telemetry instance to register as published.</param>
+ void AddToPublishedTelemetryCache(ITelemetry telemetry);
+
+ /// <summary>
+ /// Checks whether the specified telemetry item is already present in the published telemetry cache.
+ /// </summary>
+ /// <param name="telemetry">The telemetry instance to verify.</param>
+ /// <returns>True if the telemetry appears to have already been published; otherwise, false.</returns>
+ bool IsTelemetryInPublishedCache(ITelemetry telementry);
+
+ /// <summary>
+ /// Removes entries from the published telemetry cache that were marked as published
+ /// before the specified time. Intended to manage memory and storage growth over time,
+ /// especially once historical sources have progressed beyond the given point.
+ /// </summary>
+ /// <param name="olderThan">The timestamp indicating the oldest publication time to retain. Entries older than this will be removed.</param>
+ void PerformPublishedTelemetriesCleanUp(DateTime olderThan);
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs
new file mode 100644
index 000000000..a8dc52123
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs
@@ -0,0 +1,36 @@
+namespace Tango.Telemetry.Reporting
+{
+ /// <summary>
+ /// Represents an aggregated summary of telemetry publish results for a specific destination,
+ /// tracking the number of successful, failed, postponed, and unavailable attempts.
+ /// </summary>
+ public class DestinationStatusSummary
+ {
+ /// <summary>
+ /// Gets or sets the name of the telemetry destination (e.g., IoTHub, LocalStorage).
+ /// </summary>
+ public string DestinationName { get; set; }
+
+ /// <summary>
+ /// Gets or sets the number of telemetry packages successfully published to this destination.
+ /// </summary>
+ public int Passed { get; set; }
+
+ /// <summary>
+ /// Gets or sets the number of telemetry packages that failed to publish to this destination.
+ /// </summary>
+ public int Failed { get; set; }
+
+ /// <summary>
+ /// Gets or sets the number of telemetry packages that were postponed for this destination,
+ /// typically due to retry logic or temporary conditions.
+ /// </summary>
+ public int Postponed { get; set; }
+
+ /// <summary>
+ /// Gets or sets the number of telemetry packages that could not be published due to the destination being unavailable.
+ /// </summary>
+ public int Unavailable { get; set; }
+ }
+
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs
new file mode 100644
index 000000000..04e57e985
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs
@@ -0,0 +1,31 @@
+using System.Collections.Generic;
+
+namespace Tango.Telemetry.Reporting
+{
+ /// <summary>
+ /// Represents a summary of telemetry publish results for a specific telemetry source,
+ /// including aggregated destination-level status counts.
+ /// </summary>
+ public class SourceSummary
+ {
+ /// <summary>
+ /// Gets or sets the name of the telemetry source (e.g., JobRunsHistorySource, DiagnosticsStream).
+ /// </summary>
+ public string SourceName { get; set; }
+
+ /// <summary>
+ /// Gets or sets a dictionary of aggregated publish results per destination for this source.
+ /// The key is the destination name, and the value contains counters for each delivery status.
+ /// </summary>
+ public Dictionary<string, DestinationStatusSummary> Destinations { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SourceSummary"/> class,
+ /// initializing the destination summary dictionary.
+ /// </summary>
+ public SourceSummary()
+ {
+ Destinations = new Dictionary<string, DestinationStatusSummary>();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs
new file mode 100644
index 000000000..9b6dc05d9
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs
@@ -0,0 +1,33 @@
+using System.Collections.Generic;
+
+namespace Tango.Telemetry.Reporting
+{
+ /// <summary>
+ /// Represents an aggregated summary of telemetry sources belonging to a specific source type,
+ /// such as Streaming, ExternalStorage, or PendingStorage.
+ /// Groups multiple sources under the same category and aggregates their publish results.
+ /// </summary>
+ public class SourceTypeSummary
+ {
+ /// <summary>
+ /// Gets or sets the telemetry source type that this summary represents.
+ /// </summary>
+ public TelemetrySourceTypes SourceType { get; set; }
+
+ /// <summary>
+ /// Gets or sets a dictionary of source summaries under this source type.
+ /// The key is the source name, and the value is a summary of its destination-level results.
+ /// </summary>
+ public Dictionary<string, SourceSummary> Sources { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SourceTypeSummary"/> class,
+ /// initializing the source summary dictionary.
+ /// </summary>
+ public SourceTypeSummary()
+ {
+ Sources = new Dictionary<string, SourceSummary>();
+ }
+ }
+
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs
new file mode 100644
index 000000000..9e9906fde
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs
@@ -0,0 +1,86 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry.Reporting
+{
+ /// <summary>
+ /// Represents a diagnostic snapshot of the telemetry system,
+ /// providing detailed statistics about published and pending telemetry data,
+ /// grouped by source type and destination.
+ /// </summary>
+ public class TelemetryReport
+ {
+ /// <summary>
+ /// Gets or sets the timestamp indicating when the report was generated.
+ /// </summary>
+ public DateTime GeneratedAt { get; set; }
+
+ /// <summary>
+ /// Gets or sets the total number of telemetry packages that were published
+ /// successfully since the telemetry system started.
+ /// </summary>
+ public int TotalPublished { get; set; }
+
+ /// <summary>
+ /// Gets or sets the current number of telemetry packages pending publication in storage.
+ /// </summary>
+ public int TotalPending { get; set; }
+
+ /// <summary>
+ /// Gets or sets the aggregated telemetry publish summaries, grouped by source type.
+ /// Each source type contains its respective sources and destination-level statistics.
+ /// </summary>
+ public Dictionary<TelemetrySourceTypes, SourceTypeSummary> SourceTypes { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TelemetryReport"/> class,
+ /// initializing the dictionary used for grouping by source type.
+ /// </summary>
+ public TelemetryReport()
+ {
+ SourceTypes = new Dictionary<TelemetrySourceTypes, SourceTypeSummary>();
+ }
+
+ /// <summary>
+ /// Returns a human-readable string representation of the report,
+ /// including summary statistics and breakdowns by source type and destination.
+ /// </summary>
+ /// <returns>A formatted string representing the current telemetry report.</returns>
+ public override string ToString()
+ {
+ var sb = new StringBuilder();
+
+ sb.AppendLine("===== Telemetry Report =====");
+ sb.AppendLine($"Generated At : {GeneratedAt:u}");
+ sb.AppendLine($"Total Published : {TotalPublished}");
+ sb.AppendLine($"Total Pending : {TotalPending}");
+ sb.AppendLine();
+
+ foreach (var sourceTypePair in SourceTypes)
+ {
+ sb.AppendLine($"--- Source Type: {sourceTypePair.Key} ---");
+
+ foreach (var sourcePair in sourceTypePair.Value.Sources)
+ {
+ sb.AppendLine($" Source: {sourcePair.Key}");
+
+ foreach (var destinationPair in sourcePair.Value.Destinations)
+ {
+ var d = destinationPair.Value;
+ sb.AppendLine($" {d.DestinationName}: Passed={d.Passed}, Failed={d.Failed}, Postponed={d.Postponed}, Unavailable={d.Unavailable}");
+ }
+
+ sb.AppendLine(); // spacing between sources
+ }
+
+ sb.AppendLine(); // spacing between source types
+ }
+
+ sb.AppendLine("============================");
+ return sb.ToString();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs
index 22fac087b..5a2735131 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs
@@ -14,7 +14,7 @@ using Tango.Telemetry.Telemetries;
namespace Tango.Telemetry.Sources
{
- public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetrySource
+ public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetryStreamingSource
{
public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;
@@ -30,6 +30,8 @@ namespace Tango.Telemetry.Sources
public string Name { get; private set; } = "Diagnostics";
+ public bool RequiresTelemetryDuplicationTracking { get => false; }
+
private TelemetryDiagnosticsSource() : base()
{
_diagnosticsQueue = new List<StartDiagnosticsResponse>();
diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySource.cs
index 6cb597e4b..e3934d832 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySource.cs
@@ -15,6 +15,8 @@ namespace Tango.Telemetry.Sources
public string Name { get; private set; } = "JobRuns History";
+ public bool RequiresTelemetryDuplicationTracking { get => true; }
+
public Task<bool> CanRequestHistory(DateTime from)
{
return Task.FromResult(!_isBusy);
@@ -39,7 +41,9 @@ namespace Tango.Telemetry.Sources
foreach (var run in runs)
{
TelemetryJobRun tRun = new TelemetryJobRun();
+ tRun.ID = run.Guid;
tRun.Time = run.LastUpdated;
+ tRun.Name = run.JobName;
//Fill the object..
tRuns.Add(tRun);
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 02f8417f1..9565465a5 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -61,15 +61,24 @@
<Reference Include="Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.Amqp.2.5.10\lib\net45\Microsoft.Azure.Amqp.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.Azure.Devices.Client, Version=1.41.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
- <HintPath>..\packages\Microsoft.Azure.Devices.Client.1.41.0\lib\net451\Microsoft.Azure.Devices.Client.dll</HintPath>
+ <Reference Include="Microsoft.Azure.Devices.Client, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\Microsoft.Azure.Devices.Client.1.6.0\lib\net45\Microsoft.Azure.Devices.Client.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.Azure.Devices.Shared, Version=1.30.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
- <HintPath>..\packages\Microsoft.Azure.Devices.Shared.1.30.1\lib\net451\Microsoft.Azure.Devices.Shared.dll</HintPath>
+ <Reference Include="Microsoft.Azure.Devices.Shared, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\Microsoft.Azure.Devices.Shared.1.3.0\lib\net45\Microsoft.Azure.Devices.Shared.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.KeyVault.Core.1.0.0\lib\net40\Microsoft.Azure.KeyVault.Core.dll</HintPath>
</Reference>
+ <Reference Include="Microsoft.Data.Edm, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\Microsoft.Data.Edm.5.8.2\lib\net40\Microsoft.Data.Edm.dll</HintPath>
+ </Reference>
+ <Reference Include="Microsoft.Data.OData, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\Microsoft.Data.OData.5.8.2\lib\net40\Microsoft.Data.OData.dll</HintPath>
+ </Reference>
+ <Reference Include="Microsoft.Data.Services.Client, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\Microsoft.Data.Services.Client.5.8.2\lib\net40\Microsoft.Data.Services.Client.dll</HintPath>
+ </Reference>
<Reference Include="Microsoft.Extensions.DependencyInjection.Abstractions, Version=1.1.0.0, Culture=neutral, PublicKeyToken=adb9793829ddae60, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Extensions.DependencyInjection.Abstractions.1.1.0\lib\netstandard1.0\Microsoft.Extensions.DependencyInjection.Abstractions.dll</HintPath>
</Reference>
@@ -82,21 +91,39 @@
<Reference Include="Microsoft.Owin, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Owin.4.0.0\lib\net451\Microsoft.Owin.dll</HintPath>
</Reference>
+ <Reference Include="Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling, Version=6.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\EnterpriseLibrary.TransientFaultHandling.6.0.1304.0\lib\portable-net45+win+wp8\Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling.dll</HintPath>
+ </Reference>
<Reference Include="Microsoft.Win32.Primitives, Version=4.0.2.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Win32.Primitives.4.3.0\lib\net46\Microsoft.Win32.Primitives.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.WindowsAzure.Storage, Version=9.3.2.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
- <HintPath>..\packages\WindowsAzure.Storage.9.3.2\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath>
+ <Reference Include="Microsoft.WindowsAzure.Storage, Version=8.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\WindowsAzure.Storage.8.7.0\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath>
</Reference>
<Reference Include="MQTTnet, Version=3.1.2.0, Culture=neutral, PublicKeyToken=b69712f52770c0a7, processorArchitecture=MSIL">
<HintPath>..\packages\MQTTnet.3.1.2\lib\net461\MQTTnet.dll</HintPath>
</Reference>
- <Reference Include="Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <HintPath>..\packages\Newtonsoft.Json.12.0.3\lib\net45\Newtonsoft.Json.dll</HintPath>
+ <Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
+ <HintPath>..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Owin, Version=1.0.0.0, Culture=neutral, PublicKeyToken=f0ebd12fd5e55cc5, processorArchitecture=MSIL">
<HintPath>..\packages\Owin.1.0\lib\net40\Owin.dll</HintPath>
</Reference>
+ <Reference Include="PCLCrypto, Version=2.0.0.0, Culture=neutral, PublicKeyToken=d4421c8a4786956c, processorArchitecture=MSIL">
+ <HintPath>..\packages\PCLCrypto.2.0.147\lib\net45\PCLCrypto.dll</HintPath>
+ </Reference>
+ <Reference Include="PInvoke.BCrypt, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL">
+ <HintPath>..\packages\PInvoke.BCrypt.0.3.2\lib\net40\PInvoke.BCrypt.dll</HintPath>
+ </Reference>
+ <Reference Include="PInvoke.Kernel32, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL">
+ <HintPath>..\packages\PInvoke.Kernel32.0.3.2\lib\net40\PInvoke.Kernel32.dll</HintPath>
+ </Reference>
+ <Reference Include="PInvoke.NCrypt, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL">
+ <HintPath>..\packages\PInvoke.NCrypt.0.3.2\lib\net40\PInvoke.NCrypt.dll</HintPath>
+ </Reference>
+ <Reference Include="PInvoke.Windows.Core, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL">
+ <HintPath>..\packages\PInvoke.Windows.Core.0.3.2\lib\portable-net45+win+wpa81+MonoAndroid10+xamarinios10+MonoTouch10\PInvoke.Windows.Core.dll</HintPath>
+ </Reference>
<Reference Include="System" />
<Reference Include="System.AppContext, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\System.AppContext.4.3.0\lib\net46\System.AppContext.dll</HintPath>
@@ -186,6 +213,9 @@
<Private>True</Private>
<Private>True</Private>
</Reference>
+ <Reference Include="System.Spatial, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <HintPath>..\packages\System.Spatial.5.8.2\lib\net40\System.Spatial.dll</HintPath>
+ </Reference>
<Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
<HintPath>..\packages\System.Threading.Tasks.Extensions.4.5.1\lib\netstandard2.0\System.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
@@ -201,21 +231,29 @@
<Private>True</Private>
<Private>True</Private>
</Reference>
+ <Reference Include="Validation, Version=2.2.0.0, Culture=neutral, PublicKeyToken=2fc06f0d701809a7, processorArchitecture=MSIL">
+ <HintPath>..\packages\Validation.2.2.8\lib\dotnet\Validation.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
- <Compile Include="Destinations\AzureHubTelemetryDestination.cs" />
+ <Compile Include="Destinations\TelemetryAzureHubDestination.cs" />
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
+ <Compile Include="ITelemetryCheckpointsRecoveryClient.cs" />
<Compile Include="ITelemetryDestination.cs" />
- <Compile Include="Destinations\MqttTelemetryDestination.cs" />
+ <Compile Include="Destinations\TelemetryMqttDestination.cs" />
<Compile Include="ITelemetryHistorySource.cs" />
<Compile Include="ITelemetrySource.cs" />
<Compile Include="ITelemetryQueueManager.cs" />
<Compile Include="ITelemetryStorageManager.cs" />
<Compile Include="ITelemetryPublisher.cs" />
<Compile Include="ITelemetryStreamingSource.cs" />
+ <Compile Include="Reporting\DestinationStatusSummary.cs" />
+ <Compile Include="Reporting\SourceSummary.cs" />
+ <Compile Include="Reporting\SourceTypeSummary.cs" />
+ <Compile Include="Reporting\TelemetryReport.cs" />
<Compile Include="Sources\TelemetryDiagnosticsSource.cs" />
<Compile Include="Sources\TelemetryDiagnosticsSourceConfig.cs" />
- <Compile Include="Sources\TelemetryJobRunsHistoryModule.cs" />
+ <Compile Include="Sources\TelemetryJobRunsHistorySource.cs" />
<Compile Include="Sources\TelemetryJobRunsHistorySourceConfig.cs" />
<Compile Include="PendingTelemetry.cs" />
<Compile Include="TelemetryConfigurableSource.cs" />
@@ -227,13 +265,15 @@
<Compile Include="TelemetryPendingDestination.cs" />
<Compile Include="TelemetryPendingStorageSource.cs" />
<Compile Include="TelemetryInMemoryQueueManager.cs" />
- <Compile Include="TelemetryPublisherAdvanced.cs" />
+ <None Include="app.config" />
+ <None Include="TelemetryPublisherAdvanced.cs" />
<Compile Include="TelemetryPublisherEventArgs.cs" />
<Compile Include="TelemetryPackagePublishingEventArgs.cs" />
<Compile Include="TelemetryPublishPackage.cs" />
<Compile Include="TelemetryPublisher.cs" />
<Compile Include="TelemetryPublisherConfiguration.cs" />
<Compile Include="TelemetryPublishResult.cs" />
+ <Compile Include="TelemetryPublishResultAvailableEventArgs.cs" />
<Compile Include="TelemetrySourceTypes.cs" />
<Compile Include="TelemetryLiteDBStorageManager.cs" />
<Compile Include="ITelemetry.cs" />
@@ -244,6 +284,7 @@
<Compile Include="Telemetries\TelemetryJobRun.cs" />
</ItemGroup>
<ItemGroup>
+ <None Include="Docs\AiResponse.md" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
diff --git a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs
index 7b1ca3a1c..5aca89ee0 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs
@@ -9,6 +9,29 @@ namespace Tango.Telemetry.Telemetries
[TelemetryName("JobRun")]
public class TelemetryJobRun : TelemetryBase
{
+ public String Name { get; set; }
+ public String JobKind { get; set; }
+ public String Thread { get; set; }
+ public String NumberOfUnits { get; set; }
+
+ public double LogicalLength { get; set; }
+ public double ActualLength { get; set; }
+ public double StartPosition { get; set; }
+ public double EndPosition { get; set; }
+ public double Distance { get; set; }
+
+ public DateTime StartTime { get; set; }
+ public DateTime EndTime { get; set; }
+ public TimeSpan Duration { get; set; }
+
+ public String Status { get; set; }
+
+ public String ColorSpace { get; set; }
+
+ public double TargetL { get; set; }
+ public double TargetA { get; set; }
+ public double TargetB { get; set; }
+
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
index 20d5211e7..c2879e9db 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
@@ -10,14 +10,22 @@ namespace Tango.Telemetry
{
public class TelemetryBase : ITelemetry
{
+ public string ID { get; set; }
+
+ public DateTime Time { get; set; }
+
+ public TelemetryBase()
+ {
+ ID = Guid.NewGuid().ToString();
+ Time = DateTime.UtcNow;
+ }
+
[BsonIgnore] //This will be used for column mapping in ADX
public String Type
{
get { return this.ToTelemetryName(); }
}
- public DateTime Time { get; set; }
-
public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true)
{
return Encoding.UTF8.GetBytes(ToJson(format, flatten));
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
index 2ceb95298..6700ba8af 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
@@ -5,45 +5,131 @@ using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Core.ExtensionMethods;
+using Tango.Logging;
namespace Tango.Telemetry
{
- public class TelemetryLiteDBStorageManager : ITelemetryStorageManager
+ public class TelemetryLiteDBStorageManager : ExtendedObject, ITelemetryStorageManager
{
+ public class PublishedTelemetry
+ {
+ public String ID { get; set; }
+ public DateTime CreatedAt { get; set; }
+ }
+
private bool _disposed;
private LiteDatabase _database;
private static Object _lock = new object();
+ private ITelemetryCheckpointsRecoveryClient _checkpointsRecoveryClient;
+ private HashSet<String> _publishedTelemetriesIDs;
+ private DateTime _lastCloudBackupTime = DateTime.MinValue;
+ private TimeSpan _checkpointsBackupInterval = TimeSpan.FromMinutes(1);
+
+ public TimeSpan CheckpointsBackupInterval { get => _checkpointsBackupInterval; set => _checkpointsBackupInterval = value >= TimeSpan.FromMinutes(1) ? value : TimeSpan.FromMinutes(1); }
public String DatabasePath { get; private set; }
+ public bool EnableCheckPointsRecovery { get; set; }
+ public bool EnforceCheckpointsRecovery { get; set; }
+
public TelemetryLiteDBStorageManager()
{
+ _publishedTelemetriesIDs = new HashSet<string>();
+ EnableCheckPointsRecovery = true;
+ EnforceCheckpointsRecovery = true;
DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry");
+ }
+
+ public TelemetryLiteDBStorageManager(String databaseFile) : this()
+ {
+ DatabasePath = databaseFile;
+ }
+
+ public async Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient)
+ {
+ LogManager.Log("Initializing telemetry database...");
+
+ _checkpointsRecoveryClient = checkpointsRecoveryClient;
+
Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
+ String backupPath = DatabasePath + ".bak";
+
+ if (!File.Exists(DatabasePath))
+ {
+ if (File.Exists(backupPath))
+ {
+ LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error);
+ File.Copy(backupPath, DatabasePath, overwrite: true);
+ }
+ else
+ {
+ LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical);
+
+ if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery)
+ {
+ throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate.");
+ }
+ }
+ }
+
_database = new LiteDatabase($"Filename={DatabasePath}");
_database.Pragma("TIMEOUT", 10); //Read Timeout
_database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data
_database.Commit();
- }
- public virtual void Dispose()
- {
- if (_database != null && !_disposed)
+ var checkPointsCollection = GetSourcesCheckpointCollection();
+ var localCheckPoints = checkPointsCollection.FindAll().ToList();
+
+ if (localCheckPoints.Count == 0)
{
- try
+ if (EnableCheckPointsRecovery)
{
- _disposed = true;
- _database.Dispose();
- _database = null;
+ try
+ {
+ LogManager.Log("Attempting to retrieve sources checkpoints from backup...");
+ var remoteCheckPoints = await _checkpointsRecoveryClient.GetCheckpointsBackup();
+ if (remoteCheckPoints.Count > 0)
+ {
+ checkPointsCollection.InsertBulk(remoteCheckPoints);
+ LogManager.Log($"Sources checkpoints successfully recovered.\n{remoteCheckPoints.ToJsonString()}");
+ }
+ else
+ {
+ LogManager.Log("No sources checkpoint found on backup. Assuming first operation...");
+ }
+ }
+ catch (Exception ex)
+ {
+ if (EnforceCheckpointsRecovery)
+ {
+ LogManager.Log(ex, LogCategory.Critical, "Could not retrieve sources checkpoints from backup. Telemetry storage manager should not operate.");
+ throw;
+ }
+ else
+ {
+ LogManager.Log(ex, LogCategory.Warning, "Could not retrieve sources checkpoints from backup. No Checkpoints available!");
+ }
+ }
}
- catch { }
}
+ else
+ {
+ var minDateTime = localCheckPoints.Min(x => x.Time);
+ PerformPublishedTelemetriesCleanUp(minDateTime);
+ }
+
+ LogManager.Log("Loading published telemetries cache...");
+ _publishedTelemetriesIDs = new HashSet<string>(GetPublishedTelemetriesCollection().FindAll().Select(x => x.ID).ToList());
+
+ LogManager.Log("Telemetry LiteDB storage manager initialized...");
}
- ~TelemetryLiteDBStorageManager()
+ private ILiteCollection<PublishedTelemetry> GetPublishedTelemetriesCollection()
{
- Dispose();
+ return _database.GetCollection<PublishedTelemetry>("PublishedTelemetries");
}
private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection()
@@ -79,7 +165,8 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetPendingTelemetriesCollection();
- return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList();
+ var pendingTelemetries = collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList();
+ return pendingTelemetries;
}
}
@@ -88,7 +175,26 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetSourcesCheckpointCollection();
- return collection.FindOne(x => x.SourceName == source.Name);
+ var checkpoint = collection.FindOne(x => x.SourceName == source.Name);
+
+ if (checkpoint == null)
+ {
+ checkpoint = new TelemetryHistorySourceCheckPoint();
+ checkpoint.SourceName = source.Name;
+ checkpoint.Time = DateTime.MinValue;
+ }
+
+ return checkpoint;
+ }
+ }
+
+ public List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints()
+ {
+ lock (_lock)
+ {
+ var collection = GetSourcesCheckpointCollection();
+ var checkpoints = collection.FindAll().ToList();
+ return checkpoints;
}
}
@@ -98,6 +204,24 @@ namespace Tango.Telemetry
{
var collection = GetSourcesCheckpointCollection();
collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount });
+
+ if (_checkpointsRecoveryClient != null && DateTime.UtcNow - _lastCloudBackupTime > CheckpointsBackupInterval)
+ {
+ _lastCloudBackupTime = DateTime.UtcNow;
+ Task.Run(async () =>
+ {
+ try
+ {
+ var allCheckpoints = collection.FindAll().ToList();
+ await _checkpointsRecoveryClient.SaveCheckpointsBackup(allCheckpoints);
+ LogManager.Log("Sources checkpoints successfully backed up to remote service.");
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Warning, "Failed to back up checkpoints to remote service.");
+ }
+ });
+ }
}
}
@@ -106,8 +230,47 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetPendingTelemetriesCollection();
- return collection.Count();
+ var count = collection.Count();
+ return count;
+ }
+ }
+
+ public void AddToPublishedTelemetryCache(ITelemetry telemetry)
+ {
+ _publishedTelemetriesIDs.Add(telemetry.ID);
+ GetPublishedTelemetriesCollection().Insert(new PublishedTelemetry() { CreatedAt = DateTime.UtcNow, ID = telemetry.ID });
+ }
+
+ public bool IsTelemetryInPublishedCache(ITelemetry telementry)
+ {
+ return _publishedTelemetriesIDs.Contains(telementry.ID);
+ }
+
+ public void PerformPublishedTelemetriesCleanUp(DateTime olderThan)
+ {
+ LogManager.Log("Performing published telemetries cache cleanup...");
+ var collection = GetSourcesCheckpointCollection();
+ int deleted = collection.DeleteMany(x => x.Time < olderThan);
+ LogManager.Log($"Published telemetries cleanup completed. {deleted} cleaned.");
+ }
+
+ public virtual void Dispose()
+ {
+ if (_database != null && !_disposed)
+ {
+ try
+ {
+ _disposed = true;
+ _database.Dispose();
+ _database = null;
+ }
+ catch { }
}
}
+
+ ~TelemetryLiteDBStorageManager()
+ {
+ Dispose();
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs
index 6aa1f5527..a5e176ca7 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs
@@ -9,6 +9,7 @@ namespace Tango.Telemetry
public class TelemetryPendingStorageSource : ITelemetrySource
{
public string Name { get; private set; } = "Pending Storage";
+ public bool RequiresTelemetryDuplicationTracking { get => false; }
public void Dispose()
{
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
index baafb70a7..7b6b577a0 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs
@@ -1,4 +1,5 @@
-using System;
+using Newtonsoft.Json;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -10,15 +11,26 @@ namespace Tango.Telemetry
{
private String _payload;
+ internal TaskCompletionSource<TelemetryPublishResult> CompletionSource { get; set; }
public ITelemetrySource Source { get; set; }
public PendingTelemetry PendingTelemetry { get; set; }
public TelemetrySourceTypes SourceType { get; set; }
- public String ToPayload()
+ public String Environment { get; set; }
+ public String SerialNumber { get; set; }
+ public String MachineType { get; set; }
+ public String TelemetryName { get; internal set; }
+
+ public TelemetryPublishPackage()
+ {
+ CompletionSource = new TaskCompletionSource<TelemetryPublishResult>();
+ }
+
+ public String ToPayload(Formatting format = Formatting.None, bool flatten = false)
{
if (_payload == null)
{
- _payload = PendingTelemetry.TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true);
+ _payload = PendingTelemetry.TelemetryObject.ToJson(format, flatten);
}
return _payload;
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs
index 37c6af412..b423580d2 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs
@@ -6,8 +6,15 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// Represents the result of publishing a telemetry package to one or more destinations.
+ /// Contains the status and metrics per destination and total publish time.
+ /// </summary>
public class TelemetryPublishResult
{
+ /// <summary>
+ /// Defines the outcome of an attempt to publish to a specific destination.
+ /// </summary>
public enum DestinationStatus
{
None,
@@ -16,26 +23,98 @@ namespace Tango.Telemetry
Failed,
Postponed
}
-
+
+ /// <summary>
+ /// Contains information about the result of publishing telemetry to a specific destination.
+ /// </summary>
public class DestinationResult
{
+ /// <summary>
+ /// The destination to which the telemetry was attempted to be published.
+ /// </summary>
public ITelemetryDestination Destination { get; set; }
+
+ /// <summary>
+ /// The result status of the publish attempt.
+ /// </summary>
public DestinationStatus Status { get; set; }
+
+ /// <summary>
+ /// Any error that occurred during the publish attempt.
+ /// </summary>
public Exception Error { get; set; }
+
+ /// <summary>
+ /// The amount of time it took to attempt publishing to this destination.
+ /// </summary>
public TimeSpan ElapsedTime { get; set; }
+
+ /// <summary>
+ /// Number of retry attempts for this destination.
+ /// </summary>
+ public int RetryCount { get; internal set; }
+
+ /// <summary>
+ /// Time until the next eligible retry attempt.
+ /// </summary>
+ public TimeSpan RetryDelay { get; internal set; }
}
+ /// <summary>
+ /// Gets or sets the telemetry source that generated the package associated with this publish result.
+ /// </summary>
+ public ITelemetrySource Source { get; set; }
+
+ /// <summary>
+ /// Gets or sets the source type of the telemetry (e.g., Streaming, ExternalStorage, PendingStorage).
+ /// </summary>
+ public TelemetrySourceTypes SourceType { get; set; }
+
+ /// <summary>
+ /// List of results for each destination that was part of the publish process.
+ /// </summary>
public List<DestinationResult> DestinationsResults { get; set; }
+
+ /// <summary>
+ /// Total elapsed time taken to publish the telemetry package across all destinations.
+ /// </summary>
public TimeSpan TotalElapsedTime { get; set; }
+ /// <summary>
+ /// Time spent outside of destination publishing, typically system overhead or coordination.
+ /// </summary>
public TimeSpan OverheadTime
{
get { return TimeSpan.FromMilliseconds(TotalElapsedTime.TotalMilliseconds - DestinationsResults.Sum(x => x.ElapsedTime.TotalMilliseconds)); }
}
+ /// <summary>
+ /// Initializes a new instance of the TelemetryPublishResult class.
+ /// </summary>
public TelemetryPublishResult()
{
DestinationsResults = new List<DestinationResult>();
}
+
+ public override string ToString()
+ {
+ var sb = new StringBuilder();
+
+ sb.AppendLine($"Source: {Source?.Name ?? "Unknown"} ({SourceType})");
+ sb.AppendLine($"Total Elapsed Time: {TotalElapsedTime.TotalMilliseconds:F1} ms");
+ sb.AppendLine($"Overhead Time: {OverheadTime.TotalMilliseconds:F1} ms");
+ sb.AppendLine("Destination Results:");
+
+ foreach (var result in DestinationsResults ?? Enumerable.Empty<DestinationResult>())
+ {
+ sb.Append($" - {result.Destination.Name}: {result.Status}, {result.ElapsedTime.TotalMilliseconds:F1} ms");
+ if (!string.IsNullOrWhiteSpace(result.Error.ToStringSafe()))
+ sb.Append($" (Error: {result.Error})");
+
+ sb.AppendLine();
+ }
+
+ return sb.ToString();
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs
new file mode 100644
index 000000000..2a177d2ab
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPublishResultAvailableEventArgs : TelemetryPublisherEventArgs
+ {
+ public TelemetryPublishResult PublishResult { get; set; }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 99d96edff..42121d884 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -16,45 +16,109 @@ using Tango.Integration.Operation;
using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
+using Tango.Telemetry.Reporting;
using Tango.Telemetry.Telemetries;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// TelemetryPublisher is responsible for collecting telemetry data from sources,
+ /// queuing it, storing it if needed, and publishing it to one or more destinations.
+ /// It supports streaming, historical, and retry-based telemetry flows.
+ /// </summary>
public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher
{
+ /// <summary>
+ /// Occurs before a telemetry package is published to a destination.
+ /// </summary>
public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage;
+
+ /// <summary>
+ /// Occurs when a telemetry package has been successfully published to a destination.
+ /// </summary>
public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished;
+
+ /// <summary>
+ /// Occurs when a telemetry package fails to publish to a destination.
+ /// </summary>
public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed;
+ /// <summary>
+ /// Occurs when a telemetry publish operation has completed and a publish result is available,
+ /// indicating the success or failure status for each destination.
+ /// </summary>
+ public event EventHandler<TelemetryPublishResultAvailableEventArgs> PublishResultAvailable;
+
+ // Timer to periodically check and publish pending telemetry from local storage
private System.Timers.Timer _pendingStorageCheckTimer;
+
+ // Timer to periodically fetch historical data from ITelemetryHistorySource
private System.Timers.Timer _historicalDataTimer;
+ // Indicates if the publisher has been disposed
protected bool _isDisposed;
+
+ // Background thread responsible for dequeuing and publishing telemetry
private Thread _publishThread;
+
+ // Source used to tag telemetry loaded from pending storage
private TelemetryPendingStorageSource _pendingStorageSource;
+ //Timer responsible for triggering periodic cleanup of the published telemetries cache,
+ private System.Timers.Timer _publishedTelemetriesCacheCleanupTimer;
+
+ private List<TelemetryPublishResult> _pastResults;
+
#region Properties
+ /// <summary>
+ /// Indicates whether the publisher is actively running.
+ /// </summary>
public bool IsStarted { get; private set; }
- public TelemetryPublisherConfiguration Config { get; private set; }
+ /// <summary>
+ /// Publisher configuration containing telemetry parameters and limits.
+ /// </summary>
+ public TelemetryPublisherConfiguration Config { get; }
- public ITelemetryStorageManager StorageManager { get; private set; }
+ /// <summary>
+ /// Manages persistence of telemetry data (e.g., LiteDB).
+ /// </summary>
+ public ITelemetryStorageManager StorageManager { get; }
private List<ITelemetrySource> InnerSources { get; }
+ /// <summary>
+ /// Public read-only access to telemetry sources.
+ /// </summary>
public ReadOnlyCollection<ITelemetrySource> Sources { get; }
private List<ITelemetryDestination> InnerDestinations { get; }
+ /// <summary>
+ /// Public read-only access to telemetry destinations.
+ /// </summary>
public ReadOnlyCollection<ITelemetryDestination> Destinations { get; }
+ /// <summary>
+ /// Manages telemetry queuing between ingestion and publish phases.
+ /// </summary>
public ITelemetryQueueManager QueueManager { get; private set; }
+ /// <summary>
+ /// Gets the client used for remote checkpoint recovery.
+ /// </summary>
+ public ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; }
+
#endregion
#region Constructor
- public TelemetryPublisher(TelemetryPublisherConfiguration config)
+ /// <summary>
+ /// Initializes the telemetry publisher with default storage and queue managers.
+ /// </summary>
+ public TelemetryPublisher(TelemetryPublisherConfiguration config, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient)
{
+ _pastResults = new List<TelemetryPublishResult>();
+
Config = config ?? new TelemetryPublisherConfiguration();
_pendingStorageSource = new TelemetryPendingStorageSource();
@@ -68,11 +132,16 @@ namespace Tango.Telemetry
_publishThread = new Thread(PublishThreadMethod);
_publishThread.IsBackground = true;
+ CheckpointsRecoveryClient = checkPointsRecoveryClient;
+
StorageManager = new TelemetryLiteDBStorageManager();
QueueManager = new TelemetryInMemoryQueueManager();
}
- public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config)
+ /// <summary>
+ /// Initializes the telemetry publisher with custom storage and queue managers.
+ /// </summary>
+ public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config, checkPointsRecoveryClient)
{
StorageManager = storageManager;
QueueManager = queueManager;
@@ -82,6 +151,9 @@ namespace Tango.Telemetry
#region Sources
+ /// <summary>
+ /// Registers a telemetry source, such as a streaming or historical source.
+ /// </summary>
public void RegisterSource(ITelemetrySource source)
{
if (source == null) return;
@@ -114,6 +186,9 @@ namespace Tango.Telemetry
LogManager.Log($"Telemetry source {source.Name} registered.");
}
+ /// <summary>
+ /// Callback when a telemetry streaming source emits new telemetry.
+ /// </summary>
private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
{
var source = sender as ITelemetrySource;
@@ -128,6 +203,9 @@ namespace Tango.Telemetry
#region Destinations
+ /// <summary>
+ /// Registers a telemetry destination, such as a cloud service or local database.
+ /// </summary>
public void RegisterDestination(ITelemetryDestination destination)
{
if (destination == null) return;
@@ -147,7 +225,10 @@ namespace Tango.Telemetry
#region Start / Stop
- public void Start()
+ /// <summary>
+ /// Starts all timers, threads, and streaming sources for publishing telemetry.
+ /// </summary>
+ public async Task Start()
{
if (!IsStarted)
{
@@ -160,6 +241,8 @@ namespace Tango.Telemetry
IsStarted = true;
+ await StorageManager.Init(CheckpointsRecoveryClient);
+
if (_pendingStorageCheckTimer == null)
{
_pendingStorageCheckTimer = new System.Timers.Timer();
@@ -178,6 +261,15 @@ namespace Tango.Telemetry
_historicalDataTimer.Start();
+ if (_publishedTelemetriesCacheCleanupTimer == null)
+ {
+ _publishedTelemetriesCacheCleanupTimer = new System.Timers.Timer();
+ _publishedTelemetriesCacheCleanupTimer.Interval = Config.PublishedTelemetriesCacheCleanupInterval.TotalMilliseconds;
+ _publishedTelemetriesCacheCleanupTimer.Elapsed += PublishedTelemetriesCacheCleanupTimer_Elapsed;
+ }
+
+ _publishedTelemetriesCacheCleanupTimer.Start();
+
_publishThread.Start();
InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start());
@@ -187,13 +279,16 @@ namespace Tango.Telemetry
catch (Exception ex)
{
LogManager.Log(ex, "Error starting telemetry publisher.");
- Stop();
- throw ex;
+ await Stop();
+ throw;
}
}
}
- public void Stop()
+ /// <summary>
+ /// Stops all activity and releases threads and sources gracefully.
+ /// </summary>
+ public Task Stop()
{
if (IsStarted)
{
@@ -212,14 +307,19 @@ namespace Tango.Telemetry
LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}.");
}
});
- _pendingStorageCheckTimer.Stop();
- _historicalDataTimer.Stop();
- QueueManager.Enqueue(null);
+ _pendingStorageCheckTimer?.Stop();
+ _historicalDataTimer?.Stop();
+ QueueManager?.Enqueue(null);
LogManager.Log("Telemetry publisher stopped.");
}
+
+ return Task.FromResult(true);
}
+ /// <summary>
+ /// Performs runtime validation of configuration, sources, and destinations.
+ /// </summary>
public void Validate()
{
// Validate all registered sources
@@ -262,81 +362,148 @@ namespace Tango.Telemetry
#region Timers
+ /// <summary>
+ /// Periodically invoked to process telemetry from persistent local storage.
+ /// </summary>
private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- _pendingStorageCheckTimer.Stop();
-
- LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})...");
+ LogManager.Log("Pending storage check timer elapsed. Starting flush operation for pending telemetries.", LogCategory.Debug);
- var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+ _pendingStorageCheckTimer.Stop();
- LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing...");
+ try
+ {
+ var results = await FlushPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
+ LogManager.Log($"Flush operation completed. {results.Count} telemetry package(s) processed from pending storage.", LogCategory.Debug);
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing pending telemetry packages.");
+ }
+ finally
+ {
+ _pendingStorageCheckTimer.Start();
+ LogManager.Log("Pending storage check timer restarted.", LogCategory.Debug);
+ }
+ }
- Dictionary<String, int> destinationsPasses = new Dictionary<string, int>();
- List<TelemetryPublishResult> results = new List<TelemetryPublishResult>();
+ /// <summary>
+ /// Periodically invoked to fetch and push historical data from history sources.
+ /// </summary>
+ private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e)
+ {
+ LogManager.Log("Historical data timer elapsed. Checking for available capacity...", LogCategory.Debug);
+ _historicalDataTimer.Stop();
- foreach (var pendingTelemetry in batch)
+ try
{
- var result = await PublishTelemetryPackage(new TelemetryPublishPackage()
- {
- Source = _pendingStorageSource,
- PendingTelemetry = pendingTelemetry,
- SourceType = TelemetrySourceTypes.PendingStorage
- });
+ int queueCount = QueueManager.Count;
+ int storageCount = StorageManager.GetPendingTelemetriesCount();
- results.Add(result);
+ LogManager.Log($"Current queue count: {queueCount}, pending storage count: {storageCount}", LogCategory.Debug);
- foreach (var d in result.DestinationsResults)
+ if (queueCount < Config.MaxPendingTelemetries && storageCount < Config.MaxPendingTelemetries)
{
- if (d.Status == TelemetryPublishResult.DestinationStatus.Passed)
+ foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList())
{
- destinationsPasses[d.Destination.Name] += 1;
- }
- }
+ try
+ {
+ TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source);
- if (!IsStarted || _isDisposed) return;
- }
+ LogManager.Log($"Evaluating history source '{source.Name}' at checkpoint time {checkPoint?.Time:u}", LogCategory.Debug);
+
+ if (await source.CanRequestHistory(checkPoint.Time))
+ {
+ var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
+
+ LogManager.Log($"History source '{source.Name}' returned {historyTelemetries.Count} telemetry items.", LogCategory.Debug);
- LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}");
+ foreach (var telemetry in historyTelemetries)
+ {
+ await PushTelemetryPackageAwait(source, telemetry, TelemetrySourceTypes.ExternalStorage);
+ checkPoint.Time = telemetry.Time;
+ checkPoint.TotalCount++;
+ StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount);
+ }
- _pendingStorageCheckTimer.Start();
+ LogManager.Log($"Checkpoint updated for source '{source.Name}': time = {checkPoint.Time:u}, total = {checkPoint.TotalCount}", LogCategory.Debug);
+ }
+ else
+ {
+ LogManager.Log($"History request for source '{source.Name}' was not permitted at checkpoint time {checkPoint?.Time:u}", LogCategory.Debug);
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Error, $"Exception while processing history for source '{source?.Name}'");
+ }
+ }
+ }
+ else
+ {
+ LogManager.Log("Historical data fetch skipped due to max pending telemetry limit reached.", LogCategory.Debug);
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Critical, "Unexpected error during HistoricalDataTimer_Elapsed.");
+ }
+ finally
+ {
+ _historicalDataTimer.Start();
+ LogManager.Log("Historical data timer restarted.", LogCategory.Debug);
+ }
}
- private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e)
+ /// <summary>
+ /// Handles the elapsed event of the published telemetries cache cleanup timer.
+ /// Determines the earliest checkpoint across all history sources and removes published telemetry entries
+ /// older than that point to keep the cache size manageable over time.
+ /// </summary>
+ private void PublishedTelemetriesCacheCleanupTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- _historicalDataTimer.Stop();
+ LogManager.Log("Published telemetry cache cleanup timer elapsed. Starting cleanup process...", LogCategory.Debug);
- LogManager.Log("");
+ _publishedTelemetriesCacheCleanupTimer.Stop();
- if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries)
+ try
{
- foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList())
+ var checkPoints = StorageManager.GetHistorySourcesCheckPoints();
+ LogManager.Log($"Retrieved {checkPoints.Count} source checkpoints for cleanup evaluation.", LogCategory.Debug);
+
+ if (checkPoints.Count > 0)
{
- TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source);
- if (await source.CanRequestHistory(checkPoint.Time))
- {
- var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList();
+ DateTime olderThan = checkPoints.Min(x => x.Time);
+ LogManager.Log($"Initiating cleanup of published telemetries older than {olderThan:u}.", LogCategory.Debug);
- foreach (var telemetry in historyTelemetries)
- {
- PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage);
- checkPoint.Time = telemetry.Time;
- checkPoint.TotalCount++;
- }
+ StorageManager.PerformPublishedTelemetriesCleanUp(olderThan);
- StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount);
- }
+ LogManager.Log("Published telemetry cache cleanup completed successfully.", LogCategory.Debug);
+ }
+ else
+ {
+ LogManager.Log("No checkpoints found. Cleanup skipped.", LogCategory.Debug);
}
}
-
- _historicalDataTimer.Start();
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Error, "Exception occurred during published telemetry cache cleanup.");
+ }
+ finally
+ {
+ _publishedTelemetriesCacheCleanupTimer.Start();
+ LogManager.Log("Published telemetry cache cleanup timer restarted.", LogCategory.Debug);
+ }
}
#endregion
#region Push
- private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
+ /// <summary>
+ /// Enqueues telemetry into the system based on a source and type.
+ /// </summary>
+ private TelemetryPublishPackage PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
{
PendingTelemetry pendingTelemetry = new PendingTelemetry();
pendingTelemetry.Created = DateTime.UtcNow;
@@ -345,18 +512,45 @@ namespace Tango.Telemetry
pendingTelemetry.SourceType = sourceType;
pendingTelemetry.TelemetryObject = telemetry;
- PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType });
+ var package = new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType };
+
+ PushTelemetryPackage(package);
+
+ return package;
}
+ /// <summary>
+ /// Enqueues telemetry and returns a task that resolves when it is published.
+ /// </summary>
+ private Task<TelemetryPublishResult> PushTelemetryPackageAwait(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
+ {
+ return PushTelemetryPackage(source, telemetry, sourceType).CompletionSource.Task;
+ }
+
+ /// <summary>
+ /// Enqueues an already-wrapped package for background publishing.
+ /// </summary>
private void PushTelemetryPackage(TelemetryPublishPackage package)
{
QueueManager.Enqueue(package);
}
+ /// <summary>
+ /// Enqueues a wrapped package and awaits publish result asynchronously.
+ /// </summary>
+ private Task<TelemetryPublishResult> PushTelemetryPackageAwait(TelemetryPublishPackage package)
+ {
+ PushTelemetryPackage(package);
+ return package.CompletionSource.Task;
+ }
+
#endregion
#region Publish
+ /// <summary>
+ /// Background thread method to publish telemetry from the queue.
+ /// </summary>
private async void PublishThreadMethod()
{
while (IsStarted)
@@ -379,118 +573,360 @@ namespace Tango.Telemetry
}
}
+ // This method is responsible for publishing a telemetry package to all configured destinations.
+ // It handles per-destination retry logic, exponential backoff, availability checks, and result reporting.
+ // The goal is to guarantee eventual delivery of telemetry with robust fault tolerance and observability.
protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
- Stopwatch totalWatch = Stopwatch.StartNew();
+ LogManager.Log($"Starting publish process for telemetry package from source '{package.Source?.Name}' with type '{package.SourceType}'", LogCategory.Debug);
- var result = new TelemetryPublishResult();
+ Stopwatch totalWatch = Stopwatch.StartNew(); // Start measuring total publish duration
+ var result = new TelemetryPublishResult(); // Result container with per-destination feedback
+ result.Source = package.Source;
+ result.SourceType = package.SourceType;
- if (!IsStarted || _isDisposed) return result;
+ // Abort early if the publisher is inactive
+ if (!IsStarted || _isDisposed)
+ {
+ LogManager.Log("Publish attempt skipped because the publisher is not started or has been disposed.", LogCategory.Warning);
+ package.CompletionSource.SetResult(result);
+ return result;
+ }
- List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
+ //Marking the telemetry as published to avoid duplication from streaming and history sources that can produce the same telemetry.
+ if (package.Source.RequiresTelemetryDuplicationTracking)
+ {
+ if (StorageManager.IsTelemetryInPublishedCache(package.PendingTelemetry.TelemetryObject))
+ {
+ LogManager.Log("Publish attempt skipped because the telemetry was already published.", LogCategory.Warning);
+ package.CompletionSource.SetResult(result);
+ return result;
+ }
+ else
+ {
+ StorageManager.AddToPublishedTelemetryCache(package.PendingTelemetry.TelemetryObject);
+ }
+ }
+
+ // Prepare standard metadata properties attached to all telemetry sent
+ var telemetryName = package.PendingTelemetry.TelemetryObject.ToTelemetryName();
- properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID));
- properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()));
+ List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
+ properties.Add(new KeyValuePair<string, string>("SerialNumber", Config.SerialNumber));
+ properties.Add(new KeyValuePair<string, string>("MachineType", Config.MachineType.ToShortName()));
properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment));
+ properties.Add(new KeyValuePair<string, string>("Type", telemetryName));
+
+ //Setting telemetry package basic properties for destination..
+ package.TelemetryName = telemetryName;
+ package.SerialNumber = Config.SerialNumber;
+ package.Environment = Config.Environment;
+ package.MachineType = Config.MachineType.ToShortName();
+
+ var now = DateTime.UtcNow; // Capture timestamp once for all retry logic
List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
- //Add all destinations if streaming or external (They will be remove later if successfull)
- //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
+ // If this is a fresh package, initialize pending destinations
if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
+ LogManager.Log("Evaluating destinations for initial pending destination registration...", LogCategory.Debug);
foreach (var destination in Destinations)
{
- if (!pendingDestinations.Exists(x => x.Name == destination.Name))
+ if (destination.SupportedSourceTypes.Contains(package.SourceType))
{
- pendingDestinations.Add(new TelemetryPendingDestination { Name = destination.Name });
+ if (!pendingDestinations.Exists(x => x.Name == destination.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination
+ {
+ Name = destination.Name,
+ RetryCount = 0,
+ LastAttempt = DateTime.MinValue,
+ NextEligibleAttempt = now
+ });
+ LogManager.Log($"Added destination '{destination.Name}' to pending destinations.", LogCategory.Debug);
+ }
}
}
}
- foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType)))
+ // Try publishing to each valid destination
+ foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
- if (pendingDestinations.Exists(x => x.Name == destination.Name))
+ var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
+ if (pendingEntry == null) continue; // Skip destinations not pending for this package
+
+ // Prepare result tracking for this destination
+ var destinationResult = new TelemetryPublishResult.DestinationResult();
+ destinationResult.Destination = destination;
+ destinationResult.RetryCount = pendingEntry.RetryCount;
+ destinationResult.RetryDelay = TimeSpan.FromSeconds(Math.Max(0, (pendingEntry.NextEligibleAttempt - now).TotalSeconds));
+ result.DestinationsResults.Add(destinationResult);
+
+ // If we're still in a backoff delay, skip for now
+ if (now < pendingEntry.NextEligibleAttempt)
{
- Stopwatch destinationWatch = Stopwatch.StartNew();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
+ destinationResult.ElapsedTime = TimeSpan.Zero;
+ LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
+ continue;
+ }
- var destinationResult = new TelemetryPublishResult.DestinationResult();
- destinationResult.Destination = destination;
- result.DestinationsResults.Add(destinationResult);
+ Stopwatch destinationWatch = Stopwatch.StartNew(); // Measure this attempt duration
- try
+ try
+ {
+ // Remove destination from pending list so we can re-add it if needed after this attempt
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ LogManager.Log($"Attempting to publish to destination '{destination.Name}'...", LogCategory.Debug);
+
+ // Allow event handlers to cancel or inspect the publish
+ if (OnPublishingPackage(package, destination))
{
- pendingDestinations.RemoveAll(x => x.Name == destination.Name);
- if (OnPublishingPackage(package, destination))
+ // Ensure destination is ready before sending
+ if (await destination.IsAvailable())
{
- if (await destination.IsAvailable())
- {
- await destination.Publish(package, properties);
- OnPackagePublished(package, destination);
+ await destination.Publish(package, properties); // Perform publish
+ OnPackagePublished(package, destination); // Notify success event
+
+ destinationWatch.Stop();
+
+ destinationResult.RetryDelay = TimeSpan.Zero;
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.");
+ }
+ else
+ {
+ // Mark as temporarily unavailable and schedule retry
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
- destinationWatch.Stop();
- destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
- destinationResult.ElapsedTime = destinationWatch.Elapsed;
+ LogManager.Log($"Destination '{destination.Name}' is unavailable.", LogCategory.Warning);
+
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug);
+ pendingDestinations.Add(pendingEntry);
}
else
{
- destinationWatch.Stop();
- destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
- destinationResult.ElapsedTime = destinationWatch.Elapsed;
-
- if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
- {
- if (!pendingDestinations.Exists(x => x.Name == destination.Name))
- {
- pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name });
- }
- }
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug);
}
}
}
- catch (Exception ex)
- {
- destinationWatch.Stop();
- destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
- destinationResult.Error = ex;
- destinationResult.ElapsedTime = destinationWatch.Elapsed;
-
- LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
+ }
+ catch (Exception ex)
+ {
+ // Log unexpected failure and retry if supported
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
+ destinationResult.Error = ex;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
- OnPackagePublishFailed(package, destination, ex);
+ LogManager.Log(ex, $"Error publishing telemetry to '{destination.Name}'.");
+ OnPackagePublishFailed(package, destination, ex);
- if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
- {
- if (!pendingDestinations.Exists(x => x.Name == destination.Name))
- {
- pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name });
- }
- }
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug);
+ pendingDestinations.Add(pendingEntry);
+ }
+ else
+ {
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug);
}
}
}
+ // Save retry state back into the package
package.PendingTelemetry.PendingDestinations = pendingDestinations;
- if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0)
+ // Remove from storage if all destinations succeeded; otherwise persist state
+ if (package.PendingTelemetry.PendingDestinations.Count == 0)
{
+ LogManager.Log("Deleting successfully published telemetry from storage.", LogCategory.Debug);
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
+ LogManager.Log("Saving telemetry package for future retry or tracking.", LogCategory.Debug);
StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
+ // Finalize result and notify completion
totalWatch.Stop();
result.TotalElapsedTime = totalWatch.Elapsed;
+ LogManager.Log($"Completed publish process for telemetry from source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
+
+ //Add results for reporting
+ _pastResults.Add(result);
+
+ //Set task result for once that are awaiting from outside this method.
+ package.CompletionSource.SetResult(result);
+
+ //Raising final event
+ OnPublishResultAvailable(package, result);
return result;
}
#endregion
+ #region Flush
+
+ /// <summary>
+ /// Flushes up to the specified number of pending telemetries from local storage,
+ /// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages.
+ /// </summary>
+ /// <param name="maxCount">The maximum number of pending telemetry packages to flush.</param>
+ /// <returns>
+ /// A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages.
+ /// </returns>
+ public async Task<List<TelemetryPublishResult>> FlushPendingTelemetries(int maxCount)
+ {
+ if (!IsStarted || _isDisposed)
+ {
+ LogManager.Log("FlushPendingTelemetries called while publisher is not started or already disposed. Operation aborted.", LogCategory.Warning);
+ return new List<TelemetryPublishResult>();
+ }
+
+ var batch = StorageManager.GetPendingTelemetries(maxCount);
+ LogManager.Log($"Flushing {batch.Count} pending telemetry package(s).", LogCategory.Info);
+
+ List<TelemetryPublishResult> results = new List<TelemetryPublishResult>();
+
+ foreach (var pendingTelemetry in batch)
+ {
+ try
+ {
+ var package = new TelemetryPublishPackage()
+ {
+ Source = _pendingStorageSource,
+ PendingTelemetry = pendingTelemetry,
+ SourceType = TelemetrySourceTypes.PendingStorage
+ };
+
+ var result = await PushTelemetryPackageAwait(package);
+ results.Add(result);
+
+ LogManager.Log(
+ $"Flushed telemetry to destinations: {string.Join(", ", result.DestinationsResults.Select(r => $"{r.Destination.Name}={r.Status}"))}",
+ LogCategory.Debug);
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing a pending telemetry package.");
+ }
+
+ if (!IsStarted || _isDisposed)
+ {
+ LogManager.Log("Flush operation interrupted: publisher is no longer active.", LogCategory.Warning);
+ return results;
+ }
+ }
+
+ LogManager.Log("FlushPendingTelemetries completed successfully.", LogCategory.Info);
+ return results;
+ }
+
+ #endregion
+
+ #region Reporting
+
+ /// <summary>
+ /// Generates a detailed telemetry report summarizing the current state of the telemetry system.
+ /// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results.
+ /// </summary>
+ public Task<TelemetryReport> GetTelemetryReport()
+ {
+ return Task.Factory.StartNew(() =>
+ {
+ TelemetryReport report = new TelemetryReport
+ {
+ GeneratedAt = DateTime.UtcNow,
+ TotalPending = StorageManager.GetPendingTelemetriesCount()
+ };
+
+ var results = _pastResults.ToList();
+ report.TotalPublished = results.Count;
+
+ foreach (var result in results)
+ {
+ var sourceType = result.SourceType;
+ var sourceName = result.Source?.Name ?? "UnknownSource";
+
+ if (!report.SourceTypes.TryGetValue(sourceType, out var sourceTypeSummary))
+ {
+ sourceTypeSummary = new SourceTypeSummary
+ {
+ SourceType = sourceType
+ };
+ report.SourceTypes[sourceType] = sourceTypeSummary;
+ }
+
+ if (!sourceTypeSummary.Sources.TryGetValue(sourceName, out var sourceSummary))
+ {
+ sourceSummary = new SourceSummary
+ {
+ SourceName = sourceName
+ };
+ sourceTypeSummary.Sources[sourceName] = sourceSummary;
+ }
+
+ foreach (var destResult in result.DestinationsResults)
+ {
+ var destName = destResult.Destination.Name;
+
+ if (!sourceSummary.Destinations.TryGetValue(destName, out var destSummary))
+ {
+ destSummary = new DestinationStatusSummary
+ {
+ DestinationName = destName
+ };
+ sourceSummary.Destinations[destName] = destSummary;
+ }
+
+ switch (destResult.Status)
+ {
+ case TelemetryPublishResult.DestinationStatus.Passed:
+ destSummary.Passed++;
+ break;
+ case TelemetryPublishResult.DestinationStatus.Failed:
+ destSummary.Failed++;
+ break;
+ case TelemetryPublishResult.DestinationStatus.Postponed:
+ destSummary.Postponed++;
+ break;
+ case TelemetryPublishResult.DestinationStatus.Unavailable:
+ destSummary.Unavailable++;
+ break;
+ }
+ }
+ }
+
+ return report;
+ });
+ }
+
+
+ #endregion
+
#region Virtual Methods
+ /// <summary>
+ /// Called before a package is published to allow for canceling or preprocessing.
+ /// </summary>
protected virtual bool OnPublishingPackage(TelemetryPublishPackage package, ITelemetryDestination destination)
{
try
@@ -505,6 +941,9 @@ namespace Tango.Telemetry
}
}
+ /// <summary>
+ /// Called after a package has been successfully delivered to a destination.
+ /// </summary>
protected virtual void OnPackagePublished(TelemetryPublishPackage package, ITelemetryDestination destination)
{
try
@@ -514,6 +953,9 @@ namespace Tango.Telemetry
catch { }
}
+ /// <summary>
+ /// Called after a failed attempt to publish a telemetry package.
+ /// </summary>
protected virtual void OnPackagePublishFailed(TelemetryPublishPackage package, ITelemetryDestination destination, Exception exception)
{
try
@@ -523,10 +965,27 @@ namespace Tango.Telemetry
catch { }
}
+ /// <summary>
+ /// Called when a publish result is available after a complete publish pass.
+ /// </summary>
+ /// <param name="package">The package.</param>
+ /// <param name="result">The result.</param>
+ protected virtual void OnPublishResultAvailable(TelemetryPublishPackage package, TelemetryPublishResult result)
+ {
+ try
+ {
+ PublishResultAvailable?.Invoke(this, new TelemetryPublishResultAvailableEventArgs() { Package = package, PublishResult = result });
+ }
+ catch { }
+ }
+
#endregion
#region Dispose
+ /// <summary>
+ /// Disposes all sources, destinations, timers, and gracefully shuts down.
+ /// </summary>
public void Dispose()
{
if (!_isDisposed)
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
index c4e7d3d3e..971afa864 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -5,6 +5,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Integration.Operation;
+using Tango.Logging;
namespace Tango.Telemetry
{
@@ -21,15 +22,26 @@ namespace Tango.Telemetry
}
+ // This method handles the complete lifecycle of publishing a telemetry package to all supported destinations.
+ // It supports exponential backoff retry logic, destination filtering, and persistence of failed deliveries
+ // to ensure fault tolerance and guaranteed eventual delivery across multiple transports.
protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
- Stopwatch totalWatch = Stopwatch.StartNew();
+ LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug);
+ Stopwatch totalWatch = Stopwatch.StartNew();
var result = new TelemetryPublishResult();
- if (!IsStarted || _isDisposed) return result;
+ // Exit early if the publisher has been stopped or disposed to avoid invalid operations.
+ if (!IsStarted || _isDisposed)
+ {
+ LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning);
+ package.CompletionSource.SetResult(result);
+ return result;
+ }
- List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
+ // Prepare metadata headers to be attached to the telemetry, which are common to all destinations.
+ var properties = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("MachineID", Config.MachineID),
new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
@@ -39,13 +51,15 @@ namespace Tango.Telemetry
var now = DateTime.UtcNow;
var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
- // For Streaming/External: initialize pending destinations list (used if publishing fails)
+ // If this is a newly generated package, not a retry, initialize its destination list.
+ // This ensures we only attempt to publish to destinations that support this telemetry type.
if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var dest in Destinations)
{
- if (!pendingDestinations.Any(x => x.Name == dest.Name))
+ if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name))
{
+ // Initialize with default retry tracking values.
pendingDestinations.Add(new TelemetryPendingDestination
{
Name = dest.Name,
@@ -53,38 +67,40 @@ namespace Tango.Telemetry
LastAttempt = DateTime.MinValue,
NextEligibleAttempt = now
});
+ LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug);
}
}
}
+ // Iterate over each valid destination, applying retry backoff and delivery logic.
foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
-
- if (pendingEntry == null)
- continue;
+ if (pendingEntry == null) continue;
Stopwatch destinationWatch = Stopwatch.StartNew();
-
var destinationResult = new TelemetryPublishResult.DestinationResult();
destinationResult.Destination = destination;
result.DestinationsResults.Add(destinationResult);
- // Respect backoff timing
+ // If the destination is in a cool-down period due to previous failures, skip for now.
if (now < pendingEntry.NextEligibleAttempt)
{
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
+ LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
continue;
}
try
{
+ LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug);
if (OnPublishingPackage(package, destination))
{
if (await destination.IsAvailable())
{
+ // Send the telemetry payload
await destination.Publish(package, properties);
OnPackagePublished(package, destination);
@@ -92,87 +108,90 @@ namespace Tango.Telemetry
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- // On success: remove entry from pending list
+ LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.");
+
+ // Remove from retry queue since delivery succeeded.
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
}
else
{
+ // Destination is temporarily unreachable; apply retry logic if supported.
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- // Only track retry state if retry is supported
+ LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning);
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
- if (pendingEntry == null)
- {
- pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
- pendingDestinations.Add(pendingEntry);
- }
-
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
-
- // Apply exponential backoff
- int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
- pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug);
}
else
{
- // Remove if not retryable
+ // Remove from retry list if the destination is not retryable.
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug);
}
}
}
}
catch (Exception ex)
{
+ // Network, serialization, or other critical failures are handled here.
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
destinationResult.Error = ex;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
+ LogManager.Log(ex, $"Error publishing to '{destination.Name}'.");
OnPackagePublishFailed(package, destination, ex);
- // Only track retry state if retry is supported
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
- if (pendingEntry == null)
- {
- pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
- pendingDestinations.Add(pendingEntry);
- }
-
+ // Retry and Backoff Logic:
+ // --------------------------
+ // Each destination maintains its own retry metadata: RetryCount, LastAttempt, and NextEligibleAttempt.
+ // On each failed delivery attempt, RetryCount is incremented and a delay is calculated using exponential backoff (2^RetryCount seconds).
+ // This delay is capped using MaxExponentialBackoff to prevent runaway delays.
+ // The system checks NextEligibleAttempt before retrying, and skips publishing if the backoff period has not yet elapsed.
+ // This ensures robust delivery across unstable networks and prevents retry storms.
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
-
- // Apply exponential backoff
- int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
- pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug);
}
else
{
- // Remove if not retryable
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug);
}
}
}
+ // Save updated retry state for this telemetry package
package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
+ // Determine whether to remove the telemetry or persist it based on remaining destinations
if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
{
+ LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug);
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
- StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
+ LogManager.Log("Updating pending telemetry record.", LogCategory.Debug);
+ StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
totalWatch.Stop();
result.TotalElapsedTime = totalWatch.Elapsed;
+ LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
+ package.CompletionSource.SetResult(result);
return result;
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
index 4a3776b87..a9f4954dd 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
@@ -1,22 +1,62 @@
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
using Tango.BL.Enumerations;
namespace Tango.Telemetry
{
public class TelemetryPublisherConfiguration
{
- public String MachineID { get; set; }
+ /// <summary>
+ /// Unique identifier of the machine sending telemetry. Used for tagging and routing.
+ /// </summary>
+ public String SerialNumber { get; set; }
+
+ /// <summary>
+ /// Enum representing the type of machine (e.g., X1, X4). Helps differentiate models in telemetry.
+ /// </summary>
public MachineTypes MachineType { get; set; }
+
+ /// <summary>
+ /// Environment in which the telemetry is being published (e.g., Production, QA, Dev).
+ /// </summary>
public String Environment { get; set; }
+
+ /// <summary>
+ /// Interval for checking and reprocessing failed/pending telemetry from local storage.
+ /// </summary>
public TimeSpan PendingStorageCheckInterval { get; set; }
+
+ /// <summary>
+ /// Maximum number of pending telemetry records to process in a single retry cycle.
+ /// </summary>
public int MaxPendingStorageTelemetriesPerCycle { get; set; }
+
+ /// <summary>
+ /// Frequency at which historical sources are polled to request backlogged or missed telemetry.
+ /// </summary>
public TimeSpan HistorySourcesRequestInterval { get; set; }
+
+ /// <summary>
+ /// Maximum number of telemetry packages allowed in memory queues before rejecting new packages.
+ /// </summary>
public int MaxPendingTelemetries { get; set; }
+ /// <summary>
+ /// Whether exponential backoff should be applied to retry logic per destination.
+ /// </summary>
+ public bool EnableBackoff { get; set; }
+
+ /// <summary>
+ /// The maximum amount of time to delay retries during exponential backoff.
+ /// </summary>
+ public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);
+
+ /// <summary>
+ /// Gets or sets the interval at which the published telemetry cache cleanup process should occur.
+ /// This defines how frequently old published telemetry entries are eligible for pruning,
+ /// based on their publication timestamp.
+ /// </summary>
+ public TimeSpan PublishedTelemetriesCacheCleanupInterval { get; set; } = TimeSpan.FromHours(1);
+
public TelemetryPublisherConfiguration()
{
PendingStorageCheckInterval = TimeSpan.FromMinutes(1);
@@ -27,8 +67,8 @@ namespace Tango.Telemetry
public void Validate()
{
- if (!MachineID.IsNotNullOrEmpty())
- throw new ArgumentNullException(nameof(MachineID), "MachineID is not set or empty.");
+ if (!SerialNumber.IsNotNullOrEmpty())
+ throw new ArgumentNullException(nameof(SerialNumber), "SerialNumber is not set or empty.");
if (!Environment.IsNotNullOrEmpty())
throw new ArgumentNullException(nameof(Environment), "Environment is not set or empty.");
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs
index 1ed80d5c9..3eb6ad6e0 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs
@@ -6,10 +6,25 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
+ /// <summary>
+ /// Specifies the origin type of a telemetry data package.
+ /// Used to control routing and behavior during telemetry publishing.
+ /// </summary>
public enum TelemetrySourceTypes
{
+ /// <summary>
+ /// Telemetry generated in real-time by a live telemetry streaming source.
+ /// </summary>
Streaming,
+
+ /// <summary>
+ /// Telemetry loaded on demand from historical sources such as logs or external databases.
+ /// </summary>
ExternalStorage,
- PendingStorage,
+
+ /// <summary>
+ /// Telemetry restored from a local storage buffer due to prior publish failure.
+ /// </summary>
+ PendingStorage
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/app.config b/Software/Visual_Studio/Tango.Telemetry/app.config
new file mode 100644
index 000000000..601f6dd75
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/app.config
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="utf-8"?>
+<configuration>
+ <runtime>
+ <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
+ <dependentAssembly>
+ <assemblyIdentity name="System.Runtime.InteropServices.RuntimeInformation" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-4.0.2.0" newVersion="4.0.2.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-4.2.0.0" newVersion="4.2.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-9.0.0.0" newVersion="9.0.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="System.Runtime.CompilerServices.Unsafe" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-4.0.4.1" newVersion="4.0.4.1" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="Microsoft.Azure.Amqp" publicKeyToken="31bf3856ad364e35" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-2.4.0.0" newVersion="2.4.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="Microsoft.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-4.0.0.0" newVersion="4.0.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="Microsoft.WindowsAzure.Storage" publicKeyToken="31bf3856ad364e35" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-8.7.0.0" newVersion="8.7.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="DotNetty.Transport" publicKeyToken="bc13ca065fa06c29" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="DotNetty.Buffers" publicKeyToken="bc13ca065fa06c29" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="DotNetty.Codecs.Mqtt" publicKeyToken="bc13ca065fa06c29" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="DotNetty.Common" publicKeyToken="bc13ca065fa06c29" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="DotNetty.Handlers" publicKeyToken="bc13ca065fa06c29" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" />
+ </dependentAssembly>
+ <dependentAssembly>
+ <assemblyIdentity name="System.Reactive.Core" publicKeyToken="94bc3704cddfc263" culture="neutral" />
+ <bindingRedirect oldVersion="0.0.0.0-3.0.3000.0" newVersion="3.0.3000.0" />
+ </dependentAssembly>
+ </assemblyBinding>
+ </runtime>
+</configuration> \ No newline at end of file
diff --git a/Software/Visual_Studio/Tango.Telemetry/packages.config b/Software/Visual_Studio/Tango.Telemetry/packages.config
index 1a4b2c3c8..027500d67 100644
--- a/Software/Visual_Studio/Tango.Telemetry/packages.config
+++ b/Software/Visual_Studio/Tango.Telemetry/packages.config
@@ -6,14 +6,18 @@
<package id="DotNetty.Common" version="0.6.0" targetFramework="net461" />
<package id="DotNetty.Handlers" version="0.6.0" targetFramework="net461" />
<package id="DotNetty.Transport" version="0.6.0" targetFramework="net461" />
+ <package id="EnterpriseLibrary.TransientFaultHandling" version="6.0.1304.0" targetFramework="net461" />
<package id="EntityFramework" version="6.2.0" targetFramework="net461" />
<package id="LiteDB" version="5.0.4" targetFramework="net461" />
<package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net461" />
<package id="Microsoft.Azure.Amqp" version="2.5.10" targetFramework="net461" />
- <package id="Microsoft.Azure.Devices.Client" version="1.41.0" targetFramework="net461" />
- <package id="Microsoft.Azure.Devices.Shared" version="1.30.1" targetFramework="net461" />
+ <package id="Microsoft.Azure.Devices.Client" version="1.6.0" targetFramework="net461" />
+ <package id="Microsoft.Azure.Devices.Shared" version="1.3.0" targetFramework="net461" />
<package id="Microsoft.Azure.KeyVault.Core" version="1.0.0" targetFramework="net461" />
<package id="Microsoft.CSharp" version="4.7.0" targetFramework="net461" />
+ <package id="Microsoft.Data.Edm" version="5.8.2" targetFramework="net461" />
+ <package id="Microsoft.Data.OData" version="5.8.2" targetFramework="net461" />
+ <package id="Microsoft.Data.Services.Client" version="5.8.2" targetFramework="net461" />
<package id="Microsoft.Extensions.DependencyInjection.Abstractions" version="1.1.0" targetFramework="net461" />
<package id="Microsoft.Extensions.Logging" version="1.1.1" targetFramework="net461" />
<package id="Microsoft.Extensions.Logging.Abstractions" version="1.1.1" targetFramework="net461" />
@@ -22,18 +26,25 @@
<package id="Microsoft.Win32.Primitives" version="4.3.0" targetFramework="net461" />
<package id="MQTTnet" version="3.1.2" targetFramework="net461" />
<package id="NETStandard.Library" version="1.6.1" targetFramework="net461" />
- <package id="Newtonsoft.Json" version="12.0.3" targetFramework="net461" />
+ <package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" />
<package id="Owin" version="1.0" targetFramework="net461" />
+ <package id="PCLCrypto" version="2.0.147" targetFramework="net461" />
+ <package id="PInvoke.BCrypt" version="0.3.2" targetFramework="net461" />
+ <package id="PInvoke.Kernel32" version="0.3.2" targetFramework="net461" />
+ <package id="PInvoke.NCrypt" version="0.3.2" targetFramework="net461" />
+ <package id="PInvoke.Windows.Core" version="0.3.2" targetFramework="net461" />
<package id="System.AppContext" version="4.3.0" targetFramework="net461" />
<package id="System.Collections" version="4.3.0" targetFramework="net461" />
<package id="System.Collections.Concurrent" version="4.3.0" targetFramework="net461" />
<package id="System.Collections.Immutable" version="1.5.0" targetFramework="net461" />
<package id="System.ComponentModel" version="4.3.0" targetFramework="net461" />
+ <package id="System.ComponentModel.EventBasedAsync" version="4.0.11" targetFramework="net461" />
<package id="System.Console" version="4.3.0" targetFramework="net461" />
<package id="System.Diagnostics.Debug" version="4.3.0" targetFramework="net461" />
<package id="System.Diagnostics.DiagnosticSource" version="4.3.0" targetFramework="net461" />
<package id="System.Diagnostics.Tools" version="4.3.0" targetFramework="net461" />
<package id="System.Diagnostics.Tracing" version="4.3.0" targetFramework="net461" />
+ <package id="System.Dynamic.Runtime" version="4.0.0" targetFramework="net461" />
<package id="System.Globalization" version="4.3.0" targetFramework="net461" />
<package id="System.Globalization.Calendars" version="4.3.0" targetFramework="net461" />
<package id="System.IO" version="4.3.0" targetFramework="net461" />
@@ -43,8 +54,10 @@
<package id="System.IO.FileSystem.Primitives" version="4.3.0" targetFramework="net461" />
<package id="System.Linq" version="4.3.0" targetFramework="net461" />
<package id="System.Linq.Expressions" version="4.3.0" targetFramework="net461" />
+ <package id="System.Linq.Queryable" version="4.0.0" targetFramework="net461" />
<package id="System.Net.Http" version="4.3.0" targetFramework="net461" />
<package id="System.Net.Primitives" version="4.3.0" targetFramework="net461" />
+ <package id="System.Net.Requests" version="4.0.11" targetFramework="net461" />
<package id="System.Net.Sockets" version="4.3.0" targetFramework="net461" />
<package id="System.ObjectModel" version="4.3.0" targetFramework="net461" />
<package id="System.Reflection" version="4.3.0" targetFramework="net461" />
@@ -62,6 +75,7 @@
<package id="System.Security.Cryptography.Encoding" version="4.3.0" targetFramework="net461" />
<package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net461" />
<package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net461" />
+ <package id="System.Spatial" version="5.8.2" targetFramework="net461" />
<package id="System.Text.Encoding" version="4.3.0" targetFramework="net461" />
<package id="System.Text.Encoding.Extensions" version="4.3.0" targetFramework="net461" />
<package id="System.Text.RegularExpressions" version="4.3.0" targetFramework="net461" />
@@ -71,5 +85,6 @@
<package id="System.Threading.Timer" version="4.3.0" targetFramework="net461" />
<package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net461" />
<package id="System.Xml.XDocument" version="4.3.0" targetFramework="net461" />
- <package id="WindowsAzure.Storage" version="9.3.2" targetFramework="net461" />
+ <package id="Validation" version="2.2.8" targetFramework="net461" />
+ <package id="WindowsAzure.Storage" version="8.7.0" targetFramework="net461" />
</packages> \ No newline at end of file