I'd tackle this problem by breaking it into two parts. First, I'd want something that takes an Observable<Observable<T>> and produces an Observable<Observable<T>[]>, where the array contains only the "active" (i.e. not yet completed) observables. Any time the outer observable emits a new inner observable, and any time one of the inner observables completes, a new array would be emitted containing the observables that are still active. This is essentially a "scan" reduction of the primary stream.
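To make the "scan" idea concrete without pulling in Rx at all, here's a rough sketch that models the stream as a plain array of events. The event shape ({ type, id }) and the name activeSnapshots are made up purely for illustration; they aren't part of Rx, and this isn't the implementation I use further down.

```javascript
// Hypothetical model: each event either adds a stream id or marks one as completed.
// Returns every snapshot of the active set, in the order it would be emitted.
function activeSnapshots(events) {
  const snapshots = [];
  events.reduce((active, event) => {
    let next;
    if (event.type === 'add') {
      next = active.concat(event.id);
    } else if (event.type === 'complete' && active.includes(event.id)) {
      next = active.filter(id => id !== event.id);
    } else {
      return active; // nothing changed, so no snapshot is emitted
    }
    snapshots.push(next);
    return next;
  }, []);
  return snapshots;
}

const result = activeSnapshots([
  { type: 'add', id: 'a' },
  { type: 'add', id: 'b' },
  { type: 'complete', id: 'a' },
  { type: 'add', id: 'c' },
]);
// result is [['a'], ['a', 'b'], ['b'], ['b', 'c']]
```

Each snapshot is a fresh array rather than a mutated one, which is the same reason the Rx version emits a copy of its internal list.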
Once you've got something that can do that, you can use flatMapLatest and zip to get what you want.
My basic attempt at the first part is as follows:
function active(ss$) {
  const activeStreams = new Rx.Subject();
  const elements = [];
  const subscriptions = [];
  ss$.subscribe(s => {
    // Stays true unless s completes synchronously while we subscribe to it.
    let include = true;
    const subscription = s.subscribe(x => {}, x => {}, () => {
      include = false;
      // s has completed: drop it and emit the updated array.
      const i = elements.indexOf(s);
      if (i > -1) {
        elements.splice(i, 1);
        activeStreams.onNext(elements.slice());
      }
    });
    if (include) {
      elements.push(s);
      subscriptions.push(subscription);
      // Emit a copy so later changes to elements don't leak to subscribers.
      activeStreams.onNext(elements.slice());
    }
  });
  // Dispose the inner subscriptions when the returned observable is disposed.
  return Rx.Observable.using(
    () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
    () => activeStreams
  );
}
From there, you'd just zip it and flatten it out like so:
const zipped = active(c$).flatMapLatest(x =>
  x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);
I've made the assumptions that zero active streams should yield nothing, one active stream should yield its own elements, and two or more streams should all be zipped together with their values summed (all of which is reflected in the function passed to flatMapLatest).
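If it helps to see those three cases in isolation, here's a small array-based model of them. It's only a sketch: combineActive is a made-up name, each "stream" is a plain array of numbers, and arrays obviously can't capture timing (Rx.Observable.never() never completes, whereas an empty array is just empty).

```javascript
// Hypothetical array-based model of the three cases handled in flatMapLatest.
function combineActive(streams) {
  // No active streams: yield nothing.
  if (streams.length === 0) return [];
  // One active stream: pass its elements through unchanged.
  if (streams.length === 1) return streams[0].slice();
  // Two or more: pair values up by index, stopping at the shortest input,
  // and sum each group (the same reduce as in the zip selector).
  const length = Math.min(...streams.map(s => s.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(streams.map(s => s[i]).reduce((a, c) => a + c));
  }
  return out;
}

const summed = combineActive([[1, 2, 3], [10, 20]]);
// summed is [11, 22]; the unpaired 3 is dropped, as zip would do
```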
My (admittedly fairly limited) testing has this combination yielding the results you were after.
Great question, by the way. I've not seen anything that solves the first part of the problem (though I'm by no means an Rx expert; if someone knows of something that already does this, please post details).