Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to return first async task to complete

Tags:

elixir

I've got several tasks that I'm running asynchronously. Depending on the input, one or more might run long, but only one of the tasks will return the :success message.

slowtask = Task.async(slow())
fasttask = Task.async(fast())

How can I capture the first of the two tasks above to complete, without having to wait for the other? I've tried Task.find/2, but because its implemented with enum, it seems to wait for all exit signals before finding a ref/message. My other thought was to poll this in Stream.cycle, ignoring still alive tasks and catching one that has exited. It seems un elixir like to poll in this way though.

like image 887
Dania_es Avatar asked Jul 23 '15 03:07

Dania_es


3 Answers

There is no easy way to do this on Elixir yet. Your best option is, if you are only waiting for those messages in a given process, is something like this:

  defmodule TaskFinder do
    def run do
      task1 = Task.async fn -> :timer.sleep(1000); 1 end
      task2 = Task.async fn -> :timer.sleep(5000); 2 end
      await [task1, task2]
    end

    # Be careful, this will receive all messages sent
    # to this process. It will return the first task
    # reply and the list of tasks that came second.
    def await(tasks) do
      receive do
        message ->
          case Task.find(tasks, message) do
            {reply, task} ->
              {reply, List.delete(tasks, task)}
            nil ->
              await(tasks)
          end
      end
    end
  end

  IO.inspect TaskFinder.run

Note you could also use this pattern to spawn tasks in a GenServer and use Task.find/2 to find the matching ones. I have also added this example to Elixir docs.

like image 84
José Valim Avatar answered Oct 02 '22 03:10

José Valim


To get the first result, you should wait for a message, then pass the message to Task.find/2, and handle the first result which is in form of {task_result, task}.

defmodule Tasks do
  def run do
    :random.seed(:os.timestamp)
    durations = Enum.shuffle(1..10)

    Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
    |> get_first_result
    |> IO.inspect
  end

  defp get_first_result(tasks) do
    receive do
      msg ->
        case Task.find(tasks, msg) do
          {result, _task} ->
            # got the result
            result

          nil -> 
            # no result -> continue waiting
            get_first_result(tasks)
        end
    end
  end


  defp run_task(1) do
    :success
  end

  defp run_task(duration) do
    :timer.sleep(duration * 100)
    :ok
  end
end

If the "master" process is a GenServer you should call Task.find/2 from within handle_info/2, rather than running this recursive loop.

like image 23
sasajuric Avatar answered Oct 02 '22 03:10

sasajuric


Based on Jose Valim's answer, here's what I used to match the reply coming back:

def run do
    task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
    task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
    task3 = Task.async(fn -> :timer.sleep(1000); :fail end)

    await([task1, task2, task3])
end

def await(tasks) do
  receive do
    message ->
      case Task.find(tasks, message) do
        {:fail, task} ->
          await(List.delete(tasks, task))
        {reply, _task} ->
          reply             
        nil ->
          await(tasks)
      end
  end
end

This allowed me to match the first function to return something other than the :fail atom, and give me that reply. Does this work because receive/1 just waits for any message to come along?

like image 28
Dania_es Avatar answered Oct 02 '22 01:10

Dania_es