Created
May 18, 2020 11:13
-
-
Save P403n1x87/d5d74c7a2482470e659f46fe455759b7 to your computer and use it in GitHub Desktop.
Wake Control Signal. A timer with extensible timeout that performs actions at the rising and falling edges. The timeout is extended whenever the wake method is called while the signal is hot.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread, Event | |
class WakeSignal(Thread):
    """Timer thread with an extensible timeout and edge callbacks.

    The signal starts out low. The first call to :meth:`wake` raises it and
    invokes the rising-edge callback. The signal then stays high for
    ``interval`` seconds; every further :meth:`wake` received while it is high
    restarts that timeout. When a full interval elapses without a wake, the
    falling-edge callback is invoked and the signal goes low again.

    Callbacks run on this thread, not on the caller's thread.

    Args:
        interval: Timeout, in seconds, passed to ``Event.wait`` while the
            signal is high.
        rising: Optional zero-argument callable invoked on the rising edge.
            When omitted (or falsy), :meth:`on_rise` is used.
        falling: Optional zero-argument callable invoked on the falling edge.
            When omitted (or falsy), :meth:`on_fall` is used.
    """

    def __init__(self, interval, rising=None, falling=None):
        super().__init__()
        self._interval = interval
        self._stopped = False  # set by stop(); checked after every wake-up
        self._awake = False  # True while the signal is high
        self._wake = Event()

        self._on_rise = rising or self.on_rise
        # The default hook is ``on_fall``; the previous fallback referenced a
        # non-existent ``on_falling`` attribute and raised AttributeError
        # whenever no ``falling`` callback was supplied.
        self._on_fall = falling or self.on_fall

    def on_rise(self):
        """Default rising-edge hook; does nothing. Override in subclasses."""
        pass

    def on_fall(self):
        """Default falling-edge hook; does nothing. Override in subclasses."""
        pass

    def stop(self):
        """Ask the thread to terminate.

        The flag is set before waking the thread so that ``run`` observes it
        as soon as it resumes. If the signal is high at that moment, ``run``
        returns without invoking the falling-edge callback.
        """
        self._stopped = True
        self.wake()

    def wake(self):
        """Raise the signal, or extend its timeout if it is already high."""
        self._wake.set()

    def run(self):
        """Thread body: wait for wakes and fire the edge callbacks."""
        while True:
            # Block until someone calls wake() (or stop(), which also wakes).
            self._wake.wait()
            if self._stopped:
                return

            # Rising edge: only fire when transitioning from low to high.
            if not self._awake:
                self._on_rise()
                self._awake = True

            # Re-arm the event, then wait out the interval. A wake() during
            # this wait returns True, so the loop restarts and the timeout is
            # extended without firing the rising callback again.
            self._wake.clear()
            if not self._wake.wait(self._interval):
                # Timed out with no new wake: falling edge.
                self._on_fall()
                self._awake = False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment