diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
| commit | 0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch) | |
| tree | 5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry | |
| parent | 4222eddece906d6f0877022c06b853deb5068472 (diff) | |
| download | Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip | |
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
30 files changed, 1854 insertions, 277 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs deleted file mode 100644 index 7a9cc9f7a..000000000 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs +++ /dev/null @@ -1,77 +0,0 @@ -using Microsoft.Azure.Devices.Client; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tango.Core; - -namespace Tango.Telemetry.Destinations -{ - public class AzureHubTelemetryDestination : ExtendedObject, ITelemetryDestination - { - private DeviceClient _hubClient; - - public string Name { get; set; } = "Azure IoT Hub"; - public bool Enable { get; set; } = true; - public String ConnectionString { get; private set; } - - public ConnectionStatus HubConnectionStatus { get; private set; } - public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } - - public AzureHubTelemetryDestination(String connectionString) - { - HubConnectionStatus = ConnectionStatus.Connected; - ConnectionString = connectionString; - SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage }; - } - - public Task<bool> IsAvailable() - { - if (_hubClient == null) - { - return Task.FromResult(true); - } - else - { - return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); - } - } - - public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties) - { - if (_hubClient == null) - { - _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); - _hubClient.SetConnectionStatusChangesHandler((status, reason) => - { - HubConnectionStatus = status; - LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}."); - }); - } - - var message = new Message(Encoding.UTF8.GetBytes(package.ToPayload())) - { - ContentType = "application/json", - ContentEncoding = "utf-8" - }; - - foreach (var prop in properties) - { - message.Properties.Add(prop.Key, prop.Value); - } - - await _hubClient.SendEventAsync(message); - } - - public void Dispose() - { - _hubClient?.Dispose(); - } - - public override string ToString() - { - return Name; - } - } -} diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs new file mode 100644 index 000000000..01ad31432 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs @@ -0,0 +1,188 @@ +using Microsoft.Azure.Devices.Client; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Logging; + +namespace Tango.Telemetry.Destinations +{ + /// <summary> + /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport. + /// Supports batching, source type filtering, and connection status tracking. + /// </summary> + public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination + { + private DeviceClient _hubClient; + private int _batchSize; + private ConcurrentList<Message> _batch; + + private class ADXPAckage + { + public String Type { get; set; } + public String Environment { get; set; } + public String MachineType { get; set; } + public String SerialNumber { get; set; } + public ITelemetry Telemetry { get; set; } + public DateTime UploadTime { get; set; } + + public String ToPayload() + { + return JsonConvert.SerializeObject(this, Formatting.None); + } + } + + /// <summary> + /// Gets or sets the name of the destination. + /// </summary> + public string Name { get; set; } = "Azure IoT Hub"; + + /// <summary> + /// Gets the connection string used to connect to Azure IoT Hub. + /// </summary> + public string ConnectionString { get; private set; } + + /// <summary> + /// Gets the current connection status of the Azure IoT Hub client. + /// </summary> + public ConnectionStatus HubConnectionStatus { get; private set; } + + /// <summary> + /// Gets the source types supported by this destination. + /// </summary> + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } + + /// <summary> + /// Gets or sets the maximum number of messages to send in a single batch. + /// The value is clamped between 1 and 100. + /// </summary> + public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); } + + /// <summary> + /// Prevents a default instance of the <see cref="TelemetryAzureHubDestination"/> class from being created. + /// </summary> + private TelemetryAzureHubDestination() + { + _batch = new ConcurrentList<Message>(); + HubConnectionStatus = ConnectionStatus.Connected; + SupportedSourceTypes = new List<TelemetrySourceTypes>() + { + TelemetrySourceTypes.PendingStorage, + TelemetrySourceTypes.Streaming, + TelemetrySourceTypes.ExternalStorage + }; + BatchSize = 1; + } + + /// <summary> + /// Initializes a new instance of the <see cref="TelemetryAzureHubDestination"/> class with the specified connection string. + /// </summary> + /// <param name="connectionString">The Azure IoT Hub connection string.</param> + public TelemetryAzureHubDestination(string connectionString) : this() + { + ConnectionString = connectionString; + } + + /// <summary> + /// Determines whether the destination is currently available for publishing. + /// </summary> + /// <returns>True if the destination is available; otherwise, false.</returns> + public Task<bool> IsAvailable() + { + if (_hubClient == null) + { + return Task.FromResult(true); + } + else + { + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } + } + + /// <summary> + /// Publishes a telemetry package to Azure IoT Hub. + /// Supports batching when <see cref="BatchSize"/> is greater than 1. + /// </summary> + /// <param name="package">The telemetry package to publish.</param> + /// <param name="properties">A list of properties to include with the message.</param> + public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties) + { + if (_hubClient == null) + { + _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.SetConnectionStatusChangesHandler((status, reason) => + { + HubConnectionStatus = status; + LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info); + }); + } + + ADXPAckage adxPackage = new ADXPAckage(); + adxPackage.Type = package.TelemetryName; + adxPackage.Environment = package.Environment; + adxPackage.MachineType = package.MachineType; + adxPackage.SerialNumber = package.SerialNumber; + adxPackage.UploadTime = DateTime.UtcNow; + adxPackage.Telemetry = package.PendingTelemetry.TelemetryObject; + + var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload())) + { + ContentType = "application/json", + ContentEncoding = "utf-8" + }; + + foreach (var prop in properties) + { + message.Properties.Add(prop.Key, prop.Value); + } + + if (BatchSize > 1) + { + _batch.Add(message); + + if (_batch.Count >= BatchSize) + { + LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventBatchAsync(_batch.ToList()); + _batch.Clear(); + } + else + { + LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug); + } + } + else + { + LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventAsync(message); + } + } + + /// <summary> + /// Disposes the destination and ensures any remaining batched messages are sent. + /// </summary> + public void Dispose() + { + if (_hubClient != null && _batch.Count > 0) + { + LogManager.Log($"Flushing {_batch.Count} remaining messages to Azure IoT Hub.", LogCategory.Info); + _hubClient.SendEventBatchAsync(_batch.ToList()).GetAwaiter().GetResult(); + _batch.Clear(); + } + + _hubClient?.Dispose(); + } + + /// <summary> + /// Returns a string that represents the current destination instance. + /// </summary> + /// <returns>The name of the destination.</returns> + public override string ToString() + { + return Name; + } + } +}
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs index b5ff05c29..11787c834 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs @@ -14,14 +14,13 @@ using MQTTnet.Packets; namespace Tango.Telemetry.Destinations { - public class MqttTelemetryDestination : ExtendedObject, ITelemetryDestination + public class TelemetryMqttDestination : ExtendedObject, ITelemetryDestination { private IMqttClient _mqttClient; private IMqttClientOptions _mqttOptions; private DateTime _nextRealAvailabilityCheck; public string Name { get; set; } = "MQTT"; - public bool Enable { get; set; } = true; public String Address { get; private set; } public int Port { get; private set; } public String Topic { get; private set; } @@ -34,7 +33,7 @@ namespace Tango.Telemetry.Destinations /// <param name="topic">e.g machie/telemetry/serial number</param> /// <param name="address">Default localhost</param> /// <param name="port">Default 1883</param> - public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883) + public TelemetryMqttDestination(String topic, String address = "localhost", int port = 1883) { _nextRealAvailabilityCheck = DateTime.Now; Topic = topic; diff --git a/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md b/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md new file mode 100644 index 000000000..c08e4f5d6 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Docs/AiResponse.md @@ -0,0 +1,138 @@ +## ✅ Part 1: Backup and Recovery of LiteDB File + +### 🔧 Strategy + +* **On application startup or shutdown**, copy the main `.db` file to a backup file (e.g., `telemetry.db.bak`). +* **On startup**, if LiteDB is missing or corrupted: + + * Attempt to **restore from backup**. + * If both fail, log a critical failure or initialize an empty one (last resort). + +### 🧱 Suggested Implementation + +```csharp +private const string DbFile = "telemetry.db"; +private const string BackupFile = "telemetry.db.bak"; + +public void EnsureDatabaseIntegrity() +{ + if (!File.Exists(DbFile)) + { + if (File.Exists(BackupFile)) + { + Log.Warning("Main database file missing. Attempting to restore from backup..."); + File.Copy(BackupFile, DbFile); + } + else + { + Log.Critical("No telemetry database or backup found. Initializing fresh DB."); + // Optionally throw or initialize empty + } + } +} + +public void BackupDatabase() +{ + try + { + File.Copy(DbFile, BackupFile, overwrite: true); + Log.Info("Telemetry database backed up successfully."); + } + catch (Exception ex) + { + Log.Error($"Failed to backup telemetry database: {ex}"); + } +} +``` + +Call `BackupDatabase()` on graceful shutdown or every X hours. + +--- + +## ✅ Part 2: Mirror Source Checkpoints to the Cloud + +### 🧠 Why: + +* In PC reinstall or hardware swap scenarios, **LiteDB is lost**, but the machine serial number remains. +* The application can download JobRuns from the server — but without the previous **checkpoints**, it doesn’t know what has already been published. +* Replaying the **entire JobRuns table** results in telemetry flooding. + +--- + +### 🔧 Strategy + +Store a minimal JSON blob to the cloud keyed by **machine serial number**, e.g.: + +```json +{ + "MachineSerial": "X4-3215", + "Checkpoints": { + "TelemetryJobRunsHistorySource": "2025-07-30T14:15:00Z", + "TelemetryEventsHistorySource": "2025-07-30T13:50:00Z" + } +} +``` + +### 📦 Recommended Storage Location + +* Azure Blob Storage +* OR Azure Table / CosmosDB if you already use structured cloud data +* OR SFTP/S3-compatible endpoint if on-prem + +### 💡 Key Design Points + +* Mirror only **checkpoints** (very small, infrequent updates). +* Use **machine serial number as the key**. +* Update cloud **whenever a checkpoint advances** significantly or every X minutes. +* Download cloud checkpoints on boot **only if LiteDB is missing**. + +--- + +### 🧱 Example C# Interface + +```csharp +public interface ICheckpointMirror +{ + Task<Dictionary<string, DateTime>> DownloadCheckpointsAsync(string machineSerial); + Task UploadCheckpointsAsync(string machineSerial, Dictionary<string, DateTime> checkpoints); +} +``` + +--- + +## 🧩 Final Integration Flow + +```mermaid +graph TD +A[App Startup] --> B{LiteDB Exists?} +B -- Yes --> C[Continue Normally] +B -- No --> D{Backup Exists?} +D -- Yes --> E[Restore From Backup] +D -- No --> F{Cloud Checkpoints Available?} +F -- Yes --> G[Restore Checkpoints from Cloud] +F -- No --> H[Start From Scratch] + +C --> Z[Load Published GUIDs + Checkpoints] +E --> Z +G --> Z +H --> Z +``` + +--- + +## ✅ Summary + +| Component | Purpose | When to Use | +| ----------------------- | ----------------------------------------------- | ------------------------------- | +| LiteDB Backup | Quick local restore from disk failure | Every boot/shutdown | +| Cloud Checkpoint Mirror | Resilient recovery across reinstalls or new PCs | Only if LiteDB+backup missing | +| Machine Serial Binding | Ensures uniqueness and portability | On X4, X1, or reimaged OS cases | + +--- + +Would you like: + +* A suggested format for the cloud checkpoint JSON file? +* A mock implementation of `ICheckpointMirror` that uploads/downloads to Azure Blob or local disk for testing? + +You're building this with an enterprise-level attention to failure domains. Very smart. diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs index 67bb8f5d2..4dc1aa3e6 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs @@ -10,6 +10,7 @@ namespace Tango.Telemetry { public interface ITelemetry { + String ID { get; set; } DateTime Time { get; set; } String ToJson(Formatting format = Formatting.None, bool flatten = true); byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true); diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs new file mode 100644 index 000000000..e6cec73bf --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryCheckpointsRecoveryClient.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryCheckpointsRecoveryClient : IDisposable + { + Task<List<TelemetryHistorySourceCheckPoint>> GetCheckpointsBackup(); + Task SaveCheckpointsBackup(List<TelemetryHistorySourceCheckPoint> checkPoints); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs index 10424531b..aec3d164a 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs @@ -6,12 +6,33 @@ using System.Threading.Tasks; namespace Tango.Telemetry { + /// <summary> + /// Represents a target destination that can receive and process published telemetry data. + /// Implementations may include cloud services, databases, or custom sinks. + /// </summary> public interface ITelemetryDestination : IDisposable { - bool Enable { get; set; } + /// <summary> + /// Gets or sets the unique name of the destination used for identification and logging. + /// </summary> String Name { get; set; } + + /// <summary> + /// Checks whether the destination is currently available to receive telemetry. + /// </summary> + /// <returns>True if the destination is available; otherwise, false.</returns> Task<bool> IsAvailable(); + + /// <summary> + /// Gets a read-only list of source types that this destination supports. + /// </summary> IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; } + + /// <summary> + /// Publishes the given telemetry package along with machine metadata properties. + /// </summary> + /// <param name="package">The telemetry package to publish.</param> + /// <param name="properties">Metadata properties such as SerialNumber, MachineType, and Environment.</param> Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties); } -} +}
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs index b44f567da..9ec7860fe 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPublisher.cs @@ -4,22 +4,103 @@ using System.Collections.ObjectModel; using System.Linq; using System.Text; using System.Threading.Tasks; +using Tango.Telemetry.Reporting; namespace Tango.Telemetry { + /// <summary> + /// Defines the interface for a telemetry publisher responsible for managing sources, destinations, + /// storage, and the overall publishing lifecycle. + /// </summary> public interface ITelemetryPublisher : IDisposable { + /// <summary> + /// Occurs before a telemetry package is published to a destination. + /// </summary> event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage; + + /// <summary> + /// Occurs when a telemetry package has been successfully published to a destination. + /// </summary> event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished; + + /// <summary> + /// Occurs when a telemetry package fails to publish to a destination. + /// </summary> event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; + + /// <summary> + /// Occurs when a telemetry publish operation has completed and a publish result is available, + /// indicating the success or failure status for each destination. + /// </summary> + event EventHandler<TelemetryPublishResultAvailableEventArgs> PublishResultAvailable; + + /// <summary> + /// Gets the storage manager used for telemetry persistence and checkpoint handling. + /// </summary> ITelemetryStorageManager StorageManager { get; } + + /// <summary> + /// Gets the telemetry queue manager responsible for internal queuing and retry logic. + /// </summary> ITelemetryQueueManager QueueManager { get; } + + /// <summary> + /// Gets the client used for remote checkpoint recovery. + /// </summary> + ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; } + + /// <summary> + /// Gets the registered telemetry sources. + /// </summary> ReadOnlyCollection<ITelemetrySource> Sources { get; } + + /// <summary> + /// Registers a telemetry source with the publisher. + /// </summary> + /// <param name="source">The telemetry source to register.</param> void RegisterSource(ITelemetrySource source); + + /// <summary> + /// Gets the registered telemetry destinations. + /// </summary> ReadOnlyCollection<ITelemetryDestination> Destinations { get; } + + /// <summary> + /// Registers a telemetry destination with the publisher. + /// </summary> + /// <param name="destination">The telemetry destination to register.</param> void RegisterDestination(ITelemetryDestination destination); + + /// <summary> + /// Gets a value indicating whether the publisher is currently running. + /// </summary> bool IsStarted { get; } - void Start(); - void Stop(); + + /// <summary> + /// Starts the telemetry publishing pipeline, including sources and destinations. + /// </summary> + /// <returns>A task representing the asynchronous start operation.</returns> + Task Start(); + + /// <summary> + /// Stops the telemetry publishing pipeline and releases all resources. + /// </summary> + /// <returns>A task representing the asynchronous stop operation.</returns> + Task Stop(); + + /// <summary> + /// Flushes up to the specified number of pending telemetries from local storage, + /// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages. + /// </summary> + /// <param name="maxCount">The maximum number of pending telemetry packages to flush.</param> + /// <returns>A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages.</returns> + Task<List<TelemetryPublishResult>> FlushPendingTelemetries(int maxCount); + + /// <summary> + /// Generates a detailed telemetry report summarizing the current state of the telemetry system. + /// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results. + /// </summary> + Task<TelemetryReport> GetTelemetryReport(); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs index 74d58ed4a..739c6514b 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetrySource.cs @@ -9,5 +9,6 @@ namespace Tango.Telemetry public interface ITelemetrySource : IDisposable { String Name { get; } + bool RequiresTelemetryDuplicationTracking { get; } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs index ae63e41cd..882cc0411 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryStorageManager.cs @@ -6,13 +6,85 @@ using System.Threading.Tasks; namespace Tango.Telemetry { + /// <summary> + /// Defines the contract for managing telemetry storage, including pending telemetries and history source checkpoints. + /// </summary> public interface ITelemetryStorageManager { + /// <summary> + /// Initializes the storage manager with the specified checkpoints recovery client. + /// Responsible for loading the database, restoring from backup if necessary, and recovering remote checkpoints. + /// </summary> + /// <param name="checkpointsRecoveryClient">An implementation of the checkpoint recovery client used for cloud fallback.</param> + Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient); + + /// <summary> + /// Inserts or updates a pending telemetry record in the local storage. + /// </summary> + /// <param name="pendingTelemetry">The pending telemetry to be stored or updated.</param> void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry); + + /// <summary> + /// Deletes a pending telemetry record from the local storage. + /// </summary> + /// <param name="pendingTelemetry">The pending telemetry to be deleted.</param> void DeletePendingTelemetry(PendingTelemetry pendingTelemetry); + + /// <summary> + /// Retrieves a list of pending telemetry records, ordered by time, up to the specified maximum count. + /// </summary> + /// <param name="maxCount">The maximum number of pending telemetries to retrieve.</param> + /// <returns>A list of pending telemetry objects.</returns> List<PendingTelemetry> GetPendingTelemetries(int maxCount); + + /// <summary> + /// Gets the total number of pending telemetry records currently stored. + /// </summary> + /// <returns>The count of pending telemetry records.</returns> int GetPendingTelemetriesCount(); + + /// <summary> + /// Retrieves the current checkpoint for the specified history source. + /// </summary> + /// <param name="source">The telemetry history source for which to retrieve the checkpoint.</param> + /// <returns>The stored checkpoint information for the given source.</returns> TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source); + + /// <summary> + /// Retrieves all stored history source checkpoints currently tracked by the storage system. + /// Each checkpoint represents the latest processed state of a specific telemetry history source. + /// </summary> + /// <returns>A list of <see cref="TelemetryHistorySourceCheckPoint"/> entries for all registered history sources.</returns> + List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints(); + + /// <summary> + /// Sets or updates the checkpoint for the specified history source. + /// </summary> + /// <param name="source">The telemetry history source for which to update the checkpoint.</param> + /// <param name="time">The latest timestamp of telemetry data processed for the source.</param> + /// <param name="totalCount">The total number of telemetry records processed for the source.</param> void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount); + + /// <summary> + /// Adds the specified telemetry item to the published telemetry cache, + /// ensuring it is tracked as already published by the system. + /// </summary> + /// <param name="telemetry">The telemetry instance to register as published.</param> + void AddToPublishedTelemetryCache(ITelemetry telemetry); + + /// <summary> + /// Checks whether the specified telemetry item is already present in the published telemetry cache. + /// </summary> + /// <param name="telemetry">The telemetry instance to verify.</param> + /// <returns>True if the telemetry appears to have already been published; otherwise, false.</returns> + bool IsTelemetryInPublishedCache(ITelemetry telementry); + + /// <summary> + /// Removes entries from the published telemetry cache that were marked as published + /// before the specified time. Intended to manage memory and storage growth over time, + /// especially once historical sources have progressed beyond the given point. + /// </summary> + /// <param name="olderThan">The timestamp indicating the oldest publication time to retain. Entries older than this will be removed.</param> + void PerformPublishedTelemetriesCleanUp(DateTime olderThan); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs new file mode 100644 index 000000000..a8dc52123 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/DestinationStatusSummary.cs @@ -0,0 +1,36 @@ +namespace Tango.Telemetry.Reporting +{ + /// <summary> + /// Represents an aggregated summary of telemetry publish results for a specific destination, + /// tracking the number of successful, failed, postponed, and unavailable attempts. + /// </summary> + public class DestinationStatusSummary + { + /// <summary> + /// Gets or sets the name of the telemetry destination (e.g., IoTHub, LocalStorage). + /// </summary> + public string DestinationName { get; set; } + + /// <summary> + /// Gets or sets the number of telemetry packages successfully published to this destination. + /// </summary> + public int Passed { get; set; } + + /// <summary> + /// Gets or sets the number of telemetry packages that failed to publish to this destination. + /// </summary> + public int Failed { get; set; } + + /// <summary> + /// Gets or sets the number of telemetry packages that were postponed for this destination, + /// typically due to retry logic or temporary conditions. + /// </summary> + public int Postponed { get; set; } + + /// <summary> + /// Gets or sets the number of telemetry packages that could not be published due to the destination being unavailable. + /// </summary> + public int Unavailable { get; set; } + } + +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs new file mode 100644 index 000000000..04e57e985 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceSummary.cs @@ -0,0 +1,31 @@ +using System.Collections.Generic; + +namespace Tango.Telemetry.Reporting +{ + /// <summary> + /// Represents a summary of telemetry publish results for a specific telemetry source, + /// including aggregated destination-level status counts. + /// </summary> + public class SourceSummary + { + /// <summary> + /// Gets or sets the name of the telemetry source (e.g., JobRunsHistorySource, DiagnosticsStream). + /// </summary> + public string SourceName { get; set; } + + /// <summary> + /// Gets or sets a dictionary of aggregated publish results per destination for this source. + /// The key is the destination name, and the value contains counters for each delivery status. + /// </summary> + public Dictionary<string, DestinationStatusSummary> Destinations { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="SourceSummary"/> class, + /// initializing the destination summary dictionary. + /// </summary> + public SourceSummary() + { + Destinations = new Dictionary<string, DestinationStatusSummary>(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs new file mode 100644 index 000000000..9b6dc05d9 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/SourceTypeSummary.cs @@ -0,0 +1,33 @@ +using System.Collections.Generic; + +namespace Tango.Telemetry.Reporting +{ + /// <summary> + /// Represents an aggregated summary of telemetry sources belonging to a specific source type, + /// such as Streaming, ExternalStorage, or PendingStorage. + /// Groups multiple sources under the same category and aggregates their publish results. + /// </summary> + public class SourceTypeSummary + { + /// <summary> + /// Gets or sets the telemetry source type that this summary represents. + /// </summary> + public TelemetrySourceTypes SourceType { get; set; } + + /// <summary> + /// Gets or sets a dictionary of source summaries under this source type. + /// The key is the source name, and the value is a summary of its destination-level results. + /// </summary> + public Dictionary<string, SourceSummary> Sources { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="SourceTypeSummary"/> class, + /// initializing the source summary dictionary. + /// </summary> + public SourceTypeSummary() + { + Sources = new Dictionary<string, SourceSummary>(); + } + } + +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs b/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs new file mode 100644 index 000000000..9e9906fde --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Reporting/TelemetryReport.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry.Reporting +{ + /// <summary> + /// Represents a diagnostic snapshot of the telemetry system, + /// providing detailed statistics about published and pending telemetry data, + /// grouped by source type and destination. + /// </summary> + public class TelemetryReport + { + /// <summary> + /// Gets or sets the timestamp indicating when the report was generated. + /// </summary> + public DateTime GeneratedAt { get; set; } + + /// <summary> + /// Gets or sets the total number of telemetry packages that were published + /// successfully since the telemetry system started. + /// </summary> + public int TotalPublished { get; set; } + + /// <summary> + /// Gets or sets the current number of telemetry packages pending publication in storage. + /// </summary> + public int TotalPending { get; set; } + + /// <summary> + /// Gets or sets the aggregated telemetry publish summaries, grouped by source type. + /// Each source type contains its respective sources and destination-level statistics. + /// </summary> + public Dictionary<TelemetrySourceTypes, SourceTypeSummary> SourceTypes { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="TelemetryReport"/> class, + /// initializing the dictionary used for grouping by source type. + /// </summary> + public TelemetryReport() + { + SourceTypes = new Dictionary<TelemetrySourceTypes, SourceTypeSummary>(); + } + + /// <summary> + /// Returns a human-readable string representation of the report, + /// including summary statistics and breakdowns by source type and destination. + /// </summary> + /// <returns>A formatted string representing the current telemetry report.</returns> + public override string ToString() + { + var sb = new StringBuilder(); + + sb.AppendLine("===== Telemetry Report ====="); + sb.AppendLine($"Generated At : {GeneratedAt:u}"); + sb.AppendLine($"Total Published : {TotalPublished}"); + sb.AppendLine($"Total Pending : {TotalPending}"); + sb.AppendLine(); + + foreach (var sourceTypePair in SourceTypes) + { + sb.AppendLine($"--- Source Type: {sourceTypePair.Key} ---"); + + foreach (var sourcePair in sourceTypePair.Value.Sources) + { + sb.AppendLine($" Source: {sourcePair.Key}"); + + foreach (var destinationPair in sourcePair.Value.Destinations) + { + var d = destinationPair.Value; + sb.AppendLine($" {d.DestinationName}: Passed={d.Passed}, Failed={d.Failed}, Postponed={d.Postponed}, Unavailable={d.Unavailable}"); + } + + sb.AppendLine(); // spacing between sources + } + + sb.AppendLine(); // spacing between source types + } + + sb.AppendLine("============================"); + return sb.ToString(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs index 22fac087b..5a2735131 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsSource.cs @@ -14,7 +14,7 @@ using Tango.Telemetry.Telemetries; namespace Tango.Telemetry.Sources { - public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetrySource + public class TelemetryDiagnosticsSource : TelemetryConfigurableSource<TelemetryDiagnosticsSourceConfig>, ITelemetryStreamingSource { public const int MIN_SAMPLING_INTERVAL_SECONDS = 1; @@ -30,6 +30,8 @@ namespace Tango.Telemetry.Sources public string Name { get; private set; } = "Diagnostics"; + public bool RequiresTelemetryDuplicationTracking { get => false; } + private TelemetryDiagnosticsSource() : base() { _diagnosticsQueue = new List<StartDiagnosticsResponse>(); diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySource.cs index 6cb597e4b..e3934d832 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistoryModule.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobRunsHistorySource.cs @@ -15,6 +15,8 @@ namespace Tango.Telemetry.Sources public string Name { get; private set; } = "JobRuns History"; + public bool RequiresTelemetryDuplicationTracking { get => true; } + public Task<bool> CanRequestHistory(DateTime from) { return Task.FromResult(!_isBusy); @@ -39,7 +41,9 @@ namespace Tango.Telemetry.Sources foreach (var run in runs) { TelemetryJobRun tRun = new TelemetryJobRun(); + tRun.ID = run.Guid; tRun.Time = run.LastUpdated; + tRun.Name = run.JobName; //Fill the object.. tRuns.Add(tRun); } diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 02f8417f1..9565465a5 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -61,15 +61,24 @@ <Reference Include="Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Azure.Amqp.2.5.10\lib\net45\Microsoft.Azure.Amqp.dll</HintPath> </Reference> - <Reference Include="Microsoft.Azure.Devices.Client, Version=1.41.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> - <HintPath>..\packages\Microsoft.Azure.Devices.Client.1.41.0\lib\net451\Microsoft.Azure.Devices.Client.dll</HintPath> + <Reference Include="Microsoft.Azure.Devices.Client, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\Microsoft.Azure.Devices.Client.1.6.0\lib\net45\Microsoft.Azure.Devices.Client.dll</HintPath> </Reference> - <Reference Include="Microsoft.Azure.Devices.Shared, Version=1.30.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> - <HintPath>..\packages\Microsoft.Azure.Devices.Shared.1.30.1\lib\net451\Microsoft.Azure.Devices.Shared.dll</HintPath> + <Reference Include="Microsoft.Azure.Devices.Shared, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\Microsoft.Azure.Devices.Shared.1.3.0\lib\net45\Microsoft.Azure.Devices.Shared.dll</HintPath> </Reference> <Reference Include="Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Azure.KeyVault.Core.1.0.0\lib\net40\Microsoft.Azure.KeyVault.Core.dll</HintPath> </Reference> + <Reference Include="Microsoft.Data.Edm, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\Microsoft.Data.Edm.5.8.2\lib\net40\Microsoft.Data.Edm.dll</HintPath> + </Reference> + <Reference Include="Microsoft.Data.OData, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\Microsoft.Data.OData.5.8.2\lib\net40\Microsoft.Data.OData.dll</HintPath> + </Reference> + <Reference Include="Microsoft.Data.Services.Client, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\Microsoft.Data.Services.Client.5.8.2\lib\net40\Microsoft.Data.Services.Client.dll</HintPath> + </Reference> <Reference Include="Microsoft.Extensions.DependencyInjection.Abstractions, Version=1.1.0.0, Culture=neutral, PublicKeyToken=adb9793829ddae60, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Extensions.DependencyInjection.Abstractions.1.1.0\lib\netstandard1.0\Microsoft.Extensions.DependencyInjection.Abstractions.dll</HintPath> </Reference> @@ -82,21 +91,39 @@ <Reference Include="Microsoft.Owin, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Owin.4.0.0\lib\net451\Microsoft.Owin.dll</HintPath> </Reference> + <Reference Include="Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling, Version=6.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\EnterpriseLibrary.TransientFaultHandling.6.0.1304.0\lib\portable-net45+win+wp8\Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling.dll</HintPath> + </Reference> <Reference Include="Microsoft.Win32.Primitives, Version=4.0.2.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Win32.Primitives.4.3.0\lib\net46\Microsoft.Win32.Primitives.dll</HintPath> </Reference> - <Reference Include="Microsoft.WindowsAzure.Storage, Version=9.3.2.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> - <HintPath>..\packages\WindowsAzure.Storage.9.3.2\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath> + <Reference Include="Microsoft.WindowsAzure.Storage, Version=8.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\WindowsAzure.Storage.8.7.0\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath> </Reference> <Reference Include="MQTTnet, Version=3.1.2.0, Culture=neutral, PublicKeyToken=b69712f52770c0a7, processorArchitecture=MSIL"> <HintPath>..\packages\MQTTnet.3.1.2\lib\net461\MQTTnet.dll</HintPath> </Reference> - <Reference Include="Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL"> - <HintPath>..\packages\Newtonsoft.Json.12.0.3\lib\net45\Newtonsoft.Json.dll</HintPath> + <Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL"> + <HintPath>..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath> </Reference> <Reference Include="Owin, Version=1.0.0.0, Culture=neutral, PublicKeyToken=f0ebd12fd5e55cc5, processorArchitecture=MSIL"> <HintPath>..\packages\Owin.1.0\lib\net40\Owin.dll</HintPath> </Reference> + <Reference Include="PCLCrypto, Version=2.0.0.0, Culture=neutral, PublicKeyToken=d4421c8a4786956c, processorArchitecture=MSIL"> + <HintPath>..\packages\PCLCrypto.2.0.147\lib\net45\PCLCrypto.dll</HintPath> + </Reference> + <Reference Include="PInvoke.BCrypt, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL"> + <HintPath>..\packages\PInvoke.BCrypt.0.3.2\lib\net40\PInvoke.BCrypt.dll</HintPath> + </Reference> + <Reference Include="PInvoke.Kernel32, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL"> + <HintPath>..\packages\PInvoke.Kernel32.0.3.2\lib\net40\PInvoke.Kernel32.dll</HintPath> + </Reference> + <Reference Include="PInvoke.NCrypt, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL"> + <HintPath>..\packages\PInvoke.NCrypt.0.3.2\lib\net40\PInvoke.NCrypt.dll</HintPath> + </Reference> + <Reference Include="PInvoke.Windows.Core, Version=0.3.0.0, Culture=neutral, PublicKeyToken=9e300f9f87f04a7a, processorArchitecture=MSIL"> + <HintPath>..\packages\PInvoke.Windows.Core.0.3.2\lib\portable-net45+win+wpa81+MonoAndroid10+xamarinios10+MonoTouch10\PInvoke.Windows.Core.dll</HintPath> + </Reference> <Reference Include="System" /> <Reference Include="System.AppContext, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <HintPath>..\packages\System.AppContext.4.3.0\lib\net46\System.AppContext.dll</HintPath> @@ -186,6 +213,9 @@ <Private>True</Private> <Private>True</Private> </Reference> + <Reference Include="System.Spatial, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>..\packages\System.Spatial.5.8.2\lib\net40\System.Spatial.dll</HintPath> + </Reference> <Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL"> <HintPath>..\packages\System.Threading.Tasks.Extensions.4.5.1\lib\netstandard2.0\System.Threading.Tasks.Extensions.dll</HintPath> </Reference> @@ -201,21 +231,29 @@ <Private>True</Private> <Private>True</Private> </Reference> + <Reference Include="Validation, Version=2.2.0.0, Culture=neutral, PublicKeyToken=2fc06f0d701809a7, processorArchitecture=MSIL"> + <HintPath>..\packages\Validation.2.2.8\lib\dotnet\Validation.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> - <Compile Include="Destinations\AzureHubTelemetryDestination.cs" /> + <Compile Include="Destinations\TelemetryAzureHubDestination.cs" /> <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> + <Compile Include="ITelemetryCheckpointsRecoveryClient.cs" /> <Compile Include="ITelemetryDestination.cs" /> - <Compile Include="Destinations\MqttTelemetryDestination.cs" /> + <Compile Include="Destinations\TelemetryMqttDestination.cs" /> <Compile Include="ITelemetryHistorySource.cs" /> <Compile Include="ITelemetrySource.cs" /> <Compile Include="ITelemetryQueueManager.cs" /> <Compile Include="ITelemetryStorageManager.cs" /> <Compile Include="ITelemetryPublisher.cs" /> <Compile Include="ITelemetryStreamingSource.cs" /> + <Compile Include="Reporting\DestinationStatusSummary.cs" /> + <Compile Include="Reporting\SourceSummary.cs" /> + <Compile Include="Reporting\SourceTypeSummary.cs" /> + <Compile Include="Reporting\TelemetryReport.cs" /> <Compile Include="Sources\TelemetryDiagnosticsSource.cs" /> <Compile Include="Sources\TelemetryDiagnosticsSourceConfig.cs" /> - <Compile Include="Sources\TelemetryJobRunsHistoryModule.cs" /> + <Compile Include="Sources\TelemetryJobRunsHistorySource.cs" /> <Compile Include="Sources\TelemetryJobRunsHistorySourceConfig.cs" /> <Compile Include="PendingTelemetry.cs" /> <Compile Include="TelemetryConfigurableSource.cs" /> @@ -227,13 +265,15 @@ <Compile Include="TelemetryPendingDestination.cs" /> <Compile Include="TelemetryPendingStorageSource.cs" /> <Compile Include="TelemetryInMemoryQueueManager.cs" /> - <Compile Include="TelemetryPublisherAdvanced.cs" /> + <None Include="app.config" /> + <None Include="TelemetryPublisherAdvanced.cs" /> <Compile Include="TelemetryPublisherEventArgs.cs" /> <Compile Include="TelemetryPackagePublishingEventArgs.cs" /> <Compile Include="TelemetryPublishPackage.cs" /> <Compile Include="TelemetryPublisher.cs" /> <Compile Include="TelemetryPublisherConfiguration.cs" /> <Compile Include="TelemetryPublishResult.cs" /> + <Compile Include="TelemetryPublishResultAvailableEventArgs.cs" /> <Compile Include="TelemetrySourceTypes.cs" /> <Compile Include="TelemetryLiteDBStorageManager.cs" /> <Compile Include="ITelemetry.cs" /> @@ -244,6 +284,7 @@ <Compile Include="Telemetries\TelemetryJobRun.cs" /> </ItemGroup> <ItemGroup> + <None Include="Docs\AiResponse.md" /> <None Include="packages.config" /> </ItemGroup> <ItemGroup> diff --git a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs index 7b1ca3a1c..5aca89ee0 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Telemetries/TelemetryJobRun.cs @@ -9,6 +9,29 @@ namespace Tango.Telemetry.Telemetries [TelemetryName("JobRun")] public class TelemetryJobRun : TelemetryBase { + public String Name { get; set; } + public String JobKind { get; set; } + public String Thread { get; set; } + public String NumberOfUnits { get; set; } + + public double LogicalLength { get; set; } + public double ActualLength { get; set; } + public double StartPosition { get; set; } + public double EndPosition { get; set; } + public double Distance { get; set; } + + public DateTime StartTime { get; set; } + public DateTime EndTime { get; set; } + public TimeSpan Duration { get; set; } + + public String Status { get; set; } + + public String ColorSpace { get; set; } + + public double TargetL { get; set; } + public double TargetA { get; set; } + public double TargetB { get; set; } + } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index 20d5211e7..c2879e9db 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -10,14 +10,22 @@ namespace Tango.Telemetry { public class TelemetryBase : ITelemetry { + public string ID { get; set; } + + public DateTime Time { get; set; } + + public TelemetryBase() + { + ID = Guid.NewGuid().ToString(); + Time = DateTime.UtcNow; + } + [BsonIgnore] //This will be used for column mapping in ADX public String Type { get { return this.ToTelemetryName(); } } - public DateTime Time { get; set; } - public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true) { return Encoding.UTF8.GetBytes(ToJson(format, flatten)); diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index 2ceb95298..6700ba8af 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -5,45 +5,131 @@ using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; +using Tango.Core; +using Tango.Core.ExtensionMethods; +using Tango.Logging; namespace Tango.Telemetry { - public class TelemetryLiteDBStorageManager : ITelemetryStorageManager + public class TelemetryLiteDBStorageManager : ExtendedObject, ITelemetryStorageManager { + public class PublishedTelemetry + { + public String ID { get; set; } + public DateTime CreatedAt { get; set; } + } + private bool _disposed; private LiteDatabase _database; private static Object _lock = new object(); + private ITelemetryCheckpointsRecoveryClient _checkpointsRecoveryClient; + private HashSet<String> _publishedTelemetriesIDs; + private DateTime _lastCloudBackupTime = DateTime.MinValue; + private TimeSpan _checkpointsBackupInterval = TimeSpan.FromMinutes(1); + + public TimeSpan CheckpointsBackupInterval { get => _checkpointsBackupInterval; set => _checkpointsBackupInterval = value >= TimeSpan.FromMinutes(1) ? value : TimeSpan.FromMinutes(1); } public String DatabasePath { get; private set; } + public bool EnableCheckPointsRecovery { get; set; } + public bool EnforceCheckpointsRecovery { get; set; } + public TelemetryLiteDBStorageManager() { + _publishedTelemetriesIDs = new HashSet<string>(); + EnableCheckPointsRecovery = true; + EnforceCheckpointsRecovery = true; DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); + } + + public TelemetryLiteDBStorageManager(String databaseFile) : this() + { + DatabasePath = databaseFile; + } + + public async Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient) + { + LogManager.Log("Initializing telemetry database..."); + + _checkpointsRecoveryClient = checkpointsRecoveryClient; + Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); + String backupPath = DatabasePath + ".bak"; + + if (!File.Exists(DatabasePath)) + { + if (File.Exists(backupPath)) + { + LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error); + File.Copy(backupPath, DatabasePath, overwrite: true); + } + else + { + LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical); + + if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery) + { + throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate."); + } + } + } + _database = new LiteDatabase($"Filename={DatabasePath}"); _database.Pragma("TIMEOUT", 10); //Read Timeout _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data _database.Commit(); - } - public virtual void Dispose() - { - if (_database != null && !_disposed) + var checkPointsCollection = GetSourcesCheckpointCollection(); + var localCheckPoints = checkPointsCollection.FindAll().ToList(); + + if (localCheckPoints.Count == 0) { - try + if (EnableCheckPointsRecovery) { - _disposed = true; - _database.Dispose(); - _database = null; + try + { + LogManager.Log("Attempting to retrieve sources checkpoints from backup..."); + var remoteCheckPoints = await _checkpointsRecoveryClient.GetCheckpointsBackup(); + if (remoteCheckPoints.Count > 0) + { + checkPointsCollection.InsertBulk(remoteCheckPoints); + LogManager.Log($"Sources checkpoints successfully recovered.\n{remoteCheckPoints.ToJsonString()}"); + } + else + { + LogManager.Log("No sources checkpoint found on backup. Assuming first operation..."); + } + } + catch (Exception ex) + { + if (EnforceCheckpointsRecovery) + { + LogManager.Log(ex, LogCategory.Critical, "Could not retrieve sources checkpoints from backup. Telemetry storage manager should not operate."); + throw; + } + else + { + LogManager.Log(ex, LogCategory.Warning, "Could not retrieve sources checkpoints from backup. No Checkpoints available!"); + } + } } - catch { } } + else + { + var minDateTime = localCheckPoints.Min(x => x.Time); + PerformPublishedTelemetriesCleanUp(minDateTime); + } + + LogManager.Log("Loading published telemetries cache..."); + _publishedTelemetriesIDs = new HashSet<string>(GetPublishedTelemetriesCollection().FindAll().Select(x => x.ID).ToList()); + + LogManager.Log("Telemetry LiteDB storage manager initialized..."); } - ~TelemetryLiteDBStorageManager() + private ILiteCollection<PublishedTelemetry> GetPublishedTelemetriesCollection() { - Dispose(); + return _database.GetCollection<PublishedTelemetry>("PublishedTelemetries"); } private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection() @@ -79,7 +165,8 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetPendingTelemetriesCollection(); - return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); + var pendingTelemetries = collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); + return pendingTelemetries; } } @@ -88,7 +175,26 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetSourcesCheckpointCollection(); - return collection.FindOne(x => x.SourceName == source.Name); + var checkpoint = collection.FindOne(x => x.SourceName == source.Name); + + if (checkpoint == null) + { + checkpoint = new TelemetryHistorySourceCheckPoint(); + checkpoint.SourceName = source.Name; + checkpoint.Time = DateTime.MinValue; + } + + return checkpoint; + } + } + + public List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints() + { + lock (_lock) + { + var collection = GetSourcesCheckpointCollection(); + var checkpoints = collection.FindAll().ToList(); + return checkpoints; } } @@ -98,6 +204,24 @@ namespace Tango.Telemetry { var collection = GetSourcesCheckpointCollection(); collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount }); + + if (_checkpointsRecoveryClient != null && DateTime.UtcNow - _lastCloudBackupTime > CheckpointsBackupInterval) + { + _lastCloudBackupTime = DateTime.UtcNow; + Task.Run(async () => + { + try + { + var allCheckpoints = collection.FindAll().ToList(); + await _checkpointsRecoveryClient.SaveCheckpointsBackup(allCheckpoints); + LogManager.Log("Sources checkpoints successfully backed up to remote service."); + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Warning, "Failed to back up checkpoints to remote service."); + } + }); + } } } @@ -106,8 +230,47 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetPendingTelemetriesCollection(); - return collection.Count(); + var count = collection.Count(); + return count; + } + } + + public void AddToPublishedTelemetryCache(ITelemetry telemetry) + { + _publishedTelemetriesIDs.Add(telemetry.ID); + GetPublishedTelemetriesCollection().Insert(new PublishedTelemetry() { CreatedAt = DateTime.UtcNow, ID = telemetry.ID }); + } + + public bool IsTelemetryInPublishedCache(ITelemetry telementry) + { + return _publishedTelemetriesIDs.Contains(telementry.ID); + } + + public void PerformPublishedTelemetriesCleanUp(DateTime olderThan) + { + LogManager.Log("Performing published telemetries cache cleanup..."); + var collection = GetSourcesCheckpointCollection(); + int deleted = collection.DeleteMany(x => x.Time < olderThan); + LogManager.Log($"Published telemetries cleanup completed. {deleted} cleaned."); + } + + public virtual void Dispose() + { + if (_database != null && !_disposed) + { + try + { + _disposed = true; + _database.Dispose(); + _database = null; + } + catch { } } } + + ~TelemetryLiteDBStorageManager() + { + Dispose(); + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs index 6aa1f5527..a5e176ca7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs @@ -9,6 +9,7 @@ namespace Tango.Telemetry public class TelemetryPendingStorageSource : ITelemetrySource { public string Name { get; private set; } = "Pending Storage"; + public bool RequiresTelemetryDuplicationTracking { get => false; } public void Dispose() { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs index baafb70a7..7b6b577a0 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishPackage.cs @@ -1,4 +1,5 @@ -using System; +using Newtonsoft.Json; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -10,15 +11,26 @@ namespace Tango.Telemetry { private String _payload; + internal TaskCompletionSource<TelemetryPublishResult> CompletionSource { get; set; } public ITelemetrySource Source { get; set; } public PendingTelemetry PendingTelemetry { get; set; } public TelemetrySourceTypes SourceType { get; set; } - public String ToPayload() + public String Environment { get; set; } + public String SerialNumber { get; set; } + public String MachineType { get; set; } + public String TelemetryName { get; internal set; } + + public TelemetryPublishPackage() + { + CompletionSource = new TaskCompletionSource<TelemetryPublishResult>(); + } + + public String ToPayload(Formatting format = Formatting.None, bool flatten = false) { if (_payload == null) { - _payload = PendingTelemetry.TelemetryObject.ToJson(Newtonsoft.Json.Formatting.None, flatten: true); + _payload = PendingTelemetry.TelemetryObject.ToJson(format, flatten); } return _payload; diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs index 37c6af412..b423580d2 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResult.cs @@ -6,8 +6,15 @@ using System.Threading.Tasks; namespace Tango.Telemetry { + /// <summary> + /// Represents the result of publishing a telemetry package to one or more destinations. + /// Contains the status and metrics per destination and total publish time. + /// </summary> public class TelemetryPublishResult { + /// <summary> + /// Defines the outcome of an attempt to publish to a specific destination. + /// </summary> public enum DestinationStatus { None, @@ -16,26 +23,98 @@ namespace Tango.Telemetry Failed, Postponed } - + + /// <summary> + /// Contains information about the result of publishing telemetry to a specific destination. + /// </summary> public class DestinationResult { + /// <summary> + /// The destination to which the telemetry was attempted to be published. + /// </summary> public ITelemetryDestination Destination { get; set; } + + /// <summary> + /// The result status of the publish attempt. + /// </summary> public DestinationStatus Status { get; set; } + + /// <summary> + /// Any error that occurred during the publish attempt. + /// </summary> public Exception Error { get; set; } + + /// <summary> + /// The amount of time it took to attempt publishing to this destination. + /// </summary> public TimeSpan ElapsedTime { get; set; } + + /// <summary> + /// Number of retry attempts for this destination. + /// </summary> + public int RetryCount { get; internal set; } + + /// <summary> + /// Time until the next eligible retry attempt. + /// </summary> + public TimeSpan RetryDelay { get; internal set; } } + /// <summary> + /// Gets or sets the telemetry source that generated the package associated with this publish result. + /// </summary> + public ITelemetrySource Source { get; set; } + + /// <summary> + /// Gets or sets the source type of the telemetry (e.g., Streaming, ExternalStorage, PendingStorage). + /// </summary> + public TelemetrySourceTypes SourceType { get; set; } + + /// <summary> + /// List of results for each destination that was part of the publish process. + /// </summary> public List<DestinationResult> DestinationsResults { get; set; } + + /// <summary> + /// Total elapsed time taken to publish the telemetry package across all destinations. + /// </summary> public TimeSpan TotalElapsedTime { get; set; } + /// <summary> + /// Time spent outside of destination publishing, typically system overhead or coordination. + /// </summary> public TimeSpan OverheadTime { get { return TimeSpan.FromMilliseconds(TotalElapsedTime.TotalMilliseconds - DestinationsResults.Sum(x => x.ElapsedTime.TotalMilliseconds)); } } + /// <summary> + /// Initializes a new instance of the TelemetryPublishResult class. + /// </summary> public TelemetryPublishResult() { DestinationsResults = new List<DestinationResult>(); } + + public override string ToString() + { + var sb = new StringBuilder(); + + sb.AppendLine($"Source: {Source?.Name ?? "Unknown"} ({SourceType})"); + sb.AppendLine($"Total Elapsed Time: {TotalElapsedTime.TotalMilliseconds:F1} ms"); + sb.AppendLine($"Overhead Time: {OverheadTime.TotalMilliseconds:F1} ms"); + sb.AppendLine("Destination Results:"); + + foreach (var result in DestinationsResults ?? Enumerable.Empty<DestinationResult>()) + { + sb.Append($" - {result.Destination.Name}: {result.Status}, {result.ElapsedTime.TotalMilliseconds:F1} ms"); + if (!string.IsNullOrWhiteSpace(result.Error.ToStringSafe())) + sb.Append($" (Error: {result.Error})"); + + sb.AppendLine(); + } + + return sb.ToString(); + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs new file mode 100644 index 000000000..2a177d2ab --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublishResultAvailableEventArgs.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public class TelemetryPublishResultAvailableEventArgs : TelemetryPublisherEventArgs + { + public TelemetryPublishResult PublishResult { get; set; } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 99d96edff..42121d884 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -16,45 +16,109 @@ using Tango.Integration.Operation; using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; +using Tango.Telemetry.Reporting; using Tango.Telemetry.Telemetries; namespace Tango.Telemetry { + /// <summary> + /// TelemetryPublisher is responsible for collecting telemetry data from sources, + /// queuing it, storing it if needed, and publishing it to one or more destinations. + /// It supports streaming, historical, and retry-based telemetry flows. + /// </summary> public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher { + /// <summary> + /// Occurs before a telemetry package is published to a destination. + /// </summary> public event EventHandler<TelemetryPackagePublishingEventArgs> PublishingPackage; + + /// <summary> + /// Occurs when a telemetry package has been successfully published to a destination. + /// </summary> public event EventHandler<TelemetryPackagePublishedEventArgs> PackagePublished; + + /// <summary> + /// Occurs when a telemetry package fails to publish to a destination. + /// </summary> public event EventHandler<TelemetryPackagePublishFailedEventArgs> PublishPackageFailed; + /// <summary> + /// Occurs when a telemetry publish operation has completed and a publish result is available, + /// indicating the success or failure status for each destination. + /// </summary> + public event EventHandler<TelemetryPublishResultAvailableEventArgs> PublishResultAvailable; + + // Timer to periodically check and publish pending telemetry from local storage private System.Timers.Timer _pendingStorageCheckTimer; + + // Timer to periodically fetch historical data from ITelemetryHistorySource private System.Timers.Timer _historicalDataTimer; + // Indicates if the publisher has been disposed protected bool _isDisposed; + + // Background thread responsible for dequeuing and publishing telemetry private Thread _publishThread; + + // Source used to tag telemetry loaded from pending storage private TelemetryPendingStorageSource _pendingStorageSource; + //Timer responsible for triggering periodic cleanup of the published telemetries cache, + private System.Timers.Timer _publishedTelemetriesCacheCleanupTimer; + + private List<TelemetryPublishResult> _pastResults; + #region Properties + /// <summary> + /// Indicates whether the publisher is actively running. + /// </summary> public bool IsStarted { get; private set; } - public TelemetryPublisherConfiguration Config { get; private set; } + /// <summary> + /// Publisher configuration containing telemetry parameters and limits. + /// </summary> + public TelemetryPublisherConfiguration Config { get; } - public ITelemetryStorageManager StorageManager { get; private set; } + /// <summary> + /// Manages persistence of telemetry data (e.g., LiteDB). + /// </summary> + public ITelemetryStorageManager StorageManager { get; } private List<ITelemetrySource> InnerSources { get; } + /// <summary> + /// Public read-only access to telemetry sources. + /// </summary> public ReadOnlyCollection<ITelemetrySource> Sources { get; } private List<ITelemetryDestination> InnerDestinations { get; } + /// <summary> + /// Public read-only access to telemetry destinations. + /// </summary> public ReadOnlyCollection<ITelemetryDestination> Destinations { get; } + /// <summary> + /// Manages telemetry queuing between ingestion and publish phases. + /// </summary> public ITelemetryQueueManager QueueManager { get; private set; } + /// <summary> + /// Gets the client used for remote checkpoint recovery. + /// </summary> + public ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; } + #endregion #region Constructor - public TelemetryPublisher(TelemetryPublisherConfiguration config) + /// <summary> + /// Initializes the telemetry publisher with default storage and queue managers. + /// </summary> + public TelemetryPublisher(TelemetryPublisherConfiguration config, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient) { + _pastResults = new List<TelemetryPublishResult>(); + Config = config ?? new TelemetryPublisherConfiguration(); _pendingStorageSource = new TelemetryPendingStorageSource(); @@ -68,11 +132,16 @@ namespace Tango.Telemetry _publishThread = new Thread(PublishThreadMethod); _publishThread.IsBackground = true; + CheckpointsRecoveryClient = checkPointsRecoveryClient; + StorageManager = new TelemetryLiteDBStorageManager(); QueueManager = new TelemetryInMemoryQueueManager(); } - public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config) + /// <summary> + /// Initializes the telemetry publisher with custom storage and queue managers. + /// </summary> + public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config, checkPointsRecoveryClient) { StorageManager = storageManager; QueueManager = queueManager; @@ -82,6 +151,9 @@ namespace Tango.Telemetry #region Sources + /// <summary> + /// Registers a telemetry source, such as a streaming or historical source. + /// </summary> public void RegisterSource(ITelemetrySource source) { if (source == null) return; @@ -114,6 +186,9 @@ namespace Tango.Telemetry LogManager.Log($"Telemetry source {source.Name} registered."); } + /// <summary> + /// Callback when a telemetry streaming source emits new telemetry. + /// </summary> private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) { var source = sender as ITelemetrySource; @@ -128,6 +203,9 @@ namespace Tango.Telemetry #region Destinations + /// <summary> + /// Registers a telemetry destination, such as a cloud service or local database. + /// </summary> public void RegisterDestination(ITelemetryDestination destination) { if (destination == null) return; @@ -147,7 +225,10 @@ namespace Tango.Telemetry #region Start / Stop - public void Start() + /// <summary> + /// Starts all timers, threads, and streaming sources for publishing telemetry. + /// </summary> + public async Task Start() { if (!IsStarted) { @@ -160,6 +241,8 @@ namespace Tango.Telemetry IsStarted = true; + await StorageManager.Init(CheckpointsRecoveryClient); + if (_pendingStorageCheckTimer == null) { _pendingStorageCheckTimer = new System.Timers.Timer(); @@ -178,6 +261,15 @@ namespace Tango.Telemetry _historicalDataTimer.Start(); + if (_publishedTelemetriesCacheCleanupTimer == null) + { + _publishedTelemetriesCacheCleanupTimer = new System.Timers.Timer(); + _publishedTelemetriesCacheCleanupTimer.Interval = Config.PublishedTelemetriesCacheCleanupInterval.TotalMilliseconds; + _publishedTelemetriesCacheCleanupTimer.Elapsed += PublishedTelemetriesCacheCleanupTimer_Elapsed; + } + + _publishedTelemetriesCacheCleanupTimer.Start(); + _publishThread.Start(); InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start()); @@ -187,13 +279,16 @@ namespace Tango.Telemetry catch (Exception ex) { LogManager.Log(ex, "Error starting telemetry publisher."); - Stop(); - throw ex; + await Stop(); + throw; } } } - public void Stop() + /// <summary> + /// Stops all activity and releases threads and sources gracefully. + /// </summary> + public Task Stop() { if (IsStarted) { @@ -212,14 +307,19 @@ namespace Tango.Telemetry LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}."); } }); - _pendingStorageCheckTimer.Stop(); - _historicalDataTimer.Stop(); - QueueManager.Enqueue(null); + _pendingStorageCheckTimer?.Stop(); + _historicalDataTimer?.Stop(); + QueueManager?.Enqueue(null); LogManager.Log("Telemetry publisher stopped."); } + + return Task.FromResult(true); } + /// <summary> + /// Performs runtime validation of configuration, sources, and destinations. + /// </summary> public void Validate() { // Validate all registered sources @@ -262,81 +362,148 @@ namespace Tango.Telemetry #region Timers + /// <summary> + /// Periodically invoked to process telemetry from persistent local storage. + /// </summary> private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) { - _pendingStorageCheckTimer.Stop(); - - LogManager.Log($"Fetching pending telemetries from storage (MaxCount: {Config.MaxPendingStorageTelemetriesPerCycle})..."); + LogManager.Log("Pending storage check timer elapsed. Starting flush operation for pending telemetries.", LogCategory.Debug); - var batch = StorageManager.GetPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + _pendingStorageCheckTimer.Stop(); - LogManager.Log($"Pending telemetries count is {batch.Count}. Publishing..."); + try + { + var results = await FlushPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); + LogManager.Log($"Flush operation completed. {results.Count} telemetry package(s) processed from pending storage.", LogCategory.Debug); + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing pending telemetry packages."); + } + finally + { + _pendingStorageCheckTimer.Start(); + LogManager.Log("Pending storage check timer restarted.", LogCategory.Debug); + } + } - Dictionary<String, int> destinationsPasses = new Dictionary<string, int>(); - List<TelemetryPublishResult> results = new List<TelemetryPublishResult>(); + /// <summary> + /// Periodically invoked to fetch and push historical data from history sources. + /// </summary> + private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e) + { + LogManager.Log("Historical data timer elapsed. Checking for available capacity...", LogCategory.Debug); + _historicalDataTimer.Stop(); - foreach (var pendingTelemetry in batch) + try { - var result = await PublishTelemetryPackage(new TelemetryPublishPackage() - { - Source = _pendingStorageSource, - PendingTelemetry = pendingTelemetry, - SourceType = TelemetrySourceTypes.PendingStorage - }); + int queueCount = QueueManager.Count; + int storageCount = StorageManager.GetPendingTelemetriesCount(); - results.Add(result); + LogManager.Log($"Current queue count: {queueCount}, pending storage count: {storageCount}", LogCategory.Debug); - foreach (var d in result.DestinationsResults) + if (queueCount < Config.MaxPendingTelemetries && storageCount < Config.MaxPendingTelemetries) { - if (d.Status == TelemetryPublishResult.DestinationStatus.Passed) + foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList()) { - destinationsPasses[d.Destination.Name] += 1; - } - } + try + { + TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source); - if (!IsStarted || _isDisposed) return; - } + LogManager.Log($"Evaluating history source '{source.Name}' at checkpoint time {checkPoint?.Time:u}", LogCategory.Debug); + + if (await source.CanRequestHistory(checkPoint.Time)) + { + var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); + + LogManager.Log($"History source '{source.Name}' returned {historyTelemetries.Count} telemetry items.", LogCategory.Debug); - LogManager.Log($"Publishing pending telemetries completed after {results.Sum(x => x.TotalElapsedTime.Seconds)} seconds. Destination OK Count: {String.Join(", ", destinationsPasses.Select(x => x.Key + " -> " + x.Value))}"); + foreach (var telemetry in historyTelemetries) + { + await PushTelemetryPackageAwait(source, telemetry, TelemetrySourceTypes.ExternalStorage); + checkPoint.Time = telemetry.Time; + checkPoint.TotalCount++; + StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount); + } - _pendingStorageCheckTimer.Start(); + LogManager.Log($"Checkpoint updated for source '{source.Name}': time = {checkPoint.Time:u}, total = {checkPoint.TotalCount}", LogCategory.Debug); + } + else + { + LogManager.Log($"History request for source '{source.Name}' was not permitted at checkpoint time {checkPoint?.Time:u}", LogCategory.Debug); + } + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Error, $"Exception while processing history for source '{source?.Name}'"); + } + } + } + else + { + LogManager.Log("Historical data fetch skipped due to max pending telemetry limit reached.", LogCategory.Debug); + } + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Critical, "Unexpected error during HistoricalDataTimer_Elapsed."); + } + finally + { + _historicalDataTimer.Start(); + LogManager.Log("Historical data timer restarted.", LogCategory.Debug); + } } - private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e) + /// <summary> + /// Handles the elapsed event of the published telemetries cache cleanup timer. + /// Determines the earliest checkpoint across all history sources and removes published telemetry entries + /// older than that point to keep the cache size manageable over time. + /// </summary> + private void PublishedTelemetriesCacheCleanupTimer_Elapsed(object sender, ElapsedEventArgs e) { - _historicalDataTimer.Stop(); + LogManager.Log("Published telemetry cache cleanup timer elapsed. Starting cleanup process...", LogCategory.Debug); - LogManager.Log(""); + _publishedTelemetriesCacheCleanupTimer.Stop(); - if (QueueManager.Count < Config.MaxPendingTelemetries && StorageManager.GetPendingTelemetriesCount() < Config.MaxPendingTelemetries) + try { - foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList()) + var checkPoints = StorageManager.GetHistorySourcesCheckPoints(); + LogManager.Log($"Retrieved {checkPoints.Count} source checkpoints for cleanup evaluation.", LogCategory.Debug); + + if (checkPoints.Count > 0) { - TelemetryHistorySourceCheckPoint checkPoint = StorageManager.GetHistorySourceCheckPoint(source); - if (await source.CanRequestHistory(checkPoint.Time)) - { - var historyTelemetries = (await source.RequestHistory(checkPoint.Time)).OrderBy(x => x.Time).ToList(); + DateTime olderThan = checkPoints.Min(x => x.Time); + LogManager.Log($"Initiating cleanup of published telemetries older than {olderThan:u}.", LogCategory.Debug); - foreach (var telemetry in historyTelemetries) - { - PushTelemetryPackage(source, telemetry, TelemetrySourceTypes.ExternalStorage); - checkPoint.Time = telemetry.Time; - checkPoint.TotalCount++; - } + StorageManager.PerformPublishedTelemetriesCleanUp(olderThan); - StorageManager.SetHistorySourceCheckPoint(source, checkPoint.Time, checkPoint.TotalCount); - } + LogManager.Log("Published telemetry cache cleanup completed successfully.", LogCategory.Debug); + } + else + { + LogManager.Log("No checkpoints found. Cleanup skipped.", LogCategory.Debug); } } - - _historicalDataTimer.Start(); + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Error, "Exception occurred during published telemetry cache cleanup."); + } + finally + { + _publishedTelemetriesCacheCleanupTimer.Start(); + LogManager.Log("Published telemetry cache cleanup timer restarted.", LogCategory.Debug); + } } #endregion #region Push - private void PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) + /// <summary> + /// Enqueues telemetry into the system based on a source and type. + /// </summary> + private TelemetryPublishPackage PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) { PendingTelemetry pendingTelemetry = new PendingTelemetry(); pendingTelemetry.Created = DateTime.UtcNow; @@ -345,18 +512,45 @@ namespace Tango.Telemetry pendingTelemetry.SourceType = sourceType; pendingTelemetry.TelemetryObject = telemetry; - PushTelemetryPackage(new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType }); + var package = new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType }; + + PushTelemetryPackage(package); + + return package; } + /// <summary> + /// Enqueues telemetry and returns a task that resolves when it is published. + /// </summary> + private Task<TelemetryPublishResult> PushTelemetryPackageAwait(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) + { + return PushTelemetryPackage(source, telemetry, sourceType).CompletionSource.Task; + } + + /// <summary> + /// Enqueues an already-wrapped package for background publishing. + /// </summary> private void PushTelemetryPackage(TelemetryPublishPackage package) { QueueManager.Enqueue(package); } + /// <summary> + /// Enqueues a wrapped package and awaits publish result asynchronously. + /// </summary> + private Task<TelemetryPublishResult> PushTelemetryPackageAwait(TelemetryPublishPackage package) + { + PushTelemetryPackage(package); + return package.CompletionSource.Task; + } + #endregion #region Publish + /// <summary> + /// Background thread method to publish telemetry from the queue. + /// </summary> private async void PublishThreadMethod() { while (IsStarted) @@ -379,118 +573,360 @@ namespace Tango.Telemetry } } + // This method is responsible for publishing a telemetry package to all configured destinations. + // It handles per-destination retry logic, exponential backoff, availability checks, and result reporting. + // The goal is to guarantee eventual delivery of telemetry with robust fault tolerance and observability. protected virtual async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { - Stopwatch totalWatch = Stopwatch.StartNew(); + LogManager.Log($"Starting publish process for telemetry package from source '{package.Source?.Name}' with type '{package.SourceType}'", LogCategory.Debug); - var result = new TelemetryPublishResult(); + Stopwatch totalWatch = Stopwatch.StartNew(); // Start measuring total publish duration + var result = new TelemetryPublishResult(); // Result container with per-destination feedback + result.Source = package.Source; + result.SourceType = package.SourceType; - if (!IsStarted || _isDisposed) return result; + // Abort early if the publisher is inactive + if (!IsStarted || _isDisposed) + { + LogManager.Log("Publish attempt skipped because the publisher is not started or has been disposed.", LogCategory.Warning); + package.CompletionSource.SetResult(result); + return result; + } - List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); + //Marking the telemetry as published to avoid duplication from streaming and history sources that can produce the same telemetry. + if (package.Source.RequiresTelemetryDuplicationTracking) + { + if (StorageManager.IsTelemetryInPublishedCache(package.PendingTelemetry.TelemetryObject)) + { + LogManager.Log("Publish attempt skipped because the telemetry was already published.", LogCategory.Warning); + package.CompletionSource.SetResult(result); + return result; + } + else + { + StorageManager.AddToPublishedTelemetryCache(package.PendingTelemetry.TelemetryObject); + } + } + + // Prepare standard metadata properties attached to all telemetry sent + var telemetryName = package.PendingTelemetry.TelemetryObject.ToTelemetryName(); - properties.Add(new KeyValuePair<string, string>("MachineID", Config.MachineID)); - properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName())); + List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); + properties.Add(new KeyValuePair<string, string>("SerialNumber", Config.SerialNumber)); + properties.Add(new KeyValuePair<string, string>("MachineType", Config.MachineType.ToShortName())); properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment)); + properties.Add(new KeyValuePair<string, string>("Type", telemetryName)); + + //Setting telemetry package basic properties for destination.. + package.TelemetryName = telemetryName; + package.SerialNumber = Config.SerialNumber; + package.Environment = Config.Environment; + package.MachineType = Config.MachineType.ToShortName(); + + var now = DateTime.UtcNow; // Capture timestamp once for all retry logic List<TelemetryPendingDestination> pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); - //Add all destinations if streaming or external (They will be remove later if successfull) - //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. + // If this is a fresh package, initialize pending destinations if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { + LogManager.Log("Evaluating destinations for initial pending destination registration...", LogCategory.Debug); foreach (var destination in Destinations) { - if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + if (destination.SupportedSourceTypes.Contains(package.SourceType)) { - pendingDestinations.Add(new TelemetryPendingDestination { Name = destination.Name }); + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination + { + Name = destination.Name, + RetryCount = 0, + LastAttempt = DateTime.MinValue, + NextEligibleAttempt = now + }); + LogManager.Log($"Added destination '{destination.Name}' to pending destinations.", LogCategory.Debug); + } } } } - foreach (var destination in Destinations.Where(x => x.Enable && x.SupportedSourceTypes.Contains(package.SourceType))) + // Try publishing to each valid destination + foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { - if (pendingDestinations.Exists(x => x.Name == destination.Name)) + var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); + if (pendingEntry == null) continue; // Skip destinations not pending for this package + + // Prepare result tracking for this destination + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + destinationResult.RetryCount = pendingEntry.RetryCount; + destinationResult.RetryDelay = TimeSpan.FromSeconds(Math.Max(0, (pendingEntry.NextEligibleAttempt - now).TotalSeconds)); + result.DestinationsResults.Add(destinationResult); + + // If we're still in a backoff delay, skip for now + if (now < pendingEntry.NextEligibleAttempt) { - Stopwatch destinationWatch = Stopwatch.StartNew(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; + destinationResult.ElapsedTime = TimeSpan.Zero; + LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug); + continue; + } - var destinationResult = new TelemetryPublishResult.DestinationResult(); - destinationResult.Destination = destination; - result.DestinationsResults.Add(destinationResult); + Stopwatch destinationWatch = Stopwatch.StartNew(); // Measure this attempt duration - try + try + { + // Remove destination from pending list so we can re-add it if needed after this attempt + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + LogManager.Log($"Attempting to publish to destination '{destination.Name}'...", LogCategory.Debug); + + // Allow event handlers to cancel or inspect the publish + if (OnPublishingPackage(package, destination)) { - pendingDestinations.RemoveAll(x => x.Name == destination.Name); - if (OnPublishingPackage(package, destination)) + // Ensure destination is ready before sending + if (await destination.IsAvailable()) { - if (await destination.IsAvailable()) - { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + await destination.Publish(package, properties); // Perform publish + OnPackagePublished(package, destination); // Notify success event + + destinationWatch.Stop(); + + destinationResult.RetryDelay = TimeSpan.Zero; + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms."); + } + else + { + // Mark as temporarily unavailable and schedule retry + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; - destinationWatch.Stop(); - destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; - destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log($"Destination '{destination.Name}' is unavailable.", LogCategory.Warning); + + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug); + pendingDestinations.Add(pendingEntry); } else { - destinationWatch.Stop(); - destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; - destinationResult.ElapsedTime = destinationWatch.Elapsed; - - if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) - { - if (!pendingDestinations.Exists(x => x.Name == destination.Name)) - { - pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); - } - } + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug); } } } - catch (Exception ex) - { - destinationWatch.Stop(); - destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; - destinationResult.Error = ex; - destinationResult.ElapsedTime = destinationWatch.Elapsed; - - LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); + } + catch (Exception ex) + { + // Log unexpected failure and retry if supported + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; - OnPackagePublishFailed(package, destination, ex); + LogManager.Log(ex, $"Error publishing telemetry to '{destination.Name}'."); + OnPackagePublishFailed(package, destination, ex); - if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) - { - if (!pendingDestinations.Exists(x => x.Name == destination.Name)) - { - pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); - } - } + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug); + pendingDestinations.Add(pendingEntry); + } + else + { + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug); } } } + // Save retry state back into the package package.PendingTelemetry.PendingDestinations = pendingDestinations; - if (package.SourceType == TelemetrySourceTypes.PendingStorage && package.PendingTelemetry.PendingDestinations.Count == 0) + // Remove from storage if all destinations succeeded; otherwise persist state + if (package.PendingTelemetry.PendingDestinations.Count == 0) { + LogManager.Log("Deleting successfully published telemetry from storage.", LogCategory.Debug); StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { + LogManager.Log("Saving telemetry package for future retry or tracking.", LogCategory.Debug); StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } + // Finalize result and notify completion totalWatch.Stop(); result.TotalElapsedTime = totalWatch.Elapsed; + LogManager.Log($"Completed publish process for telemetry from source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug); + + //Add results for reporting + _pastResults.Add(result); + + //Set task result for once that are awaiting from outside this method. + package.CompletionSource.SetResult(result); + + //Raising final event + OnPublishResultAvailable(package, result); return result; } #endregion + #region Flush + + /// <summary> + /// Flushes up to the specified number of pending telemetries from local storage, + /// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages. + /// </summary> + /// <param name="maxCount">The maximum number of pending telemetry packages to flush.</param> + /// <returns> + /// A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages. + /// </returns> + public async Task<List<TelemetryPublishResult>> FlushPendingTelemetries(int maxCount) + { + if (!IsStarted || _isDisposed) + { + LogManager.Log("FlushPendingTelemetries called while publisher is not started or already disposed. Operation aborted.", LogCategory.Warning); + return new List<TelemetryPublishResult>(); + } + + var batch = StorageManager.GetPendingTelemetries(maxCount); + LogManager.Log($"Flushing {batch.Count} pending telemetry package(s).", LogCategory.Info); + + List<TelemetryPublishResult> results = new List<TelemetryPublishResult>(); + + foreach (var pendingTelemetry in batch) + { + try + { + var package = new TelemetryPublishPackage() + { + Source = _pendingStorageSource, + PendingTelemetry = pendingTelemetry, + SourceType = TelemetrySourceTypes.PendingStorage + }; + + var result = await PushTelemetryPackageAwait(package); + results.Add(result); + + LogManager.Log( + $"Flushed telemetry to destinations: {string.Join(", ", result.DestinationsResults.Select(r => $"{r.Destination.Name}={r.Status}"))}", + LogCategory.Debug); + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing a pending telemetry package."); + } + + if (!IsStarted || _isDisposed) + { + LogManager.Log("Flush operation interrupted: publisher is no longer active.", LogCategory.Warning); + return results; + } + } + + LogManager.Log("FlushPendingTelemetries completed successfully.", LogCategory.Info); + return results; + } + + #endregion + + #region Reporting + + /// <summary> + /// Generates a detailed telemetry report summarizing the current state of the telemetry system. + /// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results. + /// </summary> + public Task<TelemetryReport> GetTelemetryReport() + { + return Task.Factory.StartNew(() => + { + TelemetryReport report = new TelemetryReport + { + GeneratedAt = DateTime.UtcNow, + TotalPending = StorageManager.GetPendingTelemetriesCount() + }; + + var results = _pastResults.ToList(); + report.TotalPublished = results.Count; + + foreach (var result in results) + { + var sourceType = result.SourceType; + var sourceName = result.Source?.Name ?? "UnknownSource"; + + if (!report.SourceTypes.TryGetValue(sourceType, out var sourceTypeSummary)) + { + sourceTypeSummary = new SourceTypeSummary + { + SourceType = sourceType + }; + report.SourceTypes[sourceType] = sourceTypeSummary; + } + + if (!sourceTypeSummary.Sources.TryGetValue(sourceName, out var sourceSummary)) + { + sourceSummary = new SourceSummary + { + SourceName = sourceName + }; + sourceTypeSummary.Sources[sourceName] = sourceSummary; + } + + foreach (var destResult in result.DestinationsResults) + { + var destName = destResult.Destination.Name; + + if (!sourceSummary.Destinations.TryGetValue(destName, out var destSummary)) + { + destSummary = new DestinationStatusSummary + { + DestinationName = destName + }; + sourceSummary.Destinations[destName] = destSummary; + } + + switch (destResult.Status) + { + case TelemetryPublishResult.DestinationStatus.Passed: + destSummary.Passed++; + break; + case TelemetryPublishResult.DestinationStatus.Failed: + destSummary.Failed++; + break; + case TelemetryPublishResult.DestinationStatus.Postponed: + destSummary.Postponed++; + break; + case TelemetryPublishResult.DestinationStatus.Unavailable: + destSummary.Unavailable++; + break; + } + } + } + + return report; + }); + } + + + #endregion + #region Virtual Methods + /// <summary> + /// Called before a package is published to allow for canceling or preprocessing. + /// </summary> protected virtual bool OnPublishingPackage(TelemetryPublishPackage package, ITelemetryDestination destination) { try @@ -505,6 +941,9 @@ namespace Tango.Telemetry } } + /// <summary> + /// Called after a package has been successfully delivered to a destination. + /// </summary> protected virtual void OnPackagePublished(TelemetryPublishPackage package, ITelemetryDestination destination) { try @@ -514,6 +953,9 @@ namespace Tango.Telemetry catch { } } + /// <summary> + /// Called after a failed attempt to publish a telemetry package. + /// </summary> protected virtual void OnPackagePublishFailed(TelemetryPublishPackage package, ITelemetryDestination destination, Exception exception) { try @@ -523,10 +965,27 @@ namespace Tango.Telemetry catch { } } + /// <summary> + /// Called when a publish result is available after a complete publish pass. + /// </summary> + /// <param name="package">The package.</param> + /// <param name="result">The result.</param> + protected virtual void OnPublishResultAvailable(TelemetryPublishPackage package, TelemetryPublishResult result) + { + try + { + PublishResultAvailable?.Invoke(this, new TelemetryPublishResultAvailableEventArgs() { Package = package, PublishResult = result }); + } + catch { } + } + #endregion #region Dispose + /// <summary> + /// Disposes all sources, destinations, timers, and gracefully shuts down. + /// </summary> public void Dispose() { if (!_isDisposed) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index c4e7d3d3e..971afa864 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.Integration.Operation; +using Tango.Logging; namespace Tango.Telemetry { @@ -21,15 +22,26 @@ namespace Tango.Telemetry } + // This method handles the complete lifecycle of publishing a telemetry package to all supported destinations. + // It supports exponential backoff retry logic, destination filtering, and persistence of failed deliveries + // to ensure fault tolerance and guaranteed eventual delivery across multiple transports. protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { - Stopwatch totalWatch = Stopwatch.StartNew(); + LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug); + Stopwatch totalWatch = Stopwatch.StartNew(); var result = new TelemetryPublishResult(); - if (!IsStarted || _isDisposed) return result; + // Exit early if the publisher has been stopped or disposed to avoid invalid operations. + if (!IsStarted || _isDisposed) + { + LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning); + package.CompletionSource.SetResult(result); + return result; + } - List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>> + // Prepare metadata headers to be attached to the telemetry, which are common to all destinations. + var properties = new List<KeyValuePair<string, string>> { new KeyValuePair<string, string>("MachineID", Config.MachineID), new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()), @@ -39,13 +51,15 @@ namespace Tango.Telemetry var now = DateTime.UtcNow; var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); - // For Streaming/External: initialize pending destinations list (used if publishing fails) + // If this is a newly generated package, not a retry, initialize its destination list. + // This ensures we only attempt to publish to destinations that support this telemetry type. if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var dest in Destinations) { - if (!pendingDestinations.Any(x => x.Name == dest.Name)) + if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name)) { + // Initialize with default retry tracking values. pendingDestinations.Add(new TelemetryPendingDestination { Name = dest.Name, @@ -53,38 +67,40 @@ namespace Tango.Telemetry LastAttempt = DateTime.MinValue, NextEligibleAttempt = now }); + LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug); } } } + // Iterate over each valid destination, applying retry backoff and delivery logic. foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); - - if (pendingEntry == null) - continue; + if (pendingEntry == null) continue; Stopwatch destinationWatch = Stopwatch.StartNew(); - var destinationResult = new TelemetryPublishResult.DestinationResult(); destinationResult.Destination = destination; result.DestinationsResults.Add(destinationResult); - // Respect backoff timing + // If the destination is in a cool-down period due to previous failures, skip for now. if (now < pendingEntry.NextEligibleAttempt) { destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug); continue; } try { + LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug); if (OnPublishingPackage(package, destination)) { if (await destination.IsAvailable()) { + // Send the telemetry payload await destination.Publish(package, properties); OnPackagePublished(package, destination); @@ -92,87 +108,90 @@ namespace Tango.Telemetry destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; destinationResult.ElapsedTime = destinationWatch.Elapsed; - // On success: remove entry from pending list + LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms."); + + // Remove from retry queue since delivery succeeded. pendingDestinations.RemoveAll(x => x.Name == destination.Name); } else { + // Destination is temporarily unreachable; apply retry logic if supported. destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; destinationResult.ElapsedTime = destinationWatch.Elapsed; - // Only track retry state if retry is supported + LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning); if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { - if (pendingEntry == null) - { - pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; - pendingDestinations.Add(pendingEntry); - } - pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; - - // Apply exponential backoff - int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); - pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug); } else { - // Remove if not retryable + // Remove from retry list if the destination is not retryable. pendingDestinations.RemoveAll(x => x.Name == destination.Name); + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug); } } } } catch (Exception ex) { + // Network, serialization, or other critical failures are handled here. destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; destinationResult.Error = ex; destinationResult.ElapsedTime = destinationWatch.Elapsed; - LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); + LogManager.Log(ex, $"Error publishing to '{destination.Name}'."); OnPackagePublishFailed(package, destination, ex); - // Only track retry state if retry is supported if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { - if (pendingEntry == null) - { - pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; - pendingDestinations.Add(pendingEntry); - } - + // Retry and Backoff Logic: + // -------------------------- + // Each destination maintains its own retry metadata: RetryCount, LastAttempt, and NextEligibleAttempt. + // On each failed delivery attempt, RetryCount is incremented and a delay is calculated using exponential backoff (2^RetryCount seconds). + // This delay is capped using MaxExponentialBackoff to prevent runaway delays. + // The system checks NextEligibleAttempt before retrying, and skips publishing if the backoff period has not yet elapsed. + // This ensures robust delivery across unstable networks and prevents retry storms. pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; - - // Apply exponential backoff - int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); - pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug); } else { - // Remove if not retryable pendingDestinations.RemoveAll(x => x.Name == destination.Name); + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug); } } } + // Save updated retry state for this telemetry package package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); + // Determine whether to remove the telemetry or persist it based on remaining destinations if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any()) { + LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug); StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { - StorageManager.DeletePendingTelemetry(package.PendingTelemetry); + LogManager.Log("Updating pending telemetry record.", LogCategory.Debug); + StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } totalWatch.Stop(); result.TotalElapsedTime = totalWatch.Elapsed; + LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug); + package.CompletionSource.SetResult(result); return result; } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs index 4a3776b87..a9f4954dd 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs @@ -1,22 +1,62 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Tango.BL.Enumerations; namespace Tango.Telemetry { public class TelemetryPublisherConfiguration { - public String MachineID { get; set; } + /// <summary> + /// Unique identifier of the machine sending telemetry. Used for tagging and routing. + /// </summary> + public String SerialNumber { get; set; } + + /// <summary> + /// Enum representing the type of machine (e.g., X1, X4). Helps differentiate models in telemetry. + /// </summary> public MachineTypes MachineType { get; set; } + + /// <summary> + /// Environment in which the telemetry is being published (e.g., Production, QA, Dev). + /// </summary> public String Environment { get; set; } + + /// <summary> + /// Interval for checking and reprocessing failed/pending telemetry from local storage. + /// </summary> public TimeSpan PendingStorageCheckInterval { get; set; } + + /// <summary> + /// Maximum number of pending telemetry records to process in a single retry cycle. + /// </summary> public int MaxPendingStorageTelemetriesPerCycle { get; set; } + + /// <summary> + /// Frequency at which historical sources are polled to request backlogged or missed telemetry. + /// </summary> public TimeSpan HistorySourcesRequestInterval { get; set; } + + /// <summary> + /// Maximum number of telemetry packages allowed in memory queues before rejecting new packages. + /// </summary> public int MaxPendingTelemetries { get; set; } + /// <summary> + /// Whether exponential backoff should be applied to retry logic per destination. + /// </summary> + public bool EnableBackoff { get; set; } + + /// <summary> + /// The maximum amount of time to delay retries during exponential backoff. + /// </summary> + public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1); + + /// <summary> + /// Gets or sets the interval at which the published telemetry cache cleanup process should occur. + /// This defines how frequently old published telemetry entries are eligible for pruning, + /// based on their publication timestamp. + /// </summary> + public TimeSpan PublishedTelemetriesCacheCleanupInterval { get; set; } = TimeSpan.FromHours(1); + public TelemetryPublisherConfiguration() { PendingStorageCheckInterval = TimeSpan.FromMinutes(1); @@ -27,8 +67,8 @@ namespace Tango.Telemetry public void Validate() { - if (!MachineID.IsNotNullOrEmpty()) - throw new ArgumentNullException(nameof(MachineID), "MachineID is not set or empty."); + if (!SerialNumber.IsNotNullOrEmpty()) + throw new ArgumentNullException(nameof(SerialNumber), "SerialNumber is not set or empty."); if (!Environment.IsNotNullOrEmpty()) throw new ArgumentNullException(nameof(Environment), "Environment is not set or empty."); diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs index 1ed80d5c9..3eb6ad6e0 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetrySourceTypes.cs @@ -6,10 +6,25 @@ using System.Threading.Tasks; namespace Tango.Telemetry { + /// <summary> + /// Specifies the origin type of a telemetry data package. + /// Used to control routing and behavior during telemetry publishing. + /// </summary> public enum TelemetrySourceTypes { + /// <summary> + /// Telemetry generated in real-time by a live telemetry streaming source. + /// </summary> Streaming, + + /// <summary> + /// Telemetry loaded on demand from historical sources such as logs or external databases. + /// </summary> ExternalStorage, - PendingStorage, + + /// <summary> + /// Telemetry restored from a local storage buffer due to prior publish failure. + /// </summary> + PendingStorage } } diff --git a/Software/Visual_Studio/Tango.Telemetry/app.config b/Software/Visual_Studio/Tango.Telemetry/app.config new file mode 100644 index 000000000..601f6dd75 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/app.config @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="utf-8"?> +<configuration> + <runtime> + <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1"> + <dependentAssembly> + <assemblyIdentity name="System.Runtime.InteropServices.RuntimeInformation" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-4.0.2.0" newVersion="4.0.2.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-4.2.0.0" newVersion="4.2.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-9.0.0.0" newVersion="9.0.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="System.Runtime.CompilerServices.Unsafe" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-4.0.4.1" newVersion="4.0.4.1" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="Microsoft.Azure.Amqp" publicKeyToken="31bf3856ad364e35" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-2.4.0.0" newVersion="2.4.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="Microsoft.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-4.0.0.0" newVersion="4.0.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="Microsoft.WindowsAzure.Storage" publicKeyToken="31bf3856ad364e35" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-8.7.0.0" newVersion="8.7.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="DotNetty.Transport" publicKeyToken="bc13ca065fa06c29" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="DotNetty.Buffers" publicKeyToken="bc13ca065fa06c29" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="DotNetty.Codecs.Mqtt" publicKeyToken="bc13ca065fa06c29" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="DotNetty.Common" publicKeyToken="bc13ca065fa06c29" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="DotNetty.Handlers" publicKeyToken="bc13ca065fa06c29" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-0.6.0.0" newVersion="0.6.0.0" /> + </dependentAssembly> + <dependentAssembly> + <assemblyIdentity name="System.Reactive.Core" publicKeyToken="94bc3704cddfc263" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-3.0.3000.0" newVersion="3.0.3000.0" /> + </dependentAssembly> + </assemblyBinding> + </runtime> +</configuration>
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Telemetry/packages.config b/Software/Visual_Studio/Tango.Telemetry/packages.config index 1a4b2c3c8..027500d67 100644 --- a/Software/Visual_Studio/Tango.Telemetry/packages.config +++ b/Software/Visual_Studio/Tango.Telemetry/packages.config @@ -6,14 +6,18 @@ <package id="DotNetty.Common" version="0.6.0" targetFramework="net461" /> <package id="DotNetty.Handlers" version="0.6.0" targetFramework="net461" /> <package id="DotNetty.Transport" version="0.6.0" targetFramework="net461" /> + <package id="EnterpriseLibrary.TransientFaultHandling" version="6.0.1304.0" targetFramework="net461" /> <package id="EntityFramework" version="6.2.0" targetFramework="net461" /> <package id="LiteDB" version="5.0.4" targetFramework="net461" /> <package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net461" /> <package id="Microsoft.Azure.Amqp" version="2.5.10" targetFramework="net461" /> - <package id="Microsoft.Azure.Devices.Client" version="1.41.0" targetFramework="net461" /> - <package id="Microsoft.Azure.Devices.Shared" version="1.30.1" targetFramework="net461" /> + <package id="Microsoft.Azure.Devices.Client" version="1.6.0" targetFramework="net461" /> + <package id="Microsoft.Azure.Devices.Shared" version="1.3.0" targetFramework="net461" /> <package id="Microsoft.Azure.KeyVault.Core" version="1.0.0" targetFramework="net461" /> <package id="Microsoft.CSharp" version="4.7.0" targetFramework="net461" /> + <package id="Microsoft.Data.Edm" version="5.8.2" targetFramework="net461" /> + <package id="Microsoft.Data.OData" version="5.8.2" targetFramework="net461" /> + <package id="Microsoft.Data.Services.Client" version="5.8.2" targetFramework="net461" /> <package id="Microsoft.Extensions.DependencyInjection.Abstractions" version="1.1.0" targetFramework="net461" /> <package id="Microsoft.Extensions.Logging" version="1.1.1" targetFramework="net461" /> <package id="Microsoft.Extensions.Logging.Abstractions" version="1.1.1" targetFramework="net461" /> @@ -22,18 +26,25 @@ <package id="Microsoft.Win32.Primitives" version="4.3.0" targetFramework="net461" /> <package id="MQTTnet" version="3.1.2" targetFramework="net461" /> <package id="NETStandard.Library" version="1.6.1" targetFramework="net461" /> - <package id="Newtonsoft.Json" version="12.0.3" targetFramework="net461" /> + <package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" /> <package id="Owin" version="1.0" targetFramework="net461" /> + <package id="PCLCrypto" version="2.0.147" targetFramework="net461" /> + <package id="PInvoke.BCrypt" version="0.3.2" targetFramework="net461" /> + <package id="PInvoke.Kernel32" version="0.3.2" targetFramework="net461" /> + <package id="PInvoke.NCrypt" version="0.3.2" targetFramework="net461" /> + <package id="PInvoke.Windows.Core" version="0.3.2" targetFramework="net461" /> <package id="System.AppContext" version="4.3.0" targetFramework="net461" /> <package id="System.Collections" version="4.3.0" targetFramework="net461" /> <package id="System.Collections.Concurrent" version="4.3.0" targetFramework="net461" /> <package id="System.Collections.Immutable" version="1.5.0" targetFramework="net461" /> <package id="System.ComponentModel" version="4.3.0" targetFramework="net461" /> + <package id="System.ComponentModel.EventBasedAsync" version="4.0.11" targetFramework="net461" /> <package id="System.Console" version="4.3.0" targetFramework="net461" /> <package id="System.Diagnostics.Debug" version="4.3.0" targetFramework="net461" /> <package id="System.Diagnostics.DiagnosticSource" version="4.3.0" targetFramework="net461" /> <package id="System.Diagnostics.Tools" version="4.3.0" targetFramework="net461" /> <package id="System.Diagnostics.Tracing" version="4.3.0" targetFramework="net461" /> + <package id="System.Dynamic.Runtime" version="4.0.0" targetFramework="net461" /> <package id="System.Globalization" version="4.3.0" targetFramework="net461" /> <package id="System.Globalization.Calendars" version="4.3.0" targetFramework="net461" /> <package id="System.IO" version="4.3.0" targetFramework="net461" /> @@ -43,8 +54,10 @@ <package id="System.IO.FileSystem.Primitives" version="4.3.0" targetFramework="net461" /> <package id="System.Linq" version="4.3.0" targetFramework="net461" /> <package id="System.Linq.Expressions" version="4.3.0" targetFramework="net461" /> + <package id="System.Linq.Queryable" version="4.0.0" targetFramework="net461" /> <package id="System.Net.Http" version="4.3.0" targetFramework="net461" /> <package id="System.Net.Primitives" version="4.3.0" targetFramework="net461" /> + <package id="System.Net.Requests" version="4.0.11" targetFramework="net461" /> <package id="System.Net.Sockets" version="4.3.0" targetFramework="net461" /> <package id="System.ObjectModel" version="4.3.0" targetFramework="net461" /> <package id="System.Reflection" version="4.3.0" targetFramework="net461" /> @@ -62,6 +75,7 @@ <package id="System.Security.Cryptography.Encoding" version="4.3.0" targetFramework="net461" /> <package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net461" /> <package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net461" /> + <package id="System.Spatial" version="5.8.2" targetFramework="net461" /> <package id="System.Text.Encoding" version="4.3.0" targetFramework="net461" /> <package id="System.Text.Encoding.Extensions" version="4.3.0" targetFramework="net461" /> <package id="System.Text.RegularExpressions" version="4.3.0" targetFramework="net461" /> @@ -71,5 +85,6 @@ <package id="System.Threading.Timer" version="4.3.0" targetFramework="net461" /> <package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net461" /> <package id="System.Xml.XDocument" version="4.3.0" targetFramework="net461" /> - <package id="WindowsAzure.Storage" version="9.3.2" targetFramework="net461" /> + <package id="Validation" version="2.2.8" targetFramework="net461" /> + <package id="WindowsAzure.Storage" version="8.7.0" targetFramework="net461" /> </packages>
\ No newline at end of file |
