In Reactor 3, what's the most efficient way to split a heterogeneous flux to multiple fluxes by pattern matching? (And subsequent operations on each flux may be very different)
For example,
Source Flux: a->b->c->a->b->c
||
vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c
I'm new to reactive programming, and the only solution I come up with is share()
+filter()
, like
val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));
Is this the best solution, or is there any better paradigm for this problem?
If the number of groups is fairly low, then you can use Flux.groupBy
referenced in the project reactor docs
For example:
Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
.groupBy(s -> s.charAt(0))
.concatMap(groupedFlux -> groupedFlux
.startWith("Group " + groupedFlux.key()));
StepVerifier.create(flux)
.expectNext("Group a", "a1", "a2")
.expectNext("Group b", "b1", "b2")
.expectNext("Group c", "c1", "c2")
.verifyComplete();
You can use groupedFlux.key()
to vary the operations performed for each group.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With