Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Interrupt parallel Stream execution

Consider this code :

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

tasks are a list of Runnables that should be executed in parallel. When we start this thread, and it begins its execution, then depending on some calculations we need to interrupt (cancel) all those tasks.

Interrupting the Thread will only stop one of exections. How do we handle others? or maybe Streams should not be used that way? or you know a better solution?

like image 382
vach Avatar asked Jan 11 '23 09:01

vach


1 Answers

You can use a ForkJoinPool to interrupt the threads:

@Test
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream got started
    Threads.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Threads.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

The worker threads do really get interrupted, terminating a blocking operation. You can also call shutdown() within the Consumer.

Note that those sleeps might not be tweaked for a proper unit test, you might have better ideas to just wait as necessary. But it is enough to show that it is working.

like image 187
benez Avatar answered Jan 17 '23 17:01

benez