I have a single function that I want to call 100 times, but I want to do it in a batched way so that only 2 calls are running at any one time. The function may place a high load on the internet connection, so it's better to process the calls in groups of 2.
This is my attempt to do it with Scala Futures, but it does not seem to work. Is there any standard way to batch process a list of tasks using Scala Futures?
def futureString(s: String): String = {
  Thread.sleep(2000) // + (Math.random()*1000).toInt
  println(s"Completed $s")
  "end:" + s
}
def processList(list: List[String], blockSize: Int) = {
  var futuresProcessing = Set[Future[String]]()
  async {
    val itemIterator = list.iterator
    while (itemIterator.hasNext) {
      val item = itemIterator.next()
      println("Item is " + item)
      if (futuresProcessing.size >= blockSize) {
        await {
          val completed = Future.firstCompletedOf(futuresProcessing.toSeq)
          println("Size : " + futuresProcessing.size)
          completed
        }
      }
      val f = future { futureString(item) }
      f.onComplete { case Success(sss) => futuresProcessing = futuresProcessing - f }
      futuresProcessing = futuresProcessing + f
    }
  }
}
val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 2)
What I want is to be able to use any batch size, given that futureString may finish after a random amount of time. So let's say the batch size is 10: at first 10 items start, and whenever one item finishes, a new item should be added to the batch for processing.
I'm starting to think I should use actors.
Update: After a good long sleep I got it working, but I think it would be better done with actors. I also suspect there is a race condition in the following code around the use of the futuresProcessing set.
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import scala.collection.parallel.mutable
import scala.util.{Success, Try}
import scala.concurrent.Await
def futureString(s: String): Future[String] = {
  future {
    Thread.sleep(2000 + (Math.random() * 1000).toInt)
    println(s"Completed $s")
    "end:" + s
  }
}
def processList(list: List[String], blockSize: Int) = {
  val futuresProcessing = mutable.ParSet[Future[String]]()
  async {
    val itemIterator = list.iterator
    while (itemIterator.hasNext) {
      val item = itemIterator.next()
      println("Item is " + item)
      if (futuresProcessing.size >= blockSize) {
        await {
          val completed = Future.firstCompletedOf(futuresProcessing.toList)
          println("Size : " + futuresProcessing.size)
          completed
        }
      }
      val f = futureString(item)
      futuresProcessing += f
      f.onComplete { case Success(sss) => futuresProcessing -= f }
    }
  }
}
val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 4)
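One Futures-only way to avoid sharing a mutable set at all is to start blockSize workers that each pull the next item from a shared, synchronized iterator and chain it onto their own future. The sketch below is only illustrative (processListWorkers, worker and nextItem are made-up names) and assumes the futureString(s: String): Future[String] version above.
import scala.concurrent._
import ExecutionContext.Implicits.global

def processListWorkers(list: List[String], blockSize: Int): Future[Unit] = {
  val items = list.iterator

  // Synchronized: several worker callbacks may ask for the next item concurrently.
  def nextItem(): Option[String] = items.synchronized {
    if (items.hasNext) Some(items.next()) else None
  }

  // Each worker processes one item at a time; blockSize workers run in parallel,
  // so as soon as any item finishes, the freed worker picks up the next one.
  def worker(): Future[Unit] = nextItem() match {
    case Some(item) => futureString(item).flatMap(_ => worker())
    case None       => Future.successful(())
  }

  Future.sequence(List.fill(blockSize)(worker())).map(_ => ())
}
processListWorkers(list, 4) would then keep exactly 4 futures in flight, with no shared mutable state to race on.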
If you only care about processing batches of 2 in parallel, then there might be an easier solution:
val data = (1 to 20).map(_.toString()).grouped(2).toList
and then:
val result = data.flatMap(pair => pair.par.map(futureString))
Which yields:
// pause
Completed 1
Completed 2
// pause
Completed 4
Completed 3
// pause
Completed 6
Completed 5
// pause
Completed 8
Completed 7
// pause
Completed 9
Completed 10
// pause
// ..etc
result: List[String] = List(end:1, end:2, end:3, end:4, end:5, end:6, end:7, end:8, end:9, end:10, end:11, end:12, end:13, end:14, end:15, end:16, end:17, end:18, end:19, end:20)
If you want it done asynchronously (because the version above blocks), you can wrap the whole result computation in one Future and await it.
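A minimal sketch of that wrapping, assuming the blocking futureString(s: String): String and the data value from above (and Scala 2.12, where .par is available without an extra dependency):
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

// Move the whole batched computation off the calling thread.
val asyncResult: Future[List[String]] = Future {
  data.flatMap(pair => pair.par.map(futureString))
}

// Block only at the point where the values are actually needed.
val result = Await.result(asyncResult, 10.minutes)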
Easy-peasy: let the pool define the parallelism.
scala> import concurrent._
import concurrent._
scala> implicit val x = ExecutionContext fromExecutorService new java.util.concurrent.ForkJoinPool(2)
x: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$1@6c9b0123
scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(scala.concurrent.impl.Promise$DefaultPromise@397314a4, scala.concurrent.impl.Promise$DefaultPromise@32503873, scala.concurrent.impl.Promise$DefaultPromise@30aa1fd3, scala.concurrent.impl.Promise$DefaultPromise@710f6e9a, scala.concurrent.impl.Promise$DefaultPromise@2c267a73, scala.concurrent.impl.Promise$DefaultPromise@12312aaa, scala.concurrent.impl.Promise$DefaultPromise@59e8083a, scala.concurrent.impl.Promise$DefaultPromise@107445f3, scala.concurrent.impl.Promise$DefaultPromise@419c5cf5, scala.concurrent.impl.Promise$DefaultPromise@9afa7a, scala.concurrent.impl.Promise$DefaultPromise@3eb25fe5, scala.concurrent.impl.Promise$DefaultPromise@30b5d38b, scala.concurrent.impl.Promise$DefaultPromise@715363a8...
scala> End 1
Start 3
End 2
Start 4
End 3
Start 5
(etc to 20)
where
scala> val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@2ee5d440
scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit
Edit: Configuration of the default ExecutionContext.global pool is documented in the overview. It will use the system properties scala.concurrent.context.maxThreads, scala.concurrent.context.minThreads and scala.concurrent.context.numThreads to constrain the available threads.
It's also worth adding that people who want extra control over when async code runs prefer a Task abstraction, available in several libraries.
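For example, here is a minimal sketch using cats-effect's IO, one such Task-like type; it assumes cats-effect 3, whose IO companion offers parTraverseN for bounding parallelism (check the API of your library and version):
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object BatchedTasks extends IOApp.Simple {
  // A Task-style futureString: nothing runs until the IO is actually executed.
  def taskString(s: String): IO[String] =
    for {
      _ <- IO.sleep(2.seconds)
      _ <- IO.println(s"Completed $s")
    } yield "end:" + s

  // Process the 20 items with at most 2 running concurrently.
  def run: IO[Unit] =
    IO.parTraverseN(2)((1 to 20).toList.map(_.toString))(taskString).void
}
Going back to plain Futures, the session below constrains the default pool with maxThreads: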
$ scala -Dscala.concurrent.context.maxThreads=2
Welcome to Scala 2.12.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111).
Type in expressions for evaluation. Or try :help.
scala> import concurrent._, ExecutionContext.Implicits.global
import concurrent._
import ExecutionContext.Implicits.global
scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
<console>:16: error: not found: value delay
1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
^
scala> val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@782be4eb
scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit
scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>))
scala> End 1
Start 3
End 2
Start 4
End 4
Start 5
End 5
Start 6
End 3
Start 7
End 7