Created
November 28, 2013 10:55
-
-
Save oleiade/7690179 to your computer and use it in GitHub Desktop.
A ticker python implementation using a multiprocessing.Process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import multiprocessing | |
from datetime import datetime, timedelta | |
class Ticker(multiprocessing.Process): | |
""" | |
:param wake_interval: in seconds | |
:type wake_interval: float | |
:param default_interval: in seconds | |
:type default_interval: int | |
""" | |
def __init__(self, wake_interval=0.01, default_interval=100, *args, **kwargs): | |
self._manager = multiprocessing.Manager() | |
self._running = self._manager.Value(bool, False) | |
self._scheduled_ticks = multiprocessing.Queue() | |
self._descheduled_ticks = self._manager.list() | |
self.wake_interval = wake_interval | |
self.default_interval = default_interval | |
self.ticked = self._manager.list() | |
super(Ticker, self).__init__(*args, **kwargs) | |
def schedule(self, identifier, interval=None): | |
"""Schedules a tick at a provided interval | |
:param identifier: Unique identifier of the tick to | |
be set | |
:type identifier: str | |
:param interval: Interval for the tick to happen (seconds) | |
:type interval: int | |
""" | |
interval = interval or self.default_interval | |
heartbeat_at = datetime.now() + timedelta(seconds=interval) | |
schedule = (identifier, heartbeat_at, interval) | |
self._scheduled_ticks.put(schedule) | |
def deschedule(self, identifier): | |
"""Marks a tick event as descheduled | |
:param identifier: Unique identifier of the tick to | |
be set | |
:type identifier: str | |
""" | |
self._descheduled_ticks.append(identifier) | |
def run(self): | |
next_tick = None | |
self._running.value = True | |
while self._running.value is True: | |
now = datetime.now() | |
if next_tick is None: | |
# Pop the first tick event from the queue. And | |
# store it as the next_tick event. | |
if self._scheduled_ticks.empty() is False: | |
next_tick = self._scheduled_ticks.get_nowait() | |
else: | |
# If next tick is marked as descheduled, | |
# ignore it (set it to None) and pop it from | |
# the descheduled ticks list. | |
if next_tick[0] in self._descheduled_ticks: | |
self._descheduled_ticks.remove(next_tick[0]) | |
next_tick = None | |
if next_tick and now >= next_tick[1]: | |
# If tick event is not already present | |
# in ticked events list, then add it | |
if next_tick[0] not in self.ticked: | |
self.ticked.append(next_tick[0]) | |
# Reschedule the tick event at interval in the future, | |
# and set next_tick to None for the ticks queue to be popped | |
new_heartbeat_at = now + timedelta(seconds=next_tick[2]) | |
self._scheduled_ticks.put((next_tick[0], new_heartbeat_at, next_tick[2])) | |
next_tick = None | |
time.sleep(self.wake_interval) | |
def stop(self): | |
self._running.value = False | |
self.join() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment