aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs')
-rw-r--r--Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs161
1 files changed, 161 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs b/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs
new file mode 100644
index 000000000..1cd40e523
--- /dev/null
+++ b/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs
@@ -0,0 +1,161 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.WindowsAzure.Storage;
+using Microsoft.WindowsAzure.Storage.Table;
+using Tango.PPC.Common.Web;
+
+namespace Tango.MachineService.Telemetry
+{
+ internal class TelemetryCheckpointEntity : TableEntity
+ {
+ public TelemetryCheckpointEntity() { }
+
+ public TelemetryCheckpointEntity(string machineGuid, string sourceName)
+ {
+ PartitionKey = machineGuid;
+ RowKey = SanitizeRowKey(sourceName);
+ }
+
+ public DateTime Time { get; set; }
+ public int TotalCount { get; set; }
+
+ internal static string SanitizeRowKey(string value) =>
+ (value ?? string.Empty)
+ .Replace("/", "_").Replace("\\", "_")
+ .Replace("#", "_").Replace("?", "_");
+ }
+
+ /// <summary>
+ /// Store/get telemetry source checkpoints in Azure Table Storage.
+ /// PartitionKey = MachineGuid, RowKey = SourceName.
+ /// </summary>
+ public sealed class TelemetryCheckpointStore
+ {
+ private readonly CloudTable _table;
+
+ /// <param name="connectionString">Azure Storage connection string</param>
+ /// <param name="tableName">Table name (default: TelemetryCheckPoints)</param>
+ public TelemetryCheckpointStore(string connectionString, string tableName = "TelemetryCheckPoints")
+ {
+ if (string.IsNullOrWhiteSpace(connectionString)) throw new ArgumentNullException(nameof(connectionString));
+ if (string.IsNullOrWhiteSpace(tableName)) throw new ArgumentNullException(nameof(tableName));
+
+ var storageAccount = CloudStorageAccount.Parse(connectionString);
+ var tableClient = storageAccount.CreateCloudTableClient();
+
+ _table = tableClient.GetTableReference(tableName);
+ _table.CreateIfNotExists(); // sync
+ }
+
+ /// <summary>Upsert a single checkpoint for a machine + source (synchronous).</summary>
+ public void Save(string machineGuid, TelemetryCheckPoint checkpoint)
+ {
+ if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
+ if (checkpoint == null) throw new ArgumentNullException(nameof(checkpoint));
+ if (string.IsNullOrWhiteSpace(checkpoint.SourceName)) throw new ArgumentNullException(nameof(checkpoint.SourceName));
+
+ var entity = new TelemetryCheckpointEntity(machineGuid, checkpoint.SourceName)
+ {
+ Time = checkpoint.Time,
+ TotalCount = checkpoint.TotalCount
+ };
+
+ var op = TableOperation.InsertOrReplace(entity);
+ _table.Execute(op);
+ }
+
+ /// <summary>Upsert up to N checkpoints in batches of 100 (Table Storage limit) — synchronous.</summary>
+ public void SaveMany(string machineGuid, IEnumerable<TelemetryCheckPoint> checkpoints)
+ {
+ if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
+ if (checkpoints == null) return;
+
+ var batch = new TableBatchOperation();
+ foreach (var cp in checkpoints)
+ {
+ if (cp == null || string.IsNullOrWhiteSpace(cp.SourceName)) continue;
+
+ var entity = new TelemetryCheckpointEntity(machineGuid, cp.SourceName)
+ {
+ Time = cp.Time,
+ TotalCount = cp.TotalCount
+ };
+
+ batch.InsertOrReplace(entity);
+
+ if (batch.Count == 100)
+ {
+ _table.ExecuteBatch(batch);
+ batch.Clear();
+ }
+ }
+
+ if (batch.Count > 0)
+ {
+ _table.ExecuteBatch(batch);
+ }
+ }
+
+ /// <summary>Get the checkpoint for a specific machine + source. Returns null if not found. (sync)</summary>
+ public TelemetryCheckPoint Get(string machineGuid, string sourceName)
+ {
+ if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
+ if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));
+
+ var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);
+ var op = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
+ var result = _table.Execute(op);
+
+ var entity = result.Result as TelemetryCheckpointEntity;
+ return entity == null ? null : new TelemetryCheckPoint
+ {
+ SourceName = sourceName,
+ Time = entity.Time,
+ TotalCount = entity.TotalCount
+ };
+ }
+
+ /// <summary>Get all checkpoints for a machine (sync). Uses ExecuteQuery which handles paging internally.</summary>
+ public IReadOnlyList<TelemetryCheckPoint> GetAllForMachine(string machineGuid)
+ {
+ if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
+
+ var query = new TableQuery<TelemetryCheckpointEntity>()
+ .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, machineGuid));
+
+ // ExecuteQuery is synchronous and iterates all segments internally
+ var entities = _table.ExecuteQuery(query);
+
+ var list = new List<TelemetryCheckPoint>();
+ foreach (var e in entities)
+ {
+ list.Add(new TelemetryCheckPoint
+ {
+ // RowKey is sanitized; ensure you pass sanitized name when saving & querying
+ SourceName = e.RowKey,
+ Time = e.Time,
+ TotalCount = e.TotalCount
+ });
+ }
+
+ return list;
+ }
+
+ /// <summary>Delete a specific checkpoint (sync). No-op if not found.</summary>
+ public void Delete(string machineGuid, string sourceName)
+ {
+ if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
+ if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));
+
+ var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);
+ var retrieve = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
+ var result = _table.Execute(retrieve);
+ var entity = result.Result as TelemetryCheckpointEntity;
+ if (entity == null) return;
+
+ var del = TableOperation.Delete(entity);
+ _table.Execute(del);
+ }
+ }
+}