diff options
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs new file mode 100644 index 000000000..01ad31432 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs @@ -0,0 +1,188 @@ +using Microsoft.Azure.Devices.Client; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; +using Tango.Logging; + +namespace Tango.Telemetry.Destinations +{ + /// <summary> + /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport. + /// Supports batching, source type filtering, and connection status tracking. + /// </summary> + public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination + { + private DeviceClient _hubClient; + private int _batchSize; + private ConcurrentList<Message> _batch; + + private class ADXPAckage + { + public String Type { get; set; } + public String Environment { get; set; } + public String MachineType { get; set; } + public String SerialNumber { get; set; } + public ITelemetry Telemetry { get; set; } + public DateTime UploadTime { get; set; } + + public String ToPayload() + { + return JsonConvert.SerializeObject(this, Formatting.None); + } + } + + /// <summary> + /// Gets or sets the name of the destination. + /// </summary> + public string Name { get; set; } = "Azure IoT Hub"; + + /// <summary> + /// Gets the connection string used to connect to Azure IoT Hub. + /// </summary> + public string ConnectionString { get; private set; } + + /// <summary> + /// Gets the current connection status of the Azure IoT Hub client. + /// </summary> + public ConnectionStatus HubConnectionStatus { get; private set; } + + /// <summary> + /// Gets the source types supported by this destination. + /// </summary> + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } + + /// <summary> + /// Gets or sets the maximum number of messages to send in a single batch. + /// The value is clamped between 1 and 100. + /// </summary> + public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); } + + /// <summary> + /// Prevents a default instance of the <see cref="TelemetryAzureHubDestination"/> class from being created. + /// </summary> + private TelemetryAzureHubDestination() + { + _batch = new ConcurrentList<Message>(); + HubConnectionStatus = ConnectionStatus.Connected; + SupportedSourceTypes = new List<TelemetrySourceTypes>() + { + TelemetrySourceTypes.PendingStorage, + TelemetrySourceTypes.Streaming, + TelemetrySourceTypes.ExternalStorage + }; + BatchSize = 1; + } + + /// <summary> + /// Initializes a new instance of the <see cref="TelemetryAzureHubDestination"/> class with the specified connection string. + /// </summary> + /// <param name="connectionString">The Azure IoT Hub connection string.</param> + public TelemetryAzureHubDestination(string connectionString) : this() + { + ConnectionString = connectionString; + } + + /// <summary> + /// Determines whether the destination is currently available for publishing. + /// </summary> + /// <returns>True if the destination is available; otherwise, false.</returns> + public Task<bool> IsAvailable() + { + if (_hubClient == null) + { + return Task.FromResult(true); + } + else + { + return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected); + } + } + + /// <summary> + /// Publishes a telemetry package to Azure IoT Hub. + /// Supports batching when <see cref="BatchSize"/> is greater than 1. + /// </summary> + /// <param name="package">The telemetry package to publish.</param> + /// <param name="properties">A list of properties to include with the message.</param> + public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties) + { + if (_hubClient == null) + { + _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.SetConnectionStatusChangesHandler((status, reason) => + { + HubConnectionStatus = status; + LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info); + }); + } + + ADXPAckage adxPackage = new ADXPAckage(); + adxPackage.Type = package.TelemetryName; + adxPackage.Environment = package.Environment; + adxPackage.MachineType = package.MachineType; + adxPackage.SerialNumber = package.SerialNumber; + adxPackage.UploadTime = DateTime.UtcNow; + adxPackage.Telemetry = package.PendingTelemetry.TelemetryObject; + + var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload())) + { + ContentType = "application/json", + ContentEncoding = "utf-8" + }; + + foreach (var prop in properties) + { + message.Properties.Add(prop.Key, prop.Value); + } + + if (BatchSize > 1) + { + _batch.Add(message); + + if (_batch.Count >= BatchSize) + { + LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventBatchAsync(_batch.ToList()); + _batch.Clear(); + } + else + { + LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug); + } + } + else + { + LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug); + await _hubClient.SendEventAsync(message); + } + } + + /// <summary> + /// Disposes the destination and ensures any remaining batched messages are sent. + /// </summary> + public void Dispose() + { + if (_hubClient != null && _batch.Count > 0) + { + LogManager.Log($"Flushing {_batch.Count} remaining messages to Azure IoT Hub.", LogCategory.Info); + _hubClient.SendEventBatchAsync(_batch.ToList()).GetAwaiter().GetResult(); + _batch.Clear(); + } + + _hubClient?.Dispose(); + } + + /// <summary> + /// Returns a string that represents the current destination instance. + /// </summary> + /// <returns>The name of the destination.</returns> + public override string ToString() + { + return Name; + } + } +}
\ No newline at end of file |
