1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
using System.Data;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Tango.Portal.Chat.Web.Models;
using Tango.Portal.Chat.Web.Services;
namespace Tango.Portal.Chat.Web.Alerts
{
/// <summary>
/// Hosted background worker that, once per polling interval, loads the enabled alert
/// queries, runs each one against Kusto, and posts any resulting rows to n8n.
/// </summary>
/// <remarks>
/// Duplicate suppression (for queries with <c>AvoidDuplicates</c> set) is tracked in an
/// in-memory dictionary owned by this instance, so it does not survive a process restart.
/// The worker processes queries sequentially from a single loop; the dedup state is not
/// synchronized for concurrent access.
/// </remarks>
public sealed class AlertsWorker : BackgroundService
{
    /// <summary>Delay before the first cycle, giving other services time to initialize.</summary>
    private static readonly TimeSpan StartupDelay = TimeSpan.FromSeconds(10);

    /// <summary>Pause between the end of one processing cycle and the start of the next.</summary>
    private static readonly TimeSpan PollingInterval = TimeSpan.FromMinutes(1);

    /// <summary>Upper bound on remembered row hashes per query, to cap memory usage.</summary>
    private const int MaxPublishedHashesPerQuery = 10000;

    /// <summary>
    /// Serializer settings used when hashing rows. Cached because constructing a new
    /// <see cref="JsonSerializerOptions"/> per call discards the serializer's metadata cache.
    /// </summary>
    private static readonly JsonSerializerOptions RowHashSerializerOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
    };

    private readonly ILogger<AlertsWorker> _logger;
    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>Hashes of rows already published, keyed by query name.</summary>
    private readonly Dictionary<string, PublishedHashStore> _publishedRowHashes = new();

    /// <summary>Creates the worker.</summary>
    /// <param name="logger">Logger for progress and error reporting.</param>
    /// <param name="serviceScopeFactory">Factory used to create a DI scope per processing cycle.</param>
    public AlertsWorker(ILogger<AlertsWorker> logger, IServiceScopeFactory serviceScopeFactory)
    {
        _logger = logger;
        _serviceScopeFactory = serviceScopeFactory;
    }

    /// <summary>
    /// Main loop: waits for the startup delay, then runs one processing cycle per polling
    /// interval until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("AlertsWorker service started");

        try
        {
            // Wait for other services to initialize
            await Task.Delay(StartupDelay, stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessAlertsAsync(stoppingToken);
                }
                catch (Exception ex) when (!IsShutdown(ex, stoppingToken))
                {
                    // A failed cycle must not kill the worker; try again next interval.
                    _logger.LogError(ex, "Error occurred during alerts processing");
                }

                await Task.Delay(PollingInterval, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected on shutdown: fall through so the stop is logged instead of
            // surfacing the cancellation as a failure.
        }

        _logger.LogInformation("AlertsWorker service stopped");
    }

    /// <summary>
    /// Runs one processing cycle: resolves the required services from a fresh scope,
    /// ensures the alerts table exists, and processes every enabled query in turn.
    /// </summary>
    /// <param name="cancellationToken">Token used to abort the cycle on shutdown.</param>
    private async Task ProcessAlertsAsync(CancellationToken cancellationToken)
    {
        using var scope = _serviceScopeFactory.CreateScope();
        var services = scope.ServiceProvider;
        var alertsQueryService = services.GetRequiredService<AlertsQueryService>();
        var kustoQueryService = services.GetRequiredService<KustoQueryService>();
        var n8nService = services.GetRequiredService<N8NService>();

        try
        {
            // Ensure the AlertsQueries table exists
            await alertsQueryService.EnsureTableExistsAsync(cancellationToken);

            // Get enabled queries that are ready to execute
            var queries = await alertsQueryService.GetEnabledQueriesAsync(cancellationToken);
            _logger.LogInformation("Found {Count} queries ready for execution", queries.Count);

            foreach (var query in queries)
            {
                await ProcessSingleQueryAsync(query, alertsQueryService, kustoQueryService, n8nService, cancellationToken);
            }
        }
        catch (Exception ex) when (!IsShutdown(ex, cancellationToken))
        {
            _logger.LogError(ex, "Error occurred while processing alerts");
        }
    }

