diff options
| author | Mirta <mirta@twine-s.com> | 2020-12-30 16:39:52 +0200 |
|---|---|---|
| committer | Mirta <mirta@twine-s.com> | 2020-12-30 16:39:52 +0200 |
| commit | 00a491d93733d4625ad329b2ba8237f445364b3f (patch) | |
| tree | 4b24c6fa78d7648f4bb7cefafa464bb0b063fec4 /Software/Visual_Studio/Tango.Core/Threading | |
| parent | 124ad4150f80c6846fdee41dbbda9848c105f6e5 (diff) | |
| download | Tango-00a491d9.tar.gz Tango-00a491d9.zip | |
merge
Diffstat (limited to 'Software/Visual_Studio/Tango.Core/Threading')
| -rw-r--r-- | Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs | 185 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs | 81 |
2 files changed, 0 insertions, 266 deletions
diff --git a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs deleted file mode 100644 index 641bd83ac..000000000 --- a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs +++ /dev/null @@ -1,185 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Tango.Core.Threading -{ - /// <summary> - /// Represents a message queue dispatcher for delivering messages at a constant interval. - /// </summary> - /// <typeparam name="TMessage"></typeparam> - /// <seealso cref="System.IDisposable" /> - public class IntervalMessageDispatcher<TMessage> : IDisposable - { - private Thread _queueThread; - private ProducerConsumerQueue<TMessage> _queue; - private Action<TMessage> _onNext; - private long _framesPushed; - private long _framesDelivered; - - /// <summary> - /// Gets or sets the interval. - /// </summary> - public int Interval { get; set; } - - /// <summary> - /// Gets or sets the drift compensation interval in milliseconds. - /// Meaning, when frames are pushed in a faster rate than delivery interval. - /// This will trigger a compensation mechanism which will deliver frames faster until it reaches balance - /// Between pushed and delivered frames. - /// </summary> - public int? DriftCompensationInterval { get; set; } - - /// <summary> - /// Gets or sets the maximum frames count for drift compensation calculation (default 100). - /// </summary> - public int MaximumFramesForDriftCompensation { get; set; } - - /// <summary> - /// Gets or sets the maximum allowed drift before starting to balance 0-1. default 0.1. - /// </summary> - public double MaximumDrift { get; set; } - - /// <summary> - /// Gets a value indicating whether this instance has started. - /// </summary> - public bool IsStarted { get; private set; } - - /// <summary> - /// Gets the number of queued messages. - /// </summary> - public int Count - { - get { return _queue.Count; } - } - - /// <summary> - /// Initializes a new instance of the <see cref="IntervalMessageDispatcher{T}"/> class. - /// </summary> - /// <param name="onNext">The delivery callback.</param> - public IntervalMessageDispatcher(Action<TMessage> onNext) - { - _onNext = onNext; - _queue = new ProducerConsumerQueue<TMessage>(); - MaximumDrift = 0.1; - MaximumFramesForDriftCompensation = 100; - } - - /// <summary> - /// Starts the dispatching of messages. - /// </summary> - public void Start() - { - if (!IsStarted) - { - _framesPushed = 0; - _framesDelivered = 0; - IsStarted = true; - _queueThread = new Thread(QueueThreadMethod); - _queueThread.Name = "Sequencer Thread"; - _queueThread.IsBackground = true; - _queueThread.Start(); - } - } - - /// <summary> - /// Pushes the specified message. - /// </summary> - /// <param name="message">The message.</param> - public void Push(TMessage message) - { - _framesPushed++; - _queue.BlockEnqueue(message); - } - - private void QueueThreadMethod() - { - DateTime lastDriftCompensation = DateTime.Now; - double requiredDriftCompensation = 0; - bool balancing = false; - double balancingFactor = 1; - - - Stopwatch watch = new Stopwatch(); - watch.Start(); - - while (IsStarted) - { - var item = _queue.BlockDequeue(); - - if (!IsStarted) break; - - watch.Restart(); - - try - { - _onNext?.Invoke(item); - _framesDelivered++; - } - catch { } - - //Compensate drift between pushed frames and delivered frames if any. - if (DriftCompensationInterval.HasValue) - { - if (balancing || (DateTime.Now - lastDriftCompensation).TotalMilliseconds > DriftCompensationInterval) - { - lastDriftCompensation = DateTime.Now; - - if (_framesPushed > MaximumFramesForDriftCompensation) - { - var reduction = _framesPushed - MaximumFramesForDriftCompensation; - _framesPushed = MaximumFramesForDriftCompensation; - _framesDelivered = _framesDelivered - reduction; - } - - requiredDriftCompensation = (double)(_framesDelivered / _framesPushed); - - if (1d - requiredDriftCompensation >= MaximumDrift) - { - balancing = true; - balancingFactor = Math.Max(0, balancingFactor - 0.1); - } - else - { - balancing = false; - balancingFactor = 1; - } - - //Debug.WriteLine($"Frames Pushed: {_framesPushed}, Frames Delivered: {_framesDelivered}, Required Compensation: {requiredDriftCompensation}, Balancing: {balancing}, Balancing Factor: {balancingFactor}"); - //Debug.WriteLine("Sleep before: " + (int)(Interval - watch.ElapsedMilliseconds)); - //Debug.WriteLine("Sleep After: " + (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor)); - } - } - - Thread.Sleep(Math.Max(0, (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor))); - } - } - - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - public void Dispose() - { - IsStarted = false; - } - - /// <summary> - /// Creates a new instance of <see cref="IntervalMessageDispatcher{T}"/> and starts it immediately. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="onNext">The on next.</param> - /// <param name="interval">The interval.</param> - /// <returns></returns> - public static IntervalMessageDispatcher<T> StartNew<T>(Action<T> onNext, int interval) - { - IntervalMessageDispatcher<T> dispatcher = new IntervalMessageDispatcher<T>(onNext); - dispatcher.Interval = interval; - dispatcher.Start(); - return dispatcher; - } - } -} diff --git a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs b/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs deleted file mode 100644 index 6be8f80d9..000000000 --- a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tango.Core.Threading -{ - /// <summary> - /// Represents a task that will be awaited for a limited time. - /// If the timeout has reached, the task will return and will continue in the background. - /// </summary> - public class LimitedTimeTask - { - private Action _action; - private TimeSpan _timeout; - private bool _completed; - - public LimitedTimeTask(Action action, TimeSpan timeout) - { - _action = action; - _timeout = timeout; - } - - public Task Run() - { - TaskCompletionSource<Object> completion = new TaskCompletionSource<object>(); - - ThreadFactory.StartNew(() => - { - try - { - _action?.Invoke(); - - if (!_completed) - { - _completed = true; - - try - { - completion.SetResult(true); - } - catch { } - } - } - catch (Exception ex) - { - if (!_completed) - { - _completed = true; - try - { - completion.SetException(ex); - } - catch { } - } - } - }); - - TimeoutTask.StartNew(() => - { - if (!_completed) - { - _completed = true; - try - { - completion.SetException(new TimeoutException($"The limited time task did not complete within the given time of {(int)_timeout.TotalMilliseconds} milliseconds and will continue in the background if possible.")); - } - catch { } - } - }, _timeout); - - return completion.Task; - } - - public static Task StartNew(Action action, TimeSpan timeout) - { - return new LimitedTimeTask(action, timeout).Run(); - } - } -} |
