I need to group infinite Flux by key with high cardinality.
For example:
queue
.groupBy(keyMapper, groupPrefetch)
.flatMap(
{ group ->
group.concatMap(
{ task -> makeSlowRemoteCall(task) },
0
)
.takeUntil { remoteCallResult -> remoteCallResult == DONE }
.timeout(groupTimeout, Mono.empty())
.then()
}
, concurrency
)
I cancel the group in two cases:
makeSlowRemoteCall()
result indicates that with high probability there will be no new items in this group in near future.
Next item is not emitted during groupTimeout
. I use timeout(timeout, fallback)
variant to suppress TimeoutException and allow flatMap's inner publisher to complete successfully.
I want possible future items with same key to make new GroupedFlux and be processed with same flatMap inner pipeline.
But what happens if GroupedFlux has remaining unrequested items when I cancel it?
Does groupBy operator re-queue them into new group with same key or they are lost forever. If later what is the proper way to solve my problem. I am also not sure if I need to set concatMap() prefetch to 0 in this case.
I think groupBy()
operator is not fit for my task with infinite source and a lot of groups. It makes infinite groups so it is necessary to somehow cancel idle groups downstream. But it is not possible to cancel GroupedFlux with guarantee that it has no unconsumed elements.
I think it will be great to have groupBy variant that emits finite groups.
Something like groupBy(keyMapper, boundryPredicate)
. When boundryPredicate returns true current group is complete and next element with same key will start new group.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With