
Non-blocking way to offer + drainTo for a BlockingQueue (or something else)

I am looking for something seemingly simple, a collection with a non-blocking version of "add" and "drain". Something like this:

List itemsToProcess = queue.addOrDrainAndAdd( item );
if ( itemsToProcess != null )
    process( itemsToProcess );

It seems to me that if I do these as separate "offer" and "drainTo" calls, offer could be called twice before I get to the first call to drainTo. I would also need a loop such as "while ( !queue.offer( item ) )" so that the offer succeeds once the queue has been drained, which I think would also require me to check whether drainTo returned an empty collection (because two threads might call it at once). My naive implementation looks like this, but it doesn't seem optimal:

void addBatchItem( T item ) {
   while ( !batch.offer( item ) ) {
       List<T> batched = new ArrayList<>( batchSize );
       batch.drainTo( batched );
       process( batched );
   }
}

Then I thought maybe there is a better way and I just don't know it. Thanks!

EDIT:

Okay, so here's a solution (it is blocking, since it is based on ArrayBlockingQueue):

public void add( T batchItem ) {
    while ( !batch.offer( batchItem ) ) {
        flush();
    }
}

public void flush() {
    List<T> batched = new ArrayList<>( batchSize );
    batch.drainTo( batched, batchSize );
    if ( !batched.isEmpty() )
        executor.execute( new PhasedRunnable( batched ) );
}

I guess my question is: would the above be more efficient for this purpose than a solution based on ConcurrentLinkedQueue, since the latter requires an object allocation for each node?
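For comparison, here is a minimal sketch of what a ConcurrentLinkedQueue-based variant might look like. Everything in it (the class name LinkedBatcherSketch, the counter, the drainRemaining method) is hypothetical and not part of the code above. It assumes a shared counter is an acceptable way to decide which caller collects a full batch. Each offer allocates a linked node, whereas ArrayBlockingQueue stores items in an array allocated once; whether that difference matters in practice would have to be measured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class, for comparison only; not part of the original Batcher.
class LinkedBatcherSketch<T> {

    private final int batchSize;
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong added = new AtomicLong();

    LinkedBatcherSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Never blocks: the queue is unbounded, so offer always succeeds,
    // but every offer allocates a new node.
    // Returns a full batch when this call completes one, otherwise an empty list.
    List<T> add(T item) {
        queue.offer(item);
        if (added.incrementAndGet() % batchSize != 0) {
            return Collections.emptyList();
        }
        return pollUpTo(batchSize);
    }

    // Assumed to be called once producers have stopped, to collect the tail.
    List<T> drainRemaining() {
        return pollUpTo(Integer.MAX_VALUE);
    }

    private List<T> pollUpTo(int max) {
        List<T> batched = new ArrayList<>(Math.min(max, batchSize));
        T next;
        while (batched.size() < max && (next = queue.poll()) != null) {
            batched.add(next);
        }
        return batched;
    }
}
```

Note that a batch collected this way contains batchSize items, but not necessarily the ones the calling thread added, since other threads may be polling at the same time.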

EXAMPLE CLASS WITH USAGE:

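import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public abstract class Batcher<T> {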

    private final int batchSize;
    private ArrayBlockingQueue<T> batch;
    private ExecutorService executor;
    private final Phaser phaser = new Phaser( 1 );

    public Batcher( int batchSize, ExecutorService executor ) {
        this.batchSize = batchSize;
        this.executor = executor;
        this.batch = new ArrayBlockingQueue<>( batchSize );
    }

    public void add( T batchItem ) {
        while ( !batch.offer( batchItem ) ) {
            flush();
        }
    }

    public void flush() {
        List<T> batched = new ArrayList<>( batchSize );
        batch.drainTo( batched, batchSize );
        if ( !batched.isEmpty() )
            executor.execute( new PhasedRunnable( batched ) );
    }

    public abstract void onFlush( List<T> batch );

    public void awaitDone() {
        phaser.arriveAndAwaitAdvance();
    }

    public void awaitDone( long duration, TimeUnit unit ) throws TimeoutException {
        try {
            phaser.awaitAdvanceInterruptibly( phaser.arrive(), duration, unit );
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
        }
    }

    private class PhasedRunnable implements Runnable {

        private final List<T> batch;

        private PhasedRunnable( List<T> batch ) {
            this.batch = batch;

            phaser.register();
        }

        @Override
        public void run() {
            try {
                onFlush( batch );
            }
            finally {
                // Deregister as well as arrive, so finished tasks do not stall later phases.
                phaser.arriveAndDeregister();
            }
        }
    }
}

This is a simple example; a more complete one might be JPA entity updates or inserts. Also, I would like it to be possible for #add to be called concurrently.

@Test
public void testOddNumber() {
    Batcher<Integer> batcher = new Batcher<Integer>( 10, executor ) {
        @Override
        public void onFlush( List<Integer> batch ) {
            count.addAndGet( batch.size() );
        }
    };

    for ( int at = 0; at != 21; ++at ) {
        batcher.add( at );
    }

    batcher.flush();
    batcher.awaitDone();

    assertEquals( 21, count.get() );
}
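Since the goal is for #add to be callable concurrently, a multi-threaded check might look roughly like the sketch below. To stay self-contained it uses a hypothetical MiniBatcher, a cut-down stand-in for the Batcher above that keeps the same offer/drainTo loop but counts items inline instead of handing them to an executor. The class names and the run helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentAddDemo {

    // Hypothetical stand-in for Batcher: same add/flush loop, processing done inline.
    static final class MiniBatcher {
        private final int batchSize;
        private final ArrayBlockingQueue<Integer> batch;
        final AtomicInteger processed = new AtomicInteger();

        MiniBatcher(int batchSize) {
            this.batchSize = batchSize;
            this.batch = new ArrayBlockingQueue<>(batchSize);
        }

        void add(Integer item) {
            // offer fails only when the queue is full; flush and retry.
            while (!batch.offer(item)) {
                flush();
            }
        }

        void flush() {
            List<Integer> batched = new ArrayList<>(batchSize);
            batch.drainTo(batched, batchSize);
            // Another thread may have drained first, so the list can be empty.
            if (!batched.isEmpty()) {
                processed.addAndGet(batched.size());
            }
        }
    }

    // Adds items from several threads and returns how many were processed in total.
    static int run(int threads, int itemsPerThread, int batchSize) {
        MiniBatcher batcher = new MiniBatcher(batchSize);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < itemsPerThread; i++) {
                        batcher.add(i);
                    }
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        batcher.flush(); // collect whatever is left after all producers finished
        return batcher.processed.get();
    }
}
```

A call such as ConcurrentAddDemo.run( 4, 1000, 10 ) should account for every item added, because each item is either drained by some thread's flush or picked up by the final flush.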
robert_difalco asked Jan 17 '14 18:01


1 Answer

seemingly simple, a collection with a non-blocking but atomic version of "add" and "drain"

That is actually impossible. Non-blocking algorithms (on single-CAS architectures) get their atomicity from updating a single memory address, so draining an entire queue both atomically and without blocking is impossible.
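To make the "single memory address" point concrete, here is a minimal sketch of the usual compare-and-set retry loop in Java. The class and method names are hypothetical. The atomic step covers exactly one location, the counter, which is why an operation that has to update many locations at once (such as emptying a whole array-backed queue) does not fit this pattern directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a lock-free update on one location.
class SingleWordCasSketch {

    // Tries to claim one of `capacity` slots; returns false if all are taken.
    static boolean tryClaimSlot(AtomicInteger used, int capacity) {
        while (true) {
            int current = used.get();
            if (current >= capacity) {
                return false; // full; the caller decides what to do next
            }
            // Atomic only with respect to this one counter.
            if (used.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread changed the counter first; re-read and retry.
        }
    }
}
```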

Edit:

Based on your edit, I would think that is probably the most efficient way to achieve what you are looking for.

John Vint answered Nov 03 '22 02:11