using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Tango.PPC.Common.Web;
namespace Tango.MachineService.Telemetry
{
/// <summary>
/// Table Storage row holding a single telemetry checkpoint.
/// PartitionKey is the machine GUID passed to the constructor; RowKey is the
/// source name after <see cref="SanitizeRowKey"/> has been applied.
/// </summary>
internal class TelemetryCheckpointEntity : TableEntity
{
    /// <summary>
    /// Parameterless constructor. NOTE(review): presumably required so the storage
    /// SDK can materialize entities when reading rows back — confirm before removing.
    /// </summary>
    public TelemetryCheckpointEntity() { }

    /// <summary>
    /// Creates an entity keyed by machine and (sanitized) source name.
    /// </summary>
    /// <param name="machineGuid">Used verbatim as the PartitionKey.</param>
    /// <param name="sourceName">Sanitized and used as the RowKey.</param>
    public TelemetryCheckpointEntity(string machineGuid, string sourceName)
    {
        PartitionKey = machineGuid;
        RowKey = SanitizeRowKey(sourceName);
    }

    /// <summary>Checkpoint time value persisted with the row.</summary>
    public DateTime Time { get; set; }

    /// <summary>Checkpoint total count value persisted with the row.</summary>
    public int TotalCount { get; set; }

    /// <summary>
    /// Replaces every character that Table Storage does not accept in a key value
    /// ('/', '\', '#', '?' and control characters) with '_'.
    /// The mapping is lossy: two different names may produce the same key.
    /// </summary>
    /// <param name="value">Raw source name; null is treated as an empty string.</param>
    /// <returns>The sanitized key, never null.</returns>
    internal static string SanitizeRowKey(string value)
    {
        if (string.IsNullOrEmpty(value))
        {
            return string.Empty;
        }

        var chars = value.ToCharArray();
        for (var i = 0; i < chars.Length; i++)
        {
            var c = chars[i];
            // char.IsControl covers U+0000-U+001F and U+007F-U+009F, which the
            // service rejects in keys in addition to the four punctuation characters.
            if (c == '/' || c == '\\' || c == '#' || c == '?' || char.IsControl(c))
            {
                chars[i] = '_';
            }
        }

        return new string(chars);
    }
}
/// <summary>
/// Stores and retrieves telemetry source checkpoints in Azure Table Storage.
/// PartitionKey = MachineGuid, RowKey = sanitized SourceName.
/// </summary>
public sealed class TelemetryCheckpointStore
{
    // Maximum number of operations the Table service accepts in one batch.
    private const int MaxBatchSize = 100;

    // HTTP status the service returns when the addressed entity does not exist.
    private const int HttpNotFound = 404;

    private readonly CloudTable _table;

    /// <summary>
    /// Parses the connection string, obtains a reference to the table and creates
    /// the table if it does not exist yet (synchronous call to the service).
    /// </summary>
    /// <param name="connectionString">Azure Storage connection string.</param>
    /// <param name="tableName">Table name (default: TelemetryCheckPoints).</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionString"/> or <paramref name="tableName"/> is null, empty or whitespace.
    /// </exception>
    public TelemetryCheckpointStore(string connectionString, string tableName = "TelemetryCheckPoints")
    {
        if (string.IsNullOrWhiteSpace(connectionString)) throw new ArgumentNullException(nameof(connectionString));
        if (string.IsNullOrWhiteSpace(tableName)) throw new ArgumentNullException(nameof(tableName));

        var storageAccount = CloudStorageAccount.Parse(connectionString);
        var tableClient = storageAccount.CreateCloudTableClient();
        _table = tableClient.GetTableReference(tableName);
        _table.CreateIfNotExists(); // sync
    }

    /// <summary>
    /// Upserts a single checkpoint for a machine + source (synchronous).
    /// </summary>
    /// <param name="machineGuid">Machine identifier, used as the partition key.</param>
    /// <param name="checkpoint">Checkpoint to store; its SourceName determines the row key.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="machineGuid"/> or the checkpoint's SourceName is null/empty/whitespace,
    /// or <paramref name="checkpoint"/> is null.
    /// </exception>
    public void Save(string machineGuid, TelemetryCheckPoint checkpoint)
    {
        if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
        if (checkpoint == null) throw new ArgumentNullException(nameof(checkpoint));
        if (string.IsNullOrWhiteSpace(checkpoint.SourceName)) throw new ArgumentNullException(nameof(checkpoint.SourceName));

        var entity = new TelemetryCheckpointEntity(machineGuid, checkpoint.SourceName)
        {
            Time = checkpoint.Time,
            TotalCount = checkpoint.TotalCount
        };

        _table.Execute(TableOperation.InsertOrReplace(entity));
    }

