Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Should I use try-with-resource in flatMap for an I/O-based stream?

A Stream is an AutoCloseable and if I/O-based, should be used in a try-with-resource block. What about intermediate I/O-based streams which are inserted via flatMap()? Example:

try (var foos = foos()) {
   return foos.flatMap(Foo::bars).toArray(Bar[]::new);
}

vs.

try (var foos = foos()) {
  return foos.flatMap(foo -> {
    try (var bars = foo.bars()) {
      return bars;
    }
  }).toArray(Bar[]::new);
}

The flatMap() documentation says:

Each mapped stream is closed after its contents have been placed into this stream.

Well, that's the happy path. What if there happened an exception in between? Would that stream then stay unclosed and potentially leaking resources? Should I then always use a try-with-resource also for intermediate streams?

like image 560
Markus Malkusch Avatar asked Feb 22 '19 10:02

Markus Malkusch


People also ask

Does try with resources close stream?

The Java try with resources construct, AKA Java try-with-resources, is an exception handling mechanism that can automatically close resources like a Java InputStream or a JDBC Connection when you are done with them. To do so, you must open and use the resource within a Java try-with-resources block.

Does Flatmap preserve order?

Does flatmap() method preserve the order of the streams? Yes, It does and map() also.

Does collect close stream?

So yes, wrap the Stream returned by lines in a try-with-resources statement. (Or close it appropriately.)

What is a stream Java?

A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. The features of Java stream are – A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels.


Video Answer


2 Answers

There is no sense in a construct like

return foos.flatMap(foo -> {
    try (var bars = foo.bars()) {
        return bars;
    }
}).toArray(Bar[]::new);

as that would close the stream before it is returned to the caller, which makes the sub-stream entirely unusable.

In fact, it is impossible for the function’s code to ensure that the closing will happen at the appropriate place, which is outside the function. That’s surely the reason why the API designers decided that you don’t have to, and the Stream implementation will take care.

This also applies to the exceptional case. The Stream still ensures that the stream gets closed, once the function has returned it to the Stream:

try {
    IntStream.range(1, 3)
        .flatMap(i -> {
            System.out.println("creating "+i);
            return IntStream.range('a', 'a'+i)
                    .peek(j -> {
                        System.out.println("processing sub "+i+" - "+(char)j);
                        if(j=='b') throw new IllegalStateException();
                    })
                    .onClose(() -> System.out.println("closing "+i));
        })
        .forEach(i -> System.out.println("consuming "+(char)i));
} catch(IllegalStateException ex) {
    System.out.println("caught "+ex);
}
creating 1
processing sub 1 - a
consuming a
closing 1
creating 2
processing sub 2 - a
consuming a
processing sub 2 - b
closing 2
caught java.lang.IllegalStateException

You may play with the conditions, to see that a constructed Stream is always closed. For elements of the outer Stream which do not get processed, there will be no Stream at all.

For a Stream operation like .flatMap(Foo::bars) or .flatMap(foo -> foo.bars()), you can assume that once bars() successfully created and returned a Stream, it will be passed to the caller for sure and properly closed.

A different scenario would be mapping functions which perform operations after the Stream creation which could fail, e.g.

.flatMap(foo -> {
    Stream<Type> s = foo.bar();
    anotherOperation(); // Stream is not closed if this throws
    return s;
})

In this case, it would be necessary to ensure the closing in the exceptional case, but only in the exceptional case:

.flatMap(foo -> {
    Stream<Type> s = foo.bar();
    try {
        anotherOperation();
    } catch(Throwable t) {
        try(s) { throw t; } // close and do addSuppressed if follow-up error
    }
    return s;
})

but obviously, you should follow the general rule to keep lambdas simple, in which case you don’t need such protection.

like image 148
Holger Avatar answered Oct 23 '22 20:10

Holger


In Stream or not, you have to close the IO resources at the relevant place.
The flatMap() method is a general stream method and so it not aware of IO resources you opened inside it.
But Why flatMap() would behave differently from any method that manipulates IO resources ? For example if you manipulate IO in map(), you could get the same issue (no releasing resource) if an exception occurs.
Closing a stream (as in flatMap()) will not make it release all resources opened in the stream operation.
Some methods do that, File.lines(Path) for example. But if you open yourself some resources in flatMap(), the closing of these resources will not do automatically when the stream is closed.
For example here the flatMap processing doesn't close the FileInputStreams opened :

 ...
 .stream()
 .flatMap(foo -> {
    try {
       FileInputStream fileInputStream = new FileInputStream("..."));                                  
       //...
     }
     catch (IOException e) {
         // handle
     }

 })

You have to close it explicitly :

 ...
 .stream()
 .flatMap(foo -> {
     try (FileInputStream fileInputStream = new FileInputStream("...")){
         //...
     } catch (IOException e) {
         // handle
     }
    // return
 })

So yes if the statements used inside the flatMap() or any method manipulates some IO resources, you want to close them in any case by surrounding it with a try-with-resources statement to make them free.

like image 2
davidxxx Avatar answered Oct 23 '22 20:10

davidxxx