Task.async_stream timeout behaviour

Tags:

elixir

In the Task.async_stream options, the :timeout parameter is described as:

The maximum amount of time (in milliseconds) each task is allowed to execute for. Defaults to 5000

In my testing I did the following:

iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]

iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

How come the first example does not time out (but takes ~10 seconds to execute), while the second example shows the expected behaviour of timing out?

Tyler asked Aug 24 '17 at 15:08



1 Answer

The implementation of Task.async_stream changed from 1.4.5 to 1.5.1.

Let's take a look at what happens.

Elixir 1.4.5

In this version, the timeout is the after clause of a receive block.

receive do
  {{^monitor_ref, position}, value} ->
    # ...

  {:down, {^monitor_ref, position}, reason} ->
    # ...

  {:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
    # ...
after
  timeout ->
    # ...
end

This receive block waits for update messages about the spawned tasks, sent by a monitoring process. For simplicity, I truncated the code.

What does that mean in practice? Task.async_stream only times out when a full timeout period (5000 ms by default) passes without it receiving any message about a spawned task. Every message restarts the clock.
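The rule above can be captured in a few lines. This is a simplified model, not the library code: OldTimeoutModel is a hypothetical name, and it assumes every task starts at t = 0 (the input is no longer than :max_concurrency), that each task's message arrives exactly when the task finishes, and that the timer restarts on every message.

```elixir
defmodule OldTimeoutModel do
  # Hypothetical, simplified model of the Elixir 1.4.5 rule; it runs no tasks.
  # Assumptions: all tasks start at t = 0, each task's message arrives exactly
  # when it finishes, and the receive timer restarts on every message.
  def outcome(durations_ms, timeout_ms) do
    finish_times = Enum.sort(durations_ms)

    # Idle gaps between consecutive messages, measured from the stream start.
    gaps =
      [0 | finish_times]
      |> Enum.zip(finish_times)
      |> Enum.map(fn {previous, current} -> current - previous end)

    if Enum.any?(gaps, &(&1 > timeout_ms)), do: :timeout, else: :ok
  end
end
```

For example, OldTimeoutModel.outcome([10_000, 3_000, 4_000], 5_000) gives :timeout because of the 6000 ms gap between the 4-second and 10-second results. Inputs whose largest gap lands exactly on the limit, such as [10, 5] and [10, 4, 5], are races in practice, which may explain why they behave differently in the runs shown here.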

Example

Let's try your example using [10, 3, 4] instead:

iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

As we can see, this results in a timeout, as expected: 6 seconds pass between the 4-second result and the 10-second one.

Now what if we use [10, 5]? Will this work?

iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

It seems the 10-second task is still too slow for the 5-second timeout: both the wait for the first result and the wait after it sit right at the 5-second limit. But as soon as we add an intermediate step, it works. How about 1?

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]

Elixir 1.5.1

In Elixir 1.5.1 the timeout logic works differently. It uses Process.send_after to send a timeout message for each spawned Task to the monitoring process.

# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
  :infinity -> nil
  timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end

This message is then handled by the same receive loop in the monitoring process that spawned the task and scheduled the :timeout message.

Link to the full function.
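To make the per-task idea concrete, here is a minimal single-task sketch of the same pattern. PerTaskTimer is a hypothetical module, not the real Task.Supervised code: it spawns one monitored process, arms a Process.send_after timer for that process alone, and kills the process if the timer fires first.

```elixir
defmodule PerTaskTimer do
  # Hypothetical helper, not the real Task.Supervised code. It runs one
  # function in its own process and gives that process its own deadline,
  # using the same Process.send_after/3 idea as the 1.5.1 snippet.
  def run(fun, timeout) do
    parent = self()
    ref = make_ref()
    {pid, mon} = spawn_monitor(fn -> send(parent, {ref, fun.()}) end)

    # One timer per task, armed right after the task is spawned.
    timer_ref =
      case timeout do
        :infinity -> nil
        ms -> Process.send_after(parent, {:timeout, ref}, ms)
      end

    receive do
      {^ref, value} ->
        Process.demonitor(mon, [:flush])
        cancel_timer(timer_ref, ref)
        {:ok, value}

      {:DOWN, ^mon, :process, ^pid, reason} ->
        # The task crashed before it could reply.
        cancel_timer(timer_ref, ref)
        {:exit, reason}

      {:timeout, ^ref} ->
        # Only this task ran out of time: kill it and wait until it is gone.
        Process.exit(pid, :kill)

        receive do
          {:DOWN, ^mon, :process, ^pid, _reason} -> :ok
        end

        # Discard a reply the task may have sent just before it was killed.
        receive do
          {^ref, _late_value} -> :ok
        after
          0 -> :ok
        end

        {:exit, :timeout}
    end
  end

  defp cancel_timer(nil, _ref), do: :ok

  defp cancel_timer(timer_ref, ref) do
    Process.cancel_timer(timer_ref)

    # If the timer fired before it was cancelled, drop its message.
    receive do
      {:timeout, ^ref} -> :ok
    after
      0 -> :ok
    end
  end
end
```

Because each task gets its own timer, a slow task times out after timeout milliseconds no matter how many other tasks have reported back in the meantime, which is the behaviour shown in the examples that follow.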

Examples

As soon as a single task takes longer than the specified timeout, the whole stream is brought to its knees, as it should be.

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
    (elixir) lib/enum.ex:1847: Enum.reverse/1
    (elixir) lib/enum.ex:2596: Enum.to_list/1
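If you would rather keep the results of the fast tasks than crash the caller, Elixir 1.5 and later accept an :on_timeout option on Task.async_stream (check that your version documents it). With on_timeout: :kill_task, only the slow task is killed and it shows up as {:exit, :timeout} in the results. The sketch below scales the [10, 5, 1] example down to milliseconds; KillSlowTasks is a hypothetical wrapper.

```elixir
defmodule KillSlowTasks do
  # Hypothetical wrapper; assumes Elixir 1.5+, where Task.async_stream/3
  # accepts :on_timeout. A task exceeding :timeout is killed and reported
  # as {:exit, :timeout} instead of exiting the caller.
  def run(durations_ms, timeout_ms) do
    durations_ms
    |> Task.async_stream(
      fn ms ->
        Process.sleep(ms)
        ms
      end,
      timeout: timeout_ms,
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  end
end
```

KillSlowTasks.run([300, 50, 10], 100) should return [exit: :timeout, ok: 50, ok: 10]: the results keep the input order, and only the 300 ms task exceeds its own 100 ms limit.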

TL;DR

Elixir 1.4.5 restarts the timeout every time it receives a message about a spawned process. Elixir 1.5.1 tracks it separately for each spawned process.

Sascha Wolf answered Sep 18 '22 at 10:09
