aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Web/Tango.MachineService/Telemetry/TelemetryCheckpointStore.cs
blob: 1cd40e523912f211d99b96681f3ab0d488a81337 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Tango.PPC.Common.Web;

namespace Tango.MachineService.Telemetry
{
    internal class TelemetryCheckpointEntity : TableEntity
    {
        public TelemetryCheckpointEntity() { }

        public TelemetryCheckpointEntity(string machineGuid, string sourceName)
        {
            PartitionKey = machineGuid;
            RowKey = SanitizeRowKey(sourceName);
        }

        public DateTime Time { get; set; }
        public int TotalCount { get; set; }

        internal static string SanitizeRowKey(string value) =>
            (value ?? string.Empty)
            .Replace("/", "_").Replace("\\", "_")
            .Replace("#", "_").Replace("?", "_");
    }

    /// <summary>
    /// Store/get telemetry source checkpoints in Azure Table Storage.
    /// PartitionKey = MachineGuid, RowKey = SourceName.
    /// </summary>
    public sealed class TelemetryCheckpointStore
    {
        private readonly CloudTable _table;

        /// <param name="connectionString">Azure Storage connection string</param>
        /// <param name="tableName">Table name (default: TelemetryCheckPoints)</param>
        public TelemetryCheckpointStore(string connectionString, string tableName = "TelemetryCheckPoints")
        {
            if (string.IsNullOrWhiteSpace(connectionString)) throw new ArgumentNullException(nameof(connectionString));
            if (string.IsNullOrWhiteSpace(tableName)) throw new ArgumentNullException(nameof(tableName));

            var storageAccount = CloudStorageAccount.Parse(connectionString);
            var tableClient = storageAccount.CreateCloudTableClient();

            _table = tableClient.GetTableReference(tableName);
            _table.CreateIfNotExists(); // sync
        }

        /// <summary>Upsert a single checkpoint for a machine + source (synchronous).</summary>
        public void Save(string machineGuid, TelemetryCheckPoint checkpoint)
        {
            if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
            if (checkpoint == null) throw new ArgumentNullException(nameof(checkpoint));
            if (string.IsNullOrWhiteSpace(checkpoint.SourceName)) throw new ArgumentNullException(nameof(checkpoint.SourceName));

            var entity = new TelemetryCheckpointEntity(machineGuid, checkpoint.SourceName)
            {
                Time = checkpoint.Time,
                TotalCount = checkpoint.TotalCount
            };

            var op = TableOperation.InsertOrReplace(entity);
            _table.Execute(op);
        }

        /// <summary>Upsert up to N checkpoints in batches of 100 (Table Storage limit) — synchronous.</summary>
        public void SaveMany(string machineGuid, IEnumerable<TelemetryCheckPoint> checkpoints)
        {
            if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
            if (checkpoints == null) return;

            var batch = new TableBatchOperation();
            foreach (var cp in checkpoints)
            {
                if (cp == null || string.IsNullOrWhiteSpace(cp.SourceName)) continue;

                var entity = new TelemetryCheckpointEntity(machineGuid, cp.SourceName)
                {
                    Time = cp.Time,
                    TotalCount = cp.TotalCount
                };

                batch.InsertOrReplace(entity);

                if (batch.Count == 100)
                {
                    _table.ExecuteBatch(batch);
                    batch.Clear();
                }
            }

            if (batch.Count > 0)
            {
                _table.ExecuteBatch(batch);
            }
        }

        /// <summary>Get the checkpoint for a specific machine + source. Returns null if not found. (sync)</summary>
        public TelemetryCheckPoint Get(string machineGuid, string sourceName)
        {
            if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
            if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));

            var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);
            var op = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
            var result = _table.Execute(op);

            var entity = result.Result as TelemetryCheckpointEntity;
            return entity == null ? null : new TelemetryCheckPoint
            {
                SourceName = sourceName,
                Time = entity.Time,
                TotalCount = entity.TotalCount
            };
        }

        /// <summary>Get all checkpoints for a machine (sync). Uses ExecuteQuery which handles paging internally.</summary>
        public IReadOnlyList<TelemetryCheckPoint> GetAllForMachine(string machineGuid)
        {
            if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));

            var query = new TableQuery<TelemetryCheckpointEntity>()
                .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, machineGuid));

            // ExecuteQuery is synchronous and iterates all segments internally
            var entities = _table.ExecuteQuery(query);

            var list = new List<TelemetryCheckPoint>();
            foreach (var e in entities)
            {
                list.Add(new TelemetryCheckPoint
                {
                    // RowKey is sanitized; ensure you pass sanitized name when saving & querying
                    SourceName = e.RowKey,
                    Time = e.Time,
                    TotalCount = e.TotalCount
                });
            }

            return list;
        }

        /// <summary>Delete a specific checkpoint (sync). No-op if not found.</summary>
        public void Delete(string machineGuid, string sourceName)
        {
            if (string.IsNullOrWhiteSpace(machineGuid)) throw new ArgumentNullException(nameof(machineGuid));
            if (string.IsNullOrWhiteSpace(sourceName)) throw new ArgumentNullException(nameof(sourceName));

            var rowKey = TelemetryCheckpointEntity.SanitizeRowKey(sourceName);
            var retrieve = TableOperation.Retrieve<TelemetryCheckpointEntity>(machineGuid, rowKey);
            var result = _table.Execute(retrieve);
            var entity = result.Result as TelemetryCheckpointEntity;
            if (entity == null) return;

            var del = TableOperation.Delete(entity);
            _table.Execute(del);
        }
    }
}