I have a data producer that runs in a separate thread and pushes generated data into PipedOutputStream
which is connected to PipedInputStream
. A reference of this input stream is exposed via public API so that any client can use it. The PipedInputStream
contains a limited buffer which, if full, blocks the data producer. Basically, as the client reads data from the input stream, new data is generated by the data producer.
The problem is that the data producer may fail and throw an exception. But as the consumer is running in a separate thread, there is no nice way to get the exception to the client.
What I do is that I catch that exception and close the input stream. That will result in a IOException
with message "Pipe closed" on the client side but I would really like to give the client the real reason behind that.
This is a rough code of my API:
public InputStream getData() {
final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (Exception e) {
try {
// What to do here?
outputStream.close();
inputStream.close();
} catch (IOException e1) {
}
}
});
thread.start();
return inputStream;
}
I have two ideas how to fix that:
IOException
, the client could ask the API for the reason.close()
method. Then the IOException
thrown by the stream could contain that reason as a message.Any better ideas?
Coincidentally I just wrote similar code to allow GZip compression of a stream. You don't need to extend PipedInputStream, just FilterInputStream will do and return a wrapped version, e.g.
final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (final IOException e) {
inWithException.fail(e);
} finally {
inWithException.countDown();
}
});
thread.start();
return inWithException;
And then InputStreamWithFinalExceptionCheck is just
private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
private final AtomicReference<IOException> exception = new AtomicReference<>(null);
private final CountDownLatch complete = new CountDownLatch(1);
public InputStreamWithFinalExceptionCheck(final InputStream stream) {
super(stream);
}
@Override
public void close() throws IOException {
try {
complete.await();
final IOException e = exception.get();
if (e != null) {
throw e;
}
} catch (final InterruptedException e) {
throw new IOException("Interrupted while waiting for synchronised closure");
} finally {
stream.close();
}
}
public void fail(final IOException e) {
exception.set(Preconditions.checkNotNull(e));
}
public void countDown() {complete.countDown();}
}
This is my implementation, taken from above accepted answer https://stackoverflow.com/a/33698661/5165540 , where I don't use the CountDownLatch complete.await()
as it would cause a deadlock if the InputStream gets abruptly closed before the writer has finished writing the full content.
I still set the exception caught when PipedOutpuStream is being used, and I create the PipedOutputStream in the spawn thread, using a try-finally-resource pattern to ensure it gets closed, waiting in the Supplier until the 2 streams are piped.
Supplier<InputStream> streamSupplier = new Supplier<InputStream>() {
@Override
public InputStream get() {
final AtomicReference<IOException> osException = new AtomicReference<>();
final CountDownLatch piped = new CountDownLatch(1);
final PipedInputStream is = new PipedInputStream();
FilterInputStream fis = new FilterInputStream(is) {
@Override
public void close() throws IOException {
try {
IOException e = osException.get();
if (e != null) {
//Exception thrown by the write will bubble up to InputStream reader
throw new IOException("IOException in writer", e);
}
} finally {
super.close();
}
};
};
Thread t = new Thread(() -> {
try (PipedOutputStream os = new PipedOutputStream(is)) {
piped.countDown();
writeIozToStream(os, projectFile, dataFolder);
} catch (final IOException e) {
osException.set(e);
}
});
t.start();
try {
piped.await();
} catch (InterruptedException e) {
t.cancel();
Thread.currentThread().interrupt();
}
return fis;
}
};
Calling code is something like
try (InputStream is = streamSupplier.getInputStream()) {
//Read stream in full
}
So when is InputStream is closed this will be signaled in the PipedOutputStream causing eventually a "Pipe closed" IOException, ignored at that point.
If I keep instead the complete.await()
line in the FilterInputStreamclose()
I could suffer from deadlock (PipedInputStream trying to close, waiting on complete.await()
, while PipedOutputStream is waiting forever on PipedInputStreamawaitSpace
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With