using LiteDB; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.Core; using Tango.Core.ExtensionMethods; using Tango.Logging; using Tango.Telemetry.Helpers; namespace Tango.Telemetry { public class TelemetryLiteDBStorageManager : ExtendedObject, ITelemetryStorageManager { public class PublishedTelemetry { public String ID { get; set; } public DateTime CreatedAt { get; set; } } private bool _disposed; private LiteDatabase _database; private static Object _lock = new object(); private ITelemetryCheckpointsRecoveryClient _checkpointsRecoveryClient; private HashSet _publishedTelemetriesIDs; private DateTime _lastCloudBackupTime = DateTime.MinValue; private TimeSpan _checkpointsBackupInterval = TimeSpan.FromMinutes(1); public TimeSpan CheckpointsBackupInterval { get => _checkpointsBackupInterval; set => _checkpointsBackupInterval = value >= TimeSpan.FromMinutes(1) ? value : TimeSpan.FromMinutes(1); } public String DatabasePath { get; private set; } public bool EnableCheckPointsRecovery { get; set; } public bool EnforceCheckpointsRecovery { get; set; } public TelemetryLiteDBStorageManager() { _publishedTelemetriesIDs = new HashSet(); EnableCheckPointsRecovery = true; EnforceCheckpointsRecovery = true; DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); } public TelemetryLiteDBStorageManager(String databaseFile) : this() { DatabasePath = databaseFile; } public async Task Init(ITelemetryCheckpointsRecoveryClient checkpointsRecoveryClient) { LogManager.Log("Initializing telemetry database..."); _checkpointsRecoveryClient = checkpointsRecoveryClient; Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); if (!File.Exists(DatabasePath)) { LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical); if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery) { throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate."); } } _database = new LiteDatabase($"Filename={DatabasePath}"); _database.Pragma("TIMEOUT", 10); //Read Timeout _database.Pragma("UTC_DATE", true); //Keep time as UTC when getting data _database.Commit(); var checkPointsCollection = GetSourcesCheckpointCollection(); var localCheckPoints = checkPointsCollection.FindAll().ToList(); if (localCheckPoints.Count == 0) { if (EnableCheckPointsRecovery) { try { LogManager.Log("Attempting to retrieve sources checkpoints from backup..."); var remoteCheckPoints = await _checkpointsRecoveryClient.GetCheckpointsBackup(); if (remoteCheckPoints.Count > 0) { checkPointsCollection.InsertBulk(remoteCheckPoints); LogManager.Log($"Sources checkpoints successfully recovered.\n{remoteCheckPoints.ToJsonString()}"); } else { LogManager.Log("No sources checkpoint found on backup. Assuming first operation..."); } } catch (Exception ex) { if (EnforceCheckpointsRecovery) { LogManager.Log(ex, LogCategory.Critical, "Could not retrieve sources checkpoints from backup. Telemetry storage manager should not operate."); throw; } else { LogManager.Log(ex, LogCategory.Warning, "Could not retrieve sources checkpoints from backup. No Checkpoints available!"); } } } } else { var minDateTime = localCheckPoints.Min(x => x.Time); PerformPublishedTelemetriesCleanUp(minDateTime); } LogManager.Log("Loading published telemetries cache..."); _publishedTelemetriesIDs = new HashSet(GetPublishedTelemetriesCollection().FindAll().Select(x => x.ID).ToList()); LogManager.Log("Telemetry LiteDB storage manager initialized..."); } private ILiteCollection GetPublishedTelemetriesCollection() { return _database.GetCollection("PublishedTelemetries"); } private ILiteCollection GetPendingTelemetriesCollection() { return _database.GetCollection("PendingTelemetries"); } private ILiteCollection GetSourcesCheckpointCollection() { return _database.GetCollection("SourcesCheckPoints"); } public void UpsertPendingTelemetry(PendingTelemetry pendingTelemetry) { lock (_lock) { var collection = GetPendingTelemetriesCollection(); //Ensure all datetimes "Kind" is UTC so LiteDB won't change them on query. DateTimeUtcFixer.EnsureDateTimeUTC(pendingTelemetry.TelemetryObject); collection.Upsert(pendingTelemetry); } } public void DeletePendingTelemetry(PendingTelemetry pendingTelemetry) { lock (_lock) { var collection = GetPendingTelemetriesCollection(); collection.Delete(pendingTelemetry.Id); } } public List GetPendingTelemetries(int maxCount) { lock (_lock) { var collection = GetPendingTelemetriesCollection(); var pendingTelemetries = collection.FindAll().OrderBy(x => x.TelemetryObject.Time).Take(Math.Max(maxCount, 1)).ToList(); return pendingTelemetries; } } public TelemetryHistorySourceCheckPoint GetHistorySourceCheckPoint(ITelemetryHistorySource source) { lock (_lock) { var collection = GetSourcesCheckpointCollection(); var checkpoint = collection.FindOne(x => x.SourceName == source.Name); return checkpoint; } } public List GetHistorySourcesCheckPoints() { lock (_lock) { var collection = GetSourcesCheckpointCollection(); var checkpoints = collection.FindAll().ToList(); return checkpoints; } } public void SetHistorySourceCheckPoint(ITelemetryHistorySource source, DateTime time, int totalCount) { var a = time.Kind; lock (_lock) { DateTime utcTime = DateTime.SpecifyKind(time, DateTimeKind.Utc); var collection = GetSourcesCheckpointCollection(); collection.Upsert(new TelemetryHistorySourceCheckPoint() { SourceName = source.Name, Time = utcTime, TotalCount = totalCount }); if (_checkpointsRecoveryClient != null && DateTime.UtcNow - _lastCloudBackupTime > CheckpointsBackupInterval) { _lastCloudBackupTime = DateTime.UtcNow; Task.Run(async () => { try { var allCheckpoints = collection.FindAll().ToList(); await _checkpointsRecoveryClient.SaveCheckpointsBackup(allCheckpoints); LogManager.Log("Sources checkpoints successfully backed up to remote service."); } catch (Exception ex) { LogManager.Log(ex, LogCategory.Warning, "Failed to back up checkpoints to remote service."); } }); } } } public int GetPendingTelemetriesCount() { lock (_lock) { var collection = GetPendingTelemetriesCollection(); var count = collection.Count(); return count; } } public void AddToPublishedTelemetryCache(ITelemetry telemetry) { _publishedTelemetriesIDs.Add(telemetry.ID); GetPublishedTelemetriesCollection().Insert(new PublishedTelemetry() { CreatedAt = DateTime.UtcNow, ID = telemetry.ID }); } public bool IsTelemetryInPublishedCache(ITelemetry telementry) { return _publishedTelemetriesIDs.Contains(telementry.ID); } public void PerformPublishedTelemetriesCleanUp(DateTime olderThan) { LogManager.Log("Performing published telemetries cache cleanup..."); var collection = GetPublishedTelemetriesCollection(); int deleted = collection.DeleteMany(x => x.CreatedAt < olderThan); LogManager.Log($"Published telemetries cleanup completed. {deleted} cleaned."); } public virtual void Dispose() { if (_database != null && !_disposed) { try { _disposed = true; _database.Dispose(); _database = null; } catch { } } } ~TelemetryLiteDBStorageManager() { Dispose(); } } }