 

Using parallel stream to return fastest supplied value

I have a set of suppliers that all supply the same result, but at different (and varying) speeds.

I want an elegant way to start all the suppliers at the same time and, as soon as one of them has produced a value, return it (discarding the other results).

I've tried using parallel streams and Stream.findAny() for this, but it always seems to block until all results have been produced.

Here's a unit test demonstrating my problem:

import org.junit.Test;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class RaceTest {

    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value

        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}

The last assertion fails because the whole test takes about 10 seconds to complete.

What am I doing wrong here?

Gustav Karlsson asked Oct 06 '15 11:10



2 Answers

In this case, you are better off using Callable instead of Supplier (the same functional shape, except that call() is allowed to throw checked exceptions) and using the good old concurrency API that has existed since Java 5:

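import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

Set<Callable<String>> suppliers = new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
    Thread.sleep(10_000); // Callable.call() may throw, so no try/catch is needed
    return "slow";
});

ExecutorService es = Executors.newCachedThreadPool();
try {
    // Runs the tasks and returns the result of one that completed successfully;
    // tasks still pending at that point are cancelled
    String result = es.invokeAny(suppliers);
    System.out.println(result);
} catch (InterruptedException | ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
} finally {
    es.shutdown();
}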

Note how the entire “run all and return the fastest” operation becomes a single method invocation.

It also has the bonus of cancelling/interrupting all pending operations as soon as one result is available, so the slow operation won’t actually run for the full ten seconds here (in most cases, at least, as the timing is not deterministic).
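For experimenting, here is a minimal self-contained sketch of the invokeAny approach that also measures the elapsed time. The class name InvokeAnyRace and the helper race are illustrative names, not part of any library, and List.of assumes Java 9 or later.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyRace {

    // Hypothetical helper: runs the tasks and returns the result of one that
    // completed successfully; invokeAny cancels the tasks still pending.
    static String race(List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            return es.invokeAny(tasks);
        } finally {
            es.shutdownNow(); // make sure no worker thread keeps running
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = List.of(
                () -> "fast",
                () -> {
                    Thread.sleep(10_000); // interrupted if cancelled after "fast" wins
                    return "slow";
                });
        long start = System.currentTimeMillis();
        String winner = race(tasks);
        long duration = System.currentTimeMillis() - start;
        System.out.println(winner + " in " + duration + " ms");
    }
}
```

Keep in mind that invokeAny returns the first *successful* result: if the fast task throws, it falls through to the next task that completes normally.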

Holger answered Sep 28 '22 06:09


The code you are currently using is nondeterministic. Quoting the Javadoc of findAny():

The behavior of this operation is explicitly nondeterministic; it is free to select any element in the stream.

You could use a CompletionService and submit all the tasks to it. Then, CompletionService.take() will return the Future of the first completed task.

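import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// In testRace(), which must now declare "throws InterruptedException, ExecutionException"
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get(); // blocks until the first task finishes
executor.shutdownNow(); // interrupt the tasks that are still running
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds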
Tunaki answered Sep 28 '22 05:09
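A further option, not taken from either answer, is CompletableFuture (Java 8+), whose supplyAsync accepts a Supplier directly, so the question's Set<Supplier<String>> can be used without converting to Callable. The sketch below assumes Java 9+ for Set.of; CompletableFutureRace and race are illustrative names.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CompletableFutureRace {

    // Hypothetical helper: starts every supplier on the executor and returns
    // the value of whichever future completes first. anyOf does NOT cancel
    // the others, so the caller is responsible for shutting the executor down.
    static <T> T race(Set<Supplier<T>> suppliers, ExecutorService executor) {
        CompletableFuture<?>[] futures = suppliers.stream()
                .map(s -> CompletableFuture.supplyAsync(s, executor))
                .toArray(CompletableFuture<?>[]::new);
        @SuppressWarnings("unchecked") // every future produces a T
        T winner = (T) CompletableFuture.anyOf(futures).join();
        return winner;
    }

    public static void main(String[] args) {
        Set<Supplier<String>> suppliers = Set.of(
                () -> "fast",
                () -> {
                    try {
                        Thread.sleep(10_000);
                        return "slow";
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
        // One thread per supplier so that all of them really start at once
        ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
        try {
            long start = System.currentTimeMillis();
            String winner = race(suppliers, executor);
            System.out.println(winner + " in " + (System.currentTimeMillis() - start) + " ms");
        } finally {
            executor.shutdownNow(); // interrupts the still-sleeping "slow" supplier
        }
    }
}
```

Unlike invokeAny, anyOf completes with whatever finishes first, including an exception, and CompletableFuture.cancel does not interrupt a running computation; that is why the sketch relies on shutdownNow to stop the losers.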