
Java 8 stream attaching error handling for later consumption

Let's say I have the following method that I want to refactor:

protected Stream<T> parseFile(File file, Consumer<File> cleanup) {
  try {
    return parser.parse(file); // returns a Stream<T>
  } catch (XmlParseException e) { // child of RuntimeException
    throw new CustomRuntimeException(e);
  } finally {
    if (file != null) {
      cleanup.accept(file);
    }
  }
}

This method's purpose is to act as a proxy that attaches error handling to the stream, rethrowing parser exceptions wrapped in a CustomRuntimeException. That way, when the stream is consumed later in the flow, I only have to handle CustomRuntimeException instead of handling the parser's exceptions everywhere.

Upstream, I use that method as follows:

try {
  Stream<T> stream = parseFile(someFile);
  stream.map(t -> ...);
} catch (CustomRuntimeException e) {
  // do some stuff
}

And here's what the parser.parse method looks like:

public Stream<T> parse() {
  // ValueIterator<T> implements Iterator<T>, AutoCloseable
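  XmlRootParser.ValueIterator<T> valueIterator = new XmlRootParser.ValueIterator<>(this.nodeConverter, this.reader, this.nodeLocalName, this.nodeName);
  // 1040 == Spliterator.ORDERED | Spliterator.IMMUTABLE
  Stream<T> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(valueIterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
  // onClose returns the stream that carries the close handler, so return its result
  return stream.onClose(valueIterator::close);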
}

The exceptions I want to handle are thrown by the ValueIterator.hasNext method, which means they are not thrown when the Stream is created but only when it is consumed (by calling a terminal operation such as forEach/count/collect/... on the stream).
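To illustrate that timing, here is a minimal sketch. It assumes a hypothetical iterator standing in for ValueIterator, with IllegalStateException standing in for XmlParseException; neither is the real parser code. A try/catch around stream creation sees nothing, and the failure only surfaces in the terminal operation.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyFailureDemo {

    // Hypothetical stand-in for ValueIterator: yields 1 and 2, then fails in hasNext().
    static Iterator<Integer> failingIterator() {
        return new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                if (next > 2) {
                    // Stand-in for the XmlParseException thrown by the real iterator.
                    throw new IllegalStateException("parse error");
                }
                return true;
            }

            @Override
            public Integer next() {
                return next++;
            }
        };
    }

    // Reports where the exception surfaced: "creation", "consumption", or "none".
    static String whereDoesItFail() {
        Stream<Integer> stream;
        try {
            Spliterator<Integer> spliterator =
                    Spliterators.spliteratorUnknownSize(failingIterator(), Spliterator.ORDERED);
            // Building the pipeline (including map) does not pull any element yet.
            stream = StreamSupport.stream(spliterator, false).map(i -> i * 2);
        } catch (IllegalStateException e) {
            return "creation";
        }
        try {
            // The terminal operation drives the iterator, so hasNext() fails here.
            stream.forEach(i -> { });
        } catch (IllegalStateException e) {
            return "consumption";
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(whereDoesItFail()); // prints "consumption"
    }
}
```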

How do I attach error handling to my stream in the parseFile method cleanly, without having to consume the stream? Is it possible?

Obviously, the parseFile code above works only if the parser.parse method consumes its stream before returning it, which defeats the purpose of using streams.

Jeep87c asked Feb 08 '18 03:02


1 Answer

The Stream’s backend, which provides the iteration logic, is the Spliterator.

So you can wrap the element processing using a wrapper Spliterator like this:

class Wrapper<T> implements Spliterator<T> {
    final Spliterator<T> source;
    public Wrapper(Spliterator<T> source) {
        this.source = source;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            return source.tryAdvance(action);
        }
        catch(XmlParseException ex) {
            throw new CustomRuntimeException(ex);
        }
    }
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        try {
            source.forEachRemaining(action);
        }
        catch(XmlParseException ex) {
            throw new CustomRuntimeException(ex);
        }
    }
    @Override public Spliterator<T> trySplit() {
        Spliterator<T> srcPrefix = source.trySplit();
        return srcPrefix == null? null: new Wrapper<>(srcPrefix);
    }
    @Override public long estimateSize() { return source.estimateSize(); }
    @Override public int characteristics() { return source.characteristics(); }
    @Override public Comparator<? super T> getComparator(){return source.getComparator();}
}

It retains all properties of the original Spliterator and only translates exceptions thrown during the iteration.

Then you can use it like this:

protected Stream<T> parseFile(File file) {
    Stream<T> s = parser.parse();
    return StreamSupport.stream(new Wrapper<>(s.spliterator()), s.isParallel())
                        .onClose(s::close);
}
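
As a rough end-to-end check of this approach, the following self-contained sketch runs a condensed copy of the Wrapper above against stand-ins. The nested XmlParseException and CustomRuntimeException classes, the no-argument parseFile() and the parse() method that fails on the element "bad" are all hypothetical substitutes for the real parser, introduced only so that the example compiles on its own.

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class WrapperDemo {

    // Hypothetical stand-ins for the exception types named in the question.
    static class XmlParseException extends RuntimeException {
        XmlParseException(String message) { super(message); }
    }
    static class CustomRuntimeException extends RuntimeException {
        CustomRuntimeException(Throwable cause) { super(cause); }
    }

    // Condensed copy of the Wrapper shown above.
    static class Wrapper<T> implements Spliterator<T> {
        final Spliterator<T> source;
        Wrapper(Spliterator<T> source) { this.source = source; }
        @Override public boolean tryAdvance(Consumer<? super T> action) {
            try { return source.tryAdvance(action); }
            catch (XmlParseException ex) { throw new CustomRuntimeException(ex); }
        }
        @Override public void forEachRemaining(Consumer<? super T> action) {
            try { source.forEachRemaining(action); }
            catch (XmlParseException ex) { throw new CustomRuntimeException(ex); }
        }
        @Override public Spliterator<T> trySplit() {
            Spliterator<T> prefix = source.trySplit();
            return prefix == null ? null : new Wrapper<>(prefix);
        }
        @Override public long estimateSize() { return source.estimateSize(); }
        @Override public int characteristics() { return source.characteristics(); }
        @Override public Comparator<? super T> getComparator() { return source.getComparator(); }
    }

    // Stand-in for parser.parse(): fails lazily when it reaches the element "bad".
    static Stream<String> parse() {
        return Stream.of("a", "b", "bad", "c").peek(s -> {
            if (s.equals("bad")) throw new XmlParseException("cannot parse " + s);
        });
    }

    // Same shape as parseFile above, without the File parameter.
    static Stream<String> parseFile() {
        Stream<String> s = parse();
        return StreamSupport.stream(new Wrapper<>(s.spliterator()), s.isParallel())
                            .onClose(s::close);
    }

    // Consumes the stream and reports what the caller observed.
    static String consume() {
        try (Stream<String> s = parseFile()) {
            return "collected " + s.collect(Collectors.toList());
        } catch (CustomRuntimeException e) {
            return "caught CustomRuntimeException caused by "
                    + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(consume());
    }
}
```

One design consequence worth keeping in mind: the try block surrounds the call that also invokes the downstream action, so an XmlParseException thrown by a later stage of the caller's pipeline would be translated as well.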

And the caller should not forget to close the stream properly:

    ResultType result;
    try(Stream<T> s = parseFile(file)) {
        result = s.
         // other intermediate ops
         // terminal operation
    }

or

    ResultType result;
    try(Stream<T> s = parseFile(file)) {
        result = s.
         // other intermediate ops
         // terminal operation
    }
    finally {
        // other cleanup actions
    }
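
The reason closing matters can be sketched as follows. A terminal operation does not close a stream by itself; the handler registered with onClose only runs on close(), and the re-wrapped stream forwards that call to the original stream through onClose(s::close). The AtomicBoolean flag and the two-element source stream are illustrative assumptions standing in for the parser's resource.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CloseDemo {

    // Returns {closed after the terminal operation, closed after try-with-resources}.
    static boolean[] closeStates() {
        AtomicBoolean closed = new AtomicBoolean(false);
        // Hypothetical stand-in for parser.parse(): a stream that owns a resource.
        Stream<String> inner = Stream.of("a", "b").onClose(() -> closed.set(true));

        boolean afterTerminalOp = false;
        // Re-wrapped like in parseFile, minus the exception translation.
        try (Stream<String> outer =
                     StreamSupport.stream(inner.spliterator(), inner.isParallel())
                                  .onClose(inner::close)) {
            outer.forEach(x -> { });
            // The terminal operation has finished, but nothing has been closed yet.
            afterTerminalOp = closed.get();
        }
        // Leaving the try block closes outer, which in turn closes inner.
        return new boolean[] { afterTerminalOp, closed.get() };
    }

    public static void main(String[] args) {
        boolean[] states = closeStates();
        System.out.println("closed after terminal op: " + states[0]); // false
        System.out.println("closed after try block:   " + states[1]); // true
    }
}
```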

Holger answered Oct 12 '22 15:10