diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 00:54:47 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-07-29 00:54:47 +0300 |
| commit | 70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e (patch) | |
| tree | ab7c70dbe23c04e9e6ea9a165b6840e0b2ed1df7 /Software/Visual_Studio/Tango.Telemetry/Destinations | |
| parent | d70056692f43b2a39dbaefebe7c6e096a1205fb4 (diff) | |
| download | Tango-70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e.tar.gz Tango-70f9f4dcbe3d4537ff2bd503f18288b9a7b0e19e.zip | |
Tango.Telemetry v1
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry/Destinations')
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs | 56 | ||||
| -rw-r--r-- | Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs | 101 |
2 files changed, 157 insertions, 0 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs new file mode 100644 index 000000000..713805471 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/AzureHubTelemetryDestination.cs @@ -0,0 +1,56 @@ +using Microsoft.Azure.Devices.Client; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tango.Core; + +namespace Tango.Telemetry.Destinations +{ + public class AzureHubTelemetryDestination : ExtendedObject, ITelemetryDestination + { + private DeviceClient _hubClient; + + public bool Enable { get; set; } = true; + public String ConnectionString { get; private set; } + + public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; } + + public AzureHubTelemetryDestination(String connectionString) + { + ConnectionString = connectionString; + SupportedSources = new List<TelemetrySource>() { TelemetrySource.PendingStorage, TelemetrySource.Streaming, TelemetrySource.ExternalStorage }; + } + + public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties) + { + if (_hubClient == null) + { + _hubClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt); + _hubClient.SetConnectionStatusChangesHandler((status, reason) => + { + LogManager.Log($"IoT hub status changed to: {status}, Reason: {reason}."); + }); + } + + var message = new Message(Encoding.UTF8.GetBytes(package.ToPayload())) + { + ContentType = "application/json", + ContentEncoding = "utf-8" + }; + + foreach (var prop in properties) + { + message.Properties.Add(prop.Key, prop.Value); + } + + await _hubClient.SendEventAsync(message); + } + + public void Dispose() + { + _hubClient?.Dispose(); + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs new file mode 100644 index 000000000..a37d149a1 --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/MqttTelemetryDestination.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Collections; +using MQTTnet.Client; +using MQTTnet.Client.Options; +using MQTTnet; +using System.Reflection; +using MQTTnet.Client.Connecting; +using Tango.Core; +using MQTTnet.Packets; + +namespace Tango.Telemetry.Destinations +{ + public class MqttTelemetryDestination : ExtendedObject, ITelemetryDestination + { + private IMqttClient _mqttClient; + private IMqttClientOptions _mqttOptions; + + public bool Enable { get; set; } = true; + public String Address { get; private set; } + public int Port { get; private set; } + public String Topic { get; set; } + + public IReadOnlyList<TelemetrySource> SupportedSources { get; private set; } + + /// <summary> + /// + /// </summary> + /// <param name="topic">example machie/telemetry/serial number</param> + /// <param name="address">Default localhost</param> + /// <param name="port">Default 1883</param> + public MqttTelemetryDestination(String topic, String address = "localhost", int port = 1883) + { + Topic = topic; + Port = port; + SupportedSources = new List<TelemetrySource>() { TelemetrySource.Streaming }; + } + + public async Task<bool> EnsureConnection() + { + if (_mqttClient == null || !_mqttClient.IsConnected) + { + try + { + var factory = new MqttFactory(); + _mqttClient = factory.CreateMqttClient(); + + String exeName = Assembly.GetEntryAssembly().GetName().FullName; + _mqttOptions = new MqttClientOptionsBuilder() + .WithClientId(exeName) + .WithTcpServer(Address, Port) + .WithCleanSession() + .Build(); + + var result = await _mqttClient.ConnectAsync(_mqttOptions); + if (result.ResultCode != MqttClientConnectResultCode.Success) + { + LogManager.Log(new Exception($"Error connecting to MQTT broker. {result.ResultCode}")); + return false; + } + } + catch (Exception ex) + { + LogManager.Log(ex, "Error connecting to MQTT broker."); + return false; + } + } + + return true; + } + + public async Task Publish(TelemetryPublishPackage package, List<KeyValuePair<String, String>> properties) + { + if (await EnsureConnection()) + { + var message = new MqttApplicationMessageBuilder() + .WithTopic($"{Topic}/{package.TelemetryObject.TelemetryName()}") + .WithPayload(package.ToPayload()) + .WithExactlyOnceQoS() + .WithRetainFlag(false) + .Build(); + + foreach (var prop in properties) + { + message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value)); + } + + + await _mqttClient.PublishAsync(message); + } + } + + public void Dispose() + { + _mqttClient?.Dispose(); + } + } +} |
