using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.Telemetry.Helpers;
// NEW:
using System.Timers;
namespace Tango.Telemetry.Destinations
{
///
/// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
/// Now always batches and flushes once per second on a timer.
///
public class TelemetryAzureHubBatchDestination : ExtendedObject, ITelemetryDestination
{
private DeviceClient _hubClient;
private int _batchSize; // kept for compatibility (no longer used for triggering sends)
private ConcurrentList _batch;
// NEW: timer + lock to coordinate flushes
private Timer _flushTimer;
private readonly object _batchLock = new object();
private class ADXPAckage
{
public String Type { get; set; }
public String Environment { get; set; }
public String MachineType { get; set; }
public String SerialNumber { get; set; }
public String Organization { get; set; }
public String Site { get; set; }
public ITelemetry Telemetry { get; set; }
public DateTime UploadTime { get; set; }
public DateTime CreatedTime { get; set; }
public int Version { get; set; }
public String ToPayload() => JsonConvert.SerializeObject(this, Formatting.None);
}
public string Name { get; set; } = "Azure IoT Hub";
public string ConnectionString { get; private set; }
public ConnectionStatus HubConnectionStatus { get; private set; }
public IReadOnlyList SupportedSourceTypes { get; private set; }
///
/// Kept for backward compatibility. Value is clamped 1..10 but not used to trigger flushes anymore.
///
public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
private TelemetryAzureHubBatchDestination()
{
_batch = new ConcurrentList();
HubConnectionStatus = ConnectionStatus.Connected;
SupportedSourceTypes = new List()
{
TelemetrySourceTypes.PendingStorage,
TelemetrySourceTypes.Streaming,
TelemetrySourceTypes.ExternalStorage
};
BatchSize = 1;
}
public TelemetryAzureHubBatchDestination(string connectionString) : this()
{
ConnectionString = connectionString;
}
public Task IsAvailable()
{
if (InternetConnectivity.IsInternetAvailable())
{
if (_hubClient == null) return Task.FromResult(true);
return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
}
return Task.FromResult(false);
}
public Task Publish(TelemetryPublishPackage package, List> properties)
{
if (_hubClient == null)
{
_hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
_hubClient.OperationTimeoutInMilliseconds = 2000;
_hubClient.SetConnectionStatusChangesHandler((status, reason) =>
{
HubConnectionStatus = status;
LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
});
// NEW: start the periodic flusher (every 1s)
EnsureFlushTimer();
}
var adxPackage = new ADXPAckage
{
Type = package.TelemetryName,
Version = package.TelemetryVersion,
Environment = package.Environment,
MachineType = package.MachineType,
SerialNumber = package.SerialNumber,
Organization = package.Organization,
Site = package.Site,
UploadTime = DateTime.UtcNow,
CreatedTime = package.PendingTelemetry.TelemetryObject.Time,
Telemetry = package.PendingTelemetry.TelemetryObject
};
var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
{
ContentType = "application/json",
ContentEncoding = "utf-8"
};
foreach (var prop in properties)
message.Properties.Add(prop.Key, prop.Value);
// NEW: always enqueue; timer decides when to send
lock (_batchLock)
{
_batch.Add(message);
}
LogManager.Log("Queued telemetry message for 1s batch flush.", LogCategory.Debug);
return Task.FromResult(true);
}
// NEW: create the 1s timer (idempotent)
private void EnsureFlushTimer()
{
if (_flushTimer != null) return;
_flushTimer = new Timer(1000);
_flushTimer.AutoReset = true;
_flushTimer.Elapsed += async (s, e) =>
{
try
{
await FlushBatchAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
// If send fails, drop connection so IsAvailable() reflects it and upstream can retry later
try { _hubClient?.Dispose(); } catch { /* ignore */ }
_hubClient = null;
LogManager.Log($"Azure IoT Hub flush failed: {ex.Message}", LogCategory.Warning);
}
};
_flushTimer.Start();
}
// NEW: send whatever accumulated since last tick
private async Task FlushBatchAsync()
{
List toSend = null;
lock (_batchLock)
{
if (_batch.Count == 0) return;
toSend = _batch.ToList();
_batch.Clear();
}
LogManager.Log($"Sending telemetry batch of {toSend.Count} messages to Azure IoT Hub.", LogCategory.Debug);
await _hubClient.SendEventBatchAsync(toSend);
}
public void Dispose()
{
// NEW: stop timer first to avoid races
if (_flushTimer != null)
{
_flushTimer.Stop();
_flushTimer.Dispose();
_flushTimer = null;
}
// Flush any remaining messages synchronously
try
{
List toSend = null;
lock (_batchLock)
{
if (_batch.Count > 0)
{
toSend = _batch.ToList();
_batch.Clear();
}
}
if (_hubClient != null && toSend != null && toSend.Count > 0)
{
LogManager.Log($"Flushing {toSend.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
_hubClient.SendEventBatchAsync(toSend).GetAwaiter().GetResult();
}
}
finally
{
_hubClient?.Dispose();
}
}
public override string ToString() => Name;
}
}