Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactor 3.x - limit the time of a groupBy Flux

is there any way to force a Flux generated by groupBy() to complete after a period of time (or similarly, limit the maximum # of "open" groups) regardless of the completeness of the upstream? I have something like the following:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;

and am running into the case where the Flux hangs, assumed because the number of groups is greater than the flatMap's concurrency. I could increase the flatMap concurrency, but there's no easy way to tell what the max possible size is. Instead, i know the Foo's being grouped by Foo.key are going to be close to each other in time/publication order and would rather use some sort of time window on the groupBy Flux vs. flatMap concurrency (and ending up w/ two different groups w/ the same key() isn't a big deal).

I'm guessing the groupBy Flux's won't onComplete until someFastPubisher onCompletes - i.e. the Flux's handed off to flatMap just stay "open" (although they're not likely to ever get a new event).

I am able to work around this either by prefetching Integer.MAX in the groupBy or Integer.MAXing the concurrency - but is there a way to control the "life" of the group?

like image 706
jamey graham Avatar asked Dec 19 '17 19:12

jamey graham


1 Answers

yes: you can apply a take(Duration) to the groups in order to ensure they close early and a new group with the same key will open after that:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();
like image 152
Simon Baslé Avatar answered Nov 03 '22 17:11

Simon Baslé