using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Integration.Operation;
using Tango.Logging;

namespace Tango.Telemetry
{
    /// <summary>
    /// A <see cref="TelemetryPublisher"/> that keeps per-destination retry metadata
    /// (retry count, last attempt, next eligible attempt) on each telemetry package and
    /// applies capped exponential backoff between delivery attempts.
    /// </summary>
    public class TelemetryPublisherAdvanced : TelemetryPublisher
    {
        /// <summary>
        /// Defines the maximum backoff delay between retries when exponential backoff is applied.
        /// For example, setting to 1 hour means the retry interval will not exceed 1 hour regardless of the retry count.
        /// Only whole seconds are used; negative values are treated as zero.
        /// </summary>
        public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);

        /// <summary>
        /// Creates the publisher; all arguments are forwarded unchanged to the base class.
        /// </summary>
        /// <param name="storageManager">Storage manager passed to the base publisher.</param>
        /// <param name="queueManager">Queue manager passed to the base publisher.</param>
        /// <param name="config">Publisher configuration passed to the base publisher.</param>
        public TelemetryPublisherAdvanced(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config)
            : base(storageManager, queueManager, config)
        {
        }

        /// <summary>
        /// Attempts to deliver <paramref name="package"/> to every destination that supports its
        /// source type, then stores the updated per-destination retry state.
        /// </summary>
        /// <remarks>
        /// For each destination the outcome is one of:
        /// postponed (backoff window not yet elapsed), passed (published and removed from the
        /// pending list), unavailable, or failed (exception). Unavailable/failed destinations are
        /// rescheduled with a delay of 2^RetryCount seconds capped by
        /// <see cref="MaxExponentialBackoff"/> when they list
        /// <c>TelemetrySourceTypes.PendingStorage</c> among their supported source types;
        /// otherwise they are removed from the pending list.
        /// </remarks>
        /// <param name="package">The telemetry package to publish.</param>
        /// <returns>
        /// The aggregated result, which is also set on <c>package.CompletionSource</c>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="package"/> is null.</exception>
        protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
        {
            if (package == null)
            {
                throw new ArgumentNullException(nameof(package));
            }

            LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug);

            Stopwatch totalWatch = Stopwatch.StartNew();
            var result = new TelemetryPublishResult();

