diff options
Diffstat (limited to 'Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs')
| -rw-r--r-- | Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs b/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs new file mode 100644 index 000000000..1cd40e523 --- /dev/null +++ b/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs @@ -0,0 +1,161 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; +using Tango.PPC.Common.Web; + +namespace Tango.MachineService.Telemetry +{ + internal class TelemetryCheckpointEntity : TableEntity + { + public TelemetryCheckpointEntity() { } + + public TelemetryCheckpointEntity(string machineGuid, string sourceName) + { + PartitionKey = machineGuid; + RowKey = SanitizeRowKey(sourceName); + } + + public DateTime Time { get; set; } + public int TotalCount { get; set; } + + internal static string SanitizeRowKey(string value) => + (value ?? string.Empty) + .Replace("/", "_").Replace("\\", "_") + .Replace("#", "_").Replace("?", "_"); + } + + /// <summary> + /// Store/get telemetry source checkpoints in Azure Table Storage. + /// PartitionKey = MachineGuid, RowKey = SourceName. + /// </summary> + public sealed class TelemetryCheckpointStore + { + private readonly CloudTable _table; + + /// <param name="connectionString">Azure Storage connection string</param> + /// <param name="tableName">Table name (default: TelemetryCheckPoints)</param> + public TelemetryCheckpointStore(string connectionString, string tableName = "TelemetryCheckPoints") + { + if (string.IsNullOrWhiteSpace(connectionString)) throw new ArgumentNullException(nameof(connectionString)); + if (string.IsNullOrWhiteSpace(tableName)) throw new ArgumentNullException(nameof(tableName)); + + var storageAccount = CloudStorageAccount.Parse(connectionString); + var tableClient = storageAccount.CreateCloudTableClient(); + + _table = tableClient.GetTableReference(tableName); + _table.CreateIfNotExists(); // sync + } + + /// <summary>Upsert a single checkpoint for a machine + source (synchronous).</summary> + public void Save(string machineGuid, TelemetryCheckPoint checkpoint) + { + if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid)); + if (checkpoint == null) throw new ArgumentNullException(nameof(checkpoint)); + if (string.IsNullOrWhiteSpace(checkpoint.SourceName)) throw new ArgumentNullException(nameof(checkpoint.SourceName)); + + var entity = new TelemetryCheckpointEntity(machineGuid, checkpoint.SourceName) + { + Time = checkpoint.Time, + TotalCount = checkpoint.TotalCount + }; + + var op = TableOperation.InsertOrReplace(entity); + _table.Execute(op); + } + + /// <summary>Upsert up to N checkpoints in batches of 100 (Table Storage limit) — synchronous.</summary> + public void SaveMany(string machineGuid, IEnumerable<TelemetryCheckPoint> checkpoints) + { + if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid)); + if (checkpoints == null) return; + + var batch = new TableBatchOperation(); + foreach (var cp in checkpoints) + { + if (cp == null || string.IsNullOrWhiteSpace(cp.SourceName)) continue; + + var entity = new TelemetryCheckpointEntity(machineGuid, cp.SourceName) + { + Time = cp.Time, + TotalCount = cp.TotalCount + }; + + batch.InsertOrReplace(entity); + + if (batch.Count == 100) + { + _table.ExecuteBatch(batch); + batch.Clear(); + } + } + + if (batch.Count > 0) + { + _table.ExecuteBatch(batch); + } + } + + /// <summary>Get the checkpoint for a specific machine + source. Returns null if not found. (sync)</summary> + public TelemetryCheckPoint Get(string machineGuid, string sourceName) + { + if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid)); + if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName)); + + var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName); + var op = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey); + var result = _table.Execute(op); + + var entity = result.Result as TelemetryCheckpointEntity; + return entity == null ? null : new TelemetryCheckPoint + { + SourceName = sourceName, + Time = entity.Time, + TotalCount = entity.TotalCount + }; + } + + /// <summary>Get all checkpoints for a machine (sync). Uses ExecuteQuery which handles paging internally.</summary> + public IReadOnlyList<TelemetryCheckPoint> GetAllForMachine(string machineGuid) + { + if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid)); + + var query = new TableQuery<TelemetryCheckpointEntity>() + .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, machineGuid)); + + // ExecuteQuery is synchronous and iterates all segments internally + var entities = _table.ExecuteQuery(query); + + var list = new List<TelemetryCheckPoint>(); + foreach (var e in entities) + { + list.Add(new TelemetryCheckPoint + { + // RowKey is sanitized; ensure you pass sanitized name when saving & querying + SourceName = e.RowKey, + Time = e.Time, + TotalCount = e.TotalCount + }); + } + + return list; + } + + /// <summary>Delete a specific checkpoint (sync). No-op if not found.</summary> + public void Delete(string machineGuid, string sourceName) + { + if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid)); + if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName)); + + var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName); + var retrieve = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey); + var result = _table.Execute(retrieve); + var entity = result.Result as TelemetryCheckpointEntity; + if (entity == null) return; + + var del = TableOperation.Delete(entity); + _table.Execute(del); + } + } +} |
