using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Integration.Operation;
using Tango.Logging;
namespace Tango.Telemetry
{
/// <summary>
/// A <see cref="TelemetryPublisher"/> that tracks delivery state per destination, applies capped
/// exponential backoff to retryable destinations that are unavailable or fail, and persists the
/// remaining retry state through the storage manager.
/// </summary>
public class TelemetryPublisherAdvanced : TelemetryPublisher
{
    /// <summary>
    /// Defines the maximum backoff delay between retries when exponential backoff is applied.
    /// For example, setting it to 1 hour means the retry interval will not exceed 1 hour regardless of the retry count.
    /// Negative values are treated as <see cref="TimeSpan.Zero"/>.
    /// </summary>
    public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);

    /// <summary>
    /// Initializes the publisher. All arguments are forwarded unchanged to the base <see cref="TelemetryPublisher"/>.
    /// </summary>
    /// <param name="storageManager">Storage manager forwarded to the base publisher.</param>
    /// <param name="queueManager">Queue manager forwarded to the base publisher.</param>
    /// <param name="config">Configuration forwarded to the base publisher.</param>
    public TelemetryPublisherAdvanced(ITelemetryStorageManager storageManager, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config)
        : base(storageManager, queueManager, config)
    {
    }

    /// <summary>
    /// Publishes a telemetry package to every destination that supports its source type, applying
    /// per-destination exponential backoff and persisting the remaining retry state.
    /// </summary>
    /// <remarks>
    /// For Streaming / ExternalStorage packages, a pending entry is first registered for each supporting
    /// destination that does not already have one. Each pending destination is then attempted unless it is
    /// still inside its backoff window. Delivered destinations are removed from the pending list.
    /// Undelivered destinations are either rescheduled with a capped exponential delay (if the destination
    /// also supports PendingStorage) or dropped. Finally the pending record is deleted (PendingStorage
    /// packages with nothing left to deliver) or upserted.
    /// </remarks>
    /// <param name="package">The package to publish. Its CompletionSource receives the result.</param>
    /// <returns>Per-destination results plus the total elapsed time.</returns>
    protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
    {
        // NOTE(review): if anything outside the per-destination try/catch throws (e.g. the storage calls
        // below), package.CompletionSource is never completed — confirm whether awaiters can hang.
        LogManager.Log($"Starting publish for package from source '{package.Source?.Name}' of type '{package.SourceType}'", LogCategory.Debug);
        Stopwatch totalWatch = Stopwatch.StartNew();
        var result = new TelemetryPublishResult();

        // Exit early if the publisher has been stopped or disposed to avoid invalid operations.
        if (!IsStarted || _isDisposed)
        {
            LogManager.Log("Publisher not active. Skipping.", LogCategory.Warning);
            package.CompletionSource.SetResult(result);
            return result;
        }

        // Metadata headers attached to the telemetry; common to all destinations.
        // NOTE(review): value type assumed to be string — confirm against the destination Publish signature.
        var properties = new List<KeyValuePair<string, string>>
        {
            new KeyValuePair<string, string>("MachineID", Config.MachineID),
            new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
            new KeyValuePair<string, string>("Environment", Config.Environment)
        };

        DateTime now = DateTime.UtcNow;
        var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();

        // Newly generated packages (not retries) get an entry for every destination that supports this
        // telemetry type, so only those destinations are attempted.
        if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
        {
            foreach (var dest in Destinations)
            {
                if (dest.SupportedSourceTypes.Contains(package.SourceType) && !pendingDestinations.Any(x => x.Name == dest.Name))
                {
                    pendingDestinations.Add(new TelemetryPendingDestination
                    {
                        Name = dest.Name,
                        RetryCount = 0,
                        LastAttempt = DateTime.MinValue,
                        NextEligibleAttempt = now
                    });
                    LogManager.Log($"Registered destination '{dest.Name}' for initial delivery.", LogCategory.Debug);
                }
            }
        }

        foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
        {
            var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
            if (pendingEntry == null)
            {
                // Already delivered (or never registered) for this package.
                continue;
            }

            Stopwatch destinationWatch = Stopwatch.StartNew();
            var destinationResult = new TelemetryPublishResult.DestinationResult();
            destinationResult.Destination = destination;
            result.DestinationsResults.Add(destinationResult);

            // Destination is still cooling down after a previous failure: postpone.
            if (now < pendingEntry.NextEligibleAttempt)
            {
                destinationWatch.Stop();
                destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
                destinationResult.ElapsedTime = destinationWatch.Elapsed;
                LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
                continue;
            }

            // Only destinations that also accept PendingStorage telemetry are kept for a later retry.
            bool isRetryable = destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage);
            // Set once the payload has reached the destination, so a failure in the post-publish hook
            // is not mistaken for a delivery failure (which would re-send the payload on retry).
            bool delivered = false;

            try
            {
                LogManager.Log($"Attempting publish to '{destination.Name}'...", LogCategory.Debug);
                if (!OnPublishingPackage(package, destination))
                {
                    // The pre-publish hook returned false: the attempt is skipped and the pending entry left as-is.
                    // NOTE(review): Status keeps its default value here — consider a dedicated status.
                    destinationWatch.Stop();
                    destinationResult.ElapsedTime = destinationWatch.Elapsed;
                    continue;
                }

                if (await destination.IsAvailable())
                {
                    await destination.Publish(package, properties);
                    delivered = true;

                    destinationWatch.Stop();
                    destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
                    destinationResult.ElapsedTime = destinationWatch.Elapsed;
                    LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.");

                    // Remove from the retry list before raising the hook, since delivery already succeeded.
                    pendingDestinations.RemoveAll(x => x.Name == destination.Name);
                    OnPackagePublished(package, destination);
                }
                else
                {
                    // Destination is temporarily unreachable.
                    destinationWatch.Stop();
                    destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
                    destinationResult.ElapsedTime = destinationWatch.Elapsed;
                    LogManager.Log($"'{destination.Name}' unavailable.", LogCategory.Warning);
                    HandleUndeliveredDestination(pendingDestinations, pendingEntry, destination.Name, isRetryable, false);
                }
            }
            catch (Exception ex) when (delivered)
            {
                // The payload reached the destination; only the post-publish hook failed. Keep the Passed
                // status and do not schedule a retry.
                LogManager.Log(ex, $"Published to '{destination.Name}', but the post-publish handler failed.");
            }
            catch (Exception ex)
            {
                // Network, serialization, or other failures while publishing.
                destinationWatch.Stop();
                destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
                destinationResult.Error = ex;
                destinationResult.ElapsedTime = destinationWatch.Elapsed;
                LogManager.Log(ex, $"Error publishing to '{destination.Name}'.");
                OnPackagePublishFailed(package, destination, ex);
                HandleUndeliveredDestination(pendingDestinations, pendingEntry, destination.Name, isRetryable, true);
            }
        }