    /// <summary>
    /// Executes a single alert query, optionally drops rows that were already published,
    /// posts the remaining rows to n8n, and updates the query's execution time on success.
    /// </summary>
    /// <remarks>
    /// A query that yields no rows is treated as successful (nothing is posted, the
    /// execution time is still updated). Failures are logged and do not propagate, except
    /// for cancellation caused by <paramref name="cancellationToken"/>, which is rethrown
    /// so the caller stops iterating over the remaining queries.
    /// </remarks>
    /// <param name="query">The alert query definition to run.</param>
    /// <param name="alertsQueryService">Service used to update the query's execution time.</param>
    /// <param name="kustoQueryService">Service used to execute the query text.</param>
    /// <param name="n8nService">Service used to post the resulting rows.</param>
    /// <param name="cancellationToken">Token used to abort processing on shutdown.</param>
    private async Task ProcessSingleQueryAsync(
        AlertsQuery query,
        AlertsQueryService alertsQueryService,
        KustoQueryService kustoQueryService,
        N8NService n8nService,
        CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation("Processing query: {Name}", query.Name);

            // Execute the Kusto query
            var dataTable = await kustoQueryService.QueryAsync(query.Query, new Dictionary<string, string>(), cancellationToken);

            // Convert DataTable to array of objects
            var rows = ConvertDataTableToObjectArray(dataTable);
            _logger.LogInformation("Query {Name} returned {RowCount} rows", query.Name, rows.Length);

            // Filter out duplicates if AvoidDuplicates is enabled
            if (query.AvoidDuplicates)
            {
                rows = FilterDuplicateRows(query.Name, rows);
                _logger.LogInformation("After duplicate filtering, {RowCount} rows remain for query {Name}", rows.Length, query.Name);
            }

            var success = true;
            if (rows.Length > 0)
            {
                // Post to n8n
                success = await n8nService.PostAlertDataAsync(query.Name, query.Type, query.ChartType, query.ChartTitle, query.Instructions, rows, cancellationToken);

                // Remember rows only after a successful post so failed rows are retried.
                if (success && query.AvoidDuplicates)
                {
                    StorePublishedRowHashes(query.Name, rows);
                }
            }
            else
            {
                _logger.LogInformation("No data returned for query: {Name}, skipping n8n post", query.Name);
            }

            if (success)
            {
                // Update the execution time for next run
                await alertsQueryService.UpdateExecutionTimeAsync(query, cancellationToken);
                _logger.LogInformation("Successfully processed and updated query: {Name}", query.Name);
            }
            else
            {
                _logger.LogWarning("Failed to post data to n8n for query: {Name}", query.Name);
            }
        }
        catch (Exception ex) when (!IsShutdown(ex, cancellationToken))
        {
            _logger.LogError(ex, "Error processing query: {Name}", query.Name);
        }
    }

    /// <summary>
    /// Returns <c>true</c> when <paramref name="exception"/> is a cancellation that
    /// coincides with <paramref name="token"/> having been cancelled, i.e. an expected
    /// shutdown rather than an error (such as an unrelated timeout) worth logging.
    /// </summary>
    private static bool IsShutdown(Exception exception, CancellationToken token) =>
        exception is OperationCanceledException && token.IsCancellationRequested;

    /// <summary>
    /// Returns the rows whose hash has not been recorded as published for
    /// <paramref name="queryName"/>. If nothing has been recorded for that query the
    /// input array is returned unchanged.
    /// </summary>
    /// <param name="queryName">Name of the query the rows belong to.</param>
    /// <param name="rows">Candidate rows.</param>
    private object[] FilterDuplicateRows(string queryName, object[] rows)
    {
        if (!_publishedRowHashes.TryGetValue(queryName, out var published))
        {
            return rows;
        }

        return Array.FindAll(rows, row => !published.Contains(GenerateRowHash(row)));
    }

    /// <summary>
    /// Records the hashes of <paramref name="rows"/> as published for
    /// <paramref name="queryName"/>, then evicts the oldest hashes if the per-query
    /// limit is exceeded.
    /// </summary>
    /// <param name="queryName">Name of the query the rows belong to.</param>
    /// <param name="rows">Rows that were successfully posted.</param>
    private void StorePublishedRowHashes(string queryName, object[] rows)
    {
        if (!_publishedRowHashes.TryGetValue(queryName, out var published))
        {
            published = new PublishedHashStore();
            _publishedRowHashes[queryName] = published;
        }

        foreach (var row in rows)
        {
            published.Add(GenerateRowHash(row));
        }

        // Limit the number of remembered hashes to prevent unbounded memory growth.
        var prunedCount = published.TrimTo(MaxPublishedHashesPerQuery);
        if (prunedCount > 0)
        {
            _logger.LogInformation("Pruned {Count} old hashes for query {Name} to maintain memory limits",
                prunedCount, queryName);
        }
    }

    /// <summary>
    /// Computes an upper-case hexadecimal SHA-256 digest of the row's JSON serialization.
    /// </summary>
    /// <param name="row">The row object to hash.</param>
    private static string GenerateRowHash(object row)
    {
        // Serialize the row to JSON for consistent hashing
        var json = JsonSerializer.Serialize(row, RowHashSerializerOptions);
        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(json));
        return Convert.ToHexString(digest);
    }

    /// <summary>
    /// Converts every row of <paramref name="dataTable"/> into a dictionary mapping column
    /// name to value, with <see cref="DBNull"/> translated to <c>null</c>.
    /// </summary>
    /// <param name="dataTable">The table to convert.</param>
    /// <returns>One dictionary per table row, in table order.</returns>
    private static object[] ConvertDataTableToObjectArray(DataTable dataTable)
    {
        var columns = dataTable.Columns;
        var result = new object[dataTable.Rows.Count];

        for (var index = 0; index < result.Length; index++)
        {
            var source = dataTable.Rows[index];
            var rowData = new Dictionary<string, object?>(columns.Count);

            foreach (DataColumn column in columns)
            {
                var value = source[column];
                rowData[column.ColumnName] = value is DBNull ? null : value;
            }

            result[index] = rowData;
        }

        return result;
    }

    /// <summary>
    /// Set of published row hashes that also remembers insertion order, so that trimming
    /// evicts the oldest entries. A plain <see cref="HashSet{T}"/> has no defined
    /// enumeration order and therefore cannot guarantee that.
    /// </summary>
    private sealed class PublishedHashStore
    {
        private readonly HashSet<string> _hashes = new(StringComparer.Ordinal);
        private readonly Queue<string> _insertionOrder = new();

        /// <summary>Returns whether <paramref name="hash"/> has been recorded.</summary>
        public bool Contains(string hash) => _hashes.Contains(hash);

        /// <summary>Records <paramref name="hash"/>; a hash already present is ignored.</summary>
        public void Add(string hash)
        {
            if (_hashes.Add(hash))
            {
                _insertionOrder.Enqueue(hash);
            }
        }

        /// <summary>
        /// Evicts the oldest hashes until at most <paramref name="maxCount"/> remain.
        /// </summary>
        /// <returns>The number of hashes evicted.</returns>
        public int TrimTo(int maxCount)
        {
            var removed = 0;
            while (_hashes.Count > maxCount)
            {
                _hashes.Remove(_insertionOrder.Dequeue());
                removed++;
            }

            return removed;
        }
    }
}
}
|