diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-30 12:36:30 +0300 |
| commit | 4222eddece906d6f0877022c06b853deb5068472 (patch) | |
| tree | a29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs | |
| parent | a802fe75f9538371004f1833e69a69b798892d0c (diff) | |
| download | Tango-4222eddece906d6f0877022c06b853deb5068472.tar.gz Tango-4222eddece906d6f0877022c06b853deb5068472.zip | |
Telemetry source.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs | 81 |
1 files changed, 72 insertions, 9 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs index 0183de6c2..c4e7d3d3e 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -20,8 +21,14 @@ namespace Tango.Telemetry } - protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package) { + Stopwatch totalWatch = Stopwatch.StartNew(); + + var result = new TelemetryPublishResult(); + + if (!IsStarted || _isDisposed) return result; + List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>> { new KeyValuePair<string, string>("MachineID", Config.MachineID), @@ -33,7 +40,7 @@ namespace Tango.Telemetry var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); // For Streaming/External: initialize pending destinations list (used if publishing fails) - if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { foreach (var dest in Destinations) { @@ -50,35 +57,86 @@ namespace Tango.Telemetry } } - foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source))) + foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); if (pendingEntry == null) continue; + Stopwatch destinationWatch = Stopwatch.StartNew(); + + var destinationResult = new TelemetryPublishResult.DestinationResult(); + destinationResult.Destination = destination; + result.DestinationsResults.Add(destinationResult); + // Respect backoff timing if (now < pendingEntry.NextEligibleAttempt) + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; continue; + } try { if (OnPublishingPackage(package, destination)) { - await destination.Publish(package, properties); - OnPackagePublished(package, destination); + if (await destination.IsAvailable()) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); - // On success: remove entry from pending list - pendingDestinations.RemoveAll(x => x.Name == destination.Name); + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // On success: remove entry from pending list + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + else + { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + + // Only track retry state if retry is supported + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) + { + if (pendingEntry == null) + { + pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; + pendingDestinations.Add(pendingEntry); + } + + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + + // Apply exponential backoff + int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + } + else + { + // Remove if not retryable + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + } } } catch (Exception ex) { + destinationWatch.Stop(); + destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; + destinationResult.Error = ex; + destinationResult.ElapsedTime = destinationWatch.Elapsed; + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); // Only track retry state if retry is supported - if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { if (pendingEntry == null) { @@ -103,7 +161,7 @@ namespace Tango.Telemetry package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); - if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any()) + if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any()) { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } @@ -111,6 +169,11 @@ namespace Tango.Telemetry { StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } + + totalWatch.Stop(); + result.TotalElapsedTime = totalWatch.Elapsed; + + return result; } } } |
