I have a flow with data associated with users. I also have a state for each user, which I can load asynchronously from a database.
I want to split my flow into one subflow per user and load each user's state when its subflow is materialized, so that the elements of the subflow can be processed with respect to this state.
If I don't want to merge the subflows downstream, I can do something with groupBy and Sink.lazyInit:
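// An implicit ExecutionContext must be in scope for the Future.map call below.
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

// Sink takes two type parameters: the element type and the materialized value.
val treatByUser: Sink[Element, NotUsed] =
  Flow[Element]
    .groupBy(Int.MaxValue, getUserId)
    .to(
      Sink.lazyInit(
        // Build the per-user sink from the first element of each substream.
        elt => getState(getUserId(elt)).map(treatUser),
        ??? // fallback: never called, since a substream is only created when an element arrives
      )
    )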
However, this does not work if treatUser becomes a Flow, since there is no equivalent of Sink.lazyInit for flows.
Since subflows of groupBy are materialized only when a new element is pushed, it should be possible to use this element to materialize the subflow, but I wasn't able to adapt the source code of groupBy so that this works consistently. Likewise, Sink.lazyInit doesn't seem to be easily translatable to the Flow case.
Any ideas on how to solve this issue?
The relevant Akka issue you have to look at is #20129: add Sink.dynamic and Flow.dynamic.
In the associated PR #20579 they actually implemented the LazySink part.
They are planning to do LazyFlow next:
"Will do next lazyFlow with similar signature."
Unfortunately, you have to wait for that functionality to be implemented in Akka or write it yourself (and then consider submitting a PR to Akka).
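If you go the "write it yourself" route, one possible workaround that needs no custom stage is to combine groupBy with prefixAndTail: take the first element of each substream, load the state from it, and then run that element plus the rest of the substream through the per-user flow. The following is only a sketch under assumptions: the case classes, the stubbed getState and the trivial treatUser are hypothetical stand-ins for the types in the question, and it uses the older Source.fromFuture and ActorMaterializer API (in newer Akka versions these are deprecated in favour of Source.future and the system-provided materializer).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LazyFlowPerUser {
  // Hypothetical stand-ins for the types used in the question.
  final case class UserId(value: String)
  final case class UserState(bonus: Int)
  final case class Element(userId: UserId, amount: Int)

  def getUserId(element: Element): UserId = element.userId

  // Stub for the asynchronous DB lookup (assumption: the real one hits a database).
  def getState(userId: UserId): Future[UserState] =
    Future.successful(UserState(bonus = userId.value.length))

  // The per-user processing, now a Flow instead of a Sink.
  def treatUser(state: UserState): Flow[Element, Int, NotUsed] =
    Flow[Element].map(_.amount + state.bonus)

  val treatByUser: Flow[Element, Int, NotUsed] =
    Flow[Element]
      .groupBy(Int.MaxValue, getUserId)
      // Split each substream into its first element and a Source of the rest.
      .prefixAndTail(1)
      .flatMapConcat { case (prefix, tail) =>
        prefix.headOption match {
          case Some(first) =>
            // Load the state once, then replay `first` followed by the tail
            // through the flow built from that state.
            Source
              .fromFuture(getState(getUserId(first)))
              .flatMapConcat { state =>
                Source.single(first).concat(tail).via(treatUser(state))
              }
          case None =>
            // Defensive only: groupBy opens a substream when an element arrives.
            Source.empty[Int]
        }
      }
      .mergeSubstreams

  // Runs the flow over a finite input and collects the results.
  def run(elements: List[Element]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("lazy-flow-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(elements).via(treatByUser).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Two caveats with this approach: the tail source returned by prefixAndTail is subject to the stream subscription timeout, so a very slow state lookup may need that setting raised, and mergeSubstreams does not preserve ordering across users (ordering within one user is kept).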