using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Logging;
using Tango.Telemetry.Helpers;
namespace Tango.Telemetry.Destinations
{
///
/// Represents a telemetry destination that publishes telemetry data to Azure IoT Hub using MQTT transport.
/// Supports batching, source type filtering, and connection status tracking.
///
public class TelemetryAzureHubDestination : ExtendedObject, ITelemetryDestination
{
private DeviceClient _hubClient;
private int _batchSize;
private ConcurrentList _batch;
private class ADXPAckage
{
public String Type { get; set; }
public String Environment { get; set; }
public String MachineType { get; set; }
public String SerialNumber { get; set; }
public String Organization { get; set; }
public String Site { get; set; }
public ITelemetry Telemetry { get; set; }
public DateTime UploadTime { get; set; }
public DateTime CreatedTime { get; set; }
public int Version { get; set; }
public String ToPayload()
{
return JsonConvert.SerializeObject(this, Formatting.None);
}
}
///
/// Gets or sets the name of the destination.
///
public string Name { get; set; } = "Azure IoT Hub";
///
/// Gets the connection string used to connect to Azure IoT Hub.
///
public string ConnectionString { get; private set; }
///
/// Gets the current connection status of the Azure IoT Hub client.
///
public ConnectionStatus HubConnectionStatus { get; private set; }
///
/// Gets the source types supported by this destination.
///
public IReadOnlyList SupportedSourceTypes { get; private set; }
///
/// Gets or sets the maximum number of messages to send in a single batch.
/// The value is clamped between 1 and 100.
///
public int BatchSize { get => _batchSize; set => _batchSize = Math.Min(Math.Max(1, value), 10); }
///
/// Prevents a default instance of the class from being created.
///
private TelemetryAzureHubDestination()
{
_batch = new ConcurrentList();
HubConnectionStatus = ConnectionStatus.Connected;
SupportedSourceTypes = new List()
{
TelemetrySourceTypes.PendingStorage,
TelemetrySourceTypes.Streaming,
TelemetrySourceTypes.ExternalStorage
};
BatchSize = 1;
}
///
/// Initializes a new instance of the class with the specified connection string.
///
/// The Azure IoT Hub connection string.
public TelemetryAzureHubDestination(string connectionString) : this()
{
ConnectionString = connectionString;
}
///
/// Determines whether the destination is currently available for publishing.
///
/// True if the destination is available; otherwise, false.
public Task IsAvailable()
{
//if (NetworkListManager.GetNetworks(NetworkConnectivityLevels.Connected).Any(x => x.IsConnectedToInternet))
if (InternetConnectivity.IsInternetAvailable())
{
if (_hubClient == null)
{
return Task.FromResult(true);
}
else
{
return Task.FromResult(HubConnectionStatus == ConnectionStatus.Connected);
}
}
else
{
return Task.FromResult(false);
}
}
///
/// Publishes a telemetry package to Azure IoT Hub.
/// Supports batching when is greater than 1.
///
/// The telemetry package to publish.
/// A list of properties to include with the message.
public async Task Publish(TelemetryPublishPackage package, List> properties)
{
if (_hubClient == null)
{
_hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
_hubClient.OperationTimeoutInMilliseconds = 2000;
_hubClient.SetConnectionStatusChangesHandler((status, reason) =>
{
HubConnectionStatus = status;
LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.", LogCategory.Info);
if (status == ConnectionStatus.Disabled || status == ConnectionStatus.Disconnected)
{
_hubClient = null;
}
});
}
ADXPAckage adxPackage = new ADXPAckage();
adxPackage.Type = package.TelemetryName;
adxPackage.Version = package.TelemetryVersion;
adxPackage.Environment = package.Environment;
adxPackage.MachineType = package.MachineType;
adxPackage.SerialNumber = package.SerialNumber;
adxPackage.Organization = package.Organization;
adxPackage.Site = package.Site;
adxPackage.UploadTime = DateTime.UtcNow;
adxPackage.CreatedTime = package.PendingTelemetry.TelemetryObject.Time;
adxPackage.Telemetry = package.PendingTelemetry.TelemetryObject;
var message = new Message(Encoding.UTF8.GetBytes(adxPackage.ToPayload()))
{
ContentType = "application/json",
ContentEncoding = "utf-8"
};
foreach (var prop in properties)
{
message.Properties.Add(prop.Key, prop.Value);
}
try
{
if (BatchSize > 1)
{
_batch.Add(message);
if (_batch.Count >= BatchSize)
{
LogManager.Log($"Sending telemetry batch of {_batch.Count} messages to Azure IoT Hub.", LogCategory.Debug);
await _hubClient.SendEventBatchAsync(_batch.ToList());
_batch.Clear();
}
else
{
LogManager.Log($"Queued telemetry message for batching. {_batch.Count}/{BatchSize} currently queued.", LogCategory.Debug);
}
}
else
{
LogManager.Log("Sending single telemetry message to Azure IoT Hub.", LogCategory.Debug);
await _hubClient.SendEventAsync(message);
}
}
catch (Exception ex)
{
_hubClient?.Dispose();
_hubClient = null;
throw;
}
}
///
/// Disposes the destination and ensures any remaining batched messages are sent.
///
public void Dispose()
{
if (_hubClient != null && _batch.Count > 0)
{
LogManager.Log($"Flushing {_batch.Count} remaining messages to Azure IoT Hub.", LogCategory.Info);
_hubClient.SendEventBatchAsync(_batch.ToList()).GetAwaiter().GetResult();
_batch.Clear();
}
_hubClient?.Dispose();
}
///
/// Returns a string that represents the current destination instance.
///
/// The name of the destination.
public override string ToString()
{
return Name;
}
}
}