is there any way to force a Flux generated by groupBy() to complete after a period of time (or similarly, limit the maximum # of "open" groups) regardless of the completeness of the upstream? I have something like the following:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
and am running into the case where the Flux hangs, assumed because the number of groups is greater than the flatMap
's concurrency. I could increase the flatMap
concurrency, but there's no easy way to tell what the max possible size is. Instead, i know the Foo
's being grouped by Foo.key
are going to be close to each other in time/publication order and would rather use some sort of time window on the groupBy Flux vs. flatMap concurrency (and ending up w/ two different groups w/ the same key()
isn't a big deal).
I'm guessing the groupBy
Flux's won't onComplete until someFastPubisher
onCompletes - i.e. the Flux's handed off to flatMap
just stay "open" (although they're not likely to ever get a new event).
I am able to work around this either by prefetching Integer.MAX
in the groupBy or Integer.MAX
ing the concurrency - but is there a way to control the "life" of the group?
yes: you can apply a take(Duration)
to the groups in order to ensure they close early and a new group with the same key will open after that:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With