I would like to write Elixir code similar to this:
    def infinite_loop(created_workers \\ []) do
      case next_from_queue() do
        {:ok, queue_msg} ->
          # Start one task per message and keep looping
          new_worker = Task.async(fn -> crawling(queue_msg) end)
          infinite_loop([new_worker | created_workers])

        {:error, :empty} ->
          # Queue drained: wait for every task started so far
          Enum.map(created_workers, &Task.await/1)
      end
    end
Assuming that:
- the crawling function will create another 3 Tasks
- a crawling worker could spend 3 seconds running
- the queue could have millions of messages

How can I find out the limit on parallel processes in Elixir? How can I manage it so that it doesn't break?
I'd recommend using Task.async_stream for this. Task.async_stream allows you to process a stream in parallel while limiting the number of tasks that run at the same time. While the default limit on the number of processes is 262144 in Erlang 20, if you're crawling a site, you probably want a much lower limit.
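To see the limit on your own system, you can ask the VM directly. This is a small sketch using :erlang.system_info/1; the variable names are just for illustration.

```elixir
# Maximum number of simultaneously alive processes this VM allows.
limit = :erlang.system_info(:process_limit)

# Number of processes alive right now.
count = :erlang.system_info(:process_count)

IO.puts("process limit: #{limit}, currently running: #{count}")
```

If you ever do need a higher limit, it can be raised with the +P emulator flag, for example elixir --erl "+P 1000000" my_script.exs (the script name here is hypothetical).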
You can create a stream from a function that keeps returning new items using Stream.iterate:
    stream =
      Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
      |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)
Since you want to stop at {:error, :empty}, we use Stream.take_while to stop the stream.
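Here is a rough, self-contained way to try this out. The Agent-backed queue and the anonymous next_from_queue function are stand-ins I made up for your real queue; only the Stream.iterate and Stream.take_while part is what you would actually keep.

```elixir
# Hypothetical in-memory queue standing in for the real one.
{:ok, queue} = Agent.start_link(fn -> ["a", "b", "c"] end)

# Hypothetical next_from_queue: pops one message, or reports :empty.
next_from_queue = fn ->
  Agent.get_and_update(queue, fn
    [] -> {{:error, :empty}, []}
    [msg | rest] -> {{:ok, msg}, rest}
  end)
end

stream =
  Stream.iterate(next_from_queue.(), fn _ -> next_from_queue.() end)
  |> Stream.take_while(fn
    {:ok, _} -> true
    {:error, :empty} -> false
  end)

# Nothing beyond the first message is pulled until the stream is consumed.
messages = Enum.to_list(stream)
IO.inspect(messages)
# → [ok: "a", ok: "b", ok: "c"]
```

Note that the first message is fetched as soon as the stream is built, because Stream.iterate takes its starting value eagerly; the rest are fetched lazily as the stream is consumed.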
Then use Task.async_stream like this:
    stream
    |> Task.async_stream(fn {:ok, queue_msg} ->
      crawling(queue_msg)
    end, max_concurrency: 16)
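This will process the stream with a maximum of 16 tasks in parallel. Note that Task.async_stream is lazy: nothing runs until you consume the result, for example with Enum.to_list or Stream.run. Once consumed, the result is a list with one {:ok, value} tuple per message, where value is the return value of crawling(queue_msg).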
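Putting it together, a minimal sketch could look like the following. The crawling function here is a hypothetical stand-in that sleeps briefly and upcases its input, and the input list stands in for the queue stream. The timeout of 10 seconds is my own choice: the default is 5000 ms per task, which is not far above the 3 seconds you mention.

```elixir
# Hypothetical stand-in for the real crawler.
crawling = fn queue_msg ->
  Process.sleep(50)
  String.upcase(queue_msg)
end

results =
  [{:ok, "a"}, {:ok, "b"}, {:ok, "c"}]
  |> Task.async_stream(fn {:ok, queue_msg} -> crawling.(queue_msg) end,
    # At most 16 tasks run at any one time.
    max_concurrency: 16,
    # Per-task timeout in milliseconds (assumed value; the default is 5000).
    timeout: 10_000
  )
  # Consuming the stream is what actually starts the tasks.
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(results)
# → ["A", "B", "C"]
```

Results come back in the same order as the input by default. Also keep in mind that max_concurrency only limits the tasks started by Task.async_stream itself; if each crawling call starts 3 more tasks, the total number of running processes will be correspondingly higher.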