aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs188
1 files changed, 188 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs
new file mode 100644
index 000000000..01ad31432
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryAzureHubDestination.cs
@@ -0,0 +1,188 @@
+using Microsoft.Azure.Devices.Client;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+using Tango.Logging;
+
+namespace Tango.Telemetry.Destinations
+{
+ /// <summary>
+ /// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
+ /// Supports batching, source type filtering, and connection status tracking.
+ /// </summary>
+ public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination
+ {
+ private DeviceClient _hubClient;
+ private int _batchSize;
+ private ConcurrentList<Message> _batch;
+
+ private class ADXPAckage
+ {
+ public String Type { get; set; }
+ public String Environment { get; set; }
+ public String MachineType { get; set; }
+ public String SerialNumber { get; set; }
+ public ITelemetry Telemetry { get; set; }
+ public DateTime UploadTime { get; set; }
+
+ public String ToPayload()
+ {
+ return JsonConvert.SerializeObject(this, Formatting.None);
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the name of the destination.
+ /// </summary>
+ public string Name { get; set; } = "Azure IoT Hub";
+
+ /// <summary>
+ /// Gets the connection string used to connect to Azure IoT Hub.
+ /// </summary>
+ public string ConnectionString { get; private set; }
+
+ /// <summary>
+ /// Gets the current connection status of the Azure IoT Hub client.
+ /// </summary>
+ public ConnectionStatus HubConnectionStatus { get; private set; }
+
+ /// <summary>
+ /// Gets the source types supported by this destination.
+ /// </summary>
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
+
+ /// <summary>
+ /// Gets or sets the maximum number of messages to send in a single batch.
+ /// The value is clamped between 1 and 100.
+ /// </summary>
+ public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
+
+ /// <summary>
+ /// Prevents a default instance of the <see cref="TelemetryAzureHubDestination"/> class from being created.
+ /// </summary>
+ private TelemetryAzureHubDestination()
+ {
+ _batch = new ConcurrentList<Message>();
+ HubConnectionStatus = ConnectionStatus.Connected;
+ SupportedSourceTypes = new List<TelemetrySourceTypes>()
+ {
+ TelemetrySourceTypes.PendingStorage,
+ TelemetrySourceTypes.Streaming,
+ TelemetrySourceTypes.ExternalStorage
+ };
+ BatchSize = 1;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TelemetryAzureHubDestination"/> class with the specified connection string.
+ /// </summary>
+ /// <param name="connectionString">The Azure IoT Hub connection string.</param>
+ public TelemetryAzureHubDestination(string connectionString) : this()
+ {
+ ConnectionString = connectionString;
+ }
+
+ /// <summary>
+ /// Determines whether the destination is currently available for publishing.
+ /// </summary>
+ /// <returns>True if the destination is available; otherwise, false.</returns>
+ public Task<bool> IsAvailable()
+ {
+ if (_hubClient == null)
+ {
+ return Task.FromResult(true);
+ }
+ else
+ {
+ return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
+ }
+ }
+
+ /// <summary>
+ /// Publishes a telemetry package to Azure IoT Hub.
+ /// Supports batching when <see cref="BatchSize"/> is greater than 1.
+ /// </summary>
+ /// <param name="package">The telemetry package to publish.</param>
+ /// <param name="properties">A list of properties to include with the message.</param>
+ public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<string, string>> properties)
+ {
+ if (_hubClient == null)
+ {
+ _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
+ _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
+ {
+ HubConnectionStatus = status;
+ LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
+ });
+ }
+
+ ADXPAckage adxPackage = new ADXPAckage();
+ adxPackage.Type = package.TelemetryName;
+ adxPackage.Environment = package.Environment;
+ adxPackage.MachineType = package.MachineType;
+ adxPackage.SerialNumber = package.SerialNumber;
+ adxPackage.UploadTime = DateTime.UtcNow;
+ adxPackage.Telemetry = package.PendingTelemetry.TelemetryObject;
+
+ var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
+ {
+ ContentType = "application/json",
+ ContentEncoding = "utf-8"
+ };
+
+ foreach (var prop in properties)
+ {
+ message.Properties.Add(prop.Key, prop.Value);
+ }
+
+ if (BatchSize > 1)
+ {
+ _batch.Add(message);
+
+ if (_batch.Count >= BatchSize)
+ {
+ LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventBatchAsync(_batch.ToList());
+ _batch.Clear();
+ }
+ else
+ {
+ LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug);
+ }
+ }
+ else
+ {
+ LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug);
+ await _hubClient.SendEventAsync(message);
+ }
+ }
+
+ /// <summary>
+ /// Disposes the destination and ensures any remaining batched messages are sent.
+ /// </summary>
+ public void Dispose()
+ {
+ if (_hubClient != null && _batch.Count > 0)
+ {
+ LogManager.Log($"Flushing {_batch.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
+ _hubClient.SendEventBatchAsync(_batch.ToList()).GetAwaiter().GetResult();
+ _batch.Clear();
+ }
+
+ _hubClient?.Dispose();
+ }
+
+ /// <summary>
+ /// Returns a string that represents the current destination instance.
+ /// </summary>
+ /// <returns>The name of the destination.</returns>
+ public override string ToString()
+ {
+ return Name;
+ }
+ }
+} \ No newline at end of file