
Exception propagation within PipedInputStream and PipedOutputStream

I have a data producer that runs in a separate thread and pushes generated data into a PipedOutputStream that is connected to a PipedInputStream. A reference to this input stream is exposed via a public API so that any client can use it. The PipedInputStream has a limited buffer which, when full, blocks the data producer. Basically, as the client reads data from the input stream, new data is generated by the data producer.

The problem is that the data producer may fail and throw an exception. But because the producer runs in a separate thread, there is no clean way to propagate that exception to the client.

What I do now is catch that exception and close the input stream. That results in an IOException with the message "Pipe closed" on the client side, but I would really like to give the client the real reason behind the closure.
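The symptom can be reproduced with a minimal sketch (the producer body is simulated; the real production code is elided):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeClosedDemo {

    // Returns the message of the IOException the client observes.
    static String readAfterProducerFailure() throws InterruptedException, IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        Thread producer = new Thread(() -> {
            try {
                out.write(new byte[]{1, 2, 3});
                throw new IllegalStateException("real reason: producer failed"); // simulated failure
            } catch (Exception e) {
                try {
                    out.close();
                    in.close(); // closing hides the real reason from the client
                } catch (IOException ignored) {
                }
            }
        });
        producer.start();
        producer.join(); // ensure the pipe is already closed before the client reads

        try {
            in.read(); // the client side
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAfterProducerFailure()); // prints "Pipe closed", not the real reason
    }
}
```

The original cause is lost: all the client can see is the generic "Pipe closed" message.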

This is a rough code of my API:

public InputStream getData() {
    final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
    final PipedOutputStream outputStream = new PipedOutputStream(inputStream);

    Thread thread = new Thread(() -> {
        try {
          // Start producing the data and push it into output stream.
          // The production may fail and throw an exception with the reason
        } catch (Exception e) {
            try {
                // What to do here?
                outputStream.close();
                inputStream.close();
            } catch (IOException e1) {
                // ignored: nothing sensible can be done if closing fails
            }
        }
    });
    thread.start();

    return inputStream;
}

I have two ideas for how to fix this:

  1. Store the exception in the parent object and expose it to the client via the API, i.e. if the reading fails with an IOException, the client could ask the API for the reason.
  2. Extend / re-implement the piped streams so that I could pass a reason to the close() method. Then the IOException thrown by the stream could contain that reason as a message.
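The first idea might look like the following sketch. The DataPipe class, the getFailure() accessor, and the simulated failure are all hypothetical, not part of the original API:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of idea 1: the parent object remembers the producer's
// failure so the client can ask for the real reason after a read fails.
public class DataPipe {
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    public InputStream getData(final byte[] payload) throws IOException {
        final PipedInputStream inputStream = new PipedInputStream();
        final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
        Thread thread = new Thread(() -> {
            try {
                outputStream.write(payload);
                // Simulated producer failure; real production code is elided.
                throw new IllegalStateException("data production failed");
            } catch (Exception e) {
                failure.set(e); // record the real reason first...
                try {
                    inputStream.close(); // ...then close, so the client gets an IOException
                } catch (IOException ignored) {
                }
            }
        });
        thread.start();
        return inputStream;
    }

    // The client calls this after catching an IOException to learn the real reason.
    public Exception getFailure() {
        return failure.get();
    }

    public static void main(String[] args) throws IOException {
        DataPipe pipe = new DataPipe();
        try (InputStream in = pipe.getData("abc".getBytes())) {
            while (in.read() != -1) { /* consume */ }
        } catch (IOException e) {
            System.out.println("read failed: " + e.getMessage());
            System.out.println("real reason: " + pipe.getFailure().getMessage());
        }
    }
}
```

The drawback is that the client has to know about the extra accessor; the exception does not travel through the stream itself.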

Any better ideas?

asked Nov 13 '15 by tobik

2 Answers

Coincidentally I just wrote similar code to allow GZip compression of a stream. You don't need to extend PipedInputStream; a FilterInputStream subclass will do. Return a wrapped version, e.g.

final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
    try {
      // Start producing the data and push it into output stream.
      // The production may fail and throw an exception with the reason
    } catch (final IOException e) {
        inWithException.fail(e);
    } finally {
        inWithException.countDown();
    }
});
thread.start();
return inWithException;

And then InputStreamWithFinalExceptionCheck is just

private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
    private final AtomicReference<IOException> exception = new AtomicReference<>(null);
    private final CountDownLatch complete = new CountDownLatch(1);

    public InputStreamWithFinalExceptionCheck(final InputStream stream) {
        super(stream);
    }

    @Override
    public void close() throws IOException {
        try {
            complete.await();
            final IOException e = exception.get();
            if (e != null) {
                throw e;
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IOException("Interrupted while waiting for synchronised closure");
        } finally {
            super.close(); // FilterInputStream keeps the wrapped stream in the protected field 'in'
        }
    }

    public void fail(final IOException e) {
        exception.set(Objects.requireNonNull(e)); // java.util.Objects replaces Guava's Preconditions.checkNotNull
    }

    public void countDown() {
        complete.countDown();
    }
}
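Putting the answer's pieces together, here is a self-contained sketch of what the client ends up seeing. The producer failure is simulated, Objects.requireNonNull stands in for Guava's Preconditions.checkNotNull, and the write end is closed in the finally block so the reader reaches EOF instead of blocking:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class PipeDemo {
    static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
        private final AtomicReference<IOException> exception = new AtomicReference<>(null);
        private final CountDownLatch complete = new CountDownLatch(1);

        InputStreamWithFinalExceptionCheck(final InputStream stream) {
            super(stream);
        }

        @Override
        public void close() throws IOException {
            try {
                complete.await(); // wait until the producer has either finished or failed
                final IOException e = exception.get();
                if (e != null) {
                    throw e; // surface the producer's failure to the client
                }
            } catch (final InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for synchronised closure");
            } finally {
                super.close();
            }
        }

        void fail(final IOException e) {
            exception.set(Objects.requireNonNull(e));
        }

        void countDown() {
            complete.countDown();
        }
    }

    public static InputStream getData() throws IOException {
        final PipedInputStream in = new PipedInputStream();
        final InputStreamWithFinalExceptionCheck inWithException =
                new InputStreamWithFinalExceptionCheck(in);
        final PipedOutputStream out = new PipedOutputStream(in);
        Thread thread = new Thread(() -> {
            try {
                out.write("partial data".getBytes());
                throw new IOException("disk on fire"); // simulated producer failure
            } catch (final IOException e) {
                inWithException.fail(e);
            } finally {
                inWithException.countDown();
                try {
                    out.close(); // lets the reader reach EOF instead of blocking
                } catch (IOException ignored) {
                }
            }
        });
        thread.start();
        return inWithException;
    }

    public static void main(String[] args) {
        try (InputStream is = getData()) {
            byte[] buf = new byte[64];
            while (is.read(buf) != -1) { /* consume */ }
        } catch (IOException e) {
            System.out.println(e.getMessage()); // the real reason, "disk on fire"
        }
    }
}
```

The client reads to EOF normally, and the producer's original IOException is then thrown from close(), so an ordinary try-with-resources block is enough to receive it.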
answered Nov 03 '22 by Jon Freedman

This is my implementation, based on the accepted answer above (https://stackoverflow.com/a/33698661/5165540), except that I don't use the CountDownLatch complete.await(), because it would deadlock if the InputStream were abruptly closed before the writer had finished writing the full content. I still record the exception caught while the PipedOutputStream is being used, and I create the PipedOutputStream in the spawned thread with try-with-resources to ensure it gets closed, waiting in the Supplier until the two streams are piped.

Supplier<InputStream> streamSupplier = new Supplier<InputStream>() {
        @Override
        public InputStream get() {
            final AtomicReference<IOException> osException = new AtomicReference<>();
            final CountDownLatch piped = new CountDownLatch(1);

            final PipedInputStream is = new PipedInputStream();

            FilterInputStream fis = new FilterInputStream(is) {
                @Override
                public void close() throws IOException {
                    try {
                        IOException e = osException.get();
                        if (e != null) {
                            //Exception thrown by the write will bubble up to InputStream reader
                            throw new IOException("IOException in writer", e);
                        }
                    } finally {
                        super.close();
                    }
                }
            };

            Thread t = new Thread(() -> {
                    try (PipedOutputStream os = new PipedOutputStream(is)) {
                        piped.countDown();
                        writeIozToStream(os, projectFile, dataFolder);
                    } catch (final IOException e) {
                        osException.set(e);
                    }
            });
            t.start();

            try {
                piped.await();
            } catch (InterruptedException e) {
                t.interrupt(); // Thread has no cancel() method; interrupt the writer thread instead
                Thread.currentThread().interrupt();
            }

            return fis;
        }
    };

Calling code is something like

try (InputStream is = streamSupplier.get()) {
     //Read stream in full 
}

So when the InputStream is closed, this is signaled to the PipedOutputStream, eventually causing a "Pipe closed" IOException in the writer, which is ignored at that point.

If I instead kept the complete.await() line in the FilterInputStream close(), I could suffer a deadlock (the PipedInputStream trying to close would wait on complete.await(), while the PipedOutputStream would wait forever in PipedInputStream's awaitSpace).

answered Nov 03 '22 by Pogs