Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to await multiple tasks in Elixir?

I want to perform multiple tasks concurrently. In Javascript, I would do:

async function cook_an_egg() {}

async function take_shower() {}

async function call_mum() {}

await Promise.all([cook_an_egg(), take_shower(), call_mum()])

How do I achieve Promise.all in Elixir Task module? From documentation, seems you can only await 1 task; define 1 function inside each task; and apply only the same function to multiple items with async_stream .

like image 427
Ivan Wang Avatar asked Feb 19 '17 17:02

Ivan Wang


3 Answers

For Elixir v1.11.0 and up

Task.await_many is designed to do exactly this. It handles the overall timeout correctly and should do the least surprising thing in the face of exits, timeouts, etc.

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.await_many(tasks)

For older versions

A more bulletproof solution than Task.await is Task.yield_many. Unfortunately, it's a little more verbose because it leaves us in charge of handling timed-out and dead tasks ourselves. If we want to mimic the behavior of async/await and exit when something goes wrong, it will look like this:

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    {:exit, reason} ->
      exit(reason)
    {:ok, result} ->
      result
  end
end)

Why not use await?

Using Task.await will work in simple situations, but if you care about the timeout you can get yourself into trouble. Mapping across the list happens sequentially, which means each Task.await will block for up to the specified timeout before giving a result, at which point we move to the next item in the list and block again for up to the full timeout.

We can demonstrate this behavior by creating a list of tasks that sleep for 1-8 seconds. With the default timeout of 5 seconds, some of these tasks would be killed when called directly with await, but when we enumerate across the list, that's not what happens:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Enum.map(&Task.await/1)

# Blocks for 6 seconds
# => [2000, 4000, 6000]

# Each `await` picks up after the previous one finishes with a fresh 5s timeout.
# Since each one blocks for 2s before finishing, no timeout is triggered
# but the total run time runs over.
 
# async(2s)--await(2s)-->(2s)
# async(4s)                  --await(2s)-->(4s)
# async(6s)                                    --await(2s)-->(6s)

If we modify this to use Task.yield_many, we can get the desired behavior:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Task.yield_many(5000)
|> Enum.map(fn {t, res} -> res || Task.shutdown(t, :brutal_kill) end)

# Blocks for 5 seconds
# => [{:ok, 2000}, {:ok, 4000}, nil]
like image 63
Ian Greenleaf Young Avatar answered Oct 17 '22 17:10

Ian Greenleaf Young


You can map await function to a list of task refs. Something like

tasks = Enum.reduce(0..9, [], fn _, acc -> 
  [Task.async(&any_job/0) | acc]
end)

Enum.map(tasks, &Task.await/1)
like image 27
sysashi Avatar answered Oct 17 '22 17:10

sysashi


Since this question was asked, Elixir's Task module has sprouted new powers.

There is both Task.await_many/2 and Task.yield_many/2, which do what they sound like.

To answer the example in the original question:

cook_an_egg = Task.async(fn -> end)
take_shower = Task.async(fn -> end)
call_mum = Task.async(fn -> end)
Task.await_many([cook_an_egg, take_shower, call_mum])

There is no analog to Promise.any, but you could write one pretty easily using Task.yield_many/2

like image 1
paradox460 Avatar answered Oct 17 '22 17:10

paradox460