using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Tango.Core;
using Tango.Insights;
using Tango.Integration.Operation;
using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
using Tango.Telemetry.Telemetries;

namespace Tango.Telemetry.Sources
{
    /// <summary>
    /// Streaming telemetry source that buffers the diagnostics responses raised by an
    /// <see cref="IMachineOperator"/> and, once per sampling interval, publishes a single
    /// <see cref="TelemetryDiagnosticsFrame"/> holding the averaged monitors of that interval.
    /// </summary>
    /// <remarks>
    /// When an interval produced no diagnostics, one frame with empty monitors is published;
    /// further empty intervals publish nothing until diagnostics arrive again.
    /// </remarks>
    public class TelemetryDiagnosticsStreamingSource : TelemetryConfigurableSource, ITelemetryStreamingSource
    {
        /// <summary>Lower bound, in seconds, enforced on the configured sampling interval by <see cref="Start"/>.</summary>
        public const int MIN_SAMPLING_INTERVAL_SECONDS = 1;

        // Guards _diagnosticsQueue: it is filled by the operator callback and drained by the
        // timer callback, which System.Timers.Timer raises on a thread-pool thread.
        private readonly object _queueLock = new object();

        // Initialized inline so that every constructor gets a usable queue.
        private readonly List<StartDiagnosticsResponse> _diagnosticsQueue = new List<StartDiagnosticsResponse>();

        private IMachineOperator _machineOperator;
        private Timer _diagnosticsSamplingTimer;

        // True while a timer tick is being processed; prevents overlapping ticks.
        private bool _writing;

        // True once an empty frame has been published for the current run of empty intervals.
        private bool _emptyWritten;

        /// <summary>
        /// Raised with a <see cref="TelemetryAvailableEventArgs"/> whose <c>TelemetryObject</c>
        /// is the <see cref="TelemetryDiagnosticsFrame"/> produced for the elapsed interval.
        /// </summary>
        // NOTE(review): the declaration is kept exactly as found; confirm against
        // ITelemetryStreamingSource whether it should be EventHandler<TelemetryAvailableEventArgs>.
        public event EventHandler TelemetryAvailable;

        /// <summary>Display name of this source.</summary>
        public string Name { get; private set; } = "Diagnostics";

        /// <summary>Always <c>false</c> for this source.</summary>
        public bool RequiresTelemetryDuplicationTracking { get => false; }

        /// <summary>Whether <see cref="Start"/> has been called without a subsequent <see cref="Stop"/>.</summary>
        public bool IsStarted { get; private set; }

        private TelemetryDiagnosticsStreamingSource() : base()
        {
        }

        /// <summary>
        /// Creates the source and subscribes to the operator's diagnostics notifications.
        /// </summary>
        /// <param name="machineOperator">Operator whose diagnostics data is sampled.</param>
        /// <exception cref="ArgumentNullException"><paramref name="machineOperator"/> is null.</exception>
        public TelemetryDiagnosticsStreamingSource(IMachineOperator machineOperator) : base()
        {
            _machineOperator = machineOperator ?? throw new ArgumentNullException(nameof(machineOperator));
            _machineOperator.DiagnosticsDataAvailable += DiagnosticsDataAvailable;
        }

        /// <summary>
        /// Buffers an incoming diagnostics response while the source is started.
        /// Responses without monitors are ignored.
        /// </summary>
        private void DiagnosticsDataAvailable(object sender, StartDiagnosticsResponse diagnostics)
        {
            if (!IsStarted || diagnostics?.Monitors == null)
            {
                return;
            }

            lock (_queueLock)
            {
                _diagnosticsQueue.Add(diagnostics);
            }
        }

