aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs15
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj5
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs4
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs (renamed from Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs)6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs18
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs38
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs116
11 files changed, 182 insertions, 25 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
index 713805471..75307f844 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
@@ -12,6 +12,7 @@ namespace Tango.Telemetry.Destinations
{
private DeviceClient _hubClient;
+ public string Name { get; private set; } = "Azure IoT Hub";
public bool Enable { get; set; } = true;
public String ConnectionString { get; private set; }
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
index a37d149a1..2a4d3a297 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
@@ -19,6 +19,7 @@ namespace Tango.Telemetry.Destinations
private IMqttClient _mqttClient;
private IMqttClientOptions _mqttOptions;
+ public string Name { get; private set; } = "MQTT";
public bool Enable { get; set; } = true;
public String Address { get; private set; }
public int Port { get; private set; }
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
index 3aa6f10a9..7059a3c75 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetry.cs
@@ -13,7 +13,7 @@ namespace Tango.Telemetry
[BsonId(true)]
int Id { get; set; }
DateTime Time { get; set; }
- List<String> PendingDestinations { get; set; }
+ List<TelemetryPendingDestination> PendingDestinations { get; set; }
String ToJson(Formatting format = Formatting.None, bool flatten = true);
byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true);
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
index 7c1c1032b..a8cc46fd5 100644
--- a/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryDestination.cs
@@ -9,6 +9,7 @@ namespace Tango.Telemetry
public interface ITelemetryDestination : IDisposable
{
bool Enable { get; set; }
+ String Name { get; }
IReadOnlyList<TelemetrySource> SupportedSources { get; }
Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties);
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs
new file mode 100644
index 000000000..139600af0
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/ITelemetryPendingStorageManager.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Tango.Telemetry
+{
+ public interface ITelemetryPendingStorageManager
+ {
+ void InsertOrUpdateTelemetryObject<T>(T telemetry) where T : ITelemetry;
+ void DeleteTelemetryObject<T>(T telemetry) where T : ITelemetry;
+ List<ITelemetry> GetTelemetryAll();
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 03ac99116..0cd4c9106 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -200,16 +200,19 @@
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
<Compile Include="ITelemetryDestination.cs" />
<Compile Include="Destinations\MqttTelemetryDestination.cs" />
+ <Compile Include="ITelemetryPendingStorageManager.cs" />
<Compile Include="TelemetryNameAttribute.cs" />
<Compile Include="TelemetryPackagePublishedEventArgs.cs" />
<Compile Include="TelemetryPackagePublishFailedEventArgs.cs" />
+ <Compile Include="TelemetryPendingDestination.cs" />
+ <Compile Include="TelemetryPublisherAdvanced.cs" />
<Compile Include="TelemetryPublisherEventArgs.cs" />
<Compile Include="TelemetryPackagePublishingEventArgs.cs" />
<Compile Include="TelemetryPublishPackage.cs" />
<Compile Include="TelemetryPublisher.cs" />
<Compile Include="TelemetryPublisherConfiguration.cs" />
<Compile Include="TelemetrySource.cs" />
- <Compile Include="TelemetryPendingStorageManager.cs" />
+ <Compile Include="TelemetryLiteDbPendingStorageManager.cs" />
<Compile Include="ITelemetry.cs" />
<Compile Include="JsonFlattener.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
index 3eda84a67..ca6c2925e 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs
@@ -13,11 +13,11 @@ namespace Tango.Telemetry
[BsonId(true)]
public int Id { get; set; }
public DateTime Time { get; set; }
- public List<string> PendingDestinations { get; set; }
+ public List<TelemetryPendingDestination> PendingDestinations { get; set; }
public TelemetryBase()
{
- PendingDestinations = new List<string>();
+ PendingDestinations = new List<TelemetryPendingDestination>();
}
public byte[] ToBytes(Formatting format = Formatting.None, bool flatten = true)
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs
index 70903b833..47b8db826 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDbPendingStorageManager.cs
@@ -8,14 +8,14 @@ using System.Threading.Tasks;
namespace Tango.Telemetry
{
- public class TelemetryPendingStorageManager
+ public class TelemetryLiteDbPendingStorageManager : ITelemetryPendingStorageManager
{
private bool _disposed;
private LiteDatabase _database;
public String DatabasePath { get; private set; }
- public TelemetryPendingStorageManager()
+ public TelemetryLiteDbPendingStorageManager()
{
DatabasePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Twine", "Tango", "Telemetry", Path.GetFileNameWithoutExtension(AppDomain.CurrentDomain.FriendlyName) + ".telemetry");
Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
@@ -40,7 +40,7 @@ namespace Tango.Telemetry
}
}
- ~TelemetryPendingStorageManager()
+ ~TelemetryLiteDbPendingStorageManager()
{
Dispose();
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs
new file mode 100644
index 000000000..265cb14e0
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingDestination.cs
@@ -0,0 +1,18 @@
+using System;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPendingDestination
+ {
+ public string Name { get; set; }
+
+ // Tracks how many retry attempts have been made for this destination.
+ public int RetryCount { get; set; } = 0;
+
+ // The UTC timestamp of the last attempt made.
+ public DateTime LastAttempt { get; set; } = DateTime.MinValue;
+
+ // The UTC timestamp when the next attempt should be allowed, supporting exponential backoff.
+ public DateTime NextEligibleAttempt { get; set; } = DateTime.MinValue;
+ }
+} \ No newline at end of file
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
index 81cd88b2d..662df9bae 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisher.cs
@@ -34,21 +34,21 @@ namespace Tango.Telemetry
private bool _emptyWritten;
private IMachineOperator _machineOperator;
- private TelemetryPendingStorageManager _pendingStorageManager;
#region Properties
public TelemetryPublisherConfiguration Config { get; private set; }
+ public ITelemetryPendingStorageManager PendingStorageManager { get; private set; }
public bool IsStarted { get; private set; }
#endregion
#region Constructor
- public TelemetryPublisher(IMachineOperator machineOperator, TelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config)
+ public TelemetryPublisher(IMachineOperator machineOperator, ITelemetryPendingStorageManager storageManager, TelemetryPublisherConfiguration config)
{
_machineOperator = machineOperator;
- _pendingStorageManager = storageManager;
+ PendingStorageManager = storageManager;
Config = config ?? new TelemetryPublisherConfiguration();
@@ -191,7 +191,7 @@ namespace Tango.Telemetry
private void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- var telemetryAll = _pendingStorageManager.GetTelemetryAll();
+ var telemetryAll = PendingStorageManager.GetTelemetryAll();
foreach (var t in telemetryAll)
{
@@ -239,7 +239,7 @@ namespace Tango.Telemetry
}
}
- private async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
{
List<KeyValuePair<String, String>> properties = new List<KeyValuePair<string, string>>();
@@ -247,26 +247,28 @@ namespace Tango.Telemetry
properties.Add(new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()));
properties.Add(new KeyValuePair<string, string>("Environment", Config.Environment));
- List<String> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+ List<TelemetryPendingDestination> pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
//Add all destinations if streaming or external (They will be remove later if successfull)
//If source is "PendingStorage" the "PendingDestination" would be already propagated from the pending storage db.
if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
{
- pendingDestinations.AddRange(Config.TelemetryDestinations.Select(x => x.GetType().Name));
+ foreach (var destination in Config.TelemetryDestinations)
+ {
+ if (!pendingDestinations.Exists(x => x.Name == destination.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination { Name = destination.Name });
+ }
+ }
}
- pendingDestinations = pendingDestinations.Distinct().ToList();
-
foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
{
- String destinationName = destination.GetType().Name;
-
- if (pendingDestinations.Contains(destinationName))
+ if (pendingDestinations.Exists(x => x.Name == destination.Name))
{
try
{
- pendingDestinations.Remove(destinationName);
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
if (OnPublishingPackage(package, destination))
{
await destination.Publish(package, properties);
@@ -275,15 +277,15 @@ namespace Tango.Telemetry
}
catch (Exception ex)
{
- LogManager.Log(ex, $"Error publishing telemetry package to destination {destinationName}.");
+ LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
OnPackagePublishFailed(package, destination, ex);
if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
{
- if (!pendingDestinations.Contains(destinationName))
+ if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
- pendingDestinations.Add(destinationName);
+ pendingDestinations.Add(new TelemetryPendingDestination() { Name = destination.Name });
}
}
}
@@ -294,11 +296,11 @@ namespace Tango.Telemetry
if (package.Source == TelemetrySource.PendingStorage && package.TelemetryObject.PendingDestinations.Count == 0)
{
- _pendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
}
else
{
- _pendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
new file mode 100644
index 000000000..9ea817535
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherAdvanced.cs
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Integration.Operation;
+
+namespace Tango.Telemetry
+{
+ public class TelemetryPublisherAdvanced : TelemetryPublisher
+ {
+ /// <summary>
+ /// Defines the maximum backoff delay between retries when exponential backoff is applied.
+ /// For example, setting to 1 hour means the retry interval will not exceed 1 hour regardless of the retry count.
+ /// </summary>
+ public TimeSpan MaxExponentialBackoff { get; set; } = TimeSpan.FromHours(1);
+
+ public TelemetryPublisherAdvanced(IMachineOperator machineOperator, TelemetryLiteDbPendingStorageManager storageManager, TelemetryPublisherConfiguration config) : base(machineOperator, storageManager, config)
+ {
+
+ }
+
+ protected override async Task PublishTelemetryPackage(TelemetryPublishPackage package)
+ {
+ List<KeyValuePair<string, string>> properties = new List<KeyValuePair<string, string>>
+ {
+ new KeyValuePair<string, string>("MachineID", Config.MachineID),
+ new KeyValuePair<string, string>("Model", Config.MachineType.ToShortName()),
+ new KeyValuePair<string, string>("Environment", Config.Environment)
+ };
+
+ var now = DateTime.UtcNow;
+ var pendingDestinations = package.TelemetryObject.PendingDestinations.ToList();
+
+ // For Streaming/External: initialize pending destinations list (used if publishing fails)
+ if (package.Source == TelemetrySource.Streaming || package.Source == TelemetrySource.ExternalStorage)
+ {
+ foreach (var dest in Config.TelemetryDestinations)
+ {
+ if (!pendingDestinations.Any(x => x.Name == dest.Name))
+ {
+ pendingDestinations.Add(new TelemetryPendingDestination
+ {
+ Name = dest.Name,
+ RetryCount = 0,
+ LastAttempt = DateTime.MinValue,
+ NextEligibleAttempt = now
+ });
+ }
+ }
+ }
+
+ foreach (var destination in Config.TelemetryDestinations.Where(x => x.SupportedSources.Contains(package.Source)))
+ {
+ var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
+
+ if (pendingEntry == null)
+ continue;
+
+ // Respect backoff timing
+ if (now < pendingEntry.NextEligibleAttempt)
+ continue;
+
+ try
+ {
+ if (OnPublishingPackage(package, destination))
+ {
+ await destination.Publish(package, properties);
+ OnPackagePublished(package, destination);
+
+ // On success: remove entry from pending list
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Error publishing telemetry package to destination {destination.Name}.");
+ OnPackagePublishFailed(package, destination, ex);
+
+ // Only track retry state if retry is supported
+ if (destination.SupportedSources.Contains(TelemetrySource.PendingStorage))
+ {
+ if (pendingEntry == null)
+ {
+ pendingEntry = new TelemetryPendingDestination { Name = destination.Name };
+ pendingDestinations.Add(pendingEntry);
+ }
+
+ pendingEntry.RetryCount++;
+ pendingEntry.LastAttempt = now;
+
+ // Apply exponential backoff
+ int delaySeconds = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)MaxExponentialBackoff.TotalSeconds);
+ pendingEntry.NextEligibleAttempt = now.AddSeconds(delaySeconds);
+ }
+ else
+ {
+ // Remove if not retryable
+ pendingDestinations.RemoveAll(x => x.Name == destination.Name);
+ }
+ }
+ }
+
+ package.TelemetryObject.PendingDestinations = new List<TelemetryPendingDestination>(pendingDestinations);
+
+ if (package.Source == TelemetrySource.PendingStorage && !pendingDestinations.Any())
+ {
+ PendingStorageManager.DeleteTelemetryObject(package.TelemetryObject);
+ }
+ else
+ {
+ PendingStorageManager.InsertOrUpdateTelemetryObject(package.TelemetryObject);
+ }
+ }
+ }
+}