I need something like PubSub, but instead of broadcasting to all subscribers, the message is sent to only one subscriber (preferably the subscriber is chosen automatically based on the number of messages in its receive buffer, lower is better).
I'm trying to send several hundred thousand HTTP requests using a controlled number of distributed workers.
To solve this, the first thing I'd try would be to have the workers pull the requests to make rather than have them pushed to them.

I'd have a globally registered Agent that holds the list of HTTP requests to be performed, with an API for adding and retrieving a request. I'd then just start N workers using worker(Task, ...) under a Supervisor with the one_for_one strategy, rather than adding poolboy at this stage. Each worker would ask the Agent for an HTTP request to make, do whatever work is necessary, terminate normally, be restarted by the supervisor, and ask for a new URL.

Having workers pull HTTP tasks from the list in the Agent, rather than having tasks pushed to them, ensures that an available worker is always busy whenever there's work to do.
If that solution looked good, I'd then look into adding poolboy. You'd need to be careful with the supervisor options so that a bunch of bad URLs crashing your workers doesn't trigger the supervisor to take everything else down.
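To make this approach concrete, here is a minimal sketch, assuming HTTPoison as the HTTP client; the module names (UrlQueue, FetchWorker), the worker count, and the restart limits are illustrative choices, not part of the answer.

defmodule UrlQueue do
  # Globally registered Agent holding the requests still to be made.
  def start_link(urls), do: Agent.start_link(fn -> urls end, name: __MODULE__)

  def push(url), do: Agent.update(__MODULE__, &[url | &1])

  # Pop one URL, or :empty when there is no work left.
  def pop do
    Agent.get_and_update(__MODULE__, fn
      [] -> {:empty, []}
      [url | rest] -> {url, rest}
    end)
  end
end

defmodule FetchWorker do
  # Each run handles exactly one URL and then returns; the Task exits
  # normally, the supervisor restarts it, and the fresh Task pulls the next URL.
  def run do
    case UrlQueue.pop() do
      :empty -> :timer.sleep(1_000)   # nothing to do right now; idle briefly
      url -> HTTPoison.get(url)       # the actual work
    end
  end
end

# Supervisor wiring (Supervisor.Spec style, matching worker(Task, ...) above).
# max_restarts is raised because every completed request counts as a restart;
# tune it so a burst of bad URLs can't take the whole tree down.
import Supervisor.Spec

n = 50
urls = ["http://example.com/1", "http://example.com/2"]   # replace with the real list

children =
  [worker(UrlQueue, [urls])] ++
    for i <- 1..n do
      worker(Task, [&FetchWorker.run/0], id: {:fetcher, i}, restart: :permanent)
    end

Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 10 * n, max_seconds: 1)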
As stated in my comment, my approach would be to use Poolboy to handle the workers, but it is not possible to simply check out N workers (N being the number of URLs to request), because this will quickly exceed the process limit and cause the checkout requests to time out. Instead, we need a loop that checks whether a worker is available and, if so, requests the URL asynchronously. If no workers are free, it should sleep for a while and then retry.
For this purpose, Poolboy has the :poolboy.checkout/2 function, whose second parameter specifies whether the call should block. If no workers are available it returns :full; otherwise you get back a worker pid.
Example:
def crawl_parallel(urls) do
  urls
  |> Enum.map(&crawl_task/1)
  |> Enum.map(&Task.await/1)
end

defp crawl_task(url) do
  case :poolboy.checkout(Crawler, false) do
    :full ->
      # No free workers, wait a bit and retry
      :timer.sleep(100)
      crawl_task(url)

    worker_pid ->
      # We have a worker, asynchronously crawl the url
      Task.async(fn ->
        result = Crawler.Worker.crawl(worker_pid, url)
        # Return the worker to the pool and hand back the crawl result
        :poolboy.checkin(Crawler, worker_pid)
        result
      end)
  end
end
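For completeness, here is one way the pool and worker behind that example could be wired up. This is only a sketch under assumptions: the pool size, the use of HTTPoison as the HTTP client, and the GenServer shape of the worker are illustrative; only the names Crawler and Crawler.Worker.crawl/2 come from the example above.

defmodule Crawler.Worker do
  use GenServer

  # :poolboy starts each worker by calling start_link/1 with the worker args
  # given in the child spec below.
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  def init(state), do: {:ok, state}

  # Synchronous call used by crawl_task/1 above.
  def crawl(pid, url), do: GenServer.call(pid, {:crawl, url})

  def handle_call({:crawl, url}, _from, state) do
    # HTTPoison is just an example client; any HTTP library works here.
    {:reply, HTTPoison.get(url), state}
  end
end

# Hypothetical pool configuration: poolboy spawns `size` Crawler.Worker
# processes and hands them out via checkout/checkin as in crawl_task/1.
pool_options = [
  name: {:local, Crawler},        # register the pool under the Crawler name
  worker_module: Crawler.Worker,  # module poolboy starts for each worker
  size: 20,                       # fixed number of workers
  max_overflow: 0                 # never spawn extra workers beyond `size`
]

children = [:poolboy.child_spec(Crawler, pool_options, [])]
Supervisor.start_link(children, strategy: :one_for_one)

Keep in mind that GenServer.call/2 and Task.await/1 both default to a 5-second timeout, so slow endpoints may require passing explicit, larger timeouts.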