using Microsoft.Azure.Devices.Client; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.Core; using Tango.Logging; using Tango.Telemetry.Helpers; // NEW: using System.Timers; namespace Tango.Telemetry.Destinations { /// /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport. /// Now always batches and flushes once per second on a timer. /// public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination { private DeviceClient _hubClient; private int _batchSize; // kept for compatibility (no longer used for triggering sends) private ConcurrentList _batch; // NEW: timer + lock to coordinate flushes private Timer _flushTimer; private readonly object _batchLock = new object(); private class ADXPAckage { public String Type { get; set; } public String Environment { get; set; } public String MachineType { get; set; } public String SerialNumber { get; set; } public String Organization { get; set; } public String Site { get; set; } public ITelemetry Telemetry { get; set; } public DateTime UploadTime { get; set; } public DateTime CreatedTime { get; set; } public int Version { get; set; } public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None); } public string Name { get; set; } = "Azure IoT Hub"; public string ConnectionString { get; private set; } public ConnectionStatus HubConnectionStatus { get; private set; } public IReadOnlyList SupportedSourceTypes { get; private set; } /// /// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore. /// public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); } private TelemetryAzureHubBatchDestination() { _batch = new ConcurrentList(); HubConnectionStatus = ConnectionStatus.Connected; SupportedSourceTypes = new List() { TelemetrySourceTypes.PendingStorage, TelemetrySourceTypes.Streaming, TelemetrySourceTypes.ExternalStorage }; BatchSize = 1; } public TelemetryAzureHubBatchDestination(string connectionString) : this() { ConnectionString = connectionString; } public Task IsAvailable() { if (InternetConnectivity.IsInternetAvailable()) { if (_hubClient == null) return Task.FromResult(true); return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); } return Task.FromResult(false); } public Task Publish(TelemetryPublishPackage package, List> properties) { if (_hubClient == null) { _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); _hubClient.OperationTimeoutInMilliseconds = 2000; _hubClient.SetConnectionStatusChangesHandler((status, reason) => { HubConnectionStatus = status; LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info); }); // NEW: start the periodic flusher (every 1s) EnsureFlushTimer(); } var adxPackage = new ADXPAckage { Type = package.TelemetryName, Version = package.TelemetryVersion, Environment = package.Environment, MachineType = package.MachineType, SerialNumber = package.SerialNumber, Organization = package.Organization, Site = package.Site, UploadTime = DateTime.UtcNow, CreatedTime = package.PendingTelemetry.TelemetryObject.Time, Telemetry = package.PendingTelemetry.TelemetryObject }; var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload())) { ContentType = "application/json", ContentEncoding = "utf-8" }; foreach (var prop in properties) message.Properties.Add(prop.Key, prop.Value); // NEW: always enqueue; timer decides when to send lock (_batchLock) { _batch.Add(message); } LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug); return Task.FromResult(true); } // NEW: create the 1s timer (idempotent) private void EnsureFlushTimer() { if (_flushTimer != null) return; _flushTimer = new Timer(1000); _flushTimer.AutoReset = true; _flushTimer.Elapsed += async (s, e) => { try { await FlushBatchAsync().ConfigureAwait(false); } catch (Exception ex) { // If send fails, drop connection so IsAvailable() reflects it and upstream can retry later try { _hubClient?.Dispose(); } catch { /* ignore */ } _hubClient = null; LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning); } }; _flushTimer.Start(); } // NEW: send whatever accumulated since last tick private async Task FlushBatchAsync() { List toSend = null; lock (_batchLock) { if (_batch.Count == 0) return; toSend = _batch.ToList(); _batch.Clear(); } LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug); await _hubClient.SendEventBatchAsync(toSend); } public void Dispose() { // NEW: stop timer first to avoid races if (_flushTimer != null) { _flushTimer.Stop(); _flushTimer.Dispose(); _flushTimer = null; } // Flush any remaining messages synchronously try { List toSend = null; lock (_batchLock) { if (_batch.Count > 0) { toSend = _batch.ToList(); _batch.Clear(); } } if (_hubClient != null && toSend != null && toSend.Count > 0) { LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info); _hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult(); } } finally { _hubClient?.Dispose(); } } public override string ToString() => Name; } }