Last active
April 20, 2016 15:22
-
-
Save mieszko4/5595b5c18a7ca70a62c4cb2d35aa87e0 to your computer and use it in GitHub Desktop.
JsMeetup - Streaming in JavaScript - Stream approach
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const util = require('util'); | |
const Transform = require('stream').Transform; | |
const intercomDrinks = require('intercom-drinks').default; | |
/**
 * Transform stream that serialises customer objects into text lines.
 *
 * The writable side runs in object mode (it receives customer objects);
 * the readable side is left in the default mode and emits one
 * "<user_id>:<name>\n" chunk per customer.
 *
 * May be invoked with or without `new`.
 *
 * @constructor
 */
function Stream() {
  const calledAsConstructor = this instanceof Stream;
  if (!calledAsConstructor) {
    // Plain function call: hand back a properly constructed instance.
    return new Stream();
  }

  const streamOptions = {
    writableObjectMode: true
  };
  Transform.call(this, streamOptions);
}

util.inherits(Stream, Transform);

/**
 * Formats a single customer as a line of text and forwards it downstream.
 *
 * @param {Object} customer - object providing `user_id` and `name`.
 * @param {string} encoding - unused; chunks arrive as objects.
 * @param {Function} done - invoked once the customer has been handled.
 */
Stream.prototype._transform = function (customer, encoding, done) {
  // 7. pick name and id, 8. join LF
  const { user_id: id, name } = customer;
  const line = `${id}:${name}\n`;

  this.push(line);
  done();
};
module.exports = Stream; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const util = require('util'); | |
const Transform = require('stream').Transform; | |
const intercomDrinks = require('intercom-drinks').default; | |
/**
 * Object-mode transform stream that lets through only the customers that
 * `intercomDrinks.isNearbyCustomer` accepts for the configured position
 * and radius.
 *
 * May be invoked with or without `new`; in both cases the three arguments
 * are stored on the resulting instance.
 *
 * @constructor
 * @param {number} latitude - latitude of the reference point.
 * @param {number} longitude - longitude of the reference point.
 * @param {number} radius - search radius, passed to `isNearbyCustomer` as-is.
 */
function Stream(latitude, longitude, radius) {
  if (!(this instanceof Stream)) {
    // Forward the arguments: constructing without them would leave the
    // filter parameters undefined on the returned instance.
    return new Stream(latitude, longitude, radius);
  }

  Transform.call(this, {
    readableObjectMode: true,
    writableObjectMode: true
  });

  this._latitude = latitude;
  this._longitude = longitude;
  this._radius = radius;
}

util.inherits(Stream, Transform);

/**
 * Forwards the customer downstream when it is considered nearby, and drops
 * it otherwise.
 *
 * @param {Object} customer - customer record handed to `isNearbyCustomer`.
 * @param {string} encoding - unused; chunks arrive as objects.
 * @param {Function} done - invoked once the customer has been handled.
 */
Stream.prototype._transform = function (customer, encoding, done) {
  // 5. calculate distance and filter
  if (intercomDrinks.isNearbyCustomer(this._latitude, this._longitude, this._radius, customer)) {
    this.push(customer);
  }
  done();
};
module.exports = Stream; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const SortedArray = require('sorted-array'); | |
const ThrottleStream = require('m-throttle-stream'); | |
const JsonStream = require('m-json-stream'); | |
const intercomDrinks = require('intercom-drinks').default; | |
const request = require('request'); | |
const fs = require('fs'); | |
const ValidateStream = require('./validate.stream.js'); | |
const FilterStream = require('./filter.stream.js'); | |
const CustomerToStringStream = require('./customer-to-string.stream.js'); | |
// ---------------------------------------------------------------------------
// Parameters
// ---------------------------------------------------------------------------
const latitude = 53.3381985;
const longitude = -6.2592576;
const radius = 100000; // in meters

