Last active
September 2, 2019 20:12
-
-
Save dlashua/bfa9277cb49c295d2ab8e34b40049f6f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gm_dlashua_statewatcher as statewatcher | |
import appdaemon.plugins.hass.hassapi as hass | |
AllState = statewatcher.AllState | |
class App(hass.Hass): | |
def initalize(self): | |
self.register_dependency(statewatcher) | |
self.condition = AllState(self, selg.args['condition'], callback=self.statewatcher_cb) | |
def statewatcher_cb(self, kwargs): | |
state = self.condition.get_state() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import voluptuous as vol | |
def CoerceToDict(key): | |
def Coerce(x): | |
return {key: x} | |
return Coerce | |
def CoerceToNamedDict(key, value): | |
def Coerce(x): | |
if not isinstance(x, dict): | |
raise vol.error.CoerceInvalid('value is not a dict') | |
if len(x) != 1: | |
raise vol.error.CoerceInvalid( | |
'dict has more than 1 key') | |
for item in x: | |
ret = {key: item, value: x[item]} | |
return ret | |
return Coerce | |
class AllState(object): | |
__expose__ = [ | |
'_config', | |
'_state'] | |
@classmethod | |
def config(class_, app, *args, **kwargs): | |
def inner_validator(x): | |
return class_(app, x, *args, **kwargs) | |
return inner_validator | |
def __init__( | |
self, app, config, callback=None, callback_args={} | |
): | |
self._app = app | |
self._ready = False | |
self._app.log(f"CONFIG {config}", level="DEBUG") | |
state_value_schema = vol.Schema( | |
StateValue.config(self._app, callback=self.state_callback) | |
) | |
any_schema = vol.Schema(vol.All( | |
{ | |
vol.Required('any'): | |
AnyState.config(self._app, callback=self.state_callback) | |
}, | |
lambda x: x['any'] | |
)) | |
schema = vol.Schema([ | |
vol.Any(any_schema, state_value_schema) | |
]) | |
try: | |
self._config = schema(config) | |
except vol.MultipleInvalid as e: | |
raise vol.Invalid(f'{e} From AllState') | |
self._callback = callback | |
self._callback_args = callback_args | |
self._state = None | |
self._ready = True | |
self._app.log(f"Initialized {self._config}", level="DEBUG") | |
self.update_state() | |
def get_state(self): | |
return self._state | |
def update_state(self): | |
if not self._ready: | |
return | |
if len(self._config) > 0: | |
new_state = True | |
else: | |
self._app.log( | |
"There are no items to loop through, so state will be FALSE", | |
level="DEBUG") | |
new_state = False | |
self._app.log(f"LOOPING {self._config}", level="DEBUG") | |
for item in self._config: | |
if item.get_state() is False: | |
self._app.log( | |
f"ITEM is FALSE so STATE is FALSE {item}", level="DEBUG") | |
new_state = False | |
break | |
else: | |
self._app.log( | |
f"ITEM is TRUE so STATE is still UNDETERMINED {item}", | |
level="DEBUG") | |
self._app.log( | |
f"NEW STATE {new_state}, STATE {self._state}", level="DEBUG") | |
if self._state != new_state: | |
self._state = new_state | |
if self._callback is not None: | |
self._callback( | |
value=new_state, | |
**self._callback_args) | |
def state_callback( | |
self, | |
**kwargs | |
): | |
self._app.log("STATE CALLBACK {}".format( | |
kwargs | |
), level="DEBUG") | |
self.update_state() | |
class AnyState(object): | |
__expose__ = [ | |
'_config', | |
'_state'] | |
@classmethod | |
def config(class_, app, *args, **kwargs): | |
def inner_validator(x): | |
return class_(app, x, *args, **kwargs) | |
return inner_validator | |
def __init__( | |
self, app, config, callback=None, callback_args={} | |
): | |
self._app = app | |
self._ready = False | |
self._app.log(f"CONFIG {config}", level="DEBUG") | |
state_value_schema = vol.Schema( | |
StateValue.config(self._app, callback=self.state_callback) | |
) | |
all_schema = vol.Schema(vol.All( | |
{ | |
vol.Required('all'): | |
AllState.config(self._app, callback=self.state_callback) | |
}, | |
lambda x: x['all'] | |
)) | |
schema = vol.Schema([ | |
vol.Any(all_schema, state_value_schema) | |
]) | |
self._config = schema(config) | |
if callback is not None: | |
self.on(callback, callback_args) | |
self._state = None | |
self._ready = True | |
self._app.log(f"Initialized {self._config}", level="DEBUG") | |
self.update_state() | |
def on(self, cb, cb_args={}): | |
self._callback = cb | |
self._callback_args = cb_args | |
def get_state(self): | |
return self._state | |
def update_state(self): | |
if not self._ready: | |
self._app.log('NOT READY', level="DEBUG") | |
return | |
new_state = False | |
self._app.log(f"LOOPING {self._config}", level="DEBUG") | |
for item in self._config: | |
if item.get_state() is True: | |
self._app.log( | |
f"ITEM is TRUE so STATE is TRUE {item}", level="DEBUG") | |
new_state = True | |
break | |
else: | |
self._app.log( | |
f"ITEM is FALSE so STATE is still UNDETERMINED {item}", | |
level="DEBUG") | |
if not new_state: | |
self._app.log( | |
"NO ITEMS were TRUE so STATE is FALSE", level="DEBUG") | |
self._app.log( | |
f"NEW STATE {new_state} STATE {self._state}", level="DEBUG") | |
if self._state != new_state: | |
self._state = new_state | |
if self._callback is not None: | |
self._callback( | |
value=new_state, | |
**self._callback_args) | |
def state_callback( | |
self, | |
**kwargs | |
): | |
self._app.log("STATE CALLBACK {}".format( | |
kwargs | |
), level="DEBUG") | |
self.update_state() | |
class StateValue(object): | |
__expose__ = [ | |
'_config', | |
'_state'] | |
@classmethod | |
def config(class_, app, *args, **kwargs): | |
def inner_validator(x): | |
return class_(app, x, *args, **kwargs) | |
return inner_validator | |
def __init__( | |
self, app, config, callback=None, callback_args={} | |
): | |
self._app = app | |
state_schema = vol.Schema({ | |
vol.Required('entity_id'): str, | |
vol.Optional("operator", default="=="): str, | |
vol.Optional("value", default="on"): | |
vol.Any(str, int, float), | |
}) | |
schema = vol.Schema(vol.Any( | |
state_schema, | |
vol.All( | |
self.parse_value, | |
state_schema), | |
vol.All( | |
str, | |
CoerceToDict('entity_id'), | |
state_schema), | |
vol.All( | |
dict, | |
CoerceToNamedDict('entity_id', 'parse_value'), | |
self.parse_value, | |
state_schema), | |
)) | |
try: | |
self._config = schema(config) | |
except vol.MultipleInvalid as e: | |
raise vol.Invalid(f' {e} From StateValue') | |
if callback is not None: | |
self._callback = callback | |
self._callback_args = callback_args | |
self._state = None | |
self._app.listen_state( | |
self.state_callback, | |
entity=self._config['entity_id'], | |
attribute="state") | |
self.update_state() | |
def get_state(self): | |
return self._state | |
def parse_value(self, config): | |
self._app.log("PARSING {}".format(config), level="DEBUG") | |
if not isinstance(config, dict): | |
return config | |
if "parse_value" in config: | |
if type(config['parse_value']) is not str: | |
return config | |
if config.get('parse_value', None) is not None: | |
pieces = config.get('parse_value').split(" ", 1) | |
if len(pieces) == 1: | |
config["operator"] = "==" | |
config["value"] = pieces[0] | |
else: | |
config["operator"] = pieces[0] | |
config["value"] = pieces[1] | |
del config["parse_value"] | |
self._app.log("PARSED AS {}".format(config), level="DEBUG") | |
else: | |
self._app.log( | |
"parse_value NOT SEEN IN {}".format(config), level="DEBUG") | |
return config | |
def update_state(self): | |
operator = self._config['operator'] | |
entity = self._config['entity_id'] | |
value = self._config['value'] | |
try: | |
if operator == "==": | |
ret = self._app.get_state(entity) == value | |
elif operator == ">=": | |
ret = float(self._app.get_state(entity)) >= float(value) | |
elif operator == ">": | |
ret = float(self._app.get_state(entity)) > float(value) | |
elif operator == "<=": | |
ret = float(self._app.get_state(entity)) <= float(value) | |
elif operator == "<": | |
ret = float(self._app.get_state(entity)) < float(value) | |
elif operator == "!=": | |
ret = self._app.get_state(entity) != value | |
else: | |
self._app.log( | |
"UNKNOWN OPERATOR {} for ENTITY {} with VALUE {}".format( | |
operator, entity, value), level="ERROR") | |
return False | |
except ValueError as e: | |
self._app.log('ValueError: {}'.format(e), level="WARNING") | |
return False | |
self._app.log("TESTED ENTITY {}, OPERATOR {}, VALUE {} IS {}".format( | |
entity, | |
operator, | |
value, | |
ret | |
), level="DEBUG") | |
self._state = ret | |
def state_callback( | |
self, | |
entity=None, | |
attribute=None, | |
old=None, | |
new=None, | |
kwargs={} | |
): | |
self._app.log("STATE CALLBACK {} {}".format( | |
entity, | |
kwargs), level="DEBUG") | |
self.update_state() | |
if self._callback is not None: | |
data = { | |
"entity": entity, | |
"attribute": attribute, | |
"old": old, | |
"new": new, | |
"value": self._state | |
} | |
kwargs.update(data) | |
kwargs.update(self._callback_args) | |
self._callback(**kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment