aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs129
1 files changed, 129 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
new file mode 100644
index 000000000..11787c834
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
@@ -0,0 +1,129 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Collections;
+using MQTTnet.Client;
+using MQTTnet.Client.Options;
+using MQTTnet;
+using System.Reflection;
+using MQTTnet.Client.Connecting;
+using Tango.Core;
+using MQTTnet.Packets;
+
+namespace Tango.Telemetry.Destinations
+{
+ public class TelemetryMqttDestination : ExtendedObject, ITelemetryDestination
+ {
+ private IMqttClient _mqttClient;
+ private IMqttClientOptions _mqttOptions;
+ private DateTime _nextRealAvailabilityCheck;
+
+ public string Name { get; set; } = "MQTT";
+ public String Address { get; private set; }
+ public int Port { get; private set; }
+ public String Topic { get; private set; }
+
+ public IReadOnlyList<TelemetrySourceTypes> SupportedSourceTypes { get; private set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="topic">e.g machie/telemetry/serial number</param>
+ /// <param name="address">Default localhost</param>
+ /// <param name="port">Default 1883</param>
+ public TelemetryMqttDestination(String topic, String address = "localhost", int port = 1883)
+ {
+ _nextRealAvailabilityCheck = DateTime.Now;
+ Topic = topic;
+ Address = address;
+ Port = port;
+ SupportedSourceTypes = new List<TelemetrySourceTypes>() { TelemetrySourceTypes.Streaming };
+ }
+
+ public async Task<bool> IsAvailable()
+ {
+ if (_mqttClient == null)
+ {
+ return await EnsureConnection();
+ }
+ else
+ {
+ if (DateTime.Now > _nextRealAvailabilityCheck)
+ {
+ _nextRealAvailabilityCheck = DateTime.Now.AddMinutes(5);
+ return await EnsureConnection();
+ }
+ else
+ {
+ return _mqttClient.IsConnected;
+ }
+ }
+ }
+
+ private async Task<bool> EnsureConnection()
+ {
+ if (_mqttClient == null || !_mqttClient.IsConnected)
+ {
+ try
+ {
+ var factory = new MqttFactory();
+ _mqttClient = factory.CreateMqttClient();
+
+ String exeName = Assembly.GetEntryAssembly().GetName().FullName;
+ _mqttOptions = new MqttClientOptionsBuilder()
+ .WithClientId(exeName)
+ .WithTcpServer(Address, Port)
+ .WithCleanSession()
+ .Build();
+
+ var result = await _mqttClient.ConnectAsync(_mqttOptions);
+ if (result.ResultCode != MqttClientConnectResultCode.Success)
+ {
+ LogManager.Log(new Exception($"Error connecting to MQTT broker. {result.ResultCode}"));
+ return false;
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error connecting to MQTT broker.");
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties)
+ {
+ if (await EnsureConnection())
+ {
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic($"{Topic}/{package.PendingTelemetry.TelemetryObject.ToTelemetryName()}")
+ .WithPayload(package.ToPayload())
+ .WithExactlyOnceQoS()
+ .WithRetainFlag(false)
+ .Build();
+
+ foreach (var prop in properties)
+ {
+ message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value));
+ }
+
+
+ await _mqttClient.PublishAsync(message);
+ }
+ }
+
+ public void Dispose()
+ {
+ _mqttClient?.Dispose();
+ }
+
+ public override string ToString()
+ {
+ return $"{Name} -> {Address}:{Port}";
+ }
+ }
+}