-
-
Save kn666/68df7c79d8f2b325c92da54e54041ea0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Module table for the graphite-like metric store. It is only created when
-- missing, so anything already stored on it is kept when this file is
-- executed again.
if gr == nil then
  gr = {}
end
gr.version = 1.0

-- Retention schema assigned to newly created metrics.
-- A multi-level alternative would be: '1s30s 1s7d 1m3M'
gr.defschema = '1s30s'

-- Aggregation mode assigned to newly created metrics.
gr.defagg = 'avg'

-- Port shared by the TCP and UDP line listeners.
gr.port = 22003
-- Seconds per unit for the single-letter time suffixes understood by gr.mult.
local GR_UNIT_SECONDS = {
  s = 1,
  m = 60,
  h = 3600,
  d = 3600 * 24,
  w = 3600 * 24 * 7,
  M = 3600 * 24 * 30,  -- month, taken as 30 days
  y = 3600 * 24 * 365, -- year, taken as 365 days
}

--- Convert a time-unit suffix into its length in seconds.
-- @param x unit suffix: 's', 'm', 'h', 'd', 'w', 'M' or 'y' (case-sensitive)
-- @return number of seconds in one unit
-- Raises "Unknown type '<x>'" for any other value, including nil.
function gr.mult(x)
  local seconds = GR_UNIT_SECONDS[x]
  if seconds == nil then
    -- tostring keeps the message usable when x is nil or not a string
    error("Unknown type '" .. tostring(x) .. "'")
  end
  return seconds
end
--- Parse a single duration such as '30s', '5m' or '-1h' into seconds.
-- The first "<digits><letters>" pair found anywhere in the input is used, so
-- a leading sign or other prefix is ignored.
-- @param x duration string
-- @return duration in seconds (count * gr.mult(unit))
-- Raises an error when no such pair is present or the unit is unknown.
function gr.st1t(x)
  local count, unit = string.match(tostring(x), '(%d+)(%a+)')
  if count == nil then
    -- Fail with a readable message instead of an arithmetic-on-nil error.
    error("Malformed duration '" .. tostring(x) .. "'")
  end
  return tonumber(count) * gr.mult(unit)
end
--- Parse a retention schema string into a list of retention levels.
-- Every occurrence of "<n><unit>[:]<m><unit>" in the input (e.g. '1s30s')
-- yields one level: the first half is the step, the second the retention.
-- @param x schema string, e.g. '1s30s 1s7d 1m3M'
-- @return array of { step_seconds, slot_count, level_key } where
--         slot_count = floor(retention_seconds / step_seconds) and
--         level_key is the four captures concatenated (e.g. '1s30s')
function gr.st2t(x)
  local levels = {}
  local pattern = '(%d+)(%a+):*(%d+)(%a+)%s*'
  for step_n, step_unit, keep_n, keep_unit in string.gmatch(x, pattern) do
    local key = step_n .. step_unit .. keep_n .. keep_unit
    local step = step_n * gr.mult(step_unit)
    local slots = math.floor(keep_n * gr.mult(keep_unit) / step)
    table.insert(levels, { step, slots, key })
  end
  return levels
