Created
July 26, 2018 18:50
-
-
Save HeinrichHartmann/296e8a0cf74664925e907d76aca46716 to your computer and use it in GitHub Desktop.
Python IRONdb/Snowth Bindings
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# This is a Python wrapper for the Snowth/IRONdb API I wrote
# for personal use in 2014. Use at your own risk.
#
import logging | |
# Module logger is the *root* logger: verbose (DEBUG) when this file is run
# as a script, quiet (WARN) when it is imported as a library.
log = logging.getLogger()
log.setLevel(logging.DEBUG if __name__ == "__main__" else logging.WARN)
import requests | |
from urllib import urlencode | |
import urlparse | |
from simplejson.decoder import JSONDecodeError | |
from joblib import Memory | |
# On-disk joblib store that SnowthDB uses to memoise GET requests.
_CACHE_DIR = "/DATA/snowth-tmp/"
memory = Memory(cachedir=_CACHE_DIR, verbose=0)

# Overrides the default retry count of the requests adapters with 0
# (presumably so that failed requests surface immediately -- TODO confirm).
requests.adapters.DEFAULT_RETRIES = 0
class SnowthDB:
    """Small HTTP client for the Snowth/IRONdb REST API.

    Every call is a GET request against ``http://<host>:<port>/``.
    Responses can be memoised through the module-level joblib ``memory``
    object: ``caching`` sets the per-instance default and each request
    method accepts a ``caching`` argument that overrides it.
    """

    __version__ = "0.1.21"

    def __init__(self, host="localhost", port="8112", caching=True):
        """
        Keyword arguments:
        host    -- host name of the Snowth node
        port    -- port of the REST API, as string or int
        caching -- default for the ``caching`` argument of the request methods
        """
        self.host = host
        self.port = port
        self.caching = caching
        # format() rather than "+" so that an integer port works as well
        self.endpoint = "http://{}:{}/".format(host, port)
        # memoised twin of _getPathNoCache
        self._getPathCache = memory.cache(self._getPathNoCache)

    def getState(self):
        """Return the parsed JSON document served at /state (never cached)."""
        return self.getJson("/state", caching=False)

    def getVersion(self):
        """Return the "version" entry of the /state document (never cached)."""
        return self.getState()["version"]

    def __str__(self):
        return "SnowthDB at {}".format(self.endpoint)

    def __repr__(self):
        return "SnowthDB at {}".format(self.endpoint)

    def read(self, UUID, metric, start, end, period=300, kind="all", caching=None):
        """
        Reads data from snowth via internal REST API

        Keyword arguments:
        UUID    -- UUID of check
        metric  -- name of metric
        start   -- as epoch in sec. or datetime or ISO8601 timestamp
        end     -- as epoch in sec. or datetime or ISO8601 timestamp, exclusive
        period  -- in sec. Has to be a multiple of 1 minute
        kind    -- all/average/count/derive...
                   CAUTION: These types are not equal to the dict keys
                   of kind="all", e.g. value <-> average
        caching -- overrides self.caching when not None

        Returns the parsed JSON response, or [] when start equals end.
        """
        def as_epoch(value):
            # Honour the documented contract: datetimes and ISO8601 strings
            # are converted to epoch seconds. Numbers and plain digit strings
            # are passed through untouched, exactly as before.
            if isinstance(value, datetime):
                return toUTS(value)
            if isinstance(value, str) and not value.isdigit():
                return toUTS(value)
            return value

        start, end = as_epoch(start), as_epoch(end)
        # An empty range needs no request.
        if start == end:
            return []
        # Example Request:
        #   http://localhost:8112/read..
        #   ../1401249000/1401264000/300/2a8cbf71-4c7c-6a04-909e-b3d429afc966
        #   ../average/duration
        path = join_urls("read", start, end, period, UUID, kind, metric)
        return self.getJson(path, caching=caching)

    def lua(self, script, caching=None, json=True, **params):
        """
        Calls the lua script 'script' with provided parameters.

        The parameters are sent as query string. Returns the parsed json
        object, or the raw response object when json is false.
        """
        path = join_urls("extension/lua", script)
        if json:
            return self.getJson(path, params=params, caching=caching)
        return self.getPath(path, params=params, caching=caching)

    def getLuaRegister(self):
        """Return the parsed JSON document served at extension/lua."""
        return self.getJson("extension/lua")

    def luaTest(self):
        """Runs Lua test suite.

        Returns the ``ok`` flag of the (uncached) response. A non-ok
        response raises SnowthError before the flag is read.
        """
        return self.lua("test", caching=False, json=False).ok

    def getJson(self, path, **kwargs):
        """Make request to path and return response as JSON.

        Raises ValueError (mentioning the requested url) when the body is
        not valid JSON.
        """
        resp = self.getPath(path, **kwargs)
        try:
            return resp.json()
        except ValueError:
            # Both simplejson's and the stdlib's decode errors derive from
            # ValueError, so this works whichever decoder requests uses.
            raise ValueError("Could not decode json at {}.".format(resp.url))

    def getPath(self, path, caching=None, **kwargs):
        """Makes request to path and return response object.

        * kwargs are passed to requests.request().
          (http://docs.python-requests.org/en/latest/api/#requests.request)
        """
        # caching argument overrides self.caching
        if caching is None:
            caching = self.caching
        fetch = self._getPathCache if caching else self._getPathNoCache
        return fetch(path, **kwargs)

    def _getPathNoCache(self, path, **kwargs):
        """GET ``path`` relative to the endpoint; raise SnowthError if not ok."""
        resp = requests.request("GET",
                                urlparse.urljoin(self.endpoint, path),
                                **kwargs)
        logging.debug("Requested url %s", resp.url)
        if not resp.ok:
            raise SnowthError(resp)
        return resp

    def clearCache(self):
        """Clear the module-level joblib cache (shared by all instances)."""
        memory.clear()
