In the Task.async_stream options, the :timeout
parameter is described:
The maximum amount of time (in milliseconds) each task is allowed to execute for. Defaults to 5000
In my testing I did the following:
iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]
iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
How come the first example does not timeout (but takes ~10 seconds to execute) while the second example exhibits the expected behaviour of timing out?
The implementation of Task.async_stream
changed from 1.4.5 to 1.5.1.
Let's take a look at what happens.
In this version the timeout is part of a receive
s after
block.
receive do
{{^monitor_ref, position}, value} ->
# ...
{:down, {^monitor_ref, position}, reason} ->
# ...
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
# ...
after
timeout ->
# ...
end
This receive
block serves the purpose to wait for update messages from the spawned tasks sent from a monitoring process. For simplicity reasons I truncated the code.
What does that mean in an applied scenario? Task.async_stream
will only timeout if there is a duration of timeout
milliseconds in which it receives no message from a spawned task.
Lets try your example using [10, 3, 4]
:
iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
As we can see this results in a timeout, as expected.
Now what if we try to use [10, 5]
, will this work?
iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
As it seems the initial Task takes too long with it's 5 seconds timeout. But as soon as we add an intermediary step, it works. How about 1
?
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]
In Elixir 1.5.1 the timeout logic works differently. It uses Process.send_after
to send a timeout message for each spawned Task to the monitoring process.
# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
:infinity -> nil
timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end
This message is then handled in the same receive which spawned the Task and sent the :timeout
message.
Link to the full function.
As soon as a single process takes longer than the specified timeout, the whole stream goes to it's knees, as it should be.
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
(elixir) lib/enum.ex:1847: Enum.reverse/1
(elixir) lib/enum.ex:2596: Enum.to_list/1
Elixir 1.4.5 tracks the timeout anew after receiving a result from a spawned process. Elixir 1.5.1 tracks it separately for each spawned process.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With