using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Tango.Core.Threading
{
/// <summary>
/// Represents a message queue dispatcher for delivering messages at a constant interval.
/// Messages are pushed from any thread and handed, one at a time, to a delivery callback
/// that runs on a dedicated background thread.
/// </summary>
/// <typeparam name="TMessage">The type of the dispatched messages.</typeparam>
/// <seealso cref="System.IDisposable" />
public class IntervalMessageDispatcher<TMessage> : IDisposable
{
    // Guards _framesPushed / _framesDelivered, which are touched by both the
    // producer threads (Push) and the dispatcher thread (drift measurement).
    private readonly object _counterLock = new object();
    private readonly ProducerConsumerQueue<TMessage> _queue;
    private readonly Action<TMessage> _onNext;
    private Thread _queueThread;
    private long _framesPushed;
    private long _framesDelivered;

    // Volatile because the flag is written by the caller's thread and polled by the dispatcher thread.
    private volatile bool _isStarted;

    /// <summary>
    /// Gets or sets the delivery interval in milliseconds.
    /// </summary>
    public int Interval { get; set; }

    /// <summary>
    /// Gets or sets the drift compensation interval in milliseconds.
    /// Meaning, when frames are pushed at a faster rate than the delivery interval,
    /// this will trigger a compensation mechanism which will deliver frames faster until it
    /// reaches balance between pushed and delivered frames.
    /// When <c>null</c>, no compensation is applied and frames are delivered at <see cref="Interval"/>.
    /// </summary>
    public int? DriftCompensationInterval { get; set; }

    /// <summary>
    /// Gets or sets the maximum frames count for drift compensation calculation (default 100).
    /// </summary>
    public int MaximumFramesForDriftCompensation { get; set; }

    /// <summary>
    /// Gets or sets the maximum allowed drift before starting to balance, 0-1 (default 0.1).
    /// </summary>
    public double MaximumDrift { get; set; }

    /// <summary>
    /// Gets a value indicating whether this instance has started.
    /// </summary>
    public bool IsStarted
    {
        get { return _isStarted; }
        private set { _isStarted = value; }
    }

    /// <summary>
    /// Gets the number of queued messages.
    /// </summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="IntervalMessageDispatcher{TMessage}"/> class.
    /// </summary>
    /// <param name="onNext">The delivery callback. May be <c>null</c>, in which case messages are dequeued and dropped.</param>
    public IntervalMessageDispatcher(Action<TMessage> onNext)
    {
        _onNext = onNext;
        _queue = new ProducerConsumerQueue<TMessage>();
        MaximumDrift = 0.1;
        MaximumFramesForDriftCompensation = 100;
    }

    /// <summary>
    /// Starts the dispatching of messages. Does nothing when already started.
    /// </summary>
    /// <remarks>
    /// NOTE(review): if a previous dispatcher thread is still blocked waiting for a message
    /// (see <see cref="Dispose"/>), starting again may leave two threads consuming the queue.
    /// </remarks>
    public void Start()
    {
        if (IsStarted)
        {
            return;
        }

        lock (_counterLock)
        {
            _framesPushed = 0;
            _framesDelivered = 0;
        }

        IsStarted = true;
        _queueThread = new Thread(QueueThreadMethod);
        _queueThread.Name = "Sequencer Thread";
        _queueThread.IsBackground = true;
        _queueThread.Start();
    }

    /// <summary>
    /// Pushes the specified message onto the delivery queue.
    /// </summary>
    /// <param name="message">The message.</param>
    public void Push(TMessage message)
    {
        lock (_counterLock)
        {
            _framesPushed++;
        }

        _queue.BlockEnqueue(message);
    }

    /// <summary>
    /// Body of the dispatcher thread: dequeues one message, delivers it, then sleeps for
    /// the remainder of <see cref="Interval"/>, shortened by the drift compensation factors.
    /// </summary>
    private void QueueThreadMethod()
    {
        // Both factors start neutral (1) so that the full interval is honoured until a drift
        // measurement says otherwise. Starting at 0 would cancel the sleep entirely.
        double requiredDriftCompensation = 1;
        double balancingFactor = 1;
        bool balancing = false;

        // Stopwatch is monotonic; wall-clock time can jump (DST, clock adjustments).
        Stopwatch sinceLastCompensation = Stopwatch.StartNew();
        Stopwatch deliveryWatch = new Stopwatch();

        while (IsStarted)
        {
            // Presumably blocks until a message is available - verify against ProducerConsumerQueue.
            TMessage item = _queue.BlockDequeue();
            if (!IsStarted)
            {
                break;
            }

            deliveryWatch.Restart();
            try
            {
                _onNext?.Invoke(item);
                lock (_counterLock)
                {
                    _framesDelivered++;
                }
            }
            catch (Exception ex)
            {
                // Best effort: a failing callback must not kill the dispatcher thread,
                // but the failure should at least be visible while debugging.
                Debug.WriteLine($"IntervalMessageDispatcher: delivery callback threw: {ex}");
            }

            int? compensationInterval = DriftCompensationInterval;
            if (!compensationInterval.HasValue)
            {
                // Compensation disabled (possibly at runtime): fall back to the plain interval.
                requiredDriftCompensation = 1;
                balancingFactor = 1;
                balancing = false;
            }
            else if (balancing || sinceLastCompensation.Elapsed.TotalMilliseconds > compensationInterval.Value)
            {
                // While balancing, the drift is re-measured after every delivery.
                sinceLastCompensation.Restart();
                requiredDriftCompensation = MeasureDeliveryRatio();

                if (1d - requiredDriftCompensation >= MaximumDrift)
                {
                    // Too far behind: shorten the sleep a further 10% on each pass.
                    balancing = true;
                    balancingFactor = Math.Max(0, balancingFactor - 0.1);
                }
                else
                {
                    balancing = false;
                    balancingFactor = 1;
                }
            }

            // Clamp the remaining time before scaling so that a negative remainder can never
            // turn into a positive sleep.
            long remaining = Math.Max(0L, Interval - deliveryWatch.ElapsedMilliseconds);
            int sleepMilliseconds = (int)(remaining * requiredDriftCompensation * balancingFactor);
            Thread.Sleep(Math.Max(0, sleepMilliseconds));
        }
    }

    /// <summary>
    /// Caps the counters to the configured window and returns delivered / pushed, clamped to [0, 1].
    /// Returns 1 (no drift) when nothing has been counted as pushed.
    /// </summary>
    private double MeasureDeliveryRatio()
    {
        lock (_counterLock)
        {
            long window = MaximumFramesForDriftCompensation;
            if (_framesPushed > window)
            {
                // Shift both counters by the same amount so the backlog (their difference) is preserved.
                long reduction = _framesPushed - window;
                _framesPushed = window;
                _framesDelivered -= reduction;
            }

            if (_framesPushed <= 0)
            {
                return 1d;
            }

            // Cast before dividing: long / long would truncate the ratio to a whole number.
            double ratio = (double)_framesDelivered / _framesPushed;
            return Math.Min(1d, Math.Max(0d, ratio));
        }
    }

    /// <summary>
    /// Signals the dispatcher thread to stop.
    /// </summary>
    /// <remarks>
    /// NOTE(review): only the flag is cleared. A thread currently blocked waiting for a message
    /// will not observe it until another message is pushed - confirm whether
    /// ProducerConsumerQueue offers a way to release a blocked consumer.
    /// </remarks>
    public void Dispose()
    {
        IsStarted = false;
    }

    /// <summary>
    /// Creates a new instance of <see cref="IntervalMessageDispatcher{TMessage}"/> and starts it immediately.
    /// </summary>
    /// <param name="onNext">The delivery callback.</param>
    /// <param name="interval">The delivery interval in milliseconds.</param>
    /// <returns>The started dispatcher.</returns>
    public static IntervalMessageDispatcher<TMessage> StartNew(Action<TMessage> onNext, int interval)
    {
        IntervalMessageDispatcher<TMessage> dispatcher = new IntervalMessageDispatcher<TMessage>(onNext);
        dispatcher.Interval = interval;
        dispatcher.Start();
        return dispatcher;
    }
}
}