
Java ThreadPool usage

I'm trying to write a multithreaded web crawler.

My main entry class has the following code:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while (true) {
    URL url = frontier.get();
    if (url == null)
        return;
    exec.execute(new URLCrawler(this, url));
}

The URLCrawler fetches the specified URL, parses the HTML, extracts links from it, and schedules unseen links back to the frontier.

A frontier is a queue of uncrawled URLs. The problem is how to write the get() method. If the queue is empty, it should wait until a URLCrawler finishes and then try again. It should return null only when the queue is empty and no URLCrawler is currently active.

My first idea was to use an AtomicInteger to count the URLCrawlers currently working, plus an auxiliary object for notifyAll()/wait() calls. Each crawler increments the count when it starts, decrements it on exit, and notifies the object that it has completed.
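As a rough sketch of that idea (not a definitive implementation): the class name WaitNotifyFrontier and the methods add() and done() are made up for illustration, the type is generic so it can be tried with plain strings instead of URL, and a plain int guarded by the monitor stands in for the AtomicInteger. It also counts a URL as active when get() hands it out rather than when the crawler starts, on the assumption that this avoids get() returning null in the gap between the two.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical frontier: get() blocks while the queue is empty but work is still in flight.
class WaitNotifyFrontier<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    // URLs handed out by get() whose crawler has not called done() yet
    private int active;

    // Called by crawlers to schedule an unseen link.
    public synchronized void add(T url) {
        queue.add(url);
        notifyAll();
    }

    // Returns the next URL, or null once the queue is empty and nothing is in flight.
    public synchronized T get() {
        while (queue.isEmpty() && active > 0) {
            try {
                wait(); // loop re-checks the condition, so spurious wake-ups are harmless
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // assumption: treat interruption as a request to stop
            }
        }
        T url = queue.poll();
        if (url != null) {
            active++;
        }
        return url;
    }

    // Called by a crawler when it has finished, after it has added its links.
    public synchronized void done() {
        active--;
        notifyAll();
    }
}
```

With this shape, each URLCrawler would need to call add() for every new link before calling done(), ideally with done() in a finally block so a failed fetch does not leave get() waiting forever.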

But I read that notify()/notifyAll() and wait() are considered a somewhat outdated way to communicate between threads.

What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with exhaustion of the producers.

Anton Kazennikov asked Aug 04 '10 05:08


3 Answers

I am not sure I understand your design, but this may be a job for a Semaphore.

finnw answered Sep 20 '22 15:09


One option is to make "frontier" a blocking queue, so any thread trying to "get" from it will block. As soon as any other URLCrawler puts an object into that queue, a waiting thread is automatically woken up and receives the dequeued object.
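A blocking queue on its own would block forever once the crawl is exhausted, so some termination check is still needed. Here is a minimal sketch of one way to combine the two, assuming a LinkedBlockingQueue plus a counter of URLs that are either queued or being crawled. The names BlockingFrontier, add() and done() and the 100 ms poll interval are invented for illustration, and polling with a timeout is a simplification rather than the only option.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical frontier built on a blocking queue.
class BlockingFrontier<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    // Number of URLs that are either queued or currently being crawled
    private final AtomicInteger pending = new AtomicInteger();

    // Called by crawlers to schedule an unseen link.
    public void add(T url) {
        pending.incrementAndGet(); // count first, so pending never under-reports
        queue.add(url);
    }

    // Called by a crawler when it has finished, after it has added its links.
    public void done() {
        pending.decrementAndGet();
    }

    // Returns the next URL, or null once nothing is queued or in flight.
    public T get() {
        try {
            while (pending.get() > 0) {
                // Wake up periodically to re-check whether all work has finished
                T url = queue.poll(100, TimeUnit.MILLISECONDS);
                if (url != null) {
                    return url;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // assumption: interruption means stop
        }
        return null;
    }
}
```

The counter only reaches zero when the queue is empty and every handed-out URL has been reported via done(), which matches the condition under which get() is supposed to return null.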

naikus answered Sep 19 '22 15:09


I think the use of wait/notify is justified in this case. I can't think of any straightforward way to do this using j.u.c.
In a class, let's call it Coordinator:

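private final int numOfCrawlers;
private int waiting; // guarded by this

public Coordinator(int numOfCrawlers) {
    this.numOfCrawlers = numOfCrawlers;
}

public boolean shouldTryAgain() {
    synchronized (this) {
        waiting++;
        if (waiting >= numOfCrawlers) {
            // Everybody is waiting, terminate
            return false;
        }
        try {
            wait(); // a spurious wake-up is okay
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying
            Thread.currentThread().interrupt();
            return false;
        }
        // Woken up for whatever reason. Try again
        waiting--;
        return true;
    }
}

// Call after putting a new URL into the frontier
public void hasEnqueued() {
    synchronized (this) {
        notifyAll();
    }
}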

then,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)

Enno Shioji answered Sep 22 '22 15:09