        // Save updated retry state for this telemetry package.
        package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);

        // Delete the stored record once a retried package has nothing left to deliver; otherwise persist it.
        // NOTE(review): a fully delivered Streaming/ExternalStorage package also reaches the upsert branch
        // with an empty destination list — confirm whether that write is intended.
        if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
        {
            LogManager.Log("Telemetry successfully delivered to all destinations. Deleting.", LogCategory.Debug);
            StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
        }
        else
        {
            LogManager.Log("Updating pending telemetry record.", LogCategory.Debug);
            StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
        }

        totalWatch.Stop();
        result.TotalElapsedTime = totalWatch.Elapsed;
        LogManager.Log($"Completed publish for source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
        package.CompletionSource.SetResult(result);
        return result;
    }

    /// <summary>
    /// Updates retry bookkeeping for a destination whose delivery attempt did not succeed.
    /// </summary>
    /// <remarks>
    /// Retryable destinations get RetryCount incremented, LastAttempt set to the current UTC time, and
    /// NextEligibleAttempt pushed out by 2^RetryCount seconds, capped at <see cref="MaxExponentialBackoff"/>.
    /// Non-retryable destinations are removed from <paramref name="pendingDestinations"/>.
    /// </remarks>
    /// <param name="pendingDestinations">Working list of destinations still awaiting delivery.</param>
    /// <param name="pendingEntry">The entry for the destination that was just attempted.</param>
    /// <param name="destinationName">Destination name, used for removal and logging.</param>
    /// <param name="isRetryable">Whether the destination should be retried later.</param>
    /// <param name="afterFailure">True when the attempt threw, false when the destination was unavailable; only affects log text.</param>
    private void HandleUndeliveredDestination(List<TelemetryPendingDestination> pendingDestinations, TelemetryPendingDestination pendingEntry, string destinationName, bool isRetryable, bool afterFailure)
    {
        if (!isRetryable)
        {
            pendingDestinations.RemoveAll(x => x.Name == destinationName);
            LogManager.Log(afterFailure
                ? $"'{destinationName}' is not retryable. Removed from pending after failure."
                : $"'{destinationName}' is not retryable. Removed from pending.", LogCategory.Debug);
            return;
        }

        DateTime attemptTime = DateTime.UtcNow;
        pendingEntry.RetryCount++;
        pendingEntry.LastAttempt = attemptTime;

        // Compute in double/TimeSpan rather than int. Casting Math.Pow(2, n) to int overflows for n >= 31,
        // producing a negative delay that disables the backoff. Math.Pow itself returns +Infinity for very
        // large n, which the comparison against the cap absorbs.
        TimeSpan cap = MaxExponentialBackoff < TimeSpan.Zero ? TimeSpan.Zero : MaxExponentialBackoff;
        double exponentialSeconds = Math.Pow(2, pendingEntry.RetryCount);
        TimeSpan delay = exponentialSeconds >= cap.TotalSeconds ? cap : TimeSpan.FromSeconds(exponentialSeconds);

        // Clamp instead of letting DateTime arithmetic throw when the cap is extremely large.
        pendingEntry.NextEligibleAttempt = delay < DateTime.MaxValue - attemptTime ? attemptTime + delay : DateTime.MaxValue;

        LogManager.Log(afterFailure
            ? $"Scheduled retry to '{destinationName}' in {delay.TotalSeconds} seconds due to failure."
            : $"Scheduled retry to '{destinationName}' in {delay.TotalSeconds} seconds.", LogCategory.Debug);
    }
}
}