            // Exit early if the publisher has been stopped or disposed to avoid invalid operations.
            if (!IsStarted || _isDisposed)
            {
                LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning);
                package.CompletionSource.SetResult(result);
                return result;
            }

            // Prepare metadata headers to be attached to the telemetry, which are common to all destinations.
            // NOTE(review): the generic arguments were not visible in the reviewed source;
            // <string, string> is assumed - confirm against the destination Publish signature.
            var properties = new List<KeyValuePair<string, string>>
            {
                new KeyValuePair<string, string>("MachineID", Config.MachineID),
                new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
                new KeyValuePair<string, string>("Environment", Config.Environment)
            };

            // A single timestamp is used for every scheduling decision made in this call.
            var now = DateTime.UtcNow;

            // Work on a copy so the package's list is only replaced once, at the end.
            var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();

            // If this is a newly generated package, not a retry, initialize its destination list.
            // This ensures we only attempt to publish to destinations that support this telemetry type.
            if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
            {
                foreach (var dest in Destinations)
                {
                    if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name))
                    {
                        // Initialize with default retry tracking values.
                        pendingDestinations.Add(new TelemetryPendingDestination
                        {
                            Name = dest.Name,
                            RetryCount = 0,
                            LastAttempt = DateTime.MinValue,
                            NextEligibleAttempt = now
                        });

                        LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug);
                    }
                }
            }

            // Iterate over each valid destination, applying retry backoff and delivery logic.
            foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
            {
                var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
                if (pendingEntry == null)
                {
                    // Nothing is pending for this destination on this package.
                    continue;
                }

                Stopwatch destinationWatch = Stopwatch.StartNew();
                var destinationResult = new TelemetryPublishResult.DestinationResult();
                destinationResult.Destination = destination;
                result.DestinationsResults.Add(destinationResult);

                // If the destination is in a cool-down period due to previous failures, skip for now.
                if (now < pendingEntry.NextEligibleAttempt)
                {
                    destinationWatch.Stop();
                    destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
                    destinationResult.ElapsedTime = destinationWatch.Elapsed;
                    LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
                    continue;
                }

                // Set by the "unavailable" and "exception" outcomes; both share the
                // reschedule-or-drop handling that follows the try/catch.
                bool attemptUnsuccessful = false;
                bool failedWithException = false;

                try
                {
                    LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug);

                    // NOTE(review): when OnPublishingPackage returns false, the destination result
                    // keeps its default status/elapsed time and the entry stays pending.
                    if (OnPublishingPackage(package, destination))
                    {
                        if (await destination.IsAvailable())
                        {
                            // Send the telemetry payload
                            await destination.Publish(package, properties);
                            OnPackagePublished(package, destination);

                            destinationWatch.Stop();
                            destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
                            destinationResult.ElapsedTime = destinationWatch.Elapsed;
                            LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.");

                            // Remove from retry queue since delivery succeeded.
                            pendingDestinations.RemoveAll(x => x.Name == destination.Name);
                        }
                        else
                        {
                            // Destination reported itself as not available; apply retry logic if supported.
                            destinationWatch.Stop();
                            destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
                            destinationResult.ElapsedTime = destinationWatch.Elapsed;
                            LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning);

                            attemptUnsuccessful = true;
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Any exception thrown while checking availability or publishing ends up here.
                    destinationWatch.Stop();
                    destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
                    destinationResult.Error = ex;
                    destinationResult.ElapsedTime = destinationWatch.Elapsed;
                    LogManager.Log(ex, $"Error publishing to '{destination.Name}'.");
                    OnPackagePublishFailed(package, destination, ex);

                    attemptUnsuccessful = true;
                    failedWithException = true;
                }

                if (!attemptUnsuccessful)
                {
                    continue;
                }

                if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
                {
                    // Retry and Backoff Logic:
                    // --------------------------
                    // Each destination maintains its own retry metadata: RetryCount, LastAttempt, and NextEligibleAttempt.
                    // On each unsuccessful attempt, RetryCount is incremented and a delay of 2^RetryCount seconds is
                    // calculated, capped by MaxExponentialBackoff. NextEligibleAttempt is checked at the top of the
                    // loop on the next call, and publishing is skipped while the backoff period has not elapsed.
                    pendingEntry.RetryCount++;
                    pendingEntry.LastAttempt = now;

                    // The math is done in double and clamped BEFORE converting to an integer:
                    // casting 2^RetryCount straight to int goes out of range once RetryCount >= 31,
                    // which could yield a negative delay and silently disable the backoff.
                    double capSeconds = Math.Max(0d, Math.Floor(MaxExponentialBackoff.TotalSeconds));
                    long delay = (long)Math.Min(Math.Pow(2d, pendingEntry.RetryCount), capSeconds);
                    pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);

                    LogManager.Log(
                        failedWithException
                            ? $"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure."
                            : $"Scheduled retry to '{destination.Name}' in {delay} seconds.",
                        LogCategory.Debug);
                }
                else
                {
                    // Remove from retry list if the destination is not retryable.
                    pendingDestinations.RemoveAll(x => x.Name == destination.Name);

                    LogManager.Log(
                        failedWithException
                            ? $"'{destination.Name}' is not retryable. Removed from pending after failure."
                            : $"'{destination.Name}' is not retryable. Removed from pending.",
                        LogCategory.Debug);
                }
            }

            // Save updated retry state for this telemetry package (a fresh copy of the working list).
            package.PendingTelemetry.PendingDestinations = pendingDestinations.ToList();

            // Determine whether to remove the telemetry or persist it based on remaining destinations.
            // NOTE(review): a package that did not come from pending storage is upserted even when
            // no destinations remain pending - confirm this is intended.
            if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
            {
                LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug);
                StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
            }
            else
            {
                LogManager.Log("Updating pending telemetry record.", LogCategory.Debug);
                StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
            }

            totalWatch.Stop();
            result.TotalElapsedTime = totalWatch.Elapsed;
            LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);

            package.CompletionSource.SetResult(result);
            return result;
        }
    }
}