
Elixir process limit?

I would like to write Elixir code similar to this:

def infinite_loop(created_workers \\ []) do
  case next_from_queue() do
    {:ok, queue_msg} ->
      new_worker = Task.async(fn -> crawling(queue_msg) end)
      infinite_loop([new_worker | created_workers])

    {:error, :empty} ->
      # No more messages: wait for every spawned task to finish
      Enum.map(created_workers, &Task.await/1)
  end
end

Assuming that:

  1. The crawling function will create another 3 Tasks
  2. Each crawling worker could take 3 seconds to run
  3. The queue could hold millions of messages

How can I find out the limit on the number of parallel processes in Elixir? How can I manage it so that it doesn't break?

Augusto Pedraza, asked Jan 30 '23 01:01


1 Answer

I'd recommend using Task.async_stream for this. It lets you process a stream in parallel while capping the number of tasks that run at the same time. The default limit on the number of processes is 262144 in Erlang 20, but if you're crawling a site, you probably want a much lower limit than that.
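To see what the limit actually is on your own VM, rather than relying on a version-specific default, you can ask the runtime directly. A minimal sketch, assuming you run it in iex or a script:

```elixir
# Maximum number of simultaneously existing processes on this VM.
limit = :erlang.system_info(:process_limit)

# Number of processes that are alive right now.
count = :erlang.system_info(:process_count)

IO.puts("process limit: #{limit}, currently running: #{count}")
```

If you really need more, the limit can be raised when starting the VM with the +P emulator flag, for example `iex --erl "+P 1000000"` (the number here is only an illustration).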

You can create a stream from a function that keeps returning new items using Stream.iterate:

stream =
  Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
  |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)

Since you want to stop at {:error, :empty}, we use Stream.take_while to stop the stream.
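If you want to try the stream on its own, here is a rough sketch. The queue is a hypothetical stand-in (an Agent holding a list), and next_from_queue is written as an anonymous function only so the snippet is self-contained; your real queue client will look different.

```elixir
# Hypothetical stand-in for the real queue: an Agent holding a list of messages.
{:ok, queue} = Agent.start_link(fn -> ["a", "b", "c"] end)

# Hypothetical next_from_queue: pops one message, or reports an empty queue.
next_from_queue = fn ->
  Agent.get_and_update(queue, fn
    [msg | rest] -> {{:ok, msg}, rest}
    [] -> {{:error, :empty}, []}
  end)
end

# The first message is fetched when the stream is built;
# the rest are fetched lazily as the stream is consumed.
stream =
  Stream.iterate(next_from_queue.(), fn _ -> next_from_queue.() end)
  |> Stream.take_while(fn
    {:ok, _} -> true
    {:error, :empty} -> false
  end)

messages = Enum.to_list(stream)
IO.inspect(messages)
```

The {:error, :empty} value itself is dropped by Stream.take_while, so only the {:ok, _} tuples reach the next stage.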

Then use Task.async_stream like this:

stream
|> Task.async_stream(fn {:ok, queue_msg} ->
  crawling(queue_msg)
end, max_concurrency: 16)

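This runs the stream with at most 16 tasks in parallel. Note that Task.async_stream is lazy and returns a stream, so nothing runs until you consume it, for example with Enum.to_list or Stream.run. Each emitted element is an {:ok, result} tuple wrapping the return value of crawling(queue_msg), in the same order as the input.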
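Putting the pieces together, an end-to-end sketch could look like the following. The queue and the crawling function are hypothetical placeholders (an Agent and a short sleep), and max_concurrency is set to 2 just to keep the example small.

```elixir
# Hypothetical queue and crawler, standing in for the real ones.
{:ok, queue} = Agent.start_link(fn -> Enum.to_list(1..5) end)

next_from_queue = fn ->
  Agent.get_and_update(queue, fn
    [msg | rest] -> {{:ok, msg}, rest}
    [] -> {{:error, :empty}, []}
  end)
end

# Pretend crawl: sleeps briefly and returns a value derived from the message.
crawling = fn queue_msg ->
  Process.sleep(50)
  queue_msg * 10
end

results =
  Stream.iterate(next_from_queue.(), fn _ -> next_from_queue.() end)
  |> Stream.take_while(&match?({:ok, _}, &1))
  |> Task.async_stream(fn {:ok, queue_msg} -> crawling.(queue_msg) end,
    max_concurrency: 2,
    timeout: 10_000
  )
  # Consuming the stream is what actually starts the tasks.
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
```

The :timeout option defaults to 5000 ms per task, so if a crawl can take around 3 seconds plus its own sub-tasks, it is probably worth setting it explicitly as above. With millions of messages you may also prefer Stream.run or a reducing step over Enum.map, so the results are not all held in memory at once.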

Dogbert, answered Mar 21 '23 11:03