using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.Telemetry.Helpers;

namespace Tango.Telemetry.Destinations
{
    /// <summary>
    /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
    /// Supports batching, source type filtering, and connection status tracking.
    /// </summary>
    public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination
    {
        /// <summary>Smallest accepted value of <see cref="BatchSize"/> (1 means "send immediately").</summary>
        private const int MinBatchSize = 1;

        /// <summary>Largest accepted value of <see cref="BatchSize"/>.</summary>
        private const int MaxBatchSize = 10;

        /// <summary>Value assigned to <see cref="DeviceClient.OperationTimeoutInMilliseconds"/> on every new client.</summary>
        private const uint OperationTimeoutMilliseconds = 2000;

        /// <summary>Messages queued while waiting for a full batch.</summary>
        private readonly ConcurrentList<Message> _batch;

        /// <summary>
        /// Lazily created hub client. It is reset to null from the connection status callback
        /// and after a failed send, so it must never be dereferenced twice without capturing it in a local.
        /// </summary>
        private DeviceClient _hubClient;

        private int _batchSize;

        /// <summary>
        /// Envelope that is serialized to JSON and used as the body of the IoT Hub message.
        /// Property names are part of the wire format and must not be renamed.
        /// </summary>
        private class AdxPackage
        {
            public string Type { get; set; }
            public string Environment { get; set; }
            public string MachineType { get; set; }
            public string SerialNumber { get; set; }
            public string Organization { get; set; }
            public string Site { get; set; }
            public ITelemetry Telemetry { get; set; }
            public DateTime UploadTime { get; set; }
            public DateTime CreatedTime { get; set; }
            public int Version { get; set; }

            /// <summary>
            /// Serializes this envelope to a compact (non-indented) JSON string.
            /// </summary>
            /// <returns>The JSON payload.</returns>
            public string ToPayload()
            {
                return JsonConvert.SerializeObject(this, Formatting.None);
            }
        }

        /// <summary>
        /// Gets or sets the name of the destination.
        /// </summary>
        public string Name { get; set; } = "Azure IoT Hub";

        /// <summary>
        /// Gets the connection string used to connect to Azure IoT Hub.
        /// </summary>
        public string ConnectionString { get; private set; }

        /// <summary>
        /// Gets the most recent connection status reported by the Azure IoT Hub client.
        /// Starts as <see cref="ConnectionStatus.Connected"/> before any client has been created.
        /// </summary>
        public ConnectionStatus HubConnectionStatus { get; private set; }

        /// <summary>
        /// Gets the source types supported by this destination.
        /// </summary>
        public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }

        /// <summary>
        /// Gets or sets the maximum number of messages to send in a single batch.
        /// The value is clamped between 1 and 10; a value of 1 disables batching.
        /// </summary>
        public int BatchSize
        {
            get => _batchSize;
            set => _batchSize = Math.Min(Math.Max(MinBatchSize, value), MaxBatchSize);
        }

        /// <summary>
        /// Prevents a default instance of the <see cref="TelemetryAzureHubDestination"/> class from being created.
        /// </summary>
        private TelemetryAzureHubDestination()
        {
            _batch = new ConcurrentList<Message>();
            HubConnectionStatus = ConnectionStatus.Connected;
            SupportedSourceTypes = new List<TelemetrySourceTypes>
            {
                TelemetrySourceTypes.PendingStorage,
                TelemetrySourceTypes.Streaming,
                TelemetrySourceTypes.ExternalStorage
            };
            BatchSize = 1;
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="TelemetryAzureHubDestination"/> class with the specified connection string.
        /// </summary>
        /// <param name="connectionString">The Azure IoT Hub connection string.</param>
        public TelemetryAzureHubDestination(string connectionString) : this()
        {
            ConnectionString = connectionString;
        }

        /// <summary>
        /// Determines whether the destination is currently available for publishing.
        /// </summary>
        /// <returns>
        /// False when there is no internet connectivity. Otherwise true when no hub client exists yet
        /// (one will be created by the next publish) or when the last reported status is
        /// <see cref="ConnectionStatus.Connected"/>.
        /// </returns>
        public Task<bool> IsAvailable()
        {
            if (!InternetConnectivity.IsInternetAvailable())
            {
                return Task.FromResult(false);
            }

            bool available = _hubClient == null || HubConnectionStatus == ConnectionStatus.Connected;
            return Task.FromResult(available);
        }

        /// <summary>
        /// Publishes a telemetry package to Azure IoT Hub.
        /// Supports batching when <see cref="BatchSize"/> is greater than 1: messages are queued until the
        /// batch is full and then sent together.
        /// </summary>
        /// <param name="package">The telemetry package to publish.</param>
        /// <param name="properties">Application properties to attach to the message. May be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="package"/> is null.</exception>
        /// <remarks>
        /// Any exception raised while sending is rethrown after the hub client has been disposed, so the
        /// next call starts with a fresh client. When a batch send fails, the message of the failing call
        /// is not kept in the queue, so the caller can publish the same package again without duplicating it.
        /// </remarks>
        public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
        {
            if (package == null)
            {
                throw new ArgumentNullException(nameof(package));
            }

            // Work on a local reference: the status callback may null the field on another thread
            // while this method is suspended in an await.
            DeviceClient client = GetOrCreateClient();
            Message message = CreateMessage(package, properties);

            try
            {
                if (BatchSize > 1)
                {
                    await SendBatchedAsync(client, message);
                }
                else
                {
                    LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug);
                    try
                    {
                        await client.SendEventAsync(message);
                    }
                    finally
                    {
                        message.Dispose();
                    }
                }
            }
            catch
            {
                client.Dispose();
                // Only forget the client if it is still the current one.
                System.Threading.Interlocked.CompareExchange(ref _hubClient, null, client);
                throw;
            }
        }

