diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-04-26 00:30:43 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2020-04-26 00:30:43 +0300 |
| commit | 4fc2f7cc3421a364006819102e4659ac27278e16 (patch) | |
| tree | c88a809a5608d28b18c848b0ecfa5766fd915990 /Software/Visual_Studio/Tango.Core | |
| parent | a42a0f23a6f957008ae5d0ac7ce8994146100d79 (diff) | |
| download | Tango-4fc2f7cc3421a364006819102e4659ac27278e16.tar.gz Tango-4fc2f7cc3421a364006819102e4659ac27278e16.zip | |
Improvements on IntervalMessageDispatcher ;//
firmware Storage manager chunks on low priority.
Diffstat (limited to 'Software/Visual_Studio/Tango.Core')
| -rw-r--r-- | Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs | 70 |
1 files changed, 68 insertions, 2 deletions
diff --git a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs index ed61bea62..641bd83ac 100644 --- a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs +++ b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs @@ -18,6 +18,8 @@ namespace Tango.Core.Threading private Thread _queueThread; private ProducerConsumerQueue<TMessage> _queue; private Action<TMessage> _onNext; + private long _framesPushed; + private long _framesDelivered; /// <summary> /// Gets or sets the interval. @@ -25,6 +27,24 @@ namespace Tango.Core.Threading public int Interval { get; set; } /// <summary> + /// Gets or sets the drift compensation interval in milliseconds. + /// Meaning, when frames are pushed in a faster rate than delivery interval. + /// This will trigger a compensation mechanism which will deliver frames faster until it reaches balance + /// Between pushed and delivered frames. + /// </summary> + public int? DriftCompensationInterval { get; set; } + + /// <summary> + /// Gets or sets the maximum frames count for drift compensation calculation (default 100). + /// </summary> + public int MaximumFramesForDriftCompensation { get; set; } + + /// <summary> + /// Gets or sets the maximum allowed drift before starting to balance 0-1. default 0.1. + /// </summary> + public double MaximumDrift { get; set; } + + /// <summary> /// Gets a value indicating whether this instance has started. /// </summary> public bool IsStarted { get; private set; } @@ -45,6 +65,8 @@ namespace Tango.Core.Threading { _onNext = onNext; _queue = new ProducerConsumerQueue<TMessage>(); + MaximumDrift = 0.1; + MaximumFramesForDriftCompensation = 100; } /// <summary> @@ -54,6 +76,8 @@ namespace Tango.Core.Threading { if (!IsStarted) { + _framesPushed = 0; + _framesDelivered = 0; IsStarted = true; _queueThread = new Thread(QueueThreadMethod); _queueThread.Name = "Sequencer Thread"; @@ -68,28 +92,70 @@ namespace Tango.Core.Threading /// <param name="message">The message.</param> public void Push(TMessage message) { + _framesPushed++; _queue.BlockEnqueue(message); } private void QueueThreadMethod() { + DateTime lastDriftCompensation = DateTime.Now; + double requiredDriftCompensation = 0; + bool balancing = false; + double balancingFactor = 1; + + Stopwatch watch = new Stopwatch(); watch.Start(); while (IsStarted) { - watch.Restart(); var item = _queue.BlockDequeue(); if (!IsStarted) break; + watch.Restart(); + try { _onNext?.Invoke(item); + _framesDelivered++; } catch { } - Thread.Sleep(Math.Max(1, (int)(Interval - watch.ElapsedMilliseconds))); + //Compensate drift between pushed frames and delivered frames if any. + if (DriftCompensationInterval.HasValue) + { + if (balancing || (DateTime.Now - lastDriftCompensation).TotalMilliseconds > DriftCompensationInterval) + { + lastDriftCompensation = DateTime.Now; + + if (_framesPushed > MaximumFramesForDriftCompensation) + { + var reduction = _framesPushed - MaximumFramesForDriftCompensation; + _framesPushed = MaximumFramesForDriftCompensation; + _framesDelivered = _framesDelivered - reduction; + } + + requiredDriftCompensation = (double)(_framesDelivered / _framesPushed); + + if (1d - requiredDriftCompensation >= MaximumDrift) + { + balancing = true; + balancingFactor = Math.Max(0, balancingFactor - 0.1); + } + else + { + balancing = false; + balancingFactor = 1; + } + + //Debug.WriteLine($"Frames Pushed: {_framesPushed}, Frames Delivered: {_framesDelivered}, Required Compensation: {requiredDriftCompensation}, Balancing: {balancing}, Balancing Factor: {balancingFactor}"); + //Debug.WriteLine("Sleep before: " + (int)(Interval - watch.ElapsedMilliseconds)); + //Debug.WriteLine("Sleep After: " + (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor)); + } + } + + Thread.Sleep(Math.Max(0, (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor))); } } |
