using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Tango.PPC.Common.Web;

namespace Tango.MachineService.Telemetry
{
    /// <summary>
    /// Table Storage row for one telemetry checkpoint.
    /// PartitionKey = machine GUID, RowKey = sanitized source name.
    /// </summary>
    internal class TelemetryCheckpointEntity : TableEntity
    {
        /// <summary>Parameterless constructor used when the entity is materialized from a query/retrieve result.</summary>
        public TelemetryCheckpointEntity()
        {
        }

        /// <summary>Creates an entity keyed by machine and (sanitized) source name.</summary>
        /// <param name="machineGuid">Stored verbatim as the PartitionKey.</param>
        /// <param name="sourceName">Source name; sanitized for the RowKey and kept unmodified in <see cref="SourceName"/>.</param>
        public TelemetryCheckpointEntity(string machineGuid, string sourceName)
        {
            PartitionKey = machineGuid;
            RowKey = SanitizeRowKey(sourceName);
            SourceName = sourceName;
        }

        /// <summary>
        /// Original, unsanitized source name. Null on rows written before this column existed;
        /// readers fall back to RowKey in that case.
        /// </summary>
        public string SourceName { get; set; }

        /// <summary>Checkpoint timestamp, copied from the checkpoint's Time.</summary>
        public DateTime Time { get; set; }

        /// <summary>Checkpoint counter, copied from the checkpoint's TotalCount.</summary>
        public int TotalCount { get; set; }

        /// <summary>
        /// Replaces characters that Table Storage does not allow in key values with '_':
        /// '/', '\', '#', '?' and control characters (U+0000-U+001F, U+007F-U+009F).
        /// </summary>
        /// <param name="value">Raw source name; null is treated as empty.</param>
        /// <returns>A string of the same length that is safe to use as a RowKey.</returns>
        internal static string SanitizeRowKey(string value)
        {
            if (string.IsNullOrEmpty(value))
            {
                return string.Empty;
            }

            var chars = value.ToCharArray();
            for (var i = 0; i < chars.Length; i++)
            {
                var c = chars[i];
                // char.IsControl covers exactly U+0000-U+001F and U+007F-U+009F.
                if (c == '/' || c == '\\' || c == '#' || c == '?' || char.IsControl(c))
                {
                    chars[i] = '_';
                }
            }

            return new string(chars);
        }
    }

    /// <summary>
    /// Store/get telemetry source checkpoints in Azure Table Storage.
    /// PartitionKey = MachineGuid, RowKey = SourceName (sanitized).
    /// All operations are synchronous.
    /// </summary>
    public sealed class TelemetryCheckpointStore
    {
        /// <summary>Maximum number of operations Table Storage accepts in a single batch.</summary>
        private const int MaxBatchSize = 100;

        /// <summary>HTTP status returned by the service when the addressed entity does not exist.</summary>
        private const int NotFoundStatusCode = 404;

        private readonly CloudTable _table;

        /// <summary>
        /// Connects to the storage account and makes sure the checkpoint table exists.
        /// Note: this performs a synchronous network call (CreateIfNotExists).
        /// </summary>
        /// <param name="connectionString">Azure Storage connection string.</param>
        /// <param name="tableName">Table name (default: TelemetryCheckPoints).</param>
        /// <exception cref="ArgumentNullException">An argument is null, empty or whitespace.</exception>
        public TelemetryCheckpointStore(string connectionString, string tableName = "TelemetryCheckPoints")
        {
            RequireNonBlank(connectionString, nameof(connectionString));
            RequireNonBlank(tableName, nameof(tableName));

            var storageAccount = CloudStorageAccount.Parse(connectionString);
            var tableClient = storageAccount.CreateCloudTableClient();
            _table = tableClient.GetTableReference(tableName);
            _table.CreateIfNotExists(); // sync
        }

        /// <summary>Upsert a single checkpoint for a machine + source (synchronous).</summary>
        /// <param name="machineGuid">Machine identifier used as the partition key.</param>
        /// <param name="checkpoint">Checkpoint to store; its SourceName must be non-blank.</param>
        /// <exception cref="ArgumentNullException">An argument (or the checkpoint's SourceName) is null or blank.</exception>
        public void Save(string machineGuid, TelemetryCheckPoint checkpoint)
        {
            RequireNonBlank(machineGuid, nameof(machineGuid));
            if (checkpoint == null)
            {
                throw new ArgumentNullException(nameof(checkpoint));
            }

            RequireNonBlank(checkpoint.SourceName, nameof(checkpoint.SourceName));

            _table.Execute(TableOperation.InsertOrReplace(ToEntity(machineGuid, checkpoint)));
        }

