Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Iterator of Strings to Inputstream of bytes

I would like to convert an iterator of Strings to Inputstream of bytes. Usually, I can do this by appending all the strings in a StringBuilder and doing: InputStream is = new ByteArrayInputStream(sb.toString().getBytes());

But I want to do it lazily because my iterable is provided by Spark and could be very large in length. I found this example to do it in Scala:

  def rowsToInputStream(rows: Iterator[String], delimiter: String): InputStream = {
  val bytes: Iterator[Byte] = rows.map { row =>
    (row + "\n").getBytes
  }.flatten

  new InputStream {
    override def read(): Int = if (bytes.hasNext) {
      bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
    } else {
      -1
    }
  }
}

But I could not find an easy way to convert this into Java. I converted the iterator to stream using Spliterators.spliteratorUnknownSize but then getBytes outputs an array which could not flatten easily. Overall it became pretty messy.

Is there an elegant way to do this in Java?

like image 607
Heisenberg Avatar asked Jun 01 '20 17:06

Heisenberg


2 Answers

If you want to have an InputStream supporting fast bulk operations, you should implement the
int read(byte[] b, int off, int len) method, which not only can be called directly by the code reading the InputStream, but is also the backend for the inherited methods

  • int read(byte b[])
  • long skip(long n)
  • byte[] readAllBytes() (JDK 9)
  • int readNBytes(byte[] b, int off, int len) (JDK 9)
  • long transferTo(OutputStream out) (JDK 9)
  • byte[] readNBytes(int len) (JDK 11)
  • void skipNBytes​(long n) (JDK 14)

which will work more efficiently when said method has an efficient implementation.

public class StringIteratorInputStream extends InputStream {
    private CharsetEncoder encoder;
    private Iterator<String> strings;
    private CharBuffer current;
    private ByteBuffer pending;

    public StringIteratorInputStream(Iterator<String> it) {
        this(it, Charset.defaultCharset());
    }
    public StringIteratorInputStream(Iterator<String> it, Charset cs) {
        encoder = cs.newEncoder();
        strings = Objects.requireNonNull(it);
    }

    @Override
    public int read() throws IOException {
        for(;;) {
            if(pending != null && pending.hasRemaining())
                return pending.get() & 0xff;
            if(!ensureCurrent()) return -1;
            if(pending == null) pending = ByteBuffer.allocate(4096);
            else pending.compact();
            encoder.encode(current, pending, !strings.hasNext());
            pending.flip();
        }
    }

    private boolean ensureCurrent() {
        while(current == null || !current.hasRemaining()) {
            if(!strings.hasNext()) return false;
            current = CharBuffer.wrap(strings.next());
        }
        return true;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        // Objects.checkFromIndexSize(off, len, b.length); // JDK 9
        int transferred = 0;
        if(pending != null && pending.hasRemaining()) {
            boolean serveByBuffer = pending.remaining() >= len;
            pending.get(b, off, transferred = Math.min(pending.remaining(), len));
            if(serveByBuffer) return transferred;
            len -= transferred;
            off += transferred;
        }
        ByteBuffer bb = ByteBuffer.wrap(b, off, len);
        while(bb.hasRemaining() && ensureCurrent()) {
            int r = bb.remaining();
            encoder.encode(current, bb, !strings.hasNext());
            transferred += r - bb.remaining();
        }
        return transferred == 0? -1: transferred;
    }
}

A ByteBuffer basically is the combination of the byte buf[];, int pos;, and int count; variables of your solution. However, the pending buffer is only initialized if the caller truly uses the int read() method to read single bytes. Otherwise, the code creates a ByteBuffer that is wrapping the caller provided target buffer, to encode the strings directly into it.

The CharBuffer follows the same concept, just for char sequences. In this code, it will always be a wrapper around one of the strings, rather than a buffer with a storage of its own. So in the best case, this InputStream implementation will encode all iterator provided strings into caller provided buffer(s), without intermediate storage.

This concept does already imply lazy processing, as without intermediate storage, only as much as fitting into the caller provided buffer, in other words, as much as requested by the caller, will be fetched from the iterator.

like image 144
Holger Avatar answered Sep 24 '22 02:09

Holger


As per @Kayaman's suggestion, I took a page from ByteArrayInputStream and handled switching of byte array using Iterator<String> manually. This one turned to be much more performant than the streams approach:

import java.io.InputStream;
import java.util.Iterator;

public class StringIteratorInputStream extends InputStream {
    protected byte buf[];
    protected int pos;
    protected int count;
    private Iterator<String> rows;

    public StringIteratorInputStream(Iterator<String> rows) {
        this.rows = rows;
        this.count = -1;
    }

    private void init(byte[] buf) {
        this.buf = buf;
        this.pos = 0;
        this.count = buf.length;
    }

    public int read() {
        if (pos < count) {
           return (buf[pos++] & 0xff);
        } else if (rows.hasNext()) {
            init(rows.next().getBytes());
            return (buf[pos++] & 0xff);
        } else {
            return -1;
        }
    }

}

I did not extend ByteArrayInputStream because it's read is synchronized and I didn't need that.

like image 45
Heisenberg Avatar answered Sep 23 '22 02:09

Heisenberg