
Filter async with akka-stream

Is there an equivalent to the mapAsync() method, but for filter?

Here is an example using pseudo code:

val filter: T => Future[Boolean] = /.../

source.filter(filter).runWith(/.../)
       ^^^^^^

Thanks

asked Jun 20 '18 14:06 by ndeverge


1 Answer

I don't think there is a direct method of Flow or Source that has the capability you're looking for, but a combination of the available methods will get you what you want:

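import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

def asyncFilter[T](filter: T => Future[Boolean], parallelism: Int = 1)
                  (implicit ec: ExecutionContext): Flow[T, T, _] =
  Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t)) // pair each async result with its element
         .filter(_._1)                                      // keep elements whose result was true
         .map(_._2)                                         // drop the Boolean, emit the element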
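
To connect this back to the pseudo code in the question, here is a minimal usage sketch. It assumes Akka 2.6 or later, where an implicit ActorSystem in scope also provides the materializer; on older versions you would need to create an ActorMaterializer yourself. The object name, the isEven predicate, and the sample source are hypothetical and only there for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncFilterExample extends App {
  // Assumption: Akka 2.6+, so the implicit ActorSystem also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("async-filter-example")
  implicit val ec: ExecutionContext = system.dispatcher

  // Same combinator as in the answer above.
  def asyncFilter[T](filter: T => Future[Boolean], parallelism: Int = 1)
                    (implicit ec: ExecutionContext): Flow[T, T, _] =
    Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
           .filter(_._1)
           .map(_._2)

  // Hypothetical asynchronous predicate, standing in for the question's `filter`.
  val isEven: Int => Future[Boolean] = n => Future(n % 2 == 0)

  // Plug the flow into the stream with `via` instead of calling `filter` directly.
  val result: Future[Seq[Int]] =
    Source(1 to 10)
      .via(asyncFilter(isEven, parallelism = 4))
      .runWith(Sink.seq)

  // Blocking here is only for the demo; the even numbers come out in upstream order.
  println(Await.result(result, 5.seconds))
  system.terminate()
}
```

Because mapAsync emits results in the order the elements arrived, raising parallelism lets several predicate calls run concurrently without reordering the stream. If ordering does not matter, swapping in mapAsyncUnordered is an option.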
answered Oct 21 '22 10:10 by Ramón J Romero y Vigil