diff options
| author | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-19 14:53:52 +0300 |
|---|---|---|
| committer | Roy Ben Shabat <Roy.mail.net@gmail.com> | 2025-08-19 14:53:52 +0300 |
| commit | 4fd21b08a24f1697ab6159509cad4fadf66c1a32 (patch) | |
| tree | 41e668c7b1567a6ea2f19abd034125111dc21739 /Software/Visual_Studio/Tango.Telemetry | |
| parent | e83090a37f55de0a454866d22df1f9f9c36567af (diff) | |
| download | Tango-4fd21b08a24f1697ab6159509cad4fadf66c1a32.tar.gz Tango-4fd21b08a24f1697ab6159509cad4fadf66c1a32.zip | |
Telemetry MQTT destination tested + In-Process Broker.
Diffstat (limited to 'Software/Visual_Studio/Tango.Telemetry')
3 files changed, 174 insertions, 2 deletions
diff --git a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs index 11787c834..16b741a82 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs +++ b/Software/Visual_Studio/Tango.Telemetry/Destinations/TelemetryMqttDestination.cs @@ -71,7 +71,7 @@ namespace Tango.Telemetry.Destinations var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); - String exeName = Assembly.GetEntryAssembly().GetName().FullName; + String exeName = Assembly.GetEntryAssembly().GetName().Name; _mqttOptions = new MqttClientOptionsBuilder() .WithClientId(exeName) .WithTcpServer(Address, Port) @@ -106,9 +106,11 @@ namespace Tango.Telemetry.Destinations .WithRetainFlag(false) .Build(); + message.UserProperties = new List<MqttUserProperty>(); + foreach (var prop in properties) { - message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value)); + message.UserProperties.Add(new MqttUserProperty(prop.Key, prop.Value ?? String.Empty)); } diff --git a/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs b/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs new file mode 100644 index 000000000..35df0998d --- /dev/null +++ b/Software/Visual_Studio/Tango.Telemetry/Helpers/InProcessMqttBroker.cs @@ -0,0 +1,169 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using MQTTnet; +using MQTTnet.Protocol; +using MQTTnet.Server; +using Tango.Core; +using Tango.Logging; + +namespace Tango.Telemetry.Helpers +{ + /// <summary> + /// Lightweight in-process MQTT broker for LAN subscribers. + /// Start at app boot; point TelemetryMqttDestination to its port. + /// </summary> + public sealed class InProcessMqttBroker : ExtendedObject, IDisposable + { + private IMqttServer _server; + private MqttServerOptionsBuilder _optionsBuilder; + private readonly string _listenAddress; + private readonly int _port; + private readonly bool _allowAnonymous; + private readonly IDictionary<string, string> _users; + + public bool IsRunning { get; private set; } + + /// <param name="listenAddress">"0.0.0.0" for all interfaces; "127.0.0.1" for local only.</param> + /// <param name="port">Use 1884 if Mosquitto is on 1883.</param> + /// <param name="allowAnonymous">False to require username/password.</param> + /// <param name="users">User/pass map when allowAnonymous is false.</param> + public InProcessMqttBroker(string listenAddress = "0.0.0.0", int port = 1883, bool allowAnonymous = true, IDictionary<string, string> users = null) + { + _listenAddress = listenAddress; + _port = port; + _allowAnonymous = allowAnonymous; + _users = users ?? new Dictionary<string, string>(StringComparer.Ordinal); + } + + public async Task StartAsync() + { + if (IsRunning) return; + + try + { + _optionsBuilder = new MqttServerOptionsBuilder() + .WithDefaultEndpoint() + .WithDefaultEndpointBoundIPAddress(ParseIPAddress(_listenAddress)) + .WithDefaultEndpointPort(_port) + // Casts avoid the sync/async overload ambiguity: + .WithConnectionValidator((Action<MqttConnectionValidatorContext>)OnConnectionValidate) + .WithSubscriptionInterceptor((Action<MqttSubscriptionInterceptorContext>)OnSubscription) + .WithApplicationMessageInterceptor((Action<MqttApplicationMessageInterceptorContext>)OnMessage); + + var factory = new MqttFactory(); + _server = factory.CreateMqttServer(); + + await _server.StartAsync(_optionsBuilder.Build()); + IsRunning = true; + + LogManager.Log($"In-process MQTT broker started on {_listenAddress}:{_port}", LogCategory.Info); + } + catch (Exception ex) + { + LogManager.Log(ex, $"Failed to start in-process MQTT broker on {_listenAddress}:{_port}"); + await SafeStopAsync(); + throw; + } + } + + public async Task StopAsync() + { + await SafeStopAsync(); + } + + public void Dispose() + { + try { SafeStopAsync().GetAwaiter().GetResult(); } catch { /* ignore */ } + } + + // --- Interceptors / validators --- + + private void OnConnectionValidate(MqttConnectionValidatorContext ctx) + { + try + { + if (!_allowAnonymous) + { + if (string.IsNullOrEmpty(ctx.Username) || !_users.ContainsKey(ctx.Username) || + !string.Equals(_users[ctx.Username], ctx.Password ?? string.Empty, StringComparison.Ordinal)) + { + ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + return; + } + } + + ctx.ReasonCode = MqttConnectReasonCode.Success; + } + catch (Exception ex) + { + LogManager.Log(ex, "Error validating MQTT connection."); + ctx.ReasonCode = MqttConnectReasonCode.UnspecifiedError; + } + } + + private void OnSubscription(MqttSubscriptionInterceptorContext ctx) + { + try + { + // Allow all topics by default. Add ACLs here if needed. + ctx.AcceptSubscription = true; + LogManager.Log($"MQTT SUB: client={ctx.ClientId} topicFilter={ctx.TopicFilter?.Topic}", LogCategory.Debug); + } + catch (Exception ex) + { + LogManager.Log(ex, "Error in MQTT subscription interceptor."); + ctx.AcceptSubscription = false; + } + } + + private void OnMessage(MqttApplicationMessageInterceptorContext ctx) + { + try + { + var size = ctx.ApplicationMessage?.Payload?.Length ?? 0; + LogManager.Log($"MQTT PUBLISH: client={ctx.ClientId} topic={ctx.ApplicationMessage?.Topic} size={size} retain={ctx.ApplicationMessage?.Retain}", LogCategory.Debug); + ctx.AcceptPublish = true; + } + catch (Exception ex) + { + LogManager.Log(ex, "Error in MQTT message interceptor."); + ctx.AcceptPublish = false; + } + } + + // --- Helpers --- + + private async Task SafeStopAsync() + { + if (!IsRunning) return; + + try + { + if (_server != null) + await _server.StopAsync(); + } + catch (Exception ex) + { + LogManager.Log(ex, "Error while stopping MQTT broker."); + } + finally + { + if (_server != null) + _server.Dispose(); + _server = null; + IsRunning = false; + LogManager.Log("In-process MQTT broker stopped.", LogCategory.Info); + } + } + + private static IPAddress ParseIPAddress(string address) + { + IPAddress ip; + if (IPAddress.TryParse(address, out ip)) return ip; + return IPAddress.Any; + } + } +} diff --git a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj index 035625149..249a9968f 100644 --- a/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj +++ b/Software/Visual_Studio/Tango.Telemetry/Tango.Telemetry.csproj @@ -245,6 +245,7 @@ <Compile Include="Helpers\DateTimeUTCFixer.cs" /> <Compile Include="Destinations\TelemetryAzureHubDestination.cs" /> <Compile Include="ExtensionMethods\ITelemetryExtensions.cs" /> + <Compile Include="Helpers\InProcessMqttBroker.cs" /> <Compile Include="Helpers\InternetConnectivity.cs" /> <Compile Include="ITelemetryCheckpointsRecoveryClient.cs" /> <Compile Include="ITelemetryDestination.cs" /> |
