Queue of worker processes in Phoenix application

I need something like PubSub, but instead of broadcasting to all subscribers, the message is sent to only one subscriber (preferably a subscriber chosen automatically based on the number of messages in its receive buffer, where lower is better).

What I'm attempting is to send several hundred thousand HTTP requests using a controlled number of distributed workers.

asked Jan 05 '15 by Krut

2 Answers

To solve this, the first thing I'd try would be to have the workers pull the requests they make rather than have requests pushed to them.

So I'd have a globally registered Agent that holds the list of HTTP requests to be performed, with an API for adding and retrieving a request. I'd then just start N workers with worker(Task, ...) under a one_for_one Supervisor, rather than adding poolboy at this stage. Each worker would ask the Agent for an HTTP request to make, do whatever work is necessary, then terminate normally, be restarted by the supervisor, and ask for the next URL.
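A minimal sketch of that setup (the RequestQueue and PullWorker module names and the use of HTTPoison are my assumptions, not something the answer prescribes):

defmodule RequestQueue do
  # Globally named Agent holding the pending URLs.
  def start_link(urls) do
    Agent.start_link(fn -> urls end, name: __MODULE__)
  end

  # Add a request to the queue.
  def add(url) do
    Agent.update(__MODULE__, fn urls -> [url | urls] end)
  end

  # Retrieve a request; returns {:ok, url} or :empty when nothing is left.
  def take do
    Agent.get_and_update(__MODULE__, fn
      [] -> {:empty, []}
      [url | rest] -> {{:ok, url}, rest}
    end)
  end
end

defmodule PullWorker do
  # Each worker handles a single request and then exits normally; a
  # :permanent child is restarted even after a normal exit, so the
  # fresh worker asks the Agent for the next URL.
  def start_link do
    Task.start_link(fn ->
      case RequestQueue.take() do
        {:ok, url} -> HTTPoison.get(url)  # hypothetical HTTP client
        :empty -> :timer.sleep(1_000)     # nothing to do yet
      end
    end)
  end
end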

Workers pulling HTTP tasks from the list in the Agent, rather than having tasks pushed to them, ensures that an available worker is always busy when there's work to do.

If the solution looked good, I'd then look into adding poolboy. You'd need to be careful with the supervisor options so that a bunch of bad URLs crashing your workers wouldn't trigger the supervisor to take everything else down.
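As a sketch of that wiring (N = 10 workers and the restart limits are arbitrary choices on my part; because every worker exits after each request, max_restarts has to be far above the default of 3 or the supervisor itself will shut down):

import Supervisor.Spec

children =
  for i <- 1..10 do
    worker(PullWorker, [], id: {:pull_worker, i})
  end

Supervisor.start_link(children,
  strategy: :one_for_one,
  max_restarts: 1_000_000,
  max_seconds: 1)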

answered Nov 15 '22 by chrismcg


As stated in my comment, my approach would be to use Poolboy to handle the workers, but it is not possible to simply check out N workers (N being the number of requested URLs), because this would quickly exceed the process limit and cause the checkout requests to time out. Instead, you need a loop that checks whether workers are available and, if so, requests the URL asynchronously. If no workers are free, it should sleep for a while and then retry.

For this purpose, Poolboy has the :poolboy.checkout/2 function; the second parameter specifies whether the call should block or not. If no workers are available it will return :full, otherwise you get back a worker pid.

Example:

def crawl_parallel(urls) do
  urls
  |> Enum.map(&crawl_task/1)
  |> Enum.map(&Task.await/1)
end

defp crawl_task(url) do
  case :poolboy.checkout(Crawler, false) do
    :full ->
      # No free workers, wait a bit and retry
      :timer.sleep(100)
      crawl_task(url)

    worker_pid ->
      # We have a worker, asynchronously crawl the URL
      Task.async(fn ->
        Crawler.Worker.crawl(worker_pid, url)
        :poolboy.checkin(Crawler, worker_pid)
      end)
  end
end
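For completeness, here is one way the Crawler pool could be wired up (a sketch; the pool size is arbitrary and Crawler.Worker is assumed to be a poolboy-compatible GenServer exposing crawl/2):

defmodule Crawler.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end

  def init([]) do
    pool_options = [
      name: {:local, Crawler},       # the pool name used in crawl_task/1
      worker_module: Crawler.Worker, # assumed GenServer doing the requests
      size: 20,                      # number of concurrent crawlers
      max_overflow: 0
    ]

    children = [
      :poolboy.child_spec(Crawler, pool_options, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

Note that Task.await/1 uses a default timeout of 5000 ms, so for very large batches you may want Task.await(task, :infinity) or to process the URLs in chunks.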
answered Nov 15 '22 by Patrick Oscity