diff options
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs new file mode 100644 index 000000000..11787c834 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Collections; +using MQTTnet.Client; +using MQTTnet.Client.Options; +using MQTTnet; +using System.Reflection; +using MQTTnet.Client.Connecting; +using Tango.Core; +using MQTTnet.Packets; + +namespace Tango.Telemetry.Destinations +{ + public class TelemetryMqttDestination : ExtendedObject, ITelemetryDestination + { + private IMqttClient _mqttClient; + private IMqttClientOptions _mqttOptions; + private DateTime _nextRealAvailabilityCheck; + + public string Name { get; set; } = "MQTT"; + public String Address { get; private set; } + public int Port { get; private set; } + public String Topic { get; private set; } + + public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; } + + /// <summary> + /// + /// </summary> + /// <param name="topic">e.g machie/telemetry/serial number</param> + /// <param name="address">Default localhost</param> + /// <param name="port">Default 1883</param> + public TelemetryMqttDestination(String topic, String address = "localhost", int port = 1883) + { + _nextRealAvailabilityCheck = DateTime.Now; + Topic = topic; + Address = address; + Port = port; + SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.Streaming }; + } + + public async Task<bool> IsAvailable() + { + if (_mqttClient == null) + { + return await EnsureConnection(); + } + else + { + if (DateTime.Now > _nextRealAvailabilityCheck) + { + _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5); + return await EnsureConnection(); + } + else + { + return _mqttClient.IsConnected; + } + } + } + + private async Task<bool> EnsureConnection() + { + if (_mqttClient == null || !_mqttClient.IsConnected) + { + try + { + var factory = new MqttFactory(); + _mqttClient = factory.CreateMqttClient(); + + String exeName = Assembly.GetEntryAssembly().GetName().FullName; + _mqttOptions = new MqttClientOptionsBuilder() + .WithClientId(exeName) + .WithTcpServer(Address, Port) + .WithCleanSession() + .Build(); + + var result = await _mqttClient.ConnectAsync(_mqttOptions); + if (result.ResultCode != MqttClientConnectResultCode.Success) + { + LogManager.Log(new Exception($"Error connecting to MQTT broker. {result.ResultCode}")); + return false; + } + } + catch (Exception ex) + { + LogManager.Log(ex, "Error connecting to MQTT broker."); + return false; + } + } + + return true; + } + + public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties) + { + if (await EnsureConnection()) + { + var message = new MqttApplicationMessageBuilder() + .WithTopic($"{Topic}/{package.PendingTelemetry.TelemetryObject.ToTelemetryName()}") + .WithPayload(package.ToPayload()) + .WithExactlyOnceQoS() + .WithRetainFlag(false) + .Build(); + + foreach (var prop in properties) + { + message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value)); + } + + + await _mqttClient.PublishAsync(message); + } + } + + public void Dispose() + { + _mqttClient?.Dispose(); + } + + public override string ToString() + { + return $"{Name} -> {Address}:{Port}"; + } + } +} |
