I want to do a parallel map over a big list. The code looks somewhat like this:
big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter(filter_fun)
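But I was checking the Stream implementation, and as far as I understand, Stream.map composes the functions and applies the combined function to each element in the stream. That would mean each element is processed to completion before the next one is taken: a task is started for the first element and awaited, then the same happens for the second element, and so on.
In that case, it doesn't run in parallel. Am I right, or am I missing something?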
If I am right, what about this code?
Stream.map Task.async ...
|> Enum.map Task.await ...
Is that going to run in parallel?
Elixir 1.4 provides the new Task.async_stream/5 function that will return a stream that runs a given function concurrently on each item in an enumerable.
There are also options to specify the maximum number of workers and a timeout, using the :max_concurrency and :timeout options.
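Please note that you don't have to await anything yourself, because the function returns a stream. You can consume it with Enum.to_list/1 or, if you only need the side effects, with Stream.run/1. Each element of the stream is an {:ok, result} tuple.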
This will make your example run concurrently:
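big_list
|> Task.async_stream(Module, :do_something, [])
# each element arrives as {:ok, result}; unwrap it before filtering
|> Stream.map(fn {:ok, result} -> result end)
|> Enum.filter(filter_fun)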
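To make the options mentioned above concrete, here is a minimal sketch. The module name AsyncStreamSketch, its do_something/1 body, and the filter are hypothetical stand-ins for the question's Module, :do_something, and filter_fun; the option values are arbitrary.

```elixir
defmodule AsyncStreamSketch do
  # Hypothetical stand-in for Module.do_something/1: simulates slow work.
  def do_something(n) do
    Process.sleep(100)
    n * 2
  end

  def run(enumerable) do
    enumerable
    # At most 4 tasks run at once; each task may take up to 5 seconds.
    |> Task.async_stream(__MODULE__, :do_something, [], max_concurrency: 4, timeout: 5_000)
    # Results come back in input order, each wrapped as {:ok, result}.
    |> Stream.map(fn {:ok, result} -> result end)
    # Hypothetical filter: keep multiples of 4.
    |> Enum.filter(&(rem(&1, 4) == 0))
  end
end

IO.inspect(AsyncStreamSketch.run(1..8))
# prints [4, 8, 12, 16]
```

With the default exit handling, a task that exceeds :timeout exits the calling process, so the unwrap above only ever sees {:ok, result} tuples.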
You can try Parallel Stream.
stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
[2,4,6,8,10,12,14,16,18,20]
Update: or better, use Flow.
The second one also doesn't do what you want. You can see it clearly with this code:
defmodule Test do
  def test do
    [1, 2, 3]
    |> Stream.map(&Task.async(Test, :job, [&1]))
    |> Enum.map(&Task.await(&1))
  end

  def job(number) do
    :timer.sleep(1000)
    IO.inspect(number)
  end
end

Test.test()
You'll see a number, then a 1 second wait, another number, and so on. The key here is that you want to create the tasks as soon as possible, so you shouldn't use the lazy Stream.map at all. Instead use the eager Enum.map at that point:
|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
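One way to check the difference yourself is to time both variants. This is only a sketch: the module name EagerSketch and the 200 ms sleep are assumptions chosen to keep the run short, and the exact timings will vary by machine.

```elixir
defmodule EagerSketch do
  # Simulated slow job (hypothetical; sleeps 200 ms and returns its input).
  def job(number) do
    Process.sleep(200)
    number
  end

  # Runs fun and returns {elapsed_milliseconds, result}.
  def timed(fun) do
    {micros, result} = :timer.tc(fun)
    {div(micros, 1000), result}
  end

  # Lazy task creation: each task is started only when Enum.map pulls it,
  # and it is awaited before the next one is started.
  def lazy(list) do
    list
    |> Stream.map(&Task.async(__MODULE__, :job, [&1]))
    |> Enum.map(&Task.await/1)
  end

  # Eager task creation: all tasks are started first, then awaited.
  def eager(list) do
    list
    |> Enum.map(&Task.async(__MODULE__, :job, [&1]))
    |> Enum.map(&Task.await/1)
  end
end

{lazy_ms, _} = EagerSketch.timed(fn -> EagerSketch.lazy([1, 2, 3]) end)
{eager_ms, _} = EagerSketch.timed(fn -> EagerSketch.eager([1, 2, 3]) end)
IO.puts("lazy: #{lazy_ms} ms, eager: #{eager_ms} ms")
```

The lazy variant should take roughly three times the job duration, while the eager one should take about one job duration.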
On the other hand, you can use Stream.map when awaiting, as long as you do some eager operation later, like your filter. That way the awaits will be interspersed with any processing you might be doing on the results.
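Put together, the combination described here might look like the following sketch. The module LazyAwaitSketch, its job/1, and the odd-number filter are hypothetical placeholders for your own job and filter function.

```elixir
defmodule LazyAwaitSketch do
  # Hypothetical job: simulates slow work and returns the square.
  def job(number) do
    Process.sleep(100)
    number * number
  end

  def run(list) do
    list
    # Eager: every task is spawned before anything is awaited.
    |> Enum.map(&Task.async(__MODULE__, :job, [&1]))
    # Lazy: each await runs only when the filter below pulls the next element.
    |> Stream.map(&Task.await/1)
    # Eager: drives the stream and keeps only the odd results.
    |> Enum.filter(&(rem(&1, 2) == 1))
  end
end

IO.inspect(LazyAwaitSketch.run([1, 2, 3, 4, 5]))
# prints [1, 9, 25]
```

Because the tasks are already running by the time the stream is consumed, the lazy await does not serialize the work; it only delays collecting each result until the filter asks for it.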