Skip to content

Instantly share code, notes, and snippets.

@aholmes
Last active July 22, 2025 17:33
Show Gist options
  • Save aholmes/c7241e1c5fafda6358cc8c0b1fb6d479 to your computer and use it in GitHub Desktop.
Save aholmes/c7241e1c5fafda6358cc8c0b1fb6d479 to your computer and use it in GitHub Desktop.
Demonstrate the use of a websocket server that uses an MQTT broker to handle lightbulb state change requests from a phone application.
// Important: This code executes in LinqPad as-is.
//
// You will need to install the nuget packages `MQTTnet` and `MQTTnet.Extensions.TopicTemplate`.
//
// This can also be run as a console application. After creating the application,
// move this file to `Program.cs` and uncomment `//await Main();` after the `using` statements.
//
// Tested in LinqPad 8 and .NET SDK 9.0
using MQTTnet;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using MQTTnet.Extensions.TopicTemplate;
//await Main();
// Entry point of the demo: starts the emulated light bulb and the emulated
// phone application and completes once both of them have finished.
async Task Main()
{
    /*
    Emulate the use of a phone application (PhoneApp) and a smart light bulb (LightBulb).

    The methods LightBulb and PhoneApp "connect to" a fake websocket server and
    subscribe to, and publish to, two separate MQTT topics ("change" and "changed").

    "change" is intended for the LightBulb to subscribe to, and is used to change the light
    bulb's state. PhoneApp publishes "ON" or "OFF" to this topic.

    "changed" is intended for the LightBulb to publish to, and is used to report the light
    bulb's state. LightBulb publishes here after it receives a state-change message from
    "change" and PhoneApp receives that message to indicate the bulb state to the user.

    Interaction is done on the command line. The user is prompted to write either
    "ON," "OFF," or "GET." ON/OFF will cause the LightBulb to change its state. In the
    console, the user sees a message from LightBulb. This is here for demo purposes - this
    message, e.g., "BULB: (Bulb is 'ON')," would not be seen on the PhoneApp, but is
    analogous to the light bulb actually turning on/off. After the light bulb changes state,
    it then reports to the PhoneApp what its new state is. In the console, the user is
    shown this new state with the message, e.g., "APP: Bulb said it is 'ON'."

    The user can also get the current bulb state with the "GET" message. This would be
    analogous to the user opening the application, which then queries for the current bulb
    state to show the user.

    When the user presses enter with no input, both the PhoneApp and LightBulb attempt to
    quit cleanly (closing sockets, clean MQTT disconnects). A real light bulb wouldn't
    necessarily behave in this way, so this is primarily an artefact of this demo.
    */

    /*
    This demo requires a locally running instance of an MQTT broker. I use Mosquitto.
    */

    // Start the bulb first and the app second, then wait for both to finish.
    var bulbTask = LightBulb(CancellationToken.None);
    var appTask = PhoneApp(CancellationToken.None);

    await Task.WhenAll(bulbTask, appTask);
}
// Number embedded in the MQTT topic names ("bulb/state/<topic>/client<id>").
// LightBulb and PhoneApp both pass this same value to FakeWebSocketServer so
// that they end up on the same pair of topics. It is not set as the MQTT
// connection's client id.
const int CLIENT_ID = 123;
// Pretend to be a light bulb that responds to state-change requests and
// communicates its current state.
//
// Subscribes to the "change" topic and publishes to the "changed" topic via
// FakeWebSocketServer. Messages recognized on "change":
//   "ON" / "OFF" - store the new state, then publish it to "changed"
//   "GET"        - publish the current state without changing it
//   "STOP"       - leave the run loop, disconnect from the broker and return
// Any other message is reported on the console and otherwise ignored.
//
// cancellationToken is passed to the connect/subscribe/publish/disconnect
// calls and to the run loop's delay; cancelling it while the loop is waiting
// is expected to end the returned task with an OperationCanceledException.
async Task LightBulb(CancellationToken cancellationToken = default)
{
    var state = "OFF";
    var keepRunning = true;

    // The message handler is registered, and the subscription is made, inside
    // FakeWebSocketServer - i.e. before the publish delegate has been handed
    // back to this method. The handler therefore awaits this source rather
    // than dereferencing a delegate that may still be null: a message that
    // arrives in that window is answered as soon as the delegate is available
    // instead of throwing a NullReferenceException.
    var publisherReady = new TaskCompletionSource<Func<string, Task<MqttClientPublishResult>>>(
        TaskCreationOptions.RunContinuationsAsynchronously
    );

    // publish the bulb's current state to the "changed" topic
    async Task ReportState()
    {
        var send = await publisherReady.Task;
        await send(state);
    }

    // executes when the LightBulb receives a message on the "change" topic
    Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceivedHandler = async (e) =>
    {
        // Decode the whole payload sequence (not only its first segment) and
        // be explicit about UTF-8, which is how string payloads are sent.
        var decodedMsg = System.Text.EncodingExtensions.GetString(
            System.Text.Encoding.UTF8,
            e.ApplicationMessage.Payload
        );

        switch (decodedMsg)
        {
            // LightBulb runs until it's told to quit, which happens
            // when the user presses enter (for this application - this
            // doesn't make sense for a real light bulb).
            case "STOP":
                keepRunning = false;
                break;

            // set the state, and tell the PhoneApp what the new state is.
            case "ON":
            case "OFF":
                var alreadyInState = decodedMsg == state;
                state = decodedMsg;
                var description = alreadyInState ? $"already '{state}'" : $"'{state}'";
                Console.WriteLine($"BULB: (Bulb is {description})");
                await ReportState();
                break;

            // tell the PhoneApp what the current state is, but don't
            // change the state.
            case "GET":
                await ReportState();
                break;

            default:
                Console.WriteLine("BULB: Unexpected msg received on state change topic.");
                break;
        }
    };

    // subscribe to "change" (change the state)
    // publish to "changed" (the light bulb reports its state)
    var (client, _, publish) = await FakeWebSocketServer(
        msgReceivedHandler,
        CLIENT_ID,
        "change",
        "changed",
        cancellationToken
    );

    try
    {
        // from here on the handler can publish
        publisherReady.SetResult(publish);

        // run until told to stop.
        // the 500ms delay only makes sense for this demo. in real life
        // the light bulb might wait for a hardware interrupt on the websocket
        // socket, then read data.
        while (keepRunning)
        {
            await Task.Delay(500, cancellationToken);
        }

        await Disconnect(client, cancellationToken);
    }
    finally
    {
        // the client is owned by this method once FakeWebSocketServer returns
        client.Dispose();
    }
}
// Pretend to be a phone application that controls the light bulb.
//
// Subscribes to the "changed" topic (where the bulb reports its state) and
// publishes to the "change" topic (where the bulb takes requests) via
// FakeWebSocketServer. Commands are read from the console until an empty
// line (or end of input) is read; then "STOP" is published so that LightBulb
// shuts down too, and this client disconnects.
//
// cancellationToken is passed to the connect/subscribe/publish/disconnect
// calls. It does not interrupt the blocking Console.ReadLine calls.
async Task PhoneApp(CancellationToken cancellationToken = default)
{
    // executes when the PhoneApp receives a message on the "changed" topic
    Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceivedHandler = (e) =>
    {
        // Decode the whole payload sequence (not only its first segment) and
        // be explicit about UTF-8, which is how string payloads are sent.
        var decodedMsg = System.Text.EncodingExtensions.GetString(
            System.Text.Encoding.UTF8,
            e.ApplicationMessage.Payload
        );

        // the bulb can tell us it is either "ON" or "OFF"
        // any other message is ignored.
        if (decodedMsg is "ON" or "OFF")
        {
            Console.WriteLine($"APP: Bulb said it is '{decodedMsg}'");
        }
        else
        {
            Console.WriteLine("APP: Unexpected msg received on state topic.");
        }

        return Task.CompletedTask;
    };

    // subscribe to "changed" (the light bulb reports its state)
    // publish to "change" (change the light bulb's state)
    var (client, _, pub) = await FakeWebSocketServer(
        msgReceivedHandler,
        CLIENT_ID,
        "changed",
        "change",
        cancellationToken
    );

    try
    {
        Console.WriteLine("APP: Type 'ON' to turn the light bulb on, and 'OFF' to turn it off.");
        Console.WriteLine("APP: Type 'GET' to get the current bulb state.");
        Console.WriteLine("APP: Press enter to exit Phone App.");

        // an empty/whitespace line, or end of input (null), ends the loop
        for (
            var input = Console.ReadLine();
            !string.IsNullOrWhiteSpace(input);
            input = Console.ReadLine()
        )
        {
            // accept "on", " ON ", etc. rather than silently dropping them
            var command = input.Trim().ToUpperInvariant();

            if (command is "ON" or "OFF" or "GET")
            {
                await pub(command);
            }
            else
            {
                // tell the user instead of ignoring the input without feedback
                Console.WriteLine($"APP: Unrecognized command '{input}'. Type 'ON', 'OFF' or 'GET'.");
            }
        }

        // shut down the LightBulb by publishing a final "STOP" message.
        // for this demo only - this is far from ideal: on any failure in
        // PhoneApp before this point, or if "STOP" doesn't arrive, LightBulb
        // will run forever.
        await pub("STOP");
        await Disconnect(client, cancellationToken);
    }
    finally
    {
        // the client is owned by this method once FakeWebSocketServer returns
        client.Dispose();
    }
}
// Not real websockets - this emulates a websocket server by negotiating the
// client <-> MQTT broker connection itself and then handing the connection
// and the pub/sub channels straight to the caller.
// If this were real websockets, clients would send msgs to the websocket, and
// the websocket server would then pass them along to the MQTT pub/sub channels.
//
// msgReceived       - handler attached to the client's ApplicationMessageReceivedAsync event.
// clientId          - number embedded in both topic names ("client{clientId}").
// subTopicName      - name placed in the topic that is subscribed to.
// pubTopicName      - name placed in the topic that the returned delegate publishes to.
// cancellationToken - passed to connect and subscribe, and captured by the
//                     returned delegate for every later publish.
//
// Returns the connected client, the result of the subscribe call, and a
// delegate that publishes a string payload to the pub topic.
//
// The returned client must be disposed by callers - not ideal w/ bare
// functions (no IDisposable wrapper). If connecting or subscribing fails, the
// client is disposed here before the exception is rethrown, because in that
// case the caller never receives it.
async Task<(IMqttClient client, MqttClientSubscribeResult sub, Func<string, Task<MqttClientPublishResult>> pub)>
FakeWebSocketServer(
    Func<MqttApplicationMessageReceivedEventArgs, Task> msgReceived,
    int clientId,
    string subTopicName,
    string pubTopicName,
    CancellationToken cancellationToken = default
)
{
    // format the id independently of the machine's culture so that every
    // participant builds exactly the same topic string
    var clientIdText = clientId.ToString(System.Globalization.CultureInfo.InvariantCulture);

    var pubTopic = new MqttTopicTemplate("bulb/state/{pubTopicName}/client{clientId}")
        .WithParameter("pubTopicName", pubTopicName)
        .WithParameter("clientId", clientIdText);

    var subTopic = new MqttTopicTemplate("bulb/state/{subTopicName}/client{clientId}")
        .WithParameter("subTopicName", subTopicName)
        .WithParameter("clientId", clientIdText);

    var mqttFactory = new MqttClientFactory();

    // this requires running MQTT locally
    var mqttClientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer("127.0.0.1")
        .WithProtocolVersion(MqttProtocolVersion.V500)
        .Build();

    var subOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            builder => builder.WithTopicTemplate(subTopic)
                // "at least once" means we're guaranteed to receive the msg
                // to the MQTT sub topic
                .WithAtLeastOnceQoS()
        )
        .Build();

    var client = mqttFactory.CreateMqttClient();

    Func<string, Task<MqttClientPublishResult>> pub = async (msg) =>
    {
        var pubOptions = mqttFactory.CreateApplicationMessageBuilder()
            .WithTopicTemplate(pubTopic)
            .WithPayload(msg)
            // "at least once" means we're guaranteed to send the msg
            // to the MQTT pub topic
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        return await client.PublishAsync(pubOptions, cancellationToken);
    };

    try
    {
        // this callback is used when MQTT publishes to the "subTopic".
        // it is attached before connecting/subscribing so that no message
        // can arrive while there is no handler.
        client.ApplicationMessageReceivedAsync += msgReceived;

        await client.ConnectAsync(mqttClientOptions, cancellationToken);

        var subResult = await client.SubscribeAsync(subOptions, cancellationToken);

        // pass along the MQTT client connection and both the pub/sub
        // channels to emulate a way of communicating between PhoneApp/LightBulb
        // and a websocket server
        return (client, subResult, pub);
    }
    catch
    {
        // ownership was never transferred to the caller, so clean up here
        client.Dispose();
        throw;
    }
}
// Disconnect the given MQTT client from the broker using disconnect options
// built with the builder's defaults.
//
// client            - the client to disconnect; it is not disposed here.
// cancellationToken - passed to the disconnect call.
async Task Disconnect(IMqttClient client, CancellationToken cancellationToken = default)
{
    var disconnectOptions = new MqttClientFactory()
        .CreateClientDisconnectOptionsBuilder()
        .Build();

    await client.DisconnectAsync(disconnectOptions, cancellationToken);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment