using System; using System.Collections.Generic; using System.Net; using System.Text; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Protocol; using MQTTnet.Server; using Tango.Core; using Tango.Logging; namespace Tango.Telemetry.Helpers { /// /// Lightweight in-process MQTT broker for LAN subscribers. /// Start at app boot; point TelemetryMqttDestination to its port. /// public sealed class InProcessMqttBroker : ExtendedObject, IDisposable { private IMqttServer _server; private MqttServerOptionsBuilder _optionsBuilder; private readonly string _listenAddress; private readonly int _port; private readonly bool _allowAnonymous; private readonly IDictionary _users; public bool IsRunning { get; private set; } /// "0.0.0.0" for all interfaces; "127.0.0.1" for local only. /// Use 1884 if Mosquitto is on 1883. /// False to require username/password. /// User/pass map when allowAnonymous is false. public InProcessMqttBroker(string listenAddress = "0.0.0.0", int port = 1883, bool allowAnonymous = true, IDictionary users = null) { _listenAddress = listenAddress; _port = port; _allowAnonymous = allowAnonymous; _users = users ?? new Dictionary(StringComparer.Ordinal); } public async Task StartAsync() { if (IsRunning) return; try { _optionsBuilder = new MqttServerOptionsBuilder() .WithDefaultEndpoint() .WithDefaultEndpointBoundIPAddress(ParseIPAddress(_listenAddress)) .WithDefaultEndpointPort(_port) // Casts avoid the sync/async overload ambiguity: .WithConnectionValidator((Action)OnConnectionValidate) .WithSubscriptionInterceptor((Action)OnSubscription) .WithApplicationMessageInterceptor((Action)OnMessage); var factory = new MqttFactory(); _server = factory.CreateMqttServer(); await _server.StartAsync(_optionsBuilder.Build()); IsRunning = true; LogManager.Log($"In-process MQTT broker started on {_listenAddress}:{_port}", LogCategory.Info); } catch (Exception ex) { LogManager.Log(ex, $"Failed to start in-process MQTT broker on {_listenAddress}:{_port}"); await SafeStopAsync(); throw; } } public async Task StopAsync() { await SafeStopAsync(); } public void Dispose() { try { SafeStopAsync().GetAwaiter().GetResult(); } catch { /* ignore */ } } // --- Interceptors / validators --- private void OnConnectionValidate(MqttConnectionValidatorContext ctx) { try { if (!_allowAnonymous) { if (string.IsNullOrEmpty(ctx.Username) || !_users.ContainsKey(ctx.Username) || !string.Equals(_users[ctx.Username], ctx.Password ?? string.Empty, StringComparison.Ordinal)) { ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } } ctx.ReasonCode = MqttConnectReasonCode.Success; } catch (Exception ex) { LogManager.Log(ex, "Error validating MQTT connection."); ctx.ReasonCode = MqttConnectReasonCode.UnspecifiedError; } } private void OnSubscription(MqttSubscriptionInterceptorContext ctx) { try { // Allow all topics by default. Add ACLs here if needed. ctx.AcceptSubscription = true; LogManager.Log($"MQTT SUB: client={ctx.ClientId} topicFilter={ctx.TopicFilter?.Topic}", LogCategory.Debug); } catch (Exception ex) { LogManager.Log(ex, "Error in MQTT subscription interceptor."); ctx.AcceptSubscription = false; } } private void OnMessage(MqttApplicationMessageInterceptorContext ctx) { try { var size = ctx.ApplicationMessage?.Payload?.Length ?? 0; LogManager.Log($"MQTT PUBLISH: client={ctx.ClientId} topic={ctx.ApplicationMessage?.Topic} size={size} retain={ctx.ApplicationMessage?.Retain}", LogCategory.Debug); ctx.AcceptPublish = true; } catch (Exception ex) { LogManager.Log(ex, "Error in MQTT message interceptor."); ctx.AcceptPublish = false; } } // --- Helpers --- private async Task SafeStopAsync() { if (!IsRunning) return; try { if (_server != null) await _server.StopAsync(); } catch (Exception ex) { LogManager.Log(ex, "Error while stopping MQTT broker."); } finally { if (_server != null) _server.Dispose(); _server = null; IsRunning = false; LogManager.Log("In-process MQTT broker stopped.", LogCategory.Info); } } private static IPAddress ParseIPAddress(string address) { IPAddress ip; if (IPAddress.TryParse(address, out ip)) return ip; return IPAddress.Any; } } }