 

Java decompress GZIP stream sequentially

My Java program implements a server that receives a very large gzip-compressed file from a client over WebSockets and has to check the file content for a certain byte pattern.

The client sends the file chunks embedded inside a proprietary protocol, so I receive message after message from the client, parse each message and extract the gzipped file content.

I can't hold the whole file in memory, so I'm trying to decompress each chunk, process the data and continue to the next chunk.

I'm using the following code:

public static String gzipDecompress(byte[] compressed) throws IOException {
    String uncompressed;
    try (
        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
        GZIPInputStream gis = new GZIPInputStream(bis);
        Reader reader = new InputStreamReader(gis);
        Writer writer = new StringWriter()
    ) {
        char[] buffer = new char[10240];
        int length;
        while ((length = reader.read(buffer)) > 0) {
            writer.write(buffer, 0, length);
        }
        uncompressed = writer.toString();
    }
    return uncompressed;
}

But I'm getting the following exception when calling the function with the first compressed chunk:

java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)

It's important to mention that I'm not skipping any chunks; I'm decompressing the chunks sequentially.

What am I missing?

Asked by Eldad

1 Answer

The problem is that you decompress each chunk on its own. A single chunk is not a complete gzip stream, so GZIPInputStream runs out of input before the stream is finished and throws the EOFException.

The correct way would be to obtain some InputStream, wrap it with GZIPInputStream and then read the data.

    InputStream is = // obtain the original gzip stream

    GZIPInputStream gis = new GZIPInputStream(is);
    Reader reader = new InputStreamReader(gis);

    //... proceed reading and so on

GZIPInputStream works in a streaming fashion, so if you only ask for 10 KB at a time from your reader, the overall memory footprint stays low regardless of the size of the original gzip file.
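
For illustration, here is a minimal sketch of such a streaming read loop that scans the decompressed bytes for a pattern. The helper names (containsPattern, scanBlock) are made up for this example, and a real pattern check would also need to handle matches that straddle two consecutive buffers:

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class StreamingScanner {

    // Reads the gzip stream in small blocks, so only one buffer's worth of
    // decompressed data is held in memory at any time.
    public static boolean containsPattern(InputStream rawGzipStream, byte[] pattern) throws IOException {
        try (GZIPInputStream gis = new GZIPInputStream(rawGzipStream)) {
            byte[] buffer = new byte[10240];
            int length;
            while ((length = gis.read(buffer)) > 0) {
                if (scanBlock(buffer, length, pattern)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Naive scan of one decompressed block for the pattern.
    private static boolean scanBlock(byte[] block, int length, byte[] pattern) {
        outer:
        for (int i = 0; i + pattern.length <= length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (block[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return true;
        }
        return false;
    }
}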

Update after the question was updated

A possible solution for your situation is to write an InputStream implementation that hands out the bytes your client protocol handler feeds into it, chunk by chunk.

Here is a prototype:

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProtocolDataInputStream extends InputStream {
    private final BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<>(100);
    private byte[] currentChunk = null;
    private int currentChunkOffset = 0;
    private volatile boolean noMoreChunks = false;

    @Override
    public synchronized int read() throws IOException {
        boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length;
        if (takeNextChunk) {
            if (noMoreChunks && nextChunks.isEmpty()) {
                // stream is exhausted
                return -1;
            }
            try {
                currentChunk = nextChunks.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("Interrupted while waiting for the next chunk");
            }
            currentChunkOffset = 0;
        }
        // mask to 0..255 as required by the InputStream.read() contract
        return currentChunk[currentChunkOffset++] & 0xFF;
    }

    @Override
    public synchronized int available() throws IOException {
        if (currentChunk == null) {
            return 0;
        } else {
            return currentChunk.length - currentChunkOffset;
        }
    }

    public void addChunk(byte[] chunk, boolean chunkIsLast) {
        // intentionally not synchronized on the stream: read() may block in
        // take() while holding the monitor, so a synchronized addChunk()
        // could deadlock the producer
        nextChunks.add(chunk);
        if (chunkIsLast) {
            noMoreChunks = true;
        }
    }
}

Your client protocol handler adds byte chunks using addChunk(), while your decompressing code pulls the data out of this stream (via Reader).
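
For example, the two sides could be wired together roughly like this, assuming the protocol handler and the decompressor run on different threads (the class name Wiring and the commented-out addChunk() call are only placeholders for your own code):

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class Wiring {
    public static void main(String[] args) {
        ProtocolDataInputStream protocolStream = new ProtocolDataInputStream();

        // Decompressor side (its own thread): sees one continuous gzip stream,
        // no matter how the bytes arrive over the wire.
        Thread decompressor = new Thread(() -> {
            try (Reader reader = new InputStreamReader(
                    new GZIPInputStream(protocolStream), StandardCharsets.UTF_8)) {
                char[] buffer = new char[10240];
                int length;
                while ((length = reader.read(buffer)) > 0) {
                    // check the decompressed characters for your pattern here
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        decompressor.start();

        // Protocol handler side: for every parsed client message, hand the
        // extracted gzip payload over, marking the last one.
        // protocolStream.addChunk(extractedGzipBytes, isLastMessage);
    }
}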

Please note that this code has some issues:

  1. The queue being used has a limited size. If addChunk() is called faster than the data is consumed, the queue may fill up, which will block addChunk(). This may or may not be desirable.
  2. Only the read() method is implemented, for illustration purposes. For performance, it is better to implement read(byte[], int, int) in the same manner (see the sketch after this list).
  3. read() and available() are synchronized under the assumption that the reader (decompressor) and the writer (the protocol handler calling addChunk()) are different threads. addChunk() itself is deliberately not synchronized on the stream, because read() can block inside take() while holding the monitor.
  4. InterruptedException from take() is handled only minimally, by re-interrupting the thread and rethrowing it as an InterruptedIOException.
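
As a sketch of point 2, a read(byte[], int, int) override inside ProtocolDataInputStream could reuse the same per-chunk logic, for example:

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        // Use the single-byte read() to move onto a chunk (or detect end of
        // stream), then copy as much of the current chunk as fits.
        int first = read();
        if (first == -1) {
            return -1;
        }
        b[off] = (byte) first;
        int remaining = currentChunk.length - currentChunkOffset;
        int toCopy = Math.min(len - 1, remaining);
        System.arraycopy(currentChunk, currentChunkOffset, b, off + 1, toCopy);
        currentChunkOffset += toCopy;
        return 1 + toCopy;
    }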

If your decompressor and addChunk() execute on the same thread (in the same loop), you could check InputStream.available() before pulling from the InputStream, or Reader.ready() before pulling from a Reader, to avoid blocking when no data has arrived yet.

Answered by Roman Puchkovskiy