diff options
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs new file mode 100644 index 000000000..10bc3981a --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubBatchDestination.cs @@ -0,0 +1,208 @@ +using Microsoft.Azure.Devices.Client; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Logging; +using Tango.Telemetry.Helpers; +// NEW: +using System.Timers; + +namespace Tango.Telemetry.Destinations +{ + /// <summary> + /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport. + /// Now always batches and flushes once per second on a timer. + /// </summary> + public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination + { + private DeviceClient _hubClient; + private int _batchSize; // kept for compatibility (no longer used for triggering sends) + private ConcurrentList<Message> _batch; + + // NEW: timer + lock to coordinate flushes + private Timer _flushTimer; + private readonly object _batchLock = new object(); + + private class ADXPAckage + { + public String Type { get; set; } + public String Environment { get; set; } + public String MachineType { get; set; } + public String SerialNumber { get; set; } + public String Organization { get; set; } + public String Site { get; set; } + public ITelemetry Telemetry { get; set; } + public DateTime UploadTime { get; set; } + public DateTime CreatedTime { get; set; } + public int Version { get; set; } + public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None); + } + + public string Name { get; set; } = "Azure IoT Hub"; + public string ConnectionString { get; private set; } + public ConnectionStatus HubConnectionStatus { get; private set; } + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } + + /// <summary> + /// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore. + /// </summary> + public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); } + + private TelemetryAzureHubBatchDestination() + { + _batch = new ConcurrentList<Message>(); + HubConnectionStatus = ConnectionStatus.Connected; + SupportedSourceTypes = new List<TelemetrySourceTypes>() + { + TelemetrySourceTypes.PendingStorage, + TelemetrySourceTypes.Streaming, + TelemetrySourceTypes.ExternalStorage + }; + BatchSize = 1; + } + + public TelemetryAzureHubBatchDestination(string connectionString) : this() + { + ConnectionString = connectionString; + } + + public Task<bool> IsAvailable() + { + if (InternetConnectivity.IsInternetAvailable()) + { + if (_hubClient == null) return Task.FromResult(true); + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } + return Task.FromResult(false); + } + + public Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties) + { + if (_hubClient == null) + { + _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.OperationTimeoutInMilliseconds = 2000; + _hubClient.SetConnectionStatusChangesHandler((status, reason) => + { + HubConnectionStatus = status; + LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info); + }); + + // NEW: start the periodic flusher (every 1s) + EnsureFlushTimer(); + } + + var adxPackage = new ADXPAckage + { + Type = package.TelemetryName, + Version = package.TelemetryVersion, + Environment = package.Environment, + MachineType = package.MachineType, + SerialNumber = package.SerialNumber, + Organization = package.Organization, + Site = package.Site, + UploadTime = DateTime.UtcNow, + CreatedTime = package.PendingTelemetry.TelemetryObject.Time, + Telemetry = package.PendingTelemetry.TelemetryObject + }; + + var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload())) + { + ContentType = "application/json", + ContentEncoding = "utf-8" + }; + + foreach (var prop in properties) + message.Properties.Add(prop.Key, prop.Value); + + // NEW: always enqueue; timer decides when to send + lock (_batchLock) + { + _batch.Add(message); + } + LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug); + + return Task.FromResult(true); + } + + // NEW: create the 1s timer (idempotent) + private void EnsureFlushTimer() + { + if (_flushTimer != null) return; + + _flushTimer = new Timer(1000); + _flushTimer.AutoReset = true; + _flushTimer.Elapsed += async (s, e) => + { + try + { + await FlushBatchAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + // If send fails, drop connection so IsAvailable() reflects it and upstream can retry later + try { _hubClient?.Dispose(); } catch { /* ignore */ } + _hubClient = null; + LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning); + } + }; + _flushTimer.Start(); + } + + // NEW: send whatever accumulated since last tick + private async Task FlushBatchAsync() + { + List<Message> toSend = null; + + lock (_batchLock) + { + if (_batch.Count == 0) return; + toSend = _batch.ToList(); + _batch.Clear(); + } + + LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventBatchAsync(toSend); + } + + public void Dispose() + { + // NEW: stop timer first to avoid races + if (_flushTimer != null) + { + _flushTimer.Stop(); + _flushTimer.Dispose(); + _flushTimer = null; + } + + // Flush any remaining messages synchronously + try + { + List<Message> toSend = null; + lock (_batchLock) + { + if (_batch.Count > 0) + { + toSend = _batch.ToList(); + _batch.Clear(); + } + } + if (_hubClient != null && toSend != null && toSend.Count > 0) + { + LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info); + _hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult(); + } + } + finally + { + _hubClient?.Dispose(); + } + } + + public override string ToString() => Name; + } +} |
