From 00a491d93733d4625ad329b2ba8237f445364b3f Mon Sep 17 00:00:00 2001 From: Mirta Date: Wed, 30 Dec 2020 16:39:52 +0200 Subject: merge --- .../Threading/IntervalMessageDispatcher.cs | 185 --------------------- .../Tango.Core/Threading/LimitedTimeTask.cs | 81 --------- 2 files changed, 266 deletions(-) delete mode 100644 Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs delete mode 100644 Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs (limited to 'Software/Visual_Studio/Tango.Core/Threading') diff --git a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs deleted file mode 100644 index 641bd83ac..000000000 --- a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs +++ /dev/null @@ -1,185 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Tango.Core.Threading -{ - /// - /// Represents a message queue dispatcher for delivering messages at a constant interval. - /// - /// - /// - public class IntervalMessageDispatcher : IDisposable - { - private Thread _queueThread; - private ProducerConsumerQueue _queue; - private Action _onNext; - private long _framesPushed; - private long _framesDelivered; - - /// - /// Gets or sets the interval. - /// - public int Interval { get; set; } - - /// - /// Gets or sets the drift compensation interval in milliseconds. - /// Meaning, when frames are pushed in a faster rate than delivery interval. - /// This will trigger a compensation mechanism which will deliver frames faster until it reaches balance - /// Between pushed and delivered frames. - /// - public int? DriftCompensationInterval { get; set; } - - /// - /// Gets or sets the maximum frames count for drift compensation calculation (default 100). - /// - public int MaximumFramesForDriftCompensation { get; set; } - - /// - /// Gets or sets the maximum allowed drift before starting to balance 0-1. default 0.1. - /// - public double MaximumDrift { get; set; } - - /// - /// Gets a value indicating whether this instance has started. - /// - public bool IsStarted { get; private set; } - - /// - /// Gets the number of queued messages. - /// - public int Count - { - get { return _queue.Count; } - } - - /// - /// Initializes a new instance of the class. - /// - /// The delivery callback. - public IntervalMessageDispatcher(Action onNext) - { - _onNext = onNext; - _queue = new ProducerConsumerQueue(); - MaximumDrift = 0.1; - MaximumFramesForDriftCompensation = 100; - } - - /// - /// Starts the dispatching of messages. - /// - public void Start() - { - if (!IsStarted) - { - _framesPushed = 0; - _framesDelivered = 0; - IsStarted = true; - _queueThread = new Thread(QueueThreadMethod); - _queueThread.Name = "Sequencer Thread"; - _queueThread.IsBackground = true; - _queueThread.Start(); - } - } - - /// - /// Pushes the specified message. - /// - /// The message. - public void Push(TMessage message) - { - _framesPushed++; - _queue.BlockEnqueue(message); - } - - private void QueueThreadMethod() - { - DateTime lastDriftCompensation = DateTime.Now; - double requiredDriftCompensation = 0; - bool balancing = false; - double balancingFactor = 1; - - - Stopwatch watch = new Stopwatch(); - watch.Start(); - - while (IsStarted) - { - var item = _queue.BlockDequeue(); - - if (!IsStarted) break; - - watch.Restart(); - - try - { - _onNext?.Invoke(item); - _framesDelivered++; - } - catch { } - - //Compensate drift between pushed frames and delivered frames if any. - if (DriftCompensationInterval.HasValue) - { - if (balancing || (DateTime.Now - lastDriftCompensation).TotalMilliseconds > DriftCompensationInterval) - { - lastDriftCompensation = DateTime.Now; - - if (_framesPushed > MaximumFramesForDriftCompensation) - { - var reduction = _framesPushed - MaximumFramesForDriftCompensation; - _framesPushed = MaximumFramesForDriftCompensation; - _framesDelivered = _framesDelivered - reduction; - } - - requiredDriftCompensation = (double)(_framesDelivered / _framesPushed); - - if (1d - requiredDriftCompensation >= MaximumDrift) - { - balancing = true; - balancingFactor = Math.Max(0, balancingFactor - 0.1); - } - else - { - balancing = false; - balancingFactor = 1; - } - - //Debug.WriteLine($"Frames Pushed: {_framesPushed}, Frames Delivered: {_framesDelivered}, Required Compensation: {requiredDriftCompensation}, Balancing: {balancing}, Balancing Factor: {balancingFactor}"); - //Debug.WriteLine("Sleep before: " + (int)(Interval - watch.ElapsedMilliseconds)); - //Debug.WriteLine("Sleep After: " + (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor)); - } - } - - Thread.Sleep(Math.Max(0, (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor))); - } - } - - /// - /// Releases unmanaged and - optionally - managed resources. - /// - public void Dispose() - { - IsStarted = false; - } - - /// - /// Creates a new instance of and starts it immediately. - /// - /// - /// The on next. - /// The interval. - /// - public static IntervalMessageDispatcher StartNew(Action onNext, int interval) - { - IntervalMessageDispatcher dispatcher = new IntervalMessageDispatcher(onNext); - dispatcher.Interval = interval; - dispatcher.Start(); - return dispatcher; - } - } -} diff --git a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs b/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs deleted file mode 100644 index 6be8f80d9..000000000 --- a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Core.Threading -{ - /// - /// Represents a task that will be awaited for a limited time. - /// If the timeout has reached, the task will return and will continue in the background. - /// - public class LimitedTimeTask - { - private Action _action; - private TimeSpan _timeout; - private bool _completed; - - public LimitedTimeTask(Action action, TimeSpan timeout) - { - _action = action; - _timeout = timeout; - } - - public Task Run() - { - TaskCompletionSource completion = new TaskCompletionSource(); - - ThreadFactory.StartNew(() => - { - try - { - _action?.Invoke(); - - if (!_completed) - { - _completed = true; - - try - { - completion.SetResult(true); - } - catch { } - } - } - catch (Exception ex) - { - if (!_completed) - { - _completed = true; - try - { - completion.SetException(ex); - } - catch { } - } - } - }); - - TimeoutTask.StartNew(() => - { - if (!_completed) - { - _completed = true; - try - { - completion.SetException(new TimeoutException($"The limited time task did not complete within the given time of {(int)_timeout.TotalMilliseconds} milliseconds and will continue in the background if possible.")); - } - catch { } - } - }, _timeout); - - return completion.Task; - } - - public static Task StartNew(Action action, TimeSpan timeout) - { - return new LimitedTimeTask(action, timeout).Run(); - } - } -} -- cgit v1.3.1