aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry
diff options
context:
space:
mode:
authorRoy Ben Shabat <roy.mail.net@gmail.com>2025-08-30 22:05:03 +0300
committerRoy Ben Shabat <roy.mail.net@gmail.com>2025-08-30 22:05:03 +0300
commita1d55aa060450b0a16b597794686b2aa41eea259 (patch)
tree62dfd7829b81335103a3c2d1d17c99d8408a5e54 /Software/Visual_Studio/Tango.Telemetry
parentd9d5b0161efbce3f0d44e890b435883f49df2963 (diff)
downloadTango-a1d55aa060450b0a16b597794686b2aa41eea259.tar.gz
Tango-a1d55aa060450b0a16b597794686b2aa41eea259.zip
PPC Telemetry First Integration.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs208
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs2
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs126
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj1
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs18
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs2
7 files changed, 297 insertions, 62 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs
new file mode 100644
index 000000000..10bc3981a
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs
@@ -0,0 +1,208 @@
+using Microsoft.Azure.Devices.Client;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Logging;
+using Tango.Telemetry.Helpers;
+// NEW:
+using System.Timers;
+
+namespace Tango.Telemetry.Destinations
+{
+ /// <summary>
+ /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
+ /// Now always batches and flushes once per second on a timer.
+ /// </summary>
+ public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination
+ {
+ private DeviceClient _hubClient;
+ private int _batchSize; // kept for compatibility (no longer used for triggering sends)
+ private ConcurrentList<Message> _batch;
+
+ // NEW: timer + lock to coordinate flushes
+ private Timer _flushTimer;
+ private readonly object _batchLock = new object();
+
+ private class ADXPAckage
+ {
+ public String Type { get; set; }
+ public String Environment { get; set; }
+ public String MachineType { get; set; }
+ public String SerialNumber { get; set; }
+ public String Organization { get; set; }
+ public String Site { get; set; }
+ public ITelemetry Telemetry { get; set; }
+ public DateTime UploadTime { get; set; }
+ public DateTime CreatedTime { get; set; }
+ public int Version { get; set; }
+ public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None);
+ }
+
+ public string Name { get; set; } = "Azure IoT Hub";
+ public string ConnectionString { get; private set; }
+ public ConnectionStatus HubConnectionStatus { get; private set; }
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
+
+ /// <summary>
+ /// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore.
+ /// </summary>
+ public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
+
+ private TelemetryAzureHubBatchDestination()
+ {
+ _batch = new ConcurrentList<Message>();
+ HubConnectionStatus = ConnectionStatus.Connected;
+ SupportedSourceTypes = new List<TelemetrySourceTypes>()
+ {
+ TelemetrySourceTypes.PendingStorage,
+ TelemetrySourceTypes.Streaming,
+ TelemetrySourceTypes.ExternalStorage
+ };
+ BatchSize = 1;
+ }
+
+ public TelemetryAzureHubBatchDestination(string connectionString) : this()
+ {
+ ConnectionString = connectionString;
+ }
+
+ public Task<bool> IsAvailable()
+ {
+ if (InternetConnectivity.IsInternetAvailable())
+ {
+ if (_hubClient == null) return Task.FromResult(true);
+ return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
+ }
+ return Task.FromResult(false);
+ }
+
+ public Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
+ {
+ if (_hubClient == null)
+ {
+ _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
+ _hubClient.OperationTimeoutInMilliseconds = 2000;
+ _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
+ {
+ HubConnectionStatus = status;
+ LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
+ });
+
+ // NEW: start the periodic flusher (every 1s)
+ EnsureFlushTimer();
+ }
+
+ var adxPackage = new ADXPAckage
+ {
+ Type = package.TelemetryName,
+ Version = package.TelemetryVersion,
+ Environment = package.Environment,
+ MachineType = package.MachineType,
+ SerialNumber = package.SerialNumber,
+ Organization = package.Organization,
+ Site = package.Site,
+ UploadTime = DateTime.UtcNow,
+ CreatedTime = package.PendingTelemetry.TelemetryObject.Time,
+ Telemetry = package.PendingTelemetry.TelemetryObject
+ };
+
+ var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
+ {
+ ContentType = "application/json",
+ ContentEncoding = "utf-8"
+ };
+
+ foreach (var prop in properties)
+ message.Properties.Add(prop.Key, prop.Value);
+
+ // NEW: always enqueue; timer decides when to send
+ lock (_batchLock)
+ {
+ _batch.Add(message);
+ }
+ LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug);
+
+ return Task.FromResult(true);
+ }
+
+ // NEW: create the 1s timer (idempotent)
+ private void EnsureFlushTimer()
+ {
+ if (_flushTimer != null) return;
+
+ _flushTimer = new Timer(1000);
+ _flushTimer.AutoReset = true;
+ _flushTimer.Elapsed += async (s, e) =>
+ {
+ try
+ {
+ await FlushBatchAsync().ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // If send fails, drop connection so IsAvailable() reflects it and upstream can retry later
+ try { _hubClient?.Dispose(); } catch { /* ignore */ }
+ _hubClient = null;
+ LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning);
+ }
+ };
+ _flushTimer.Start();
+ }
+
+ // NEW: send whatever accumulated since last tick
+ private async Task FlushBatchAsync()
+ {
+ List<Message> toSend = null;
+
+ lock (_batchLock)
+ {
+ if (_batch.Count == 0) return;
+ toSend = _batch.ToList();
+ _batch.Clear();
+ }
+
+ LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventBatchAsync(toSend);
+ }
+
+ public void Dispose()
+ {
+ // NEW: stop timer first to avoid races
+ if (_flushTimer != null)
+ {
+ _flushTimer.Stop();
+ _flushTimer.Dispose();
+ _flushTimer = null;
+ }
+
+ // Flush any remaining messages synchronously
+ try
+ {
+ List<Message> toSend = null;
+ lock (_batchLock)
+ {
+ if (_batch.Count > 0)
+ {
+ toSend = _batch.ToList();
+ _batch.Clear();
+ }
+ }
+ if (_hubClient != null && toSend != null && toSend.Count > 0)
+ {
+ LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
+ _hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult();
+ }
+ }
+ finally
+ {
+ _hubClient?.Dispose();
+ }
+ }
+
+ public override string ToString() => Name;
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs
index 657fa83e8..a5b2da647 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs
@@ -37,7 +37,7 @@ namespace Tango.Telemetry.Sources
_diagnosticsQueue = new List<StartDiagnosticsResponse>();
}
- public TelemetryDiagnosticsStreamingSource(IMachineOperator machineOperator) : base()
+ public TelemetryDiagnosticsStreamingSource(IMachineOperator machineOperator) : this()
{
_machineOperator = machineOperator;
_machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable;
diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs
index c161ed680..032d318d9 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs
@@ -13,7 +13,7 @@ namespace Tango.Telemetry.Sources
public TelemetryDiagnosticsStreamingSourceConfig()
{
- DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10);
+ DiagnosticsSamplingInterval = TimeSpan.FromSeconds(60);
}
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs
index 93a5cc54e..9f7488909 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs
@@ -6,6 +6,8 @@ using System.Threading.Tasks;
using Tango.BL.Entities;
using Tango.Integration.Operation;
using Tango.Telemetry.Telemetries;
+// NEW:
+using System.Timers;
namespace Tango.Telemetry.Sources
{
@@ -17,6 +19,12 @@ namespace Tango.Telemetry.Sources
private String _groupID;
private DateTime _startTime;
+ // NEW: throttle state
+ private readonly object _lock = new object();
+ private DateTime _lastStatusArrivedUtc;
+ private DateTime _lastEmittedUtc;
+ private Timer _emitTimer; // ticks every 1s
+
public bool IsStarted { get; private set; }
public string Name { get; } = "Job Status Streaming";
public bool RequiresTelemetryDuplicationTracking { get; }
@@ -34,6 +42,15 @@ namespace Tango.Telemetry.Sources
{
IsStarted = true;
_machineOperator.PrintingStarted += MachineOperator_PrintingStarted;
+
+ // NEW: start 1s emitter (idempotent)
+ if (_emitTimer == null)
+ {
+ _emitTimer = new Timer(1000);
+ _emitTimer.AutoReset = true;
+ _emitTimer.Elapsed += EmitTimer_Elapsed;
+ _emitTimer.Start();
+ }
}
}
@@ -43,6 +60,15 @@ namespace Tango.Telemetry.Sources
{
IsStarted = false;
_machineOperator.PrintingStarted -= MachineOperator_PrintingStarted;
+
+ // NEW: stop timer
+ if (_emitTimer != null)
+ {
+ _emitTimer.Stop();
+ _emitTimer.Elapsed -= EmitTimer_Elapsed;
+ _emitTimer.Dispose();
+ _emitTimer = null;
+ }
}
}
@@ -58,70 +84,80 @@ namespace Tango.Telemetry.Sources
}
}
- private void JobHandler_StatusChanged(object sender, RunningJobStatus status)
+ // NEW: timer tick — emit only the latest status observed within last 5 seconds, once per tick
+ private void EmitTimer_Elapsed(object sender, ElapsedEventArgs e)
{
- if (IsStarted)
- {
- TelemetryJobStatus tStatus = new TelemetryJobStatus();
+ if (!IsStarted) return;
- tStatus.JobName = _job?.Name;
+ TelemetryJobStatus toEmit = null;
+ DateTime now = DateTime.UtcNow;
- if (status.ProgressMinusSettingUp > 0)
- {
- tStatus.Status = TelemetryJobStatus.JobStatus.InProgress;
- }
-
- if (status.IsCompleted)
- {
- tStatus.Status = TelemetryJobStatus.JobStatus.Completed;
- }
-
- if (status.IsFailed)
+ lock (_lock)
+ {
+ // Only emit if we have a newer status than last emitted,
+ // and that status is fresh (arrived within the last 5 seconds).
+ if (_lastStatus != null &&
+ _lastStatusArrivedUtc > _lastEmittedUtc &&
+ (now - _lastStatusArrivedUtc) <= TimeSpan.FromSeconds(5))
{
- tStatus.Status = TelemetryJobStatus.JobStatus.Failed;
+ toEmit = _lastStatus;
+ _lastEmittedUtc = now; // mark emission (1 per tick max)
}
+ }
- if (status.IsCanceled)
+ if (toEmit != null)
+ {
+ TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs
{
- tStatus.Status = TelemetryJobStatus.JobStatus.Aborted;
- }
-
- tStatus.ID = _groupID;
- tStatus.TotalTime = status.TotalTime;
- tStatus.RemainingTime = status.RemainingTime;
-
- tStatus.Progress = status.ProgressMinusSettingUp;
- tStatus.TotalProgress = status.TotalProgressMinusSettingUp;
+ TelemetryObject = toEmit,
+ DisableDeliveryRetries = true
+ });
+ }
+ }
- tStatus.CurrentUnit = status.CurrentUnit;
- tStatus.RemainingUnits = status.RemainingUnits;
+ private void JobHandler_StatusChanged(object sender, RunningJobStatus status)
+ {
+ if (!IsStarted) return;
- tStatus.CurrentUnitProgress = status.CurrentUnitProgress;
- tStatus.CurrentUnitTotalProgress = status.CurrentUnitTotalProgress;
+ // CHANGED: just update the latest snapshot fast; do NOT emit here.
+ var tStatus = new TelemetryJobStatus
+ {
+ JobName = _job?.Name,
+ ID = _groupID,
+ TotalTime = status.TotalTime,
+ RemainingTime = status.RemainingTime,
+ Progress = status.ProgressMinusSettingUp,
+ TotalProgress = status.TotalProgressMinusSettingUp,
+ CurrentUnit = status.CurrentUnit,
+ RemainingUnits = status.RemainingUnits,
+ CurrentUnitProgress = status.CurrentUnitProgress,
+ CurrentUnitTotalProgress = status.CurrentUnitTotalProgress,
+ Message = status.Message
+ };
- tStatus.Message = status.Message;
+ if (status.ProgressMinusSettingUp > 0) tStatus.Status = TelemetryJobStatus.JobStatus.InProgress;
+ if (status.IsCompleted) tStatus.Status = TelemetryJobStatus.JobStatus.Completed;
+ if (status.IsFailed) tStatus.Status = TelemetryJobStatus.JobStatus.Failed;
+ if (status.IsCanceled) tStatus.Status = TelemetryJobStatus.JobStatus.Aborted;
+ lock (_lock)
+ {
_lastStatus = tStatus;
-
- TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs()
- {
- TelemetryObject = tStatus,
- DisableDeliveryRetries = true
- });
+ _lastStatusArrivedUtc = DateTime.UtcNow;
}
}
private void JobHandler_Failed(object sender, Exception e)
{
- if (_lastStatus != null)
+ // CHANGED: update the latest snapshot and let the timer emit it (within ~1s)
+ lock (_lock)
{
- _lastStatus.Message = e.FlattenMessage();
-
- TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs()
+ if (_lastStatus != null)
{
- TelemetryObject = _lastStatus,
- DisableDeliveryRetries = true
- });
+ _lastStatus.Message = e.FlattenMessage();
+ _lastStatus.Status = TelemetryJobStatus.JobStatus.Failed;
+ _lastStatusArrivedUtc = DateTime.UtcNow;
+ }
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 611734724..dc61f443d 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -242,6 +242,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
+ <Compile Include="Destinations\TelemetryAzureHubBatchDestination.cs" />
<Compile Include="Helpers\DateTimeUTCFixer.cs" />
<Compile Include="Destinations\TelemetryAzureHubDestination.cs" />
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
index aa50041ca..ab0ff2b58 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs
@@ -56,23 +56,13 @@ namespace Tango.Telemetry
Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath));
- String backupPath = DatabasePath + ".bak";
-
if (!File.Exists(DatabasePath))
{
- if (File.Exists(backupPath))
- {
- LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error);
- File.Copy(backupPath, DatabasePath, overwrite: true);
- }
- else
- {
- LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical);
+ LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical);
- if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery)
- {
- throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate.");
- }
+ if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery)
+ {
+ throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate.");
}
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
index ceb405a4e..07ef8061e 100644
--- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs
@@ -42,7 +42,7 @@ namespace Tango.Telemetry
/// <summary>
/// Maximum number of telemetry packages allowed in memory queues before rejecting new packages.
/// </summary>
- public int MaxPendingTelemetries { get; set; } = 200;
+ public int MaxPendingTelemetries { get; set; } = 10000;
/// <summary>
/// Whether exponential backoff should be applied to retry logic per destination.