Created
October 25, 2017 17:54
-
-
Save mzalazar/e5250c736e39b41616f19a8fb2b63abf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Writable = require('stream').Writable; | |
var util = require('util'); | |
var redis = require('./redis'); | |
var async = require('async'); | |
module.exports = RedisWriteStream; | |
/**
 * Writable stream, always in object mode, whose `_write` spools each incoming
 * document into Redis.
 *
 * May be called with or without `new`.
 *
 * @param {*} client_id - Accepted for interface compatibility; not read by
 *   the constructor (`_write` takes the client id from each document).
 * @param {*} broadcast_id - Accepted for interface compatibility; not read by
 *   the constructor (`_write` takes the broadcast id from each document).
 * @param {Object} [options] - Options forwarded to `stream.Writable`.
 *   `objectMode` is always forced to `true`. The object is not modified.
 * @returns {RedisWriteStream}
 */
function RedisWriteStream(client_id, broadcast_id, options) {
  // Forward every argument when invoked without `new`; forwarding only
  // `options` would place it in the `client_id` slot and lose the stream options.
  if (!(this instanceof RedisWriteStream)) {
    return new RedisWriteStream(client_id, broadcast_id, options);
  }

  // Build a private options object so the caller's object is never mutated.
  var streamOptions = {};
  if (options) {
    Object.keys(options).forEach(function (key) {
      streamOptions[key] = options[key];
    });
  }
  streamOptions.objectMode = true;

  Writable.call(this, streamOptions);
}
util.inherits(RedisWriteStream, Writable);
/**
 * Writable `_write` implementation: stores one document in two Redis sets
 * whose keys are namespaced by the document's own client and broadcast ids.
 *
 *   client_id:<id>:broadcast_id:<id>:SPOOLED_TOTAL  <- doc.id_subscriber
 *   client_id:<id>:broadcast_id:<id>:SPOOLED        <- JSON of the remaining fields
 *
 * @param {Object} doc - Document to spool; read for `client_id`,
 *   `broadcast_id` and `id_subscriber`.
 * @param {string} encoding - Unused.
 * @param {function(Error=)} callback - Invoked once both writes have been
 *   attempted, or with the error of the first write if that one fails.
 */
RedisWriteStream.prototype._write = function write(doc, encoding, callback) {
  // JSON round-trip gives a private copy, so the deletes below never touch
  // the object handed in by the upstream stream.
  var record = JSON.parse(JSON.stringify(doc));
  var keyPrefix = 'client_id:' + record.client_id + ':broadcast_id:' + record.broadcast_id + ':';

  // Second step: store the document itself, minus the two ids that are
  // already encoded in the key.
  function storeRecord() {
    delete record.client_id;
    delete record.broadcast_id;
    redis.sadd(keyPrefix + 'SPOOLED', JSON.stringify(record), function (storeErr) {
      return callback(storeErr);
    });
  }

  // First step: register the subscriber id in the SPOOLED_TOTAL set.
  redis.sadd(keyPrefix + 'SPOOLED_TOTAL', record.id_subscriber, function (totalErr) {
    if (totalErr) return callback(totalErr);
    storeRecord();
  });
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create the write stream (rows piped into it are spooled to Redis).
var RedisWriteStream = require('./redis_write_stream');
var rdb = new RedisWriteStream();
// Passed to knex's stream() as the read stream's highWaterMark.
var highWaterMark = 10;

// Create the read stream: every subscriber of the list, with the configured
// fields plus constant broadcast_id / client_id columns on each row.
// The constants are passed as bindings, matching the `list_id = ?` clause,
// instead of being concatenated into the raw SQL text.
var stream = knex
  .select(knex.raw('? AS broadcast_id', [broadcast_id]))
  .select(knex.raw('? AS client_id', [client_id]))
  .select(field_list)
  .from('subscribers')
  .where(knex.raw('list_id = ?', [list_id]))
  .stream({highWaterMark: highWaterMark});

// Connect both streams.
stream.pipe(rdb);

// The writer has flushed everything it received.
rdb.on('finish', function () {
  console.log('EVENT: "finish" was triggered in stream writer.'.yellow);
});

// Report the actual error instead of discarding it.
rdb.on('error', function (err) {
  console.log('EVENT: "error" was triggered in stream writer.'.yellow);
  console.error(err);
});

// pipe() does not forward read-side errors to the destination, so the
// reader needs its own handler.
stream.on('error', function (err) {
  console.log('EVENT: "error" was triggered in stream reader.'.yellow);
  console.error(err);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment