using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet; using System.Reflection; using MQTTnet.Client.Connecting; using Tango.Core; using MQTTnet.Packets; namespace Tango.Telemetry.Destinations { public class TelemetryMqttDestination : ExtendedObject, ITelemetryDestination { private IMqttClient _mqttClient; private IMqttClientOptions _mqttOptions; private DateTime _nextRealAvailabilityCheck; public string Name { get; set; } = "MQTT"; public String Address { get; private set; } public int Port { get; private set; } public String Topic { get; private set; } public IReadOnlyList SupportedSourceTypes { get; private set; } /// /// /// /// e.g machie/telemetry/serial number /// Default localhost /// Default 1883 public TelemetryMqttDestination(String topic, String address = "localhost", int port = 1883) { _nextRealAvailabilityCheck = DateTime.Now; Topic = topic; Address = address; Port = port; SupportedSourceTypes = new List() { TelemetrySourceTypes.Streaming }; } public async Task IsAvailable() { if (_mqttClient == null) { return await EnsureConnection(); } else { if (DateTime.Now > _nextRealAvailabilityCheck) { _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5); return await EnsureConnection(); } else { return _mqttClient.IsConnected; } } } private async Task EnsureConnection() { if (_mqttClient == null || !_mqttClient.IsConnected) { try { var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); String exeName = Assembly.GetEntryAssembly().GetName().Name; _mqttOptions = new MqttClientOptionsBuilder() .WithClientId(exeName) .WithTcpServer(Address, Port) .WithCleanSession() .Build(); var result = await _mqttClient.ConnectAsync(_mqttOptions); if (result.ResultCode != MqttClientConnectResultCode.Success) { LogManager.Log(new Exception($"Error connecting to MQTT broker. {result.ResultCode}")); return false; } } catch (Exception ex) { LogManager.Log(ex, "Error connecting to MQTT broker."); return false; } } return true; } public async Task Publish(TelemetryPublishPackage package, List> properties) { if (await EnsureConnection()) { var message = new MqttApplicationMessageBuilder() .WithTopic($"{Topic}/{package.PendingTelemetry.TelemetryObject.ToTelemetryName()}") .WithPayload(package.ToPayload()) .WithExactlyOnceQoS() .WithRetainFlag(false) .Build(); message.UserProperties = new List(); foreach (var prop in properties) { message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value ?? String.Empty)); } await _mqttClient.PublishAsync(message); } } public void Dispose() { _mqttClient?.Dispose(); } public override string ToString() { return $"{Name} -> {Address}:{Port}"; } } }