Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
575 views
in Technique[技术] by (71.8m points)

reactive programming - The most efficient way to split a Flux to multiple Fluxes in Reactor 3

In Reactor 3, what's the most efficient way to split a heterogeneous flux to multiple fluxes by pattern matching? (And subsequent operations on each flux may be very different)

For example,

Source Flux: a->b->c->a->b->c
 ||
 vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c

I'm new to reactive programming, and the only solution I come up with is share()+filter(), like

val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));

Is this the best solution, or is there any better paradigm for this problem?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

If the number of groups is fairly low, then you can use Flux.groupBy referenced in the project reactor docs

For example:

Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
        .groupBy(s -> s.charAt(0))
        .concatMap(groupedFlux -> groupedFlux
                .startWith("Group " + groupedFlux.key()));

StepVerifier.create(flux)
        .expectNext("Group a", "a1", "a2")
        .expectNext("Group b", "b1", "b2")
        .expectNext("Group c", "c1", "c2")
        .verifyComplete();

You can use groupedFlux.key() to vary the operations performed for each group.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...