Celery task monitor, logs task state to MongoDB
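The monitor only sees what the Celery cluster publishes, so task events have to be enabled on both clients and workers. A minimal sketch of what that configuration might look like, assuming Celery 4.x-style lowercase setting names and a hypothetical Redis broker URL (the gist's own app comes from jether_cloud.task_queue instead):

from celery import Celery

# Hypothetical app; the gist uses jether_cloud.task_queue.app instead.
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.task_send_sent_event = True      # clients publish task-sent events
app.conf.worker_send_task_events = True   # workers publish task lifecycle events

# Equivalently, start the workers with the -E flag:
#   celery -A tasks worker -E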
import pickle
import threading
import time
from Queue import Queue

import arrow
import pandas as pd
from bson import InvalidDocument
from celery.result import AsyncResult
from celery.utils.log import get_task_logger

from jether_cloud import task_queue
from JERDB import JERDB

logger = get_task_logger(__name__)
# Celery task monitor, stores all received task states to a capped MongoDB collection
class MonitorThread(object):
    def __init__(self, celery_app, interval=1):
        self.celery_app = celery_app
        self.interval = interval
        # Increase max_tasks_in_memory to handle long-running tasks (allows correlating events by task id).
        # If a task's state has been evicted from memory, new events for the same task id will only
        # include the id and event-specific info.
        self.state = self.celery_app.events.State(max_tasks_in_memory=50000)
        self.queue = Queue()
        # Insert log events in a background thread, to avoid blocking the event receiver
        self.thread = threading.Thread(target=self.insert_records, args=())
        self.thread.daemon = True
        self.thread.start()
        self.db = JERDB.db('JER_DB')

    def catchall(self, event):
        if event['type'] != 'worker-heartbeat':
            if 'uuid' in event:
                self.state.event(event)
                update_event = self.state.tasks[event['uuid']].as_dict()
                # Results truncated in the event (they contain '...') are re-fetched
                # from the result backend once the task is ready
                if '...' in str(update_event.get('result', '')) and AsyncResult(event['uuid']).ready():
                    update_event['result'] = AsyncResult(event['uuid']).get(10, propagate=False)
                elif update_event.get('exception', '') and AsyncResult(event['uuid']).ready():
                    try:
                        update_event['exception_binary'] = pickle.dumps(AsyncResult(event['uuid']).get(1, propagate=False))
                    except Exception:
                        # Timeout while getting exception_binary
                        import traceback
                        traceback.print_exc()
                        logger.exception("Timeout while getting exception_binary")
                update_event['worker'] = update_event['worker'].hostname
                # Drop empty fields, but always keep pandas objects (their truth value is ambiguous)
                update_event = {key: val for key, val in update_event.iteritems()
                                if isinstance(val, pd.core.base.PandasObject) or val}
                for time_field in ['received', 'timestamp', 'started', 'succeeded']:
                    if update_event.get(time_field):
                        update_event[time_field] = arrow.get(update_event[time_field]).datetime
                self.queue.put(update_event)

    def insert_records(self):
        while True:
            try:
                try:
                    # Block until at least one record is available, then drain the queue into one bulk insert
                    bulk = [self.queue.get()]
                    while not self.queue.empty():
                        bulk.append(self.queue.get_nowait())
                    logger.info('inserting %d records' % len(bulk))
                    self.db.get_collection('Celery_log').insert_many(bulk, ordered=False)
                # In case we got an invalid document, try to insert documents individually and replace the result with its repr
                except InvalidDocument:
                    for doc in bulk:
                        doc.pop("_id", None)
                        try:
                            self.db.get_collection('Celery_log').insert_one(doc)
                        except Exception:
                            try:
                                doc['result'] = repr(doc['result'])
                                self.db.get_collection('Celery_log').insert_one(doc)
                            except Exception:
                                logger.exception("Insert to mongo failed")
            except Exception:
                logger.exception("Insert to mongo failed")
                # Re-queue the batch so it is retried on the next iteration
                for doc in bulk:
                    self.queue.put_nowait(doc)
            finally:
                time.sleep(60)

    def run(self):
        while True:
            try:
                with self.celery_app.connection() as connection:
                    recv = self.celery_app.events.Receiver(connection, handlers={
                        '*': self.catchall
                    })
                    recv.capture(limit=None, timeout=None, wakeup=True)
            except (KeyboardInterrupt, SystemExit):
                try:
                    import _thread as thread
                except ImportError:
                    import thread
                thread.interrupt_main()
            except Exception as e:
                import traceback
                traceback.print_exc()
                logger.error("Failed to capture events: '%s', "
                             "trying again in %s seconds.",
                             e, self.interval)
                logger.debug(e, exc_info=True)
                time.sleep(self.interval)

def get_celery_app():
    return task_queue.app


if __name__ == '__main__':
    import logging
    FORMAT = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
    app = get_celery_app()
    MonitorThread(app).run()
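The class comment mentions a capped MongoDB collection, but creating it is left to the surrounding project (the JERDB wrapper). A minimal sketch of preparing the Celery_log collection with plain pymongo; the connection URI, database name, and size limit are assumptions, not part of the gist:

from pymongo import MongoClient
from pymongo.errors import CollectionInvalid

client = MongoClient('mongodb://localhost:27017/')  # hypothetical connection URI
db = client['JER_DB']
try:
    # A capped collection keeps insertion order and silently discards the oldest
    # documents once the size limit is reached, which suits a rolling task-event log.
    db.create_collection('Celery_log', capped=True, size=512 * 1024 * 1024)
except CollectionInvalid:
    pass  # the collection already exists

Once the collection exists and the monitor is running, task history can be queried directly, for example by filtering on the stored state field (e.g. db.Celery_log.find({'state': 'FAILURE'})) to list recently failed tasks.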