using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Tango.Core;
using Tango.Logging;
namespace Tango.Telemetry.Helpers
{
///
/// Lightweight in-process MQTT broker for LAN subscribers.
/// Start at app boot; point TelemetryMqttDestination to its port.
///
public sealed class InProcessMqttBroker : ExtendedObject, IDisposable
{
private IMqttServer _server;
private MqttServerOptionsBuilder _optionsBuilder;
private readonly string _listenAddress;
private readonly int _port;
private readonly bool _allowAnonymous;
private readonly IDictionary _users;
public bool IsRunning { get; private set; }
/// "0.0.0.0" for all interfaces; "127.0.0.1" for local only.
/// Use 1884 if Mosquitto is on 1883.
/// False to require username/password.
/// User/pass map when allowAnonymous is false.
public InProcessMqttBroker(string listenAddress = "0.0.0.0", int port = 1883, bool allowAnonymous = true, IDictionary users = null)
{
_listenAddress = listenAddress;
_port = port;
_allowAnonymous = allowAnonymous;
_users = users ?? new Dictionary(StringComparer.Ordinal);
}
public async Task StartAsync()
{
if (IsRunning) return;
try
{
_optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithDefaultEndpointBoundIPAddress(ParseIPAddress(_listenAddress))
.WithDefaultEndpointPort(_port)
// Casts avoid the sync/async overload ambiguity:
.WithConnectionValidator((Action)OnConnectionValidate)
.WithSubscriptionInterceptor((Action)OnSubscription)
.WithApplicationMessageInterceptor((Action)OnMessage);
var factory = new MqttFactory();
_server = factory.CreateMqttServer();
await _server.StartAsync(_optionsBuilder.Build());
IsRunning = true;
LogManager.Log($"In-process MQTT broker started on {_listenAddress}:{_port}", LogCategory.Info);
}
catch (Exception ex)
{
LogManager.Log(ex, $"Failed to start in-process MQTT broker on {_listenAddress}:{_port}");
await SafeStopAsync();
throw;
}
}
public async Task StopAsync()
{
await SafeStopAsync();
}
public void Dispose()
{
try { SafeStopAsync().GetAwaiter().GetResult(); } catch { /* ignore */ }
}
// --- Interceptors / validators ---
private void OnConnectionValidate(MqttConnectionValidatorContext ctx)
{
try
{
if (!_allowAnonymous)
{
if (string.IsNullOrEmpty(ctx.Username) || !_users.ContainsKey(ctx.Username) ||
!string.Equals(_users[ctx.Username], ctx.Password ?? string.Empty, StringComparison.Ordinal))
{
ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
}
ctx.ReasonCode = MqttConnectReasonCode.Success;
}
catch (Exception ex)
{
LogManager.Log(ex, "Error validating MQTT connection.");
ctx.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
}
}
private void OnSubscription(MqttSubscriptionInterceptorContext ctx)
{
try
{
// Allow all topics by default. Add ACLs here if needed.
ctx.AcceptSubscription = true;
LogManager.Log($"MQTT SUB: client={ctx.ClientId} topicFilter={ctx.TopicFilter?.Topic}", LogCategory.Debug);
}
catch (Exception ex)
{
LogManager.Log(ex, "Error in MQTT subscription interceptor.");
ctx.AcceptSubscription = false;
}
}
private void OnMessage(MqttApplicationMessageInterceptorContext ctx)
{
try
{
var size = ctx.ApplicationMessage?.Payload?.Length ?? 0;
LogManager.Log($"MQTT PUBLISH: client={ctx.ClientId} topic={ctx.ApplicationMessage?.Topic} size={size} retain={ctx.ApplicationMessage?.Retain}", LogCategory.Debug);
ctx.AcceptPublish = true;
}
catch (Exception ex)
{
LogManager.Log(ex, "Error in MQTT message interceptor.");
ctx.AcceptPublish = false;
}
}
// --- Helpers ---
private async Task SafeStopAsync()
{
if (!IsRunning) return;
try
{
if (_server != null)
await _server.StopAsync();
}
catch (Exception ex)
{
LogManager.Log(ex, "Error while stopping MQTT broker.");
}
finally
{
if (_server != null)
_server.Dispose();
_server = null;
IsRunning = false;
LogManager.Log("In-process MQTT broker stopped.", LogCategory.Info);
}
}
private static IPAddress ParseIPAddress(string address)
{
IPAddress ip;
if (IPAddress.TryParse(address, out ip)) return ip;
return IPAddress.Any;
}
}
}