
Synchronisation object to ensure all tasks are completed

Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are that:

  1. Each task takes a non-trivial amount of time to complete and it is appropriate to perform tasks in parallel.
  2. There are too many tasks to fit into memory (i.e. I cannot put a Future for every task into a Collection and then call get on all the futures).
  3. I do not know how many tasks there will be (i.e. I cannot use a CountDownLatch).
  4. The ExecutorService may be shared, so I cannot use awaitTermination( long, TimeUnit ).

For example, with Grand Central Dispatch, I might do something like this:

let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

It produces output that looks like this (for 128 items): Processed 128 items in 1.846794962883 seconds.

I tried something similar with a Phaser:

final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

The tasks do not always complete before the last print statement, and I might get output that looks like this (for 128 items): Processed 121 items in 5.296 seconds. Is the Phaser even the right object to use? The documentation indicates it only supports 65,535 parties, so I would need to either batch the items to be processed or introduce some sort of Phaser tiering.
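For what it's worth, a tiered version might look roughly like the sketch below (untested; it reuses executor, fetchItems() and processItem() from above, and the cap of 10,000 registrations per child Phaser is an arbitrary number well under the limit). Each child is constructed with a root Phaser as its parent; tasks arrive and deregister on their child, and the root cannot advance until the main thread arrives and every child has drained:

final Phaser root = new Phaser( 1 );      // the main thread holds one party on the root
final int maxPartiesPerChild = 10_000;    // arbitrary cap, well under 65,535
Phaser child = new Phaser( root );
int registeredOnChild = 0;
for( final String item : fetchItems() )
{
    if( registeredOnChild == maxPartiesPerChild )
    {
        child = new Phaser( root );       // start a fresh child tier
        registeredOnChild = 0;
    }
    final Phaser tier = child;
    tier.register();
    registeredOnChild++;
    executor.execute( new Runnable() {
        public void run() {
            processItem( item );
            // deregistering (rather than just arriving) lets an exhausted child
            // drop out of the tree once all of its tasks have finished
            tier.arriveAndDeregister();
        }
    } );
}
// the root advances only after the main thread arrives and every child has deregistered,
// i.e. once all tasks have completed
root.arriveAndAwaitAdvance();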

asked Dec 14 '15 by Carlos Macasaet

2 Answers

The problem with the Phaser usage in this example is that the CallerRunsPolicy allows a task to execute on the initiating thread. Thus, while the loop is still in progress, the number of arrived parties can equal the number of registered parties, causing the phase to increment. The solution is to initialise the Phaser with one party; then, when the loop has finished, arrive and wait for the other parties to arrive. This ensures the phase does not increment to 1 until all the tasks are complete.

final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 1 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.arriveAndAwaitAdvance();
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

answered by Carlos Macasaet

"to ensure an arbitrarily large number of tasks are completed" - the simplest way is to maintain a counter of completed tasks, with blocking operation to wait that given number of task is reached. There is no such ready class, but it is easy to make one:

class EventCounter {
    private long counter = 0;

    synchronized void up() {
        counter++;
        notifyAll();
    }

    // blocks until at least count events have been recorded
    synchronized void ensure(long count) throws InterruptedException {
        while (counter < count) wait();
    }
}

"There are too many tasks to fit into memory" - so the process of submitting new tasks must be suspended when the number of running tasks is too high. The simplest way is to consider the number of running tasks as a resource and count it with a semaphore:

final Semaphore runningTasksSema = new Semaphore(maxNumberOfRunningTasks);
final EventCounter eventCounter = new EventCounter();

for( final String item : fetchItems() ) {
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item );
            runningTasksSema.release(); // free a slot for the next submission
            eventCounter.up();
        }
    };
    runningTasksSema.acquire();         // blocks while maxNumberOfRunningTasks tasks are in flight
    executor.execute(task);
}

When a thread wants to ensure that a given number of tasks has completed, it invokes:

eventCounter.ensure(givenNumberOfFinishedTasks);

Asynchronous (nonblocking) versions of the runningTasksSema.acquire() and eventCounter.ensure() operations can be designed, but they would be more complex.
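For illustration only, one possible shape of such a nonblocking counterpart (a rough, untested sketch requiring Java 8; the name AsyncEventCounter is made up for this example) hands out a CompletableFuture that completes once the counter reaches the requested value:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

class AsyncEventCounter {
    private final AtomicLong counter = new AtomicLong();
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> waiters =
        new ConcurrentSkipListMap<>();

    // called once per completed task
    void up() {
        long value = counter.incrementAndGet();
        // complete every waiter whose threshold has now been reached
        Map<Long, CompletableFuture<Void>> reached = waiters.headMap(value, true);
        reached.forEach((threshold, future) -> future.complete(null));
        reached.clear();
    }

    // returns a future that completes when at least count tasks have finished; never blocks
    CompletableFuture<Void> ensure(long count) {
        CompletableFuture<Void> future =
            waiters.computeIfAbsent(count, key -> new CompletableFuture<Void>());
        if (counter.get() >= count) { // the counter may already have passed count
            future.complete(null);
            waiters.remove(count);
        }
        return future;
    }
}

A caller would then chain further work with something like asyncCounter.ensure(n).thenRun(...) instead of blocking.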

answered by Alexei Kaigorodov