class SnowthError(LookupError):
    """Error carrying the response object of a failed request.

    SnowthDB raises it when a response is not ``ok``. The response is kept
    in ``self.req``.
    """

    def __init__(self, req):
        self.req = req

    def getStatusCode(self):
        """Return the HTTP status code of the wrapped response."""
        return self.req.status_code

    def __str__(self):
        req = self.req
        # "\nurl" -- the original "\url" was a typo that put a stray
        # backslash into the message (and is a syntax error on Python 3).
        return "SnowthError({}) {}\nurl: {}\ntext: {}".format(
            req.status_code, req.reason, req.url, req.text)

    def __repr__(self):
        return self.__str__()
# | |
# Helper functions | |
# | |
def join_urls(*args):
    """
    Build a url path from the given pieces.

    Each piece is converted with str() and loses its trailing slashes
    (leading slashes are kept); the pieces are then glued with "/".
    cf. http://stackoverflow.com/questions/1793261/
    """
    pieces = [str(piece).rstrip('/') for piece in args]
    return "/".join(pieces)
from datetime import datetime | |
import dateutil.parser | |
def toUTS(s):
    """returns unix time stamp as int:

    * None            # returned unchanged
    * int             # returned unchanged
    * string          # parsed as ISO8601 via dateutil, then treated as datetime
    * datetime object # naive values are interpreted as local time,
                      # timezone-aware values are converted via their offset

    Raises AssertionError for any other type.
    """
    # Local imports keep this block self-contained.
    import calendar
    import time

    if s is None:
        return None
    if isinstance(s, int):
        return s
    if isinstance(s, str):
        # yields a datetime, which is handled right below
        s = dateutil.parser.parse(s)
    if isinstance(s, datetime):
        # strftime("%s") is a non-portable libc extension and ignores tzinfo;
        # compute the epoch explicitly instead.
        if s.utcoffset() is not None:
            return calendar.timegm(s.utctimetuple())
        return int(time.mktime(s.timetuple()))
    raise AssertionError("Supplied argument of invalid type ", type(s))
