diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 02:05:52 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 02:05:52 +0300 |
| commit | 3f86a214b4459e619a5a767369b996e56cd17564 (patch) | |
| tree | a71b6c68ac219183a6cc2bbb5ee4206fd8e053f0 /Software/Visual_Studio | |
| parent | 70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e (diff) | |
| download | Tango-3f86a214b4459e619a5a767369b996e56cd17564.tar.gz Tango-3f86a214b4459e619a5a767369b996e56cd17564.zip | |
Exponential Backoff
Diffstat (limited to 'Software/Visual_Studio')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs | 1 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs | 1 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs | 2 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs | 1 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs | 15 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj | 5 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs | 4 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs) | 6 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs | 18 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs | 38 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs | 116 |
11 files changed, 182 insertions, 25 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs index 713805471..75307f844 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs @@ -12,6 +12,7 @@ namespace Tango.Telemetry.Destinations { private DeviceClient _hubClient; + public string Name { get; private set; } = "Azure IoT Hub"; public bool Enable { get; set; } = true; public String ConnectionString { get; private set; } diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs index a37d149a1..2a4d3a297 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs @@ -19,6 +19,7 @@ namespace Tango.Telemetry.Destinations private IMqttClient _mqttClient; private IMqttClientOptions _mqttOptions; + public string Name { get; private set; } = "MQTT"; public bool Enable { get; set; } = true; public String Address { get; private set; } public int Port { get; private set; } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs index 3aa6f10a9..7059a3c75 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs @@ -13,7 +13,7 @@ namespace Tango.Telemetry [BsonId(true)] int Id { get; set; } DateTime Time { get; set; } - List<String> PendingDestinations { get; set; } + List<TelemetryPendingDestination> PendingDestinations { get; set; } String ToJson(Formatting format = Formatting.None, bool flatten = true); byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true); } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs index 7c1c1032b..a8cc46fd5 100644 --- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs @@ -9,6 +9,7 @@ namespace Tango.Telemetry public interface ITelemetryDestination : IDisposable { bool Enable { get; set; } + String Name { get; } IReadOnlyList<TelemetrySource> SupportedSources { get; } Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties); } diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs new file mode 100644 index 000000000..139600af0 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tango.Telemetry +{ + public interface ITelemetryPendingStorageManager + { + void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry; + void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry; + List<ITelemetry> GetTelemetryAll(); + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 03ac99116..0cd4c9106 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -200,16 +200,19 @@ <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> <Compile Include="ITelemetryDestination.cs" /> <Compile Include="Destinations\MqttTelemetryDestination.cs" /> + <Compile Include="ITelemetryPendingStorageManager.cs" /> <Compile Include="TelemetryNameAttribute.cs" /> <Compile Include="TelemetryPackagePublishedEventArgs.cs" /> <Compile Include="TelemetryPackagePublishFailedEventArgs.cs" /> + <Compile Include="TelemetryPendingDestination.cs" /> + <Compile Include="TelemetryPublisherAdvanced.cs" /> <Compile Include="TelemetryPublisherEventArgs.cs" /> <Compile Include="TelemetryPackagePublishingEventArgs.cs" /> <Compile Include="TelemetryPublishPackage.cs" /> <Compile Include="TelemetryPublisher.cs" /> <Compile Include="TelemetryPublisherConfiguration.cs" /> <Compile Include="TelemetrySource.cs" /> - <Compile Include="TelemetryPendingStorageManager.cs" /> + <Compile Include="TelemetryLiteDbPendingStorageManager.cs" /> <Compile Include="ITelemetry.cs" /> <Compile Include="JsonFlattener.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index 3eda84a67..ca6c2925e 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -13,11 +13,11 @@ namespace Tango.Telemetry [BsonId(true)] public int Id { get; set; } public DateTime Time { get; set; } - public List<string> PendingDestinations { get; set; } + public List<TelemetryPendingDestination> PendingDestinations { get; set; } public TelemetryBase() { - PendingDestinations = new List<string>(); + PendingDestinations = new List<TelemetryPendingDestination>(); } public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true) diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs index 70903b833..47b8db826 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs @@ -8,14 +8,14 @@ using System.Threading.Tasks; namespace Tango.Telemetry { - public class TelemetryPendingStorageManager + public class TelemetryLiteDbPendingStorageManager : ITelemetryPendingStorageManager { private bool _disposed; private LiteDatabase _database; public String DatabasePath { get; private set; } - public TelemetryPendingStorageManager() + public TelemetryLiteDbPendingStorageManager() { DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry"); Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); @@ -40,7 +40,7 @@ namespace Tango.Telemetry } } - ~TelemetryPendingStorageManager() + ~TelemetryLiteDbPendingStorageManager() { Dispose(); } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs new file mode 100644 index 000000000..265cb14e0 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs @@ -0,0 +1,18 @@ +using System; + +namespace Tango.Telemetry +{ + public class TelemetryPendingDestination + { + public string Name { get; set; } + + // Tracks how many retry attempts have been made for this destination. + public int RetryCount { get; set; } = 0; + + // The UTC timestamp of the last attempt made. + public DateTime LastAttempt { get; set; } = DateTime.MinValue; + + // The UTC timestamp when the next attempt should be allowed, supporting exponential backoff. + public DateTime NextEligibleAttempt { get; set; } = DateTime.MinValue; + } +}
\ No newline at end of file diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs index 81cd88b2d..662df9bae 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs @@ -34,21 +34,21 @@ namespace Tango.Telemetry private bool _emptyWritten; private IMachineOperator _machineOperator; - private TelemetryPendingStorageManager _pendingStorageManager; #region Properties public TelemetryPublisherConfiguration Config { get; private set; } + public ITelemetryPendingStorageManager PendingStorageManager { get; private set; } public bool IsStarted { get; private set; } #endregion #region Constructor - public TelemetryPublisher(IMachineOperator machineOperator, TelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) + public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config) { _machineOperator = machineOperator; - _pendingStorageManager = storageManager; + PendingStorageManager = storageManager; Config = config ?? new TelemetryPublisherConfiguration(); @@ -191,7 +191,7 @@ namespace Tango.Telemetry private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) { - var telemetryAll = _pendingStorageManager.GetTelemetryAll(); + var telemetryAll = PendingStorageManager.GetTelemetryAll(); foreach (var t in telemetryAll) { @@ -239,7 +239,7 @@ namespace Tango.Telemetry } } - private async Task PublishTelemetryPackage(TelemetryPublishPackage package) + protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) { List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>(); @@ -247,26 +247,28 @@ namespace Tango.Telemetry properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName())); properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment)); - List<String> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); + List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); //Add all destinations if streaming or external (They will be remove later if successfull) //If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db. if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) { - pendingDestinations.AddRange(Config.TelemetryDestinations.Select(x => x.GetType().Name)); + foreach (var destination in Config.TelemetryDestinations) + { + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination { Name = destination.Name }); + } + } } - pendingDestinations = pendingDestinations.Distinct().ToList(); - foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source))) { - String destinationName = destination.GetType().Name; - - if (pendingDestinations.Contains(destinationName)) + if (pendingDestinations.Exists(x => x.Name == destination.Name)) { try { - pendingDestinations.Remove(destinationName); + pendingDestinations.RemoveAll(x => x.Name == destination.Name); if (OnPublishingPackage(package, destination)) { await destination.Publish(package, properties); @@ -275,15 +277,15 @@ namespace Tango.Telemetry } catch (Exception ex) { - LogManager.Log(ex, $"Error publishing telemetry package to destination {destinationName}."); + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); OnPackagePublishFailed(package, destination, ex); if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) { - if (!pendingDestinations.Contains(destinationName)) + if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { - pendingDestinations.Add(destinationName); + pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name }); } } } @@ -294,11 +296,11 @@ namespace Tango.Telemetry if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0) { - _pendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); + PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); } else { - _pendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); + PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs new file mode 100644 index 000000000..9ea817535 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs @@ -0,0 +1,116 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Integration.Operation; + +namespace Tango.Telemetry +{ + public class TelemetryPublisherAdvanced : TelemetryPublisher + { + /// <summary> + /// Defines the maximum backoff delay between retries when exponential backoff is applied. + /// For example, setting to 1 hour means the retry interval will not exceed 1 hour regardless of the retry count. + /// </summary> + public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1); + + public TelemetryPublisherAdvanced(IMachineOperator machineOperator, TelemetryLiteDbPendingStorageManager storageManager, TelemetryPublisherConfiguration config) : base(machineOperator, storageManager, config) + { + + } + + protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package) + { + List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>> + { + new KeyValuePair<string, string>("MachineID", Config.MachineID), + new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()), + new KeyValuePair<string, string>("Environment", Config.Environment) + }; + + var now = DateTime.UtcNow; + var pendingDestinations = package.TelemetryObject.PendingDestinations.ToList(); + + // For Streaming/External: initialize pending destinations list (used if publishing fails) + if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage) + { + foreach (var dest in Config.TelemetryDestinations) + { + if (!pendingDestinations.Any(x => x.Name == dest.Name)) + { + pendingDestinations.Add(new TelemetryPendingDestination + { + Name = dest.Name, + RetryCount = 0, + LastAttempt = DateTime.MinValue, + NextEligibleAttempt = now + }); + } + } + } + + foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source))) + { + var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); + + if (pendingEntry == null) + continue; + + // Respect backoff timing + if (now < pendingEntry.NextEligibleAttempt) + continue; + + try + { + if (OnPublishingPackage(package, destination)) + { + await destination.Publish(package, properties); + OnPackagePublished(package, destination); + + // On success: remove entry from pending list + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + } + catch (Exception ex) + { + LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}."); + OnPackagePublishFailed(package, destination, ex); + + // Only track retry state if retry is supported + if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage)) + { + if (pendingEntry == null) + { + pendingEntry = new TelemetryPendingDestination { Name = destination.Name }; + pendingDestinations.Add(pendingEntry); + } + + pendingEntry.RetryCount++; + pendingEntry.LastAttempt = now; + + // Apply exponential backoff + int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds); + pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds); + } + else + { + // Remove if not retryable + pendingDestinations.RemoveAll(x => x.Name == destination.Name); + } + } + } + + package.TelemetryObject.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations); + + if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any()) + { + PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject); + } + else + { + PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject); + } + } + } +} |