const onlineSourceUrl = 'https://gist.githubusercontent.com/brianw/19896c50afa89ad4dec3/raw/6c11047887a03483c50017c1d451667fd62a53ca/gistfile1.txt';
const offlineSourceFilename = 'input.jsonl';

// ---------------------------------------------------------------------------
// Source
// ---------------------------------------------------------------------------
// 1. read - the customer list is fetched over HTTP.
const onlineSource = request(onlineSourceUrl);
// Offline alternative (disabled):
// const offlineSource = fs.createReadStream(offlineSourceFilename);

// ---------------------------------------------------------------------------
// Pipeline: raw input -> parsed objects -> validated -> nearby customers
// ---------------------------------------------------------------------------
// THROTTLE variant (disabled): to slow the input down, insert the following
// stage between the source and the JSON parser:
//   .pipe(new ThrottleStream({
//     maxTimeout: 100,
//     bufferSize: 10000 // should be higher than chunks coming in
//   }))

// 2. split and 3. parse customer
const parsedCustomers = onlineSource.pipe(new JsonStream({strict: false}));

// 4. validate
const validCustomers = parsedCustomers.pipe(new ValidateStream());

// 5. calculate distance and filter
const destination = validCustomers.pipe(new FilterStream(latitude, longitude, radius));

// ---------------------------------------------------------------------------
// Output
// ---------------------------------------------------------------------------
// NO-SORT variant (active): print every nearby customer as soon as it arrives.
// 7. pick name and id, 8. join LF
const customerLines = destination.pipe(new CustomerToStringStream());
customerLines.pipe(process.stdout);

// SORT variant (disabled): collect the nearby customers in an array kept
// sorted by insertion and print them all once the stream ends. To use it,
// disable the NO-SORT statements above and enable the code below.
//
// // prepare result array for insert sort
// const nearbyCustomers = new SortedArray([], (a, b) => {
//   return a.user_id - b.user_id; // ascending
// });
//
// // on each row representing object
// destination.on('data', (customer) => {
//   nearbyCustomers.insert(customer); // insert to sorted array
// });
//
// destination.on('end', () => {
//   // console.log('total pushed', nearbyCustomers.array.length);
//   console.log(nearbyCustomers.array.map(intercomDrinks.printCustomer).join('\n'));
// });
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const util = require('util'); | |
const Transform = require('stream').Transform; | |
const intercomDrinks = require('intercom-drinks').default; | |
/**
 * Object-mode transform stream that passes each incoming object through
 * `intercomDrinks.validateCustomer` and forwards only the results that are
 * objects. Records that fail validation are skipped silently.
 *
 * May be invoked with or without `new`.
 *
 * @constructor
 */
function Stream() {
  if (!(this instanceof Stream)) {
    return new Stream();
  }

  Transform.call(this, {
    readableObjectMode: true,
    writableObjectMode: true
  });
}

util.inherits(Stream, Transform);

/**
 * Validates one parsed record and forwards the validated customer.
 *
 * NOTE(review): `validateCustomer` is presumed to return the validated
 * customer object and to throw on invalid input - confirm against the
 * intercom-drinks package.
 *
 * @param {*} object - parsed record to validate.
 * @param {string} encoding - unused; chunks arrive as objects.
 * @param {Function} done - invoked exactly once per record, whether the
 *   record was forwarded or skipped.
 */
Stream.prototype._transform = function (object, encoding, done) {
  let customer;

  try { // 4. validate
    customer = intercomDrinks.validateCustomer(object);
  } catch (e) {
    // Invalid records are dropped on purpose (best effort), but the
    // callback must still fire: without it the stream never asks for the
    // next chunk and the whole pipeline stalls on the first bad record.
    done();
    return;
  }

  // `typeof null` is 'object' too; pushing null would signal end-of-stream,
  // so it has to be excluded explicitly.
  if (customer !== null && typeof customer === 'object') {
    this.push(customer);
  }
  done();
};
module.exports = Stream; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment