Created
March 17, 2015 21:13
-
-
Save daurnimator/88e942022d4f3cabf57c to your computer and use it in GitHub Desktop.
Wrapper around fifo.lua using cqueues to implement non-blocking queues.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local new_fifo = require "fifo" | |
local cqueues = require "cqueues" | |
local cc = require "cqueues.condition" | |
-- Method table shared by every queue object; instances reach these
-- functions through the metatable's __index.
local methods = {}
local mt = {}
mt.__index = methods
--- Create a new queue object.
-- Every argument is handed unchanged to the fifo constructor.
-- @return a table holding the fifo and a condition variable, with `mt` as
--   its metatable
function methods.new(...)
  local cond = cc.new()
  local inner = new_fifo(...)

  -- Handler registered through setempty (presumably invoked by the fifo when
  -- it is popped while empty -- confirm against fifo.lua): wait on the
  -- condition, then retry the pop.
  local function wait_then_pop(_fifo)
    cond:wait()
    return inner:pop()
  end
  inner:setempty(wait_then_pop)

  local object = {
    fifo = inner;
    --[[ From cqueues docs:
    Alternatively, :pollfd may return a condition variable object,
    or the member field may itself be a condition variable instead of a function.
    This permits user code to create ad hoc pollable objects.]]
    pollfd = cond;
  }
  return setmetatable(object, mt)
end
--- Timeout reported when this object is polled.
-- A non-empty fifo reports 0 so that a poll returns immediately; an empty
-- one reports nil.
-- @return 0 when items are queued, nil otherwise
methods.timeout = function(self)
  if self:length() == 0 then
    return nil
  end
  return 0
end
--- Push onto the underlying fifo, then signal the condition variable.
-- Arguments are forwarded unchanged to the fifo's push; signal(1) is asked
-- to wake a single waiter.
methods.push = function(self, ...)
  local inner, cond = self.fifo, self.pollfd
  inner:push(...)
  cond:signal(1)
end
--- Insert into the underlying fifo, then signal the condition variable.
-- Arguments are forwarded unchanged to the fifo's insert; signal(1) is asked
-- to wake a single waiter.
methods.insert = function(self, ...)
  local inner, cond = self.fifo, self.pollfd
  inner:insert(...)
  cond:signal(1)
end
--- Simple wrappers

--- Pop from the underlying fifo.
-- Arguments and results pass straight through to/from the fifo's pop.
methods.pop = function(self, ...)
  local inner = self.fifo
  return inner:pop(...)
end
--- Remove from the underlying fifo.
-- Arguments and results pass straight through to/from the fifo's remove.
methods.remove = function(self, ...)
  local inner = self.fifo
  return inner:remove(...)
end
--- Length of the underlying fifo.
-- Arguments and results pass straight through to/from the fifo's length.
function methods:length(...)
  return self.fifo:length(...)
end

-- The `#` operator consults the `__len` metamethod (honoured for tables from
-- Lua 5.2 on); the key previously used here, `__length`, is not a metamethod
-- name, so `#queue` never reached `length`.
-- The wrapper takes only the queue itself: some Lua versions pass the operand
-- a second time, and that extra argument should not leak into the fifo call.
mt.__len = function(self)
  return self:length()
end
--- Peek into the underlying fifo.
-- Arguments and results pass straight through to/from the fifo's peek.
methods.peek = function(self, ...)
  local inner = self.fifo
  return inner:peek(...)
end
--- Utility functions

-- Calling a queue object pops from it, which lets the object itself serve as
-- a generic-for iterator: `for item in fifo do ... end`.
-- The loop stops at the first `nil` item.
mt["__call"] = methods["pop"]

-- The module's value is the constructor function.
return methods.new
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local cqueues = require "cqueues" | |
local new_fifo = require "cqueues-fifo" | |
local scheduler = cqueues.new()
local queue = new_fifo()

-- Consumer: iterate the queue directly and print every item received.
scheduler:wrap(function()
  for value in queue do
    print(value)
  end
end)

-- Producer: push three random numbers, sleep one second, repeat forever.
scheduler:wrap(function()
  repeat
    for _ = 1, 3 do
      queue:push(math.random())
    end
    cqueues.sleep(1)
  until false
end)

-- Run the scheduler; assert raises if loop() reports a failure.
assert(scheduler:loop())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local cqueues = require "cqueues" | |
local new_fifo = require "cqueues-fifo" | |
local scheduler = cqueues.new()
local first = new_fifo()
local second = new_fifo()
print("1", first, "2", second)

-- Consumer: poll both queues and pop one item from each object that
-- cqueues.poll returns.
scheduler:wrap(function()
  repeat
    local ready = { cqueues.poll(first, second) }
    for _, queue in ipairs(ready) do
      local value = queue:pop()
      print(queue, value)
    end
  until false
end)

-- Producer: two items onto the first queue and one onto the second, then
-- sleep one second, repeat forever.
scheduler:wrap(function()
  repeat
    first:push(math.random())
    first:push(math.random())
    second:push(math.random())
    cqueues.sleep(1)
  until false
end)

-- Run the scheduler; assert raises if loop() reports a failure.
assert(scheduler:loop())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment