Last active
November 7, 2017 20:41
-
-
Save brentd/189de80b14c5876f2370c3f774a7f3b0 to your computer and use it in GitHub Desktop.
combineLatestActive: like combineLatest, but for a stream of streams, and only combines non-completed streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from 'rxjs' | |
// Takes a higher-order observable as input and subscribes to its inner | |
// observables. Emits an array containing the last emitted value from each *active* | |
// inner observable when: | |
// | |
// 1. any inner observable emits | |
// 2. any inner observable completes | |
// | |
// In contrast to `combineLatest`, the latest value from completed observables | |
// are not included in the emitted array. | |
// | |
export default function combineLatestActive(source$) { | |
return new Observable.create(observer => { | |
const latest = new Map() | |
const emit = () => observer.next(Array.from(latest.values())) | |
const emitLatest = inner$ => | |
inner$.do({ | |
next: x => { | |
latest.set(inner$, x) | |
emit() | |
}, | |
complete: () => { | |
latest.delete(inner$) | |
emit() | |
} | |
}) | |
return source$.mergeMap(emitLatest).finally(() => observer.complete()).subscribe() | |
}) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import assert from 'assert' | |
import { marbles } from 'rxjs-marbles' | |
import combineLatestActive from 'rx/combineLatestActive' | |
describe('combineLatestActive()', () => { | |
it('emits the latest value from each active inner observable', marbles(m => { | |
const o1 = m.cold('a--b--c--|') | |
const o2 = m.cold( 'x--y--z|') | |
const obss = m.cold('1---2--------', {'1': o1, '2': o2}) | |
const expected = m.cold('1--23-45-678', { | |
'1': ['a'], | |
'2': ['b'], | |
'3': ['b', 'x'], | |
'4': ['c', 'x'], | |
'5': ['c', 'y'], | |
'6': ['y'], | |
'7': ['z'], | |
'8': [] | |
}) | |
m.equal(combineLatestActive(obss), expected) | |
})) | |
it('stays subscribed to inner observables until they complete ', marbles(m => { | |
const o1 = m.cold('----|') | |
const o1subs = '^---!' | |
const o2 = m.cold( '----|') | |
const o2subs = '--^---!' | |
const source = m.cold('1-2|', {'1': o1, '2': o2}) | |
combineLatestActive(source).subscribe() | |
m.has(o1, o1subs) | |
m.has(o2, o2subs) | |
})) | |
it('completes when the source AND inner observables have completed', marbles(m => { | |
const o1 = m.cold('-----|') | |
const o2 = m.cold( '-------|') | |
const source = m.cold('1--2----|', {'1': o1, '2': o2}) | |
const expected = m.cold('-----a----(a|)', {a: []}) | |
m.equal(combineLatestActive(source), expected) | |
})) | |
it('unsubscribes from completed inner observables', marbles(m => { | |
const o1 = m.cold('a-b|') | |
const o1subs = '^--!' | |
const o2 = m.cold( 'a-b-c-d|') | |
const o2subs = '--^------!' | |
const source = m.cold('1-2----------|', {'1': o1, '2': o2}) | |
combineLatestActive(source).subscribe() | |
m.has(o1, o1subs) | |
m.has(o2, o2subs) | |
})) | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment