Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to load a CSV file into Apache Arrow vectors and save an arrow file to disk

I'm currently playing with Apache Arrow's java API (though I use it from Scala for the code samples) to get some familiarity with this tool.

As an exercise, I chose to load a CSV file into arrow vectors and then to save these to an arrow file. The first part seemed easy enough, and I tried it like this:

val csvLines: Stream[Array[String]] = <open stream from CSV parser>

// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)

// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)

// Work on the data, zipping it with its index 
Stream.from(0)
  .zip(csvLines.tail) // Work on the tail (head contains the headers)
  .foreach(rowTup =>  // rowTup = (index, csvRow as an Array[String])
    Range(0, rowTup._2.size) // Iterate on each column...
      .foreach(columnNumber =>
        writeToMutator(
          mutators(columnNumber), // get that column's mutator
          idx=rowTup._1,          // pass the current row number
          data=rowTup._2(columnNumber) // pass the entry of the curernt column
        )
      )
)

With initVectors() and writeToMutator() defined as:

def initVectors(
  columns: Array[String], 
  alloc: RootAllocator): Array[NullableVarCharVector] = {

  // Initialize a vector for each column
  val vectors = columns.map(colName => 
    new NullableVarCharVector(colName, alloc))
  // 4096 size, for 1024 values initially. This is arbitrary
  vectors.foreach(_.allocateNew(2^12,1024))
  vectors
}

def writeToMutator(
  mutator: NullableVarCharVector#Mutator, 
  idx: Int, 
  data: String): Unit = {

  // The CSV may contain null values
  if (data != null) {
    val bytes = data.getBytes()
    mutator.setSafe(idx, bytes, 0, bytes.length)
  }
  mutator.setNull(idx)
}

(I currently don't care about using the correct type, and store everything as strings, or VarChar in arrow's terns)

So at this point I have a collection of NullableVarCharVector and can read and write from/to them. Everything great at this point. Now, for the next step, though, I was left wondering about how to actually wrap them together and serialize them to an arrow file. I stumbled on an AbstractFieldWriter abstract class, but how to use the implementations is unclear.

So, the question mainly is:

  • what is the (best? – there seem to be multiple ones) way to save a bunch of vectors to an arrow file.
  • are there other ways of loading CSV columns to arrow vectors?

edited to add: The metadata description page provides a good general overview on that topic.

The api's test classes seem to contain a few things that could help, I'll post a reply with a sample once I've tried it out.

like image 393
Shastick Avatar asked Oct 23 '17 09:10

Shastick


1 Answers

Looking at TestArrowFile.java and BaseFileTest.java I found:

  • How to write a single arrow file to disk
  • An alternate way of filling vectors, as my first attempt prevented me from assembling a single arrow file (or at least to do so in a straightforward manner).

So, filling up vectors now looks like:

// Open stream of rows 
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// Define a parent to hold the vectors
val parent = MapVector.empty("parent", allocator)
// Create a new writer. VarCharWriterImpl would probably do as well?
val writer = new ComplexWriterImpl("root", parent)

// Initialise a writer for each column, using the header as the name
val rootWriter = writer.rootAsMap()
val writers = csvLines.head.map(colName => 
                                  rootWriter.varChar(colName))

Stream.from(0)
  .zip(csvLines.tail) // Zip the rows with their index
  .foreach( rowTup => { // Iterate on each (index, row) tuple
    val (idx, row) = rowTup
      Range(0, row.size) // Iterate on each field of the row
        .foreach(column =>
          Option(row(column)) // row(column) may be null,
            .foreach(str =>   // use the option as a null check
              write(writers(column), idx, allocator, str)
            )
      )
  }
)

toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file

with write defined as:

def write(writer: VarCharWriter, idx: Int, 
  allocator: BufferAllocator, data: String): Unit = {
  // Set the position to the correct index
  writer.setPosition(idx)
  val bytes = data.getBytes()
  // Apparently the allocator is required again to build a new buffer
  val varchar = allocator.buffer(bytes.length)
  varchar.setBytes(0, data.getBytes())
  writer.writeVarChar(0, bytes.length, varchar)
}

def toFile(parent: FieldVector, fName: String): Unit = {
  // Extract a schema from the parent: that's the part I struggled with in the original question
  val rootSchema = new VectorSchemaRoot(parent)
  val stream = new FileOutputStream(fName)
  val fileWriter = new ArrowFileWriter(
                        rootSchema,
                        null, // We don't use dictionary encoding.
                        stream.getChannel)
  // Write everything to file...
  fileWriter.start()
  fileWriter.writeBatch()
  fileWriter.end()
  stream.close()
}

With the above I'm able to save a CSV to file. I checked everything went well by reading it and converting it to a CSV again, and the content is unchanged.

Note that the ComplexWriterImpl allows to write columns of different types, something that will come in handy to avoid storing number columns as strings.

(I'm playing with the reading side of things for now, this things will probably deserve their own SO questions.)

like image 107
Shastick Avatar answered Oct 02 '22 12:10

Shastick