Skip to content

Instantly share code, notes, and snippets.

@brolewis
Last active March 6, 2022 22:53

Revisions

  1. Lewis Franklin revised this gist Oct 14, 2013. 1 changed file with 10 additions and 3 deletions.
    13 changes: 10 additions & 3 deletions schedulers.py
    Original file line number Diff line number Diff line change
    @@ -102,8 +102,11 @@ def __init__(self, *args, **kwargs):
    self.client = kazoo.client.KazooClient(hosts=self.hosts)
    self.client.start()
    identifier = socket.gethostname()
    self.lock = self.client.Lock('celery_schedule/lock', identifier)
    self.lock = self.client.Lock('/celery_schedule/lock', identifier)
    self._has_lock = self.lock.acquire(blocking=False)
    if not self._has_lock:
    self.original_interval = self.max_interval
    self.max_interval = 20
    super(ZookeeperScheduler, self).__init__(*args, **kwargs)

    @property
    @@ -112,7 +115,11 @@ def lock_status(self):
    return OLD_LOCK
    else:
    self._has_lock = self.lock.acquire(blocking=False)
    return NEW_LOCK if self._has_lock else NO_LOCK
    if self._has_lock:
    self.max_interval = self.original_interval
    return NEW_LOCK
    else:
    return NO_LOCK

    def _catch_up(self):
    '''Cycle through missed scheduled events and run them to catch up due
    @@ -171,7 +178,7 @@ def tick(self):
    else:
    ## If we don't control the lock, sleep until the next interval and
    ## see if we get the lock.
    logger.info('Lock not held. Sleeping max interval.')
    logger.debug('Lock not held. Sleeping max interval.')
    return self.max_interval

    def setup_schedule(self):
  2. Lewis Franklin revised this gist Jun 27, 2013. 1 changed file with 16 additions and 8 deletions.
    24 changes: 16 additions & 8 deletions schedulers.py
    Original file line number Diff line number Diff line change
    @@ -18,17 +18,22 @@

    class ZookeeperNow(object):
    '''Custom class for getting the catch-up time.'''
    now_node = '/celery_schedule/now'

    def __init__(self, hosts):
    self.client = kazoo.client.KazooClient(hosts=hosts)
    self.client.start()
    self.hosts = hosts

    def now(self):
    '''Get the catch-up time or current time'''
    try:
    return cPickle.loads(self.client.get('/celery_schedule/now')[0])
    except kazoo.exceptions.KazooException:
    return celery.current_app._get_current_object().now()
    client = kazoo.client.KazooClient(hosts=self.hosts)
    client.start(timeout=1)
    if client.exists(self.now_node):
    ret = cPickle.loads(client.get(self.now_node)[0])
    else:
    ret = celery.current_app._get_current_object().now()
    client.stop()
    client.close()
    return ret


    class ZookeeperScheduleEntry(celery.beat.ScheduleEntry):
    @@ -42,7 +47,6 @@ def __init__(self, hosts, **kwargs):
    zk_now = ZookeeperNow(hosts)
    self.schedule.nowfun = zk_now.now
    self.client = kazoo.client.KazooClient(hosts=hosts)
    self.client.start()
    self.update_zookeeper()

    def __iter__(self):
    @@ -51,6 +55,7 @@ def __iter__(self):
    def delete(self):
    '''Delete a node from the schedule entries.'''
    node = '/celery_schedule/entries/%s' % self.name
    self.client.start()
    if self.client.exists(node):
    self.client.delete(node, recursive=True)
    self.client.stop()
    @@ -68,6 +73,7 @@ def update(self, other):
    def update_zookeeper(self):
    '''Add relevant values to Zookeeper'''
    node = '/celery_schedule/entries/%s' % self.name
    self.client.start()
    if self.client.exists(node):
    stored_values = cPickle.loads(self.client.get(node)[0])
    for key in self.editable_fields:
    @@ -81,6 +87,8 @@ def update_zookeeper(self):
    if key in self.editable_fields + self.volatile_fields:
    stored_values[key] = value
    self.client.create(node, cPickle.dumps(stored_values))
    self.client.stop()
    self.client.close()


    class ZookeeperScheduler(celery.beat.Scheduler):
    @@ -215,4 +223,4 @@ def close(self):
    try:
    self.client.close()
    except:
    pass
    pass
  3. Lewis Franklin created this gist Jun 17, 2013.
    218 changes: 218 additions & 0 deletions schedulers.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,218 @@
    '''Zookeeper-based Scheduler'''
    ## Standard Library
    import cPickle # Store dictionary in ZooKeeper
    import datetime # Time delta
    import socket # Hostname
    ## Third Party
    import celery # Current app
    import celery.beat # Scheduler
    import celery.utils.log # Get logger
    import kazoo.client # ZooKeeper Client Library
    import kazoo.exceptions # Base exceptions

    ## Module-level logger obtained through celery's logging helper.
    logger = celery.utils.log.get_logger(__name__)
    ## Values returned by ZookeeperScheduler.lock_status:
    ##   NO_LOCK  - the lock could not be acquired (falsy, so the status can
    ##              be tested directly in an ``if``).
    ##   OLD_LOCK - the lock was already held before this check.
    ##   NEW_LOCK - the lock was acquired during this check.
    NO_LOCK = 0
    OLD_LOCK = 1
    NEW_LOCK = 2


    class ZookeeperNow(object):
        '''Clock source that prefers the catch-up time stored in ZooKeeper.'''

        def __init__(self, hosts):
            '''Open a ZooKeeper connection to ``hosts`` and keep it on
            ``self.client``.'''
            self.client = zk = kazoo.client.KazooClient(hosts=hosts)
            zk.start()

        def now(self):
            '''Return the unpickled value of ``/celery_schedule/now``.

            When reading the node raises a ``KazooException``, the current
            celery app's ``now()`` is returned instead.
            '''
            try:
                payload = self.client.get('/celery_schedule/now')[0]
            except kazoo.exceptions.KazooException:
                ## Node could not be read: fall back to the app's own clock.
                current_app = celery.current_app._get_current_object()
                return current_app.now()
            return cPickle.loads(payload)


    class ZookeeperScheduleEntry(celery.beat.ScheduleEntry):
        '''Schedule entry mirrored to ``/celery_schedule/entries/<name>``.

        NOTE(review): node contents are read with ``cPickle.loads``.
        Unpickling can execute arbitrary code, so this is only safe while
        everything able to write to the ZooKeeper ensemble is trusted.
        '''
        ## Fields always overwritten in ZooKeeper with this entry's values.
        editable_fields = ('task', 'args', 'kwargs', 'options')
        ## Fields written to an existing node only when they are truthy.
        volatile_fields = ('last_run_at', 'total_run_count')

        def __init__(self, hosts, **kwargs):
            '''Build the entry, route its schedule's clock through
            ZooKeeper and store the entry in ZooKeeper.

            hosts  -- ZooKeeper connection string handed to kazoo.
            kwargs -- forwarded unchanged to the base entry initialiser.
            '''
            super(ZookeeperScheduleEntry, self).__init__(**kwargs)
            self.hosts = hosts
            ## The schedule asks ZookeeperNow for the time, so a catch-up
            ## time stored in ZooKeeper takes precedence over the wall clock.
            zk_now = ZookeeperNow(hosts)
            self.schedule.nowfun = zk_now.now
            self.client = kazoo.client.KazooClient(hosts=hosts)
            self.client.start()
            self.update_zookeeper()

        def __iter__(self):
            '''Iterate over ``(attribute, value)`` pairs of the instance,
            leaving out the ``client`` connection object.'''
            return ((key, value) for key, value in vars(self).items()
                    if key != 'client')

        def delete(self):
            '''Delete this entry's node (if present) and shut the client
            down.'''
            node = '/celery_schedule/entries/%s' % self.name
            try:
                if self.client.exists(node):
                    self.client.delete(node, recursive=True)
            finally:
                ## Always release the connection, even when the delete fails.
                self.client.stop()
                self.client.close()

        def update(self, other):
            '''Copy values from ``other`` onto this entry, then store them.

            other -- either a dict merged straight into the instance
                     attributes, or another entry whose task, options, args,
                     kwargs and schedule are copied.
            '''
            if isinstance(other, dict):
                self.__dict__.update(other)
            else:
                self.__dict__.update({'task': other.task,
                                      'options': other.options,
                                      'args': other.args,
                                      'kwargs': other.kwargs,
                                      'schedule': other.schedule})
            ## NOTE(review): assumed to run for both branches above --
            ## confirm against the original file's indentation.
            self.update_zookeeper()

        def update_zookeeper(self):
            '''Write this entry's editable and volatile fields to its
            ZooKeeper node, creating the node when it does not exist.'''
            node = '/celery_schedule/entries/%s' % self.name
            if self.client.exists(node):
                stored_values = cPickle.loads(self.client.get(node)[0])
                for key in self.editable_fields:
                    stored_values[key] = getattr(self, key)
                ## Volatile fields only replace the stored value when set, so
                ## an empty local value does not wipe the stored one.
                for key in self.volatile_fields:
                    value = getattr(self, key)
                    if value:
                        stored_values[key] = value
                self.client.set(node, cPickle.dumps(stored_values))
            else:
                persisted = self.editable_fields + self.volatile_fields
                stored_values = {key: value
                                 for key, value in vars(self).items()
                                 if key in persisted}
                ## makepath creates missing parent nodes instead of failing
                ## when the entries path has not been created yet.
                self.client.create(node, cPickle.dumps(stored_values),
                                   makepath=True)


    class ZookeeperScheduler(celery.beat.Scheduler):
        '''Beat scheduler coordinated through ZooKeeper.

        Tasks are only sent while this process holds the ZooKeeper lock at
        ``/celery_schedule/lock``; otherwise ``tick`` just retries the lock.
        Entries are refreshed from ``/celery_schedule/entries`` every time
        ``schedule`` is read.
        '''
        Entry = ZookeeperScheduleEntry
        ## Node holding the pickled catch-up time while ``_catch_up`` runs.
        now_node = '/celery_schedule/now'

        def __init__(self, *args, **kwargs):
            '''Connect to ZooKeeper and try to take the lock, then run the
            base initialiser.

            The connection is only made when ``lazy`` is false or absent.
            The hosts come from ``kwargs['app'].conf.ZOOKEEPER_HOSTS``.
            '''
            if not kwargs.get('lazy', False):
                self.hosts = kwargs['app'].conf.ZOOKEEPER_HOSTS
                self.client = kazoo.client.KazooClient(hosts=self.hosts)
                self.client.start()
                identifier = socket.gethostname()
                ## ZooKeeper paths are absolute, hence the leading slash.
                self.lock = self.client.Lock('/celery_schedule/lock',
                                             identifier)
                self._has_lock = self.lock.acquire(blocking=False)
            super(ZookeeperScheduler, self).__init__(*args, **kwargs)

        @property
        def lock_status(self):
            '''Return OLD_LOCK if the lock was already held, otherwise try a
            non-blocking acquire and return NEW_LOCK or NO_LOCK.'''
            if self._has_lock:
                return OLD_LOCK
            self._has_lock = self.lock.acquire(blocking=False)
            return NEW_LOCK if self._has_lock else NO_LOCK

        def _current_time(self):
            '''Return ``now()`` of the current celery app.'''
            return celery.current_app._get_current_object().now()

        def _catch_up(self):
            '''Cycle through missed scheduled events and run them to catch
            up after becoming the new lock holder.

            While catching up, the simulated time is stored pickled in
            ``now_node``. The node is removed again when catching up ends,
            whether it completed or raised.
            '''
            schedule = self.schedule
            ## Beat intervals are seconds; a bare positional timedelta
            ## argument would be interpreted as days.
            earliest = self._current_time() - datetime.timedelta(
                seconds=self.max_interval)
            ## Find the last run, going no further back than the max
            ## interval. This catches us up without reaching so far back that
            ## general down time is replayed along with a dead leader.
            last_runs = [entry.last_run_at for entry in schedule.itervalues()]
            last_run = max(last_runs + [earliest])
            payload = cPickle.dumps(last_run)
            if self.client.exists(self.now_node):
                ## Left over from a catch-up that never cleaned up; reuse it.
                self.client.set(self.now_node, payload)
            else:
                self.client.create(self.now_node, payload)
            try:
                intervals = []
                for entry in schedule.itervalues():
                    interval = entry.is_due()[1]
                    if interval:
                        intervals.append(interval)
                interval = min(intervals + [self.max_interval])
                ## If the latest run plus the interval does not reach the
                ## current time, step the stored time forward, sending due
                ## tasks at every step, until it does.
                run_time = last_run + datetime.timedelta(seconds=interval)
                while run_time < self._current_time():
                    self.client.set(self.now_node, cPickle.dumps(run_time))
                    intervals = []
                    try:
                        for entry in schedule.itervalues():
                            interval = self.maybe_due(entry, self.publisher)
                            if interval:
                                intervals.append(interval)
                    except RuntimeError:
                        ## presumably the schedule changed while iterating;
                        ## carry on with what was collected -- TODO confirm
                        pass
                    interval = min(intervals + [self.max_interval])
                    run_time += datetime.timedelta(seconds=interval)
            finally:
                ## Never leave the catch-up time behind: while the node
                ## exists, ZookeeperNow.now() keeps returning it.
                self.client.delete(self.now_node)

        def tick(self):
            '''Run one scheduler iteration and return the delay until the
            next one.

            Without the lock nothing is sent and ``max_interval`` is
            returned. With a freshly acquired lock, missed runs are caught
            up first.
            '''
            lock_status = self.lock_status
            if not lock_status:
                ## If we don't control the lock, sleep until the next
                ## interval and see if we get the lock.
                logger.info('Lock not held. Sleeping max interval.')
                return self.max_interval
            if lock_status == NEW_LOCK:
                ## Since the system could have been down, we need to play
                ## catch up since the last run.
                self._catch_up()
            remaining_times = []
            try:
                for entry in self.schedule.itervalues():
                    next_time_to_run = self.maybe_due(entry, self.publisher)
                    if next_time_to_run:
                        remaining_times.append(next_time_to_run)
            except RuntimeError:
                pass
            return min(remaining_times + [self.max_interval])

        def setup_schedule(self):
            '''Setup the schedule and store in Zookeeper.'''
            self.update_schedule(self.app.conf.CELERYBEAT_SCHEDULE)
            self.install_default_entries(self.schedule)

        def update_schedule(self, b):
            '''Update the schedule, adding new and removing old entries.

            b -- mapping of entry name to the keyword arguments of the entry.
            '''
            schedule = self.schedule

            ## Remove entries that are no longer part of the new schedule.
            for key in set(schedule) - set(b):
                entry = schedule.pop(key, None)
                if entry:
                    entry.delete()

            ## Update and add new items in the schedule.
            for key in b:
                entry = self.Entry(hosts=self.hosts, **dict(b[key], name=key))
                if schedule.get(key):
                    schedule[key].update(entry)
                else:
                    schedule[key] = entry

        def _maybe_entry(self, name, entry):
            '''Return ``entry`` unchanged when it already is an ``Entry``,
            otherwise build one from the mapping ``entry`` named ``name``.'''
            if isinstance(entry, self.Entry):
                return entry
            return self.Entry(hosts=self.hosts, **dict(entry, name=name))

        @property
        def schedule(self):
            '''Get the schedule entries, updating from Zookeeper.'''
            entries_node = '/celery_schedule/entries'
            self.client.ensure_path(entries_node)
            for entry in self.data:
                entry_node = '{}/{}'.format(entries_node, entry)
                ## NOTE(review): unpickling trusts whoever can write to
                ## ZooKeeper.
                data = cPickle.loads(self.client.get(entry_node)[0])
                self.data[entry].update(data)
            return self.data

        def close(self):
            '''Release the lock and shut the ZooKeeper client down.

            Stopping and closing the client are best effort: their failures
            are logged, not raised, and they run even when releasing the lock
            raises.
            '''
            try:
                self.lock.release()
            finally:
                for shutdown in (self.client.stop, self.client.close):
                    try:
                        shutdown()
                    except Exception:
                        logger.debug('Ignoring error while closing the '
                                     'ZooKeeper client.', exc_info=True)