aboutsummaryrefslogtreecommitdiffstats
path: root/Software/Visual_Studio
diff options
context:
space:
mode:
Diffstat (limited to 'Software/Visual_Studio')
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs6
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs169
-rw-r--r--Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj1
3 files changed, 174 insertions, 2 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
index 11787c834..16b741a82 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
+++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs
@@ -71,7 +71,7 @@ namespace Tango.Telemetry.Destinations
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
- String exeName = Assembly.GetEntryAssembly().GetName().FullName;
+ String exeName = Assembly.GetEntryAssembly().GetName().Name;
_mqttOptions = new MqttClientOptionsBuilder()
.WithClientId(exeName)
.WithTcpServer(Address, Port)
@@ -106,9 +106,11 @@ namespace Tango.Telemetry.Destinations
.WithRetainFlag(false)
.Build();
+ message.UserProperties = new List<MqttUserProperty>();
+
foreach (var prop in properties)
{
- message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value));
+ message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value ?? String.Empty));
}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs b/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs
new file mode 100644
index 000000000..35df0998d
--- /dev/null
+++ b/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs
@@ -0,0 +1,169 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+using MQTTnet;
+using MQTTnet.Protocol;
+using MQTTnet.Server;
+using Tango.Core;
+using Tango.Logging;
+
+namespace Tango.Telemetry.Helpers
+{
+ /// <summary>
+ /// Lightweight in-process MQTT broker for LAN subscribers.
+ /// Start at app boot; point TelemetryMqttDestination to its port.
+ /// </summary>
+ public sealed class InProcessMqttBroker : ExtendedObject, IDisposable
+ {
+ private IMqttServer _server;
+ private MqttServerOptionsBuilder _optionsBuilder;
+ private readonly string _listenAddress;
+ private readonly int _port;
+ private readonly bool _allowAnonymous;
+ private readonly IDictionary<string, string> _users;
+
+ public bool IsRunning { get; private set; }
+
+ /// <param name="listenAddress">"0.0.0.0" for all interfaces; "127.0.0.1" for local only.</param>
+ /// <param name="port">Use 1884 if Mosquitto is on 1883.</param>
+ /// <param name="allowAnonymous">False to require username/password.</param>
+ /// <param name="users">User/pass map when allowAnonymous is false.</param>
+ public InProcessMqttBroker(string listenAddress = "0.0.0.0", int port = 1883, bool allowAnonymous = true, IDictionary<string, string> users = null)
+ {
+ _listenAddress = listenAddress;
+ _port = port;
+ _allowAnonymous = allowAnonymous;
+ _users = users ?? new Dictionary<string, string>(StringComparer.Ordinal);
+ }
+
+ public async Task StartAsync()
+ {
+ if (IsRunning) return;
+
+ try
+ {
+ _optionsBuilder = new MqttServerOptionsBuilder()
+ .WithDefaultEndpoint()
+ .WithDefaultEndpointBoundIPAddress(ParseIPAddress(_listenAddress))
+ .WithDefaultEndpointPort(_port)
+ // Casts avoid the sync/async overload ambiguity:
+ .WithConnectionValidator((Action<MqttConnectionValidatorContext>)OnConnectionValidate)
+ .WithSubscriptionInterceptor((Action<MqttSubscriptionInterceptorContext>)OnSubscription)
+ .WithApplicationMessageInterceptor((Action<MqttApplicationMessageInterceptorContext>)OnMessage);
+
+ var factory = new MqttFactory();
+ _server = factory.CreateMqttServer();
+
+ await _server.StartAsync(_optionsBuilder.Build());
+ IsRunning = true;
+
+ LogManager.Log($"In-process MQTT broker started on {_listenAddress}:{_port}", LogCategory.Info);
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, $"Failed to start in-process MQTT broker on {_listenAddress}:{_port}");
+ await SafeStopAsync();
+ throw;
+ }
+ }
+
+ public async Task StopAsync()
+ {
+ await SafeStopAsync();
+ }
+
+ public void Dispose()
+ {
+ try { SafeStopAsync().GetAwaiter().GetResult(); } catch { /* ignore */ }
+ }
+
+ // --- Interceptors / validators ---
+
+ private void OnConnectionValidate(MqttConnectionValidatorContext ctx)
+ {
+ try
+ {
+ if (!_allowAnonymous)
+ {
+ if (string.IsNullOrEmpty(ctx.Username) || !_users.ContainsKey(ctx.Username) ||
+ !string.Equals(_users[ctx.Username], ctx.Password ?? string.Empty, StringComparison.Ordinal))
+ {
+ ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
+ return;
+ }
+ }
+
+ ctx.ReasonCode = MqttConnectReasonCode.Success;
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error validating MQTT connection.");
+ ctx.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
+ }
+ }
+
+ private void OnSubscription(MqttSubscriptionInterceptorContext ctx)
+ {
+ try
+ {
+ // Allow all topics by default. Add ACLs here if needed.
+ ctx.AcceptSubscription = true;
+ LogManager.Log($"MQTT SUB: client={ctx.ClientId} topicFilter={ctx.TopicFilter?.Topic}", LogCategory.Debug);
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error in MQTT subscription interceptor.");
+ ctx.AcceptSubscription = false;
+ }
+ }
+
+ private void OnMessage(MqttApplicationMessageInterceptorContext ctx)
+ {
+ try
+ {
+ var size = ctx.ApplicationMessage?.Payload?.Length ?? 0;
+ LogManager.Log($"MQTT PUBLISH: client={ctx.ClientId} topic={ctx.ApplicationMessage?.Topic} size={size} retain={ctx.ApplicationMessage?.Retain}", LogCategory.Debug);
+ ctx.AcceptPublish = true;
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error in MQTT message interceptor.");
+ ctx.AcceptPublish = false;
+ }
+ }
+
+ // --- Helpers ---
+
+ private async Task SafeStopAsync()
+ {
+ if (!IsRunning) return;
+
+ try
+ {
+ if (_server != null)
+ await _server.StopAsync();
+ }
+ catch (Exception ex)
+ {
+ LogManager.Log(ex, "Error while stopping MQTT broker.");
+ }
+ finally
+ {
+ if (_server != null)
+ _server.Dispose();
+ _server = null;
+ IsRunning = false;
+ LogManager.Log("In-process MQTT broker stopped.", LogCategory.Info);
+ }
+ }
+
+ private static IPAddress ParseIPAddress(string address)
+ {
+ IPAddress ip;
+ if (IPAddress.TryParse(address, out ip)) return ip;
+ return IPAddress.Any;
+ }
+ }
+}
diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
index 035625149..249a9968f 100644
--- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
+++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj
@@ -245,6 +245,7 @@
<Compile Include="Helpers\DateTimeUTCFixer.cs" />
<Compile Include="Destinations\TelemetryAzureHubDestination.cs" />
<Compile Include="ExtensionMethods\ITelemetryExtensions.cs" />
+ <Compile Include="Helpers\InProcessMqttBroker.cs" />
<Compile Include="Helpers\InternetConnectivity.cs" />
<Compile Include="ITelemetryCheckpointsRecoveryClient.cs" />
<Compile Include="ITelemetryDestination.cs" />