from copy import copy | |
from itertools import islice | |
class Series(object):
    """Iterable slice of the time series of one metric of one check.

    Data is fetched through a SnowthDB connector when the series is indexed
    or iterated. Slicing and select() return modified shallow copies.
    """

    KEYS = ['all',
            u'count', u'counter_stddev', u'stddev', u'counter', u'derivative2_stddev', u'value',
            u'derivative2', u'time', u'counter2_stddev', u'derivative_stddev', u'derivative', u'counter2']

    def __init__(self, uuid, metric,
                 start=None, end=None, period=300,
                 db=None, key="value",
                 batch_size=1000, step=1
                 ):
        """
        Convenience object for time series. Represents a time series slice,
        which can be iterated over.

        uuid       -- of check
        metric     -- name of metric
        start      -- timestamp of slice start. Default: self.first()
        end        -- timestamp of slice end (exclusive).
                      Default: self.last() + period
        period     -- distance between two samples in sec.
        db         -- Snowth DB Connector. Default: a new SnowthDB()
        key        -- key to return. If None, the full dict is returned.
        batch_size -- number of periods fetched per request when iterating
        step       -- stride applied when iterating (every step-th sample)
        """
        # DB connection. Created lazily: a `db=SnowthDB()` default would be
        # built once at import time and shared by every Series.
        self.db = db if db is not None else SnowthDB()
        self.uuid = uuid
        self.metric = metric
        # Slicing
        self.period = period
        self.start = toUTS(start) if start is not None else self.first()
        # Rem: last is inclusive, end is exclusive: have to add one period to end date
        self.end = toUTS(end) if end is not None else self.last() + period
        # Key selection
        self.key = key
        self.batch_size = batch_size
        self.step = step

    def _clone(self):
        """Return a shallow copy of this series."""
        return copy(self)

    def _read(self, start=None, end=None, period=None, **kwargs):
        """db.read with default parameters.

        Entries that are not a (time, dict) pair are dropped. The time is
        stored in the dict under 'time'; the value under self.key (or the
        whole dict if self.key is None) is returned for every entry.
        Always returns a list.
        """
        start = self.start if start is None else start
        end = self.end if end is None else end
        period = self.period if period is None else period
        # db.read has problems with this edge case
        if start == end:
            return []
        data = self.db.read(self.uuid, self.metric, start, end, period, **kwargs)
        out = []
        for entry in data:
            if not (len(entry) == 2 and isinstance(entry[1], dict)):
                continue
            when, values = entry
            values[u'time'] = when
            out.append(values if self.key is None else values[self.key])
        return out

    def first(self):
        """Return 'whence' of the lua script "first"; 0 if missing or on a 404."""
        try:
            return self.db.lua("first", id=self.uuid, metric=self.metric, period=300)['whence']
        except KeyError:
            return 0
        except SnowthError as e:
            if e.getStatusCode() == 404:
                return 0  # no data
            raise

    def last(self):
        """Return 'whence' of the (uncached) lua script "last"; 0 if missing or on a 404."""
        try:
            return self.db.lua("last", id=self.uuid, metric=self.metric, period=300, caching=False)['whence']
        except KeyError:
            return 0
        except SnowthError as e:
            if e.getStatusCode() == 404:
                return 0  # no data
            raise

    def hasData(self):
        """Return the negated result of the (uncached) lua script "empty"."""
        return not self.db.lua("empty", id=self.uuid, metric=self.metric, period=300, caching=False)

    def get(self, **kwargs):
        """Reads and returns data as list. Keyword arguments are ignored."""
        # Go through the iterator so that self.step is honoured.
        return list(self)

    def setRange(self, start, end, step=1):
        """
        set start/stop [-time] within range of self.start - self.end
        Allows for human readable dates

        Both values are clamped to the current range; None leaves the
        respective bound untouched. step is always overwritten.
        """
        if start is not None:
            self.start = min(self.end, max(self.start, toUTS(start)))
        if end is not None:
            self.end = min(self.end, max(self.start, toUTS(end)))
        self.step = step

    def index2time(self, i):
        """converts index to time; negative indices count back from self.end"""
        if i is None:
            return None
        if i >= 0:
            return self.start + i * self.period
        return self.end + i * self.period

    def _parseSlice(self, s, default):
        """Turn one slice bound (None, index or date) into a timestamp."""
        if s is None:
            return default
        if isinstance(s, int):
            # convert date index to timestamp
            return self.index2time(s)
        # convert date "2014-11-12" to timestamp
        return toUTS(s)

    def __getitem__(self, i):
        """series[a:b:step] returns a narrowed copy, series[n] a single sample."""
        if isinstance(i, slice):
            start = self._parseSlice(i.start, self.start)
            end = self._parseSlice(i.stop, self.end)
            step = i.step or 1  # returns 1 if i.step = None or 0
            out = self._clone()
            out.setRange(start, end, step)
            return out
        if isinstance(i, int):
            # reusing slice has problems with [-1:-1+1=0]
            # this works correctly also for negative indices
            when = self.index2time(i)
            return self._read(when, when + self.period)[0]
        raise TypeError("Unsupported type {} of {}".format(i, type(i)))

    def _singleStepIter(self):
        """Yield every sample in [start, end), fetched batch_size periods at a time."""
        span = self.batch_size * self.period
        if span <= 0:
            return
        # Walk fixed time windows up to self.end. Advancing by the number of
        # samples received would re-read samples after a gap, and stopping at
        # the first empty batch would cut the iteration short.
        window_start = self.start
        while window_start < self.end:
            window_end = min(window_start + span, self.end)
            for sample in self._read(window_start, window_end):
                yield sample
            window_start = window_end

    def __iter__(self):
        return islice(self._singleStepIter(), 0, None, self.step)

    def select(self, key):
        """Return a copy of this series that yields `key` instead of self.key."""
        out = self._clone()
        out.key = key
        return out

    def __getattr__(self, key):
        """series.<key> is a shortcut for series.select(<key>); 'all' selects the full dict."""
        # Must be AttributeError: hasattr() and copy() probe for optional
        # attributes and only tolerate that exception type.
        if key not in self.KEYS:
            raise AttributeError("Illegal Attribute " + key)
        return self.select(None if key == "all" else key)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment