
Buffered Background InputStream Implementations

I've written background InputStream (and OutputStream) implementations that wrap other streams and read ahead on a background thread, primarily so that decompression/compression happens on a different thread from the one processing the decompressed stream.

It's a fairly standard producer/consumer model.

This seems like an easy way to make good use of multi-core CPUs with simple processes that read, process, and write data, allowing for more efficient use of both CPU and disk resources. Perhaps 'efficient' isn't the best word, but it provides higher utilisation, and of more interest to me, reduced runtimes, compared to reading directly from a ZipInputStream and writing directly to a ZipOutputStream.
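For illustration, the model described above can be sketched with JDK classes only. This is a minimal sketch, not the implementation in this question: it uses `PipedInputStream`/`PipedOutputStream` as the hand-off between threads, and the class name `PipedReadAhead`, its methods, and the buffer sizes are all hypothetical choices made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: decompress on a background thread, consume on the caller's thread.
class PipedReadAhead {

    /** Returns a stream of decompressed bytes that a background thread produces. */
    static InputStream decompressInBackground(InputStream gzipSource, int pipeSize) throws IOException {
        PipedOutputStream producerEnd = new PipedOutputStream();
        PipedInputStream consumerEnd = new PipedInputStream(producerEnd, pipeSize);
        Thread producer = new Thread(() -> {
            // 'out' is declared first so the pipe is closed (signalling EOF to the
            // consumer) even if opening the GZIPInputStream fails.
            try (PipedOutputStream out = producerEnd;
                 InputStream in = new GZIPInputStream(gzipSource)) {
                byte[] chunk = new byte[8 * 1024];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    out.write(chunk, 0, n); // blocks while the pipe is full
                }
            } catch (IOException e) {
                // Sketch only: the consumer just sees EOF; a real version should
                // hand this exception over to the consuming thread.
                e.printStackTrace();
            }
        }, "background-gunzip");
        producer.setDaemon(true);
        producer.start();
        return consumerEnd;
    }

    /** Test helper: gzip a byte array in memory. */
    static byte[] gzip(byte[] plain) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Compresses, then decompresses through the background pipe. */
    static byte[] roundTrip(byte[] plain) {
        try (InputStream in = decompressInBackground(new ByteArrayInputStream(gzip(plain)), 64 * 1024)) {
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                result.write(chunk, 0, n);
            }
            return result.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The pipe's internal buffer plays the role of the read-ahead queue. Whether this reduces runtime as much as a hand-written buffer queue is not something the sketch demonstrates; it only shows the threading structure.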

I'm happy to post the code, but my question is whether I'm reinventing something readily available in existing (and more heavily exercised) libraries?

Edit - posting code...

My code for the BackgroundInputStream is below (the BackgroundOutputStream is very similar), but there are aspects of it that I'd like to improve.

  1. It looks like I'm working far too hard to pass buffers back and forward.
  2. If the calling code throws away references to the BackgroundInputStream, the backgroundReaderThread will hang around forever.
  3. Signalling eof needs improving.
  4. Exceptions should be propagated to the foreground thread.
  5. I'd like to allow using a thread from a provided Executor.
  6. The close() method should signal the background thread, and shouldn't close the wrapped stream, as the wrapped stream should be owned by the background thread that reads from it.
  7. Doing silly things like reading after closing should be catered for appropriately.
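To make a few of these points concrete, here is one possible shape for items 3 to 7, written as a hedged sketch rather than a finished design. It assumes a zero-length sentinel buffer for EOF, a volatile field for handing an `IOException` to the foreground thread, and a `FutureTask` so that any `Executor` can run the reader and `close()` can interrupt it. The class name `ReadAheadSketch` and all parameter names are hypothetical. Item 2 (abandoned streams) is not addressed, and `bufferSize` is assumed to be positive.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical class name; a sketch, not the BackgroundInputStream from the question.
class ReadAheadSketch extends InputStream {
    private static final byte[] EOF = new byte[0]; // sentinel, compared by identity
    private final BlockingQueue<byte[]> queue;
    private final FutureTask<Void> readerTask;
    private volatile IOException failure;  // written by the reader before it queues EOF
    private volatile boolean closed;
    private byte[] current = new byte[0];  // empty, so the first read() fetches a buffer
    private int pos;

    ReadAheadSketch(InputStream source, Executor executor, int queueSize, int bufferSize) {
        queue = new ArrayBlockingQueue<>(queueSize);
        readerTask = new FutureTask<Void>(() -> fill(source, bufferSize), null);
        executor.execute(readerTask);
    }

    // Runs on the executor's thread.
    private void fill(InputStream source, int bufferSize) {
        try (InputStream in = source) { // the reader owns and closes the source
            byte[] buf = new byte[bufferSize];
            int n;
            while ((n = in.read(buf)) != -1) {
                if (n > 0) {
                    queue.put(Arrays.copyOf(buf, n)); // blocks while the queue is full
                }
            }
        } catch (IOException e) {
            failure = e; // surfaced to the caller once the queued data is consumed
        } catch (InterruptedException e) {
            return; // cancelled by close(): nobody is waiting for EOF
        }
        try {
            queue.put(EOF);
        } catch (InterruptedException e) {
            // cancelled by close() while waiting for queue space
        }
    }

    @Override
    public int read() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
        while (current != EOF && pos == current.length) { // current buffer used up
            try {
                current = queue.take();
                pos = 0;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
        }
        if (current == EOF) {
            if (failure != null) {
                throw failure; // propagate the background failure
            }
            return -1;
        }
        return current[pos++] & 0xFF; // mask so bytes >= 0x80 are not negative
    }

    @Override
    public void close() {
        closed = true;
        // Interrupts the reader if it is running. A reader blocked inside a
        // non-interruptible source.read() will not notice until that call returns.
        readerTask.cancel(true);
    }
}
```

Because the sentinel travels through the same queue as the data, the foreground thread never has to combine a flag check with a blocking `take()`, which is where a race between "not yet eof" and "nothing more will be queued" can otherwise arise. This version allocates a fresh array per buffer instead of recycling them, trading some garbage for simpler hand-off.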

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream, int queueSize, int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
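        while (currentBuffer == null) {
            // The reader sets eof only after its last put(), so eof with an
            // empty queue means no more data will arrive.
            if (eof && bufferQueue.isEmpty()) {
                return -1;
            }
            try {
                // Poll with a timeout rather than take(): take() could block forever
                // if the reader reached eof without queueing another buffer.
                currentBuffer = bufferQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                pos = 0;
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the read instead of
                // falling through to a null currentBuffer.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
        }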
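        // Mask to 0..255: a raw byte >= 0x80 would be returned as a negative int,
        // and 0xFF would be mistaken for end of stream.
        int b = currentBuffer[pos++] & 0xFF;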
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        // Only the unread remainder of the current buffer is available without blocking.
        return currentBuffer.length - pos;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
Stephen Denne asked Jan 28 '10 10:01


1 Answer

Sounds interesting. I've never run across anything that does this out of the box, but it makes perfect sense to use an idle core for the compression if one is available.

Perhaps you could make use of Commons I/O. It is a well-tested library that could handle some of the more boring parts and let you focus on extending the cool parallel parts. Maybe you could even contribute your code to the Commons project ;-)

jckdnk111 answered Sep 20 '22 10:09