I'm currently trying to write a custom stream proxy (let's call it that) that can change the content of a given input stream and produce a modified output where necessary. I need this because I sometimes have to transform streams in my application (e.g. compress the data truly on the fly). The following class is fairly simple and uses internal buffering.
private static class ProxyInputStream extends InputStream {
    private final InputStream iStream;
    private final byte[] iBuffer = new byte[512];
    private int iBufferedBytes;
    private final ByteArrayOutputStream oBufferStream;
    private final OutputStream oStream;
    private byte[] oBuffer = emptyPrimitiveByteArray;
    private int oBufferIndex;
    ProxyInputStream(InputStream iStream, IFunction<OutputStream, ByteArrayOutputStream> oStreamFactory) {
        this.iStream = iStream;
        oBufferStream = new ByteArrayOutputStream(512);
        oStream = oStreamFactory.evaluate(oBufferStream);
    }
    @Override
    public int read() throws IOException {
        if ( oBufferIndex == oBuffer.length ) {
            iBufferedBytes = iStream.read(iBuffer);
            if ( iBufferedBytes == -1 ) {
                return -1;
            }
            oBufferIndex = 0;
            oStream.write(iBuffer, 0, iBufferedBytes);
            oStream.flush();
            oBuffer = oBufferStream.toByteArray();
            oBufferStream.reset();
        }
        return oBuffer[oBufferIndex++];
    }
}
Let's assume we also have a sample test output stream that simply adds a space character before every written byte ("abc" -> " a b c") like this:
private static class SpacingOutputStream extends OutputStream {
    private final OutputStream outputStream;
    SpacingOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
    }
    @Override
    public void write(int b) throws IOException {
        outputStream.write(' ');
        outputStream.write(b);
    }
}
And the following test method:
private static void test(final boolean useDeflater) throws IOException {
    final FileInputStream input = new FileInputStream(SOURCE);
    final IFunction<OutputStream, ByteArrayOutputStream> outputFactory = new IFunction<OutputStream, ByteArrayOutputStream>() {
        @Override
        public OutputStream evaluate(ByteArrayOutputStream outputStream) {
            return useDeflater ? new DeflaterOutputStream(outputStream) : new SpacingOutputStream(outputStream);
        }
    };
    final InputStream proxyInput = new ProxyInputStream(input, outputFactory);
    final OutputStream output = new FileOutputStream(SOURCE + ".~" + useDeflater);
    int c;
    while ( (c = proxyInput.read()) != -1 ) {
        output.write(c);
    }
    output.close();
    proxyInput.close();
}
This test method simply reads the file content and writes it to another stream, possibly modifying it along the way. When the test method runs with useDeflater=false, the approach works as expected. But when it is invoked with useDeflater set to true, it behaves strangely and writes almost nothing (apart from the header 78 9C). I suspect that the deflater class may not be designed for the approach I would like to use, but I always believed that the ZIP format and deflate compression were designed to work on the fly.
I'm probably wrong about some specifics of the deflate compression algorithm. What am I missing? Perhaps there is another way to write a "streams proxy" that behaves the way I want. How can I compress the data on the fly while being limited to streams only?
Thanks in advance.
UPD: The following basic version works pretty well with both the deflater and the inflater:
public final class ProxyInputStream<OS extends OutputStream> extends InputStream {
    private static final int INPUT_BUFFER_SIZE = 512;
    private static final int OUTPUT_BUFFER_SIZE = 512;
    private final InputStream iStream;
    private final byte[] iBuffer = new byte[INPUT_BUFFER_SIZE];
    private final ByteArrayOutputStream oBufferStream;
    private final OS oStream;
    private final IProxyInputStreamListener<OS> listener;
    private byte[] oBuffer = emptyPrimitiveByteArray;
    private int oBufferIndex;
    private boolean endOfStream;
    private ProxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) {
        this.iStream = iStream;
        oBufferStream = new ByteArrayOutputStream(OUTPUT_BUFFER_SIZE);
        oStream = oStreamFactory.evaluate(oBufferStream);
        this.listener = listener;
    }
    public static <OS extends OutputStream> ProxyInputStream<OS> proxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) {
        return new ProxyInputStream<OS>(iStream, oStreamFactory, listener);
    }
    @Override
    public int read() throws IOException {
        if ( oBufferIndex == oBuffer.length ) {
            if ( endOfStream ) {
                return -1;
            } else {
                oBufferIndex = 0;
                do {
                    final int iBufferedBytes = iStream.read(iBuffer);
                    if ( iBufferedBytes == -1 ) {
                        if ( listener != null ) {
                            listener.afterEndOfStream(oStream);
                        }
                        endOfStream = true;
                        break;
                    }
                    oStream.write(iBuffer, 0, iBufferedBytes);
                    oStream.flush();
                } while ( oBufferStream.size() == 0 );
                oBuffer = oBufferStream.toByteArray();
                oBufferStream.reset();
            }
        }
        return !endOfStream || oBuffer.length != 0 ? (int) oBuffer[oBufferIndex++] & 0xFF : -1;
    }
}
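One detail worth spelling out is the `& 0xFF` mask in the last line of the updated read(). InputStream.read() is specified to return a value in the range 0..255, or -1 at end of stream, whereas a Java byte is signed. The first version returns the raw byte, so any compressed byte of 0x80 or above comes back negative, and a 0xFF byte comes back as -1, which a reading loop would take for end of stream. The sketch below illustrates only that difference; the class and method names are hypothetical and not part of the code above.

```java
public class UnsignedReadDemo {

    /** Returns the byte sign-extended, as the first version of read() does. */
    static int readSigned(byte[] buffer, int index) {
        return buffer[index];
    }

    /** Returns the byte masked to 0..255, as the InputStream.read() contract requires. */
    static int readUnsigned(byte[] buffer, int index) {
        return buffer[index] & 0xFF;
    }

    public static void main(String[] args) {
        // 0x78 0x9C is the zlib header mentioned in the question; 0xFF is a possible payload byte.
        byte[] data = { (byte) 0x78, (byte) 0x9C, (byte) 0xFF };
        for (int i = 0; i < data.length; i++) {
            System.out.printf("signed=%d unsigned=%d%n",
                    readSigned(data, i), readUnsigned(data, i));
        }
    }
}
```

For the 0xFF byte the unmasked variant yields -1, so a `while ((c = in.read()) != -1)` loop would stop early on binary data.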
I don't believe that DeflaterOutputStream.flush() does anything meaningful by default. The deflater will accumulate data until it has something to write out to the underlying stream. In that default mode, the only way to force the remaining bit of data out is to call DeflaterOutputStream.finish(). However, this would not work for your current implementation, as you can't call finish() until you are entirely done writing.
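A note on the flush() behaviour: since Java 7, DeflaterOutputStream has constructors that take a boolean syncFlush argument. When it is true, flush() performs a Deflater.SYNC_FLUSH, so the compressed bytes for everything written so far reach the underlying stream without calling finish(). The sketch below compares the two modes on an in-memory sink; the class name, method name, and sample text are made up for illustration, and sync flushing on every small chunk is likely to cost some compression ratio.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;

public class SyncFlushDemo {

    /** Writes data, calls flush(), and reports how many compressed bytes reached the sink. */
    static int bytesAvailableAfterFlush(byte[] data, boolean syncFlush) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DeflaterOutputStream deflater = new DeflaterOutputStream(sink, syncFlush);
            deflater.write(data);
            deflater.flush();
            int available = sink.size(); // measured before close(), which would finish the stream
            deflater.close();
            return available;
        } catch (IOException e) {
            // In-memory streams are not expected to fail; rethrown unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello, streaming deflate".getBytes(StandardCharsets.UTF_8);
        System.out.println("default flush: " + bytesAvailableAfterFlush(data, false));
        System.out.println("sync flush:    " + bytesAvailableAfterFlush(data, true));
    }
}
```

With the proxy from the question, passing `new DeflaterOutputStream(outputStream, true)` from the factory should make each flush() produce readable output, though the stream still has to be finished at end of input.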
It's actually very difficult to write a compressed stream and read it within the same thread. In the RMIIO project I actually do this, but you need an arbitrarily sized intermediate output buffer (you basically need to push data in until something comes out compressed on the other end, and then you can read it). You might be able to use some of the utility classes in that project to accomplish what you want to do.
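As an alternative to the RMIIO utilities (and not a depiction of them), the JDK itself has shipped java.util.zip.DeflaterInputStream since Java 6. It wraps a source InputStream and returns compressed bytes from read(), pulling in as much uncompressed input as it needs until output is available. The following is a minimal sketch of that pull-style approach with a round trip through InflaterInputStream; the class and helper names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DeflaterInputStream;
import java.util.zip.InflaterInputStream;

public class PullCompressionDemo {

    /** Drains a stream into a byte array using a small buffer. */
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[512];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    /** Compresses by reading: the wrapper pulls plain bytes from the source as needed. */
    static byte[] compress(byte[] plain) {
        try (InputStream in = new DeflaterInputStream(new ByteArrayInputStream(plain))) {
            return readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
    }

    /** Decompresses zlib-format data by reading through an InflaterInputStream. */
    static byte[] decompress(byte[] compressed) {
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            return readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
    }

    public static void main(String[] args) {
        byte[] plain = "abc abc abc abc abc abc".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decompress(compress(plain));
        System.out.println("round trip intact: " + Arrays.equals(plain, roundTrip));
    }
}
```

In the test method from the question, this would amount to replacing the ProxyInputStream with `new DeflaterInputStream(input)` for the compressing case.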