
Can the subflows of groupBy depend on the keys they were generated from?

I have a flow of data associated with users. Each user also has a state, which I can fetch asynchronously from a database.

I want to split my flow into one subflow per user and load each user's state when the subflow is materialized, so that the elements of the subflow can be processed according to that state.

If I don't need to merge the subflows downstream, I can do something with groupBy and Sink.lazyInit:

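import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.{ExecutionContext, Future}

implicit def ec: ExecutionContext = ... // needed by Future.map below

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element, NotUsed] = Flow[Element].groupBy(
  Int.MaxValue,
  getUserId
).to(
  Sink.lazyInit(
    // build the per-user sink from the first element of each substream
    (elt: Element) => getState(getUserId(elt)).map(treatUser),
    () => ??? // fallback: never called, since the subflow is created only when an element arrives
  )
)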

However, this does not work if treatUser returns a Flow instead of a Sink, since Flow has no equivalent of Sink.lazyInit.

Since subflows of groupBy are materialized only when a new element is pushed, it should be possible to use this element to materialize the subflow, but I wasn't able to adapt the source code of groupBy so that this works consistently. Likewise, Sink.lazyInit doesn't seem to be easily translatable to the Flow case.

Any ideas on how to solve this issue?

Cyrille Corpet asked Mar 17 '17 16:03


1 Answer

The relevant Akka issue to look at is #20129: add Sink.dynamic and Flow.dynamic.

The associated PR, #20579, implemented the LazySink part.

They are planning to do LazyFlow next:

"Will do next lazyFlow with similar signature."

Unfortunately, you either have to wait for that functionality to be implemented in Akka or write it yourself (and then consider submitting a PR to Akka).
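If you go the "write it yourself" route, one possible workaround that avoids a custom stage is to combine prefixAndTail(1) with flatMapConcat inside each substream: take the first element, load the state for its key, then run that element and the rest of the substream through the Flow built from the state. The following is only a minimal sketch of that idea, not the planned lazyFlow. The Element, UserState and Result definitions and the bodies of getState and treatUser are hypothetical stand-ins for the ones in the question, and Source.fromFuture is assumed to be available (on Akka 2.6+ it is deprecated in favour of Source.future).

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Future

object LazyFlowPerKey {
  // Hypothetical stand-ins for the types and functions in the question.
  type UserId = String
  type Result = String
  final case class Element(userId: UserId, payload: String)
  final case class UserState(prefix: String)

  def getUserId(element: Element): UserId = element.userId

  // Assumption: a real implementation would query the database here.
  def getState(userId: UserId): Future[UserState] =
    Future.successful(UserState(s"[$userId]"))

  // treatUser now returns a Flow instead of a Sink.
  def treatUser(state: UserState): Flow[Element, Result, NotUsed] =
    Flow[Element].map(e => s"${state.prefix} ${e.payload}")

  val treatByUser: Flow[Element, Result, NotUsed] =
    Flow[Element]
      .groupBy(Int.MaxValue, getUserId)
      // Split each substream into its first element and the remaining elements.
      .prefixAndTail(1)
      .flatMapConcat {
        case (Seq(first), tail) =>
          // Load the state for this key, then send the first element
          // followed by the tail through the state-dependent flow.
          Source
            .fromFuture(getState(getUserId(first)))
            .flatMapConcat { state =>
              Source.single(first).concat(tail).via(treatUser(state))
            }
        case _ =>
          // Empty prefix: the substream completed without any element.
          Source.empty[Result]
      }
      .mergeSubstreams
}
```

With this shape the state is fetched once per key, when the first element for that key arrives, and later elements of the same substream are backpressured until the Future completes. If the subflows should not be merged, the mergeSubstreams call can be replaced by .to(...) with a Sink.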

Federico Pellegatta answered Nov 15 '22 06:11