aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Core/Threading
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2020-04-26 00:30:43 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2020-04-26 00:30:43 +0300
commit4fc2f7cc3421a364006819102e4659ac27278e16 (patch)
treec88a809a5608d28b18c848b0ecfa5766fd915990 /Software/Visual_Studio/Tango.Core/Threading
parenta42a0f23a6f957008ae5d0ac7ce8994146100d79 (diff)
downloadTango-4fc2f7cc3421a364006819102e4659ac27278e16.tar.gz
Tango-4fc2f7cc3421a364006819102e4659ac27278e16.zip
Improvements on IntervalMessageDispatcher ;//
firmware Storage manager chunks on low priority.
Diffstat (limited to 'Software/Visual_Studio/Tango.Core/Threading')
-rw-r--r--Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs70
1 files changed, 68 insertions, 2 deletions
diff --git a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs
index ed61bea62..641bd83ac 100644
--- a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs
+++ b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs
@@ -18,6 +18,8 @@ namespace Tango.Core.Threading
private Thread _queueThread;
private ProducerConsumerQueue<TMessage> _queue;
private Action<TMessage> _onNext;
+ private long _framesPushed;
+ private long _framesDelivered;
/// <summary>
/// Gets or sets the interval.
@@ -25,6 +27,24 @@ namespace Tango.Core.Threading
public int Interval { get; set; }
/// <summary>
+ /// Gets or sets the drift compensation interval in milliseconds.
+ /// Meaning, when frames are pushed in a faster rate than delivery interval.
+ /// This will trigger a compensation mechanism which will deliver frames faster until it reaches balance
+ /// Between pushed and delivered frames.
+ /// </summary>
+ public int? DriftCompensationInterval { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maximum frames count for drift compensation calculation (default 100).
+ /// </summary>
+ public int MaximumFramesForDriftCompensation { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maximum allowed drift before starting to balance 0-1. default 0.1.
+ /// </summary>
+ public double MaximumDrift { get; set; }
+
+ /// <summary>
/// Gets a value indicating whether this instance has started.
/// </summary>
public bool IsStarted { get; private set; }
@@ -45,6 +65,8 @@ namespace Tango.Core.Threading
{
_onNext = onNext;
_queue = new ProducerConsumerQueue<TMessage>();
+ MaximumDrift = 0.1;
+ MaximumFramesForDriftCompensation = 100;
}
/// <summary>
@@ -54,6 +76,8 @@ namespace Tango.Core.Threading
{
if (!IsStarted)
{
+ _framesPushed = 0;
+ _framesDelivered = 0;
IsStarted = true;
_queueThread = new Thread(QueueThreadMethod);
_queueThread.Name = "Sequencer Thread";
@@ -68,28 +92,70 @@ namespace Tango.Core.Threading
/// <param name="message">The message.</param>
public void Push(TMessage message)
{
+ _framesPushed++;
_queue.BlockEnqueue(message);
}
private void QueueThreadMethod()
{
+ DateTime lastDriftCompensation = DateTime.Now;
+ double requiredDriftCompensation = 0;
+ bool balancing = false;
+ double balancingFactor = 1;
+
+
Stopwatch watch = new Stopwatch();
watch.Start();
while (IsStarted)
{
- watch.Restart();
var item = _queue.BlockDequeue();
if (!IsStarted) break;
+ watch.Restart();
+
try
{
_onNext?.Invoke(item);
+ _framesDelivered++;
}
catch { }
- Thread.Sleep(Math.Max(1, (int)(Interval - watch.ElapsedMilliseconds)));
+ //Compensate drift between pushed frames and delivered frames if any.
+ if (DriftCompensationInterval.HasValue)
+ {
+ if (balancing || (DateTime.Now - lastDriftCompensation).TotalMilliseconds > DriftCompensationInterval)
+ {
+ lastDriftCompensation = DateTime.Now;
+
+ if (_framesPushed > MaximumFramesForDriftCompensation)
+ {
+ var reduction = _framesPushed - MaximumFramesForDriftCompensation;
+ _framesPushed = MaximumFramesForDriftCompensation;
+ _framesDelivered = _framesDelivered - reduction;
+ }
+
+ requiredDriftCompensation = (double)(_framesDelivered / _framesPushed);
+
+ if (1d - requiredDriftCompensation >= MaximumDrift)
+ {
+ balancing = true;
+ balancingFactor = Math.Max(0, balancingFactor - 0.1);
+ }
+ else
+ {
+ balancing = false;
+ balancingFactor = 1;
+ }
+
+ //Debug.WriteLine($"Frames Pushed: {_framesPushed}, Frames Delivered: {_framesDelivered}, Required Compensation: {requiredDriftCompensation}, Balancing: {balancing}, Balancing Factor: {balancingFactor}");
+ //Debug.WriteLine("Sleep before: " + (int)(Interval - watch.ElapsedMilliseconds));
+ //Debug.WriteLine("Sleep After: " + (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor));
+ }
+ }
+
+ Thread.Sleep(Math.Max(0, (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor)));
}
}