aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio_22/Tango.Portal.Chat.Web/Alerts/AlertsWorker.cs
blob: 25d31ef374e04e3d7f1c739773db803b910760d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
using System.Data;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Tango.Portal.Chat.Web.Models;
using Tango.Portal.Chat.Web.Services;

namespace Tango.Portal.Chat.Web.Alerts
{
    public sealed class AlertsWorker : BackgroundService
    {
        private readonly ILogger<AlertsWorker> _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly Dictionary<string, HashSet<string>> _publishedRowHashes = new();

        public AlertsWorker(ILogger<AlertsWorker> logger, IServiceScopeFactory serviceScopeFactory)
        {
            _logger = logger;
            _serviceScopeFactory = serviceScopeFactory;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("AlertsWorker service started");

            // Wait for other services to initialize
            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessAlertsAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error occurred during alerts processing");
                }

                // Wait for 1 minute before next cycle
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }

            _logger.LogInformation("AlertsWorker service stopped");
        }

        private async Task ProcessAlertsAsync(CancellationToken cancellationToken)
        {
            using var scope = _serviceScopeFactory.CreateScope();
            var alertsQueryService = scope.ServiceProvider.GetRequiredService<AlertsQueryService>();
            var kustoQueryService = scope.ServiceProvider.GetRequiredService<KustoQueryService>();
            var n8nService = scope.ServiceProvider.GetRequiredService<N8NService>();

            try
            {
                // Ensure the AlertsQueries table exists
                await alertsQueryService.EnsureTableExistsAsync(cancellationToken);

                // Get enabled queries that are ready to execute
                var queries = await alertsQueryService.GetEnabledQueriesAsync(cancellationToken);

                _logger.LogInformation("Found {Count} queries ready for execution", queries.Count);

                foreach (var query in queries)
                {
                    await ProcessSingleQueryAsync(query, alertsQueryService, kustoQueryService, n8nService, cancellationToken);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred while processing alerts");
            }
        }

        private async Task ProcessSingleQueryAsync(
            AlertsQuery query,
            AlertsQueryService alertsQueryService,
            KustoQueryService kustoQueryService,
            N8NService n8nService,
            CancellationToken cancellationToken)
        {
            try
            {
                _logger.LogInformation("Processing query: {Name}", query.Name);

                // Execute the Kusto query
                var dataTable = await kustoQueryService.QueryAsync(query.Query, new Dictionary<string, string>(), cancellationToken);

                // Convert DataTable to array of objects
                var rows = ConvertDataTableToObjectArray(dataTable);

                _logger.LogInformation("Query {Name} returned {RowCount} rows", query.Name, rows.Length);

                // Filter out duplicates if AvoidDuplicates is enabled
                if (query.AvoidDuplicates)
                {
                    rows = FilterDuplicateRows(query.Name, rows);
                    _logger.LogInformation("After duplicate filtering, {RowCount} rows remain for query {Name}", rows.Length, query.Name);
                }

                bool success = true;

                if (rows.Count() > 0)
                {
                    // Post to n8n
                    success = await n8nService.PostAlertDataAsync(query.Name, query.Type, query.ChartType, query.ChartTitle, query.Instructions, rows, cancellationToken);

                    // If successful and AvoidDuplicates is enabled, store hashes of published rows
                    if (success && query.AvoidDuplicates)
                    {
                        StorePublishedRowHashes(query.Name, rows);
                    }
                }
                else
                {
                    _logger.LogInformation("No data returned for query: {Name}, skipping n8n post", query.Name);
                }

                if (success)
                {
                    // Update the execution time for next run
                    await alertsQueryService.UpdateExecutionTimeAsync(query, cancellationToken);
                    _logger.LogInformation("Successfully processed and updated query: {Name}", query.Name);
                }
                else
                {
                    _logger.LogWarning("Failed to post data to n8n for query: {Name}", query.Name);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing query: {Name}", query.Name);
            }
        }

        private object[] FilterDuplicateRows(string queryName, object[] rows)
        {
            if (!_publishedRowHashes.TryGetValue(queryName, out var publishedHashes))
            {
                return rows;
            }

            var filteredRows = new List<object>();

            foreach (var row in rows)
            {
                var rowHash = GenerateRowHash(row);
                if (!publishedHashes.Contains(rowHash))
                {
                    filteredRows.Add(row);
                }
            }

            return filteredRows.ToArray();
        }

        private void StorePublishedRowHashes(string queryName, object[] rows)
        {
            if (!_publishedRowHashes.ContainsKey(queryName))
            {
                _publishedRowHashes[queryName] = new HashSet<string>();
            }

            var publishedHashes = _publishedRowHashes[queryName];

            foreach (var row in rows)
            {
                var rowHash = GenerateRowHash(row);
                publishedHashes.Add(rowHash);
            }

            // Optional: Limit the size of the hash set to prevent memory issues
            // Keep only the most recent 10,000 hashes per query
            const int maxHashCount = 10000;
            if (publishedHashes.Count > maxHashCount)
            {
                var hashesToRemove = publishedHashes.Take(publishedHashes.Count - maxHashCount).ToList();
                foreach (var hash in hashesToRemove)
                {
                    publishedHashes.Remove(hash);
                }
                _logger.LogInformation("Pruned {Count} old hashes for query {Name} to maintain memory limits",
                    hashesToRemove.Count, queryName);
            }
        }

        private static string GenerateRowHash(object row)
        {
            // Serialize the row to JSON for consistent hashing
            var json = JsonSerializer.Serialize(row, new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
            });

            // Generate SHA256 hash of the JSON
            using var sha256 = SHA256.Create();
            var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(json));
            return Convert.ToHexString(hashBytes);
        }

        private static object[] ConvertDataTableToObjectArray(DataTable dataTable)
        {
            var rows = new object[dataTable.Rows.Count];

            for (int i = 0; i < dataTable.Rows.Count; i++)
            {
                var row = dataTable.Rows[i];
                var rowData = new Dictionary<string, object?>();

                foreach (DataColumn column in dataTable.Columns)
                {
                    var value = row[column];
                    rowData[column.ColumnName] = value == DBNull.Value ? null : value;
                }

                rows[i] = rowData;
            }

            return rows;
        }
    }
}