1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Tango.Core;
using Tango.Logging;
namespace Tango.Telemetry.Helpers
{
/// <summary>
/// Lightweight in-process MQTT broker for LAN subscribers.
/// Start at app boot; point TelemetryMqttDestination to its port.
/// </summary>
public sealed class InProcessMqttBroker : ExtendedObject, IDisposable
{
    private IMqttServer _server;
    private MqttServerOptionsBuilder _optionsBuilder;
    private readonly string _listenAddress;
    private readonly int _port;
    private readonly bool _allowAnonymous;
    private readonly IDictionary<string, string> _users;

    /// <summary>
    /// True once <see cref="StartAsync"/> has completed successfully, and false again
    /// after the broker has been stopped or disposed.
    /// </summary>
    public bool IsRunning { get; private set; }

    /// <summary>
    /// Stores the broker configuration. Nothing is bound or started until
    /// <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="listenAddress">"0.0.0.0" for all interfaces; "127.0.0.1" for local only.</param>
    /// <param name="port">Use 1884 if Mosquitto is on 1883.</param>
    /// <param name="allowAnonymous">False to require username/password.</param>
    /// <param name="users">User/pass map when allowAnonymous is false. Null is treated as an empty map.</param>
    public InProcessMqttBroker(string listenAddress = "0.0.0.0", int port = 1883, bool allowAnonymous = true, IDictionary<string, string> users = null)
    {
        _listenAddress = listenAddress;
        _port = port;
        _allowAnonymous = allowAnonymous;
        _users = users ?? new Dictionary<string, string>(StringComparer.Ordinal);
    }

    /// <summary>
    /// Builds the server options, creates the MQTT server and starts it.
    /// Does nothing when the broker is already running.
    /// </summary>
    /// <remarks>
    /// On failure the error is logged, any partially created server is stopped and
    /// disposed, and the original exception is rethrown to the caller.
    /// </remarks>
    public async Task StartAsync()
    {
        if (IsRunning)
        {
            return;
        }

        try
        {
            var bindAddress = ParseIPAddress(_listenAddress);

            _optionsBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()
                .WithDefaultEndpointBoundIPAddress(bindAddress)
                .WithDefaultEndpointPort(_port)
                // Casts avoid the sync/async overload ambiguity:
                .WithConnectionValidator((Action<MqttConnectionValidatorContext>)OnConnectionValidate)
                .WithSubscriptionInterceptor((Action<MqttSubscriptionInterceptorContext>)OnSubscription)
                .WithApplicationMessageInterceptor((Action<MqttApplicationMessageInterceptorContext>)OnMessage);

            var factory = new MqttFactory();
            _server = factory.CreateMqttServer();

            await _server.StartAsync(_optionsBuilder.Build()).ConfigureAwait(false);

            IsRunning = true;
            LogManager.Log($"In-process MQTT broker started on {_listenAddress}:{_port}", LogCategory.Info);
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, $"Failed to start in-process MQTT broker on {_listenAddress}:{_port}");
            // Cleans up the server instance even though IsRunning was never set.
            await SafeStopAsync().ConfigureAwait(false);
            throw;
        }
    }

    /// <summary>
    /// Stops the broker and releases the server instance. Safe to call when the broker
    /// is not running; errors raised while stopping are logged, not thrown.
    /// </summary>
    public async Task StopAsync()
    {
        await SafeStopAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Synchronously stops the broker. Never throws.
    /// </summary>
    public void Dispose()
    {
        // Blocking here is unavoidable for a synchronous Dispose. SafeStopAsync awaits with
        // ConfigureAwait(false), so its continuations do not need the calling thread's
        // synchronization context while this thread is blocked.
        try
        {
            SafeStopAsync().GetAwaiter().GetResult();
        }
        catch
        {
            // Deliberate best effort: Dispose must not throw. SafeStopAsync already logs
            // stop/dispose failures itself.
        }
    }

    // --- Interceptors / validators ---

    /// <summary>
    /// Connection validator: accepts every client when anonymous access is allowed,
    /// otherwise requires a username/password pair present in the user map.
    /// Unexpected errors are logged and reported as <c>UnspecifiedError</c>.
    /// </summary>
    private void OnConnectionValidate(MqttConnectionValidatorContext ctx)
    {
        try
        {
            if (!_allowAnonymous && !CredentialsAreValid(ctx.Username, ctx.Password))
            {
                ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                return;
            }

            ctx.ReasonCode = MqttConnectReasonCode.Success;
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, "Error validating MQTT connection.");
            ctx.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
        }
    }

