I'm trying to write a multithreaded web crawler.
My main entry class has the following code:
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
URL url = frontier.get();
if(url == null)
return;
exec.execute(new URLCrawler(this, url));
}
The URLCrawler fetches the specified URL, parses the HTML extracts links from it, and schedules unseen links back to frontier.
A frontier is a queue of uncrawled URLs. The problem is how to write the get() method. If the queue is empty, it should wait until any URLCrawlers finish and then try again. It should return null only when the queue is empty and there is no currently active URLCrawler.
My first idea was to use an AtomicInteger for counting current number of working URLCrawlers and an auxiliary object for notifyAll()/wait() calls. Each crawler on start increments the number of current working URLCrawlers, and on exit decrements it, and notify the object that it has completed.
But I read that notify()/notifyAll() and wait() are somewhat deprecated methods to do thread communication.
What should I use in this work pattern? It is similar to M producers and N consumers, the question is how to deal with exaustion of producers.
I am not sure I understand your design, but this may be a job for a Semaphore
One option is to make "frontier" a blocking queue, So any thread trying to "get" from it will block. As soon as any other URLCrawler puts objects into that queue, any other threads will be automatically notified (with the object dequeued)
I think use of wait/notify is justified in this case. Can't think of any straight forward way to do this using j.u.c.
In a class, let's call Coordinator:
private final int numOfCrawlers;
private int waiting;
public boolean shouldTryAgain(){
synchronized(this){
waiting++;
if(waiting>=numOfCrawlers){
//Everybody is waiting, terminate
return false;
}else{
wait();//spurious wake up is okay
//waked up for whatever reason. Try again
waiting--;
return true;
}
}
public void hasEnqueued(){
synchronized(this){
notifyAll();
}
}
then,
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
URL url = frontier.get();
if(url == null){
if(!coordinator.shouldTryAgain()){
//all threads are waiting. No possibility of new jobs.
return;
}else{
//Possible that there are other jobs. Try again
continue;
}
}
exec.execute(new URLCrawler(this, url));
}//while(true)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With