Created
February 8, 2012 15:33
-
-
Save bergundy/1770500 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import itertools | |
import logging | |
from apscheduler.triggers.cron import CronTrigger | |
from tornado.ioloop import IOLoop | |
from datetime import datetime | |
class CronCallback(object): | |
"""Schedules the given callback to be called periodically. | |
The callback is called according to the schedule argument. | |
`start` must be called after the CronCallback is created. | |
If schedule is a string it should contain 7 cron fields: ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week'). | |
If schedule is a dict it must contain at least one of the fields above. | |
>>> cron1 = CronCallback(lambda: logging.error('x'), dict(seconds = 1)) # logs 'x' every second | |
>>> cron2 = CronCallback(lambda: IOLoop.instance().stop(), '*/5 * * * * * *') # stops ioloop every 5 seconds | |
>>> cron1.start() | |
>>> cron2.start() | |
>>> IOLoop.instance().start() | |
""" | |
_split_re = re.compile("\s+") | |
_sched_seq = ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week') | |
def __init__(self, callback, schedule, io_loop=None): | |
if isinstance(schedule, basestring): | |
splitted = self._split_re.split(schedule) | |
if len(splitted) < 7: | |
raise TypeError("'schedule' argument pattern mismatch") | |
schedule = dict(itertools.izip(self._sched_seq, splitted)) | |
self.callback = callback | |
self._trigger = CronTrigger(**schedule) | |
self.io_loop = io_loop or IOLoop.instance() | |
self._running = False | |
self._timeout = None | |
def start(self): | |
"""Starts the timer.""" | |
self._running = True | |
self._schedule_next() | |
def stop(self): | |
"""Stops the timer.""" | |
self._running = False | |
if self._timeout is not None: | |
self.io_loop.remove_timeout(self._timeout) | |
self._timeout = None | |
def _run(self): | |
if not self._running: return | |
try: | |
self.callback() | |
except Exception: | |
logging.error("Error in cron callback", exc_info=True) | |
self._schedule_next() | |
def _schedule_next(self): | |
if self._running: | |
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) | |
@property | |
def _next_timeout(self): | |
d = datetime.now() | |
return self._trigger.get_next_fire_time(d) - d | |
if __name__ == "__main__": | |
import doctest | |
doctest.testmod() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment