I've got several tasks that I'm running asynchronously. Depending on the input, one or more might run long, but only one of the tasks will return the :success message.
slowtask = Task.async(fn -> slow() end)
fasttask = Task.async(fn -> fast() end)
How can I capture the first of the two tasks above to complete, without having to wait for the other? I've tried Task.find/2, but because it's implemented with Enum, it seems to wait for all exit signals before finding a ref/message. My other thought was to poll this in Stream.cycle, ignoring tasks that are still alive and catching one that has exited. Polling in this way seems un-Elixir-like, though.
There is no easy way to do this in Elixir yet. If you are only waiting for those messages in a given process, your best option is something like this:
defmodule TaskFinder do
  def run do
    task1 = Task.async fn -> :timer.sleep(1000); 1 end
    task2 = Task.async fn -> :timer.sleep(5000); 2 end
    await [task1, task2]
  end

  # Be careful, this will receive all messages sent
  # to this process. It will return the first task
  # reply and the list of tasks that came second.
  def await(tasks) do
    receive do
      message ->
        case Task.find(tasks, message) do
          {reply, task} ->
            {reply, List.delete(tasks, task)}
          nil ->
            await(tasks)
        end
    end
  end
end

IO.inspect TaskFinder.run
Note you could also use this pattern to spawn tasks in a GenServer and use Task.find/2 to find the matching ones. I have also added this example to the Elixir docs.
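To make the matching step concrete, here is a minimal sketch that does the lookup by hand instead of calling Task.find/2. It assumes the reply from Task.async/1 arrives as {ref, reply}, where ref is the task's monitor reference, and it assumes Elixir 1.10 or later for the is_map_key/2 guard. The module name FirstReply and the function await_first/2 are hypothetical names chosen for this illustration. Unlike a catch-all receive, the guard leaves unrelated messages in the mailbox.

```elixir
defmodule FirstReply do
  # Hypothetical helper: waits for the first reply from any task in `tasks`.
  # Returns {reply, finished_task, remaining_tasks}, or :timeout.
  def await_first(tasks, timeout \\ 5_000) do
    # Index the tasks by their monitor reference (assumed reply key).
    by_ref = Map.new(tasks, fn task -> {task.ref, task} end)

    receive do
      # Only messages tagged with one of our task refs match here;
      # anything else stays in the mailbox.
      {ref, reply} when is_map_key(by_ref, ref) ->
        # Stop monitoring the finished task and drop its :DOWN message.
        Process.demonitor(ref, [:flush])
        {task, rest} = Map.pop(by_ref, ref)
        {reply, task, Map.values(rest)}
    after
      timeout -> :timeout
    end
  end
end

slow = Task.async(fn -> Process.sleep(2_000); :slow end)
fast = Task.async(fn -> Process.sleep(100); :fast end)

{reply, _finished, remaining} = FirstReply.await_first([slow, fast])
IO.inspect(reply)

# The caller decides what to do with the tasks that lost the race;
# here they are simply shut down.
Enum.each(remaining, &Task.shutdown(&1, :brutal_kill))
```

Returning the remaining tasks mirrors the answer above: the caller can await them later, call await_first/2 again, or shut them down.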
To get the first result, wait for a message, pass that message to Task.find/2, and handle the first match, which has the form {task_result, task}.
defmodule Tasks do
  def run do
    :random.seed(:os.timestamp)
    durations = Enum.shuffle(1..10)

    Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
    |> get_first_result
    |> IO.inspect
  end

  defp get_first_result(tasks) do
    receive do
      msg ->
        case Task.find(tasks, msg) do
          {result, _task} ->
            # got the result
            result
          nil ->
            # no result -> continue waiting
            get_first_result(tasks)
        end
    end
  end

  defp run_task(1) do
    :success
  end

  defp run_task(duration) do
    :timer.sleep(duration * 100)
    :ok
  end
end
If the "master" process is a GenServer, you should call Task.find/2 from within handle_info/2, rather than running this recursive loop.
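As a rough sketch of the GenServer variant: the tasks are started inside the server, so their replies arrive in handle_info/2. This version matches on the task's monitor reference directly rather than calling Task.find/2, assuming the reply has the shape {ref, result} and that is_map_key/2 (Elixir 1.10+) is available. The module name FirstResultServer, the {:first_result, result} message, and the notify argument are all hypothetical choices for this example.

```elixir
defmodule FirstResultServer do
  use GenServer

  # Hypothetical API: `funs` is a list of zero-arity functions to race,
  # `notify` is the pid that should be told about the first result.
  def start_link(funs, notify) do
    GenServer.start_link(__MODULE__, {funs, notify})
  end

  @impl true
  def init({funs, notify}) do
    # Tasks started here are owned by the server, so they reply to it.
    tasks =
      Map.new(funs, fn fun ->
        task = Task.async(fun)
        {task.ref, task}
      end)

    {:ok, %{tasks: tasks, notify: notify}}
  end

  @impl true
  def handle_info({ref, result}, %{tasks: tasks} = state) when is_map_key(tasks, ref) do
    # First reply wins: stop monitoring it and forward the result.
    Process.demonitor(ref, [:flush])
    send(state.notify, {:first_result, result})

    # Shut down the tasks that are still running.
    tasks
    |> Map.delete(ref)
    |> Map.values()
    |> Enum.each(&Task.shutdown(&1, :brutal_kill))

    {:noreply, %{state | tasks: %{}}}
  end

  # Ignore anything that is not a reply from one of our tasks.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, _pid} =
  FirstResultServer.start_link(
    [fn -> Process.sleep(1_000); :slow end, fn -> :fast end],
    self()
  )

receive do
  {:first_result, result} -> IO.inspect(result)
end
```

Because Task.async/1 links each task to the server, a crashing task would take the server down with it; a supervised variant is outside the scope of this sketch.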
Based on Jose Valim's answer, here's what I used to match the reply coming back:
def run do
  task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
  task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
  task3 = Task.async(fn -> :timer.sleep(1000); :fail end)
  await([task1, task2, task3])
end

def await(tasks) do
  receive do
    message ->
      case Task.find(tasks, message) do
        {:fail, task} ->
          await(List.delete(tasks, task))
        {reply, _task} ->
          reply
        nil ->
          await(tasks)
      end
  end
end
This allowed me to match the first function to return something other than the :fail atom, and give me that reply. Does this work because receive/1 just waits for any message to come along?
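On the question of how receive behaves, a small experiment may help. The sketch below uses only send/2 and receive on the current process; the message shapes :unrelated and {:wanted, value} are made up for the illustration. It suggests that receive blocks until a message matching one of its patterns is in the mailbox, so a bare `message ->` clause matches whatever arrives first, while a narrower pattern skips over other messages and leaves them queued.

```elixir
# Queue two messages for the current process, in this order.
send(self(), :unrelated)
send(self(), {:wanted, 42})

# A specific pattern skips :unrelated and takes the later message.
value =
  receive do
    {:wanted, v} -> v
  end

# A catch-all pattern takes whatever is oldest in the mailbox;
# `after 0` avoids blocking if nothing is there.
leftover =
  receive do
    message -> message
  after
    0 -> :empty
  end

IO.inspect({value, leftover})
```

This is why the catch-all loops above come with a warning: they consume every message sent to the process, including ones that have nothing to do with the tasks.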