end
--- Record one data point for a metric.
-- The metric is looked up by name in space 0 (index 1); when missing it is
-- created as { id, name, gr.defschema, gr.defagg } with the next free id.
-- The value is then written into space 1 once per retention level of the
-- metric's schema, using the metric's aggregation mode ('last' or 'avg').
-- @param metric metric name
-- @param value  numeric value (number or numeric string)
-- @param time   unix timestamp of the point; defaults to os.time()
-- @return nothing
function gr.save(metric, value, time)
  if time == nil then time = os.time() end

  value = tonumber(value)
  if value == nil then
    -- A non-numeric value cannot be packed; reject it before touching storage.
    print("non-numeric value for ", metric)
    return
  end

  local m = box.select(0, 1, metric)
  if m == nil then
    local v = box.space[0].index[0]:max()
    local n
    if v == nil then n = 1 else n = box.unpack('i', v[0]) + 1 end
    print("create ",metric," with id ",n)
    local ok, res = pcall(box.insert, 0, { n, metric, gr['defschema'], gr['defagg'] })
    if ok then
      m = res
    else
      print("insert ",n,':',metric," failed: ",res)
      -- The insert may have failed because the metric appeared in the
      -- meantime; look once more before giving up on this point.
      m = box.select(0, 1, metric)
    end
  end
  if m == nil then return end

  local schemas = gr.st2t(m[2])
  local agg = m[3]
  for _, schema in pairs(schemas) do
    local s1, s2, sk = unpack(schema)
    local t = math.floor(time / s1)         -- index of the step containing `time`
    local mintime = os.time() - s2 * s1     -- older points are outside retention
    local slot = t % s2                     -- ring-buffer position of that step

    if agg == 'last' then
      -- NOTE(review): this branch keys space 1 by { id, slot } while the
      -- 'avg' branch uses { id, schema, slot }; confirm which layout the
      -- space is actually configured for.
      local x = box.update(1, { m[0], slot }, '=p', 2, value)
      if not x then
        local r, e = pcall(box.insert, 1, { m[0], slot, value })
        if not r then
          print("insert ",m[0]," failed: ",e)
          box.update(1, { m[0], slot }, '=p', 2, value)
        end
      end
    elseif agg == 'avg' then
      -- tuple layout: id, schema, slot, value, count, time
      --               0   1       2     3      4      5
      local x = box.select(1, 0, { m[0], sk, slot })
      if x == nil then
        print("insert")
        box.insert(1, { m[0], sk, slot, box.pack('i', value), box.pack('i', 1), box.pack('i', time) })
      else
        local stored_time = box.unpack('i', x[5])
        if stored_time < mintime or stored_time ~= time then
          -- Slot holds a point for a different timestamp: start over.
          -- NOTE(review): points are only averaged when their timestamps are
          -- identical, not when they merely fall into the same step; confirm
          -- this is intended for schemas with a step larger than 1s.
          box.replace(1, { m[0], sk, slot, box.pack('i', value), box.pack('i', 1), box.pack('i', time) })
        else
          -- Field 3 holds the current average over `count` samples, so the
          -- new average must weight it by that count.
          local count = box.unpack('i', x[4])
          local average = (box.unpack('i', x[3]) * count + value) / (count + 1)
          box.update(1, { x[0], sk, slot }, '=p+p', 3, box.pack('i', average), 4, 1)
        end
      end
    else
      print("unsupported aggregation: ",agg)
    end
  end
  return
