Suppose I have a Stream<Callable<SomeClass>> stream;
. The stream is accessing over a million objects which will not fit in memory.
What is the idiomatic way to convert this to a Stream<SomeClass>
in a manner that ensures the Callable::call
are executed in parallel before being delivered to a consumer that is non-threaded-safe (perhaps by calling .sequential().forEach()
or some other bottlenecking mechanism)?
i.e. Process the stream in parallel but deliver the output sequentially (random order ok, as long as it's single-threaded).
I know I could do what I want by setting up an ExecutionService
and a Queue
between the original stream and the consumer. But that seems like a lot of code, is there a magic one-liner?
You could still employ an ExecutorService
for parallelization. Like this:
ExecutorService service = Executors.newFixedThreadPool(4);
stream.map(c -> service.submit(c)).map(future -> {
try {
return future.get(); //retrieve callable result
} catch (InterruptedException | ExecutionException ex) {
//Exception handling
throw new RuntimeException(ex);
}
});
You can process the resulting Stream<SomeClass>
further sequentially.
If you use forEach/forEachOrdered directly on the Stream<Future<SomeClass>>
you can process a resulting SomeClass
-object directly once the current future is done (different from when you use invokeAll()
which blocks until every task is done).
If you want to process the results of the callables in the exact order they are available you will have to use CompletionService
which can't be used along with a single chain of stream operations due to the necessary call of Future<SomeClass> f = completionService.take()
after submitting the callables.
EDIT:
Using an ExecutorService
within streams doesn't work the way I showed above, because every Callable
is submitted and requested via future.get()
one after the other.
I found a possible even side-effect heavier solution dividing the Callables
in fixed parallelized chunks.
I use a class TaskMapper
as mapping-function for submitting the Callables
and mapping them to chunks:
class TaskMapper implements Function<Callable<Integer>, List<Future<Integer>>>{
private final ExecutorService service;
private final int chunkSize;
private List<Future<Integer>> chunk = new ArrayList<>();
TaskMapper(ExecutorService service, int chunkSize){
this.service = service;
this.chunkSize = chunkSize;
}
@Override
public List<Future<Integer>> apply(Callable<Integer> c) {
chunk.add(service.submit(c));
if(chunk.size() == chunkSize){
List<Future<Integer>> fList = chunk;
chunk = new ArrayList<>();
return fList;
}else{
return null;
}
}
List<Future<Integer>> getChunk(){
return chunk;
}
}
This how the chain of stream-operations looks like:
ExecutorService service = Executors.newFixedThreadPool(4);
TaskMapper taskMapper = new TaskMapper(service, 4);
stream.map(taskMapper)
.filter(fl -> fl != null) //filter for the chunks
.flatMap(fl -> fl.stream()) //flat-map the chunks to futures
.map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
});
//process the remaining futures
for(Future<Integer> f : taskMapper.getChunk()){
try {
Integer i = f.get();
//process i
} catch (InterruptedException | ExecutionException ex) {
//exception handling
}
}
This works as follows: The TaskMapper
takes 4 callables each time submits them to the service and maps them to a chunk of futures (without Spliterator
). This is solved by mapping to null
for the 1st, 2nd and 3rd callable each time. null
could be replaced by a dummy object for example. The mapping function that maps the futures to the results waits for the result of each future of the chunk. I use Integer
in my example instead of SomeClass
. When all results of the futures in the current chunk are mapped, a new chunk will be created and parallelized. Finally, if the number of elements in the stream is not dividable by the chunkSize
(4 in my example), the remaining futures will have to be retrieved from the TaskMapper
and processed outside of the stream.
This construct works for the tests I carried out, but I am aware that it is possible fragile due to the side-effects, statefullness and the undefined evaluation behavior of the stream.
EDIT2:
I made a version of the construct from the previous EDIT using a custom Spliterator
:
public class ExecutorServiceSpliterator<T> extends AbstractSpliterator<Future<T>>{
private final Spliterator<? extends Callable<T>> srcSpliterator;
private final ExecutorService service;
private final int chunkSize;
private final Queue<Future<T>> futures = new LinkedList<>();
private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator) {
this(srcSpliterator, Executors.newFixedThreadPool(8), 30); //default
}
private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator, ExecutorService service, int chunkSize) {
super(Long.MAX_VALUE, srcSpliterator.characteristics() & ~SIZED & ~CONCURRENT);
this.srcSpliterator = srcSpliterator;
this.service = service;
this.chunkSize = chunkSize;
}
public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream){
return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator()));
}
public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream, ExecutorService service, int chunkSize){
return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator(), service, chunkSize));
}
private static <T> Stream<T> getStream(ExecutorServiceSpliterator<T> serviceSpliterator){
return StreamSupport.stream(serviceSpliterator, false)
.map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
}
);
}
@Override
public boolean tryAdvance(Consumer<? super Future<T>> action) {
boolean didAdvance = true;
while((didAdvance = srcSpliterator.tryAdvance(c -> futures.add(service.submit(c))))
&& futures.size() < chunkSize);
if(!didAdvance){
service.shutdown();
}
if(!futures.isEmpty()){
Future<T> future = futures.remove();
action.accept(future);
return true;
}
return false;
}
}
This class provides functions (pipeParallelized()
) which take a stream of Callable
-elements execute them chunk-wise in parallel and then ouput a sequential stream containing the results. Spliterators
are allowed to be stateful. Therefore this version should hopefully not violate any stream operation constraints. This is how the Splitterator
can be used (close to a "magic oneliner"):
ExecutorServiceSpliterator.pipeParallelized(stream);
This line takes the stream of Callables
stream
parallelizes the execution of them and returns a sequential stream containing the results (piping happens lazily -> should work with millions of callables) which can be processed further with regular stream operations.
The implementation of ExecutorServiceSpliterator
is very basic. It should mainly demonstrate how it could be done in principle. The resupplying of the service and the retrieving of the results could be optimized. For example if the resulting stream is allowed to be unordered, a CompletionService
could be used.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With