I would like to convert an iterator of Strings to an InputStream of bytes. Usually, I can do this by appending all the strings to a StringBuilder and doing:
InputStream is = new ByteArrayInputStream(sb.toString().getBytes());
But I want to do it lazily, because my iterable is provided by Spark and could be very large. I found this example that does it in Scala:
def rowsToInputStream(rows: Iterator[String], delimiter: String): InputStream = {
  val bytes: Iterator[Byte] = rows.map { row =>
    (row + "\n").getBytes
  }.flatten

  new InputStream {
    override def read(): Int = if (bytes.hasNext) {
      bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
    } else {
      -1
    }
  }
}
But I could not find an easy way to convert this into Java. I converted the iterator to a stream using Spliterators.spliteratorUnknownSize, but then getBytes returns an array, which could not be flattened easily. Overall it became pretty messy.
Is there an elegant way to do this in Java?
If you want to have an InputStream supporting fast bulk operations, you should implement the int read(byte[] b, int off, int len) method, which not only can be called directly by the code reading the InputStream, but is also the backend for the inherited methods

int read(byte b[])
long skip(long n)
byte[] readAllBytes() (JDK 9)
int readNBytes(byte[] b, int off, int len) (JDK 9)
long transferTo(OutputStream out) (JDK 9)
byte[] readNBytes(int len) (JDK 11)
void skipNBytes(long n) (JDK 14)

which will work more efficiently when said method has an efficient implementation.
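To see that delegation at work, here is a minimal sketch. The class name CountingStream and its counter fields are hypothetical, made up purely for illustration. It overrides both the single-byte read() and the bulk read, counts how often each one is called, and then invokes the inherited readAllBytes(), which requires JDK 9 or newer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class CountingStream extends InputStream {
    private final byte[] data;
    private int pos;
    public int singleReads; // number of calls to read()
    public int bulkReads;   // number of calls to read(byte[], int, int)

    public CountingStream(String s) {
        data = s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public int read() {
        singleReads++;
        return pos < data.length ? data[pos++] & 0xff : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        bulkReads++;
        if (len == 0) return 0;
        if (pos >= data.length) return -1;
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, b, off, n);
        pos += n;
        return n;
    }

    public static void main(String[] args) throws IOException {
        CountingStream in = new CountingStream("hello world");
        byte[] all = in.readAllBytes(); // inherited from InputStream
        System.out.println(new String(all, StandardCharsets.UTF_8));
        System.out.println("single-byte reads: " + in.singleReads);
        System.out.println("bulk reads: " + in.bulkReads);
    }
}
```

If the bulk method were not overridden, the inherited read(byte[], int, int) would fall back to calling read() once per byte, which is the slow path this answer tries to avoid.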
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.Objects;

public class StringIteratorInputStream extends InputStream {
    private final CharsetEncoder encoder;
    private final Iterator<String> strings;
    private CharBuffer current; // wraps the string currently being encoded
    private ByteBuffer pending; // only allocated when the single-byte read() is used

    public StringIteratorInputStream(Iterator<String> it) {
        this(it, Charset.defaultCharset());
    }

    public StringIteratorInputStream(Iterator<String> it, Charset cs) {
        encoder = cs.newEncoder();
        strings = Objects.requireNonNull(it);
    }

    @Override
    public int read() throws IOException {
        for(;;) {
            if(pending != null && pending.hasRemaining())
                return pending.get() & 0xff;
            if(!ensureCurrent()) return -1;
            if(pending == null) pending = ByteBuffer.allocate(4096);
            else pending.compact();
            encoder.encode(current, pending, !strings.hasNext());
            pending.flip();
        }
    }

    // Advances to the next string with remaining chars; false when the iterator is exhausted.
    private boolean ensureCurrent() {
        while(current == null || !current.hasRemaining()) {
            if(!strings.hasNext()) return false;
            current = CharBuffer.wrap(strings.next());
        }
        return true;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        // Objects.checkFromIndexSize(off, len, b.length); // JDK 9
        if(len == 0) return 0; // InputStream contract: a zero-length request reads nothing
        int transferred = 0;
        // First hand out bytes left over from earlier single-byte reads.
        if(pending != null && pending.hasRemaining()) {
            boolean serveByBuffer = pending.remaining() >= len;
            pending.get(b, off, transferred = Math.min(pending.remaining(), len));
            if(serveByBuffer) return transferred;
            len -= transferred;
            off += transferred;
        }
        // Then encode directly into the caller's array, without intermediate storage.
        ByteBuffer bb = ByteBuffer.wrap(b, off, len);
        while(bb.hasRemaining() && ensureCurrent()) {
            int r = bb.remaining();
            encoder.encode(current, bb, !strings.hasNext());
            transferred += r - bb.remaining();
        }
        return transferred == 0? -1: transferred;
    }
}
A ByteBuffer is basically the combination of the byte buf[], int pos, and int count variables of your solution. However, the pending buffer is only initialized if the caller truly uses the int read() method to read single bytes. Otherwise, the code creates a ByteBuffer that wraps the caller-provided target buffer, to encode the strings directly into it.

The CharBuffer follows the same concept, just for char sequences. In this code, it will always be a wrapper around one of the strings, rather than a buffer with storage of its own. So in the best case, this InputStream implementation will encode all iterator-provided strings into the caller-provided buffer(s), without intermediate storage.

This concept already implies lazy processing: without intermediate storage, only as much as fits into the caller-provided buffer (in other words, as much as the caller requested) will be fetched from the iterator.
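The idea of encoding straight into the caller's buffer can also be tried in isolation. The following is a small sketch, not part of the class above; the class name DirectEncodeDemo, the helper encodeInto, and the 4-byte buffer size are arbitrary choices for illustration. It wraps a string in a CharBuffer, wraps a deliberately small byte array in a ByteBuffer, and lets the encoder fill only what fits, so the rest of the string stays pending in the CharBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public class DirectEncodeDemo {
    /** Encodes as much of src as fits into target and returns the number of bytes written. */
    public static int encodeInto(CharsetEncoder encoder, CharBuffer src, byte[] target) {
        ByteBuffer bb = ByteBuffer.wrap(target); // no copy: bb writes into target itself
        encoder.encode(src, bb, true);           // stops early when bb is full
        return bb.position();
    }

    public static void main(String[] args) {
        CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
        CharBuffer current = CharBuffer.wrap("abcdefghij"); // wraps the string, no copy
        byte[] callerBuffer = new byte[4];                  // stands in for the caller's array
        while (current.hasRemaining()) {
            int n = encodeInto(encoder, current, callerBuffer);
            System.out.println(n + " bytes written, "
                    + current.remaining() + " chars still pending");
        }
    }
}
```

Each pass consumes only as many chars as the small array can hold, which is the same mechanism that keeps the stream from pulling more strings from the iterator than the caller asked for.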
As per @Kayaman's suggestion, I took a page from ByteArrayInputStream and handled the switching of byte arrays manually, using an Iterator<String>. This one turned out to be much more performant than the streams approach:
import java.io.InputStream;
import java.util.Iterator;

public class StringIteratorInputStream extends InputStream {
    protected byte buf[];
    protected int pos;
    protected int count;
    private Iterator<String> rows;

    public StringIteratorInputStream(Iterator<String> rows) {
        this.rows = rows;
        this.count = -1;
    }

    // Switches to the bytes of the next row.
    private void init(byte[] buf) {
        this.buf = buf;
        this.pos = 0;
        this.count = buf.length;
    }

    @Override
    public int read() {
        // Loop rather than a single check, so that empty rows are skipped
        // instead of indexing into a zero-length array.
        while (pos >= count) {
            if (!rows.hasNext()) {
                return -1;
            }
            init(rows.next().getBytes());
        }
        return (buf[pos++] & 0xff);
    }
}
I did not extend ByteArrayInputStream because its read is synchronized and I didn't need that.