Created
April 28, 2024 01:08
-
-
Save ochaton/8892835557da5e8e6b19421bdc986763 to your computer and use it in GitHub Desktop.
PoC: cpu_limit method for luafun iterator inside Tarantool to allow automatic control of yields
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
The CPU limit works like a leaky bucket.
For example, we say that we want to limit this iterator to consume ≤10% CPU.
Caller: | |
- ev(), real() | |
- gen_x(param_x, state_x) | |
- ev(), real() | |
-- We gain time to work when we sleep | |
-- We lose time of the work between subsequent calls | |
-- Given time quota for fiber is evaluated as % of max_ev_run_time | |
-- So basically, it is not possible for the fiber to be on-cpu more than max_ev_run_time (10*ms for now) | |
-- When the fiber or ev_run exhausts time_quota (yes, noisy neighbours make the fiber yield too),
-- the fiber is sent to sleep for the next time slot.
-- sleep time slot is evaluated in the following manner: | |
-- for each W time of work, the fiber needs to be yielded for at least W / % to regain its time quota.
-- The same approach, but slightly in a different way is implemented here: | |
-- fiber should sleep at most `max_ev_run_time` (basically should skip next ev loop). | |
-- more precisely, the fiber needs to sleep (100% - X%) * max_ev_run_time where X% is the cpu quota given to the fiber
-- but, in some cases, it is sent to sleep because of noisy neighbours, so we subtract from the sleep time the time_quota that is left for the fiber.
-- After each sleep time_quota increases for X% of the sleep but always limited to X% * max_ev_run_time. | |
]] | |
local fun = require 'fun' | |
local methods = debug.getmetatable(fun.ones()).__index | |
--- Turn the arguments into a raw iterator triplet.
-- Feeds the arguments through `fun.iter` and immediately unwraps the
-- resulting iterator object.
-- @param gen   first value handed to `fun.iter`
-- @param param second value handed to `fun.iter`
-- @param state third value handed to `fun.iter`
-- @return whatever `:unwrap()` returns for the created iterator
local function rawiter(gen, param, state)
    local iterator = fun.iter(gen, param, state)
    return iterator:unwrap()
end
--- Adapt a one-argument iterator function into an iterator method.
-- The returned function is meant to be called as `it:name(arg1)`; it reads
-- the `gen`, `param` and `state` fields of `self` and forwards them to `func`
-- after the user argument.
-- @param func function(arg1, gen, param, state)
-- @return function(self, arg1)
local function method1(func)
    local function bound_method(self, arg1)
        local gen, param, state = self.gen, self.param, self.state
        return func(arg1, gen, param, state)
    end
    return bound_method
end
--- Adapt a one-argument iterator function into a module-level function.
-- The returned function accepts the user argument followed by anything
-- `rawiter` understands, normalises it, and forwards every value that
-- `rawiter` returns to `func`.
-- @param func function(arg1, gen, param, state)
-- @return function(arg1, gen, param, state)
local function export1(func)
    local function exported(arg1, gen, param, state)
        -- rawiter(...) must stay the last argument so all of its results are passed on
        return func(arg1, rawiter(gen, param, state))
    end
    return exported
end
local fiber = require 'fiber' | |
local clock = require 'clock' | |
local log = require 'log' | |
if log.new then log = log.new('fun.cpu_limit') end | |
--- Event-loop time as reported by `fiber.time64()`.
-- The rest of this file treats the value as microseconds.
local function ev_time_mks()
    local now = fiber.time64()
    return now
end
--- Wall-clock time: `clock.realtime64()` divided by 1e3.
-- Presumably converts nanoseconds to microseconds — confirm against the
-- Tarantool `clock` documentation.
local function realtime_mks()
    local raw = clock.realtime64()
    return raw / 1e3
end
--- Thread CPU time: `clock.thread64()` divided by 1e3.
-- Presumably converts nanoseconds to microseconds — confirm against the
-- Tarantool `clock` documentation.
local function thread_time_mks()
    local raw = clock.thread64()
    return raw / 1e3
end
--- Return the larger of two comparable values.
-- When `a > b` does not hold (including ties), `b` is returned.
-- @param a first value
-- @param b second value
-- @return `a` if `a > b`, otherwise `b`
local function max(a, b)
    -- `a` is always truthy once `a > b` has succeeded, so the and/or idiom is safe here
    return (a > b) and a or b
