 

Sequencing Scala Futures with bounded parallelism (without messing around with ExecutionContexts)

Background: I have a function:

  def doWork(symbol: String): Future[Unit]

which initiates some side-effects to fetch data and store it, and completes a Future when it's done. However, the back-end infrastructure has usage limits, such that no more than 5 of these requests can be made in parallel. I have a list of N symbols that I need to get through:

  var symbols = Array("MSFT",...)

but I want to sequence them such that no more than 5 are executing simultaneously. Given:

  val allowableParallelism = 5

my current solution is (assuming I'm working with async/await):

  val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
  def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
  val symbolThunks = symbolChunks.map(toThunk)
  val done = Promise[Unit]()
  def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
    case Nil => done.success(())
    case x::xs => x().onComplete(_ => procThunks(xs))
  }
  procThunks(symbolThunks)
  await { done.future }

but, for obvious reasons, I'm not terribly happy with it. I feel like this should be possible with folds, but every time I try, I end up eagerly creating the Futures. I also tried out a version with RxScala Observables, using concatMap, but that also seemed like overkill.
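
For reference, the chunked approach can be written as a fold like this (a sketch, assuming an implicit ExecutionContext in scope, as above); it avoids creating the Futures eagerly, since each chunk's Futures are only built inside flatMap once the previous chunk is done, but it shares the drawback of my solution: every chunk waits for its slowest element:

  val foldedDone: Future[Unit] =
    symbols.toList
      .grouped(allowableParallelism)
      .foldLeft(Future.successful(())) { (acc, chunk) =>
        // the chunk's Futures are created only after acc completes
        acc.flatMap(_ => Future.sequence(chunk.map(doWork)).map(_ => ()))
      }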

Is there a better way to accomplish this?

asked Nov 23 '14 by experquisite


People also ask

How do you use the future sequence in Scala?

The Future.sequence() function converts a collection of Futures into a single Future of a collection; in simple words, List[Future[T]] becomes Future[List[T]]. This is also known as composing Futures.
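
A minimal illustration (assuming the global ExecutionContext):

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  val futures: List[Future[Int]] = List(Future(1), Future(2), Future(3))
  val combined: Future[List[Int]] = Future.sequence(futures)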

Is Future blocking onComplete?

No. With Future.onComplete() we are no longer blocking for the result of the Future; instead we will receive a callback with either a Success or a Failure.
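
For example (a sketch reusing the question's doWork, with an implicit ExecutionContext in scope):

  import scala.util.{Failure, Success}

  doWork("MSFT").onComplete {
    case Success(_)  => println("work finished")
    case Failure(ex) => println(s"work failed: $ex")
  }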

What is ExecutionContext Scala?

An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool. A general-purpose ExecutionContext must be asynchronous in executing any Runnable that is passed into its execute method.
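
For instance, one can build an ExecutionContext over a fixed thread pool, which is the executor-level way of bounding parallelism that the question title wants to avoid (note that this bounds threads, not in-flight asynchronous requests):

  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext

  // at most 5 tasks run at a time on this context
  implicit val boundedEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))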

What is await result in Scala?

Await.result tries to return the Future's result as soon as possible and throws an exception if the Future failed, while Await.ready returns the completed Future itself, from which the result (Success or Failure) can safely be extracted.
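
Side by side (a sketch reusing the question's doWork; both block the calling thread for up to the given duration):

  import scala.concurrent.Await
  import scala.concurrent.duration._

  val r = Await.result(doWork("MSFT"), 10.seconds)  // Unit, or throws if the Future failed
  val f = Await.ready(doWork("MSFT"), 10.seconds)   // the completed Future itself
  f.value                                           // Some(Success(())) or Some(Failure(ex))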


3 Answers

Here's an example of how to do it with scalaz-stream. It's quite a lot of code, because we need to convert a Scala Future to a scalaz Task (an abstraction for deferred computation), but this conversion only has to be added to the project once. Another option is to use Task for defining doWork in the first place; I personally prefer Task for building async programs.

  import scala.concurrent.{Future => SFuture}
  import scala.util.Random
  import scala.concurrent.ExecutionContext.Implicits.global


  import scalaz.stream._
  import scalaz.concurrent._

  val P = scalaz.stream.Process

  val rnd = new Random()

  def doWork(symbol: String): SFuture[Unit] = SFuture {
    Thread.sleep(rnd.nextInt(1000))
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
  }

  val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX").
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))

  implicit class Transformer[+T](fut: => SFuture[T]) {
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
      import scala.util.{Failure, Success}
      import scalaz.syntax.either._
      Task.async {
        register =>
          fut.onComplete {
            case Success(v) => register(v.right)
            case Failure(ex) => register(ex.left)
          }
      }
    }
  }

  implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
      val actions =
        process.
          zipWith(f)((data, f) => f(data))

      val nestedActions =
        actions.map(P.eval)

      merge.mergeN(concurrencyLevel)(nestedActions)
    }
  }

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

  process.run.run

Once you have all these transformations in scope, basically all you need is:

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

Quite short and self-describing.

answered Nov 08 '22 by Eugene Zhulenev


Although you've already got an excellent answer, I thought I might still offer an opinion or two about these matters.

I remember seeing somewhere (on someone's blog) "use actors for state and use futures for concurrency".

So my first thought would be to utilize actors somehow. To be precise, I would have a master actor with a router launching multiple worker actors, with the number of workers limited according to allowableParallelism. So, assuming I have

def doWorkInternal (symbol: String): Unit

which does the work from your doWork taken 'outside of the future', I would have something along these lines (very rudimentary, not taking many details into consideration, and practically copying code from the akka documentation):

import akka.actor._
import akka.routing._

case class WorkItem (symbol: String)
case class WorkItemCompleted (symbol: String)
case class WorkLoad (symbols: Array[String])
case class WorkLoadCompleted ()

class Worker extends Actor  {
    def receive = {
        case WorkItem (symbol) =>
            doWorkInternal (symbol)
            sender () ! WorkItemCompleted (symbol)
    }
}

class Master extends Actor  {
    var pending = Set[String] ()
    var originator: Option[ActorRef] = None

    var router = {
        val routees = Vector.fill (allowableParallelism) {
            val r = context.actorOf(Props[Worker])
            context watch r
            ActorRefRoutee(r)
        }
        Router (RoundRobinRoutingLogic(), routees)
    }

    def receive = {
        case WorkLoad (symbols) =>
            originator = Some (sender ())
            context become processing
            for (symbol <- symbols) {
                router.route (WorkItem (symbol), self)
                pending += symbol
            }
    }

    def processing: Receive = {
        case Terminated (a) =>
            router = router.removeRoutee(a)
            val r = context.actorOf(Props[Worker])
            context watch r
            router = router.addRoutee(r)
        case WorkItemCompleted (symbol) =>
            pending -= symbol
            if (pending.isEmpty) {
                context become receive
                originator.get ! WorkLoadCompleted ()
            }
    }
}

You could query the master actor with ask and receive a WorkLoadCompleted in a future.
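
For example, a rudimentary sketch of that query (the system and actor names are mine):

  import akka.actor.{ActorSystem, Props}
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._

  implicit val timeout: Timeout = Timeout(1.hour)  // must cover the whole workload

  val system = ActorSystem("work-system")
  val master = system.actorOf(Props[Master], "master")

  // completes with WorkLoadCompleted once all symbols have been processed
  val done = (master ? WorkLoad(symbols)).mapTo[WorkLoadCompleted]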

But thinking more about the 'state' (the number of simultaneous requests in processing) that has to be hidden somewhere, together with the code needed to keep it from being exceeded, here's something of a 'future gateway intermediary' sort, if you don't mind imperative style and mutable (used internally only, though) structures:

import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object Guardian
{
    private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]()
    private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]()
    private val pending = new collection.mutable.Queue[String]

    def doWorkGuarded (symbol: String): Future[Unit] = {
        synchronized {
            val p = Promise[Unit] ()
            incoming(symbol) = p
            if (incoming.size <= allowableParallelism)
                launchWork (symbol)
            else
                pending.enqueue (symbol)
            p.future
        }
    }

    private def completionHandler (t: Try[Unit]): Unit = {
        synchronized {
            for (symbol <- outgoing.keySet) {
                val f = outgoing (symbol)
                if (f.isCompleted) {
                    incoming (symbol).completeWith (f)
                    incoming.remove (symbol)
                    outgoing.remove (symbol)
                }
            }
            // 'until', not 'to': otherwise one work item too many would be launched
            for (_ <- outgoing.size until allowableParallelism) {
                if (pending.nonEmpty) {
                    val symbol = pending.dequeue()
                    launchWork (symbol)
                }
            }
        }
    }

    private def launchWork (symbol: String): Unit = {
        val f = doWork(symbol)
        outgoing(symbol) = f
        f.onComplete(completionHandler)
    }
}

doWork is now exactly like yours, returning Future[Unit]; the idea is that instead of using something like

val futures = symbols.map (doWork (_)).toSeq
val future = Future.sequence(futures)

which would launch futures without any regard to allowableParallelism, I would instead use

val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq
val future = Future.sequence(futures)

Think of some hypothetical database-access driver with a non-blocking interface, i.e. one returning futures for requests, whose concurrency is limited by, say, an underlying connection pool: you wouldn't want it to return futures that ignore the parallelism level and require you to juggle them to keep parallelism under control.

This example is more illustrative than practical, since I wouldn't normally expect the 'outgoing' interface to use futures like this (which is quite ok for the 'incoming' interface).

answered Nov 08 '22 by Duduk


First, obviously some purely functional wrapper around Scala's Future is needed, because a Future is side-effecting and runs as soon as it can. Let's call it Deferred:

import scala.concurrent.Future
import scala.util.control.Exception.nonFatalCatch

class Deferred[+T](f: () => Future[T]) {
  def run(): Future[T] = f()
}

object Deferred {
  def apply[T](future: => Future[T]): Deferred[T] =
    new Deferred(() => nonFatalCatch.either(future).fold(Future.failed, identity))
}

And here is the routine:

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.Exception.nonFatalCatch
import scala.util.{Failure, Success}

trait ConcurrencyUtils {    
  def runWithBoundedParallelism[T](parallelism: Int = Runtime.getRuntime.availableProcessors())
                                  (operations: Seq[Deferred[T]])
                                  (implicit ec: ExecutionContext): Deferred[Seq[T]] =
    if (parallelism > 0) Deferred {
      val indexedOps = operations.toIndexedSeq // index for faster access

      val promise = Promise[Seq[T]]()

      val acc = new CopyOnWriteArrayList[(Int, T)] // concurrent acc
      val nextIndex = new AtomicInteger(parallelism) // keep track of the next index atomically

      def run(operation: Deferred[T], index: Int): Unit = {
        operation.run().onComplete {
          case Success(value) =>
            acc.add((index, value)) // accumulate result value

            if (acc.size == indexedOps.size) { // we're done
              import scala.collection.JavaConversions._
              // in concurrent setting next line may be called multiple times, that's why trySuccess instead of success
              promise.trySuccess(acc.view.sortBy(_._1).map(_._2).toList)
            } else {
              val next = nextIndex.getAndIncrement() // get and inc atomically
              if (next < indexedOps.size) { // run next operation if exists
                run(indexedOps(next), next)
              }
            }
          case Failure(t) =>
            promise.tryFailure(t) // same here (may be called multiple times, let's prevent stdout pollution)
        }
      }

      if (operations.nonEmpty) {
        indexedOps.view.take(parallelism).zipWithIndex.foreach((run _).tupled) // run as much as allowed
        promise.future
      } else {
        Future.successful(Seq.empty)
      }
    } else {
      throw new IllegalArgumentException("Parallelism must be positive")
    }
}

In a nutshell, we initially run as many operations as allowed, and then, on each operation's completion, we run the next available operation, if any. So the only difficulty here is maintaining the next-operation index and the results accumulator in a concurrent setting. I'm not an absolute concurrency expert, so let me know if there are potential problems in the code above. Notice that the returned value is also a deferred computation that has to be run.

Usage and test:

import org.scalatest.{Matchers, FlatSpec}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Seconds, Span}

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

class ConcurrencyUtilsSpec extends FlatSpec with Matchers with ScalaFutures with ConcurrencyUtils {

  "runWithBoundedParallelism" should "return results in correct order" in {
    val comp1 = mkDeferredComputation(1)
    val comp2 = mkDeferredComputation(2)
    val comp3 = mkDeferredComputation(3)
    val comp4 = mkDeferredComputation(4)
    val comp5 = mkDeferredComputation(5)

    val compoundComp = runWithBoundedParallelism(2)(Seq(comp1, comp2, comp3, comp4, comp5))

    whenReady(compoundComp.run()) { result =>
      result should be (Seq(1, 2, 3, 4, 5))
    }
  }

  // increase default ScalaTest patience
  implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds))

  private def mkDeferredComputation[T](result: T, sleepDuration: FiniteDuration = 100.millis): Deferred[T] =
    Deferred {
      Future {
        Thread.sleep(sleepDuration.toMillis)
        result
      }
    }

}
answered Nov 08 '22 by Tvaroh