
Task.async in Elixir Stream

I want to do a parallel map over a big list. The code looks somewhat like this:

big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter(filter_fun)

But I was checking the Stream implementation, and as far as I understand, Stream.map composes the functions and applies the combined function to each element in the stream, which would mean that the sequence is like this:

  1. Take first element
  2. Create async task
  3. Wait for it to finish
  4. Take second element...

In that case, it doesn't do it in parallel. Am I right or am I missing something?
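
This ordering can be checked with a small experiment. The following is only a minimal sketch: the :stage_one and :stage_two labels are made up for illustration, and the two Stream.map stages stand in for the Task.async and Task.await steps.

```elixir
# Record the order in which two chained lazy stages touch each element.
parent = self()

[1, 2]
|> Stream.map(fn x ->
  # Stand-in for the Task.async step.
  send(parent, {:stage_one, x})
  x
end)
|> Stream.map(fn x ->
  # Stand-in for the Task.await step.
  send(parent, {:stage_two, x})
  x
end)
|> Stream.run()

# Read the mailbox without consuming it; messages are listed in arrival order.
{:messages, events} = Process.info(self(), :messages)
IO.inspect(events)
```

The recorded events alternate between the two stages for each element, which matches the sequence described above: an element passes through every stage before the next element is taken.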

If I am right, what about this code?

Stream.map Task.async ...
|> Enum.map Task.await ...

Is that going to run in parallel?

tkowal asked Sep 15 '15 14:09


3 Answers

Elixir 1.4 introduced Task.async_stream/5, which returns a stream that runs a given function concurrently on each item in an enumerable.

You can also cap the number of concurrent workers and set a per-task timeout with the :max_concurrency and :timeout options.

Note that you don't have to await anything yourself: the function returns a stream, so you can run it with Enum.to_list/1 or Stream.run/1.


This will make your example run concurrently:

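big_list
|> Task.async_stream(Module, :do_something, [])
# Each element arrives as {:ok, result}, so unwrap it before filtering.
|> Stream.map(fn {:ok, result} -> result end)
|> Enum.filter(filter_fun)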
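
A slightly fuller sketch of the same idea, showing the two options and the {:ok, result} tuples that the stream emits on success. AsyncStreamDemo and its do_something/1 are hypothetical stand-ins for your own module and function, and the option values are arbitrary.

```elixir
defmodule AsyncStreamDemo do
  # Hypothetical stand-in for Module.do_something/1: slow work that squares its input.
  def do_something(n) do
    Process.sleep(100)
    n * n
  end
end

results =
  1..8
  # Each element is prepended to the (empty) argument list.
  |> Task.async_stream(AsyncStreamDemo, :do_something, [],
    max_concurrency: 4,
    timeout: 1_000
  )
  # Successful tasks are emitted as {:ok, value}; unwrap before further processing.
  |> Stream.map(fn {:ok, value} -> value end)
  # The eager filter is what actually drives the stream.
  |> Enum.filter(&(rem(&1, 2) == 0))

IO.inspect(results)
```

With these values, at most four tasks run at a time, and the results come back in the order of the input.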
Luca Fülbier answered Nov 13 '22 01:11


You can try the ParallelStream library.

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
# => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

Update: or, better, use Flow.

apelsinka223 answered Nov 13 '22 00:11


The second snippet (Stream.map with Task.async piped into Enum.map with Task.await) also doesn't run in parallel. You can see it clearly with this code:

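defmodule Test do
  def test do
    [1,2,3]
    # Lazy: each task is only created when Enum.map asks for the next element.
    |> Stream.map(&Task.async(Test, :job, [&1]))
    # So each task is awaited before the next one is even started.
    |> Enum.map(&Task.await(&1))
  end

  # Simulates slow work: sleep for one second, then print the number.
  def job(number) do
    :timer.sleep(1000)
    IO.inspect(number)
  end
end

Test.test()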

You'll see the numbers printed one at a time, about a second apart, rather than all together after one second. The key here is that you want to create the tasks as soon as possible, so you shouldn't use the lazy Stream.map at all. Instead use the eager Enum.map at that point:

|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))

On the other hand, you can use the lazy Stream.map for the awaiting step, as long as an eager operation (like your filter) follows it. That way the awaits are interleaved with whatever processing you do on the results.
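
Putting this together, here is a minimal runnable sketch. ParallelDemo, the 200 ms sleep, and the filter condition are made up for illustration; :timer.tc/1 is used only to show that the total time is close to one job's duration rather than the sum of all of them.

```elixir
defmodule ParallelDemo do
  # Hypothetical stand-in for slow work: sleep, then return the input doubled.
  def job(number) do
    Process.sleep(200)
    number * 2
  end

  def run(list) do
    list
    # Eager: all tasks are started before any await happens.
    |> Enum.map(&Task.async(__MODULE__, :job, [&1]))
    # Lazy is fine here, because the tasks are already running.
    |> Stream.map(&Task.await/1)
    # The eager filter drives the stream and collects the results.
    |> Enum.filter(&(rem(&1, 4) == 0))
  end
end

# Time the whole pipeline; four sequential jobs would take about 800 ms.
{micros, result} = :timer.tc(fn -> ParallelDemo.run([1, 2, 3, 4]) end)
IO.inspect(result)
IO.puts("elapsed: #{div(micros, 1000)} ms")
```

Note that Task.await/1 uses a default timeout of 5000 ms, so longer jobs would need an explicit timeout passed to Task.await/2.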

Paweł Obrok answered Nov 13 '22 02:11