    /// <summary>
    /// Upserts any number of checkpoints for one machine, sent in batches of at most
    /// 100 operations (synchronous). Null items and items without a source name are
    /// skipped. When several items resolve to the same row key, only the last one is
    /// written, because a batch may not contain the same entity twice.
    /// </summary>
    /// <param name="machineGuid">Machine identifier, used as the partition key.</param>
    /// <param name="checkpoints">Checkpoints to store; null is treated as "nothing to do".</param>
    /// <exception cref="ArgumentNullException"><paramref name="machineGuid"/> is null/empty/whitespace.</exception>
    public void SaveMany(string machineGuid, IEnumerable<TelemetryCheckPoint> checkpoints)
    {
        if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
        if (checkpoints == null) return;

        // Collapse duplicates first (last one wins). Keys are compared ordinally,
        // matching how the row key string is stored.
        var latestByRowKey = new Dictionary<string, TelemetryCheckpointEntity>(StringComparer.Ordinal);
        foreach (var cp in checkpoints)
        {
            if (cp == null || string.IsNullOrWhiteSpace(cp.SourceName)) continue;

            var entity = new TelemetryCheckpointEntity(machineGuid, cp.SourceName)
            {
                Time = cp.Time,
                TotalCount = cp.TotalCount
            };
            latestByRowKey[entity.RowKey] = entity;
        }

        var batch = new TableBatchOperation();
        foreach (var entity in latestByRowKey.Values)
        {
            batch.InsertOrReplace(entity);
            if (batch.Count == MaxBatchSize)
            {
                _table.ExecuteBatch(batch);
                batch.Clear();
            }
        }

        // Flush the final, partially filled batch.
        if (batch.Count > 0)
        {
            _table.ExecuteBatch(batch);
        }
    }

    /// <summary>
    /// Gets the checkpoint for a specific machine + source (synchronous).
    /// </summary>
    /// <param name="machineGuid">Machine identifier (partition key).</param>
    /// <param name="sourceName">Source name; sanitized before it is used as the row key.</param>
    /// <returns>
    /// The stored checkpoint, with SourceName set to the <paramref name="sourceName"/> argument,
    /// or null if no matching row was found.
    /// </returns>
    /// <exception cref="ArgumentNullException">An argument is null, empty or whitespace.</exception>
    public TelemetryCheckPoint Get(string machineGuid, string sourceName)
    {
        if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
        if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));

        var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);

        // The typed Retrieve is required: without the type argument the result is not
        // a TelemetryCheckpointEntity and the cast below would always yield null.
        var op = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
        var entity = _table.Execute(op).Result as TelemetryCheckpointEntity;
        if (entity == null)
        {
            return null;
        }

        return new TelemetryCheckPoint
        {
            SourceName = sourceName,
            Time = entity.Time,
            TotalCount = entity.TotalCount
        };
    }

    /// <summary>
    /// Gets all checkpoints stored for a machine (synchronous).
    /// </summary>
    /// <param name="machineGuid">Machine identifier (partition key).</param>
    /// <returns>
    /// One checkpoint per row in the partition. SourceName is the stored row key,
    /// i.e. the sanitized form of the name that was saved.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="machineGuid"/> is null/empty/whitespace.</exception>
    public IReadOnlyList<TelemetryCheckPoint> GetAllForMachine(string machineGuid)
    {
        if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));

        var query = new TableQuery<TelemetryCheckpointEntity>()
            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, machineGuid));

        // ExecuteQuery is synchronous and iterates all segments internally.
        return _table.ExecuteQuery(query)
            .Select(e => new TelemetryCheckPoint
            {
                // RowKey is sanitized; the original, unsanitized name is not stored.
                SourceName = e.RowKey,
                Time = e.Time,
                TotalCount = e.TotalCount
            })
            .ToList();
    }

    /// <summary>
    /// Deletes a specific checkpoint (synchronous). No-op if it does not exist.
    /// </summary>
    /// <param name="machineGuid">Machine identifier (partition key).</param>
    /// <param name="sourceName">Source name; sanitized before it is used as the row key.</param>
    /// <exception cref="ArgumentNullException">An argument is null, empty or whitespace.</exception>
    public void Delete(string machineGuid, string sourceName)
    {
        if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
        if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));

        // Unconditional delete (wildcard ETag) in a single round-trip. A preceding
        // retrieve would leave a window in which a concurrent update or delete makes
        // the subsequent delete fail.
        var target = new TelemetryCheckpointEntity(machineGuid, sourceName) { ETag = "*" };

        try
        {
            _table.Execute(TableOperation.Delete(target));
        }
        catch (StorageException ex) when (ex.RequestInformation != null
                                          && ex.RequestInformation.HttpStatusCode == HttpNotFound)
        {
            // Nothing to delete: keep the documented no-op behavior.
        }
    }
}
}