using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Tango.Core.Threading
{
    /// <summary>
    /// Represents a message queue dispatcher for delivering messages at a constant interval.
    /// Messages are pushed by producers into an internal queue and handed to a callback,
    /// one at a time, on a dedicated background thread.
    /// </summary>
    /// <typeparam name="TMessage">The type of the dispatched messages.</typeparam>
    /// <seealso cref="System.IDisposable" />
    public class IntervalMessageDispatcher<TMessage> : IDisposable
    {
        // Guards _framesPushed/_framesDelivered: they are written by Push (producer threads)
        // and read/re-based by the dispatch thread.
        private readonly object _countersLock = new object();

        private readonly ProducerConsumerQueue<TMessage> _queue;
        private readonly Action<TMessage> _onNext;
        private Thread _queueThread;
        private long _framesPushed;
        private long _framesDelivered;

        // volatile: written by Start/Dispose on the caller's thread, polled by the dispatch thread.
        private volatile bool _isStarted;

        /// <summary>
        /// Gets or sets the delivery interval in milliseconds.
        /// </summary>
        public int Interval { get; set; }

        /// <summary>
        /// Gets or sets the drift compensation interval in milliseconds.
        /// Meaning, when frames are pushed at a faster rate than the delivery interval,
        /// a compensation mechanism is triggered which delivers frames faster until
        /// pushed and delivered frames are balanced again.
        /// <c>null</c> disables drift compensation.
        /// </summary>
        public int? DriftCompensationInterval { get; set; }

        /// <summary>
        /// Gets or sets the maximum frames count for drift compensation calculation (default 100).
        /// </summary>
        public int MaximumFramesForDriftCompensation { get; set; }

        /// <summary>
        /// Gets or sets the maximum allowed drift (0-1) before starting to balance. Default is 0.1.
        /// </summary>
        public double MaximumDrift { get; set; }

        /// <summary>
        /// Gets a value indicating whether this instance has started.
        /// </summary>
        public bool IsStarted
        {
            get { return _isStarted; }
            private set { _isStarted = value; }
        }

        /// <summary>
        /// Gets the number of queued messages.
        /// </summary>
        public int Count
        {
            get { return _queue.Count; }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="IntervalMessageDispatcher{TMessage}"/> class.
        /// </summary>
        /// <param name="onNext">The delivery callback. May be null, in which case messages are dequeued and dropped.</param>
        public IntervalMessageDispatcher(Action<TMessage> onNext)
        {
            _onNext = onNext;
            _queue = new ProducerConsumerQueue<TMessage>();
            MaximumDrift = 0.1;
            MaximumFramesForDriftCompensation = 100;
        }

        /// <summary>
        /// Starts the dispatching of messages. Does nothing when already started.
        /// </summary>
        public void Start()
        {
            if (IsStarted)
            {
                return;
            }

            lock (_countersLock)
            {
                _framesPushed = 0;
                _framesDelivered = 0;
            }

            IsStarted = true;

            _queueThread = new Thread(QueueThreadMethod);
            _queueThread.Name = "Sequencer Thread";
            _queueThread.IsBackground = true;
            _queueThread.Start();
        }

        /// <summary>
        /// Pushes the specified message.
        /// </summary>
        /// <param name="message">The message.</param>
        public void Push(TMessage message)
        {
            lock (_countersLock)
            {
                _framesPushed++;
            }

            // Enqueue outside the lock: the call may block and must not stall the dispatch thread.
            _queue.BlockEnqueue(message);
        }

        /// <summary>
        /// Dispatch loop: dequeues one message, invokes the callback, then sleeps for the
        /// remainder of <see cref="Interval"/>, scaled down while drift compensation is active.
        /// </summary>
        private void QueueThreadMethod()
        {
            // 1.0 means "no compensation": the full remaining interval is slept.
            // (Starting at 0 would multiply every sleep by zero until the first compensation pass.)
            double requiredDriftCompensation = 1d;
            double balancingFactor = 1d;
            bool balancing = false;

            // Stopwatch is monotonic, unlike DateTime.Now which jumps on clock/DST changes.
            Stopwatch sinceLastCompensation = Stopwatch.StartNew();
            Stopwatch watch = new Stopwatch();

            while (IsStarted)
            {
                var item = _queue.BlockDequeue();

                if (!IsStarted)
                {
                    break;
                }

                watch.Restart();

                try
                {
                    _onNext?.Invoke(item);
                }
                catch (Exception ex)
                {
                    // Best effort: a failing callback must not kill the dispatch thread,
                    // but the failure should not vanish without a trace either.
                    Debug.WriteLine("IntervalMessageDispatcher: delivery callback failed: " + ex);
                }

                // The frame has left the queue whether or not the callback succeeded; count it,
                // otherwise every failed callback would register as permanent drift.
                lock (_countersLock)
                {
                    _framesDelivered++;
                }

                // Snapshot once so a concurrent change cannot slip between the check and the use.
                int? compensationInterval = DriftCompensationInterval;

                if (!compensationInterval.HasValue)
                {
                    // Compensation disabled: drop any stale scaling and deliver at the plain interval.
                    requiredDriftCompensation = 1d;
                    balancingFactor = 1d;
                    balancing = false;
                }
                else if (balancing || sinceLastCompensation.ElapsedMilliseconds > compensationInterval.Value)
                {
                    sinceLastCompensation.Restart();

                    long pushed;
                    long delivered;

                    lock (_countersLock)
                    {
                        // Re-base both counters onto a bounded window so the ratio keeps
                        // reacting to recent drift instead of being diluted by history.
                        long window = MaximumFramesForDriftCompensation;
                        if (_framesPushed > window)
                        {
                            long reduction = _framesPushed - window;
                            _framesPushed = window;
                            _framesDelivered -= reduction;
                        }

                        pushed = _framesPushed;
                        delivered = _framesDelivered;
                    }

                    // Floating-point division (long / long would truncate to 0 or 1), guarded
                    // against an empty window and clamped to the documented 0-1 range.
                    requiredDriftCompensation = pushed > 0
                        ? Math.Min(1d, Math.Max(0d, (double)delivered / pushed))
                        : 1d;

                    if (1d - requiredDriftCompensation >= MaximumDrift)
                    {
                        // Still behind: shorten the sleep a further 10% on every delivered frame.
                        balancing = true;
                        balancingFactor = Math.Max(0, balancingFactor - 0.1);
                    }
                    else
                    {
                        balancing = false;
                        balancingFactor = 1d;
                    }
                }

                double remaining = Interval - watch.ElapsedMilliseconds;
                int sleep = Math.Max(0, (int)(remaining * requiredDriftCompensation * balancingFactor));
                Thread.Sleep(sleep);
            }
        }

        /// <summary>
        /// Stops the dispatching of messages.
        /// </summary>
        public void Dispose()
        {
            // NOTE(review): this only clears the flag. A dispatch thread parked in BlockDequeue
            // will not observe it until another message arrives - confirm whether
            // ProducerConsumerQueue offers a way to cancel/complete the blocking wait.
            IsStarted = false;
        }

        /// <summary>
        /// Creates a new instance of <see cref="IntervalMessageDispatcher{TMessage}"/> and starts it immediately.
        /// </summary>
        /// <typeparam name="T">The type of the dispatched messages.</typeparam>
        /// <param name="onNext">The delivery callback.</param>
        /// <param name="interval">The delivery interval in milliseconds.</param>
        /// <returns>The started dispatcher.</returns>
        public static IntervalMessageDispatcher<T> StartNew<T>(Action<T> onNext, int interval)
        {
            IntervalMessageDispatcher<T> dispatcher = new IntervalMessageDispatcher<T>(onNext);
            dispatcher.Interval = interval;
            dispatcher.Start();
            return dispatcher;
        }
    }
}