Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactor groupBy: What happens with remaining items after GroupedFlux is canceled?

I need to group infinite Flux by key with high cardinality.

For example:

  1. group key is domain url
  2. calls to one domain should be strictly sequential (next call happens after previous one is completed)
  3. calls to different domains should be concurrent
  4. time interval between items with same key (url) is unknown, but expected to have burst nature. Several items emitted in short period of time then long pause until next group.
queue
    .groupBy(keyMapper, groupPrefetch)
    .flatMap(
       { group ->
           group.concatMap(
               { task -> makeSlowRemoteCall(task) },
               0
           )
           .takeUntil { remoteCallResult -> remoteCallResult == DONE }
           .timeout(groupTimeout, Mono.empty())
           .then()
       }
      , concurrency
    )

I cancel the group in two cases:

  1. makeSlowRemoteCall() result indicates that with high probability there will be no new items in this group in near future.

  2. Next item is not emitted during groupTimeout. I use timeout(timeout, fallback) variant to suppress TimeoutException and allow flatMap's inner publisher to complete successfully.

I want possible future items with same key to make new GroupedFlux and be processed with same flatMap inner pipeline.

But what happens if GroupedFlux has remaining unrequested items when I cancel it?

Does groupBy operator re-queue them into new group with same key or they are lost forever. If later what is the proper way to solve my problem. I am also not sure if I need to set concatMap() prefetch to 0 in this case.

like image 357
Sokolov Avatar asked Nov 07 '22 02:11

Sokolov


1 Answers

I think groupBy() operator is not fit for my task with infinite source and a lot of groups. It makes infinite groups so it is necessary to somehow cancel idle groups downstream. But it is not possible to cancel GroupedFlux with guarantee that it has no unconsumed elements.

I think it will be great to have groupBy variant that emits finite groups. Something like groupBy(keyMapper, boundryPredicate). When boundryPredicate returns true current group is complete and next element with same key will start new group.

like image 79
Sokolov Avatar answered Nov 22 '22 07:11

Sokolov