aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Core/Threading
diff options
context:
space:
mode:
authorMirta <mirta@twine-s.com>2020-12-30 16:39:52 +0200
committerMirta <mirta@twine-s.com>2020-12-30 16:39:52 +0200
commit00a491d93733d4625ad329b2ba8237f445364b3f (patch)
tree4b24c6fa78d7648f4bb7cefafa464bb0b063fec4 /Software/Visual_Studio/Tango.Core/Threading
parent124ad4150f80c6846fdee41dbbda9848c105f6e5 (diff)
downloadTango-00a491d9.tar.gz
Tango-00a491d9.zip
merge
Diffstat (limited to 'Software/Visual_Studio/Tango.Core/Threading')
-rw-r--r--Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs185
-rw-r--r--Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs81
2 files changed, 0 insertions, 266 deletions
diff --git a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs b/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs
deleted file mode 100644
index 641bd83ac..000000000
--- a/Software/Visual_Studio/Tango.Core/Threading/IntervalMessageDispatcher.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Tango.Core.Threading
-{
- /// <summary>
- /// Represents a message queue dispatcher for delivering messages at a constant interval.
- /// </summary>
- /// <typeparam name="TMessage"></typeparam>
- /// <seealso cref="System.IDisposable" />
- public class IntervalMessageDispatcher<TMessage> : IDisposable
- {
- private Thread _queueThread;
- private ProducerConsumerQueue<TMessage> _queue;
- private Action<TMessage> _onNext;
- private long _framesPushed;
- private long _framesDelivered;
-
- /// <summary>
- /// Gets or sets the interval.
- /// </summary>
- public int Interval { get; set; }
-
- /// <summary>
- /// Gets or sets the drift compensation interval in milliseconds.
- /// Meaning, when frames are pushed in a faster rate than delivery interval.
- /// This will trigger a compensation mechanism which will deliver frames faster until it reaches balance
- /// Between pushed and delivered frames.
- /// </summary>
- public int? DriftCompensationInterval { get; set; }
-
- /// <summary>
- /// Gets or sets the maximum frames count for drift compensation calculation (default 100).
- /// </summary>
- public int MaximumFramesForDriftCompensation { get; set; }
-
- /// <summary>
- /// Gets or sets the maximum allowed drift before starting to balance 0-1. default 0.1.
- /// </summary>
- public double MaximumDrift { get; set; }
-
- /// <summary>
- /// Gets a value indicating whether this instance has started.
- /// </summary>
- public bool IsStarted { get; private set; }
-
- /// <summary>
- /// Gets the number of queued messages.
- /// </summary>
- public int Count
- {
- get { return _queue.Count; }
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="IntervalMessageDispatcher{T}"/> class.
- /// </summary>
- /// <param name="onNext">The delivery callback.</param>
- public IntervalMessageDispatcher(Action<TMessage> onNext)
- {
- _onNext = onNext;
- _queue = new ProducerConsumerQueue<TMessage>();
- MaximumDrift = 0.1;
- MaximumFramesForDriftCompensation = 100;
- }
-
- /// <summary>
- /// Starts the dispatching of messages.
- /// </summary>
- public void Start()
- {
- if (!IsStarted)
- {
- _framesPushed = 0;
- _framesDelivered = 0;
- IsStarted = true;
- _queueThread = new Thread(QueueThreadMethod);
- _queueThread.Name = "Sequencer Thread";
- _queueThread.IsBackground = true;
- _queueThread.Start();
- }
- }
-
- /// <summary>
- /// Pushes the specified message.
- /// </summary>
- /// <param name="message">The message.</param>
- public void Push(TMessage message)
- {
- _framesPushed++;
- _queue.BlockEnqueue(message);
- }
-
- private void QueueThreadMethod()
- {
- DateTime lastDriftCompensation = DateTime.Now;
- double requiredDriftCompensation = 0;
- bool balancing = false;
- double balancingFactor = 1;
-
-
- Stopwatch watch = new Stopwatch();
- watch.Start();
-
- while (IsStarted)
- {
- var item = _queue.BlockDequeue();
-
- if (!IsStarted) break;
-
- watch.Restart();
-
- try
- {
- _onNext?.Invoke(item);
- _framesDelivered++;
- }
- catch { }
-
- //Compensate drift between pushed frames and delivered frames if any.
- if (DriftCompensationInterval.HasValue)
- {
- if (balancing || (DateTime.Now - lastDriftCompensation).TotalMilliseconds > DriftCompensationInterval)
- {
- lastDriftCompensation = DateTime.Now;
-
- if (_framesPushed > MaximumFramesForDriftCompensation)
- {
- var reduction = _framesPushed - MaximumFramesForDriftCompensation;
- _framesPushed = MaximumFramesForDriftCompensation;
- _framesDelivered = _framesDelivered - reduction;
- }
-
- requiredDriftCompensation = (double)(_framesDelivered / _framesPushed);
-
- if (1d - requiredDriftCompensation >= MaximumDrift)
- {
- balancing = true;
- balancingFactor = Math.Max(0, balancingFactor - 0.1);
- }
- else
- {
- balancing = false;
- balancingFactor = 1;
- }
-
- //Debug.WriteLine($"Frames Pushed: {_framesPushed}, Frames Delivered: {_framesDelivered}, Required Compensation: {requiredDriftCompensation}, Balancing: {balancing}, Balancing Factor: {balancingFactor}");
- //Debug.WriteLine("Sleep before: " + (int)(Interval - watch.ElapsedMilliseconds));
- //Debug.WriteLine("Sleep After: " + (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor));
- }
- }
-
- Thread.Sleep(Math.Max(0, (int)((Interval - watch.ElapsedMilliseconds) * requiredDriftCompensation * balancingFactor)));
- }
- }
-
- /// <summary>
- /// Releases unmanaged and - optionally - managed resources.
- /// </summary>
- public void Dispose()
- {
- IsStarted = false;
- }
-
- /// <summary>
- /// Creates a new instance of <see cref="IntervalMessageDispatcher{T}"/> and starts it immediately.
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="onNext">The on next.</param>
- /// <param name="interval">The interval.</param>
- /// <returns></returns>
- public static IntervalMessageDispatcher<T> StartNew<T>(Action<T> onNext, int interval)
- {
- IntervalMessageDispatcher<T> dispatcher = new IntervalMessageDispatcher<T>(onNext);
- dispatcher.Interval = interval;
- dispatcher.Start();
- return dispatcher;
- }
- }
-}
diff --git a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs b/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs
deleted file mode 100644
index 6be8f80d9..000000000
--- a/Software/Visual_Studio/Tango.Core/Threading/LimitedTimeTask.cs
+++ /dev/null
@@ -1,81 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Tango.Core.Threading
-{
- /// <summary>
- /// Represents a task that will be awaited for a limited time.
- /// If the timeout has reached, the task will return and will continue in the background.
- /// </summary>
- public class LimitedTimeTask
- {
- private Action _action;
- private TimeSpan _timeout;
- private bool _completed;
-
- public LimitedTimeTask(Action action, TimeSpan timeout)
- {
- _action = action;
- _timeout = timeout;
- }
-
- public Task Run()
- {
- TaskCompletionSource<Object> completion = new TaskCompletionSource<object>();
-
- ThreadFactory.StartNew(() =>
- {
- try
- {
- _action?.Invoke();
-
- if (!_completed)
- {
- _completed = true;
-
- try
- {
- completion.SetResult(true);
- }
- catch { }
- }
- }
- catch (Exception ex)
- {
- if (!_completed)
- {
- _completed = true;
- try
- {
- completion.SetException(ex);
- }
- catch { }
- }
- }
- });
-
- TimeoutTask.StartNew(() =>
- {
- if (!_completed)
- {
- _completed = true;
- try
- {
- completion.SetException(new TimeoutException($"The limited time task did not complete within the given time of {(int)_timeout.TotalMilliseconds} milliseconds and will continue in the background if possible."));
- }
- catch { }
- }
- }, _timeout);
-
- return completion.Task;
- }
-
- public static Task StartNew(Action action, TimeSpan timeout)
- {
- return new LimitedTimeTask(action, timeout).Run();
- }
- }
-}