        /// <summary>
        /// Upsert any number of checkpoints in batches of up to 100 (Table Storage limit) - synchronous.
        /// Null checkpoints and checkpoints with a blank SourceName are skipped.
        /// </summary>
        /// <param name="machineGuid">Machine identifier used as the partition key for every row.</param>
        /// <param name="checkpoints">Checkpoints to store; null is treated as "nothing to do".</param>
        /// <exception cref="ArgumentNullException"><paramref name="machineGuid"/> is null or blank.</exception>
        public void SaveMany(string machineGuid, IEnumerable<TelemetryCheckPoint> checkpoints)
        {
            RequireNonBlank(machineGuid, nameof(machineGuid));
            if (checkpoints == null)
            {
                return;
            }

            // A batch may touch each entity only once, so the pending batch is keyed by RowKey.
            // A later checkpoint for the same key replaces the earlier one (last wins), which is
            // what a sequence of individual upserts would have produced. Keys are case-sensitive.
            var pending = new Dictionary<string, TelemetryCheckpointEntity>(StringComparer.Ordinal);

            foreach (var cp in checkpoints)
            {
                if (cp == null || string.IsNullOrWhiteSpace(cp.SourceName))
                {
                    continue;
                }

                var entity = ToEntity(machineGuid, cp);
                pending[entity.RowKey] = entity;

                if (pending.Count == MaxBatchSize)
                {
                    ExecuteUpsertBatch(pending.Values);
                    pending.Clear();
                }
            }

            if (pending.Count > 0)
            {
                ExecuteUpsertBatch(pending.Values);
            }
        }

        /// <summary>Get the checkpoint for a specific machine + source. Returns null if not found. (sync)</summary>
        /// <param name="machineGuid">Machine identifier (partition key).</param>
        /// <param name="sourceName">Source name; sanitized the same way as on save before the lookup.</param>
        /// <returns>The stored checkpoint carrying the caller's <paramref name="sourceName"/>, or null.</returns>
        /// <exception cref="ArgumentNullException">An argument is null or blank.</exception>
        public TelemetryCheckPoint Get(string machineGuid, string sourceName)
        {
            RequireNonBlank(machineGuid, nameof(machineGuid));
            RequireNonBlank(sourceName, nameof(sourceName));

            var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);
            var retrieve = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
            var entity = _table.Execute(retrieve).Result as TelemetryCheckpointEntity;
            if (entity == null)
            {
                return null;
            }

            return new TelemetryCheckPoint
            {
                SourceName = sourceName,
                Time = entity.Time,
                TotalCount = entity.TotalCount
            };
        }

        /// <summary>
        /// Get all checkpoints for a machine (sync). Uses ExecuteQuery, which handles paging internally.
        /// </summary>
        /// <param name="machineGuid">Machine identifier (partition key).</param>
        /// <returns>
        /// One checkpoint per stored row (possibly empty). SourceName is the original name when the
        /// row has it stored, otherwise the sanitized RowKey.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="machineGuid"/> is null or blank.</exception>
        public IReadOnlyList<TelemetryCheckPoint> GetAllForMachine(string machineGuid)
        {
            RequireNonBlank(machineGuid, nameof(machineGuid));

            var partitionFilter = TableQuery.GenerateFilterCondition(
                "PartitionKey", QueryComparisons.Equal, machineGuid);
            var query = new TableQuery<TelemetryCheckpointEntity>().Where(partitionFilter);

            // ExecuteQuery is synchronous and walks all result segments while being enumerated;
            // ToList forces that enumeration here.
            return _table.ExecuteQuery(query)
                .Select(e => new TelemetryCheckPoint
                {
                    // Rows written before the SourceName column existed only have the sanitized RowKey.
                    SourceName = string.IsNullOrEmpty(e.SourceName) ? e.RowKey : e.SourceName,
                    Time = e.Time,
                    TotalCount = e.TotalCount
                })
                .ToList();
        }

        /// <summary>Delete a specific checkpoint (sync). No-op if not found.</summary>
        /// <param name="machineGuid">Machine identifier (partition key).</param>
        /// <param name="sourceName">Source name; sanitized the same way as on save.</param>
        /// <exception cref="ArgumentNullException">An argument is null or blank.</exception>
        public void Delete(string machineGuid, string sourceName)
        {
            RequireNonBlank(machineGuid, nameof(machineGuid));
            RequireNonBlank(sourceName, nameof(sourceName));

            // ETag "*" deletes unconditionally, so no prior retrieve is needed and a concurrent
            // update of the row cannot make the delete fail with a precondition error.
            var target = new TelemetryCheckpointEntity(machineGuid, sourceName) { ETag = "*" };

            try
            {
                _table.Execute(TableOperation.Delete(target));
            }
            catch (StorageException ex) when (ex.RequestInformation?.HttpStatusCode == NotFoundStatusCode)
            {
                // Row is already gone (never existed or deleted concurrently): documented no-op.
            }
        }

        /// <summary>Throws <see cref="ArgumentNullException"/> when <paramref name="value"/> is null, empty or whitespace.</summary>
        private static void RequireNonBlank(string value, string paramName)
        {
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new ArgumentNullException(paramName);
            }
        }

        /// <summary>Maps a checkpoint to its table row for the given machine.</summary>
        private static TelemetryCheckpointEntity ToEntity(string machineGuid, TelemetryCheckPoint checkpoint)
        {
            return new TelemetryCheckpointEntity(machineGuid, checkpoint.SourceName)
            {
                Time = checkpoint.Time,
                TotalCount = checkpoint.TotalCount
            };
        }

        /// <summary>Sends the given entities (at most <see cref="MaxBatchSize"/>, unique row keys) as one InsertOrReplace batch.</summary>
        private void ExecuteUpsertBatch(IEnumerable<TelemetryCheckpointEntity> entities)
        {
            var batch = new TableBatchOperation();
            foreach (var entity in entities)
            {
                batch.InsertOrReplace(entity);
            }

            _table.ExecuteBatch(batch);
        }
    }
}