aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio/Tango.Telemetry/Destinations
diff options
context:
space:
mode:
authorRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-29 00:54:47 +0300
committerRoy Ben Shabat <Roy.mail.net@gmail.com>2025-07-29 00:54:47 +0300
commit70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e (patch)
treeab7c70dbe23c04e9e6ea9a165b6840e0b2ed1df7 /Software/Visual_Studio/Tango.Telemetry/Destinations
parentd70056692f43b2a39dbaefebe7c6e096a1205fb4 (diff)
downloadTango-70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e.tar.gz
Tango-70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e.zip
Tango.Telemetry v1
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs56
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs101
2 files changed, 157 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
new file mode 100644
index 000000000..713805471
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs
@@ -0,0 +1,56 @@
+using Microsoft.Azure.Devices.Client;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Tango.Core;
+
+namespace Tango.Telemetry.Destinations
+{
+ public class AzureHubTelemetryDestination : ExtendedObject, ITelemetryDestination
+ {
+ private DeviceClient _hubClient;
+
+ public bool Enable { get; set; } = true;
+ public String ConnectionString { get; private set; }
+
+ public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; }
+
+ public AzureHubTelemetryDestination(String connectionString)
+ {
+ ConnectionString = connectionString;
+ SupportedSources = new List<TelemetrySource>() { TelemetrySource.PendingStorage, TelemetrySource.Streaming, TelemetrySource.ExternalStorage };
+ }
+
+ public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties)
+ {
+ if (_hubClient == null)
+ {
+ _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
+ _hubClient.SetConnectionStatusChangesHandler((status, reason) =>
+ {
+ LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}.");
+ });
+ }
+
+ var message = new Message(Encoding.UTF8.GetBytes(package.ToPayload()))
+ {
+ ContentType = "application/json",
+ ContentEncoding = "utf-8"
+ };
+
+ foreach (var prop in properties)
+ {
+ message.Properties.Add(prop.Key, prop.Value);
+ }
+
+ await _hubClient.SendEventAsync(message);
+ }
+
+ public void Dispose()
+ {
+ _hubClient?.Dispose();
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
new file mode 100644
index 000000000..a37d149a1
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs
@@ -0,0 +1,101 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Collections;
+using MQTTnet.Client;
+using MQTTnet.Client.Options;
+using MQTTnet;
+using System.Reflection;
+using MQTTnet.Client.Connecting;
+using Tango.Core;
+using MQTTnet.Packets;
+
+namespace Tango.Telemetry.Destinations
+{
+ public class MqttTelemetryDestination : ExtendedObject, ITelemetryDestination
+ {
+ private IMqttClient _mqttClient;
+ private IMqttClientOptions _mqttOptions;
+
+ public bool Enable { get; set; } = true;
+ public String Address { get; private set; }
+ public int Port { get; private set; }
+ public String Topic { get; set; }
+
+ public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="topic">example machie/telemetry/serial number</param>
+ /// <param name="address">Default localhost</param>
+ /// <param name="port">Default 1883</param>
+ public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883)
+ {
+ Topic = topic;
+ Port = port;
+ SupportedSources = new List<TelemetrySource>() { TelemetrySource.Streaming };
+ }
+
+ public async Task<bool> EnsureConnection()
+ {
+ if (_mqttClient == null || !_mqttClient.IsConnected)
+ {
+ try
+ {
+ var factory = new MqttFactory();
+ _mqttClient = factory.CreateMqttClient();
+
+ String exeName = Assembly.GetEntryAssembly().GetName().FullName;
+ _mqttOptions = new MqttClientOptionsBuilder()
+ .WithClientId(exeName)
+ .WithTcpServer(Address, Port)
+ .WithCleanSession()
+ .Build();
+
+ var result = await _mqttClient.ConnectAsync(_mqttOptions);
+ if (result.ResultCode != MqttClientConnectResultCode.Success)
+ {
+ LogManager.Log(new Exception($"Error connecting to MQTT broker. {result.ResultCode}"));
+ return false;
+ }
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error connecting to MQTT broker.");
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties)
+ {
+ if (await EnsureConnection())
+ {
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic($"{Topic}/{package.TelemetryObject.TelemetryName()}")
+ .WithPayload(package.ToPayload())
+ .WithExactlyOnceQoS()
+ .WithRetainFlag(false)
+ .Build();
+
+ foreach (var prop in properties)
+ {
+ message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value));
+ }
+
+
+ await _mqttClient.PublishAsync(message);
+ }
+ }
+
+ public void Dispose()
+ {
+ _mqttClient?.Dispose();
+ }
+ }
+}