aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/Destinations
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs208
1 files changed, 208 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs
new file mode 100644
index 000000000..10bc3981a
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs
@@ -0,0 +1,208 @@
+using Microsoft.Azure.Devices.Client;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Logging;
+using Tango.Telemetry.Helpers;
+// NEW:
+using System.Timers;
+
+namespace Tango.Telemetry.Destinations
+{
+ /// <summary>
+ /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
+ /// Now always batches and flushes once per second on a timer.
+ /// </summary>
+ public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination
+ {
+ private DeviceClient _hubClient;
+ private int _batchSize; // kept for compatibility (no longer used for triggering sends)
+ private ConcurrentList<Message> _batch;
+
+ // NEW: timer + lock to coordinate flushes
+ private Timer _flushTimer;
+ private readonly object _batchLock = new object();
+
+ private class ADXPAckage
+ {
+ public String Type { get; set; }
+ public String Environment { get; set; }
+ public String MachineType { get; set; }
+ public String SerialNumber { get; set; }
+ public String Organization { get; set; }
+ public String Site { get; set; }
+ public ITelemetry Telemetry { get; set; }
+ public DateTime UploadTime { get; set; }
+ public DateTime CreatedTime { get; set; }
+ public int Version { get; set; }
+ public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None);
+ }
+
+ public string Name { get; set; } = "Azure IoT Hub";
+ public string ConnectionString { get; private set; }
+ public ConnectionStatus HubConnectionStatus { get; private set; }
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
+
+ /// <summary>
+ /// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore.
+ /// </summary>
+ public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
+
+ private TelemetryAzureHubBatchDestination()
+ {
+ _batch = new ConcurrentList<Message>();
+ HubConnectionStatus = ConnectionStatus.Connected;
+ SupportedSourceTypes = new List<TelemetrySourceTypes>()
+ {
+ TelemetrySourceTypes.PendingStorage,
+ TelemetrySourceTypes.Streaming,
+ TelemetrySourceTypes.ExternalStorage
+ };
+ BatchSize = 1;
+ }
+
+ public TelemetryAzureHubBatchDestination(string connectionString) : this()
+ {
+ ConnectionString = connectionString;
+ }
+
+ public Task<bool> IsAvailable()
+ {
+ if (InternetConnectivity.IsInternetAvailable())
+ {
+ if (_hubClient == null) return Task.FromResult(true);
+ return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
+ }
+ return Task.FromResult(false);
+ }
+
+ public Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
+ {
+ if (_hubClient == null)
+ {
+ _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
+ _hubClient.OperationTimeoutInMilliseconds = 2000;
+ _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
+ {
+ HubConnectionStatus = status;
+ LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
+ });
+
+ // NEW: start the periodic flusher (every 1s)
+ EnsureFlushTimer();
+ }
+
+ var adxPackage = new ADXPAckage
+ {
+ Type = package.TelemetryName,
+ Version = package.TelemetryVersion,
+ Environment = package.Environment,
+ MachineType = package.MachineType,
+ SerialNumber = package.SerialNumber,
+ Organization = package.Organization,
+ Site = package.Site,
+ UploadTime = DateTime.UtcNow,
+ CreatedTime = package.PendingTelemetry.TelemetryObject.Time,
+ Telemetry = package.PendingTelemetry.TelemetryObject
+ };
+
+ var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
+ {
+ ContentType = "application/json",
+ ContentEncoding = "utf-8"
+ };
+
+ foreach (var prop in properties)
+ message.Properties.Add(prop.Key, prop.Value);
+
+ // NEW: always enqueue; timer decides when to send
+ lock (_batchLock)
+ {
+ _batch.Add(message);
+ }
+ LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug);
+
+ return Task.FromResult(true);
+ }
+
+ // NEW: create the 1s timer (idempotent)
+ private void EnsureFlushTimer()
+ {
+ if (_flushTimer != null) return;
+
+ _flushTimer = new Timer(1000);
+ _flushTimer.AutoReset = true;
+ _flushTimer.Elapsed += async (s, e) =>
+ {
+ try
+ {
+ await FlushBatchAsync().ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // If send fails, drop connection so IsAvailable() reflects it and upstream can retry later
+ try { _hubClient?.Dispose(); } catch { /* ignore */ }
+ _hubClient = null;
+ LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning);
+ }
+ };
+ _flushTimer.Start();
+ }
+
+ // NEW: send whatever accumulated since last tick
+ private async Task FlushBatchAsync()
+ {
+ List<Message> toSend = null;
+
+ lock (_batchLock)
+ {
+ if (_batch.Count == 0) return;
+ toSend = _batch.ToList();
+ _batch.Clear();
+ }
+
+ LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventBatchAsync(toSend);
+ }
+
+ public void Dispose()
+ {
+ // NEW: stop timer first to avoid races
+ if (_flushTimer != null)
+ {
+ _flushTimer.Stop();
+ _flushTimer.Dispose();
+ _flushTimer = null;
+ }
+
+ // Flush any remaining messages synchronously
+ try
+ {
+ List<Message> toSend = null;
+ lock (_batchLock)
+ {
+ if (_batch.Count > 0)
+ {
+ toSend = _batch.ToList();
+ _batch.Clear();
+ }
+ }
+ if (_hubClient != null && toSend != null && toSend.Count > 0)
+ {
+ LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
+ _hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult();
+ }
+ }
+ finally
+ {
+ _hubClient?.Dispose();
+ }
+ }
+
+ public override string ToString() => Name;
+ }
+}