aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-08-02 21:38:19 +0300
commit0df9f37075dd697ac34f4ed2a2749f62aa27a654 (patch)
tree5d95103b41d4954eff9f266317c5a525e9a0e3e9 /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
parent4222eddece906d6f0877022c06b853deb5068472 (diff)
downloadTango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.tar.gz
Tango-0df9f37075dd697ac34f4ed2a2749f62aa27a654.zip
Telemetry Testing.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs93
1 files changed, 56 insertions, 37 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
index c4e7d3d3e..971afa864 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -5,6 +5,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Integration.Operation;
+using Tango.Logging;
namespace Tango.Telemetry
{
@@ -21,15 +22,26 @@ namespace Tango.Telemetry
}
+ // This method handles the complete lifecycle of publishing a telemetry package to all supported destinations.
+ // It supports exponential backoff retry logic, destination filtering, and persistence of failed deliveries
+ // to ensure fault tolerance and guaranteed eventual delivery across multiple transports.
protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
- Stopwatch totalWatch = Stopwatch.StartNew();
+ LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug);
+ Stopwatch totalWatch = Stopwatch.StartNew();
var result = new TelemetryPublishResult();
- if (!IsStarted || _isDisposed) return result;
+ // Exit early if the publisher has been stopped or disposed to avoid invalid operations.
+ if (!IsStarted || _isDisposed)
+ {
+ LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning);
+ package.CompletionSource.SetResult(result);
+ return result;
+ }
- List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
+ // Prepare metadata headers to be attached to the telemetry, which are common to all destinations.
+ var properties = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("MachineID", Config.MachineID),
new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
@@ -39,13 +51,15 @@ namespace Tango.Telemetry
var now = DateTime.UtcNow;
var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
- // For Streaming/External: initialize pending destinations list (used if publishing fails)
+ // If this is a newly generated package, not a retry, initialize its destination list.
+ // This ensures we only attempt to publish to destinations that support this telemetry type.
if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var dest in Destinations)
{
- if (!pendingDestinations.Any(x => x.Name == dest.Name))
+ if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name))
{
+ // Initialize with default retry tracking values.
pendingDestinations.Add(new TelemetryPendingDestination
{
Name = dest.Name,
@@ -53,38 +67,40 @@ namespace Tango.Telemetry
LastAttempt = DateTime.MinValue,
NextEligibleAttempt = now
});
+ LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug);
}
}
}
+ // Iterate over each valid destination, applying retry backoff and delivery logic.
foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
-
- if (pendingEntry == null)
- continue;
+ if (pendingEntry == null) continue;
Stopwatch destinationWatch = Stopwatch.StartNew();
-
var destinationResult = new TelemetryPublishResult.DestinationResult();
destinationResult.Destination = destination;
result.DestinationsResults.Add(destinationResult);
- // Respect backoff timing
+ // If the destination is in a cool-down period due to previous failures, skip for now.
if (now < pendingEntry.NextEligibleAttempt)
{
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
+ LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
continue;
}
try
{
+ LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug);
if (OnPublishingPackage(package, destination))
{
if (await destination.IsAvailable())
{
+ // Send the telemetry payload
await destination.Publish(package, properties);
OnPackagePublished(package, destination);
@@ -92,87 +108,90 @@ namespace Tango.Telemetry
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- // On success: remove entry from pending list
+ LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.");
+
+ // Remove from retry queue since delivery succeeded.
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
}
else
{
+ // Destination is temporarily unreachable; apply retry logic if supported.
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- // Only track retry state if retry is supported
+ LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning);
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
- if (pendingEntry == null)
- {
- pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
- pendingDestinations.Add(pendingEntry);
- }
-
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
-
- // Apply exponential backoff
- int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
- pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug);
}
else
{
- // Remove if not retryable
+ // Remove from retry list if the destination is not retryable.
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug);
}
}
}
}
catch (Exception ex)
{
+ // Network, serialization, or other critical failures are handled here.
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
destinationResult.Error = ex;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
- LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
+ LogManager.Log(ex, $"Error publishing to '{destination.Name}'.");
OnPackagePublishFailed(package, destination, ex);
- // Only track retry state if retry is supported
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
- if (pendingEntry == null)
- {
- pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
- pendingDestinations.Add(pendingEntry);
- }
-
+ // Retry and Backoff Logic:
+ // --------------------------
+ // Each destination maintains its own retry metadata: RetryCount, LastAttempt, and NextEligibleAttempt.
+ // On each failed delivery attempt, RetryCount is incremented and a delay is calculated using exponential backoff (2^RetryCount seconds).
+ // This delay is capped using MaxExponentialBackoff to prevent runaway delays.
+ // The system checks NextEligibleAttempt before retrying, and skips publishing if the backoff period has not yet elapsed.
+ // This ensures robust delivery across unstable networks and prevents retry storms.
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
-
- // Apply exponential backoff
- int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
- pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
+ LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug);
}
else
{
- // Remove if not retryable
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug);
}
}
}
+ // Save updated retry state for this telemetry package
package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
+ // Determine whether to remove the telemetry or persist it based on remaining destinations
if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
{
+ LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug);
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
- StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
+ LogManager.Log("Updating pending telemetry record.", LogCategory.Debug);
+ StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
totalWatch.Stop();
result.TotalElapsedTime = totalWatch.Elapsed;
+ LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
+ package.CompletionSource.SetResult(result);
return result;
}
}