Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Single threading a task without queuing further requests

I have a requirement for a task to be executed asynchronously while discarding any further requests until the task is finished.

Synchronizing the method just queues up the tasks and doesn't skip. I initially thought to use a SingleThreadExecutor but that queues up tasks as well. I then looked at the ThreadPoolExecutor but it reads the queue to get the task to be executed and therefore will have one task executing and a minimum of one task queued (the others can be discarded using ThreadPoolExecutor.DiscardPolicy).

The only thing I can think off is to use a Semaphore to block the queue. I've come with the following example to show what I'm trying to achieve. Is there a simpler way? Have I missed something obvious?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        try {
                            System.out.println("start " + index);
                            Thread.sleep(1000); // pretend to do work
                            System.out.println("stop " + index);
                            return null;

                        } finally {
                            processEntry.release();
                        }
                    }
                }
            );
    }
}

Sample output

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

Taking axtavt's answer and transforming the above example gives the following simpler solution.

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        System.out.println("start " + index);
                        Thread.sleep(1000); // pretend to do work
                        System.out.println("stop " + index);
                        return null;
                    }
                }
            );
    }
}
like image 685
Michael Rutherfurd Avatar asked Feb 10 '11 08:02

Michael Rutherfurd


People also ask

What is difference between thread and queue?

A message queue is a data structure for holding messages from the time they're sent until the time the receiver retrieves and acts on them. Generally queues are used as a way to 'connect' producers (of data) & consumers (of data). A thread pool is a pool of threads that do some sort of processing.

What is single threaded executor?

Single Thread Executor. 1. Basic. Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time.


1 Answers

It looks like executor backed by SynchronousQueue with desired policy does what you want:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(),
    new ThreadPoolExecutor.DiscardPolicy());
like image 200
axtavt Avatar answered Oct 12 '22 13:10

axtavt