Created
October 15, 2023 11:41
-
-
Save worldOneo/52c8629b9014baf4ed3d735296133e0b to your computer and use it in GitHub Desktop.
Epoll based Event Loop chat application
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
// The unit of work that travels through the event loop.
// Handlers registered under `name` receive `data`; once they have all run the
// loop calls `destroyData(ctx, data)`, which must release whatever `data` owns.
const Event = struct {
    // Key used to look up the handlers that should receive this event.
    name: []const u8,
    // Opaque payload handed to every matching handler (typically the user's data).
    data: *anyopaque,
    // Opaque owner passed back as the first argument of `destroyData`
    // (typically a pointer to the emitting object).
    ctx: *anyopaque,
    // Cleanup hook, invoked as `destroyData(ctx, data)` after dispatch to avoid leaks.
    destroyData: *const fn (*anyopaque, *anyopaque) void,
};
// A subscription: whenever an event named `event` is dispatched, the bus calls
// `handlerfn(handler, event_data, bus)`.
const EventHandler = struct {
    // Name of the event this handler subscribes to.
    event: []const u8,
    // Opaque receiver passed as the first argument of `handlerfn`.
    handler: *anyopaque,
    // Callback; arguments are (handler, event data, the dispatching bus).
    handlerfn: *const fn (*anyopaque, *anyopaque, *EventBus) void,
};
// EventBus and EventLoop.
// The loop is a FIFO queue of pending events; `run` pops them one by one and
// hands each to every handler registered under the event's name.
const EventBus = struct {
    handlerlist: std.StringHashMap(std.ArrayList(EventHandler)),
    pending: std.TailQueue(Event),
    allocator: std.mem.Allocator,

    const This = @This();
    const HandlerList = std.ArrayList(EventHandler);
    const Queue = std.TailQueue(Event);

    // Creates an empty bus. `alloc` is used for handler lists and queue nodes.
    pub fn init(alloc: std.mem.Allocator) !This {
        return This{
            .handlerlist = std.StringHashMap(HandlerList).init(alloc),
            .pending = Queue{},
            .allocator = alloc,
        };
    }

    // Releases everything the bus owns: events still queued are destroyed
    // through their own `destroyData` hook (without being dispatched), then
    // the handler lists and the handler map are freed.
    pub fn deinit(this: *This) void {
        while (this.pending.popFirst()) |node| {
            const event = node.data;
            this.allocator.destroy(node);
            event.destroyData(event.ctx, event.data);
        }
        var lists = this.handlerlist.valueIterator();
        while (lists.next()) |list| list.deinit();
        this.handlerlist.deinit();
    }

    // addHandler registers a new event handler under `handler.event`.
    // The name slice is stored as the map key without being copied, so it
    // must outlive the bus. Panics when out of memory.
    pub fn addHandler(this: *This, handler: EventHandler) void {
        const entry = this.handlerlist.getOrPut(handler.event) catch
            @panic("EventBus.addHandler: out of memory");
        if (!entry.found_existing) {
            entry.value_ptr.* = HandlerList.init(this.allocator);
        }
        entry.value_ptr.append(handler) catch
            @panic("EventBus.addHandler: out of memory");
    }

    // emit appends a new event to the end of the queue.
    // Events are dispatched in the order they were emitted. Panics when out
    // of memory.
    pub fn emit(this: *This, event: Event) void {
        const node = this.allocator.create(Queue.Node) catch
            @panic("EventBus.emit: out of memory");
        node.data = event;
        this.pending.append(node);
    }

    // run dispatches queued events until the queue is empty.
    // Handlers may emit further events; those are appended and processed in
    // turn. An event nobody listens to is simply destroyed.
    pub fn run(this: *This) void {
        while (this.pending.popFirst()) |event_node| {
            // Copy the event out so the node can be released independently.
            const event = event_node.data;
            if (this.handlerlist.get(event.name)) |list| {
                // `list` is a by-value snapshot of the ArrayList header, so a
                // handler must not register another handler for this same
                // event while it is being dispatched (the items may move).
                for (list.items) |handler| {
                    handler.handlerfn(handler.handler, event.data, this);
                }
            }
            this.allocator.destroy(event_node);
            event.destroyData(event.ctx, event.data);
        }
    }
};
// Epoll integration. Owns the listening socket (0.0.0.0:5151) and the epoll
// instance, and turns readiness notifications into events on the bus:
//   "Epoll.Read"   - poll once and re-arm (handled by `read`)
//   "Epoll.Write"  - send bytes to a fd (handled by `write`)
//   "Epoll.Accept" - a client connected (payload: EpollAccept)
//   "Epoll.Data"   - bytes arrived for a client (payload: EpollDataReady)
const Epoll = struct {
    epfd: i32,
    sock: std.net.StreamServer,
    allocator: std.mem.Allocator,

    const linux = std.os.linux;
    const This = @This();

    // The events this epoll implementation exposes.
    // This implementation doesn't emit close.
    const EpollAccept = struct { fd: i32, userctx: **allowzero anyopaque };
    const EpollDataReady = struct { fd: i32, data: *std.ArrayList(u8), userctx: **allowzero anyopaque };
    const EpollWrite = struct { fd: i32, data: std.ArrayList(u8) };

    // Per-registration state stored in `epoll_event.data.ptr`.
    // The listening socket's Ctx is tagged by storing the epoll fd in
    // `client_fd`, which no client descriptor can ever equal.
    const Ctx = struct {
        readbuffer: std.ArrayList(u8),
        client_fd: i32,
        userctx: *allowzero anyopaque,

        // Allocates a Ctx with an empty read buffer, no fd (-1) and a null
        // user context. The caller owns the result: `deinit` + `destroy`.
        pub fn init(alloc: std.mem.Allocator) !*@This() {
            const ctx = try alloc.create(@This());
            ctx.* = @This(){
                .readbuffer = std.ArrayList(u8).init(alloc),
                .client_fd = -1,
                .userctx = @as(*allowzero anyopaque, @ptrFromInt(0)),
            };
            return ctx;
        }

        // Frees the read buffer; the Ctx allocation itself is freed by the owner.
        pub fn deinit(this: *@This()) void {
            this.readbuffer.deinit();
        }
    };

    // Opens the listening socket, creates the epoll instance and registers
    // the socket for readability. Everything acquired so far is released
    // again when a later step fails.
    pub fn init(alloc: std.mem.Allocator) !This {
        var sock = std.net.StreamServer.init(.{});
        errdefer sock.deinit();
        sock.reuse_port = true;
        try sock.listen(try std.net.Address.resolveIp("0.0.0.0", 5151));

        const epfd = try std.os.epoll_create1(0);
        errdefer std.os.close(epfd);

        const ctx = try Ctx.init(alloc);
        errdefer {
            ctx.deinit();
            alloc.destroy(ctx);
        }
        // Marks this Ctx as "the listening socket" (see Ctx).
        ctx.client_fd = epfd;

        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{ .ptr = @intFromPtr(ctx) },
        };
        try std.os.epoll_ctl(epfd, linux.EPOLL.CTL_ADD, sock.sockfd.?, &event);

        return This{
            .epfd = epfd,
            .sock = sock,
            .allocator = alloc,
        };
    }

    // Registers `fd` with the epoll instance and returns its freshly
    // allocated Ctx. On failure nothing is leaked and `fd` is left open.
    fn epoll_add(this: *This, fd: i32, events: u32) !*Ctx {
        const ctx = try Ctx.init(this.allocator);
        errdefer {
            ctx.deinit();
            this.allocator.destroy(ctx);
        }
        ctx.client_fd = fd;
        var epoll_event = linux.epoll_event{
            .events = events,
            .data = .{ .ptr = @intFromPtr(ctx) },
        };
        try std.os.epoll_ctl(this.epfd, linux.EPOLL.CTL_ADD, fd, &epoll_event);
        return ctx;
    }

    // Builds the `destroyData` hook for a payload of type E that was
    // allocated with this Epoll's allocator. Only the payload struct itself
    // is freed, not anything it points to.
    fn destroyEvent(comptime E: type) *const fn (*anyopaque, *anyopaque) void {
        return struct {
            fn destroy(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
                const this: *This = @ptrCast(@alignCast(opaque_this));
                const event: *E = @ptrCast(@alignCast(opaque_event));
                this.allocator.destroy(event);
            }
        }.destroy;
    }

    // create a event that if emitted will send data to the fd.
    // `data` is borrowed: its buffer is not freed with the event, so it must
    // stay valid until the event has been dispatched.
    pub fn writeEvent(this: *This, fd: i32, data: std.ArrayList(u8)) Event {
        const e = this.allocator.create(EpollWrite) catch
            @panic("Epoll.writeEvent: out of memory");
        e.* = EpollWrite{ .fd = fd, .data = data };
        return Event{
            .ctx = this,
            .destroyData = destroyEvent(EpollWrite),
            .data = e,
            .name = "Epoll.Write",
        };
    }

    // Creates the event that makes the loop poll epoll once.
    // The event carries no payload; `data` points at a static placeholder so
    // that it never refers to a dead stack frame.
    pub fn readEvent(this: *This) Event {
        const Static = struct {
            var placeholder: u8 = 0;
            fn noop(_: *anyopaque, _: *anyopaque) void {}
        };
        return Event{
            .ctx = this,
            .data = &Static.placeholder,
            .destroyData = Static.noop,
            .name = "Epoll.Read",
        };
    }

    // Handler for "Epoll.Read": waits up to 5 ms for readiness, processes
    // what is ready and re-emits itself so polling continues.
    fn read(ctx_this: *anyopaque, _: *anyopaque, bus: *EventBus) void {
        const this: *This = @ptrCast(@alignCast(ctx_this));
        var events: [16]linux.epoll_event = undefined;
        const rc = linux.epoll_wait(this.epfd, &events, events.len, 5);
        switch (std.os.errno(rc)) {
            .SUCCESS => {
                for (events[0..rc]) |event| {
                    const ctx: *Ctx = @ptrFromInt(event.data.ptr);
                    if (ctx.client_fd == this.epfd) {
                        this.acceptClient(bus);
                    } else if (event.events & linux.EPOLL.IN != 0) {
                        this.readClient(ctx, bus);
                    } else if (event.events & (linux.EPOLL.HUP | linux.EPOLL.RDHUP | linux.EPOLL.ERR) != 0) {
                        this.closeClient(ctx);
                    }
                }
            },
            // Interrupted by a signal: nothing was ready, just poll again.
            .INTR => {},
            .INVAL => {
                std.log.err("errno = INVAL", .{});
                std.os.exit(1);
            },
            else => unreachable,
        }
        // "recursive" tail call
        bus.emit(this.readEvent());
    }

    // Accepts one pending connection, switches it to non-blocking mode,
    // registers it with epoll and emits "Epoll.Accept".
    fn acceptClient(this: *This, bus: *EventBus) void {
        const server_fd = this.sock.sockfd orelse unreachable;
        var addr: linux.sockaddr = undefined;
        var size: linux.socklen_t = @sizeOf(linux.sockaddr);
        const accept_rc = linux.accept(server_fd, &addr, &size);
        const accept_errno = std.os.errno(accept_rc);
        if (accept_errno != .SUCCESS) {
            std.log.err("accept failed: {}", .{accept_errno});
            return;
        }
        const client_fd: i32 = @intCast(accept_rc);

        // set non blocking: O_NONBLOCK is a file *status* flag, so it has to
        // go through F_GETFL/F_SETFL (F_SETFD only handles FD_CLOEXEC).
        const flags = linux.fcntl(client_fd, linux.F.GETFL, 0);
        if (std.os.errno(flags) != .SUCCESS or
            std.os.errno(linux.fcntl(client_fd, linux.F.SETFL, flags | linux.O.NONBLOCK)) != .SUCCESS)
        {
            std.log.err("Failed to make {} non-blocking", .{client_fd});
            _ = linux.close(client_fd);
            return;
        }

        const client_ctx = this.epoll_add(client_fd, linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.RDHUP) catch |err| {
            std.log.err("Failed to configure {} because {}", .{ client_fd, err });
            _ = linux.close(client_fd);
            return;
        };

        const data = this.allocator.create(EpollAccept) catch
            @panic("Epoll.read: out of memory");
        // Expose the *client's* user context slot, not the listening socket's.
        data.* = EpollAccept{ .fd = client_fd, .userctx = &client_ctx.userctx };
        bus.emit(Event{
            .data = data,
            .name = "Epoll.Accept",
            .ctx = this,
            .destroyData = destroyEvent(EpollAccept),
        });
    }

    // Reads up to 1024 bytes from a ready client into its read buffer and
    // emits "Epoll.Data". A read of 0 bytes (peer closed) or a hard error
    // closes the connection instead.
    fn readClient(this: *This, ctx: *Ctx, bus: *EventBus) void {
        var buff: [1024]u8 = undefined;
        const read_rc = linux.read(ctx.client_fd, &buff, buff.len);
        switch (std.os.errno(read_rc)) {
            .SUCCESS => {},
            // Spurious wake-up or signal: epoll will report the fd again.
            .AGAIN, .INTR => return,
            else => |e| {
                std.log.err("read on {} failed: {}", .{ ctx.client_fd, e });
                this.closeClient(ctx);
                return;
            },
        }
        if (read_rc == 0) {
            // End of stream. Without closing here the level-triggered
            // EPOLLIN would fire forever and the fd would never be released.
            this.closeClient(ctx);
            return;
        }

        ctx.readbuffer.appendSlice(buff[0..read_rc]) catch
            @panic("Epoll.read: out of memory");
        const data = this.allocator.create(EpollDataReady) catch
            @panic("Epoll.read: out of memory");
        data.* = EpollDataReady{
            .fd = ctx.client_fd,
            .userctx = &ctx.userctx,
            .data = &ctx.readbuffer,
        };
        bus.emit(Event{
            .data = data,
            .name = "Epoll.Data",
            .ctx = this,
            .destroyData = destroyEvent(EpollDataReady),
        });
    }

    // Unregisters and closes a client connection and frees its Ctx.
    // `ctx` must not be used afterwards.
    fn closeClient(this: *This, ctx: *Ctx) void {
        _ = linux.epoll_ctl(this.epfd, linux.EPOLL.CTL_DEL, ctx.client_fd, null);
        _ = linux.close(ctx.client_fd);
        ctx.deinit();
        this.allocator.destroy(ctx);
    }

    // Handler for "Epoll.Write": sends the whole payload to the fd, retrying
    // after partial writes. On any error (including EAGAIN on a full socket
    // buffer) the remaining bytes are dropped and the error is logged.
    fn write(_: *anyopaque, ev: *anyopaque, _: *EventBus) void {
        const data: *EpollWrite = @ptrCast(@alignCast(ev));
        var remaining: []const u8 = data.data.items;
        while (remaining.len > 0) {
            const rc = linux.write(data.fd, remaining.ptr, remaining.len);
            switch (std.os.errno(rc)) {
                .SUCCESS => {
                    if (rc == 0) return;
                    remaining = remaining[rc..];
                },
                .INTR => continue,
                else => |e| {
                    std.log.err("write to {} failed: {}; dropping {} bytes", .{ data.fd, e, remaining.len });
                    return;
                },
            }
        }
    }

    // Closes the listening socket and the epoll instance.
    pub fn deinit(this: *This) void {
        this.sock.deinit();
        std.os.close(this.epfd);
    }
};
// Protocol layer: splits the byte stream delivered by "Epoll.Data" into
// '\n'-terminated lines and emits one "Server.Line" event per line.
const Server = struct {
    allocator: std.mem.Allocator,
    epoll: *Epoll,

    const This = @This();

    pub fn init(alloc: std.mem.Allocator, e: *Epoll) This {
        return This{
            .allocator = alloc,
            .epoll = e,
        };
    }

    // Payload of "Server.Line".
    pub const LineReceieved = struct {
        // Connection the line arrived on.
        fd: i32,
        // The line without its terminating '\n' (may be empty); a view into `slice`.
        line: []const u8,
        // Backing allocation of `line`, freed with `line_allocator`.
        slice: []const u8,
        line_allocator: std.mem.Allocator,

        // Frees the backing slice and the event struct itself.
        pub fn destroyData(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
            const this: *This = @ptrCast(@alignCast(opaque_this));
            const event: *@This() = @ptrCast(@alignCast(opaque_event));
            event.line_allocator.free(event.slice);
            this.allocator.destroy(event);
        }
    };

    // Handler for "Epoll.Data". Extracts every complete line currently in
    // the connection's read buffer, in order, and emits each as its own
    // "Server.Line" event. An incomplete trailing line stays in the buffer
    // until more data arrives.
    pub fn dataReady(opaque_this: *anyopaque, opaque_event: *anyopaque, bus: *EventBus) void {
        const this: *This = @ptrCast(@alignCast(opaque_this));
        const event: *Epoll.EpollDataReady = @ptrCast(@alignCast(opaque_event));
        const buffer = event.data;

        // Look for the FIRST newline each round; "not found" is expressed as
        // null, so a newline at index 0 (an empty line) is handled correctly.
        while (std.mem.indexOfScalar(u8, buffer.items, '\n')) |newline| {
            // Take ownership of the buffered bytes (this leaves the buffer
            // empty), then put back whatever follows the line.
            const slice = buffer.toOwnedSlice() catch
                @panic("Server.dataReady: out of memory");
            const line = slice[0..newline];
            buffer.appendSlice(slice[(newline + 1)..]) catch
                @panic("Server.dataReady: out of memory");

            const data = this.allocator.create(LineReceieved) catch
                @panic("Server.dataReady: out of memory");
            data.* = LineReceieved{
                .fd = event.fd,
                .line = line,
                .slice = slice,
                .line_allocator = buffer.allocator,
            };
            // emitting the line to the event bus
            bus.emit(Event{
                .ctx = this,
                .destroyData = LineReceieved.destroyData,
                .data = data,
                .name = "Server.Line",
            });
        }
    }
};
// Chat logic: the first line a connection sends becomes its user name; every
// later line is broadcast to all registered users as "<name> -> <line>\n".
const Application = struct {
    // Registered users: connection fd -> name (owned by this struct).
    user: std.AutoHashMap(i32, []const u8),
    allocator: std.mem.Allocator,
    epoll: *Epoll,

    const This = @This();

    pub fn init(allocator: std.mem.Allocator, epoll: *Epoll) This {
        return This{
            .user = std.AutoHashMap(i32, []const u8).init(allocator),
            .allocator = allocator,
            .epoll = epoll,
        };
    }

    // Frees every stored user name and the user map.
    pub fn deinit(this: *This) void {
        var names = this.user.valueIterator();
        while (names.next()) |name| this.allocator.free(name.*);
        this.user.deinit();
    }

    // Payload of "Application.MessageSent"; owns the formatted message.
    const MessageSentEvent = struct {
        msg: std.ArrayList(u8),

        // Frees the message buffer and the event struct itself.
        pub fn destroyData(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
            const this: *This = @ptrCast(@alignCast(opaque_this));
            const event: *@This() = @ptrCast(@alignCast(opaque_event));
            event.msg.deinit();
            this.allocator.destroy(event);
        }
    };

    // Wraps `msg` in an "Application.MessageSent" event, which takes
    // ownership of the buffer and frees it when the event is destroyed.
    fn messageSent(this: *This, msg: std.ArrayList(u8)) Event {
        const data = this.allocator.create(MessageSentEvent) catch
            @panic("Application.messageSent: out of memory");
        data.* = MessageSentEvent{
            .msg = msg,
        };
        return Event{
            .ctx = this,
            .data = data,
            .destroyData = MessageSentEvent.destroyData,
            .name = "Application.MessageSent",
        };
    }

    // Handler for "Server.Line". Registers the sender on its first line,
    // otherwise broadcasts the line to every registered user. Empty lines
    // are ignored so they can neither become a user name nor a message.
    pub fn lineReceieved(opaque_this: *anyopaque, opaque_event: *anyopaque, bus: *EventBus) void {
        const this: *This = @ptrCast(@alignCast(opaque_this));
        const event: *Server.LineReceieved = @ptrCast(@alignCast(opaque_event));

        if (event.line.len == 0) return;

        // check if user has been registered
        const name = this.user.get(event.fd) orelse {
            // register user
            // for that we must clone the name, because event.line is owned by the event.
            const owned_name = this.allocator.dupe(u8, event.line) catch
                @panic("Application.lineReceieved: out of memory");
            this.user.put(event.fd, owned_name) catch
                @panic("Application.lineReceieved: out of memory");
            std.debug.print("User {s} created for fd {}\n", .{ owned_name, event.fd });
            return;
        };

        // format the message
        var message = std.ArrayList(u8).init(this.allocator);
        message.appendSlice(name) catch @panic("Application.lineReceieved: out of memory");
        message.appendSlice(" -> ") catch @panic("Application.lineReceieved: out of memory");
        message.appendSlice(event.line) catch @panic("Application.lineReceieved: out of memory");
        message.appendSlice("\n") catch @panic("Application.lineReceieved: out of memory");

        // broadcast the message to all users
        // Every write event borrows the same buffer; it is freed by the
        // MessageSent event below, which is queued after all of the writes.
        var users = this.user.iterator();
        while (users.next()) |user| {
            bus.emit(this.epoll.writeEvent(user.key_ptr.*, message));
        }
        // this event could be used in other parts of the application
        bus.emit(this.messageSent(message));
    }
};
// Entry point: builds the bus and the three layers (epoll, line protocol,
// chat application), wires their handlers together and runs the event loop.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var bus = try EventBus.init(allocator);
    var epoll = try Epoll.init(allocator);
    var server = Server.init(allocator, &epoll);
    var application = Application.init(allocator, &epoll);

    // Register all listeners.
    // might be useful to do in the init function to keep things neat.
    const handlers = [_]EventHandler{
        .{ .event = "Epoll.Read", .handler = &epoll, .handlerfn = Epoll.read },
        .{ .event = "Epoll.Write", .handler = &epoll, .handlerfn = Epoll.write },
        .{ .event = "Epoll.Data", .handler = &server, .handlerfn = Server.dataReady },
        .{ .event = "Server.Line", .handler = &application, .handlerfn = Application.lineReceieved },
    };
    for (handlers) |handler| {
        bus.addHandler(handler);
    }

    // prepare the I/O loop: the first read event keeps re-emitting itself
    bus.emit(epoll.readEvent());

    // blocks indefinitely running the event loop
    bus.run();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment