Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


observable - Replaying events using rxjs. Or how to use rxjs with tick-time instead of wallclock-time

I'm looking into using reactive programming, specifically rxjs, for building logic on top of large event streams I need to process.

This involves a lot of time-related work, such as generating a synthetic event every second (a contrived example).

When events arrive in real time (i.e. events are processed using wallclock time), this is easily solved by merging an interval source into the event stream, for example:

    // RxJS v6+
    import { mapTo } from 'rxjs/operators';
    import { interval, merge } from 'rxjs';

    //emit every 2.5 seconds
    const first = interval(2500);
    //emit every 2 seconds
    const second = interval(2000);
    //emit every 1.5 seconds
    const third = interval(1500);
    //emit every 1 second
    const fourth = interval(1000);

    //emit outputs from one observable
    const example = merge(
      first.pipe(mapTo('FIRST!')),
      second.pipe(mapTo('SECOND!')),
      third.pipe(mapTo('THIRD')),
      fourth.pipe(mapTo('FOURTH'))
    );
    //output: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
    const subscribe = example.subscribe(val => console.log(val));

However, I often need to replay event streams and merge synthetic events into the replayed stream in the proper order. Each event carries a timestamp, and each synthetic event must be placed so that the timestamps of the merged stream keep increasing monotonically. In other words, events are processed using tick time rather than wallclock time.
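To make the requirement concrete, here is a minimal sketch in plain JavaScript (no RxJS) of placing synthetic events into a recorded stream by timestamp. The event shape `{ ts, type }`, the function name `withSyntheticTicks`, and the choice to emit a tick before a real event with the same timestamp are all assumptions made for illustration.

```javascript
// Hypothetical event shape: { ts: <milliseconds>, type: <string> }.
// Yields the source events in order and inserts a synthetic 'tick' event at
// every multiple of periodMs between the first and the last source event.
// Assumes `events` is already sorted by ts and periodMs > 0.
function* withSyntheticTicks(events, periodMs) {
  let nextTick = null;
  for (const event of events) {
    if (nextTick === null) {
      // First tick boundary strictly after the first event's timestamp.
      nextTick = (Math.floor(event.ts / periodMs) + 1) * periodMs;
    }
    // Emit every tick that is due at or before this event's timestamp.
    while (nextTick <= event.ts) {
      yield { ts: nextTick, type: 'tick' };
      nextTick += periodMs;
    }
    yield event;
  }
}

const recorded = [
  { ts: 200, type: 'a' },
  { ts: 900, type: 'b' },
  { ts: 2500, type: 'c' },
];
const replayed = [...withSyntheticTicks(recorded, 1000)];
console.log(replayed.map(e => `${e.ts}:${e.type}`).join(' '));
// prints "200:a 900:b 1000:tick 2000:tick 2500:c"
```

No timer is involved: the ticks are derived purely from the timestamps in the data, so a replay produces the same output regardless of how fast it runs.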

This is a specific case of the more generic problem of merging N eventstreams in tick-time order.
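The generic version can be sketched as a k-way merge over sources that are each already sorted by timestamp. This is plain JavaScript rather than an RxJS operator; the name `mergeByTimestamp`, the `ts` property, and the tie-breaking rule (the source listed first wins) are assumptions for illustration.

```javascript
// Merges any number of iterables, each already sorted by `ts`, into one
// sequence sorted by `ts`. On equal timestamps the source listed first wins.
function* mergeByTimestamp(...sources) {
  const iterators = sources.map(s => s[Symbol.iterator]());
  const heads = iterators.map(it => it.next());
  while (true) {
    // Find the source whose current head has the smallest timestamp.
    let best = -1;
    for (let i = 0; i < heads.length; i++) {
      if (heads[i].done) continue;
      if (best === -1 || heads[i].value.ts < heads[best].value.ts) best = i;
    }
    if (best === -1) return; // every source is exhausted
    yield heads[best].value;
    heads[best] = iterators[best].next();
  }
}

const streamA = [{ ts: 1, src: 'A' }, { ts: 4, src: 'A' }];
const streamB = [{ ts: 2, src: 'B' }, { ts: 4, src: 'B' }, { ts: 5, src: 'B' }];
const streamC = [{ ts: 3, src: 'C' }];
const merged = [...mergeByTimestamp(streamA, streamB, streamC)];
console.log(merged.map(e => `${e.ts}${e.src}`).join(' '));
// prints "1A 2B 3C 4A 4B 5B"
```

The linear scan over the heads costs O(N) per emitted event, which is fine for a handful of streams; with many streams a min-heap keyed on `ts` would likely be a better fit.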

Ideally, I would write the reactive logic without regard to whether it runs on wallclock time or tick time. I would then initialize RxJS with some context stating which of the two applies and, for tick time, pointing to the event property that carries the timestamp. RxJS would sort out the rest, with no custom code needed to handle the difference.
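One way to picture such a context is a clock object that the logic schedules against, where time moves only when the replay driver advances it. RxJS time-based creation functions such as `interval` accept an optional scheduler argument, and the library ships a `VirtualTimeScheduler`, so that may be the closest built-in hook. The sketch below does not use it: `TickClock`, `every`, `advanceTo`, and `replay` are hypothetical names, and the code only illustrates the idea in plain JavaScript.

```javascript
// Hypothetical tick-time clock: time only moves when advanceTo() is called.
class TickClock {
  constructor(startTs = 0) {
    this.current = startTs;
    this.timers = []; // entries: { due, periodMs, callback }
  }
  now() {
    return this.current;
  }
  // Registers a repeating callback; periodMs is assumed to be > 0.
  every(periodMs, callback) {
    this.timers.push({ due: this.current + periodMs, periodMs, callback });
  }
  // Moves time forward to ts, firing due timers in timestamp order.
  advanceTo(ts) {
    while (true) {
      const next = this.timers
        .filter(t => t.due <= ts)
        .sort((a, b) => a.due - b.due)[0];
      if (!next) break;
      this.current = next.due;
      next.due += next.periodMs;
      next.callback(this.current);
    }
    this.current = ts;
  }
}

// Replay driver: getTimestamp points at the property that carries the time.
function replay(events, getTimestamp, clock, onEvent) {
  for (const event of events) {
    clock.advanceTo(getTimestamp(event));
    onEvent(event);
  }
}

const log = [];
const clock = new TickClock(0);
clock.every(1000, ts => log.push(`tick@${ts}`));
replay(
  [{ ts: 500, name: 'a' }, { ts: 2200, name: 'b' }],
  e => e.ts,
  clock,
  e => log.push(`${e.name}@${clock.now()}`)
);
console.log(log.join(' '));
// prints "a@500 tick@1000 tick@2000 b@2200"
```

The logic that calls `clock.every` never looks at the system clock, so the same code could in principle run against a wallclock-backed object with the same two methods.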

Any pointers would be much appreciated.

Question from: https://stackoverflow.com/questions/66053271/replaying-events-using-rxjs-or-how-to-use-rxjs-with-tick-time-instead-of-wallcl

Battle dragons for too long and you become a dragon yourself; gaze into the abyss for too long and the abyss gazes back…

1 Answer

Waiting for answers
