aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs116
1 files changed, 116 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
new file mode 100644
index 000000000..9ea817535
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Integration.Operation;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPublisherAdvanced : TelemetryPublisher
+ {
+ /// <summary>
+ /// Defines the maximum backoff delay between retries when exponential backoff is applied.
+ /// For example, setting to 1 hour means the retry interval will not exceed 1 hour regardless of the retry count.
+ /// </summary>
+ public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);
+
+ public TelemetryPublisherAdvanced(IMachineOperator machineOperator, TelemetryLiteDbPendingStorageManager storageManager, TelemetryPublisherConfiguration config) : base(machineOperator, storageManager, config)
+ {
+
+ }
+
+ protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ {
+ List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
+ {
+ new KeyValuePair<string, string>("MachineID", Config.MachineID),
+ new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
+ new KeyValuePair<string, string>("Environment", Config.Environment)
+ };
+
+ var now = DateTime.UtcNow;
+ var pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+
+ // For Streaming/External: initialize pending destinations list (used if publishing fails)
+ if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ {
+ foreach (var dest in Config.TelemetryDestinations)
+ {
+ if (!pendingDestinations.Any(x => x.Name == dest.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination
+ {
+ Name = dest.Name,
+ RetryCount = 0,
+ LastAttempt = DateTime.MinValue,
+ NextEligibleAttempt = now
+ });
+ }
+ }
+ }
+
+ foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ {
+ var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
+
+ if (pendingEntry == null)
+ continue;
+
+ // Respect backoff timing
+ if (now < pendingEntry.NextEligibleAttempt)
+ continue;
+
+ try
+ {
+ if (OnPublishingPackage(package, destination))
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
+
+ // On success: remove entry from pending list
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
+ OnPackagePublishFailed(package, destination, ex);
+
+ // Only track retry state if retry is supported
+ if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ {
+ if (pendingEntry == null)
+ {
+ pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
+ pendingDestinations.Add(pendingEntry);
+ }
+
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+
+ // Apply exponential backoff
+ int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ }
+ else
+ {
+ // Remove if not retryable
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
+ }
+
+ package.TelemetryObject.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
+
+ if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any())
+ {
+ PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ }
+ else
+ {
+ PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ }
+ }
+ }
+}