        /// <summary>
        /// Starts sampling. Clamps the configured interval to
        /// <see cref="MIN_SAMPLING_INTERVAL_SECONDS"/>, discards any buffered diagnostics
        /// and (re)starts the sampling timer with the current interval.
        /// </summary>
        public void Start()
        {
            IsStarted = true;

            if (Config.DiagnosticsSamplingInterval.TotalSeconds < MIN_SAMPLING_INTERVAL_SECONDS)
            {
                Config.DiagnosticsSamplingInterval = TimeSpan.FromSeconds(MIN_SAMPLING_INTERVAL_SECONDS);
            }

            if (_diagnosticsSamplingTimer == null)
            {
                _diagnosticsSamplingTimer = new Timer();
                _diagnosticsSamplingTimer.Elapsed += DiagnosticsSamplingTimer_Elapsed;
            }

            // Applied on every start so a changed configuration takes effect after a restart.
            _diagnosticsSamplingTimer.Interval = Config.DiagnosticsSamplingInterval.TotalMilliseconds;

            lock (_queueLock)
            {
                _diagnosticsQueue.Clear();
            }

            _writing = false;
            _diagnosticsSamplingTimer.Start();
        }

        /// <summary>
        /// Stops sampling: incoming diagnostics are no longer buffered and the timer is halted.
        /// </summary>
        public void Stop()
        {
            IsStarted = false;
            _diagnosticsSamplingTimer?.Stop();
        }

        /// <summary>
        /// Timer tick: drains the buffered diagnostics and publishes one frame for the interval.
        /// The timer is paused while the tick is processed and resumed afterwards, unless the
        /// source was stopped in the meantime.
        /// </summary>
        private void DiagnosticsSamplingTimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            if (!IsStarted || _writing)
            {
                return;
            }

            try
            {
                _diagnosticsSamplingTimer?.Stop();
                _writing = true;

                // Snapshot and clear atomically so samples arriving during processing
                // are neither lost nor observed half-way through a list mutation.
                List<StartDiagnosticsResponse> pending;
                lock (_queueLock)
                {
                    pending = _diagnosticsQueue.ToList();
                    _diagnosticsQueue.Clear();
                }

                if (pending.Count > 0)
                {
                    _emptyWritten = false;

                    var monitorsAvg = InsightsHelper.AverageMonitors(pending.Select(x => x.Monitors).ToList());

                    TelemetryDiagnosticsFrame frame = CreateFrame();
                    frame.Monitors = monitorsAvg;
                    RaiseTelemetryAvailable(frame);
                }
                else if (!_emptyWritten)
                {
                    TelemetryDiagnosticsFrame frame = CreateFrame();
                    frame.Monitors = new InsightsMonitors();
                    RaiseTelemetryAvailable(frame);

                    _emptyWritten = true;
                }
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Error occurred on insights frame insertion.");
            }
            finally
            {
                _writing = false;

                // Do not resurrect the timer if Stop()/Dispose() ran while this tick was in flight.
                if (IsStarted)
                {
                    _diagnosticsSamplingTimer?.Start();
                }
            }
        }

        /// <summary>Creates a frame time-stamped one sampling interval before the current UTC time.</summary>
        private TelemetryDiagnosticsFrame CreateFrame()
        {
            TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame();
            frame.Time = DateTime.UtcNow.Subtract(Config.DiagnosticsSamplingInterval);
            return frame;
        }

        /// <summary>Publishes <paramref name="frame"/> through <see cref="TelemetryAvailable"/>.</summary>
        private void RaiseTelemetryAvailable(TelemetryDiagnosticsFrame frame)
        {
            TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() { TelemetryObject = frame });
        }

        /// <summary>
        /// Stops sampling, unsubscribes from the operator and releases the sampling timer.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            Stop();

            if (_machineOperator != null)
            {
                _machineOperator.DiagnosticsDataAvailable -= DiagnosticsDataAvailable;
            }

            Timer timer = _diagnosticsSamplingTimer;
            _diagnosticsSamplingTimer = null;

            if (timer != null)
            {
                timer.Elapsed -= DiagnosticsSamplingTimer_Elapsed;
                timer.Dispose();
            }
        }
    }
}