end
-- Walk space-1 tuples of one metric / retention level, starting at
-- first_slot, and append a "[value,time]" string to `out` for every point
-- newer than mintime. Older tuples are deleted. The walk stops at the end of
-- the index, at the first tuple that belongs to another metric or level, or
-- (when last_slot is given) at the first slot greater than last_slot.
local function gr_collect_points(index, id, sk, s1, first_slot, last_slot, mintime, out)
  local pos, v = index:next(id, sk, first_slot)
  while pos ~= nil and v ~= nil do
    -- The index continues into other metrics' rows; stay inside ours.
    if v[0] ~= id or v[1] ~= sk then break end
    if last_slot ~= nil and box.unpack('i', v[2]) > last_slot then break end
    if box.unpack('i', v[5]) > mintime then
      out[#out + 1] = '[' .. box.unpack('i', v[3]) .. ',' .. box.unpack('i', v[5]) .. ']'
    else
      box.delete( 1,{v[0],v[1],v[2]} )
      print("deprecated time ",box.unpack('i',v[5]), " for slot ",box.unpack('i',v[2])," must be ", math.floor(os.time() / s1 ) + box.unpack('i',v[2]) )
    end
    pos, v = index:next(pos)
  end
end

--- Return the stored points of a metric for a time range as JSON text.
-- The retention level is the first one in the metric's schema whose
-- retention covers `from`; if none does, the last level is used.
-- @param metric metric name
-- @param from   unix timestamp of the range start (required)
-- @param till   unix timestamp of the range end; defaults to os.time()
-- @return string '{"now": <ts>, "points":[[value,time],...]}', or nil when
--         the metric is unknown or its schema has no retention levels
function gr.points(metric, from, till)
  if till == nil then till = os.time() end
  local range = till - from
  local m = box.select(0, 1, metric)
  if m == nil then return nil end
  print("got range ",range," for ",m, ' from ',from, ' till ', till);

  local schemas = gr.st2t(m[2])
  local schema
  for _, sch in ipairs(schemas) do
    local s1, s2, sk = unpack(sch)
    local prange = math.floor(range / s1)
    print(sk, ' ',s1, ' ', s2, '; points by range ',prange,'; time back = ',s2*s1)
    schema = sch
    if os.time() - from < s2 * s1 then
      break
    else
      print(sk, " is too small for ")
    end
  end
  if schema == nil then return nil end

  local s1, s2, sk = unpack(schema)
  local minslot = math.floor(from / s1) % s2
  local maxslot = math.floor(till / s1) % s2
  local mintime = os.time() - s2 * s1
  print("use ",unpack(schema),' ',s1, ' ',s2, '; slots: ',minslot,',', maxslot );

  local index = box.space[1].index[0]
  local out = {}
  if minslot > maxslot then
    -- The range wraps around the ring buffer: read minslot..end first
    -- (older points), then 0..maxslot.
    print("overlap ",minslot,' -> end, 0 -> ',maxslot)
    gr_collect_points(index, m[0], sk, s1, minslot, nil, mintime, out)
    minslot = 0
  else
    print("linear ",minslot,' -> ',maxslot)
  end
  gr_collect_points(index, m[0], sk, s1, minslot, maxslot, mintime, out)

  -- Joining once avoids repeated string concatenation and the trailing
  -- comma; keys are quoted and the object is closed so the text is valid JSON.
  return '{"now": ' .. os.time() .. ', "points":[\n' .. table.concat(out, ',\n') .. '\n]}'
end
--- Parse one "<metric> <value> [<timestamp>]" line and store the point.
-- The value may contain digits and dots only; the timestamp is optional and
-- defaults to os.time(). Lines that do not match, or whose value is not a
-- valid number, are reported and ignored.
-- @param line text line as received from a client
-- @return nothing
function gr.saveline(line)
  local metric, value, time
  if type(line) == 'string' then
    metric, value, time = string.match(line, '([^%s]+) ([%d.]+)%s*(%d*)')
  end

  local number
  if metric ~= nil then
    -- '[%d.]+' also matches things like '1.2.3'; tonumber rejects those.
    number = tonumber(value)
  end
  if number == nil then
    print("got malformed line ",line)
    return
  end

  if time ~= '' then
    time = tonumber(time)
  else
    time = os.time()
  end
  gr.save(metric, number, time)
end
--- Fiber body serving one TCP client: reads lines and stores each of them
-- with gr.saveline until the peer closes the connection or a read fails.
-- @param sock connected client socket
-- @param host peer address (used for the fiber name)
-- @param port peer port (used for the fiber name)
function gr.worker(sock, host, port)
  box.fiber.detach()
  box.fiber.name("gr.worker."..host..":"..port)
  print("worker");
  while true do
    local line = sock:readline()
    if line == nil or line == '' then
      print("closing connection");
      break
    end
    -- A failure while storing one line must not kill the fiber, otherwise
    -- the socket would never be closed.
    local ok, err = pcall(gr.saveline, line)
    if not ok then
      print("failed to save line: ", err)
    end
  end
  sock:close()
end
--- Fiber body of the TCP listener: accepts connections on gr.tcpsrv forever
-- and hands each accepted socket to a new gr.worker fiber.
function gr.tcpserver()
  box.fiber.detach()
  box.fiber.name("gr.tcpserver")
  while true do
    local client, status, host, port = gr.tcpsrv:accept()
    if client == nil then
      print ("accept failed ", status, host,port);
    else
      print ("accepted ",host,":",port);
      local handler = box.fiber.create(gr.worker)
      box.fiber.resume(handler, client, host, port)
    end
  end
end
--- Fiber body of the UDP listener: receives datagrams (up to 1024 bytes) on
-- gr.udpsrv forever and stores each one with gr.saveline.
function gr.udpserver()
  box.fiber.detach()
  box.fiber.name("gr.udpserver")
  while true do
    local msg, status, host, port = gr.udpsrv:recvfrom(1024)
    if msg ~= nil and msg ~= '' then
      -- One bad datagram must not terminate the listener fiber.
      local ok, err = pcall(gr.saveline, msg)
      if not ok then
        print("failed to save message: ", err)
      end
    else
      print("not a message ", status, host, port);
    end
  end
end
-- Start the TCP line listener once; gr.tcpsrv doubles as the "already
-- started" marker across reloads of this file.
if gr.tcpsrv == nil then
  local sock = box.socket.tcp()
  -- Same convention as the http listener setup in this file: a non-nil
  -- second return value signals failure.
  local _, status = sock:bind('0.0.0.0', gr.port, 1)
  if status == nil then
    _, status = sock:listen()
  end
  if status ~= nil then
    print("tcp bind/listen on port ", gr.port, " failed: ", status)
    sock:close()
  else
    print("bound to tcp port ",gr.port)
    -- Publish the socket before the accept fiber starts, so gr.tcpserver
    -- never depends on when it is first scheduled.
    gr.tcpsrv = sock
    local f = box.fiber.create(gr.tcpserver)
    box.fiber.resume(f)
  end
end
-- Start the UDP listener once; gr.udpsrv doubles as the "already started"
-- marker across reloads of this file.
if gr.udpsrv == nil then
  local sock = box.socket.udp()
  -- Same convention as the http listener setup in this file: a non-nil
  -- second return value signals failure.
  local _, status = sock:bind('0.0.0.0', gr.port, 1)
  if status ~= nil then
    print("udp bind on port ", gr.port, " failed: ", status)
    sock:close()
  else
    -- Kept from the original setup; its result is deliberately not checked.
    -- NOTE(review): listen() may be unnecessary on a datagram socket; confirm.
    sock:listen()
    print("bound to udp port ",gr.port)
    -- Publish the socket before the receive fiber starts, so gr.udpserver
    -- never depends on when it is first scheduled.
    gr.udpsrv = sock
    local f = box.fiber.create(gr.udpserver)
    box.fiber.resume(f)
  end
end
-- Module table of the minimal HTTP front-end; an existing table (and an
-- already chosen port) is kept when this file is executed again.
if httpd == nil then httpd = {} end
if httpd.port == nil then httpd.port = 10888 end

-- Line terminator used when building HTTP responses.
CRLF = "\x0d\x0a"
--- Send an HTTP/1.0 response and close the socket.
-- Missing 'content-type', 'content-length' and 'connection' headers are
-- filled in (in the passed table) with 'text/html', the body length and
-- 'close' respectively.
-- @param sock    client socket
-- @param status  status code placed on the status line
-- @param body    response body; nil is sent as an empty body
-- @param headers optional table of header name -> value
function httpd.reply(sock, status, body, headers)
  if body == nil then
    body = ''
  end
  if headers == nil then
    headers = {}
  end
  if headers['content-type'] == nil then
    headers['content-type'] = 'text/html'
  end
  if headers['content-length'] == nil then
    headers['content-length'] = #body
  end
  if headers['connection'] == nil then
    headers['connection'] = 'close'
  end

  -- Collect the pieces and join them once: status line, headers, blank
  -- line, body.
  local parts = { "HTTP/1.0 " .. tostring(status) .. CRLF }
  for name, val in pairs(headers) do
    parts[#parts + 1] = name .. ': ' .. val .. CRLF
  end
  parts[#parts + 1] = CRLF .. body
  local reply = table.concat(parts)

  print("send reply\n",reply)
  sock:send(reply)
  sock:close()
end
-- Namespace table for URL helpers; created only when not yet present.
if url == nil then
  url = {}
end
--- Decode a URL-encoded component.
-- '+' becomes a space first, then every '%XX' hex escape becomes the byte
-- it encodes.
-- @param x encoded string, or nil
-- @return decoded string; '' when x is nil
function url_decode(x)
  if x == nil then
    return ''
  end
  local function from_hex(hex)
    return string.char(tonumber(hex, 16))
  end
  -- The inner gsub is not the last argument, so only its string result is
  -- passed on; the local keeps just the string result of the outer one.
  local decoded = string.gsub(string.gsub(x, '%+', ' '), '%%(%x%x)', from_hex)
  return decoded
end
--- Split a request target into its path and decoded query parameters.
-- Only 'key=value' pairs are recognised; keys and values are passed through
-- url_decode. When a key repeats, the last value wins.
-- @param url request target, e.g. '/?target=a.b&from=1h'
-- @return path (text before the first '?', or the whole input) and a table
--         of query parameters (empty when there is no '?')
function url.parse(url)
  local query = {}
  local qpos = string.find(url, "?", 1, true)
  if qpos == nil then
    return url, query
  end

  local path = string.sub(url, 1, qpos - 1)
  local qv = string.sub(url, qpos + 1)
  print(path,';',qv)
  for k, v in string.gmatch(qv, '([^&=]+)=([^&]*)') do
    print(k,'=',v)
    query[url_decode(k)] = url_decode(v)
  end
  return path, query
end
--- Fiber body serving one HTTP request.
-- Reads the request line and headers, then answers:
--   GET-style request for '/' with 'target' and 'from' query parameters ->
--     200 with the output of gr.points for the last `from` period;
--   missing/invalid parameters -> 400; any other path -> 404;
--   failure while reading points -> 500.
-- The socket is always closed, either directly or by httpd.reply.
-- @param sock connected client socket
-- @param host peer address (used for the fiber name)
-- @param port peer port (used for the fiber name)
function httpd.worker(sock, host, port)
  box.fiber.detach()
  box.fiber.name("httpd.worker."..host..":"..port)
  print("worker");

  local line = sock:readline()
  if (line == nil or line == '') then return sock:close() end
  local meth, path = string.match(line, '^(%u+)%s([^%s]+)%sHTTP/([%d.]+)')
  if (meth == nil) then return sock:close() end

  -- Header names are lower-cased; repeated headers are joined with '; '.
  local headers = {}
  while true do
    local hline = sock:readline()
    if (hline == nil or hline == '') then return sock:close() end
    if (hline == '\x0a' or hline == '\x0d\x0a') then break end
    local k,v = string.match(hline,'^([^%s]+)%s*:%s*([^\x0d\x0a]*)\x0d?\x0a')
    if (k == nil) then
      print("not parsed ", hline)
    else
      k = string.lower(k)
      if headers[k] ~= nil then
        headers[k] = headers[k] .. '; ' .. v
      else
        headers[k] = v
      end
    end
  end
  -- The Host header is optional; tostring avoids failing on its absence.
  print(meth,' ',path,' ('..tostring(headers['host'])..')');

  local upath, query = url.parse(path)
  print("resulting path ",upath)
  if upath ~= '/' then
    return httpd.reply(sock, 404, "Not found");
  end
  if query.from == nil then
    return httpd.reply(sock, 400, "from required");
  end
  if query.target == nil then
    return httpd.reply(sock, 400, "target required");
  end

  -- gr.st1t raises on a malformed duration; report that to the client
  -- instead of letting the fiber die with the socket still open.
  local parsed, period = pcall(gr.st1t, query.from)
  if not parsed then
    print("bad from: ", period)
    return httpd.reply(sock, 400, "bad from");
  end

  local ok, points = pcall(gr.points, query.target, os.time() - period)
  if not ok then
    print("points failed: ", points)
    return httpd.reply(sock, 500, "Internal error");
  end

  local hdr = {}
  -- hdr['content-type'] = 'application/x-json'
  hdr['content-type'] = 'text/plain'
  return httpd.reply(sock, 200, points, hdr);
end
--- Fiber body of the HTTP listener: accepts connections on httpd.sock
-- forever and hands each accepted socket to a new httpd.worker fiber.
function httpd.server()
  box.fiber.detach()
  box.fiber.name("httpd.server")
  while true do
    local sock, status, host, port = httpd.sock:accept()
    if status ~= nil then
      print ("http accept failed ", sock, status, host, port);
    else
      print ("http accepted ",sock,'; ',host,":",port);
      local handler = box.fiber.create(httpd.worker)
      box.fiber.resume(handler, sock, host, port)
    end
  end
end
-- Start the HTTP listener once; httpd.sock doubles as the "already started"
-- marker across reloads of this file. Raises an error when the port cannot
-- be bound or listened on.
if httpd.sock == nil then
  local sock = box.socket.tcp()
  local s, status = sock:bind('0.0.0.0', httpd.port, 1)
  if status ~= nil then
    -- tostring on every part: building the message must not itself fail
    -- if a returned value is nil.
    local msg = tostring(sock) .. ' ' .. tostring(s) .. ' bind failed: ' .. tostring(status)
    sock:close()
    error(msg)
  end
  s, status = sock:listen()
  if status ~= nil then
    local msg = tostring(sock) .. ' ' .. tostring(s) .. ' listen failed: ' .. tostring(status)
    sock:close()
    error(msg)
  end
  print("bound to http port ",httpd.port)
  -- The socket is published before the accept fiber is started.
  httpd.sock = sock
  local f = box.fiber.create(httpd.server)
  box.fiber.resume(f)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment