Last active
October 19, 2016 04:33
-
-
Save apparentlymart/c63df5bba1eefe0960b06857ea54595b to your computer and use it in GitHub Desktop.
Highland Diverge Stream, pull edition
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require('highland'); | |
var items = [ | |
'a', | |
'b', | |
'c', | |
'd', | |
'e', | |
'f', | |
'g', | |
'h', | |
'i', | |
'j', | |
'k', | |
'l', | |
'm', | |
'n', | |
]; | |
// Any arbitrary stream of stuff | |
var inStream = _(items); | |
var divergedStreams = inStream.through(diverge(4)); | |
// Here we can do other things to the streams by mapping over them and | |
// applying the other highland stream operators. | |
// For this example, we'll just create a fake bottleneck, but we should | |
// be able to do any highland-ish thing here. | |
var resultStreams = divergedStreams.map(function (stream) { | |
return stream.batchWithTimeOrCount(100, 5).through(slowStream()); | |
}); | |
// Now we merge it all back together again. | |
var results = resultStreams.merge(); | |
// ...and the caller can do what it wants with the results | |
results.each(function (item) { | |
console.log(item); | |
}); | |
function diverge(n) { | |
return function (inStream) { | |
var requests = _(); | |
var closed = false; | |
var remainingOut = n; | |
function outputEnded() { | |
closed = true; | |
remainingOut--; | |
if (remainingOut === 0) { | |
// Close the requests stream once we've closed out | |
// all of our output streams, for good measure. | |
requests.write(_.nil); | |
} | |
} | |
requests.flatMap(function (request) { | |
var push = request[0]; | |
var next = request[1]; | |
if (closed) { | |
outputEnded(); | |
push(null, _.nil); | |
return _([]); | |
} | |
var ret = _(); | |
inStream.pull(function (err, x) { | |
if (x === _.nil) { | |
outputEnded(); | |
push(null, x); | |
return; | |
} | |
push(err, x); | |
next(); | |
ret.write(_.nil); | |
}); | |
return ret; | |
}).done(function () {}); | |
var streams = []; | |
for (var i = 0; i < n; i++) { | |
streams.push(_(generator)); | |
} | |
return _(streams); | |
function generator(push, next) { | |
requests.write([push, next]); | |
} | |
}; | |
} | |
function slowStream() { | |
return _.flatMap(function (x) { | |
return _(new Promise(function (resolve) { | |
var jitter = (Math.random() * 1000) - 500; | |
setTimeout(function () { | |
resolve({ | |
value: x, | |
}); | |
}, 1000 + jitter); | |
})); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment