using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.BL.Entities; using Tango.Integration.Operation; using Tango.Telemetry.Telemetries; // NEW: using System.Timers; namespace Tango.Telemetry.Sources { public class TelemetryJobStatusSource : ITelemetryStreamingSource { private IMachineOperator _machineOperator; private Job _job; private TelemetryJobStatus _lastStatus; private String _groupID; private DateTime _startTime; // NEW: throttle state private readonly object _lock = new object(); private DateTime _lastStatusArrivedUtc; private DateTime _lastEmittedUtc; private Timer _emitTimer; // ticks every 1s public bool IsStarted { get; private set; } public string Name { get; } = "Job Status Streaming"; public bool RequiresTelemetryDuplicationTracking { get; } public event EventHandler TelemetryAvailable; public TelemetryJobStatusSource(IMachineOperator machineOperator) { _machineOperator = machineOperator; } public void Start() { if (!IsStarted) { IsStarted = true; _machineOperator.PrintingStarted += MachineOperator_PrintingStarted; // NEW: start 1s emitter (idempotent) if (_emitTimer == null) { _emitTimer = new Timer(1000); _emitTimer.AutoReset = true; _emitTimer.Elapsed += EmitTimer_Elapsed; _emitTimer.Start(); } } } public void Stop() { if (IsStarted) { IsStarted = false; _machineOperator.PrintingStarted -= MachineOperator_PrintingStarted; // NEW: stop timer if (_emitTimer != null) { _emitTimer.Stop(); _emitTimer.Elapsed -= EmitTimer_Elapsed; _emitTimer.Dispose(); _emitTimer = null; } } } private void MachineOperator_PrintingStarted(object sender, PrintingEventArgs e) { if (e.JobHandler != null) { _groupID = Guid.NewGuid().ToString(); _job = e.Job; _startTime = DateTime.UtcNow; e.JobHandler.StatusChanged += JobHandler_StatusChanged; e.JobHandler.Failed += JobHandler_Failed; } } // NEW: timer tick — emit only the latest status observed within last 5 seconds, once per tick private void EmitTimer_Elapsed(object sender, ElapsedEventArgs e) { if (!IsStarted) return; TelemetryJobStatus toEmit = null; DateTime now = DateTime.UtcNow; lock (_lock) { // Only emit if we have a newer status than last emitted, // and that status is fresh (arrived within the last 5 seconds). if (_lastStatus != null && _lastStatusArrivedUtc > _lastEmittedUtc && (now - _lastStatusArrivedUtc) <= TimeSpan.FromSeconds(5)) { toEmit = _lastStatus; _lastEmittedUtc = now; // mark emission (1 per tick max) } } if (toEmit != null) { TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs { TelemetryObject = toEmit, DisableDeliveryRetries = true }); } } private void JobHandler_StatusChanged(object sender, RunningJobStatus status) { if (!IsStarted) return; // CHANGED: just update the latest snapshot fast; do NOT emit here. var tStatus = new TelemetryJobStatus { JobName = _job?.Name, ID = _groupID, TotalTime = status.TotalTime, RemainingTime = status.RemainingTime, Progress = status.ProgressMinusSettingUp, TotalProgress = status.TotalProgressMinusSettingUp, CurrentUnit = status.CurrentUnit, RemainingUnits = status.RemainingUnits, CurrentUnitProgress = status.CurrentUnitProgress, CurrentUnitTotalProgress = status.CurrentUnitTotalProgress, Message = status.Message }; if (status.ProgressMinusSettingUp > 0) tStatus.Status = TelemetryJobStatus.JobStatus.InProgress; if (status.IsCompleted) tStatus.Status = TelemetryJobStatus.JobStatus.Completed; if (status.IsFailed) tStatus.Status = TelemetryJobStatus.JobStatus.Failed; if (status.IsCanceled) tStatus.Status = TelemetryJobStatus.JobStatus.Aborted; lock (_lock) { _lastStatus = tStatus; _lastStatusArrivedUtc = DateTime.UtcNow; } } private void JobHandler_Failed(object sender, Exception e) { // CHANGED: update the latest snapshot and let the timer emit it (within ~1s) lock (_lock) { if (_lastStatus != null) { _lastStatus.Message = e.FlattenMessage(); _lastStatus.Status = TelemetryJobStatus.JobStatus.Failed; _lastStatusArrivedUtc = DateTime.UtcNow; } } } public void Dispose() { Stop(); } } }