Is there an equivalent to the mapAsync() method, but for filter?
Here is an example using pseudo code:
val filter: T => Future[Boolean] = /.../
source.filter(filter).runWith(/.../)
       ^^^^^^
Thanks
I don't think there is a direct method of Flow or Source that has the capability you're looking for, but a combination of the available methods will get you what you want:
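import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

// Evaluate the predicate asynchronously, pair each result with its element, then keep the elements whose result is true.
def asyncFilter[T](filter: T => Future[Boolean], parallelism: Int = 1)
                  (implicit ec: ExecutionContext): Flow[T, T, NotUsed] =
  Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
    .filter(_._1)
    .map(_._2)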
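For completeness, here is a rough sketch of how such a flow could be plugged into a stream with via. It assumes Akka 2.6 or later (where the implicit ActorSystem also supplies the materializer); the object name AsyncFilterExample, the evens() helper and the isEven predicate are made up for illustration and only stand in for a real asynchronous check.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object AsyncFilterExample {

  // The combinator from the answer: async predicate, then filter on its result.
  def asyncFilter[T](filter: T => Future[Boolean], parallelism: Int = 1)
                    (implicit ec: ExecutionContext): Flow[T, T, NotUsed] =
    Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
      .filter(_._1)
      .map(_._2)

  // Hypothetical helper: filters 1 to 10 with an async "is even" predicate.
  def evens(): Seq[Int] = {
    // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
    implicit val system: ActorSystem = ActorSystem("async-filter-example")
    implicit val dispatcher: ExecutionContext = system.dispatcher

    // Hypothetical predicate; a real one might call a database or a service.
    val isEven: Int => Future[Boolean] = n => Future(n % 2 == 0)

    try {
      val done = Source(1 to 10)
        .via(asyncFilter(isEven, parallelism = 4))
        .runWith(Sink.seq)
      // Blocking is only for the demo; real code would compose the Future.
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(evens()) // the even numbers 2, 4, 6, 8, 10, in upstream order
}
```

Since mapAsync emits results in upstream order, the surviving elements keep their original order even with parallelism greater than 1. If ordering doesn't matter, mapAsyncUnordered could be swapped in instead.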