aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
commit0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch)
tree5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
parent4222eddece906d6f0877022c06b853deb5068472 (diff)
downloadTango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz
Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs193
1 files changed, 178 insertions, 15 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
index 2ceb95298..6700ba8af 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
@@ -5,45 +5,131 @@ using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Core.ExtensionMethods;
+using Tango.Logging;
namespace Tango.Telemetry
{
- public class TelemetryLiteDBStorageManager : ITelemetryStorageManager
+ public class TelemetryLiteDBStorageManager : ExtendedObject, ITelemetryStorageManager
{
+ public class PublishedTelemetry
+ {
+ public String ID { get; set; }
+ public DateTime CreatedAt { get; set; }
+ }
+
private bool _disposed;
private LiteDatabase _database;
private static Object _lock = new object();
+ private ITelemetryCheckpointsRecoveryClient _checkpointsRecoveryClient;
+ private HashSet<String> _publishedTelemetriesIDs;
+ private DateTime _lastCloudBackupTime = DateTime.MinValue;
+ private TimeSpan _checkpointsBackupInterval = TimeSpan.FromMinutes(1);
+
+ public TimeSpan CheckpointsBackupInterval { get => _checkpointsBackupInterval; set => _checkpointsBackupInterval = value >= TimeSpan.FromMinutes(1) ? value : TimeSpan.FromMinutes(1); }
public String DatabasePath { get; private set; }
+ public bool EnableCheckPointsRecovery { get; set; }
+ public bool EnforceCheckpointsRecovery { get; set; }
+
public TelemetryLiteDBStorageManager()
{
+ _publishedTelemetriesIDs = new HashSet<string>();
+ EnableCheckPointsRecovery = true;
+ EnforceCheckpointsRecovery = true;
DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry");
+ }
+
+ public TelemetryLiteDBStorageManager(String databaseFile) : this()
+ {
+ DatabasePath = databaseFile;
+ }
+
+ public async Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient)
+ {
+ LogManager.Log("Initializing telemetry database...");
+
+ _checkpointsRecoveryClient = checkpointsRecoveryClient;
+
Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
+ String backupPath = DatabasePath + ".bak";
+
+ if (!File.Exists(DatabasePath))
+ {
+ if (File.Exists(backupPath))
+ {
+ LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error);
+ File.Copy(backupPath, DatabasePath, overwrite: true);
+ }
+ else
+ {
+ LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical);
+
+ if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery)
+ {
+ throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate.");
+ }
+ }
+ }
+
_database = new LiteDatabase($"Filename={DatabasePath}");
_database.Pragma("TIMEOUT", 10); //Read Timeout
_database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data
_database.Commit();
- }
- public virtual void Dispose()
- {
- if (_database != null && !_disposed)
+ var checkPointsCollection = GetSourcesCheckpointCollection();
+ var localCheckPoints = checkPointsCollection.FindAll().ToList();
+
+ if (localCheckPoints.Count == 0)
{
- try
+ if (EnableCheckPointsRecovery)
{
- _disposed = true;
- _database.Dispose();
- _database = null;
+ try
+ {
+ LogManager.Log("Attempting to retrieve sources checkpoints from backup...");
+ var remoteCheckPoints = await _checkpointsRecoveryClient.GetCheckpointsBackup();
+ if (remoteCheckPoints.Count > 0)
+ {
+ checkPointsCollection.InsertBulk(remoteCheckPoints);
+ LogManager.Log($"Sources checkpoints successfully recovered.\n{remoteCheckPoints.ToJsonString()}");
+ }
+ else
+ {
+ LogManager.Log("No sources checkpoint found on backup. Assuming first operation...");
+ }
+ }
+ catch (Exception ex)
+ {
+ if (EnforceCheckpointsRecovery)
+ {
+ LogManager.Log(ex, LogCategory.Critical, "Could not retrieve sources checkpoints from backup. Telemetry storage manager should not operate.");
+ throw;
+ }
+ else
+ {
+ LogManager.Log(ex, LogCategory.Warning, "Could not retrieve sources checkpoints from backup. No Checkpoints available!");
+ }
+ }
}
- catch { }
}
+ else
+ {
+ var minDateTime = localCheckPoints.Min(x => x.Time);
+ PerformPublishedTelemetriesCleanUp(minDateTime);
+ }
+
+ LogManager.Log("Loading published telemetries cache...");
+ _publishedTelemetriesIDs = new HashSet<string>(GetPublishedTelemetriesCollection().FindAll().Select(x => x.ID).ToList());
+
+ LogManager.Log("Telemetry LiteDB storage manager initialized...");
}
- ~TelemetryLiteDBStorageManager()
+ private ILiteCollection<PublishedTelemetry> GetPublishedTelemetriesCollection()
{
- Dispose();
+ return _database.GetCollection<PublishedTelemetry>("PublishedTelemetries");
}
private ILiteCollection<PendingTelemetry> GetPendingTelemetriesCollection()
@@ -79,7 +165,8 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetPendingTelemetriesCollection();
- return collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList();
+ var pendingTelemetries = collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList();
+ return pendingTelemetries;
}
}
@@ -88,7 +175,26 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetSourcesCheckpointCollection();
- return collection.FindOne(x => x.SourceName == source.Name);
+ var checkpoint = collection.FindOne(x => x.SourceName == source.Name);
+
+ if (checkpoint == null)
+ {
+ checkpoint = new TelemetryHistorySourceCheckPoint();
+ checkpoint.SourceName = source.Name;
+ checkpoint.Time = DateTime.MinValue;
+ }
+
+ return checkpoint;
+ }
+ }
+
+ public List<TelemetryHistorySourceCheckPoint> GetHistorySourcesCheckPoints()
+ {
+ lock (_lock)
+ {
+ var collection = GetSourcesCheckpointCollection();
+ var checkpoints = collection.FindAll().ToList();
+ return checkpoints;
}
}
@@ -98,6 +204,24 @@ namespace Tango.Telemetry
{
var collection = GetSourcesCheckpointCollection();
collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = time, TotalCount = totalCount });
+
+ if (_checkpointsRecoveryClient != null && DateTime.UtcNow - _lastCloudBackupTime > CheckpointsBackupInterval)
+ {
+ _lastCloudBackupTime = DateTime.UtcNow;
+ Task.Run(async () =>
+ {
+ try
+ {
+ var allCheckpoints = collection.FindAll().ToList();
+ await _checkpointsRecoveryClient.SaveCheckpointsBackup(allCheckpoints);
+ LogManager.Log("Sources checkpoints successfully backed up to remote service.");
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, LogCategory.Warning, "Failed to back up checkpoints to remote service.");
+ }
+ });
+ }
}
}
@@ -106,8 +230,47 @@ namespace Tango.Telemetry
lock (_lock)
{
var collection = GetPendingTelemetriesCollection();
- return collection.Count();
+ var count = collection.Count();
+ return count;
+ }
+ }
+
+ public void AddToPublishedTelemetryCache(ITelemetry telemetry)
+ {
+ _publishedTelemetriesIDs.Add(telemetry.ID);
+ GetPublishedTelemetriesCollection().Insert(new PublishedTelemetry() { CreatedAt = DateTime.UtcNow, ID = telemetry.ID });
+ }
+
+ public bool IsTelemetryInPublishedCache(ITelemetry telementry)
+ {
+ return _publishedTelemetriesIDs.Contains(telementry.ID);
+ }
+
+ public void PerformPublishedTelemetriesCleanUp(DateTime olderThan)
+ {
+ LogManager.Log("Performing published telemetries cache cleanup...");
+ var collection = GetSourcesCheckpointCollection();
+ int deleted = collection.DeleteMany(x => x.Time < olderThan);
+ LogManager.Log($"Published telemetries cleanup completed. {deleted} cleaned.");
+ }
+
+ public virtual void Dispose()
+ {
+ if (_database != null && !_disposed)
+ {
+ try
+ {
+ _disposed = true;
+ _database.Dispose();
+ _database = null;
+ }
+ catch { }
}
}
+
+ ~TelemetryLiteDBStorageManager()
+ {
+ Dispose();
+ }
}
}