Arrow + Java: Populate VectorSchemaRoot (from stream / file) | Memory-Ownership | Usage patterns

I'm doing very basic experiments with Apache Arrow, mostly around passing data between Java, C++ and Python using Arrow's IPC format (to file), Parquet format (to file) and IPC format (streamed through JNI).

C++ and Python look somewhat usable, but the Java part is really troubling me.

Sadly, the Java documentation is rather limited, but even taking into account the warnings in those hidden docs (not part of the TOC), I'm just trying to populate a VectorSchemaRoot from a previously written file.

Ignoring 99% of my experiment code, the following minimal example shows my problem: I create some data, write it out (it imports nicely in Python) and try to read it back in Java.

But data ownership seems to be in the way of what I'm trying to achieve.

Maybe keeping VectorSchemaRoot as the central entry point (as in: all my data lives here) is the wrong usage pattern, but I'm not sure what the alternative would be. Why would I manually keep IntVectors and co. around when this class does the same and offers an API to work with them?

package arrow.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.common.collect.ImmutableList;

public class Run {

  public static void main(String[] args) {

    ImmutableList.Builder<Field> builder = ImmutableList.builder();
    
    Field intField = new Field("i", FieldType.nullable(new ArrowType.Int(32, true)), null);
    builder.add(intField);      
    
    Field boolField = new Field("b", FieldType.nullable(new ArrowType.Bool()), null);
    builder.add(boolField);     
    
    RootAllocator sourceRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    Schema sourceSchema = new Schema(builder.build(), null);
    VectorSchemaRoot sourceRoot  = VectorSchemaRoot.create(sourceSchema, sourceRootAlloc);
    
    FieldVector vector = sourceRoot.getVector("i");
    IntVector intVector = (IntVector) vector;
    intVector.allocateNew(5);
    intVector.set(0, 0);
    intVector.set(1, 1);
    intVector.set(2, 2);
    intVector.set(3, 3);
    intVector.set(4, 4);
    intVector.setValueCount(5);
    
    vector = sourceRoot.getVector("b");
    BitVector bitVector = (BitVector) vector;
    bitVector.allocateNew(5);
    bitVector.set(0, 1);
    bitVector.set(1, 1);
    bitVector.set(2, 0);
    bitVector.set(3, 0);
    bitVector.set(4, 1);
    bitVector.setValueCount(5);
    
    sourceRoot.setRowCount(5);

    System.out.println("before writing");
    System.out.println(sourceRoot.contentToTSVString());        
    
    // WRITE
    // -----
    
    try {
      // second argument is "append"; false overwrites any existing file
      FileOutputStream fileOut = new FileOutputStream("out", false);
      ArrowFileWriter writer = new ArrowFileWriter(sourceRoot, null, fileOut.getChannel());
      writer.start();
      writer.writeBatch();
      writer.end();
      writer.close();
      fileOut.close();
    }
    catch (IOException e) {
      e.printStackTrace();
    }
    
    // READ
    // ----
    
    FileInputStream fileInputStream;
      
    RootAllocator targetRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    VectorSchemaRoot targetRoot = null;
    
    try {
      fileInputStream = new FileInputStream("out");

      ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), targetRootAlloc);
      
      targetRoot = reader.getVectorSchemaRoot();    
      reader.loadNextBatch();       
      
      System.out.println("before closing stream");
      System.out.println(targetRoot.contentToTSVString());

      reader.close();
      fileInputStream.close();          

      System.out.println("after closing stream");
      System.out.println(targetRoot.contentToTSVString());      
    }
    catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Running this (with Java 11 and -Dio.netty.tryReflectionSetAccessible=true as documented) leads to:

... irrelevant warning
...
before writing
i   b
0   true
1   true
2   false
3   false
4   true

before closing stream
i   b
0   true
1   true
2   false
3   false
4   true

after closing stream
Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
  at io.netty.buffer.ArrowBuf.checkIndexD(ArrowBuf.java:337)
  at io.netty.buffer.ArrowBuf.chk(ArrowBuf.java:324)
  at io.netty.buffer.ArrowBuf.getByte(ArrowBuf.java:526)
  at org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:776)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:143)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:39)
  at org.apache.arrow.vector.VectorSchemaRoot.contentToTSVString(VectorSchemaRoot.java:268)
  at arrow.test.Run.main(Run.java:102)

while Python can do it easily:

import pyarrow as pa
print(pa.__version__)

buf = pa.ipc.open_file("out")
print(buf.schema)

df = buf.read_pandas()
print(df)

outputs:

0.17.1
i: int32
b: bool
   i      b
0  0   True
1  1   True
2  2  False
3  3  False
4  4   True

Now it seems that ArrowFileReader feels responsible for cleaning up the data, despite the fact that the allocator is defined outside of its scope. targetRoot.getRowCount() is correct, but each FieldVector has size 0.

I tried a lot of alternatives (not shown), e.g. using VectorUnloader and VectorLoader to unload from the reader's VectorSchemaRoot (local scope), transferring ownership via batch.cloneWithTransfer(targetAlloc) (global scope) and loading into a target VectorSchemaRoot (global scope), but that did not work either, usually failing with "A buffer can only be associated between two allocators that share the same root".
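For reference, a minimal sketch of the shape of that attempt (illustrative only; readerAlloc and targetAlloc stand for the local-scope and global-scope allocators, and targetRoot for a VectorSchemaRoot created from the schema with targetAlloc):

// Illustrative sketch of the unload / transfer / load attempt; uses
// org.apache.arrow.vector.{VectorLoader, VectorUnloader} and
// org.apache.arrow.vector.ipc.message.ArrowRecordBatch.
VectorUnloader unloader = new VectorUnloader(reader.getVectorSchemaRoot());
try (ArrowRecordBatch batch = unloader.getRecordBatch();
     // throws "A buffer can only be associated between two allocators that
     // share the same root" when readerAlloc and targetAlloc are two
     // independently created RootAllocators instead of children of one root
     ArrowRecordBatch transferred = batch.cloneWithTransfer(targetAlloc)) {
  VectorLoader loader = new VectorLoader(targetRoot);
  loader.load(transferred);
}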

Either I have misunderstood a lot about the usage patterns, or (probably not, but it feels like it) the Java part is pretty broken.

Any ideas?

asked Jul 16 '20 by sascha

1 Answer

I ran into the same issue trying to load an Arrow file written out by Python and then read by Java, with the intent to ultimately stream the contents out as JSON (via Jackson).

Initially I tried a similar strategy -- read the table, cart it around in memory, then eventually let Jackson pick it up and use my custom serializer. I experienced the same "why are the vectors empty but the row count is full? ohhh closing the reader closed the vectors" moment as you bolded in your question.

I ultimately decided to hang onto only the path to the file in memory, and the Jackson serializer actually opens the file and writes the JSON stream while the reader and vectors are all open; roughly:

// adapted from Kotlin, so treat the Java below as a sketch
void serialize(JsonGenerator gen) throws IOException {
  gen.writeStartArray();
  try (ArrowFileReader rdr = new ArrowFileReader(
           Files.newByteChannel(path, StandardOpenOption.READ),
           allocator)) {
    while (rdr.loadNextBatch()) {
      // for each batch, write the loaded vectors' rows
      writeLoadedVectorsToJson(gen, rdr.getVectorSchemaRoot());
    }
  }
  gen.writeEndArray();
}

This wasn't my instinct at first, but it actually resulted in much cleaner, lower-memory code; I didn't need access to the values except at serialization time, and at most one of the N batches was in memory at any given time*. So this ultimately turned out better than what I had been trying before.

I ended up wrapping the path and reader in an AutoCloseable & Iterator<VectorSchemaRoot> implementation that I can use elsewhere if I need the contents in other ways in Java.
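Roughly the following shape (a sketch, not my exact code; the class name and error handling are illustrative):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;

// Iterates over the record batches of an Arrow file. Each next() loads the
// following batch into the reader's single VectorSchemaRoot, so the returned
// root is only valid until the next call to next() or close().
class ArrowBatchIterator implements AutoCloseable, Iterator<VectorSchemaRoot> {

  private final ArrowFileReader reader;
  private Boolean hasNext;  // caches the result of loadNextBatch()

  ArrowBatchIterator(Path path, BufferAllocator allocator) throws IOException {
    reader = new ArrowFileReader(
        Files.newByteChannel(path, StandardOpenOption.READ), allocator);
  }

  @Override
  public boolean hasNext() {
    if (hasNext == null) {
      try {
        hasNext = reader.loadNextBatch();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return hasNext;
  }

  @Override
  public VectorSchemaRoot next() {
    if (!hasNext()) throw new NoSuchElementException();
    hasNext = null;  // force the next hasNext() to load another batch
    try {
      return reader.getVectorSchemaRoot();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}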

I have the impression that keeping the reader open while the code that operates on the vectors runs is intentional, because you may need e.g. to seek back to an earlier batch, or at least want to avoid keeping all vectors from all batches in memory at once.
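As far as I can tell, the file reader does expose that kind of random access while it is open (again a sketch; ArrowBlock comes from org.apache.arrow.vector.ipc.message):

// The file footer lists one ArrowBlock per record batch; any of them can be
// re-loaded into the reader's root as long as the channel is still open.
List<ArrowBlock> blocks = reader.getRecordBlocks();
reader.loadRecordBatch(blocks.get(0));  // seek back to the first batch
VectorSchemaRoot root = reader.getVectorSchemaRoot();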

*: to your point in your JIRA ticket about the docs, it's not clear without digging into the code whether the Java file reader memory-maps the file or loads batches into direct buffers, since the docs are so sparse.

answered Oct 17 '22 by Ben Mosher