Last active
July 22, 2025 17:33
-
-
Save aholmes/c7241e1c5fafda6358cc8c0b1fb6d479 to your computer and use it in GitHub Desktop.
Demonstrate the use of a websocket server that uses an MQTT broker to handle lightbulb state change requests from a phone application.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Important: This code executes in LinqPad as-is. | |
// | |
// You will need to install the nuget packages `MQTTnet` and `MQTTnet.Extensions.TopicTemplate`. | |
// | |
// This can be made in a console application. After creating the application, | |
// move this file to `Programs.cs` and uncomment `//await Main();` after the `using` statements. | |
// | |
// Tested in LinqPad 8 and .NET SDK 9.0 | |
using MQTTnet; | |
using MQTTnet.Formatter; | |
using MQTTnet.Protocol; | |
using MQTTnet.Extensions.TopicTemplate; | |
//await Main(); | |
async Task Main() | |
{ | |
/* | |
Emulate the use of a phone application (PhoneApp) and a smart light bulb (LightBulb). | |
The methods LightBulb and PhoneApp "connect to" a fake websocket server and | |
subscribe to, and publish to, two separate MQTT topics ("change" and "changed"). | |
"change" is intended for the LightBulb to subscribe to, and is used to change the light | |
bulb's state. PhoneApp publishes "ON" or "OFF" to this topic. | |
"changed" is intended for the LightBulb to publish to, and is used to report the light | |
bulb's state. LightBulb publishes here after it receives a state-change message from | |
"change" and PhoneApp receives that message to indicate the bulb state to the user. | |
Interaction is done on the command line. The user is prompted to write either | |
"ON," "OFF," or "GET." ON/OFF will cause the LightBulb to changes its state. In the | |
console, the user sees a message from LightBulb. This is here for demo purposes - this | |
message, e.g., "BULB: (Bulb is 'ON')," would not be seen on the PhoneApp, but is | |
analogous to the light bulb actually turning on/off. After the light bulb changes state, | |
it then reports to the PhoneApp what its new state is. In the console, the user is | |
shown this new state with the message, .e.g, "APP: Bulb said it is 'ON'." | |
The user can also get the current bulb state with the "GET" message. This would be | |
analogous to the user opening the application, which then queries for the current bulb | |
state to show the user. | |
When the user presses enter with no input, both the PhoneApp and LightBulb attempt to | |
quit cleanly (closing sockets, clean MQTT disconnects). A real light bulb wouldn't | |
necessarily behave in this way, so this is primarily an artefact of this demo. | |
*/ | |
/* | |
This demo requires a locally running instance of an MQTT broker. I use Mosquitto. | |
*/ | |
await Task.WhenAll | |
( | |
new[] | |
{ | |
LightBulb(CancellationToken.None), | |
PhoneApp(CancellationToken.None) | |
} | |
); | |
} | |
const int CLIENT_ID = 123; | |
// pretend to be a light bulb that responds to state changes | |
// and communicates its current state. | |
async Task LightBulb(CancellationToken cancellationToken = default) | |
{ | |
var state = "OFF"; | |
var keepRunning = true; | |
Func<string, Task<MqttClientPublishResult>>? pub = null; | |
// executes when the LightBulb receives a message on the "change" topic | |
Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceivedHandler = async (e) => | |
{ | |
var decodedMsg = System.Text.Encoding.Default.GetString( | |
e.ApplicationMessage.Payload.First.Span | |
); | |
// LightBulb runs until it's told to quit, which happens | |
// when the user presses enter (for this application - this | |
// doesn't make sense for a real light bulb). | |
if (decodedMsg == "STOP") | |
{ | |
keepRunning = false; | |
} | |
// set the state, and tell the PhoneApp what the new state is. | |
else if (decodedMsg == "ON" || decodedMsg == "OFF") | |
{ | |
var oldState = state; | |
state = decodedMsg; | |
Console.WriteLine($"BULB: (Bulb is {(decodedMsg == oldState ? $"already '{state}'" : $"'{state}'")})"); | |
await pub!(state); | |
} | |
// tell the PhoneApp what the current state is, but don't | |
// change the state. | |
else if (decodedMsg == "GET") | |
{ | |
await pub!(state); | |
} | |
else | |
{ | |
Console.WriteLine("BULB: Unexpected msg received on state change topic."); | |
} | |
}; | |
// subscribe to "change" (change the state) | |
// publish to "changed" (the light bulb reports its state) | |
var (client, sub, _pub) = await FakeWebSocketServer( | |
msgReceivedHandler, | |
CLIENT_ID, | |
"change", | |
"changed", | |
cancellationToken | |
); | |
pub = _pub; | |
try | |
{ | |
// run until told to stop. | |
// the 500ms delay only makes sense for this demo. in real life | |
// the light bulb might wait for a hardware interrupt on the websocket | |
// socket, then read data. | |
while (keepRunning) | |
{ | |
await Task.Delay(500, cancellationToken); | |
} | |
await Disconnect(client, cancellationToken); | |
} | |
finally | |
{ | |
if (client != null) | |
{ | |
client.Dispose(); | |
} | |
} | |
} | |
async Task PhoneApp(CancellationToken cancellationToken = default) | |
{ | |
// executes when the PhoneApp receives a message on the "changed" topic | |
Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceivedHandler = (e) => | |
{ | |
var decodedMsg = System.Text.Encoding.Default.GetString( | |
e.ApplicationMessage.Payload.First.Span | |
); | |
// the bulb can tell us it is either "ON" or "OFF" | |
// any other message is ignored. | |
if (decodedMsg == "ON" || decodedMsg == "OFF") | |
{ | |
Console.WriteLine($"APP: Bulb said it is '{decodedMsg}'"); | |
} | |
else | |
{ | |
Console.WriteLine("APP: Unexpected msg received on state topic."); | |
} | |
return Task.CompletedTask; | |
}; | |
// subscribe to "changed" (the light bulb reports its state) | |
// publish to "change" (change the light bulb's state) | |
var (client, sub, pub) = await FakeWebSocketServer( | |
msgReceivedHandler, | |
CLIENT_ID, | |
"changed", | |
"change", | |
cancellationToken | |
); | |
try | |
{ | |
Console.WriteLine("APP: Type 'ON' to turn the light bulb on, and 'OFF' to turn it off."); | |
Console.WriteLine("APP: Type 'GET' to get the current bulb state."); | |
Console.WriteLine("APP: Press enter to exit Phone App."); | |
var input = Console.ReadLine(); | |
while (!string.IsNullOrWhiteSpace(input)) | |
{ | |
if (input == "ON" || input == "OFF" || input == "GET") | |
{ | |
await pub(input); | |
} | |
input = Console.ReadLine(); | |
} | |
// shutdown the LightBulb but publishing a final "STOP" message | |
// for this demo, this is far from ideal any failure in PhoneApp | |
// before this, or if "STOP" doesn't arrive, then LightBulb will | |
// run forever. | |
await pub("STOP"); | |
await Disconnect(client, cancellationToken); | |
} | |
finally | |
{ | |
if (client != null) | |
{ | |
client.Dispose(); | |
} | |
} | |
} | |
// not real websockets - emulates it by negotiating client <-> MQTT broker connection | |
// then just passes the connection and pub/sub channels to the client. | |
// if this was real websockets, clients would send msgs to the websocket, and the | |
// websocket server would then pass along to the MQTT pub/sub channels. | |
// | |
// client must be disposed by callers - not ideal w/ bare functions (no IDiposable interface) | |
async Task<(IMqttClient client, MqttClientSubscribeResult sub, Func<string, Task<MqttClientPublishResult>> pub)> | |
FakeWebSocketServer( | |
Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceived, | |
int clientId, | |
string subTopicName, | |
string pubTopicName, | |
CancellationToken cancellationToken = default | |
) | |
{ | |
MqttTopicTemplate pubTopicTemplate = new("bulb/state/{pubTopicName}/client{clientId}"); | |
MqttTopicTemplate subTopicTemplate = new("bulb/state/{subTopicName}/client{clientId}"); | |
var pubTopic = pubTopicTemplate | |
.WithParameter("pubTopicName", pubTopicName) | |
.WithParameter("clientId", clientId.ToString()); | |
var subTopic = subTopicTemplate | |
.WithParameter("subTopicName", subTopicName) | |
.WithParameter("clientId", clientId.ToString()); | |
var mqttFactory = new MqttClientFactory(); | |
var client = mqttFactory.CreateMqttClient(); | |
// this requires running MQTT locally | |
var mqttClientOptions = new MqttClientOptionsBuilder() | |
.WithTcpServer("127.0.0.1") | |
.WithProtocolVersion(MqttProtocolVersion.V500) | |
.Build(); | |
// this callback is used when MQTT publishes to the "subTopic" | |
client.ApplicationMessageReceivedAsync += msgReceived; | |
var response = await client.ConnectAsync(mqttClientOptions, cancellationToken); | |
var subOptions = mqttFactory.CreateSubscribeOptionsBuilder() | |
.WithTopicFilter( | |
builder => builder.WithTopicTemplate(subTopic) | |
// "at least once" means we're guaranteed to receive the msg | |
// to the MQTT sub topic | |
.WithAtLeastOnceQoS() | |
) | |
.Build(); | |
Func<string, Task<MqttClientPublishResult>> pub = async (msg) => | |
{ | |
var pubOptions = mqttFactory.CreateApplicationMessageBuilder() | |
.WithTopicTemplate(pubTopic) | |
.WithPayload(msg) | |
// "at least once" means we're guaranteed to send the msg | |
// to the MQTT pub topic | |
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) | |
.Build(); | |
return await client.PublishAsync(pubOptions, cancellationToken); | |
}; | |
// pass along the MQTT client connection and both the pub/sub | |
// channels to emulate a way of communicating between PhoneApp/LightBulb | |
// and a websocket server | |
return ( | |
client, | |
await client.SubscribeAsync(subOptions, cancellationToken), | |
pub | |
); | |
} | |
async Task Disconnect(IMqttClient client, CancellationToken cancellationToken = default) | |
{ | |
var mqttFactory = new MqttClientFactory(); | |
var mqttClientDisconnectOptions = mqttFactory.CreateClientDisconnectOptionsBuilder().Build(); | |
await client.DisconnectAsync(mqttClientDisconnectOptions, cancellationToken); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment