diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
| commit | 0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch) | |
| tree | 5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs | |
| parent | 4222eddece906d6f0877022c06b853deb5068472 (diff) | |
| download | Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip | |
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs | 193 |
1 files changed, 178 insertions, 15 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index 2ceb95298..6700ba8af 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -5,45 +5,131 @@ using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; +using Tango.Core; +using Tango.Core.ExtensionMethods; +using Tango.Logging; namespace Tango.Telemetry { - public class TelemetryLiteDBStorageManager : ITelemetryStorageManager + public class TelemetryLiteDBStorageManager : ExtendedObject, ITelemetryStorageManager { + public class PublishedTelemetry + { + public String ID { get; set; } + public DateTime CreatedAt { get; set; } + } + private bool _disposed; private LiteDatabase _database; private static Object _lock = new object(); + private ITelemetryCheckpointsRecoveryClient _checkpointsRecoveryClient; + private HashSet<String> _publishedTelemetriesIDs; + private DateTime _lastCloudBackupTime = DateTime.MinValue; + private TimeSpan _checkpointsBackupInterval = TimeSpan.FromMinutes(1); + + public TimeSpan CheckpointsBackupInterval { get => _checkpointsBackupInterval; set => _checkpointsBackupInterval = value >= TimeSpan.FromMinutes(1) ? value : TimeSpan.FromMinutes(1); } public String DatabasePath { get; private set; } + public bool EnableCheckPointsRecovery { get; set; } + public bool EnforceCheckpointsRecovery { get; set; } + public TelemetryLiteDBStorageManager() { + _publishedTelemetriesIDs = new HashSet<string>(); + EnableCheckPointsRecovery = true; + EnforceCheckpointsRecovery = true; DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); + } + + public TelemetryLiteDBStorageManager(String databaseFile) : this() + { + DatabasePath = databaseFile; + } + + public async Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient) + { + LogManager.Log("Initializing telemetry database..."); + + _checkpointsRecoveryClient = checkpointsRecoveryClient; + Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); + String backupPath = DatabasePath + ".bak"; + + if (!File.Exists(DatabasePath)) + { + if (File.Exists(backupPath)) + { + LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error); + File.Copy(backupPath, DatabasePath, overwrite: true); + } + else + { + LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical); + + if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery) + { + throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate."); + } + } + } + _database = new LiteDatabase($"Filename={DatabasePath}"); _database.Pragma("TIMEOUT", 10); //Read Timeout _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data _database.Commit(); - } - public virtual void Dispose() - { - if (_database != null && !_disposed) + var checkPointsCollection = GetSourcesCheckpointCollection(); + var localCheckPoints = checkPointsCollection.FindAll().ToList(); + + if (localCheckPoints.Count == 0) { - try + if (EnableCheckPointsRecovery) { - _disposed = true; - _database.Dispose(); - _database = null; + try + { + LogManager.Log("Attempting to retrieve sources checkpoints from backup..."); + var remoteCheckPoints = await _checkpointsRecoveryClient.GetCheckpointsBackup(); + if (remoteCheckPoints.Count > 0) + { + checkPointsCollection.InsertBulk(remoteCheckPoints); + LogManager.Log($"Sources checkpoints successfully recovered.\n{remoteCheckPoints.ToJsonString()}"); + } + else + { + LogManager.Log("No sources checkpoint found on backup. Assuming first operation..."); + } + } + catch (Exception ex) + { + if (EnforceCheckpointsRecovery) + { + LogManager.Log(ex, LogCategory.Critical, "Could not retrieve sources checkpoints from backup. Telemetry storage manager should not operate."); + throw; + } + else + { + LogManager.Log(ex, LogCategory.Warning, "Could not retrieve sources checkpoints from backup. No Checkpoints available!"); + } + } } - catch { } } + else + { + var minDateTime = localCheckPoints.Min(x => x.Time); + PerformPublishedTelemetriesCleanUp(minDateTime); + } + + LogManager.Log("Loading published telemetries cache..."); + _publishedTelemetriesIDs = new HashSet<string>(GetPublishedTelemetriesCollection().FindAll().Select(x => x.ID).ToList()); + + LogManager.Log("Telemetry LiteDB storage manager initialized..."); } - ~TelemetryLiteDBStorageManager() + private ILiteCollection<PublishedTelemetry> GetPublishedTelemetriesCollection() { - Dispose(); + return _database.GetCollection<PublishedTelemetry>("PublishedTelemetries"); } private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection() @@ -79,7 +165,8 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetPendingTelemetriesCollection(); - return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); + var pendingTelemetries = collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); + return pendingTelemetries; } } @@ -88,7 +175,26 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetSourcesCheckpointCollection(); - return collection.FindOne(x => x.SourceName == source.Name); + var checkpoint = collection.FindOne(x => x.SourceName == source.Name); + + if (checkpoint == null) + { + checkpoint = new TelemetryHistorySourceCheckPoint(); + checkpoint.SourceName = source.Name; + checkpoint.Time = DateTime.MinValue; + } + + return checkpoint; + } + } + + public List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints() + { + lock (_lock) + { + var collection = GetSourcesCheckpointCollection(); + var checkpoints = collection.FindAll().ToList(); + return checkpoints; } } @@ -98,6 +204,24 @@ namespace Tango.Telemetry { var collection = GetSourcesCheckpointCollection(); collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount }); + + if (_checkpointsRecoveryClient != null && DateTime.UtcNow - _lastCloudBackupTime > CheckpointsBackupInterval) + { + _lastCloudBackupTime = DateTime.UtcNow; + Task.Run(async () => + { + try + { + var allCheckpoints = collection.FindAll().ToList(); + await _checkpointsRecoveryClient.SaveCheckpointsBackup(allCheckpoints); + LogManager.Log("Sources checkpoints successfully backed up to remote service."); + } + catch (Exception ex) + { + LogManager.Log(ex, LogCategory.Warning, "Failed to back up checkpoints to remote service."); + } + }); + } } } @@ -106,8 +230,47 @@ namespace Tango.Telemetry lock (_lock) { var collection = GetPendingTelemetriesCollection(); - return collection.Count(); + var count = collection.Count(); + return count; + } + } + + public void AddToPublishedTelemetryCache(ITelemetry telemetry) + { + _publishedTelemetriesIDs.Add(telemetry.ID); + GetPublishedTelemetriesCollection().Insert(new PublishedTelemetry() { CreatedAt = DateTime.UtcNow, ID = telemetry.ID }); + } + + public bool IsTelemetryInPublishedCache(ITelemetry telementry) + { + return _publishedTelemetriesIDs.Contains(telementry.ID); + } + + public void PerformPublishedTelemetriesCleanUp(DateTime olderThan) + { + LogManager.Log("Performing published telemetries cache cleanup..."); + var collection = GetSourcesCheckpointCollection(); + int deleted = collection.DeleteMany(x => x.Time < olderThan); + LogManager.Log($"Published telemetries cleanup completed. {deleted} cleaned."); + } + + public virtual void Dispose() + { + if (_database != null && !_disposed) + { + try + { + _disposed = true; + _database.Dispose(); + _database = null; + } + catch { } } } + + ~TelemetryLiteDBStorageManager() + { + Dispose(); + } } } |