end
--- Default yield strategy: put the current fiber to sleep.
-- @param time duration handed to `fiber.sleep` unchanged
-- Returns nothing.
local function yield_sleep(time)
    fiber.sleep(time)
end
--- Yield strategy for `txn = 'commit'`.
-- If a transaction is open, commit it, sleep, and open a new transaction
-- afterwards; otherwise just sleep.
-- @param time duration passed on to `yield_sleep`
local function yield_commit(time)
    -- box.cfg being a function is taken to mean box is not configured yet
    -- (presumably the Tarantool convention — confirm); box.is_in_txn may be absent
    local in_txn = type(box.cfg) ~= 'function' and box.is_in_txn and box.is_in_txn()
    if in_txn then
        box.commit()
    end

    yield_sleep(time)

    if in_txn then
        box.begin()
    end
end
--- Yield strategy for `txn = 'rollback'`.
-- Rolls back the open transaction, if any, and then sleeps. No new
-- transaction is started afterwards.
-- @param time duration passed on to `yield_sleep`
local function yield_rollback(time)
    -- box.cfg being a function is taken to mean box is not configured yet
    -- (presumably the Tarantool convention — confirm)
    local box_ready = type(box.cfg) ~= 'function'
    if box_ready and box.is_in_txn and box.is_in_txn() then
        box.rollback()
    end
    return yield_sleep(time)
end
--- Yield strategy for `txn = 'raise'`.
-- Refuses to yield while a transaction is open: raises an error (level 3)
-- in that case, otherwise sleeps.
-- @param time duration passed on to `yield_sleep`
local function yield_raise(time)
    -- box.cfg being a function is taken to mean box is not configured yet
    -- (presumably the Tarantool convention — confirm)
    local box_ready = type(box.cfg) ~= 'function'
    local in_txn = box_ready and box.is_in_txn and box.is_in_txn()
    if not in_txn then
        return yield_sleep(time)
    end
    error("Transaction left openned", 3)
end
-- Time units. The base unit is `mks`; every `*_mks` value in this file is
-- expressed in it.
local mks = 1
local ms = 1000 * mks

-- Length of one time slot (10 * ms) used by the limiter when computing the
-- quota and the sleep time.
local max_ev_run_mks = 10 * ms
--- Post-process the result of one step of the wrapped generator.
-- Receives the bookkeeping captured just before the wrapped generator was
-- called, followed by everything that generator returned. Charges the CPU
-- time spent inside the generator against the quota and builds the next
-- limiter state.
-- @param quota_mks    quota left before the wrapped generator ran
-- @param b_ev_mks     event-loop time sampled before the wrapped generator ran
-- @param b_thread_mks thread CPU time sampled before the wrapped generator ran
-- @param state_x      new state returned by the wrapped generator (nil = exhausted)
-- @param ...          values produced by the wrapped generator
-- @return nil when the wrapped generator is exhausted; otherwise the new
--         limiter state `{state_x, quota_mks, ev_mks, thread_mks}` followed by `...`
local function cpu_limit_gen_x(quota_mks, b_ev_mks, b_thread_mks, state_x, ...)
    if state_x == nil then
        return nil
    end

    local now_ev_mks = ev_time_mks()
    local now_thread_mks = thread_time_mks()
    if now_ev_mks == b_ev_mks then
        -- Event-loop time did not change, so the wrapped generator has not
        -- yielded: all thread time since the sample is charged to the quota.
        local spent_mks = tonumber(now_thread_mks - b_thread_mks)
        quota_mks = quota_mks - spent_mks
    end

    return { state_x, quota_mks, now_ev_mks, now_thread_mks }, ...
