diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-02 21:38:19 +0300 |
| commit | 0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch) | |
| tree | 5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs | |
| parent | 4222eddece906d6f0877022c06b853deb5068472 (diff) | |
| download | Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip | |
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs | 93 |
1 file changed, 56 insertions, 37 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index c4e7d3d3e..971afa864 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.Integration.Operation; +using Tango.Logging; namespace Tango.Telemetry { @@ -21,15 +22,26 @@ namespace Tango.Telemetry } + // This method handles the complete lifecycle of publishing a telemetry package to all supported destinations. + // It supports exponential backoff retry logic, destination filtering, and persistence of failed deliveries + // to ensure fault tolerance and guaranteed eventual delivery across multiple transports. protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { - Stopwatch totalWatch = Stopwatch.StartNew(); + LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug); + Stopwatch totalWatch = Stopwatch.StartNew(); var result = new TelemetryPublishResult(); - if (!IsStarted || _isDisposed) return result; + // Exit early if the publisher has been stopped or disposed to avoid invalid operations. + if (!IsStarted || _isDisposed) + { + LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning); + package.CompletionSource.SetResult(result); + return result; + } - List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>> + // Prepare metadata headers to be attached to the telemetry, which are common to all destinations. 
+ var properties = new List<KeyValuePair<string, string>> { new KeyValuePair<string, string>("MachineID", Config.MachineID), new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()), @@ -39,13 +51,15 @@ namespace Tango.Telemetry var now = DateTime.UtcNow; var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); - // For Streaming/External: initialize pending destinations list (used if publishing fails) + // If this is a newly generated package, not a retry, initialize its destination list. + // This ensures we only attempt to publish to destinations that support this telemetry type. if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var dest in Destinations) { - if (!pendingDestinations.Any(x => x.Name == dest.Name)) + if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name)) { + // Initialize with default retry tracking values. pendingDestinations.Add(new TelemetryPendingDestination { Name = dest.Name, @@ -53,38 +67,40 @@ namespace Tango.Telemetry LastAttempt = DateTime.MinValue, NextEligibleAttempt = now }); + LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug); } } } + // Iterate over each valid destination, applying retry backoff and delivery logic. 
foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); - - if (pendingEntry == null) - continue; + if (pendingEntry == null) continue; Stopwatch destinationWatch = Stopwatch.StartNew(); - var destinationResult = new TelemetryPublishResult.DestinationResult(); destinationResult.Destination = destination; result.DestinationsResults.Add(destinationResult); - // Respect backoff timing + // If the destination is in a cool-down period due to previous failures, skip for now. if (now < pendingEntry.NextEligibleAttempt) { destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug); continue; } try { + LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug); if (OnPublishingPackage(package, destination)) { if (await destination.IsAvailable()) { + // Send the telemetry payload await destination.Publish(package, properties); OnPackagePublished(package, destination); @@ -92,87 +108,90 @@ namespace Tango.Telemetry destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; destinationResult.ElapsedTime = destinationWatch.Elapsed; - // On success: remove entry from pending list + LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms."); + + // Remove from retry queue since delivery succeeded. pendingDestinations.RemoveAll(x => x.Name == destination.Name); } else { + // Destination is temporarily unreachable; apply retry logic if supported. 
destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; destinationResult.ElapsedTime = destinationWatch.Elapsed; - // Only track retry state if retry is supported + LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning); if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { - if (pendingEntry == null) - { - pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; - pendingDestinations.Add(pendingEntry); - } - pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; - - // Apply exponential backoff - int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); - pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug); } else { - // Remove if not retryable + // Remove from retry list if the destination is not retryable. pendingDestinations.RemoveAll(x => x.Name == destination.Name); + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug); } } } } catch (Exception ex) { + // Network, serialization, or other critical failures are handled here. 
destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; destinationResult.Error = ex; destinationResult.ElapsedTime = destinationWatch.Elapsed; - LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); + LogManager.Log(ex, $"Error publishing to '{destination.Name}'."); OnPackagePublishFailed(package, destination, ex); - // Only track retry state if retry is supported if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { - if (pendingEntry == null) - { - pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; - pendingDestinations.Add(pendingEntry); - } - + // Retry and Backoff Logic: + // -------------------------- + // Each destination maintains its own retry metadata: RetryCount, LastAttempt, and NextEligibleAttempt. + // On each failed delivery attempt, RetryCount is incremented and a delay is calculated using exponential backoff (2^RetryCount seconds). + // This delay is capped using MaxExponentialBackoff to prevent runaway delays. + // The system checks NextEligibleAttempt before retrying, and skips publishing if the backoff period has not yet elapsed. + // This ensures robust delivery across unstable networks and prevents retry storms. 
pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; - - // Apply exponential backoff - int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); - pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); + LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug); } else { - // Remove if not retryable pendingDestinations.RemoveAll(x => x.Name == destination.Name); + LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug); } } } + // Save updated retry state for this telemetry package package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); + // Determine whether to remove the telemetry or persist it based on remaining destinations if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any()) { + LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug); StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { - StorageManager.DeletePendingTelemetry(package.PendingTelemetry); + LogManager.Log("Updating pending telemetry record.", LogCategory.Debug); + StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } totalWatch.Stop(); result.TotalElapsedTime = totalWatch.Elapsed; + LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug); + package.CompletionSource.SetResult(result); return result; } } |
