using System.Data; using System.Security.Cryptography; using System.Text; using System.Text.Json; using Tango.Portal.Chat.Web.Models; using Tango.Portal.Chat.Web.Services; namespace Tango.Portal.Chat.Web.Alerts { public sealed class AlertsWorker : BackgroundService { private readonly ILogger _logger; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly Dictionary> _publishedRowHashes = new(); public AlertsWorker(ILogger logger, IServiceScopeFactory serviceScopeFactory) { _logger = logger; _serviceScopeFactory = serviceScopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("AlertsWorker service started"); // Wait for other services to initialize await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); while (!stoppingToken.IsCancellationRequested) { try { await ProcessAlertsAsync(stoppingToken); } catch (Exception ex) { _logger.LogError(ex, "Error occurred during alerts processing"); } // Wait for 1 minute before next cycle await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); } _logger.LogInformation("AlertsWorker service stopped"); } private async Task ProcessAlertsAsync(CancellationToken cancellationToken) { using var scope = _serviceScopeFactory.CreateScope(); var alertsQueryService = scope.ServiceProvider.GetRequiredService(); var kustoQueryService = scope.ServiceProvider.GetRequiredService(); var n8nService = scope.ServiceProvider.GetRequiredService(); try { // Ensure the AlertsQueries table exists await alertsQueryService.EnsureTableExistsAsync(cancellationToken); // Get enabled queries that are ready to execute var queries = await alertsQueryService.GetEnabledQueriesAsync(cancellationToken); _logger.LogInformation("Found {Count} queries ready for execution", queries.Count); foreach (var query in queries) { await ProcessSingleQueryAsync(query, alertsQueryService, kustoQueryService, n8nService, cancellationToken); } } catch (Exception ex) { _logger.LogError(ex, "Error occurred while processing alerts"); } } private async Task ProcessSingleQueryAsync( AlertsQuery query, AlertsQueryService alertsQueryService, KustoQueryService kustoQueryService, N8NService n8nService, CancellationToken cancellationToken) { try { _logger.LogInformation("Processing query: {Name}", query.Name); // Execute the Kusto query var dataTable = await kustoQueryService.QueryAsync(query.Query, new Dictionary(), cancellationToken); // Convert DataTable to array of objects var rows = ConvertDataTableToObjectArray(dataTable); _logger.LogInformation("Query {Name} returned {RowCount} rows", query.Name, rows.Length); // Filter out duplicates if AvoidDuplicates is enabled if (query.AvoidDuplicates) { rows = FilterDuplicateRows(query.Name, rows); _logger.LogInformation("After duplicate filtering, {RowCount} rows remain for query {Name}", rows.Length, query.Name); } bool success = true; if (rows.Count() > 0) { // Post to n8n success = await n8nService.PostAlertDataAsync(query.Name, query.Type, query.ChartType, query.ChartTitle, query.Instructions, rows, cancellationToken); // If successful and AvoidDuplicates is enabled, store hashes of published rows if (success && query.AvoidDuplicates) { StorePublishedRowHashes(query.Name, rows); } } else { _logger.LogInformation("No data returned for query: {Name}, skipping n8n post", query.Name); } if (success) { // Update the execution time for next run await alertsQueryService.UpdateExecutionTimeAsync(query, cancellationToken); _logger.LogInformation("Successfully processed and updated query: {Name}", query.Name); } else { _logger.LogWarning("Failed to post data to n8n for query: {Name}", query.Name); } } catch (Exception ex) { _logger.LogError(ex, "Error processing query: {Name}", query.Name); } } private object[] FilterDuplicateRows(string queryName, object[] rows) { if (!_publishedRowHashes.TryGetValue(queryName, out var publishedHashes)) { return rows; } var filteredRows = new List(); foreach (var row in rows) { var rowHash = GenerateRowHash(row); if (!publishedHashes.Contains(rowHash)) { filteredRows.Add(row); } } return filteredRows.ToArray(); } private void StorePublishedRowHashes(string queryName, object[] rows) { if (!_publishedRowHashes.ContainsKey(queryName)) { _publishedRowHashes[queryName] = new HashSet(); } var publishedHashes = _publishedRowHashes[queryName]; foreach (var row in rows) { var rowHash = GenerateRowHash(row); publishedHashes.Add(rowHash); } // Optional: Limit the size of the hash set to prevent memory issues // Keep only the most recent 10,000 hashes per query const int maxHashCount = 10000; if (publishedHashes.Count > maxHashCount) { var hashesToRemove = publishedHashes.Take(publishedHashes.Count - maxHashCount).ToList(); foreach (var hash in hashesToRemove) { publishedHashes.Remove(hash); } _logger.LogInformation("Pruned {Count} old hashes for query {Name} to maintain memory limits", hashesToRemove.Count, queryName); } } private static string GenerateRowHash(object row) { // Serialize the row to JSON for consistent hashing var json = JsonSerializer.Serialize(row, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase, DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull }); // Generate SHA256 hash of the JSON using var sha256 = SHA256.Create(); var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(json)); return Convert.ToHexString(hashBytes); } private static object[] ConvertDataTableToObjectArray(DataTable dataTable) { var rows = new object[dataTable.Rows.Count]; for (int i = 0; i < dataTable.Rows.Count; i++) { var row = dataTable.Rows[i]; var rowData = new Dictionary(); foreach (DataColumn column in dataTable.Columns) { var value = row[column]; rowData[column.ColumnName] = value == DBNull.Value ? null : value; } rows[i] = rowData; } return rows; } } }