
How to batch process functions with Scala Futures?

Tags: scala, future

I have a single function that I want to call 100 times, but in a batched way so that only 2 calls are running at any one time. The function may place a high load on the internet connection, so it's better to process the calls in groups of 2.

This is my attempt to do it with Scala Futures, but it does not seem to work. Is there a standard way to batch process a list of tasks using Scala Futures?

  def futureString(s:String): String = {
    Thread.sleep(2000)// + (Math.random()*1000).toInt)
    println(s"Completed $s")
    "end:" + s
  }

  def processList(list: List[String], blockSize: Int) = {
    var futuresProcessing = Set[Future[String]]()
    async {
      val itemIterator = list.iterator
      while (itemIterator.hasNext) {
        val item = itemIterator.next()
        println("Item is " + item)

        if (futuresProcessing.size >= blockSize) {
          await {
            val completed = Future.firstCompletedOf(futuresProcessing.toSeq)
            println("Size : " + futuresProcessing.size)
            completed
          }
        }

        val f = future { futureString(item) }
        f.onComplete{ case Success(sss) => { futuresProcessing = futuresProcessing - f } }
        futuresProcessing = futuresProcessing + f
      }
    }
  }

  val list: List[String] = (1 to 200).map(n => "" + n).toList
  processList(list, 2)

What I want is to be able to use any batch size, given that futureString may take a random amount of time to finish. Say the batch size is 10: at first 10 items start, and whenever one item finishes, a new item should be added to the batch being processed.

I'm starting to think I should use actors.

Update: After a good long sleep I got it working, but I still think it would be better done with actors. I also think there is a race condition in the following code around the use of the futuresProcessing set.

  import scala.concurrent._
  import scala.concurrent.duration._
  import ExecutionContext.Implicits.global
  import scala.async.Async.{async, await}
  import scala.collection.parallel.mutable
  import scala.util.{Success, Try}
  import scala.concurrent.Await

  def futureString(s:String): Future[String] = {
    future {
      Thread.sleep(2000 + (Math.random()*1000).toInt)
      println(s"Completed $s")
      "end:" + s
    }
  }

  def processList(list: List[String], blockSize: Int) = {
    val futuresProcessing = mutable.ParSet[Future[String]]()
    async {
      val itemIterator = list.iterator
      while (itemIterator.hasNext) {
        val item = itemIterator.next()
        println("Item is " + item)

        if (futuresProcessing.size >= blockSize) {
          await {
            val completed = Future.firstCompletedOf(futuresProcessing.toList)
            println("Size : " + futuresProcessing.size)
            completed
          }
        }

        val f = futureString(item)
        futuresProcessing += f
        f.onComplete{ case Success(sss) => { futuresProcessing -= f } }
      }
    }
  }

val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 4)
Phil asked Dec 16 '22 03:12


2 Answers

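If you only care about processing batches of 2 in parallel, there might be an easier solution using parallel collections (this assumes the blocking futureString from the first snippet, the one that returns a String):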

val data = (1 to 20).map(_.toString()).grouped(2).toList

and then:

val result = data.flatMap(pair => pair.par.map(futureString))

Which yields:

// pause
Completed 1
Completed 2
// pause
Completed 4
Completed 3
// pause
Completed 6
Completed 5
// pause
Completed 8
Completed 7
// pause
Completed 9
Completed 10
// pause
// ..etc

result: List[String] = List(end:1, end:2, end:3, end:4, end:5, end:6, end:7, end:8, end:9, end:10, end:11, end:12, end:13, end:14, end:15, end:16, end:17, end:18, end:19, end:20)

If you want to have it done asynchronously (because the version above will block), you can wrap the whole result processing in one Future and await it.
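
As a rough sketch of that idea, the batches can also be chained inside a single Future without parallel collections. This is a swapped-in technique, not the code above: processInBatches is a hypothetical helper, f stands in for the blocking futureString, and each batch is only started once the previous one has completed.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical helper: runs `f` over `items` in fixed batches of `batchSize`.
// A batch starts only after every item of the previous batch has finished.
def processInBatches[A, B](items: List[A], batchSize: Int)(f: A => B)(
    implicit ec: ExecutionContext): Future[List[B]] =
  items.grouped(batchSize).foldLeft(Future.successful(List.empty[B])) { (acc, batch) =>
    acc.flatMap { done =>
      // `blocking` tells the pool that `f` may block (sleep, network I/O).
      Future.traverse(batch)(a => Future(blocking(f(a)))).map(done ++ _)
    }
  }

// Possible usage, awaiting the one combined Future:
//   import ExecutionContext.Implicits.global
//   val result = Await.result(processInBatches(list, 2)(futureString), Duration.Inf)
```

The results come back in input order. Like the version above, a slow item holds up its whole batch, so this does not refill a slot as soon as one item finishes.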

Patryk Ćwiek answered Dec 29 '22 23:12


For the easy-peasy way, let the thread pool define the parallelism.

scala> import concurrent._
import concurrent._

scala> implicit val x = ExecutionContext fromExecutorService new java.util.concurrent.ForkJoinPool(2)
x: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$1@6c9b0123

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(scala.concurrent.impl.Promise$DefaultPromise@397314a4, scala.concurrent.impl.Promise$DefaultPromise@32503873, scala.concurrent.impl.Promise$DefaultPromise@30aa1fd3, scala.concurrent.impl.Promise$DefaultPromise@710f6e9a, scala.concurrent.impl.Promise$DefaultPromise@2c267a73, scala.concurrent.impl.Promise$DefaultPromise@12312aaa, scala.concurrent.impl.Promise$DefaultPromise@59e8083a, scala.concurrent.impl.Promise$DefaultPromise@107445f3, scala.concurrent.impl.Promise$DefaultPromise@419c5cf5, scala.concurrent.impl.Promise$DefaultPromise@9afa7a, scala.concurrent.impl.Promise$DefaultPromise@3eb25fe5, scala.concurrent.impl.Promise$DefaultPromise@30b5d38b, scala.concurrent.impl.Promise$DefaultPromise@715363a8...
scala> End 1
Start 3
End 2
Start 4
End 3
Start 5

(etc to 20)

where

scala> val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@2ee5d440

scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit
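
Outside the REPL, the same idea might look like the sketch below. It assumes a fixed thread pool from java.util.concurrent rather than the ForkJoinPool used above, and runBounded is a hypothetical helper name. The pool size is what caps the number of calls running at once, and the pool is shut down when the work is done.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper: applies `f` to every item, with at most
// `parallelism` calls running at the same time.
def runBounded[A, B](items: List[A], parallelism: Int)(f: A => B): List[B] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    // All Futures are created up front; the pool queues them and
    // starts the next one as soon as a thread becomes free.
    Await.result(Future.traverse(items)(a => Future(f(a))), Duration.Inf)
  } finally {
    pool.shutdown() // otherwise the non-daemon threads keep the JVM alive
  }
}
```

Because the queueing is left to the pool, a new item starts whenever any running item finishes, not only when a whole batch is done.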

Edit: Configuration of the default ExecutionContext.global pool is documented in the overview. It reads the system properties scala.concurrent.context.minThreads, scala.concurrent.context.numThreads and scala.concurrent.context.maxThreads to constrain the available threads.

It's also worth adding that people who want extra control over when async code runs often prefer a Task abstraction, which is available in several libraries (for example Monix or Scalaz).
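
Much of that control comes down to deferring when each piece of work starts. A minimal sketch of the idea with plain Futures and no Task library follows; processWindowed and its parameters are hypothetical names. The items wait in a queue, and a Future is only created for an item when one of the `window` workers is free.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical helper: keeps at most `window` Futures in flight and
// starts the next item as soon as any one of them completes.
def processWindowed[A, B](items: Seq[A], window: Int)(f: A => Future[B])(
    implicit ec: ExecutionContext): Future[Seq[B]] = {
  require(window > 0, "window must be positive")
  val queue = new ConcurrentLinkedQueue[(A, Int)]()
  items.zipWithIndex.foreach(p => queue.add(p))

  // One worker takes an item, waits for its Future, then takes the next.
  def worker(acc: List[(Int, B)]): Future[List[(Int, B)]] =
    Option(queue.poll()) match {
      case None         => Future.successful(acc)
      case Some((a, i)) => f(a).flatMap(b => worker((i, b) :: acc))
    }

  // `window` workers run side by side; results are put back in input order.
  Future.sequence(List.fill(window)(worker(Nil)))
    .map(_.flatten.sortBy(_._1).map(_._2))
}
```

Note that f must return a Future that has not been started yet for each call, which is why it takes the item as an argument. In this sketch a failed Future fails the combined result, while the other workers keep draining the queue.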

$ scala -Dscala.concurrent.context.maxThreads=2
Welcome to Scala 2.12.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111).
Type in expressions for evaluation. Or try :help.

scala> import concurrent._, ExecutionContext.Implicits.global
import concurrent._
import ExecutionContext.Implicits.global

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
<console>:16: error: not found: value delay
       1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
                                                        ^

scala>  val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@782be4eb

scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>))

scala> End 1
Start 3
End 2
Start 4
End 4
Start 5
End 5
Start 6
End 3
Start 7
End 7
som-snytt answered Dec 29 '22 22:12