Created
December 27, 2012 22:11
-
-
Save anantn/4392522 to your computer and use it in GitHub Desktop.
Firebase: Implementing a worker queue pattern using firebase_queue_pop.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Firebase = require("./firebase-node.js"); | |
function Queue(ref) { | |
this._ref = ref; | |
} | |
Queue.prototype.pop = function(cb) { | |
this._ref.startAt().limit(1).once("child_added", this._pop.bind(this, cb)); | |
} | |
Queue.prototype._pop = function(cb, snapshot) { | |
var val = null; | |
var self = this; | |
// If the current head is empty just return. | |
if (!snapshot.val()) { | |
cb(null); | |
return; | |
} | |
// The value returned by the transaction callback will be the new value. | |
// If we return undefined, then the transaction will be cancelled. | |
snapshot.ref().transaction(function(data) { | |
// If the transaction succeded, val will still be null. Stash the value | |
// and then delete it. | |
if (!val) { | |
val = data; | |
return null; | |
} | |
// If the transaction failed (because some other process grabbed | |
// the head of the queue first), just cancel this transaction and try again. | |
// The next call to pop is asynchronous as recommended by node.js. | |
process.nextTick(function() { | |
self.pop(cb); | |
}); | |
return; | |
}, function(success, snapshot) { | |
// The transaction succeded, just return the stashed value to process. | |
if (success) { | |
// node.js recommends making all callbacks asynchronous. | |
// This prevents any blocking operations from holding up our queue and | |
// also removes the possibility of recursion exhausting the scope chain. | |
process.nextTick(function() { | |
cb(val); | |
}); | |
} | |
}); | |
} | |
Queue.prototype.push = function(val) { | |
return this._ref.push(val); | |
} | |
var jobs = 10; | |
var queue = new Queue(new Firebase("https://anant.firebaseio.com/queue/")); | |
function doNextJob() { | |
if (jobs > 0) { | |
// Wait for random time between 1 and 10 seconds for job to "finish". | |
var time = Math.floor(Math.random() * 11) + 1; | |
queue.pop(function(val) { | |
console.log("Processing job " + val + " for " + time + " seconds"); | |
setTimeout(doNextJob, time * 1000); | |
}); | |
} else { | |
// We've finished our 10 jobs. | |
console.log("Finished 10 jobs, exiting."); | |
process.exit(); | |
} | |
} | |
queue._ref.once("value", function(val) { | |
if (!val) { | |
for (var i = 1; i <= 100; i++) { | |
queue.push(i); | |
} | |
} | |
console.log("Press return to start processing jobs!"); | |
var stdin = process.openStdin(); | |
stdin.on("data", function() { | |
console.log("Fetching..."); | |
doNextJob(); | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment