diff options
| author | Roy Ben Shabat <roy.mail.net@gmail.com> | 2025-08-30 22:05:03 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <roy.mail.net@gmail.com> | 2025-08-30 22:05:03 +0300 |
| commit | a1d55aa060450b0a16b597794686b2aa41eea259 (patch) | |
| tree | 62dfd7829b81335103a3c2d1d17c99d8408a5e54 /Software/Visual_Studio/Tango.Telemetry | |
| parent | d9d5b0161efbce3f0d44e890b435883f49df2963 (diff) | |
| download | Tango-a1d55aa060450b0a16b597794686b2aa41eea259.tar.gz Tango-a1d55aa060450b0a16b597794686b2aa41eea259.zip | |
PPC Telemetry First Integration.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
7 files changed, 297 insertions, 62 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs new file mode 100644 index 000000000..10bc3981a --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs @@ -0,0 +1,208 @@ +using Microsoft.Azure.Devices.Client; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Logging; +using Tango.Telemetry.Helpers; +// NEW: +using System.Timers; + +namespace Tango.Telemetry.Destinations +{ + /// <summary> + /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport. + /// Now always batches and flushes once per second on a timer. + /// </summary> + public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination + { + private DeviceClient _hubClient; + private int _batchSize; // kept for compatibility (no longer used for triggering sends) + private ConcurrentList<Message> _batch; + + // NEW: timer + lock to coordinate flushes + private Timer _flushTimer; + private readonly object _batchLock = new object(); + + private class ADXPAckage + { + public String Type { get; set; } + public String Environment { get; set; } + public String MachineType { get; set; } + public String SerialNumber { get; set; } + public String Organization { get; set; } + public String Site { get; set; } + public ITelemetry Telemetry { get; set; } + public DateTime UploadTime { get; set; } + public DateTime CreatedTime { get; set; } + public int Version { get; set; } + public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None); + } + + public string Name { get; set; } = "Azure IoT Hub"; + public string ConnectionString { get; private set; } + public ConnectionStatus HubConnectionStatus { get; private set; } + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } + + /// <summary> + /// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore. + /// </summary> + public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); } + + private TelemetryAzureHubBatchDestination() + { + _batch = new ConcurrentList<Message>(); + HubConnectionStatus = ConnectionStatus.Connected; + SupportedSourceTypes = new List<TelemetrySourceTypes>() + { + TelemetrySourceTypes.PendingStorage, + TelemetrySourceTypes.Streaming, + TelemetrySourceTypes.ExternalStorage + }; + BatchSize = 1; + } + + public TelemetryAzureHubBatchDestination(string connectionString) : this() + { + ConnectionString = connectionString; + } + + public Task<bool> IsAvailable() + { + if (InternetConnectivity.IsInternetAvailable()) + { + if (_hubClient == null) return Task.FromResult(true); + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } + return Task.FromResult(false); + } + + public Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties) + { + if (_hubClient == null) + { + _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.OperationTimeoutInMilliseconds = 2000; + _hubClient.SetConnectionStatusChangesHandler((status, reason) => + { + HubConnectionStatus = status; + LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info); + }); + + // NEW: start the periodic flusher (every 1s) + EnsureFlushTimer(); + } + + var adxPackage = new ADXPAckage + { + Type = package.TelemetryName, + Version = package.TelemetryVersion, + Environment = package.Environment, + MachineType = package.MachineType, + SerialNumber = package.SerialNumber, + Organization = package.Organization, + Site = package.Site, + UploadTime = DateTime.UtcNow, + CreatedTime = package.PendingTelemetry.TelemetryObject.Time, + Telemetry = package.PendingTelemetry.TelemetryObject + }; + + var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload())) + { + ContentType = "application/json", + ContentEncoding = "utf-8" + }; + + foreach (var prop in properties) + message.Properties.Add(prop.Key, prop.Value); + + // NEW: always enqueue; timer decides when to send + lock (_batchLock) + { + _batch.Add(message); + } + LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug); + + return Task.FromResult(true); + } + + // NEW: create the 1s timer (idempotent) + private void EnsureFlushTimer() + { + if (_flushTimer != null) return; + + _flushTimer = new Timer(1000); + _flushTimer.AutoReset = true; + _flushTimer.Elapsed += async (s, e) => + { + try + { + await FlushBatchAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + // If send fails, drop connection so IsAvailable() reflects it and upstream can retry later + try { _hubClient?.Dispose(); } catch { /* ignore */ } + _hubClient = null; + LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning); + } + }; + _flushTimer.Start(); + } + + // NEW: send whatever accumulated since last tick + private async Task FlushBatchAsync() + { + List<Message> toSend = null; + + lock (_batchLock) + { + if (_batch.Count == 0) return; + toSend = _batch.ToList(); + _batch.Clear(); + } + + LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventBatchAsync(toSend); + } + + public void Dispose() + { + // NEW: stop timer first to avoid races + if (_flushTimer != null) + { + _flushTimer.Stop(); + _flushTimer.Dispose(); + _flushTimer = null; + } + + // Flush any remaining messages synchronously + try + { + List<Message> toSend = null; + lock (_batchLock) + { + if (_batch.Count > 0) + { + toSend = _batch.ToList(); + _batch.Clear(); + } + } + if (_hubClient != null && toSend != null && toSend.Count > 0) + { + LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info); + _hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult(); + } + } + finally + { + _hubClient?.Dispose(); + } + } + + public override string ToString() => Name; + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs index 657fa83e8..a5b2da647 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSource.cs @@ -37,7 +37,7 @@ namespace Tango.Telemetry.Sources _diagnosticsQueue = new List<StartDiagnosticsResponse>(); } - public TelemetryDiagnosticsStreamingSource(IMachineOperator machineOperator) : base() + public TelemetryDiagnosticsStreamingSource(IMachineOperator machineOperator) : this() { _machineOperator = machineOperator; _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable; diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs index c161ed680..032d318d9 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryDiagnosticsStreamingSourceConfig.cs @@ -13,7 +13,7 @@ namespace Tango.Telemetry.Sources public TelemetryDiagnosticsStreamingSourceConfig() { - DiagnosticsSamplingInterval = TimeSpan.FromSeconds(10); + DiagnosticsSamplingInterval = TimeSpan.FromSeconds(60); } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs index 93a5cc54e..9f7488909 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Sources/TelemetryJobStatusSource.cs @@ -6,6 +6,8 @@ using System.Threading.Tasks; using Tango.BL.Entities; using Tango.Integration.Operation; using Tango.Telemetry.Telemetries; +// NEW: +using System.Timers; namespace Tango.Telemetry.Sources { @@ -17,6 +19,12 @@ namespace Tango.Telemetry.Sources private String _groupID; private DateTime _startTime; + // NEW: throttle state + private readonly object _lock = new object(); + private DateTime _lastStatusArrivedUtc; + private DateTime _lastEmittedUtc; + private Timer _emitTimer; // ticks every 1s + public bool IsStarted { get; private set; } public string Name { get; } = "Job Status Streaming"; public bool RequiresTelemetryDuplicationTracking { get; } @@ -34,6 +42,15 @@ namespace Tango.Telemetry.Sources { IsStarted = true; _machineOperator.PrintingStarted += MachineOperator_PrintingStarted; + + // NEW: start 1s emitter (idempotent) + if (_emitTimer == null) + { + _emitTimer = new Timer(1000); + _emitTimer.AutoReset = true; + _emitTimer.Elapsed += EmitTimer_Elapsed; + _emitTimer.Start(); + } } } @@ -43,6 +60,15 @@ namespace Tango.Telemetry.Sources { IsStarted = false; _machineOperator.PrintingStarted -= MachineOperator_PrintingStarted; + + // NEW: stop timer + if (_emitTimer != null) + { + _emitTimer.Stop(); + _emitTimer.Elapsed -= EmitTimer_Elapsed; + _emitTimer.Dispose(); + _emitTimer = null; + } } } @@ -58,70 +84,80 @@ namespace Tango.Telemetry.Sources } } - private void JobHandler_StatusChanged(object sender, RunningJobStatus status) + // NEW: timer tick — emit only the latest status observed within last 5 seconds, once per tick + private void EmitTimer_Elapsed(object sender, ElapsedEventArgs e) { - if (IsStarted) - { - TelemetryJobStatus tStatus = new TelemetryJobStatus(); + if (!IsStarted) return; - tStatus.JobName = _job?.Name; + TelemetryJobStatus toEmit = null; + DateTime now = DateTime.UtcNow; - if (status.ProgressMinusSettingUp > 0) - { - tStatus.Status = TelemetryJobStatus.JobStatus.InProgress; - } - - if (status.IsCompleted) - { - tStatus.Status = TelemetryJobStatus.JobStatus.Completed; - } - - if (status.IsFailed) + lock (_lock) + { + // Only emit if we have a newer status than last emitted, + // and that status is fresh (arrived within the last 5 seconds). + if (_lastStatus != null && + _lastStatusArrivedUtc > _lastEmittedUtc && + (now - _lastStatusArrivedUtc) <= TimeSpan.FromSeconds(5)) { - tStatus.Status = TelemetryJobStatus.JobStatus.Failed; + toEmit = _lastStatus; + _lastEmittedUtc = now; // mark emission (1 per tick max) } + } - if (status.IsCanceled) + if (toEmit != null) + { + TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs { - tStatus.Status = TelemetryJobStatus.JobStatus.Aborted; - } - - tStatus.ID = _groupID; - tStatus.TotalTime = status.TotalTime; - tStatus.RemainingTime = status.RemainingTime; - - tStatus.Progress = status.ProgressMinusSettingUp; - tStatus.TotalProgress = status.TotalProgressMinusSettingUp; + TelemetryObject = toEmit, + DisableDeliveryRetries = true + }); + } + } - tStatus.CurrentUnit = status.CurrentUnit; - tStatus.RemainingUnits = status.RemainingUnits; + private void JobHandler_StatusChanged(object sender, RunningJobStatus status) + { + if (!IsStarted) return; - tStatus.CurrentUnitProgress = status.CurrentUnitProgress; - tStatus.CurrentUnitTotalProgress = status.CurrentUnitTotalProgress; + // CHANGED: just update the latest snapshot fast; do NOT emit here. + var tStatus = new TelemetryJobStatus + { + JobName = _job?.Name, + ID = _groupID, + TotalTime = status.TotalTime, + RemainingTime = status.RemainingTime, + Progress = status.ProgressMinusSettingUp, + TotalProgress = status.TotalProgressMinusSettingUp, + CurrentUnit = status.CurrentUnit, + RemainingUnits = status.RemainingUnits, + CurrentUnitProgress = status.CurrentUnitProgress, + CurrentUnitTotalProgress = status.CurrentUnitTotalProgress, + Message = status.Message + }; - tStatus.Message = status.Message; + if (status.ProgressMinusSettingUp > 0) tStatus.Status = TelemetryJobStatus.JobStatus.InProgress; + if (status.IsCompleted) tStatus.Status = TelemetryJobStatus.JobStatus.Completed; + if (status.IsFailed) tStatus.Status = TelemetryJobStatus.JobStatus.Failed; + if (status.IsCanceled) tStatus.Status = TelemetryJobStatus.JobStatus.Aborted; + lock (_lock) + { _lastStatus = tStatus; - - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() - { - TelemetryObject = tStatus, - DisableDeliveryRetries = true - }); + _lastStatusArrivedUtc = DateTime.UtcNow; } } private void JobHandler_Failed(object sender, Exception e) { - if (_lastStatus != null) + // CHANGED: update the latest snapshot and let the timer emit it (within ~1s) + lock (_lock) { - _lastStatus.Message = e.FlattenMessage(); - - TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() + if (_lastStatus != null) { - TelemetryObject = _lastStatus, - DisableDeliveryRetries = true - }); + _lastStatus.Message = e.FlattenMessage(); + _lastStatus.Status = TelemetryJobStatus.JobStatus.Failed; + _lastStatusArrivedUtc = DateTime.UtcNow; + } } } diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 611734724..dc61f443d 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -242,6 +242,7 @@ </Reference> </ItemGroup> <ItemGroup> + <Compile Include="Destinations\TelemetryAzureHubBatchDestination.cs" /> <Compile Include="Helpers\DateTimeUTCFixer.cs" /> <Compile Include="Destinations\TelemetryAzureHubDestination.cs" /> <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index aa50041ca..ab0ff2b58 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -56,23 +56,13 @@ namespace Tango.Telemetry Directory.CreateDirectory(Path.GetDirectoryName(DatabasePath)); - String backupPath = DatabasePath + ".bak"; - if (!File.Exists(DatabasePath)) { - if (File.Exists(backupPath)) - { - LogManager.Log("Telemetry database missing. Attempting to restore from backup.", LogCategory.Error); - File.Copy(backupPath, DatabasePath, overwrite: true); - } - else - { - LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical); + LogManager.Log("Telemetry database was not found. A new one will be created and source checkpoints will be recovered from the remote service if required.", LogCategory.Critical); - if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery) - { - throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate."); - } + if (_checkpointsRecoveryClient == null && EnableCheckPointsRecovery && EnforceCheckpointsRecovery) + { + throw new NullReferenceException("No TelemetryCheckpointsRecoveryClient was introduced. Telemetry Storage manager should not operate."); } } diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs index ceb405a4e..07ef8061e 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPublisherConfiguration.cs @@ -42,7 +42,7 @@ namespace Tango.Telemetry /// <summary> /// Maximum number of telemetry packages allowed in memory queues before rejecting new packages. /// </summary> - public int MaxPendingTelemetries { get; set; } = 200; + public int MaxPendingTelemetries { get; set; } = 10000; /// <summary> /// Whether exponential backoff should be applied to retry logic per destination. |
