diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-18 21:23:02 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-18 21:23:02 +0300 |
| commit | bda71b704d17773316b4b08e7dae7e5e536d0d0c (patch) | |
| tree | d46f18a2bcfc5d5d089368c3e0c40cc714c4e29f /Software/Visual_Studio/Tango.Telemetry | |
| parent | 94fb36e2eb00dfb575a5f5cc18bd377224b126ce (diff) | |
| download | Tango-bda71b704d17773316b4b08e7dae7e5e536d0d0c.tar.gz Tango-bda71b704d17773316b4b08e7dae7e5e536d0d0c.zip | |
Improved Telemetry IoT Destination.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs | 47 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Helpers/DateTimeUTCFixer.cs (renamed from Software/Visual_Studio/Tango.Telemetry/DateTimeUTCFixer.cs) | 2 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Helpers/InternetConnectivity.cs | 388 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Helpers/JsonFlattener.cs (renamed from Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs) | 2 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj | 8 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs | 1 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs | 1 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs | 2 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/packages.config | 1 |
9 files changed, 433 insertions, 19 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs index 94e949ba3..61a4bd880 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs @@ -7,6 +7,7 @@ using System.Text; using System.Threading.Tasks; using Tango.Core; using Tango.Logging; +using Tango.Telemetry.Helpers; namespace Tango.Telemetry.Destinations { @@ -96,13 +97,21 @@ namespace Tango.Telemetry.Destinations /// <returns>True if the destination is available; otherwise, false.</returns> public Task<bool> IsAvailable() { - if (_hubClient == null) + //if (NetworkListManager.GetNetworks(NetworkConnectivityLevels.Connected).Any(x => x.IsConnectedToInternet)) + if (InternetConnectivity.IsInternetAvailable()) { - return Task.FromResult(true); + if (_hubClient == null) + { + return Task.FromResult(true); + } + else + { + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } } else { - return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + return Task.FromResult(false); } } @@ -117,6 +126,7 @@ namespace Tango.Telemetry.Destinations if (_hubClient == null) { _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.OperationTimeoutInMilliseconds = 2000; _hubClient.SetConnectionStatusChangesHandler((status, reason) => { HubConnectionStatus = status; @@ -147,25 +157,34 @@ namespace Tango.Telemetry.Destinations message.Properties.Add(prop.Key, prop.Value); } - if (BatchSize > 1) + try { - _batch.Add(message); - - if (_batch.Count >= BatchSize) + if (BatchSize > 1) { - LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug); - await _hubClient.SendEventBatchAsync(_batch.ToList()); - _batch.Clear(); + _batch.Add(message); + + if (_batch.Count >= BatchSize) + { + LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventBatchAsync(_batch.ToList()); + _batch.Clear(); + } + else + { + LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug); + } } else { - LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug); + LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventAsync(message); } } - else + catch (Exception) { - LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug); - await _hubClient.SendEventAsync(message); + _hubClient?.Dispose(); + _hubClient = null; + throw; } } diff --git a/Software/Visual_Studio/Tango.Telemetry/DateTimeUTCFixer.cs b/Software/Visual_Studio/Tango.Telemetry/Helpers/DateTimeUTCFixer.cs index 490a49e53..3ce0c700f 100644 --- a/Software/Visual_Studio/Tango.Telemetry/DateTimeUTCFixer.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Helpers/DateTimeUTCFixer.cs @@ -4,7 +4,7 @@ using System.Collections.Generic; using System.Reflection; using System.Runtime.CompilerServices; -namespace Tango.Telemetry +namespace Tango.Telemetry.Helpers { internal static class DateTimeUtcFixer { diff --git a/Software/Visual_Studio/Tango.Telemetry/Helpers/InternetConnectivity.cs b/Software/Visual_Studio/Tango.Telemetry/Helpers/InternetConnectivity.cs new file mode 100644 index 000000000..f747bb263 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Helpers/InternetConnectivity.cs @@ -0,0 +1,388 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.NetworkInformation; +using System.Threading; +using System.Threading.Tasks; + +namespace Tango.Telemetry.Helpers +{ + /// <summary> + /// Active internet connectivity monitor with fast cached reads. + /// - Immediate wake on network changes (address/availability) -> near-instant pickup after Wi-Fi reconnect. + /// - Burst probing after reconnect to avoid brief false negatives. + /// - .NET Framework 4.6.1 / C# 7 compatible. + /// </summary> + public static class InternetConnectivity + { + // -------- Public API -------- + + /// <summary> + /// Returns the last known internet status instantly (safe to call every 200ms). + /// First call primes quickly so you don't get default false. + /// </summary> + public static bool IsInternetAvailable() + { + if (!_primed) + { + EnsureStarted(); + PrimeOnce(); + } + return _lastIsUp; + } + + /// <summary>Raised when status flips.</summary> + public static event Action<bool> StatusChanged; + + public static void EnsureStarted() + { + if (_started) return; + lock (_startLock) + { + if (_started) return; + _started = true; + + if (ServicePointManager.DefaultConnectionLimit < 16) + ServicePointManager.DefaultConnectionLimit = 16; + + // Subscribe to network changes to wake the loop immediately. + try + { + NetworkChange.NetworkAddressChanged += OnNetworkChanged; + NetworkChange.NetworkAvailabilityChanged += OnNetworkAvailabilityChanged; + } + catch { /* non-fatal */ } + + Task.Run(() => ProbeLoop()); + } + } + + public static void Dispose() + { + if (_disposed) return; + _disposed = true; + try + { + NetworkChange.NetworkAddressChanged -= OnNetworkChanged; + NetworkChange.NetworkAvailabilityChanged -= OnNetworkAvailabilityChanged; + } + catch { } + try { _cts.Cancel(); } catch { } + try { _http.Dispose(); } catch { } + try { _wake.Set(); } catch { } + } + + // -------- Config -------- + + private static readonly TimeSpan MinRefreshIntervalWhenUp = TimeSpan.FromSeconds(3); + private static readonly TimeSpan MinRefreshIntervalWhenDown = TimeSpan.FromSeconds(2); + private static readonly TimeSpan MaxBackoffWhenDown = TimeSpan.FromSeconds(30); + + private static readonly TimeSpan DnsTimeout = TimeSpan.FromMilliseconds(800); + private static readonly TimeSpan HttpTimeout = TimeSpan.FromMilliseconds(1200); + + // First-call quick prime + private static readonly TimeSpan PrimeBudget = TimeSpan.FromMilliseconds(300); + + // After a reconnect, probe aggressively a few times to avoid stale state + private const int ReconnectBurstAttempts = 3; + private static readonly TimeSpan ReconnectBurstDelay = TimeSpan.FromMilliseconds(300); + + private const string DnsProbeHost = "dns.google"; + private static readonly Uri NcsiUri = new Uri("http://www.msftconnecttest.com/connecttest.txt"); + + // -------- State -------- + + private static volatile bool _lastIsUp; + private static volatile bool _started; + private static volatile bool _disposed; + private static volatile bool _primed; + + private static readonly object _startLock = new object(); + private static readonly CancellationTokenSource _cts = new CancellationTokenSource(); + private static readonly HttpClient _http = CreateHttpClient(); + + // AsyncAutoResetEvent (since we’re on .NET Fx) to wake sleep early. + private static readonly AsyncAutoResetEvent _wake = new AsyncAutoResetEvent(); + + private static DateTime _lastAddressChangeUtc = DateTime.MinValue; + + // -------- Network change hooks -------- + + private static void OnNetworkChanged(object sender, EventArgs e) + { + _lastAddressChangeUtc = DateTime.UtcNow; + _wake.Set(); // wake probe loop immediately + } + + private static void OnNetworkAvailabilityChanged(object sender, NetworkAvailabilityEventArgs e) + { + _lastAddressChangeUtc = DateTime.UtcNow; + _wake.Set(); // wake probe loop immediately + } + + // -------- Prime-on-first-call -------- + + private static void PrimeOnce() + { + if (_primed) return; + lock (_startLock) + { + if (_primed) return; + _primed = true; + + try + { + bool isUp = false; + if (HasViableLocalNetwork()) + { + var primeCts = new CancellationTokenSource(); + primeCts.CancelAfter(PrimeBudget); + isUp = TryDns(primeCts.Token).GetAwaiter().GetResult(); + } + + _lastIsUp = isUp; + if (isUp) SafeRaise(true); + } + catch + { + _lastIsUp = false; + } + } + } + + // -------- Probe loop -------- + + private static async Task ProbeLoop() + { + var ct = _cts.Token; + var last = _lastIsUp; + var backoff = TimeSpan.Zero; + var lastHttpConfirmedUpUtc = DateTime.MinValue; + + while (!ct.IsCancellationRequested) + { + bool isUp = false; + bool localUp = HasViableLocalNetwork(); + + if (localUp) + { + // If we just had a network change, run a small aggressive burst. + if ((DateTime.UtcNow - _lastAddressChangeUtc) < TimeSpan.FromSeconds(3)) + { + for (int i = 0; i < ReconnectBurstAttempts; i++) + { + if (await TryDns(ct).ConfigureAwait(false)) + { + // Confirm once via HTTP (short timeout) after DNS says yes + if (await TryHttp(ct).ConfigureAwait(false)) + { + isUp = true; + break; + } + } + await SleepNoThrow(ReconnectBurstDelay, ct).ConfigureAwait(false); + } + } + else + { + // Regular cadence + isUp = await TryDns(ct).ConfigureAwait(false); + if (isUp) + { + bool needHttpConfirm = !last || + (DateTime.UtcNow - lastHttpConfirmedUpUtc) > TimeSpan.FromSeconds(30); + + if (needHttpConfirm) + { + isUp = await TryHttp(ct).ConfigureAwait(false); + if (isUp) lastHttpConfirmedUpUtc = DateTime.UtcNow; + } + } + } + } + + if (isUp != last) + { + _lastIsUp = isUp; + last = isUp; + SafeRaise(isUp); + } + else + { + _lastIsUp = isUp; + } + + // Compute next wait (with backoff when down) + TimeSpan wait; + if (isUp) + { + backoff = TimeSpan.Zero; + wait = MinRefreshIntervalWhenUp; + } + else + { + backoff = backoff == TimeSpan.Zero + ? MinRefreshIntervalWhenDown + : TimeSpan.FromMilliseconds(Math.Min( + MaxBackoffWhenDown.TotalMilliseconds, + backoff.TotalMilliseconds * 2)); + wait = backoff; + } + + // Wait for either the timer OR a wake signal (network change), whichever comes first + await WaitWithWake(wait, ct).ConfigureAwait(false); + } + } + + private static async Task WaitWithWake(TimeSpan delay, CancellationToken ct) + { + // Race delay vs wake; return as soon as either completes. + var delayTask = Task.Delay(delay, ct); + var wakeTask = _wake.WaitAsync(ct); + var completed = await Task.WhenAny(delayTask, wakeTask).ConfigureAwait(false); + // No need to do anything with the result; loop will run again immediately. + } + + // -------- Tiers -------- + + private static bool HasViableLocalNetwork() + { + try + { + var nics = NetworkInterface.GetAllNetworkInterfaces(); + return nics.Any(nic => + nic.OperationalStatus == OperationalStatus.Up && + nic.NetworkInterfaceType != NetworkInterfaceType.Loopback && + nic.NetworkInterfaceType != NetworkInterfaceType.Tunnel && + nic.GetIPProperties().GatewayAddresses.Any()); + } + catch + { + return false; + } + } + + private static async Task<bool> TryDns(CancellationToken ct) + { + try + { + var dnsTask = Dns.GetHostEntryAsync(DnsProbeHost); + var delayTask = Task.Delay(DnsTimeout, ct); + var completed = await Task.WhenAny(dnsTask, delayTask).ConfigureAwait(false); + if (completed == dnsTask) + { + var entry = await dnsTask.ConfigureAwait(false); + return entry != null && entry.AddressList != null && entry.AddressList.Length > 0; + } + return false; + } + catch + { + return false; + } + } + + private static async Task<bool> TryHttp(CancellationToken ct) + { + try + { + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct)) + { + cts.CancelAfter(HttpTimeout); + + using (var headReq = new HttpRequestMessage(HttpMethod.Head, NcsiUri)) + using (var headResp = await _http.SendAsync( + headReq, HttpCompletionOption.ResponseHeadersRead, cts.Token).ConfigureAwait(false)) + { + if (headResp.IsSuccessStatusCode) return true; + } + + using (var getReq = new HttpRequestMessage(HttpMethod.Get, NcsiUri)) + using (var getResp = await _http.SendAsync( + getReq, HttpCompletionOption.ResponseHeadersRead, cts.Token).ConfigureAwait(false)) + { + return getResp.IsSuccessStatusCode; + } + } + } + catch + { + return false; + } + } + + // -------- Helpers -------- + + private static HttpClient CreateHttpClient() + { + var handler = new HttpClientHandler + { + AllowAutoRedirect = false, + AutomaticDecompression = DecompressionMethods.None, + UseProxy = true + }; + var client = new HttpClient(handler, true) { Timeout = TimeSpan.FromSeconds(2) }; + return client; + } + + private static void SafeRaise(bool value) + { + try { var h = StatusChanged; if (h != null) h(value); } catch { } + } + + private static async Task SleepNoThrow(TimeSpan delay, CancellationToken ct) + { + try { await Task.Delay(delay, ct).ConfigureAwait(false); } catch { } + } + + /// <summary> + /// Minimal AsyncAutoResetEvent for .NET Framework. + /// </summary> + private sealed class AsyncAutoResetEvent + { + private static readonly Task s_completed = Task.FromResult(true); + private readonly object _mutex = new object(); + private TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>(); + + public Task WaitAsync(CancellationToken ct) + { + lock (_mutex) + { + if (_tcs.Task.IsCompleted) + { + _tcs = new TaskCompletionSource<bool>(); + return s_completed; + } + + // Register cancellation against the current waiter + var tcs = _tcs; + if (ct.CanBeCanceled) + { + ct.Register(() => + { + try { tcs.TrySetCanceled(); } catch { } + }); + } + return tcs.Task; + } + } + + public void Set() + { + lock (_mutex) + { + if (!_tcs.Task.IsCompleted) + { + _tcs.TrySetResult(true); + } + else + { + // already signaled; keep it signaled for the next waiter + } + } + } + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs b/Software/Visual_Studio/Tango.Telemetry/Helpers/JsonFlattener.cs index 6658dcd9f..1355a8fc4 100644 --- a/Software/Visual_Studio/Tango.Telemetry/JsonFlattener.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Helpers/JsonFlattener.cs @@ -5,7 +5,7 @@ using System.Collections; using System.Collections.Generic; using System.Reflection; -namespace Tango.Telemetry +namespace Tango.Telemetry.Helpers { internal static class JsonFlattener { diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 14cd6e904..035625149 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -100,6 +100,9 @@ <Reference Include="Microsoft.Win32.Primitives, Version=4.0.2.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <HintPath>..\packages\Microsoft.Win32.Primitives.4.3.0\lib\net46\Microsoft.Win32.Primitives.dll</HintPath> </Reference> + <Reference Include="Microsoft.WindowsAPICodePack, Version=1.1.0.0, Culture=neutral, processorArchitecture=MSIL"> + <HintPath>..\packages\WindowsAPICodePack-Core.1.1.1\lib\Microsoft.WindowsAPICodePack.dll</HintPath> + </Reference> <Reference Include="Microsoft.WindowsAzure.Storage, Version=8.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> <HintPath>..\packages\WindowsAzure.Storage.8.7.0\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath> </Reference> @@ -239,9 +242,10 @@ </Reference> </ItemGroup> <ItemGroup> - <Compile Include="DateTimeUTCFixer.cs" /> + <Compile Include="Helpers\DateTimeUTCFixer.cs" /> <Compile Include="Destinations\TelemetryAzureHubDestination.cs" /> <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> + <Compile Include="Helpers\InternetConnectivity.cs" /> <Compile Include="ITelemetryCheckpointsRecoveryClient.cs" /> <Compile Include="ITelemetryDestination.cs" /> <Compile Include="Destinations\TelemetryMqttDestination.cs" /> @@ -296,7 +300,7 @@ <Compile Include="TelemetrySourceTypes.cs" /> <Compile Include="TelemetryLiteDBStorageManager.cs" /> <Compile Include="ITelemetry.cs" /> - <Compile Include="JsonFlattener.cs" /> + <Compile Include="Helpers\JsonFlattener.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="TelemetryBase.cs" /> <Compile Include="Telemetries\TelemetryDiagnosticsFrame.cs" /> diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs index c2879e9db..72fcdd681 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryBase.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; +using Tango.Telemetry.Helpers; namespace Tango.Telemetry { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs index 538a3a0d7..aa50041ca 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryLiteDBStorageManager.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Tango.Core; using Tango.Core.ExtensionMethods; using Tango.Logging; +using Tango.Telemetry.Helpers; namespace Tango.Telemetry { diff --git a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs index 96022341d..a5e176ca7 100644 --- a/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs +++ b/Software/Visual_Studio/Tango.Telemetry/TelemetryPendingStorageSource.cs @@ -9,7 +9,7 @@ namespace Tango.Telemetry public class TelemetryPendingStorageSource : ITelemetrySource { public string Name { get; private set; } = "Pending Storage"; - public bool RequiresTelemetryDuplicationTracking { get => true; } + public bool RequiresTelemetryDuplicationTracking { get => false; } public void Dispose() { diff --git a/Software/Visual_Studio/Tango.Telemetry/packages.config b/Software/Visual_Studio/Tango.Telemetry/packages.config index f15b9fef6..b6e99cfef 100644 --- a/Software/Visual_Studio/Tango.Telemetry/packages.config +++ b/Software/Visual_Studio/Tango.Telemetry/packages.config @@ -87,5 +87,6 @@ <package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net461" /> <package id="System.Xml.XDocument" version="4.3.0" targetFramework="net461" /> <package id="Validation" version="2.2.8" targetFramework="net461" /> + <package id="WindowsAPICodePack-Core" version="1.1.1" targetFramework="net461" /> <package id="WindowsAzure.Storage" version="8.7.0" targetFramework="net461" /> </packages>
\ No newline at end of file |
