aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-30 12:36:30 +0300
commit4222eddece906d6f0877022c06b853deb5068472 (patch)
treea29b706b3a5aedb28a42b209d5bb72b0ef94d40e /Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
parenta802fe75f9538371004f1833e69a69b798892d0c (diff)
downloadTango-4222eddece906d6f0877022c06b853deb5068472.tar.gz
Tango-4222eddece906d6f0877022c06b853deb5068472.zip
Telemetry source.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs81
1 files changed, 72 insertions, 9 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
index 0183de6c2..c4e7d3d3e 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@@ -20,8 +21,14 @@ namespace Tango.Telemetry
}
- protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ protected override async Task<TelemetryPublishResult> PublishTelemetryPackage(TelemetryPublishPackage package)
{
+ Stopwatch totalWatch = Stopwatch.StartNew();
+
+ var result = new TelemetryPublishResult();
+
+ if (!IsStarted || _isDisposed) return result;
+
List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>("MachineID", Config.MachineID),
@@ -33,7 +40,7 @@ namespace Tango.Telemetry
var pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
// For Streaming/External: initialize pending destinations list (used if publishing fails)
- if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
foreach (var dest in Destinations)
{
@@ -50,35 +57,86 @@ namespace Tango.Telemetry
}
}
- foreach (var destination in Destinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
if (pendingEntry == null)
continue;
+ Stopwatch destinationWatch = Stopwatch.StartNew();
+
+ var destinationResult = new TelemetryPublishResult.DestinationResult();
+ destinationResult.Destination = destination;
+ result.DestinationsResults.Add(destinationResult);
+
// Respect backoff timing
if (now < pendingEntry.NextEligibleAttempt)
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
continue;
+ }
try
{
if (OnPublishingPackage(package, destination))
{
- await destination.Publish(package, properties);
- OnPackagePublished(package, destination);
+ if (await destination.IsAvailable())
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
- // On success: remove entry from pending list
- pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ // On success: remove entry from pending list
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ else
+ {
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
+ // Only track retry state if retry is supported
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
+ {
+ if (pendingEntry == null)
+ {
+ pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
+ pendingDestinations.Add(pendingEntry);
+ }
+
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+
+ // Apply exponential backoff
+ int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ }
+ else
+ {
+ // Remove if not retryable
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
}
}
catch (Exception ex)
{
+ destinationWatch.Stop();
+ destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
+ destinationResult.Error = ex;
+ destinationResult.ElapsedTime = destinationWatch.Elapsed;
+
LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
OnPackagePublishFailed(package, destination, ex);
// Only track retry state if retry is supported
- if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
if (pendingEntry == null)
{
@@ -103,7 +161,7 @@ namespace Tango.Telemetry
package.PendingTelemetry.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
- if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any())
+ if (package.SourceType == TelemetrySourceTypes.PendingStorage && !pendingDestinations.Any())
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
@@ -111,6 +169,11 @@ namespace Tango.Telemetry
{
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
+
+ totalWatch.Stop();
+ result.TotalElapsedTime = totalWatch.Elapsed;
+
+ return result;
}
}
}