end
--- Generator function of the cpu-limited iterator.
-- Charges the CPU time spent by the caller since the previous step, puts the
-- fiber to sleep when the quota is exhausted or the current event-loop
-- iteration has run for too long, and then advances the wrapped generator.
-- @param param `{max_quota_mks, sleep, gen_x, param_x}`: quota ceiling, yield
--              strategy (called with seconds), wrapped generator and its param
-- @param state `{state_x, quota_mks, prev_ev_mks, prev_thread_mks}`: wrapped
--              state, quota left, and the time samples taken at the previous step
-- @return the result of `cpu_limit_gen_x`: nil when the wrapped generator is
--         exhausted, otherwise the next limiter state followed by its values
local cpu_limit_gen = function(param, state)
    local max_quota_mks, sleep, gen_x, param_x = param[1], param[2], param[3], param[4]
    local state_x, quota_mks, prev_ev_mks, prev_thread_mks = state[1], state[2], state[3], state[4]

    local ev_mks = ev_time_mks()
    local thread_mks
    if prev_ev_mks == ev_mks then
        -- same event-loop iteration: the caller spent all this time on cpu,
        -- so decrease the quota
        thread_mks = thread_time_mks()
        quota_mks = quota_mks - tonumber(thread_mks - prev_thread_mks)
    end

    local wall_mks = realtime_mks()
    if ((ev_mks + max_ev_run_mks) < wall_mks) or quota_mks <= 1 then
        -- it's time to yield
        -- NOTE(review): the header comment of this file talks about subtracting
        -- the remaining quota from the sleep time, while this expression makes
        -- the sleep longer when quota remains — confirm which one is intended.
        local sleep_time = (max_ev_run_mks - (max_quota_mks - quota_mks)) / 1e6
        if sleep_time < 0 then
            -- The fiber overspent by more than the rest period, which would
            -- produce a negative duration; never pass that to the sleep callback.
            sleep_time = 0
        end
        log.verbose("yielding(quota=%s, max_allowed=%s, sleep=%s)", quota_mks, max_quota_mks, sleep_time)
        sleep(sleep_time)

        ev_mks = ev_time_mks()
        thread_mks = thread_time_mks()
        if ev_mks > wall_mks then
            -- Quota is regained in proportion to the time actually slept;
            -- any time drift (ev_mks <= wall_mks) does not add quota.
            quota_mks = quota_mks + tonumber(ev_mks - wall_mks) * max_quota_mks / max_ev_run_mks
        end
        if quota_mks > max_quota_mks then
            quota_mks = max_quota_mks
        end
    end

    -- The time samples are taken before gen_x runs (arguments are evaluated
    -- left to right), so cpu_limit_gen_x can charge the generator's own cost.
    return cpu_limit_gen_x(quota_mks, ev_mks, thread_mks or thread_time_mks(), gen_x(param_x, state_x))
end
--- Wrap an iterator so that it consumes at most a given share of CPU.
-- @param opts    either the limit itself (anything `tonumber` accepts) or a
--                table `{cpu_limit = <limit>, txn = 'commit'|'rollback'|'raise'}`;
--                `txn` selects what happens to an open transaction before a yield
-- @param gen_x   generator of the wrapped iterator
-- @param param_x parameter of the wrapped iterator
-- @param state_x initial state of the wrapped iterator
-- @return the result of `fun.wrap` for the limiting generator
-- Raises an error if the limit is not a number strictly between 0 and 100,
-- or if `txn` is set to an unsupported value.
local function cpu_limit(opts, gen_x, param_x, state_x)
    local percent
    local sleep_with = yield_sleep

    if type(opts) ~= 'table' then
        percent = tonumber(opts)
    else
        percent = tonumber(opts.cpu_limit)
        local txn = opts.txn
        if txn then
            local strategies = {
                commit = yield_commit,
                rollback = yield_rollback,
                raise = yield_raise,
            }
            local strategy = strategies[txn]
            if not strategy then
                error("malformed cpu_limit/txn: commit, rollback or raise are supported", 2)
            end
            sleep_with = strategy
        end
    end

    assert(
        percent and percent > 0 and percent < 100,
        "malformed cpu_limit given: should be positive number within (0;100)"
    )

    -- The quota ceiling is the given percentage of one time slot.
    local max_quota_mks = max_ev_run_mks * percent / 100
    local limiter_param = { max_quota_mks, sleep_with, gen_x, param_x }
    local limiter_state = { state_x, max_quota_mks, ev_time_mks(), thread_time_mks() }
    return fun.wrap(cpu_limit_gen, limiter_param, limiter_state)
end
-- Publish the limiter twice: as a method on luafun iterator objects
-- (`it:cpu_limit(opts)`) and as a module-level function
-- (`fun.cpu_limit(opts, gen, param, state)`).
local as_method = method1(cpu_limit)
local as_function = export1(cpu_limit)

methods.cpu_limit = as_method
---@diagnostic disable-next-line: inject-field
fun.cpu_limit = as_function

return fun
--[[ | |
fun.cpu_limit(10, ...)
fun.ones():cpu_limit(10)
fun.ones():cpu_limit({ cpu_limit = 3, txn = 'rollback' })
]] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment