Catch exception from parallel stream

I have a bunch of columns from a CSV file, each held as a string array, and I want to parse them. Since this involves date parsing and other slow parsing techniques, I was thinking about parallelism (I timed it, and the parsing takes a noticeable amount of time). My simple approach:

Stream.of(columns).parallel().forEach(column -> 
    result[column.index] = parseColumn(valueCache[column.index], column.type));

columns contains ColumnDescriptor elements, which have just two attributes: the index of the column to be parsed and the type, which defines how to parse it. result is an Object array that receives the parsed arrays.

The problem is that the parse function throws a ParseException, which I handle further up the call stack. Since we are in parallel here, it can't just be thrown. What is the best way to handle this?

I have this solution, but I cringe a bit when reading it. What would be a better way to do it?

final CompletableFuture<ParseException> thrownException = new CompletableFuture<>();
Stream.of(columns).parallel().forEach(column -> {
    try {
        result[column.index] = parseColumn(valueCache[column.index], column.type);
    } catch (ParseException e) {
        thrownException.complete(e);
    }
});

// isDone() can only be true here if an exception was stored.
if (thrownException.isDone()) {
    throw thrownException.getNow(null);
}

Note: I do not need all the exceptions. If I parsed the columns sequentially, I would only get one anyway, so that is OK.

asked Mar 26 '17 at 18:03 by findusl

1 Answer

The problem is your wrong premise: “Since we are in parallel here it can't just be thrown.” No specification forbids throwing exceptions in parallel processing. You can throw the exception in a parallel stream the same way you do in a sequential stream, wrapping it in an unchecked exception if it is a checked one.

If there is at least one exception thrown in a thread, the forEach invocation will propagate it (or one of them) to the caller.
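
A minimal sketch of that wrap-and-unwrap pattern follows. It is not the asker's actual code: `UncheckedParseException`, `parseAll`, the one-argument `parseColumn` and the `yyyy-MM-dd` format are assumptions made for illustration. The wrapper keeps the original `ParseException` in a field so the caller can rethrow it as a checked exception.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.IntStream;

final class ParallelParseDemo {

    /** Hypothetical unchecked carrier for a ParseException thrown inside the lambda. */
    static final class UncheckedParseException extends RuntimeException {
        final ParseException original;

        UncheckedParseException(ParseException original) {
            super(original);
            this.original = original;
        }
    }

    /** Stand-in for parseColumn: parses every value of one column as a yyyy-MM-dd date. */
    static Date[] parseColumn(String[] values) throws ParseException {
        // SimpleDateFormat is not thread-safe, so each call creates its own instance.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        Date[] parsed = new Date[values.length];
        for (int i = 0; i < values.length; i++) {
            parsed[i] = format.parse(values[i]);
        }
        return parsed;
    }

    /** Parses all columns in parallel; a failure surfaces as a plain ParseException. */
    static Object[] parseAll(String[][] valueCache) throws ParseException {
        Object[] result = new Object[valueCache.length];
        try {
            IntStream.range(0, valueCache.length).parallel().forEach(i -> {
                try {
                    result[i] = parseColumn(valueCache[i]);
                } catch (ParseException e) {
                    // Checked exceptions cannot leave the lambda, so wrap it.
                    throw new UncheckedParseException(e);
                }
            });
        } catch (UncheckedParseException e) {
            // forEach propagated the wrapper (or one of them); unwrap for the caller.
            throw e.original;
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] columns = {
            {"2017-03-26", "2017-03-27"},
            {"2017-03-28", "not-a-date"},
        };
        try {
            parseAll(columns);
            System.out.println("all columns parsed");
        } catch (ParseException e) {
            System.out.println("parsing failed: " + e.getMessage());
        }
    }
}
```

Using a dedicated wrapper type rather than a bare RuntimeException means the catch block only unwraps exceptions this code wrapped itself; other runtime exceptions pass through unchanged.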

The only issue you might encounter is that the current implementation doesn’t wait for the completion of all threads when it encounters an exception. This can be worked around using:

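try {
    // Assumes parseColumn throws only unchecked exceptions here,
    // i.e. any ParseException has already been wrapped.
    Arrays.stream(columns).parallel()
        .forEach(column ->
            result[column.index] = parseColumn(valueCache[column.index], column.type));
} catch(Throwable t) {
    // Wait (up to one minute) for the common pool to become quiescent,
    // so that no task is still writing to result, then rethrow.
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
    throw t;
}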

But usually you don’t need it, as you won’t access the concurrently processed result in the exceptional case.

answered Sep 23 '22 at 21:09 by Holger