        /// <summary>
        /// Disposes the destination after attempting to send any remaining batched messages.
        /// A failed flush is logged and does not prevent the hub client from being released.
        /// Calling this method more than once is safe.
        /// </summary>
        public void Dispose()
        {
            DeviceClient client = _hubClient;
            _hubClient = null;

            List<Message> pending = new List<Message>(_batch.ToList());

            try
            {
                if (pending.Count > 0)
                {
                    if (client != null)
                    {
                        LogManager.Log($"Flushing {pending.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
                        // Dispose() is synchronous, so the flush has to block here.
                        client.SendEventBatchAsync(pending).GetAwaiter().GetResult();
                    }
                    else
                    {
                        LogManager.Log($"Dropping {pending.Count} queued messages: no Azure IoT Hub client is available.", LogCategory.Info);
                    }
                }
            }
            catch (Exception ex)
            {
                // NOTE(review): only the log categories visible in this file are used here;
                // switch to an error-level category if LogCategory defines one.
                LogManager.Log($"Failed to flush remaining messages to Azure IoT Hub: {ex.Message}", LogCategory.Info);
            }
            finally
            {
                _batch.Clear();
                DisposeMessages(pending);
                client?.Dispose();
            }
        }

        /// <summary>
        /// Returns a string that represents the current destination instance.
        /// </summary>
        /// <returns>The name of the destination.</returns>
        public override string ToString()
        {
            return Name;
        }

        /// <summary>
        /// Returns the current hub client, creating and configuring a new one when none exists.
        /// </summary>
        private DeviceClient GetOrCreateClient()
        {
            DeviceClient existing = _hubClient;
            if (existing != null)
            {
                return existing;
            }

            DeviceClient created = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
            created.OperationTimeoutInMilliseconds = OperationTimeoutMilliseconds;
            _hubClient = created;

            created.SetConnectionStatusChangesHandler((status, reason) =>
            {
                HubConnectionStatus = status;
                LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);

                if (status == ConnectionStatus.Disabled || status == ConnectionStatus.Disconnected)
                {
                    // Forget this client so the next publish creates a new one, but never clear a
                    // newer client that replaced it in the meantime.
                    // NOTE(review): the dropped client is not disposed here (same as before);
                    // confirm whether disposing from inside the status callback is safe.
                    System.Threading.Interlocked.CompareExchange(ref _hubClient, null, created);
                }
            });

            return created;
        }

        /// <summary>
        /// Builds the IoT Hub message (UTF-8 JSON body plus application properties) for a package.
        /// </summary>
        private static Message CreateMessage(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
        {
            var envelope = new AdxPackage
            {
                Type = package.TelemetryName,
                Version = package.TelemetryVersion,
                Environment = package.Environment,
                MachineType = package.MachineType,
                SerialNumber = package.SerialNumber,
                Organization = package.Organization,
                Site = package.Site,
                UploadTime = DateTime.UtcNow,
                CreatedTime = package.PendingTelemetry.TelemetryObject.Time,
                Telemetry = package.PendingTelemetry.TelemetryObject
            };

            var message = new Message(Encoding.UTF8.GetBytes(envelope.ToPayload()))
            {
                ContentType = "application/json",
                ContentEncoding = "utf-8"
            };

            if (properties != null)
            {
                foreach (var prop in properties)
                {
                    message.Properties.Add(prop.Key, prop.Value);
                }
            }

            return message;
        }

        /// <summary>
        /// Queues the message, or sends it together with the queued messages once the batch is full.
        /// </summary>
        private async Task SendBatchedAsync(DeviceClient client, Message message)
        {
            if (_batch.Count + 1 < BatchSize)
            {
                _batch.Add(message);
                LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug);
                return;
            }

            // The new message is only part of the outgoing snapshot, not of the queue, so a failed
            // send leaves the queue exactly as it was before this call.
            var outgoing = new List<Message>(_batch.ToList()) { message };
            LogManager.Log($"Sending telemetry batch of {outgoing.Count} messages to Azure IoT Hub.", LogCategory.Debug);

            try
            {
                await client.SendEventBatchAsync(outgoing);
            }
            catch
            {
                message.Dispose();
                throw;
            }

            _batch.Clear();
            DisposeMessages(outgoing);
        }

        /// <summary>
        /// Disposes every message in the given sequence.
        /// </summary>
        private static void DisposeMessages(IEnumerable<Message> messages)
        {
            foreach (Message sent in messages)
            {
                sent.Dispose();
            }
        }
    }
}