Skip to content

Instantly share code, notes, and snippets.

@dlashua
Last active September 20, 2019 12:25
Show Gist options
  • Save dlashua/5aed28d3f39bfeb909459574ab6adf51 to your computer and use it in GitHub Desktop.
Save dlashua/5aed28d3f39bfeb909459574ab6adf51 to your computer and use it in GitHub Desktop.
AD Condition Engine
import appdaemon.plugins.hass.hassapi as hass
import conditions
class CTest(hass.Hass):
def initialize(self):
self.depends_on_module(conditions)
self._conditions = {}
# put in adapi.__init__
self.register_condition('all', conditions.AllCondition)
self.register_condition('any', conditions.AnyCondition)
self.register_condition('state', conditions.StateCondition)
self.register_condition('time', conditions.TimeCondition)
self.register_condition('days', conditions.DaysCondition)
self.register_constraint('condition_all')
self.register_constraint('condition_any')
self.register_constraint('condition')
# test code
self.register_constraint('constrain_always_true')
self.register_constraint('always_false')
for thing in self.args['tests_check']:
cond = self.get_condition('all')(self, thing['list'])
ret = cond.check()
self.log("CHECK {} is {}".format(thing['name'], ret))
for thing in self.args['tests_listen']:
cond = self.get_condition('all')(self, thing['list'])
cond.listen(
self.cond_callback,
immediate=True,
myname=thing['name'])
# test methods
def cond_callback(self, value, kwargs):
self.log("COND CB {} {}".format(
value,
kwargs))
def constrain_always_true(self, value):
return True
def always_false(self, value):
return False
# conditions for adapi
#
def register_condition(self, name, cls):
self._conditions[name] = cls
def get_condition(self, name):
try:
return self._conditions[name]
except KeyError:
raise KeyError("{} is not a registered condition".format(name))
# put in adapi
#
def condition(self, v_dict):
if not isinstance(v_dict, dict):
return None
if len(v_dict) != 1:
return None
if "any" in v_dict:
return self.constrain_any(self, v_dict['any'])
if "all" in v_dict:
return self.constrain_all(self, v_dict['all'])
return None
def condition_all(self, v_list):
cond = self.get_condition('all')
cond(self, v_list)
return cond.check()
def condition_any(self, v_list):
cond = self.get_condition('any')
cond(self, v_list)
return cond.check()
global_modules:
- conditions
ctest:
module: app
class: CTest
tests_check:
- name: always true
list:
- always_true: 123
- name: always false
list:
- always_false: 123
tests_listen:
- name: thursday
list:
- days: thu
- name: friday
list:
- days: fri
- name: wednesday
list:
- days: wed
- name: strings
list:
- state: input_boolean.test_1 == on
- state: default://input_boolean.test_2#state == on
- state:
entity_id: input_boolean.test_3
operator: ==
value: "on"
- name: broken constraint listen
list:
- always_true: 123
from abc import ABC, abstractmethod
import appdaemon.utils as utils
import datetime
SEP_NAMESPACE = "://"
SEP_ATTRIBUTE = "#"
DEFAULT_OPERATOR = "=="
DEFAULT_VALUE = "on"
DEFAULT_ATTRIBUTE = "state"
class AbstractCondition(ABC):
# overwrite these methods
@abstractmethod
def update(self):
"""Calculate the Condition Result
Returns: boolean representing the condition evaluation
"""
pass
@abstractmethod
def setup_listener(self):
"""Setup Mechanism to Listen for changes
This should establish a mechanism that either updates
self.state with the boolean value of the condition
or calls self.check() which will perform self.update()
Returns: None
"""
pass
@abstractmethod
def destroy_listener(self):
"""Destroys resouces created by setup_listener()
Returns: None
"""
pass
def initilaize(self):
"""Initalize any variables and parse configuration.
Returns: None
"""
pass
# leave these methods
def __init__(self, app, config):
self.app = app
self.listener = None
self.__state = None
self.config = config
self.initialize()
@property
def state(self):
"""The current boolean state of the condition"""
return self.__state
@state.setter
def state(self, new_state):
if new_state != self.__state:
self.__state = new_state
self._fire_callback(new_state)
def check(self):
"""Check the Condition and Return the result
Returns: boolean
"""
self.state = self.update()
return self.state
def listen(self, cb, immediate=False, **kwargs):
"""Fire a Callback on change in Condition State
Args:
cb: the callback to be fired. it should have a signature
of fn(value, kwargs)
KWArgs:
immediate: if True, the callback will fire immediately with the
state of the condition
**: any additional desired keywords to be sent to the
callback
Returns: a function that, when called, will cancel the listener
"""
self.listener = {
"cb": cb,
"kwargs": kwargs}
self.setup_listener()
if immediate:
self.check()
return self.cancel
def cancel(self):
"""Cancels the established listener and destorys any resources.
Returns: None
"""
self.destory_listener()
self.listener = None
def _fire_callback(self, new_state):
if self.listener is not None:
self.listener['cb'](new_state, self.listener['kwargs'])
class TimeCondition(AbstractCondition):
def initialize(self):
self.cancel_handles = []
self.times = self.parse_config()
def update(self):
return self.app.now_is_between(
self.times['start'],
self.times['end'])
def setup_listener(self):
handle = self.app.run_daily(self.timer_callback, self.times['start'])
self.cancel_handles.append(handle)
end_time = (
datetime.datetime.strptime(self.times['end'], "%H:%M:%S")
+ datetime.timedelta(seconds=1))
handle = self.app.run_daily(self.timer_callback, end_time.time())
self.cancel_handles.append(handle)
def destroy_listener(self):
while self.cancel_handles:
cancel_handle = self.cancel_handles.pop()
self.app.cancel_timer(cancel_handle)
# helping methods
def parse_config(self):
v = self.config
if not isinstance(v, dict):
raise ValueError('time condition must be a dict')
if len(v) < 1 or len(v) > 2:
raise ValueError(
'time condition must contain only start and end keys')
for key in v.keys():
if key not in ['start', 'end']:
raise ValueError(
'{} is not a valid key for time condition'.format(key))
start_time = v.get('start', '00:00:00')
end_time = v.get('end', '23:59:59')
return {
"start": start_time,
"end": end_time
}
def timer_callback(self, kwargs):
self.check()
class DaysCondition(AbstractCondition):
# required methods
def initialize(self):
self.cancel_handles = []
self.daylist = self.parse_config()
def update(self):
now = self.app.datetime()
if now.weekday() not in self.daylist:
return False
else:
return True
def setup_listener(self):
handle = self.app.run_daily(self.timer_callback, "00:00:01")
self.cancel_handles.append(handle)
def destroy_listener(self):
while self.cancel_handles:
cancel_handle = self.cancel_handles.pop()
self.app.cancel_timer(cancel_handle)
# helping methods
def timer_callback(self, kwargs):
self.check()
def parse_config(self):
ret = []
for day in self.config.split(","):
ret.append(utils.day_of_week(day))
return ret
class ConstraintCondition(AbstractCondition):
# required methods
def initialize(self):
pass
def setup_listener(self):
raise Exception(
"A Constraint, '{}', cannot be listened to".format(
self.config['name']
))
def destroy_listener(self):
raise Exception(
"A Constraint, '{}', cannot be cancelled".format(
self.config['name']
))
def update(self):
fn = getattr(self.app, self.config['name'])
return fn(self.config['value'])
class AbstractLogicalCondition(AbstractCondition):
# required methods
def initialize(self):
self.cancel_handles = []
self.conditions = self.parse_config()
def setup_listener(self):
for c in self.conditions:
handle = c.listen(self.condition_callback)
self.cancel_handles.append(handle)
def destroy_listener(self):
while self.cancel_handles:
canceller = self.cancel_handles.pop()
canceller()
# helping methods
def condition_callback(self, state, kwargs):
self.check()
def parse_config(self):
ret = []
c = self.config
if not isinstance(c, list):
raise ValueError('conditions must be a list of dicts')
for c_one in c:
if not isinstance(c_one, dict):
raise ValueError('each condition must be a dict')
if len(c_one) != 1:
raise ValueError('each dict must contain only one key')
condition_name = list(c_one.keys())[0]
condition_parameter = c_one[condition_name]
try:
cond_class = self.app.get_condition(condition_name)
except KeyError:
alt_name = "constrain_" + condition_name
if condition_name in self.app.list_constraints():
constraint_name = condition_name
elif alt_name in self.app.list_constraints():
constraint_name = alt_name
else:
raise
cond_class = ConstraintCondition
condition_parameter = {
"name": constraint_name,
"value": condition_parameter
}
ret.append(cond_class(
self.app,
condition_parameter))
return ret
class AnyCondition(AbstractLogicalCondition):
def update(self):
ret = False
for c in self.conditions:
ret = c.check()
if ret is None:
return None
if ret is True:
return True
return ret
class AllCondition(AbstractLogicalCondition):
def update(self):
if len(self.conditions) == 0:
ret = False
else:
ret = True
for c in self.conditions:
ret = c.check()
if ret is None:
return None
if ret is False:
return False
return ret
class StateCondition(AbstractCondition):
# required methods
def initialize(self):
pc = self.parse_condition()
self.entity = pc['entity_id']
self.operator = pc['operator']
self.value = pc['value']
self.handle = False
def update(self):
entity_value = self.app.get_state(
self.entity['entity_id'],
namespace=self.entity['namespace'],
attribute=self.entity['attribute'])
return self.compare_value(
entity_value, self.operator, self.value)
def setup_listener(self):
self.handle = self.app.listen_state(
self.state_callback,
self.entity['entity_id'],
namespace=self.entity['namespace'],
attribute=self.entity['attribute'])
def destroy_listener(self):
self.app.cancel_listen_state(self.handle)
self.handle = None
# helping methods
def state_callback(self, entity, attribute, new, old, kwargs):
self.check()
def compare_value(self, state, operator, value):
try:
if operator == "==":
ret = state == value
elif operator == ">=":
ret = float(state) >= float(value)
elif operator == ">":
ret = float(state) > float(value)
elif operator == "<=":
ret = float(state) <= float(value)
elif operator == "<":
ret = float(state) < float(value)
elif operator == "!=":
ret = state != value
else:
return None
except ValueError:
return None
except TypeError:
return None
return ret
def parse_condition(self):
c = self.config
if isinstance(c, str):
c_dict = self.parse_condition_string(c)
elif isinstance(c, dict):
c_dict = c
else:
raise ValueError('condition must be str or dict')
if isinstance(c_dict['entity_id'], str):
c_dict['entity_id'] = self.parse_entity_id_string(
c_dict['entity_id'])
elif isinstance(c_dict['entity_id'], dict):
pass
else:
raise ValueError('entity_id must be str or dict')
self.validate_condition(c_dict)
return c_dict
def parse_condition_string(self, c_str):
r = {}
pieces = c_str.split(" ", 2)
if len(pieces) == 3:
r['entity_id'] = pieces[0]
r['operator'] = pieces[1]
r['value'] = pieces[2]
elif len(pieces) == 2:
r['entity_id'] = pieces[0]
r['operator'] = DEFAULT_OPERATOR
r['value'] = pieces[1]
else:
r['entity_id'] = pieces[0]
r['operator'] = DEFAULT_OPERATOR
r['value'] = DEFAULT_VALUE
return r
def parse_entity_id_string(self, e_str):
r = {
"namespace": self.app._namespace,
"entity_id": None,
"attribute": DEFAULT_ATTRIBUTE,
}
try:
index = e_str.index(SEP_NAMESPACE, 0)
r['namespace'] = e_str[0:index]
e_str = e_str[(index+len(SEP_NAMESPACE)):]
except ValueError:
pass
try:
index = e_str.index(SEP_ATTRIBUTE, 0)
r['attribute'] = e_str[(index+len(SEP_ATTRIBUTE)):]
e_str = e_str[0:index]
except ValueError:
pass
r['entity_id'] = e_str
return r
def validate_condition(self, c_dict):
if not isinstance(c_dict, dict):
raise ValueError('condition must be dict')
for key in ['entity_id', 'operator', 'value']:
if key not in c_dict:
raise ValueError('dict is missing key: {}'.format(key))
if len(c_dict) != 3:
raise ValueError(
'extra keys are present in dict')
e_dict = c_dict['entity_id']
if not isinstance(e_dict, dict):
raise ValueError('entity_id must be dict')
for key in ['entity_id', 'namespace', 'attribute']:
if key not in e_dict:
raise ValueError(
'entity_id dict is missing key: {}'.format(key))
if len(e_dict) != 3:
raise ValueError(
'extra keys are present in entity_id dict')
self.app._check_entity(e_dict['namespace'], e_dict['entity_id'])
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment