Created
February 13, 2012 16:38
-
-
Save Deco/1818054 to your computer and use it in GitHub Desktop.
Lua Coroutine Scheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pcall(require,"socket") | |
local coroutine_scheduler = { | |
_NAME = "coroutine_scheduler.lua" | |
_VERSION = "1.0.0", | |
} | |
local Scheduler | |
do Scheduler = setmetatable({}, { | |
__call = function(class) | |
return setmetatable({ | |
threads = {}, | |
}, class) | |
end | |
}) | |
Scheduler.__index = Scheduler | |
function Scheduler:select() | |
-- selects a coroutine from the list and runs it | |
local time, shortest_wait_time = os.clock() | |
for thread_i = 1, #self.threads do | |
local thread = self.threads[thread_i] | |
local state, ret = thread.state | |
if state == "wait" then | |
if time >= thread.resume_time then | |
thread.resume_time = nil | |
thread.state = "running" | |
table.remove(self.threads, thread_i) | |
ret = {coroutine.resume(thread.co, time-thread.suspend_time, unpack(thread.arg))} | |
thread.suspend_time = nil | |
thread.arg = nil | |
elseif not shortest_wait_time or thread.resume_time < shortest_wait_time then | |
shortest_wait_time = thread.resume_time | |
end | |
elseif state == "suspend" then | |
thread.state = "running" | |
table.remove(self.threads, thread_i) | |
ret = {coroutine.resume(thread.co, unpack(thread.arg))} | |
thread.arg = nil | |
elseif state == "new" then | |
thread.state = "running" | |
table.remove(self.threads, thread_i) | |
ret = {coroutine.resume(thread.co, unpack(thread.arg))} | |
thread.arg = nil | |
end | |
if ret then | |
local act, worked = table.remove(ret, 2), table.remove(ret, 1) | |
if worked then | |
local status = coroutine.status(thread.co) | |
if status == "dead" then | |
thread.state = "dead" | |
else | |
if act == "suspend" then | |
thread.arg = ret | |
thread.state = "suspend" | |
table.insert(self.threads, thread) | |
elseif act == "wait" then | |
local wait_val = table.remove(ret, 1) | |
local wait_time = tonumber(wait_val) | |
if not wait_time then | |
return false, "thread returned non-numeric wait time ("..tostring(wait_val)..")" | |
end | |
thread.arg = ret | |
thread.suspend_time = time | |
thread.resume_time = time+wait_time | |
thread.state = "wait" | |
table.insert(self.threads, 1, thread) | |
end | |
end | |
else | |
return false, act | |
end | |
return true, 0 | |
end | |
end | |
if shortest_wait_time then | |
return true, shorest_wait_time | |
end | |
return false, "no threads to select" | |
end | |
function Scheduler:spawn(func, ...) | |
-- creates a new thread and adds it to the list (but does not run it straight away) | |
table.insert(self.threads, { | |
state = "new", | |
co = coroutine.create(func), | |
arg = {...}, | |
}) | |
end | |
function Scheduler:run(func, ...) | |
-- creates a new thread and adds it to the top of the list, and suspends the current thread | |
table.insert(self.threads, 1, { | |
state = "new", | |
co = coroutine.create(func), | |
arg = {...}, | |
}) | |
coroutine.yield("suspend") | |
end | |
function Scheduler:suspend(...) | |
-- suspsends the current thread and adds it to the bottom of the list | |
-- returns the arguments passed | |
return coroutine.yield("suspend", ...) | |
end | |
function Scheduler:wait(time, ...) | |
-- suspends the current thread and gives it a wait condition | |
-- returns the actual time waited and the arguments passed | |
return coroutine.yield("wait", time, ...) | |
end | |
end | |
coroutine_scheduler.Scheduler = Scheduler | |
return coroutine_scheduler |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local coroutine_scheduler = require"coroutine_scheduler" | |
sched = coroutine_scheduler.Scheduler() | |
for i = 1, 10 do | |
sched:spawn(function() | |
print("outer started", os.clock()) | |
sched:spawn(function() | |
print("inner started", os.clock()) | |
print(sched:wait(i*.5)) | |
print("inner finished", os.clock()) | |
end) | |
print("outer finished", os.clock()) | |
end) | |
end | |
while true do | |
local worked, t = sched:select() | |
if worked then | |
if t and t ~= 0 then | |
if socket then socket.sleep(t) end | |
end | |
else | |
print(worked, t) | |
break | |
end | |
end |
sick! What scheduling algo are you using here? FIFO? Shortest wait time? Is there a way to optimize this if you know how long each sub process will run for?
Also, If I have one function, that I want to only ever spawn one instance of. How do you recommend doing that?
return true, 0
end
end
if shortest_wait_time then
return true, shorest_wait_time --At here you typo when typing shortest_wait_time
end
return false, "no threads to select"
end
function Scheduler:spawn(func, ...)
-- creates a new thread and adds it to the list (but does not run it straight away)
table.insert(self.threads, {
state = "new",
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To get this to work in lua 5.3 simply put
somewhere near the top.