I want to perform multiple tasks concurrently. In JavaScript, I would do:
async function cook_an_egg() {}
async function take_shower() {}
async function call_mum() {}
await Promise.all([cook_an_egg(), take_shower(), call_mum()])
How do I achieve Promise.all with Elixir's Task module?
From the documentation, it seems you can only await one task at a time, define one function inside each task, and use async_stream only to apply the same function to multiple items.
Task.await_many is designed to do exactly this. It handles the overall timeout correctly and should do the least surprising thing in the face of exits, timeouts, etc.
tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end)
]
Task.await_many(tasks)
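Note that Task.await_many/2 also takes an explicit timeout as its second argument. It defaults to 5,000 ms and applies to the whole batch of tasks rather than to each one individually, for example (30 seconds here is just an illustrative value):

Task.await_many(tasks, 30_000)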
A more bulletproof solution than Task.await is Task.yield_many. Unfortunately, it's a little more verbose because it leaves us in charge of handling timed-out and dead tasks ourselves. If we want to mimic the behavior of async/await and exit when something goes wrong, it looks like this:
tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end)
]
Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      # The task didn't finish within the timeout: kill it and bail out.
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)

    {:exit, reason} ->
      # The task crashed: propagate the exit, just like `await` would.
      exit(reason)

    {:ok, result} ->
      result
  end
end)
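Task.yield_many/2 returns the results in the same order as the tasks you pass in, and it uses the same 5,000 ms default timeout as Task.await, so pass an explicit timeout if your tasks may legitimately run longer (30 seconds below is just an example):

Task.yield_many(tasks, 30_000)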
Why not just map over Task.await?
Using Task.await will work in simple situations, but if you care about the timeout you can get yourself into trouble. Mapping across the list happens sequentially, which means each Task.await will block for up to the specified timeout before giving a result, at which point we move to the next item in the list and block again for up to the full timeout.
We can demonstrate this behavior by creating a list of tasks that sleep for 2, 4, and 6 seconds. With the default timeout of 5 seconds, the 6-second task would be killed if awaited directly, but when we enumerate across the list, that's not what happens:
for ms <- [2_000, 4_000, 6_000] do
Task.async(fn -> Process.sleep(ms); ms end)
end
|> Enum.map(&Task.await/1)
# Blocks for 6 seconds
# => [2000, 4000, 6000]
# Each `await` picks up after the previous one finishes, with a fresh 5s timeout.
# Since each await blocks for only ~2s before its task finishes, no timeout is
# triggered, even though the total run time (6s) exceeds the 5s timeout.
# async(2s) --await(2s)--> done at 2s
# async(4s) --await(2s)--> done at 4s
# async(6s) --await(2s)--> done at 6s
If we modify this to use Task.yield_many, we get the desired behavior:
for ms <- [2_000, 4_000, 6_000] do
Task.async(fn -> Process.sleep(ms); ms end)
end
|> Task.yield_many(5000)
|> Enum.map(fn {t, res} -> res || Task.shutdown(t, :brutal_kill) end)
# Blocks for 5 seconds
# => [{:ok, 2000}, {:ok, 4000}, nil]
You can map the Task.await function over a list of tasks. Something like:
tasks = Enum.reduce(0..9, [], fn _, acc ->
  [Task.async(&any_job/0) | acc]
end)

Enum.map(tasks, &Task.await/1)
Since this question was asked, Elixir's Task module has sprouted new powers. There are now both Task.await_many/2 and Task.yield_many/2, which do what they sound like.
To answer the example in the original question:
# Placeholder bodies; put the real work inside each fn.
cook_an_egg = Task.async(fn -> :ok end)
take_shower = Task.async(fn -> :ok end)
call_mum = Task.async(fn -> :ok end)

Task.await_many([cook_an_egg, take_shower, call_mum])
There is no analog to Promise.any, but you could write one pretty easily using Task.yield_many/2.
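For instance, here's a rough sketch of such a helper built on Task.yield_many/2 (the module name, function name, and poll interval are all made up for illustration, and unlike Promise.any it will loop forever if every task crashes):

defmodule PromiseAny do
  # Return the value of the first task that finishes successfully,
  # polling with Task.yield_many/2 in short intervals.
  def first_success(tasks, poll_interval \\ 100) do
    results = Task.yield_many(tasks, poll_interval)

    case Enum.find(results, fn {_task, res} -> match?({:ok, _}, res) end) do
      {_task, {:ok, value}} ->
        # Got a winner; kill the remaining tasks and return its value.
        Enum.each(tasks, &Task.shutdown(&1, :brutal_kill))
        value

      nil ->
        # Nothing has finished successfully yet; keep polling.
        first_success(tasks, poll_interval)
    end
  end
end

A receive-based loop could avoid the polling entirely, but this keeps the example close to the Task functions discussed above.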