I would like to write Elixir code similar to this:
def infinite_loop(created_workers \\ []) do
  case next_from_queue() do
    {:ok, queue_msg} ->
      new_worker = Task.async(fn -> crawling(queue_msg) end)
      infinite_loop([new_worker | created_workers])
    {:error, :empty} ->
      Enum.map(created_workers, &Task.await/1)
  end
end
Assuming that:
- the crawling function will create another 3 tasks
- a crawling worker could spend 3 seconds running
- the queue could have millions of messages
How can I find out the limit on parallel processes in Elixir? How can I manage it so that it doesn't break?
I'd recommend using Task.async_stream for this. Task.async_stream allows you to process a stream in parallel while limiting the number of tasks that are run in parallel. While the default limit on the number of processes is 262144 in Erlang 20, if you're crawling a site, you probably want a much lower limit.
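If you want to check the limit on your own system rather than rely on the documented default, the VM exposes it at runtime. The following is a minimal sketch using :erlang.system_info/1; the variable names are only illustrative.

```elixir
# Maximum number of processes that may exist at the same time in this VM.
limit = :erlang.system_info(:process_limit)

# Number of processes that currently exist in this VM.
count = :erlang.system_info(:process_count)

IO.puts("process limit: #{limit}, currently running: #{count}")
```

The limit is fixed when the VM starts and can be changed with the +P emulator flag, for example elixir --erl "+P 1000000" script.exs (script.exs is a placeholder name).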
You can create a stream from a function that keeps returning new items using Stream.iterate:
stream =
  Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
  |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)
Since you want to stop at {:error, :empty}, we use Stream.take_while to stop the stream.
Then use Task.async_stream like this:
stream
|> Task.async_stream(fn {:ok, queue_msg} ->
  crawling(queue_msg)
end, max_concurrency: 16)
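Task.async_stream is lazy, so nothing runs until the resulting stream is consumed, for example with Enum.to_list/1 or Stream.run/1. Once consumed, it runs at most 16 tasks in parallel and yields one {:ok, result} tuple per item, in the original order, where result is the return value of crawling(queue_msg). Note that each task has a default timeout of 5000 ms, which you can change with the :timeout option if crawling may take longer.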
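To see the pieces above working together, here is a small end-to-end sketch. The Agent-backed queue, the stubbed crawling function, the concurrency of 4, and the 30 second timeout are all assumptions made for illustration; substitute your real queue and crawler.

```elixir
# Hypothetical in-memory queue standing in for the real one.
{:ok, queue} = Agent.start_link(fn -> Enum.to_list(1..10) end)

# Pops the next message, returning {:ok, msg} or {:error, :empty}
# to mirror the contract assumed in the question.
next_from_queue = fn ->
  Agent.get_and_update(queue, fn
    [] -> {{:error, :empty}, []}
    [msg | rest] -> {{:ok, msg}, rest}
  end)
end

# Stub for the real crawling/1: simulates a little work and returns a value.
crawling = fn queue_msg ->
  Process.sleep(10)
  queue_msg * 2
end

results =
  Stream.iterate(next_from_queue.(), fn _ -> next_from_queue.() end)
  |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)
  |> Task.async_stream(fn {:ok, queue_msg} -> crawling.(queue_msg) end,
    max_concurrency: 4,
    timeout: 30_000
  )
  # Each element arrives as {:ok, result}; unwrap it and force the stream.
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(results)
```

Because the stream is pulled on demand, only as many queue messages are fetched as there are free task slots, so a queue with millions of messages never turns into millions of simultaneous processes.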