    /// <summary>
    /// Subscription interceptor: accepts every subscription and logs it at debug level.
    /// If logging fails, the subscription is rejected.
    /// </summary>
    private void OnSubscription(MqttSubscriptionInterceptorContext ctx)
    {
        try
        {
            // Allow all topics by default. Add ACLs here if needed.
            ctx.AcceptSubscription = true;
            LogManager.Log($"MQTT SUB: client={ctx.ClientId} topicFilter={ctx.TopicFilter?.Topic}", LogCategory.Debug);
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, "Error in MQTT subscription interceptor.");
            ctx.AcceptSubscription = false;
        }
    }

    /// <summary>
    /// Publish interceptor: logs client, topic, payload size and retain flag at debug
    /// level and accepts the message. If that fails, the publish is rejected.
    /// </summary>
    private void OnMessage(MqttApplicationMessageInterceptorContext ctx)
    {
        try
        {
            var size = ctx.ApplicationMessage?.Payload?.Length ?? 0;
            LogManager.Log($"MQTT PUBLISH: client={ctx.ClientId} topic={ctx.ApplicationMessage?.Topic} size={size} retain={ctx.ApplicationMessage?.Retain}", LogCategory.Debug);
            ctx.AcceptPublish = true;
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, "Error in MQTT message interceptor.");
            ctx.AcceptPublish = false;
        }
    }

    // --- Helpers ---

    /// <summary>
    /// Checks a username/password pair against the configured user map.
    /// A missing username, an unknown user, or a null stored password never matches;
    /// a null supplied password is treated as the empty string.
    /// </summary>
    private bool CredentialsAreValid(string username, string password)
    {
        if (string.IsNullOrEmpty(username))
        {
            return false;
        }

        // Single lookup instead of ContainsKey followed by the indexer.
        string expected;
        if (!_users.TryGetValue(username, out expected) || expected == null)
        {
            return false;
        }

        return FixedTimeEquals(expected, password ?? string.Empty);
    }

    /// <summary>
    /// Ordinal string equality that inspects every character instead of returning at the
    /// first mismatch, so the amount of work does not depend on where two candidate
    /// passwords start to differ.
    /// </summary>
    private static bool FixedTimeEquals(string left, string right)
    {
        var difference = left.Length ^ right.Length;
        var length = Math.Max(left.Length, right.Length);

        for (var i = 0; i < length; i++)
        {
            var a = i < left.Length ? left[i] : '\0';
            var b = i < right.Length ? right[i] : '\0';
            difference |= a ^ b;
        }

        return difference == 0;
    }

    /// <summary>
    /// Stops and disposes the current server instance, if any, and resets the running
    /// state. Failures while stopping or disposing are logged and not propagated.
    /// </summary>
    private async Task SafeStopAsync()
    {
        // Guard on the server instance rather than IsRunning: a failed StartAsync leaves
        // a created server behind with IsRunning still false, and it must be released too.
        var server = _server;
        if (server == null)
        {
            return;
        }

        var wasRunning = IsRunning;

        try
        {
            await server.StopAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, "Error while stopping MQTT broker.");
        }
        finally
        {
            // Reset state before disposing so a throwing Dispose cannot leave the broker
            // marked as running with a stale server reference.
            _server = null;
            IsRunning = false;

            try
            {
                server.Dispose();
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Error while disposing MQTT broker.");
            }

            if (wasRunning)
            {
                LogManager.Log("In-process MQTT broker stopped.", LogCategory.Info);
            }
        }
    }

    /// <summary>
    /// Parses <paramref name="address"/> as an IP address. When it is not a valid IP
    /// literal, falls back to <see cref="IPAddress.Any"/> (all interfaces) and logs that
    /// the fallback happened.
    /// </summary>
    private static IPAddress ParseIPAddress(string address)
    {
        IPAddress ip;
        if (IPAddress.TryParse(address, out ip))
        {
            return ip;
        }

        // The fallback widens the bind to every interface, so make it visible in the log.
        LogManager.Log($"Listen address '{address}' is not a valid IP address; binding in-process MQTT broker to all interfaces.", LogCategory.Info);
        return IPAddress.Any;
    }
}
}
|