
Wait for a Scala Future to complete and continue with the next one

I have a list of 500,000 elements and a queue with 20 consumers. The messages are processed at different speeds (1, 15, 30, or 60 seconds; 3 or 50 minutes; 3 or 16 hours, or more; the timeout is 24 hours). I need each consumer's response in order to do some further processing on the data. I am going to use Scala Futures for this, with the event-based onComplete callback.

In order not to flood the queue, I want to send the first 30 messages to the queue: 20 will be picked up by the consumers and 10 will be waiting in the queue. Whenever one of the Futures completes, I want to send another message to the queue. Can you give me an idea of how to achieve this? Can this be done with Akka Streams?

This is wrong; I just want to give you an idea of what I want:

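import scala.concurrent.Future
// assumes the global execution context is acceptable here
import scala.concurrent.ExecutionContext.Implicits.global

private def sendMessage(ids: List[String]): Unit = ids match {
  case Nil => () // nothing left to send
  case id :: rest =>
    val futureResult = Future {
      // send id along with some message to the queue
    }.map { result =>
      // process the response
    }

    // send the next message only once this one has completed
    futureResult.onComplete { _ =>
      sendMessage(rest)
    }
}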

def migrateAll(): Unit = {
  val ids: List[String] = ??? // get IDs from the DB

  sendMessage(ids)
}
frunza asked Nov 29 '25 16:11



1 Answer

Below is a simple example with Akka Streams that models your use case.

Let's define the processing as a method that takes a String and returns a Future[String]:

def process(id: String): Future[String] = ???

Then we create a Source from a List of 500,000 String elements and use mapAsync to feed the elements to the processing method. The parallelism is set to 20, meaning that no more than 20 Futures will be in flight at any point in time. mapAsync emits the results downstream in the order of the input elements as their Futures complete; we then perform additional processing on each result and print it:

Source((1 to 500000).map(_.toString).toList)
  .mapAsync(parallelism = 20)(process)
  // do something with the result of the Future; here we create a new string
  //   that begins with "Processed: "
  .map(s => s"Processed: $s")
  .runForeach(println)
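
For completeness, here is a minimal runnable sketch of the same pipeline with the imports and ActorSystem it needs. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The names MapAsyncSketch, run and inFlight are hypothetical, and process is only a stub standing in for "send the id to the queue and wait for the consumer's reply". If that Future completes only when the reply arrives, the parallelism bounds the number of messages in flight, so a value of 30 would correspond to the 20 consumers plus 10 waiting messages from the question:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object MapAsyncSketch {
  // Hypothetical stub: replace with the real "send to queue, await reply" call.
  def process(id: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"response for $id")

  // Runs the pipeline with at most `inFlight` Futures pending at once
  // and collects the processed results (in input order).
  def run(ids: List[String], inFlight: Int)(implicit system: ActorSystem): Future[Seq[String]] = {
    import system.dispatcher
    Source(ids)
      .mapAsync(parallelism = inFlight)(id => process(id))
      .map(s => s"Processed: $s")
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("migration")
    import system.dispatcher

    val done = run((1 to 100).map(_.toString).toList, inFlight = 30)
    done.onComplete { result =>
      // print a few results, then shut the actor system down either way
      result.foreach(_.take(3).foreach(println))
      system.terminate()
    }
  }
}
```

Collecting into Sink.seq is only for the sake of a small demo; for 500,000 elements, a streaming sink such as runForeach (as above) avoids holding every result in memory.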

You can read more about mapAsync in the documentation.
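
One point worth checking against the question's widely varying processing times: because mapAsync preserves input order, a single slow element at the head can hold back results that have already completed behind it. Akka Streams also provides mapAsyncUnordered, which has the same shape but emits each result as soon as its Future completes. The sketch below is an illustration under assumptions, not part of the original answer: UnorderedSketch and run are hypothetical names, and process simulates a consumer in which id "1" is slow and every other id replies quickly.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object UnorderedSketch {
  // Hypothetical consumer: id "1" takes 500 ms, all other ids take 10 ms.
  def process(id: String)(implicit system: ActorSystem): Future[String] = {
    import system.dispatcher
    val delay = if (id == "1") 500.millis else 10.millis
    after(delay, system.scheduler)(Future.successful(id))
  }

  // At most `inFlight` Futures are pending; results arrive in completion order.
  def run(ids: List[String], inFlight: Int)(implicit system: ActorSystem): Future[Seq[String]] =
    Source(ids)
      .mapAsyncUnordered(parallelism = inFlight)(id => process(id))
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unordered")
    import system.dispatcher

    run(List("1", "2", "3", "4"), inFlight = 2).onComplete { result =>
      result.foreach(println) // the slow id "1" is expected to appear last
      system.terminate()
    }
  }
}
```

With mapAsyncUnordered, a free slot is refilled as soon as any Future finishes, which is closer to "when one of the Futures is complete, send another message"; the trade-off is that downstream stages can no longer rely on input order.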

Jeffrey Chung answered Dec 02 '25 05:12
