I have a set of suppliers that all supply the same result but at different (and varying) speeds.
I want an elegant way to start off the suppliers at the same time and as soon as one of them has produced a value, return it (discarding the other results).
I've tried using parallel streams and Stream.findAny() for this, but it always seems to block until all results have been produced.
Here's a unit test demonstrating my problem:
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.Assert.*;
public class RaceTest {
    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value
        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}
The result of the test is that the last assertion fails as the whole test takes about 10 seconds to complete.
What am I doing wrong here?
The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code. The primary reason for using parallel streams is to improve performance while at the same time ensuring that the results obtained are the same, or at least compatible, regardless of the mode of execution.
A parallel stream takes advantage of all available CPU cores and processes the tasks in parallel. By default it runs on the shared ForkJoinPool.commonPool() (whose parallelism is typically one less than the number of cores, with the calling thread also doing work), so if there are more tasks than threads, the remaining tasks wait for the running ones to complete.
Streams in Java are sequential by default unless they are explicitly made parallel (with parallelStream() or parallel()). When a stream executes in parallel, the Java runtime partitions the stream into multiple sub-streams. Aggregate operations iterate over and process these sub-streams in parallel and then combine the results.
Parallel streams let you execute code in parallel on separate cores; the final result is the combination of the individual outcomes.
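To make the points above concrete, here is a small sketch (the class and method names are made up for illustration) showing that a stream is sequential until parallel() is called, that the same pipeline gives the same result in either mode, and how large the common pool is on the current machine. It only illustrates the stream model; it does not by itself explain or fix the blocking in the question.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class ParallelStreamDemo {
    // Sums 1..n; the stream is sequential because parallel() is never called.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline in parallel: the range is split into sub-ranges that are summed
    // on ForkJoinPool.commonPool() workers (and the caller), then the partial sums are combined.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("sequential by default: " + !LongStream.range(0, 10).isParallel());
        System.out.println(sequentialSum(1_000_000) + " == " + parallelSum(1_000_000));
    }
}
```

The printed core count and pool size depend on the machine; the two sums always agree.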
In this case, you are better off using Callable instead of Supplier (same functional signature, except that call() may throw checked exceptions) and using the good old concurrency API that has existed since Java 5:
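import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

Set<Callable<String>> suppliers = new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
    Thread.sleep(10_000); // Callable may throw checked exceptions, so no try/catch is needed
    return "slow";
});
ExecutorService es = Executors.newCachedThreadPool();
try {
    // Blocks until one task completes successfully, then cancels the others
    String result = es.invokeAny(suppliers);
    System.out.println(result);
} catch (InterruptedException | ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
} finally {
    es.shutdown(); // release the pool threads even if invokeAny fails
}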
Note how the entire “run all and return the fastest” logic becomes a single method invocation…
It also has the bonus of canceling (interrupting) all pending operations as soon as one result is available, so the slow operation won’t actually wait the full ten seconds here (well, in most cases, as the timing is not deterministic).
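The cancellation claim can be observed with a self-contained sketch like the one below. The class name, the `race` method and the two latches are hypothetical demo additions, not part of the answer: `slowStarted` makes the fast task wait until the slow one is actually running (otherwise the slow task might be cancelled before it ever starts), and `slowInterrupted` records that `invokeAny` interrupted it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InvokeAnyDemo {
    // Races a slow and a fast Callable; counts down slowInterrupted if the slow one gets interrupted.
    static String race(CountDownLatch slowInterrupted) throws InterruptedException, ExecutionException {
        CountDownLatch slowStarted = new CountDownLatch(1);
        Callable<String> slow = () -> {
            slowStarted.countDown(); // signal that the slow task is really running
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                slowInterrupted.countDown(); // invokeAny cancelled this task with interruption
                throw e;
            }
        };
        Callable<String> fast = () -> {
            slowStarted.await(); // demo only: make sure the slow task has started before "fast" wins
            return "fast";
        };
        List<Callable<String>> tasks = Arrays.asList(slow, fast);
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            // Returns the first successful result and cancels the tasks that are still running
            return es.invokeAny(tasks);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch slowInterrupted = new CountDownLatch(1);
        long start = System.currentTimeMillis();
        String winner = race(slowInterrupted);
        long duration = System.currentTimeMillis() - start;
        boolean interrupted = slowInterrupted.await(5, TimeUnit.SECONDS);
        System.out.println(winner + " after " + duration + " ms, slow task interrupted: " + interrupted);
    }
}
```

Without the `slowStarted` handshake the race still returns "fast", but whether the slow task is interrupted or simply never runs depends on timing.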
The code you are currently using is nondeterministic. Quoting the Javadoc of findAny():
The behavior of this operation is explicitly nondeterministic; it is free to select any element in the stream.
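As a rough illustration of that contract (class and method names are made up), the sketch below contrasts `findAny()` with `findFirst()` on a parallel stream over an ordered `List`. `findAny()` may return any element, while `findFirst()` respects encounter order. Note that this only concerns which element is selected: a `Set` like the one in the question has no defined encounter order, and neither method makes the stream stop waiting for the slow supplier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FindAnyDemo {
    static final List<String> ITEMS = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");

    // May return any element of ITEMS; on a parallel stream this often varies between runs.
    static Optional<String> any() {
        return ITEMS.parallelStream().findAny();
    }

    // Respects the List's encounter order, so it returns "a" even in parallel.
    static Optional<String> first() {
        return ITEMS.parallelStream().findFirst();
    }

    public static void main(String[] args) {
        System.out.println("findAny:   " + any().get());
        System.out.println("findFirst: " + first().get());
    }
}
```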
You could use a CompletionService and submit all the tasks to it. Then, CompletionService.take() will return the Future of the first completed task.
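import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Replaces the stream part of testRace(); the test method must now declare
// throws InterruptedException, ExecutionException
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get())); // wrap each Supplier as a Callable
String winner = completionService.take().get(); // Future of whichever task finished first
executor.shutdownNow(); // interrupt the slow task and release the pool threads
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds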
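The same idea can be packaged as a reusable helper. The sketch below is an assumption-laden example (the `CompletionServiceRace` class and `firstOf` method are hypothetical names): it runs each supplier on its own thread, returns the first value produced, and calls `shutdownNow()` so the losers are interrupted instead of keeping pool threads alive. Unlike `invokeAny`, `take()` hands back the first task to *complete*, so if that task threw, `get()` rethrows its failure wrapped in an `ExecutionException` rather than waiting for another result; `newFixedThreadPool` also rejects an empty list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CompletionServiceRace {
    // Runs every supplier on its own thread and returns the value of the first one to complete.
    static <T> T firstOf(List<Supplier<T>> suppliers) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
        try {
            CompletionService<T> cs = new ExecutorCompletionService<>(executor);
            suppliers.forEach(s -> cs.submit(s::get)); // Supplier::get adapts to Callable
            return cs.take().get(); // blocks only until the first task completes
        } finally {
            executor.shutdownNow(); // interrupt the tasks that are still running
        }
    }

    public static void main(String[] args) throws Exception {
        Supplier<String> fast = () -> "fast";
        Supplier<String> slow = () -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e); // reached when shutdownNow() interrupts this task
            }
        };
        long start = System.currentTimeMillis();
        String winner = firstOf(Arrays.asList(slow, fast));
        long duration = System.currentTimeMillis() - start;
        System.out.println(winner + " after " + duration + " ms");
    }
}
```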