Created
November 14, 2012 01:05
-
-
Save bergundy/4069540 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import motor | |
import toro | |
import time | |
import logging | |
from tornado import ioloop, gen | |
def sleep(seconds, io_loop=None):
    """Return a ``gen.Task`` that completes once ``seconds`` have elapsed.

    Intended to be yielded from a ``gen.engine`` generator to pause it
    without blocking the loop.

    :param seconds: delay, added to the current ``time.time()`` to form
        the timeout deadline.
    :param io_loop: loop on which to schedule the timeout; falls back to
        ``ioloop.IOLoop.instance()`` when omitted (or falsy).
    """
    loop = io_loop or ioloop.IOLoop.instance()
    deadline = time.time() + seconds
    # gen.Task supplies the ``callback`` argument to add_timeout.
    return gen.Task(loop.add_timeout, deadline)
class AsyncWriterMixin(object):
    """Mixin that executes queued asynchronous operations one at a time.

    Operations are added with :meth:`enqueue` and consumed by a background
    ``gen.engine`` loop, which runs each one through ``motor.Op``.  When an
    operation raises, the writer:

    1. logs the failure,
    2. marks the queue as suspended (further ``enqueue`` calls are dropped),
    3. replaces the queue with a fresh one, discarding anything pending,
    4. waits ``sync_delay`` seconds,
    5. lifts the suspension and enqueues ``self.sync``.

    The mixin never defines ``sync`` itself; the host class is expected to
    provide it as something ``motor.Op`` can invoke.
    """

    def initialize_writer(self, sync_delay=5, io_loop=None):
        """Set up the queue and start the background writer loop.

        :param sync_delay: seconds to stay suspended after a failed
            operation before ``self.sync`` is enqueued.
        :param io_loop: loop used for the suspension timeout; defaults to
            ``ioloop.IOLoop.instance()``.
        """
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.sync_delay = sync_delay
        self._queue_suspended = False
        self._reset_queue()
        self._start_writer()

    def enqueue(self, *args, **kwargs):
        """Queue ``motor.Op(*args, **kwargs)`` for execution by the writer.

        The request is silently dropped while the queue is suspended.
        """
        if not self.queue_suspended:
            self._queue.put((args, kwargs))

    def _reset_queue(self):
        """Replace the queue with a new, empty ``toro.Queue``."""
        self._queue = toro.Queue()

    @property
    def queue_suspended(self):
        """``True`` while the writer is backing off after a failure."""
        return self._queue_suspended

    @gen.engine
    def _start_writer(self):
        """Consume the queue forever, running one operation at a time."""
        while True:
            # Re-read self._queue on every iteration: it is swapped for a
            # new object whenever the writer recovers from a failure.
            args, kwargs = yield gen.Task(self._queue.get)
            try:
                yield motor.Op(*args, **kwargs)
            except Exception:
                # Deliberately not a bare ``except``: GeneratorExit,
                # KeyboardInterrupt and SystemExit must propagate.  Catching
                # GeneratorExit here would make the generator yield again
                # while it is being closed, which is a RuntimeError.
                logging.exception('Exception caught, out of sync while trying to run (%r, %r)', args, kwargs)
                self._queue_suspended = True
                self._reset_queue()
                yield sleep(self.sync_delay, io_loop=self.io_loop)
                self._queue_suspended = False
                self.enqueue(self.sync)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mock | |
from tornado.testing import AsyncTestCase | |
from async_writer_mixin import AsyncWriterMixin | |
class MongoQueue(AsyncWriterMixin):
    """Minimal ``AsyncWriterMixin`` host used by the tests below.

    ``sync`` is a ``mock.Mock`` so tests can observe when the writer
    enqueues it after a failure.
    """

    def __init__(self, *writer_args, **writer_kwargs):
        # All constructor arguments are forwarded to the mixin initializer.
        self.initialize_writer(*writer_args, **writer_kwargs)
        # Stand-in for the hook the writer enqueues once it resumes.
        self.sync = mock.Mock()
class AsyncWriterMixinTest(AsyncTestCase):
    """Tests for ``AsyncWriterMixin`` driven through ``MongoQueue``."""

    # Short back-off so the suspension test finishes quickly.
    SHORT_DELAY = 0.001

    def _new_queue(self, **options):
        """Build a ``MongoQueue`` bound to this test's IOLoop."""
        return MongoQueue(io_loop=self.io_loop, **options)

    def return_result(self, callback):
        """Operation that succeeds with ``'ok'`` and then stops the loop."""
        callback('ok', None)
        self.stop()

    def raise_exception(self, callback):
        """Operation that reports an error and then stops the loop."""
        callback(None, Exception('xxx'))
        self.stop()

    def test_initialize_writer(self):
        queue = self._new_queue(sync_delay=self.SHORT_DELAY)

        self.assertEqual(queue.io_loop, self.io_loop)
        self.assertEqual(queue.sync_delay, self.SHORT_DELAY)
        self.assertFalse(queue.queue_suspended)

    def test_initialize_writer_defaults(self):
        queue = self._new_queue()

        self.assertEqual(queue.io_loop, self.io_loop)
        self.assertEqual(queue.sync_delay, 5)
        self.assertFalse(queue.queue_suspended)

    def test_enqueue(self):
        queue = self._new_queue(sync_delay=self.SHORT_DELAY)

        queue.enqueue(self.return_result)
        self.wait()  # wait for task to be called

        self.assertFalse(queue.queue_suspended)

    def test_queue_suspended(self):
        queue = self._new_queue(sync_delay=self.SHORT_DELAY)
        dropped_task = mock.Mock(side_effect=self.return_result)

        queue.enqueue(self.raise_exception)
        queue.enqueue(dropped_task)  # discarded by the queue reset
        self.wait()  # wait for task to be called
        self.assertTrue(queue.queue_suspended)

        queue.enqueue(dropped_task)  # discarded because the queue is suspended
        queue.sync.side_effect = self.return_result
        self.wait()  # wait for sync to be called

        self.assertEqual(len(dropped_task.mock_calls), 0)
        self.assertEqual(len(queue.sync.mock_calls), 1)
        self.assertFalse(queue.queue_suspended)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment